大数据技术之hadoop(mapreduce)框架原理数据压缩(代码片段)

@从一到无穷大 @从一到无穷大     2022-12-10     211

关键词:

文章目录


1 MapReduce 框架原理

1.1 InputFormat 数据输入

1.1.1 切片与 MapTask并行度决定机制

问题引出

MapTask的并行度决定 Map阶段的任务处理并发度,进而影响到整个 Job的处理速度。
思考:1G的数据,启动 8个 MapTask可以提高集群的并发处理能力。那么1K的数据,也启动 8个MapTask,会提高集群性能吗? MapTask并行任务是否越多越好呢? 哪些因素影响了 MapTask并行度?

MapTask并行度决定机制

数据块: Block是 HDFS物理上把数据分成一块一块。 数据块是 HDFS存储数据单位 。
数据切片: 数据切片只是在逻辑上对输入进行分片, 并不会在磁盘上将其切分成片进行存储 。 数据切片是 MapReduce程序计算输入数据的单位 ,一个切片会对应启动一个 MapTask。

1.1.2 Job 提交流程源码和切片源码详解

Job 提交流程源码详解












FileInputFormat 切片源码解析(input.getSplits(job))




1.1.3 FileInputFormat 切片机制

切片机制

(1)简单地按照文件的内容长度进行切片
(2)切片大小,默认等于Block大小
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

源码中计算切片大小的公式

Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
因此,默认情况下,切片大小=blocksize。

切片大小设置

maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。
minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大。

1.1.4 TextInputFormat

FileInputFormat 实现类

思考:在运行MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce 是如何读取这些数据的呢?

FileInputFormat 常见的接口实现类包括:TextInputFormatKeyValueTextInputFormatNLineInputFormatCombineTextInputFormat 和自定义InputFormat 等。

TextInputFormat

TextInputFormat 是默认的FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符( 换行符和回车符),Text类型。
以下是一个示例,比如,一个分片包含了如下4条文本记录。

Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise

每条记录表示为以下键/值对:

(0,Rich learning from)
(20 ,Intelligent learning engine)
(49 ,Learning more convenient)
(74 ,From the real demand for more close to the enterprise)

1.1.5 CombineTextInputFormat切片机制

框架默认的 TextInputFormat切片机制是对任务按文件规划切片,不管文件多小都会是一个单独的切片,都会交给一个 MapTask 这样如果有大量小文件就会产生大量的MapTask,处理效率极其低下。

应用场景
CombineTextInputFormat用于小文件过多的场景, 它可以将多个小文件从逻辑上规划到一个切片中,这样多个小文件就可以交给一个 MapTask处理 。

虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

切片机制
生成切片过程包括虚拟存储过程和切片过程两部分 。

(1)虚拟存储过程:
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2 倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
例如setMaxInputSplitSize 值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余大小为4.02M,如果按照4M 逻辑划分,就会出现0.02M 的小的虚拟存储文件,所以将剩余的4.02M 文件切分成(2.01M 和2.01M)两个文件。

(2)切片过程:
(a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize 值,大于等于则单独形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
(c)测试举例:有4 个小文件大小分别为1.7M、5.1M、3.4M 以及6.8M 这四个小文件,则虚拟存储之后形成6 个文件块,大小分别为:1.7M,(2.55M、2.55M),3.4M 以及(3.4M、3.4M),最终会形成3 个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

1.1.6 CombineTextInputFormat 案例实操

需求

将输入的大量小文件合并成一个切片统一处理。
(1)输入数据。准备4 个小文件,a.txt,b.txt,c.txt,d.txt。
(2)期望一个切片处理4 个文件。

实现过程

(1)不做任何处理,运行WordCount 案例程序,观察切片个数为4。

(2)在WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为3。

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
        job.setInputFormatClass(CombineTextInputFormat.class);

// 虚拟存储切片最大值设置4M
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);


(3)在WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为1。

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
        job.setInputFormatClass(CombineTextInputFormat.class);

// 虚拟存储切片最大值设置20M
        CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

1.2 MapReduce 工作流程



上面的流程是整个 MapReduce最全工作流程,但是 Shuffle过程只是从第 7步开始到第 16步结束,具体 Shuffle过程详解,如下:

(1)MapTask 收集我们的 map()方法输出的 kv对,放到内存缓冲区中;
(2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件;
(3)多个溢出文件会被合并成大的溢出文件;
(4)在溢出过程及合并的过程中,都要调用 Partitioner进行分区和针对key 进行排序;
(5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据;
(6)ReduceTask会抓取到同一个分区的来自不同 MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序);
(7)合并成大文件后,Shuffle的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)。

注意

(1)Shuffle中的缓冲区大小会影响到 MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘 io的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整。参数:mapreduce.task.io.sort.mb 默认100M。

1.3 Shuffle 机制

1.3.1 Shuffle 机制

Map 方法之后,Reduce 方法之前的数据处理过程称之为Shuffle。

1.3.2 Partition 分区

问题引出

要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)。

默认Partitioner分区

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

自定义Partitioner步骤

(1)自定义类继承Partitioner,重写getPartition()方法

public class CustomPartitioner extends Partitioner<Text,FlowBean>
	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) 
	// 控制分区代码逻辑
	… …
		return partition;
	

(2)在Job驱动中,设置自定义Partitioner

job.setPartitionerClass(CustomPartitioner.class);

(3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);

分区总结

(1)如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
(2)如果1< ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception;
(3)如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000;
(4)分区号必须从零开始,逐一累加。

案例分析

例如:假设自定义分区数为5,则
(1)job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件会报错
(2)job.setNumReduceTasks(2); 会报错
(3)job.setNumReduceTasks(6); 大于5,程序会正常运行,会产生空文件

1.3.3 Partition 分区案例实操

需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据

(2)期望输出数据:手机号136、137、138、139 开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

需求分析

分区类代码

package com.Tom.mapreduce.partitioner2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> 
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) 
        // text是手机号

        String phone = text.toString();

        String prePhone = phone.substring(0, 3);

        int partition;

        if("136".equals(prePhone))
            partition = 0;
        else if("137".equals(prePhone))
            partition = 1;
        else if("138".equals(prePhone))
            partition = 2;
        else if("139".equals(prePhone))
            partition = 3;
        else 
            partition = 4;
        
        return partition;
    


编写Mapper类

package com.Tom.mapreduce.partitioner2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> 

    private Text outK = new Text();
    private FlowBean outV = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 

        // 1 获取一行
        // 1    13736230513 192.196.100.1   www.atguigu.com 2481    24681   200
        String line = value.toString();

        // 2 切割
        // 1,13736230513,192.196.100.1,www.atguigu.com,2481,24681,200
        // 2    138846544121 192.196.100.2      264 0   200
        String[] split = line.split("\\t");

        // 3 抓取想要的数据
        // 手机号:13736230513
        // 上行流量和下行流量:2481,24681
        String phone = split[1];
        String up = split[split.length - 3];
        String down = split[split.length - 2];

        // 4 封装
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(up));
        outV.setDownFlow(Long.parseLong(down));
        outV.setSumFlow();

        // 5 写出
        context.write(outK, outV);
    

编写Reducer类

package com.Tom.mapreduce.partitioner2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> 
    private FlowBean outV = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException 

        // 1 遍历集合累加值
        long totalUp = 0;
        long totalDown = 0;

        for (FlowBean value : values) 
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        

        // 2 封装outK, outV
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();

        // 3 写出
        context.write(key, outV);
    


编写流量统计的Bean 对象

package com.Tom.mapreduce.partitioner2;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 1. 定义类实现writable接口
 * 2. 重写序列化和反序列化方法
 * 3. 重写空参构造
 * 4. toString方法
 */
public class FlowBean implements Writable

    private long upFlow;    // 上行流量
    private long downFlow;  // 下行流量
    private long sumFlow;   // 总流量

    // 空参构造
    public FlowBean() 
    

    public long getUpFlow() 
        return upFlow;
    

    public void setUpFlow(long upFlow) 
        this.upFlow = upFlow;
    

    public long getDownFlow() 
        return downFlow;
    

    public void setDownFlow(long downFlow) 
        this.downFlow = downFlow;
    

    public long getSumFlow() 
        return sumFlow;
    

    public void setSumFlow(long sumFlow) 
        this.sumFlow = sumFlow;
    

    public void setSumFlow() 
        this.sumFlow = this.upFlow + this.downFlow;
    

    @Override
    public void write(DataOutput out) throws IOException 
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    

    @Override
    public void readFields(DataInput in) throws IOException 
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    

    @Override
    public String toString() 
        return upFlow + "\\t" + downFlow + "\\t" + sumFlow;
    


编写Driver驱动类

package com.huxili.mapreduce.partitioner2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDirver 
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 

        // 1 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2 设置jar
        job.setJarByClass(FlowDirver.class);

        // 3 关联Mapper和Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4 设置Mapper输出的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5 设置最终数据输出的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(6);

        // 6 设置数据的输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("E:\\\\input\\\\inputflow"));
        FileOutputFormat.setOutputPath(job,new Path("E:\\\\hadoop\\\\output9"));

        // 7 提交job
        job.waitForCompletion(true);
    


将ReduceTasks设置为6,可以看到前5个文件包含对应的结果,第6个文件为空。

1.3.4 WritableComparable 排序

排序概述

排序是MapReduce框架中最重要的操作之一。MapTask 和ReduceTask 均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类

(1)部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
(2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
(3)辅助排序:(GroupingComparator分组)
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
(4)二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

自定义排序WritableComparable 原理分析

bean 对象做为key 传输,需要实现WritableComparable 接口重写compareTo方法,就可以实现排序。

@Override
public int compareTo(FlowBean bean) 
	int result;
	// 按照总流量大小,倒序排列
	if (this.sumFlow > bean.getSumFlow()) 
		result = -1;
	else if (this.sumFlow < bean.getSumFlow()) 
		result = 1;
	else 
		result = 0;
	
	return result;

1.3.5 WritableComparable 排序案例实操(二次排序)

需求分析

编写FlowBean类

package com.Tom

一文带你了解大数据技术之hadoop(代码片段)

...Hadoop4.Hadoop优势5.Hadoop组成5.1HDFS架构概述5.2YARN架构概述5.3MapReduce架构概述5.4HDFS、YARN、MapReduce三者关系6.大数据技术生态体系7.推荐系统框架 查看详情

大数据技术之hadoop(mapreduce)概述序列化(代码片段)

文章目录1MapReduce概述1.1MapReduce定义1.2MapReduce优缺点1.3MapReduce核心思想1.4MapReduce进程1.5官方WordCount源码1.6常用数据序列化类型1.7MapReduce编程规范1.8WordCount案例实操1.8.1本地测试1.8.2提交到集群测试2Hadoop序列化2.1序列化概述2.2自定义... 查看详情

一文带你了解大数据技术之mapreduce

MapReduce概述1.MapReduce定义2.MapReduce优缺点2.1优点2.2缺点3.MapReduce核心思想4.MapReduce进程5.官方WordCount源码6.常用数据序列化类型7.MapReduce编程规范8.WordCount案例实操1.MapReduce定义MapReduce是一个分布式运算程序的编程框架,是用户开... 查看详情

大数据技术之hadoop(入门)概述运行环境搭建运行模式(代码片段)

...(面试重点)1.3.1HDFS架构概述1.3.2YARN架构概述1.3.3MapReduce架构概述1.3.4HDFS、YARN、MapReduce三者关系1.3.5大数据技术生态体系1.3.6推荐系统框架图2Hadoop运行环境搭建(开发重点)2. 查看详情

大数据之二:hadoop与spark辨析

转载自知乎:https://www.zhihu.com/question/265684961)MapReduce:是一种离线计算框架,将一个算法抽象成Map和Reduce两个阶段进行处理,非常适合数据密集型计算。2)Spark:MapReduce计算框架不适合迭代计算和交互式计算,MapReduce是一种磁盘计... 查看详情

hadoop——hadoop优势组成大数据技术生态体系系统框架图

...(面试重点)1.5.1HDFS架构概述1.5.2YARN架构概述1.5.3MapReduce架构概述1.5.4HDFS、YARN、MapReduce三者关系1.6大数据技术生态体系1.7推荐系统框架图1.4Hadoop优势(4高)1)高可靠性 查看详情

hadoop之mapreduce详解

1、什么是Mapreduce    Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;   Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,... 查看详情

大数据框架之hadoop:mapreducemapreduce框架原理——数据清洗(etl)(代码片段)

在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。3.9.1数据清洗案例实操-简单解析版1、需求去除日志中字段长... 查看详情

hadoop之mapreduce基础(代码片段)

一。MapReduce概念  Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;  Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,... 查看详情

大数据离线计算路线图-hadoop工程师,数据分析师

...ase框架的全面深入讲解,为了能轻松掌握相关知识,学习MapReduce开发的20个经典案例讲解以及部分Hadoop源代码的分析,借此深入学习内核原理。方法/步骤Zookeeper入门到精通视频教程详细讲解Zookeeper的安装配置、命令使用、存储结... 查看详情

hadoop,hive,spark之间是啥关系

...ive,zookeeper,hbase等等;Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简... 查看详情

hadoop入门概念

...统基础架构。  Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据 查看详情

大数据技术核心框架最强知识体系总结||(2021版)(含面试题)

...oop框架Hadoop是大数据开发的重要框架,其核心是HDFS和MapReduce,HDFS为海量的数据提供了存储,MapReduce为海量的数据提供了计算,因此,需要重点掌握,除此之外,还需要掌握Hadoop集群、Hadoop集群管理、YA... 查看详情

hadoop之mapreduce工作原理

...部分组成,分别是分布式文件系统HDFS和分布式计算框架MapReduce。其中,分布式文件系统HDFS主要用于大规模数据的分布式存储,而MapReduce则构建在分布式文件系统上,对于存储在分布式文件系统的数据进行分布式计算。1  ... 查看详情

大数据生态技术体系都有哪些?

...发的分布式系统基础设施。Hadoop框架的核心设计是HDFS和MapReduce。HDFS提供海量数据的存储,MapReduce提供海量数据的计算。Hadoop是一个基本框架,它可以托管许多其他东西,比如Hive。不想用编程语言开发MapReduce的人可以使用Hive进... 查看详情

10.sparksql之快速入门

...时代的来临,Hadoop风靡一时。为了使熟悉RDBMS但又不理解MapReduce的技术人员快速进行大数据开发,Hive应运而生。Hive是当时唯一运行在Hadoop上的SQL-on-Hadoop工具。??但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,... 查看详情

大数据学习之mapreduce基础与yarn集群安装09

...式文件系统HDFS海量数据的计算:hadoop->分布式计算框架MapReduce 2什么是MapReduce? 分布式程序的编程框架,java->sshssm,目的:简化开发!是基于hadoop的数据分析应用的核心框架。mapreduce的功能:将用户编写的业务逻辑代... 查看详情

大数据系列之hadoop框架

Hadoop框架中,有很多优秀的工具,帮助我们解决工作中的问题。Hadoop的位置从上图可以看出,越往右,实时性越高,越往上,涉及到算法等越多。越往上,越往右就越火…… Hadoop框架中一些简介 HDFSHDFS,(HadoopDistributedFi... 查看详情