hadoop3-mapreduce分组介绍及案例实践(代码片段)

小毕超 小毕超     2023-01-12     798

关键词:

一、MapReduce 分组

上篇文章对 MapReduce 分区进行了介绍,通过分区规则控制不同的数据进到不同的 reducetask 中,而本篇文章讲的分组则是进到同一个 reducetask 中的数据的归类分组规则,下面是上篇文章的地址:

https://blog.csdn.net/qq_43692950/article/details/127477363

分组在发生在reduce阶段,决定了同一个reduce中哪些数据将组成一组调用一次reduce方法处理。默认分组规则是:key相同的就会分为一组(前后两个key直接比较是否相等)。在reduce阶段进行分组之前,会首先进行数据排序行为(key 的 compareTo 方法)。

如果需要自定义规则,只需继承 WritableComparator,重写Compare方法,如果返回结果是 0 则认为前后两个相等分为一组,该类需要在job 对象中进行设置:

job.setGroupingComparatorClass(xxxx.class);

注意点:
在分组后有可能出现对reduce参数values遍历时发现 key 的值也会变化,比如: key 值是个对象,其中有 a,b 两个属性,其中自定义分组中根据 a 相同则认为一组,可以发现在遍历 values 的时候 keyb 是变化的(前提是Map 阶段的 a 相同 而 b 不相同)。

下面利用上面的特征对前面文章讲解的 COVID-19 案例进行进一步分析,下面是该文章地址:

https://blog.csdn.net/qq_43692950/article/details/127475811

在这篇文章中我们有对每个州的 deaths 筛选出Top3的县,当时使用的 JavaList 排序进行筛选的,这种情况在数据量巨大的情况下很容易产生 OOM,因此可以基于自定义排序加分组的方式计算出 Top3的县 。

二、计算每个州的 deaths 筛选出Top3的县

修改 CountVO 类,修改排序规则,根据 state 正序排列,相同的然后再根据 deaths 倒序排序:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO> 

    private String state;//州
    private String county;//县
    private Long deaths;//死亡病例

    public void set(String state, String county, Long deaths) 
        this.state = state;
        this.county = county;
        this.deaths = deaths;
    

    @Override
    public void write(DataOutput out) throws IOException 
        out.writeUTF(state);
        out.writeUTF(county);
        out.writeLong(deaths);
    

    @Override
    public void readFields(DataInput in) throws IOException 
        this.state = in.readUTF();
        this.county = in.readUTF();
        this.deaths = in.readLong();
    

    //排序规则,根据首先根据 state 正序排列,然后根据 deaths 倒序排序
    @Override
    public int compareTo(CountVO o) 
        int i = this.state.compareTo(o.getState());
        if (i == 0)
            return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
        
        return i;
    

    @Override
    public String toString() 
        return "CountVO" +
                "state='" + state + '\\'' +
                ", county='" + county + '\\'' +
                ", deaths=" + deaths +
                '';
    


编写自定义分组,继承 WritableComparator ,并使用 state 分组。

public class Top3GroupingComparator extends WritableComparator 

    protected Top3GroupingComparator()
        super(CountVO.class,true);
    

    @Override
    public int compare(WritableComparable a, WritableComparable b) 
        CountVO aBean = (CountVO) a;
        CountVO bBean = (CountVO) b;
        return aBean.getState().compareTo(bBean.getState());
    

编写 Mapper 类,由于对 key 进行了排序和分组,后面 Reducer 阶段直接对 key 操作即可,这里 value 给个 NullWritable 站位吧:

public class Top3Mapper extends Mapper<LongWritable, Text, CountVO, NullWritable> 

    CountVO outKey = new CountVO();
    NullWritable outValue = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        String[] fields = value.toString().split(",");
        //封装数据: 州、县、死亡病例
        outKey.set(fields[2], fields[1], Long.parseLong(fields[fields.length - 1]));
        context.write(outKey, outValue);
    

编写 Reducer 由于这边对 state 进行了分组,所以 reduce 中的都是同一个 state 的,但是在 Mapper 阶段给的 countydeaths 都是不同的,因此可以通过便利 values 切换 key 的内容,而 key 则根据 deaths 降序排列了,因此前 3 个就是 Top3

public class Top3Reducer extends Reducer<CountVO, NullWritable, CountVO, NullWritable> 
    @Override
    protected void reduce(CountVO key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException 
        int num = 1;
        for (NullWritable value : values) 
            if (num > 3) 
                break;
            
            context.write(key, value);
            num++;
        
    

最后编写驱动类,指定 MapperReducer 以及 自定义分组类:

public class Top3Driver extends Configured implements Tool 

    public static void main(String[] args) throws Exception
        //配置文件对象
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new Top3Driver(), args);
        System.exit(status);
    

    @Override
    public int run(String[] args) throws Exception 
        // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
        // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // 输出目录必须为空,如果不为空则会报错提示
        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(output))
            fs.delete(output,true);
        
        // 创建作业实例
        Job job = Job.getInstance(getConf(), Top3Driver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(Top3Driver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(Top3Mapper.class);
        job.setReducerClass(Top3Reducer.class);

        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(CountVO.class);
        job.setMapOutputValueClass(NullWritable.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(CountVO.class);
        job.setOutputValueClass(NullWritable.class);

        //指定分组规则
        job.setGroupingComparatorClass(Top3GroupingComparator.class);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, input);
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true)? 0:1;
    


运行驱动类,指定数据目录以及输出目录:

执行成功后,到输出目录中查看结果。

hadoop3-mapreduce分区介绍及自定义分区(代码片段)

一、MapReduce分区上篇文章使用COVID-19对MapReduce进一步的案例理解,本篇文章讲解MapReduce分区,下面是上篇文章的地址:https://blog.csdn.net/qq_43692950/article/details/127475811在默认情况下,不管map阶段有多少个并发执行task,... 查看详情

hadoop3-mapreduce分区介绍及自定义分区(代码片段)

一、MapReduce分区上篇文章使用COVID-19对MapReduce进一步的案例理解,本篇文章讲解MapReduce分区,下面是上篇文章的地址:https://blog.csdn.net/qq_43692950/article/details/127475811在默认情况下,不管map阶段有多少个并发执行task,... 查看详情

网络工程6防火墙介绍及配置实操

接上篇《5、路由器介绍及配置实操》之前我们讲解了网络设备路由器的介绍,以及完成了路由器的相关配置实操。本篇我们来讲解防火墙的基础知识以及相应的实操案例。一、什么是防火墙?防火墙是一个安全产品,... 查看详情

秋季软件工程专业实训分组及选题情况

2016年秋季软件工程专业实训分组及选题情况  实训意义  综合实训旨在为学生提供参与完整软件开发过程的实践经历,并对所涉及的应用领域进行初步了解,培养面向领域的工程能力。项目来源主要是学校或相关... 查看详情

hadoop3-mapreduce介绍于基本使用(代码片段)

一、MapReduceHadoopMapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集。它是一种面向海量数据处理的一种指... 查看详情

hadoop3-mapreduce介绍于基本使用(代码片段)

一、MapReduceHadoopMapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集。它是一种面向海量数据处理的一种指... 查看详情

mongodb高级查询多级分组聚合及时间计算应用实践案例(代码片段)

本文是MongoDB复杂聚合查询实践案例,其中使用较多时间变换及时间计算,粗略整理出来分享学习。1.案例一:筛选并按月份聚合流失客户需求描述:取客户交易特征集中最后一次记录(条件为流失、日交易次... 查看详情

网络工程5路由器介绍及配置实操

...置实例,本篇我们继续来讲解网络设备——路由器的介绍,以及完成路由器的相关配置实操。当我们申请了一条宽带后,一般都需要一台路由器来进行上网,那么路由器到底是什么呢?它的作用是什么呢?... 查看详情

servlet简要介绍及入门案例。

什么是Servlet呢?  Servlet试运行在服务器端的一个小的java程序,接收和相应从客户端发送的请求。那么Servlet的作用是什么呢?处理客户端的请求,并且对请求作出相应。下面是Servlet的简单入门案例:    //编写一个... 查看详情

事件视图索引介绍及案例

一. 事务  1.事务(TRANSACTION)是作为单个逻辑工作单元执行的一系列操作  2.事务的特性: 事务必须具备以下四个属性,简称ACID属性:(1)原子性(Atomicity):事务是一个完整的操作。事务的各步操作是不可分的... 查看详情

ha高可用的arp问题及arp企业级故障案例介绍

dfa 查看详情

navicat介绍及pymysql模块(代码片段)

内容回顾selectdistinct字段1,字段2,。。。from表名where分组之前的过滤条件groupby分组条件having分组之后过滤条件orderby排序字段1asc,排序字段2desclimit5,5as语法中给某个查询结果起别名的时候需要把查询语句中的分号去除(selectname,sa... 查看详情

shell脚本从入门到实战(代码片段)

...:多命令处理Shell中的变量系统变量1.常用系统变量2.案例实操自定义变量1.基本语法2.变量定义规则特殊变量:$n特殊变量:$#特殊变量:\\$*、\\$@特殊变量:$?运算符1.基本语法2.案例实操条件判断1.基本语法2... 查看详情

processing介绍及几个python模式下的案例(代码片段)

一.Processing介绍Processing是一门开源编程语言和与之配套的集成开发环境(IDE)的名称。Processing在电子艺术和视觉设计社区被用来教授编程基础,并运用于大量的新媒体和互动艺术作品中。Processing最开始的时候只是一... 查看详情

lucene介绍及简单入门案例(集成ik分词器)(代码片段)

介绍    Lucene是apache软件基金会4jakarta项目组的一个子项目,是一个开放源代码的全文检索引擎工具包,但它不是一个完整的全文检索引擎,而是一个全文检索引擎的架构,提供了完整的查询引擎和索引引擎,部分文本分析... 查看详情

十mysql聚合函数分组查询及过滤分组(代码片段)

...COUNT()函数1.2SUM()函数1.3AVG()函数1.4MAX()函数1.5MIN()函数二、分组查询及过滤分组2.1创建分组2.2使用HAVING过滤分组2.3WHERE和HAVING的对比前置知识:一、数据库开发与实战专栏导学及数据库基础概念入门二、MySQL介绍及MySQL安装与配... 查看详情

《图解spark:核心技术与案例实战》介绍及书附资源

本书中所使用到的测试数据、代码和安装包放在百度盘提供下载,地址为https://pan.baidu.com/s/1o8ydtKA密码:imaa 另外在百度盘提供本书附录 下载,地址为http://pan.baidu.com/s/1o7Busye密码:shdf 为什么要写这本书在过去的十几年... 查看详情

[z]分区truncate操作的介绍及对全局索引和空间释放影响的案例解析

[z]https://www.2cto.com/database/201301/181226.html环境: [sql][[email protected] ~]$ uname -r2.6.18-308.el5xen[[email protected] ~]$ sqlplus -v SQL*Plus 查看详情