hadoop3-mapreducecovid-19案例实践(代码片段)

小毕超 小毕超     2022-12-07     444

关键词:

一、COVID-19 案例

上篇文章对 MapReduce 进行了介绍,并编写了 WordCount 经典案例的实现,本篇为继续加深 MapReduce 的用法,实践 COVID-19 新冠肺炎案例,下面是上篇文章的地址:

https://blog.csdn.net/qq_43692950/article/details/127195121

COVID-19,简称“新冠肺炎”,世界卫生组织命名为“2019冠状病毒病” [1-2] ,是指2019新型冠状病毒感染导致的肺炎。现有美国 2021-01-28 号,各个县county的新冠疫情累计案例信息,包括确诊病例和死亡病例,数据格式如下所示:

date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)
2021-01-28,Pike,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28,Shelby,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28,Sumter,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28,Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,Washington,Alabama,01129,1454,27

数据集下载

https://download.csdn.net/download/qq_43692950/86805389

二、计算各个州的累积cases、deaths

创建 VO 类存储 cases、deaths 个数:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements Writable 

    private Long cases;//确诊病例数
    private Long deaths;//死亡病例数

    public void set(long cases, long deaths) 
        this.cases = cases;
        this.deaths = deaths;
    

    /**
     *  序列化方法
     */
    @Override
    public void write(DataOutput out) throws IOException 
        out.writeLong(cases);
        out.writeLong(deaths);
    

    /**
     * 反序列化方法 注意顺序
     */
    @Override
    public void readFields(DataInput in) throws IOException 
        this.cases = in.readLong();
        this.deaths =in.readLong();
    

    @Override
    public String toString() 
        return  cases +"\\t"+ deaths;
    


创建 Mapper 类,截取出cases、deaths,以为 key ,CountVO 为 Value :

public class SumMapper extends Mapper<LongWritable, Text, Text, CountVO> 

    Text outKey = new Text();
    CountVO outValue = new CountVO();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        String[] fields = value.toString().split(",");
        //州
        outKey.set(fields[2]);
        //Covid数据 确诊病例 死亡病例
        outValue.set(Long.parseLong(fields[fields.length-2]),Long.parseLong(fields[fields.length-1]));
        context.write(outKey,outValue);
    

创建 Reducer ,对 cases、deaths 累加:

public class SumReducer extends Reducer<Text, CountVO,Text, CountVO> 

    CountVO outValue = new CountVO();

    @Override
    protected void reduce(Text key, Iterable<CountVO> values, Context context) throws IOException, InterruptedException 
        long totalCases = 0;
        long totalDeaths =0;
        //累加统计
        for (CountVO value : values) 
            totalCases += value.getCases();
            totalDeaths +=value.getDeaths();
        
        outValue.set(totalCases,totalDeaths);
        context.write(key,outValue);
    

创建驱动类,加载上面的 Mapper 和 Reducer :

public class SumDriver extends Configured implements Tool 
    public static void main(String[] args) throws Exception
        //配置文件对象
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new SumDriver(), args);
        System.exit(status);
    

    @Override
    public int run(String[] args) throws Exception 
        // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
        // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // 输出目录必须为空,如果不为空则会报错提示
        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(output))
            fs.delete(output,true);
        
        // 创建作业实例
        Job job = Job.getInstance(getConf(), SumDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(SumDriver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReducer.class);

        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CountVO.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountVO.class);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, input);
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true)? 0:1;
    

数据集目录和输出目录通过参数传递进来,这里我将数据集放在了 D:/test/input 下:

如果是打包后放在 hadoop 集群运行,则:

hadoop jar <jar path> <driver class path> <args>
# 或者
yarn jar <jar path> <driver class path> <args>


运行成功后,到输出目录查看结果:


已成功统计出相关结果。

三、对上面计算的结果根据deaths进行倒叙排列

上麦已经计算出了每个州的cases、deaths,如果还需要根据deaths进行倒叙排列的话,我们可以针对上面 job 输出的结果在进行处理,利用 MapReducekey的排序行为,将上个 jobvalue 作为本次 jobkey

CountVO 进行修改,通过实现 Comparable 实现排序的效果,不过在上面我们已经实现了 Writable接口,在上篇文章中就讲到 Hadoop 为我们提供了 WritableComparable 已经实现好了 Writable, Comparable ,下面将 CountVO 中的 Writable 换成 WritableComparable

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO> 

    private Long cases;//确诊病例数
    private Long deaths;//死亡病例数

    public void set(long cases, long deaths) 
        this.cases = cases;
        this.deaths = deaths;
    

    /**
     *  序列化方法
     */
    @Override
    public void write(DataOutput out) throws IOException 
        out.writeLong(cases);
        out.writeLong(deaths);
    

    /**
     * 反序列化方法 注意顺序
     */
    @Override
    public void readFields(DataInput in) throws IOException 
        this.cases = in.readLong();
        this.deaths =in.readLong();
    

    @Override
    public String toString() 
        return  cases +"\\t"+ deaths;
    

    @Override
    public int compareTo(CountVO o) 
        return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
    

compareTo 方法用于将当前对象与方法的参数进行比较。如果指定的数与参数相等返回0。如果指定的数小于参数返回 -1。如果指定的数大于参数返回 1。

创建 MapperkeyCountVO

public class SortSumMapper extends Mapper<LongWritable, Text, CountVO, Text> 

    CountVO outKey = new CountVO();
    Text outValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException 
        String[] fields = value.toString().split("\\t");
        outKey.set(Long.parseLong(fields[1]),Long.parseLong(fields[2]));
        outValue.set(fields[0]);
        context.write(outKey,outValue);
    

编写 Reducer, 无需做任何操作直接 write 即可

public class SortSumReducer extends Reducer<CountVO, Text, Text,CountVO> 
    @Override
    protected void reduce(CountVO key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        Text outKey = values.iterator().next();
        context.write(outKey,key);
    

编写驱动类:

public class SortSumDriver extends Configured implements Tool 
    public static void main(String[] args) throws Exception
        //配置文件对象
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new SortSumDriver(), args);
        System.exit(status);
    

    @Override
    public int run(String[] args) throws Exception 
        // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
        // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // 输出目录必须为空,如果不为空则会报错提示
        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(output))
            fs.delete(output,true);
        
        // 创建作业实例
        Job job = Job.getInstance(getConf(), SortSumDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(SortSumDriver.class);

        // 设置作业mapper reducer类
        job.setMapperClass(SortSumMapper.class);
        job.setReducerClass(SortSumReducer.class);

        // 设置作业mapper阶段输出key value数据类型
        job.setMapOutputKeyClass(CountVO.class);
        job.setMapOutputValueClass(Text.class);
        //设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CountVO.class);

        // 配置作业的输入数据路径
        FileInputFormat.addInputPath(job, input);
        // 配置作业的输出数据路径
        FileOutputFormat.setOutputPath(job, output);

        return job.waitForCompletion(true)? 0:1;
    

将上个 job 的结果放在 D:/test/input1 下,执行该驱动类:


执行成功后,到输出目录查看结果:


已经实现根据 死亡病例进行倒叙排列

四、对每个州的 deaths 筛选出Top3的县

修改 CountVO

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountVO implements WritableComparable<CountVO> 

    private String county;//县
    private Long cases;//确诊病例数
    private Long deaths;//死亡病例数

    public CountVO(CountVO vo)
        this.county = vo.getCounty();
        this.cases = vo.getCases();
        this.deaths = vo.getDeaths();
    

    public void set(long cases, long deaths, String county) 
        this.cases = cases;
        this.deaths = deaths;
        this.county = county;
    

    /**
     * 序列化方法
     */
    @Override
    public void write(DataOutput out) throws IOException 
        out.writeLong(cases);
        out.writeLong(deaths);
        out.writeUTF(county);
    

    /**
     * 反序列化方法 注意顺序
     */
    @Override
    public void readFields(DataInput in) throws IOException 
        this.cases = in.readLong();
        this.deaths = in.readLong();
        this.county = in.readUTF();
    

    @Override
    public String toString() 
        return county + "\\t" + cases + "\\t" + deaths;
    

    @Override
    public int compareTo(CountVO o) 
        return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
    

修改 SumMapper 类:

public class SumMapper extends Mapper<LongWritable, Text, Text, CountVO> 

    Text outKey = new Text();
    CountVO outValue = new CountVO();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        String[] fields = value.toString().split(",");
        //州
        outKey.set(fields[2]);
        //Covid数据 确诊病例 死亡病例 县
        outValue.set(Long.parseLong(fields[fields.length - 2]