关键词:
1实现接口->WritableCompareable
排序操作在hadoop中属于默认的行为。默认按照字典殊勋排序。
2 排序的分类:
1)部分排序
2)全排序
3)辅助排序
4)二次排序
3 案例: 在流量汇总输出文件里的数据 进行分区,每个分区中的数据进行排序
数据预览,这里只是进行了流量的汇总,没有进行分区和排序
1:编写flowbean
实现WritableCompareable完成序列化并且完成排序
package it.dawn.YARNPra.基本用法.排序; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * @author Dawn * @date 2019年5月7日09:04:04 * @version 1.0 * 直接继承 WritableComparable, */ public class FlowBean implements WritableComparable<FlowBean> private long upFlow; private long dfFlow; private long flowSum; //无参构造 public FlowBean() //有参构造 public FlowBean(long upFlow,long dfFlow) this.upFlow=upFlow; this.dfFlow=dfFlow; this.flowSum=upFlow+dfFlow; public long getUpFlow() return upFlow; public void setUpFlow(long upFlow) this.upFlow = upFlow; public long getDfFlow() return dfFlow; public void setDfFlow(long dfFlow) this.dfFlow = dfFlow; public long getFlowSum() return flowSum; public void setFlowSum(long flowSum) this.flowSum = flowSum; @Override public void write(DataOutput out) throws IOException // TODO Auto-generated method stub out.writeLong(upFlow); out.writeLong(dfFlow); out.writeLong(flowSum); @Override public void readFields(DataInput in) throws IOException // TODO Auto-generated method stub upFlow=in.readLong(); dfFlow=in.readLong(); flowSum=in.readLong(); @Override public String toString() return upFlow+"\\t"+dfFlow+"\\t"+flowSum; //排序 @Override public int compareTo(FlowBean o) //倒序 return this.flowSum>o.getFlowSum()? -1 : 1;
2:编写Mapper类
package it.dawn.YARNPra.基本用法.排序; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * @author Dawn * @date 2019年5月7日09:24:06 * @version 1.0 * * 输入? * 13480253104 120 1320 1440 * 输出? * <key2 , v2> * <流量上行+\\t+流量下行,手机号码> */ public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException //1:读数据 String line=value.toString(); //2:切割 String[] fields=line.split("\\t"); //3:取出指定字段 long upFlow=Long.parseLong(fields[1]); long dfFlow=Long.parseLong(fields[2]); //4:输出到reduce阶段 context.write(new FlowBean(upFlow, dfFlow), new Text(fields[0]));
3:编写Reducer类
package it.dawn.YARNPra.基本用法.排序; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> @Override protected void reduce(FlowBean k3, Iterable<Text> v3, Context context) throws IOException, InterruptedException //直接输出 context.write(v3.iterator().next(), k3);
4:编写Partitioner类
package it.dawn.YARNPra.基本用法.排序; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class FlowSortPartitioner extends Partitioner<FlowBean, Text> @Override public int getPartition(FlowBean key, Text value, int numPartitions) //1: 获取手机前3个数字 String phoneThree=value.toString().substring(0, 3); //2:定义分区号 int partitioner=4; if("135".equals(phoneThree)) return 0; else if("137".equals(phoneThree)) return 1; else if("138".equals(phoneThree)) return 2; else if("139".equals(phoneThree)) return 3; return partitioner;
5:编写driver类
package it.dawn.YARNPra.基本用法.排序; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author Dawn * @date 2019年5月7日09:22:12 * @version 1.0 * 需求? * 将数据进行分区,并在每个分区中进行排序 */ public class FlowSortDriver public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException //1:添加配置 Configuration conf=new Configuration(); Job job=Job.getInstance(conf); //2:设置主类 job.setJarByClass(FlowSortDriver.class); //3:设置Mapper和Reduce类 job.setMapperClass(FlowSortMapper.class); job.setReducerClass(FlowSortReducer.class); //4:设置Map输出类 job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); //5:设置Reduce输出类 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //添加自定义分区 job.setPartitionerClass(FlowSortPartitioner.class); job.setNumReduceTasks(5); //6:设置输入输出路径 FileInputFormat.setInputPaths(job, new Path("f:/temp/流量统计结果/out1/part-r-00000")); FileOutputFormat.setOutputPath(job, new Path("f:/temp/流量统计结果/out2")); //7提交任务 boolean rs=job.waitForCompletion(true); System.out.println(rs ? "success" : "fail");
查看最终的输出结果:
flink内核原理学习任务提交流程(代码片段)
...体组件解释二、Flink任务提交流程(yarn-session模式)Java、大数据开发学习要点(持续更新中…) 首先要知道的是,Flink作业提交是由PipelineExecutor(流水线 查看详情
(2021-03-26)大数据学习之flink安装部署以及任务提交/调度详解(代码片段)
...r与Slots3.3.1作用与关系3.3.2任务执行资源合理占用4.程序与数据流4.1数据传输形式4.2任务链(OperatorChains)1.F 查看详情
hive学习之排序(代码片段)
...nbsp;二、每个MapReduce内部排序(sortby)Sortby:对于大规模的数据集来说Orderby的效率特别低。很多情况下,不需要全局排序,此时可以使用sort不要。Sortby是为每个reducer产生一个排序文件,对每个reducer内部进行排序,对全局结果不... 查看详情
大数据学习之mapreduce编程案例一单词计数10
一:单词计数1:单词计数总流程图 2:代码实现1:Map阶段packageit.dawn.YARNPra.wc_hdfs;importjava.io.IOException;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Te 查看详情
(2021-03-11)大数据学习之flink基础概念以及简单demo(代码片段)
Flink的简介1.什么是Flink1.1有边界数据流和无边界数据流1.2有状态计算2.Flink的起源3.为什么要用Flink4.什么是流处理和批处理5.Flink的应用场景6.Flink的简单Demo6.1批处理6.2流处理由于公司要求下周可能会要分享一下Flink。所以这周抽了... 查看详情
大数据学习之hdfs基本命令操作05
1)hdfs的客户端1.网页形式->测试用http://192.168.40.11:50070/dfshealth.html#tab-overview2.命令行形式->测试用3.企业级API 2)查看帮助 hdfsdfs-help 3)查看当前目录信息 hdfsdfs-ls/ 3)上传文件hdfsdfs-put/本地路径/hdfs路径& 查看详情
大数据学习之linux进阶02
大数据学习之Linux进阶1->配置IP 1)修改配置文件 vi/sysconfig/network-scripts/ifcfg-eno16777736 2)注释掉dhcp #BOOTPROTO="dhcp" 3)添加配置(windows->ipconfig-all) IPADDR=192.168.50.179 NETMASK=225.255.255.0 GATEWAY=192. 查看详情
elasticsearch聚合学习之四:结果排序(代码片段)
...聚合的结果以桶(bucket)为单位,放在JSON数组中返回,这些数据是没有排序的,今天来学习如何给这些数据进行排序;系列文章列表《Elasticsearch聚合学习之一:基本操作》;《Elasticsearch聚合学习之二:区间聚合》;《Elasticsearch聚... 查看详情
大数据入门学习之hadoop技术优缺点
大数据入门学习之Hadoop技术优缺点(1)Hadoop具有按位存储和处理数据能力的高可靠性。(2)Hadoop通过可用的计算机集群分配数据,完成存储和计算任务,这些集群可以方便地扩展到数以千计的节点中,具有高扩展性。(3)Hadoop能够在节... 查看详情
大数据学习之kafka消息队列31
一:Kafka概述离线部分:Hadoop->离线计算(hdfs/mapreduce)yarnzookeeper->分布式协调(动物管理员)hive->数据仓库(离线计算/sql)easycodingflume->数据采集sqoop->数据迁移mysql->hdfs/hivehdfs/hive->mysqlAzkaban->任务调度工具hbase-&g 查看详情
大数据学习之sqoop框架25
...apache.org/ 2)场景传统型缺点,分布式存储。把传统型数据库数据迁移。ApacheSqoop(TM)是一种用于在ApacheHadoop和结构化数据存储(如关系数据库)之间高效传输批量数据的工具。 2:Sqoop安装部署1)下载安装包2)解压tar-zx... 查看详情
大数据学习之hdfs的工作机制07
1:namenode+secondaryNameNode工作机制2:datanode工作机制 3:HDFS中的通信(代理对象RPC)下面用代码来实现基本的原理1:服务端代码packageit.dawn.HDFSPra.RPC.server;importjava.io.IOException;importorg.apache.hadoop.HadoopIllegalArgumentExce 查看详情
大数据hadoop学习之搭建hadoop平台(2.1)
关于大数据,一看就懂,一懂就懵。 一、简介 Hadoop的平台搭建,设置为三种搭建方式,第一种是“单节点安装”,这种安装方式最为简单,但是并没有展示出Hadoop的技术优势,适合初学者快速搭建;第二种是“... 查看详情
spark学习之作业优化
...,我们学习了spark的语法、资源调度、sql语法优化和数据倾斜的技巧,今天我们来学习spark中的作业优化,也就是job优化。对往期内容感兴趣的同学可以参考👇:链接:spark学习之处理数据倾斜.链接:spark学习之sparksql... 查看详情
算法学习之排序算法(高速排序)
...思想设要排序的数组是A[0]……A[N-1],首先随意选取一个数据(通常选用数组的第一个数)作为重要数据,然后将全部比它小的数都放到它前面。全部比它大的数都放到它后面。这个过程称为一趟高速排序。值得注意的是,高速... 查看详情
大数据spark“蘑菇云”行动代码学习之adclickedstreamingstats模块分析
2016年大数据Spark“蘑菇云”行动代码学习之AdClickedStreamingStats模块分析 系统背景:用户使用终端设备(IPAD、手机、浏览器)等登录系统,系统采用js脚本发送用户信息和广告点击信息到后台日志,进入flume监控,通过kafka... 查看详情
大数据学习之hadoop生态圈(代码片段)
...6、Hadoop的优点及缺点7、Hadoop组成前言上篇文章讲述了大数据的发展及历程,这篇文章就带大家进入大数据的技术应用,以下文章观点或描述如有错误,请指正!!1、什么是hadoop广义 查看详情
大数据学习之storm实时统计网站访问量案例35
案例一:统计网站访问量(实时统计) 实时流式计算框架:storm 1)spout数据源,接入数据源本地文件如下编写spout程序:packagepvcount;importjava.io.BufferedReader;importjava.io.FileInputStream;importjava.io.FileNotFoundException;importjava.io.I 查看详情