关键词:
在企业开发中,Hadoop框架自带的
InputFormat
类型不能满足所有应用场景,需要自定义InputFormat来解决实际问题。
自定义InputFormat步骤如下:
- (1)自定义一个类继承
FilelnputFormat
。 - (2)自定义一个类继承
RecordReader
,实现一次读取一个完整文件,将文件名为key,文件内容为value。 - (3)在输出时使用
SequenceFileOutPutFormat
输出合并文件。
无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。
1. 需求
将多个小文件合并成一个SequenceFile
文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value(bytes) 对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。
(1)输入数据
(2)期望输出文件格式
2. 需求分析
-
自定义一个类继承
FileInputFormat
(1)重写isSplitable()
方法,返回false
,让文件不可切,整个文件作为1片。
(2)重写createRecordReader(),返回自定义的RecordReader对象 -
自定义一个类继承
RecordReader
在RecordReader中,nextKeyValue
()是最重要的方法,返回当前读取到的key-value
,如果读到返回true
,调用Mapper的map()来处理,否则返回false
3. 编写程序
MyInputFormat.java
/*
* 1. 改变切片策略,一个文件固定切1片,通过指定文件不可切
*
* 2. 提供RR ,这个RR读取切片的文件名作为key,读取切片的内容封装到bytes作为value
*/
public class MyInputFormat extends FileInputFormat
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
return new MyRecordReader();
@Override
protected boolean isSplitable(JobContext context, Path filename)
return false;
MyRecordReader.java
/*
* RecordReader从MapTask处理的当前切片中读取数据
*
* XXXContext都是Job的上下文,通过XXXContext可以获取Job的配置Configuration对象
*/
public class MyRecordReader extends RecordReader
private Text key;
private BytesWritable value;
private String filename;
private int length;
private FileSystem fs;
private Path path;
private FSDataInputStream is;
private boolean flag=true;
// MyRecordReader在创建后,在进入Mapper的run()之前,自动调用
// 文件的所有内容设置为1个切片,切片的长度等于文件的长度
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException
FileSplit fileSplit=(FileSplit) split;
filename=fileSplit.getPath().getName();
length=(int) fileSplit.getLength();
path=fileSplit.getPath();
//获取当前Job的配置对象
Configuration conf = context.getConfiguration();
//获取当前Job使用的文件系统
fs=FileSystem.get(conf);
is = fs.open(path);
// 读取一组输入的key-value,读到返回true,否则返回false
// 将文件的名称封装为key,将文件的内容封装为BytesWritable类型的value,返回true
// 第二次调用nextKeyValue()返回false
@Override
public boolean nextKeyValue() throws IOException, InterruptedException
if (flag)
//实例化对象
if (key==null)
key=new Text();
if (value==null)
value=new BytesWritable();
//赋值
//将文件名封装到key中
key.set(filename);
// 将文件的内容读取到BytesWritable中
byte [] content=new byte[length];
IOUtils.readFully(is, content, 0, length);
value.set(content, 0, length);
flag=false;
return true;
return false;
//返回当前读取到的key-value中的key
@Override
public Object getCurrentKey() throws IOException, InterruptedException
return key;
//返回当前读取到的key-value中的value
@Override
public Object getCurrentValue() throws IOException, InterruptedException
return value;
//返回读取切片的进度
@Override
public float getProgress() throws IOException, InterruptedException
return 0;
// 在Mapper的输入关闭时调用,清理工作
@Override
public void close() throws IOException
if (is != null)
IOUtils.closeStream(is);
if (fs !=null)
fs.close();
CustomIFMapper.java
public class CustomIFMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>
CustomIFReducer.java
public class CustomIFReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>
CustomIFDriver.java
public class CustomIFDriver
public static void main(String[] args) throws Exception
Path inputPath=new Path("e:/mrinput/custom");
Path outputPath=new Path("e:/mroutput/custom");
//作为整个Job的配置
Configuration conf = new Configuration();
//保证输出目录不存在
FileSystem fs=FileSystem.get(conf);
if (fs.exists(outputPath))
fs.delete(outputPath, true);
// 创建Job
Job job = Job.getInstance(conf);
// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
job.setMapperClass(CustomIFMapper.class);
job.setReducerClass(CustomIFReducer.class);
// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 设置输入目录和输出目录
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// 设置输入和输出格式
job.setInputFormatClass(MyInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// ③运行Job
job.waitForCompletion(true);
mapreduce之自定义partitioner
概述Map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区;reduce的并行数量以及输出文件的个数,由分区数决定. 默认分区是根据key的hashCode对ReduceTasks个数取模得到.自定义步骤1.自定义类继承Pa... 查看详情
自定义inputformat和outputformat
1. 自定义inputFormat1.1需求无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免面临处理大量小文件的场景,此时,就需要有相应解决方案 1.2分析小文件的优化无非以下几种方式:1、 在数据采集的时候,就将... 查看详情
mapreduce之自定义combiner(代码片段)
概述Combinar继承了`Reducer`, 可选过程,在map端的实现分组(是在map端运行的reduce),减小网络IO传输; 使用Combiner需要满足的条件Combiner不能影响最终计算结果例如求平均值就不能使用Combiner输出k-v类型必须与map输出一致自定义过... 查看详情
mapreduce流程
一、InputSplitMapReduce框架调用job.setINputFormatClass定义的InputFormat读取数据InputFormat接口里包括两个方法:getSplits()和createRecordReader(),这两个方法分别用来定义输入分片和读取分片的方法。 1、计算分片调用InputFormat的getSplits()负... 查看详情
mapreduce框架原理inputformat数据输入(代码片段)
MapReduce框架原理-InputFormat数据输入1.切片与MapTask并行度决定机制1)问题引出2)MapTask并行度决定机制2.Job提交流程源码和切片源码详解1)Job提交流程源码详解2)FileInputFormat切片源码解析(input.getSplits(job))... 查看详情
PIG 不读取我的自定义 InputFormat
】PIG不读取我的自定义InputFormat【英文标题】:PIGdoesn\'treadmycustomInputFormat【发布时间】:2012-12-1823:00:43【问题描述】:我有一个自定义的MyInputFormat来处理multi-linedinputs的记录边界问题。但是当我将MyInputFormat放入我的UDF加载函数... 查看详情
mapreduce应用开发
...list(k3,v3)MapReduce中的常用设置:输入数据类型由输入格式(InputFormat)设置map输出的key类型通过setMapOutputKeyClass设置2、MapReduce输入格式:MapReduce处理的输入文件一般存储在HDFS上,这些输入文件的格式多种多样,比如基于行的日志文件... 查看详情
mapreduce的输入格式(代码片段)
1.InputFormat接口 InputFormat接口包含了两个抽象方法:getSplits()和creatRecordReader()。InputFormat决定了Hadoop如何对文件进行分片和接收,它能够从一个job中得到一个split集合(InputSplit[]),然后再为这个split集合配上一个合适的RecordRea... 查看详情
mapreduce的典型编程场景3(代码片段)
1.自定义InputFormat–数据分类输出 需求:小文件的合并 分析: -在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS -在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并 &... 查看详情
十mapreduce--inputformat以及recordreader抽象类(代码片段)
...切片是如何转为KV给map处理的。?这就涉及到两个抽象类,InputFormat以及RecordReader。具体为什么是这两个抽象类,请看之前input的源码分析1、InputFormatpublicabstractclassInputFormat<K,V>publicInputFormat()publicabstractList<InputSplit>getSplits(J... 查看详情
大数据技术之hadoop(mapreduce)
...义MapReduce核心思想WordCount案例Hadoop序列化MapReduce框架原理InputFormat数据输入MapReduce定义MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。MapReduce核心功能是将用户编写的业务逻... 查看详情
mapreduce框架原理(代码片段)
MapReduce框架原理之InputFormat数据输入(控制)文章目录MapReduce框架原理之InputFormat数据输入(控制)1切片与MapTask并行度决定机制1.1MapTask并行度决定机制2Job提交和切片流程源码详解2.1Job提交流程2.2提交流程图解2.3切片规则3FileInputFormat... 查看详情
十mapreduce--自定义input输入(代码片段)
...之输入原理”中说到实现定义输入的方法,其实就是继承InputFormat以及RecordReader实现其中的方法。下面例子讲解操作。1、需求将多个文件合并成一个大文件(有点类似于combineInputFormat),并输出。大文件中包括小文件所在的路径... 查看详情
hadoop-2.4.1学习之inputformat及源代码分析
...要指定作业输入的格式(未指定时默认的输入格式为TextInputFormat)。在Hadoop中使用InputFormat类或InputFormat接口描述MapReduce作业输入的规范或者格式,之所以说InputFormat类或InputFormat接口是因为在旧的API(hadoop-0.x)中InputFormat被定义... 查看详情
mapreduce输入输出类型格式及实例
...2、一个分片不是数据本身,而是可分片数据的引用。3、InputFormat接口负责生成分片。InputFormat负责处理MR的输入部分,有三个作用:验证作业的输入是否规范。把输入文件切分成InputSp 查看详情
hadoop-2.4.1学习之inputformat及源代码分析
...要指定作业输入的格式(未指定时默认的输入格式为TextInputFormat)。在Hadoop中使用InputFormat类或InputFormat接口描述MapReduce作业输入的规范或者格式,之所以说InputFormat类或InputFormat接口是因为在旧的API(hadoop-0.x)中InputForm 查看详情
7.3mapreduce工作流程
...更详细地说明。为方便,假设集群只包含两个节点首先由InputFormat模块把文件从HDFS中读取出来,并进行格式验证。然后InputFormat还要把数据切分成多个分片split——注意这种切分只是一种逻辑定义,物理上并不发生移动。由记录... 查看详情
转载自定义inputformat
转自:http://blog.csdn.net/jackydai987/article/details/6226108系统默认的TextInputFormat.Java [java] viewplain copy public class TextInputFormat extends FileInputFormat& 查看详情