关键词:
一、MapReduce 分组
上篇文章对 MapReduce
分区进行了介绍,通过分区规则控制不同的数据进到不同的 reducetask
中,而本篇文章讲的分组则是进到同一个 reducetask
中的数据的归类分组规则,下面是上篇文章的地址:
分组在发生在reduce
阶段,决定了同一个reduce
中哪些数据将组成一组调用一次reduce
方法处理。默认分组规则是:key
相同的就会分为一组(前后两个key直接比较是否相等)。在reduce
阶段进行分组之前,会首先进行数据排序行为(key 的 compareTo 方法
)。
如果需要自定义规则,只需继承 WritableComparator
,重写Compare
方法,如果返回结果是 0
则认为前后两个相等分为一组,该类需要在job
对象中进行设置:
job.setGroupingComparatorClass(xxxx.class);
注意点:
在分组后有可能出现对reduce
参数values
遍历时发现 key
的值也会变化,比如: key
值是个对象,其中有 a,b
两个属性,其中自定义分组中根据 a
相同则认为一组,可以发现在遍历 values
的时候 key
的 b
是变化的(前提是Map
阶段的 a
相同 而 b
不相同)。
下面利用上面的特征对前面文章讲解的 COVID-19
案例进行进一步分析,下面是该文章地址:
在这篇文章中我们有对每个州的 deaths
筛选出Top3
的县,当时使用的 Java
的 List
排序进行筛选的,这种情况在数据量巨大的情况下很容易产生 OOM
,因此可以基于自定义排序加分组的方式计算出 Top3
的县 。
二、计算每个州的 deaths 筛选出Top3的县
修改 CountVO
类,修改排序规则,根据 state 正序排列,相同的然后再根据 deaths 倒序排序:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO>
private String state;//州
private String county;//县
private Long deaths;//死亡病例
public void set(String state, String county, Long deaths)
this.state = state;
this.county = county;
this.deaths = deaths;
@Override
public void write(DataOutput out) throws IOException
out.writeUTF(state);
out.writeUTF(county);
out.writeLong(deaths);
@Override
public void readFields(DataInput in) throws IOException
this.state = in.readUTF();
this.county = in.readUTF();
this.deaths = in.readLong();
//排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序
@Override
public int compareTo(CountVO o)
int i = this.state.compareTo(o.getState());
if (i == 0)
return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
return i;
@Override
public String toString()
return "CountVO" +
"state='" + state + '\\'' +
", county='" + county + '\\'' +
", deaths=" + deaths +
'';
编写自定义分组,继承 WritableComparator
,并使用 state
分组。
public class Top3GroupingComparator extends WritableComparator
protected Top3GroupingComparator()
super(CountVO.class,true);
@Override
public int compare(WritableComparable a, WritableComparable b)
CountVO aBean = (CountVO) a;
CountVO bBean = (CountVO) b;
return aBean.getState().compareTo(bBean.getState());
编写 Mapper
类,由于对 key
进行了排序和分组,后面 Reducer
阶段直接对 key
操作即可,这里 value
给个 NullWritable
站位吧:
public class Top3Mapper extends Mapper<LongWritable, Text, CountVO, NullWritable>
CountVO outKey = new CountVO();
NullWritable outValue = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
String[] fields = value.toString().split(",");
//封装数据: 州、县、死亡病例
outKey.set(fields[2], fields[1], Long.parseLong(fields[fields.length - 1]));
context.write(outKey, outValue);
编写 Reducer
由于这边对 state
进行了分组,所以 reduce
中的都是同一个 state
的,但是在 Mapper
阶段给的 county
和 deaths
都是不同的,因此可以通过便利 values
切换 key
的内容,而 key 则根据 deaths
降序排列了,因此前 3
个就是 Top3
:
public class Top3Reducer extends Reducer<CountVO, NullWritable, CountVO, NullWritable>
@Override
protected void reduce(CountVO key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
int num = 1;
for (NullWritable value : values)
if (num > 3)
break;
context.write(key, value);
num++;
最后编写驱动类,指定 Mapper
和 Reducer
以及 自定义分组类:
public class Top3Driver extends Configured implements Tool
public static void main(String[] args) throws Exception
//配置文件对象
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new Top3Driver(), args);
System.exit(status);
@Override
public int run(String[] args) throws Exception
// 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
// 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
Path input = new Path(args[0]);
Path output = new Path(args[1]);
// 输出目录必须为空,如果不为空则会报错提示
FileSystem fs = FileSystem.get(getConf());
if(fs.exists(output))
fs.delete(output,true);
// 创建作业实例
Job job = Job.getInstance(getConf(), Top3Driver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(Top3Driver.class);
// 设置作业mapper reducer类
job.setMapperClass(Top3Mapper.class);
job.setReducerClass(Top3Reducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(CountVO.class);
job.setMapOutputValueClass(NullWritable.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(CountVO.class);
job.setOutputValueClass(NullWritable.class);
//指定分组规则
job.setGroupingComparatorClass(Top3GroupingComparator.class);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, input);
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, output);
return job.waitForCompletion(true)? 0:1;
运行驱动类,指定数据目录以及输出目录:
执行成功后,到输出目录中查看结果。
hadoop3-mapreduce分区介绍及自定义分区(代码片段)
一、MapReduce分区上篇文章使用COVID-19对MapReduce进一步的案例理解,本篇文章讲解MapReduce分区,下面是上篇文章的地址:https://blog.csdn.net/qq_43692950/article/details/127475811在默认情况下,不管map阶段有多少个并发执行task,... 查看详情
hadoop3-mapreduce分区介绍及自定义分区(代码片段)
一、MapReduce分区上篇文章使用COVID-19对MapReduce进一步的案例理解,本篇文章讲解MapReduce分区,下面是上篇文章的地址:https://blog.csdn.net/qq_43692950/article/details/127475811在默认情况下,不管map阶段有多少个并发执行task,... 查看详情
网络工程6防火墙介绍及配置实操
接上篇《5、路由器介绍及配置实操》之前我们讲解了网络设备路由器的介绍,以及完成了路由器的相关配置实操。本篇我们来讲解防火墙的基础知识以及相应的实操案例。一、什么是防火墙?防火墙是一个安全产品,... 查看详情
秋季软件工程专业实训分组及选题情况
2016年秋季软件工程专业实训分组及选题情况 实训意义 综合实训旨在为学生提供参与完整软件开发过程的实践经历,并对所涉及的应用领域进行初步了解,培养面向领域的工程能力。项目来源主要是学校或相关... 查看详情
hadoop3-mapreduce介绍于基本使用(代码片段)
一、MapReduceHadoopMapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集。它是一种面向海量数据处理的一种指... 查看详情
hadoop3-mapreduce介绍于基本使用(代码片段)
一、MapReduceHadoopMapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集。它是一种面向海量数据处理的一种指... 查看详情
mongodb高级查询多级分组聚合及时间计算应用实践案例(代码片段)
本文是MongoDB复杂聚合查询实践案例,其中使用较多时间变换及时间计算,粗略整理出来分享学习。1.案例一:筛选并按月份聚合流失客户需求描述:取客户交易特征集中最后一次记录(条件为流失、日交易次... 查看详情
网络工程5路由器介绍及配置实操
...置实例,本篇我们继续来讲解网络设备——路由器的介绍,以及完成路由器的相关配置实操。当我们申请了一条宽带后,一般都需要一台路由器来进行上网,那么路由器到底是什么呢?它的作用是什么呢?... 查看详情
servlet简要介绍及入门案例。
什么是Servlet呢? Servlet试运行在服务器端的一个小的java程序,接收和相应从客户端发送的请求。那么Servlet的作用是什么呢?处理客户端的请求,并且对请求作出相应。下面是Servlet的简单入门案例: //编写一个... 查看详情
事件视图索引介绍及案例
一. 事务 1.事务(TRANSACTION)是作为单个逻辑工作单元执行的一系列操作 2.事务的特性: 事务必须具备以下四个属性,简称ACID属性:(1)原子性(Atomicity):事务是一个完整的操作。事务的各步操作是不可分的... 查看详情
ha高可用的arp问题及arp企业级故障案例介绍
dfa 查看详情
navicat介绍及pymysql模块(代码片段)
内容回顾selectdistinct字段1,字段2,。。。from表名where分组之前的过滤条件groupby分组条件having分组之后过滤条件orderby排序字段1asc,排序字段2desclimit5,5as语法中给某个查询结果起别名的时候需要把查询语句中的分号去除(selectname,sa... 查看详情
shell脚本从入门到实战(代码片段)
...:多命令处理Shell中的变量系统变量1.常用系统变量2.案例实操自定义变量1.基本语法2.变量定义规则特殊变量:$n特殊变量:$#特殊变量:\\$*、\\$@特殊变量:$?运算符1.基本语法2.案例实操条件判断1.基本语法2... 查看详情
processing介绍及几个python模式下的案例(代码片段)
一.Processing介绍Processing是一门开源编程语言和与之配套的集成开发环境(IDE)的名称。Processing在电子艺术和视觉设计社区被用来教授编程基础,并运用于大量的新媒体和互动艺术作品中。Processing最开始的时候只是一... 查看详情
lucene介绍及简单入门案例(集成ik分词器)(代码片段)
介绍 Lucene是apache软件基金会4jakarta项目组的一个子项目,是一个开放源代码的全文检索引擎工具包,但它不是一个完整的全文检索引擎,而是一个全文检索引擎的架构,提供了完整的查询引擎和索引引擎,部分文本分析... 查看详情
十mysql聚合函数分组查询及过滤分组(代码片段)
...COUNT()函数1.2SUM()函数1.3AVG()函数1.4MAX()函数1.5MIN()函数二、分组查询及过滤分组2.1创建分组2.2使用HAVING过滤分组2.3WHERE和HAVING的对比前置知识:一、数据库开发与实战专栏导学及数据库基础概念入门二、MySQL介绍及MySQL安装与配... 查看详情
《图解spark:核心技术与案例实战》介绍及书附资源
本书中所使用到的测试数据、代码和安装包放在百度盘提供下载,地址为https://pan.baidu.com/s/1o8ydtKA密码:imaa 另外在百度盘提供本书附录 下载,地址为http://pan.baidu.com/s/1o7Busye密码:shdf 为什么要写这本书在过去的十几年... 查看详情
[z]分区truncate操作的介绍及对全局索引和空间释放影响的案例解析
[z]https://www.2cto.com/database/201301/181226.html环境: [sql][[email protected] ~]$ uname -r2.6.18-308.el5xen[[email protected] ~]$ sqlplus -v SQL*Plus 查看详情