大数据学习之提交job流程,排序11

hidamowang hidamowang     2022-12-07     442

关键词:

1实现接口->WritableCompareable

 

排序操作在hadoop中属于默认的行为。默认按照字典殊勋排序。

 

排序的分类:

 

1)部分排序

2)全排序

3)辅助排序

4)二次排序

 

案例: 在流量汇总输出文件里的数据  进行分区,每个分区中的数据进行排序

 

数据预览,这里只是进行了流量的汇总,没有进行分区和排序

技术图片

 

1:编写flowbean

实现WritableCompareable完成序列化并且完成排序

package it.dawn.YARNPra.基本用法.排序;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * @author Dawn
 * @date 2019年5月7日09:04:04
 * @version 1.0
 * 直接继承 WritableComparable,
 */
public class FlowBean implements WritableComparable<FlowBean>
	
	private long upFlow;
	private long dfFlow;
	private long flowSum;
	
	//无参构造
	public FlowBean() 
	//有参构造
	public FlowBean(long upFlow,long dfFlow) 
		this.upFlow=upFlow;
		this.dfFlow=dfFlow;
		this.flowSum=upFlow+dfFlow;
	

	public long getUpFlow() 
		return upFlow;
	

	public void setUpFlow(long upFlow) 
		this.upFlow = upFlow;
	

	public long getDfFlow() 
		return dfFlow;
	

	public void setDfFlow(long dfFlow) 
		this.dfFlow = dfFlow;
	

	public long getFlowSum() 
		return flowSum;
	

	public void setFlowSum(long flowSum) 
		this.flowSum = flowSum;
	

	@Override
	public void write(DataOutput out) throws IOException 
		// TODO Auto-generated method stub
		out.writeLong(upFlow);
		out.writeLong(dfFlow);
		out.writeLong(flowSum);
	

	@Override
	public void readFields(DataInput in) throws IOException 
		// TODO Auto-generated method stub
		upFlow=in.readLong();
		dfFlow=in.readLong();
		flowSum=in.readLong();
	

	
	
	@Override
	public String toString() 
		return upFlow+"\\t"+dfFlow+"\\t"+flowSum;
	
	
	//排序
	@Override
	public int compareTo(FlowBean o) 
		//倒序
		return this.flowSum>o.getFlowSum()? -1 : 1;
	


 

2:编写Mapper

 

package it.dawn.YARNPra.基本用法.排序;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author Dawn
 * @date 2019年5月7日09:24:06
 * @version 1.0
 * 
 * 输入? 
 * 13480253104	120	1320	1440
 * 输出?
 * <key2                ,   v2>
 * <流量上行+\\t+流量下行,手机号码>
 */
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text>

	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException 
		//1:读数据
		String line=value.toString();
		
		//2:切割
		String[] fields=line.split("\\t");
		
		//3:取出指定字段
		long upFlow=Long.parseLong(fields[1]);
		long dfFlow=Long.parseLong(fields[2]);
		
		//4:输出到reduce阶段
		context.write(new FlowBean(upFlow, dfFlow), new Text(fields[0]));
	
	
	


 

  

3:编写Reducer

 

package it.dawn.YARNPra.基本用法.排序;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean>

	@Override
	protected void reduce(FlowBean k3, Iterable<Text> v3, Context context)
			throws IOException, InterruptedException 
		
		//直接输出
		context.write(v3.iterator().next(), k3);
	
	


 

  

4:编写Partitioner

 

 

package it.dawn.YARNPra.基本用法.排序;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FlowSortPartitioner extends Partitioner<FlowBean, Text>

	@Override
	public int getPartition(FlowBean key, Text value, int numPartitions) 
		
		//1: 获取手机前3个数字
		String phoneThree=value.toString().substring(0, 3);
		
		//2:定义分区号
		int partitioner=4;
		

		if("135".equals(phoneThree)) 
			return 0;
		else if("137".equals(phoneThree)) 
			return 1;
		else if("138".equals(phoneThree)) 
			return 2;
		else if("139".equals(phoneThree)) 
			return 3;
		
		
		return partitioner;
	


  

5:编写driver类

 

package it.dawn.YARNPra.基本用法.排序;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * @author Dawn
 * @date 2019年5月7日09:22:12
 * @version 1.0
 * 需求?
 * 将数据进行分区,并在每个分区中进行排序
 */
public class FlowSortDriver 
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
		//1:添加配置
		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);
		
		//2:设置主类
		job.setJarByClass(FlowSortDriver.class);
		
		//3:设置Mapper和Reduce类
		job.setMapperClass(FlowSortMapper.class);
		job.setReducerClass(FlowSortReducer.class);
		
		//4:设置Map输出类
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);
		
		//5:设置Reduce输出类
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		//添加自定义分区
		job.setPartitionerClass(FlowSortPartitioner.class);
		job.setNumReduceTasks(5);
		
		//6:设置输入输出路径
		FileInputFormat.setInputPaths(job, new Path("f:/temp/流量统计结果/out1/part-r-00000"));
		FileOutputFormat.setOutputPath(job, new Path("f:/temp/流量统计结果/out2"));
		
		//7提交任务
		boolean rs=job.waitForCompletion(true);
		System.out.println(rs ? "success" : "fail");
	


  

查看最终的输出结果:

技术图片

 

技术图片

 

 

  

flink内核原理学习任务提交流程(代码片段)

...体组件解释二、Flink任务提交流程(yarn-session模式)Java、大数据开发学习要点(持续更新中…)  首先要知道的是,Flink作业提交是由PipelineExecutor(流水线 查看详情

(2021-03-26)大数据学习之flink安装部署以及任务提交/调度详解(代码片段)

...r与Slots3.3.1作用与关系3.3.2任务执行资源合理占用4.程序与数据流4.1数据传输形式4.2任务链(OperatorChains)1.F 查看详情

hive学习之排序(代码片段)

...nbsp;二、每个MapReduce内部排序(sortby)Sortby:对于大规模的数据集来说Orderby的效率特别低。很多情况下,不需要全局排序,此时可以使用sort不要。Sortby是为每个reducer产生一个排序文件,对每个reducer内部进行排序,对全局结果不... 查看详情

大数据学习之mapreduce编程案例一单词计数10

一:单词计数1:单词计数总流程图 2:代码实现1:Map阶段packageit.dawn.YARNPra.wc_hdfs;importjava.io.IOException;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Te 查看详情

(2021-03-11)大数据学习之flink基础概念以及简单demo(代码片段)

Flink的简介1.什么是Flink1.1有边界数据流和无边界数据流1.2有状态计算2.Flink的起源3.为什么要用Flink4.什么是流处理和批处理5.Flink的应用场景6.Flink的简单Demo6.1批处理6.2流处理由于公司要求下周可能会要分享一下Flink。所以这周抽了... 查看详情

大数据学习之hdfs基本命令操作05

1)hdfs的客户端1.网页形式->测试用http://192.168.40.11:50070/dfshealth.html#tab-overview2.命令行形式->测试用3.企业级API 2)查看帮助 hdfsdfs-help 3)查看当前目录信息 hdfsdfs-ls/ 3)上传文件hdfsdfs-put/本地路径/hdfs路径& 查看详情

大数据学习之linux进阶02

大数据学习之Linux进阶1->配置IP 1)修改配置文件 vi/sysconfig/network-scripts/ifcfg-eno16777736 2)注释掉dhcp #BOOTPROTO="dhcp" 3)添加配置(windows->ipconfig-all) IPADDR=192.168.50.179 NETMASK=225.255.255.0 GATEWAY=192. 查看详情

elasticsearch聚合学习之四:结果排序(代码片段)

...聚合的结果以桶(bucket)为单位,放在JSON数组中返回,这些数据是没有排序的,今天来学习如何给这些数据进行排序;系列文章列表《Elasticsearch聚合学习之一:基本操作》;《Elasticsearch聚合学习之二:区间聚合》;《Elasticsearch聚... 查看详情

大数据入门学习之hadoop技术优缺点

大数据入门学习之Hadoop技术优缺点(1)Hadoop具有按位存储和处理数据能力的高可靠性。(2)Hadoop通过可用的计算机集群分配数据,完成存储和计算任务,这些集群可以方便地扩展到数以千计的节点中,具有高扩展性。(3)Hadoop能够在节... 查看详情

大数据学习之kafka消息队列31

一:Kafka概述离线部分:Hadoop->离线计算(hdfs/mapreduce)yarnzookeeper->分布式协调(动物管理员)hive->数据仓库(离线计算/sql)easycodingflume->数据采集sqoop->数据迁移mysql->hdfs/hivehdfs/hive->mysqlAzkaban->任务调度工具hbase-&g 查看详情

大数据学习之sqoop框架25

...apache.org/ 2)场景传统型缺点,分布式存储。把传统型数据库数据迁移。ApacheSqoop(TM)是一种用于在ApacheHadoop和结构化数据存储(如关系数据库)之间高效传输批量数据的工具。 2:Sqoop安装部署1)下载安装包2)解压tar-zx... 查看详情

大数据学习之hdfs的工作机制07

1:namenode+secondaryNameNode工作机制2:datanode工作机制 3:HDFS中的通信(代理对象RPC)下面用代码来实现基本的原理1:服务端代码packageit.dawn.HDFSPra.RPC.server;importjava.io.IOException;importorg.apache.hadoop.HadoopIllegalArgumentExce 查看详情

大数据hadoop学习之搭建hadoop平台(2.1)

   关于大数据,一看就懂,一懂就懵。 一、简介  Hadoop的平台搭建,设置为三种搭建方式,第一种是“单节点安装”,这种安装方式最为简单,但是并没有展示出Hadoop的技术优势,适合初学者快速搭建;第二种是“... 查看详情

spark学习之作业优化

...,我们学习了spark的语法、资源调度、sql语法优化和数据倾斜的技巧,今天我们来学习spark中的作业优化,也就是job优化。对往期内容感兴趣的同学可以参考👇:链接:spark学习之处理数据倾斜.链接:spark学习之sparksql... 查看详情

算法学习之排序算法(高速排序)

...思想设要排序的数组是A[0]……A[N-1],首先随意选取一个数据(通常选用数组的第一个数)作为重要数据,然后将全部比它小的数都放到它前面。全部比它大的数都放到它后面。这个过程称为一趟高速排序。值得注意的是,高速... 查看详情

大数据spark“蘑菇云”行动代码学习之adclickedstreamingstats模块分析

2016年大数据Spark“蘑菇云”行动代码学习之AdClickedStreamingStats模块分析  系统背景:用户使用终端设备(IPAD、手机、浏览器)等登录系统,系统采用js脚本发送用户信息和广告点击信息到后台日志,进入flume监控,通过kafka... 查看详情

大数据学习之hadoop生态圈(代码片段)

...6、Hadoop的优点及缺点7、Hadoop组成前言上篇文章讲述了大数据的发展及历程,这篇文章就带大家进入大数据的技术应用,以下文章观点或描述如有错误,请指正!!1、什么是hadoop广义&#x 查看详情

大数据学习之storm实时统计网站访问量案例35

案例一:统计网站访问量(实时统计) 实时流式计算框架:storm 1)spout数据源,接入数据源本地文件如下编写spout程序:packagepvcount;importjava.io.BufferedReader;importjava.io.FileInputStream;importjava.io.FileNotFoundException;importjava.io.I 查看详情