2018-08-04期mapreduce倒排索引编程案例2(jobcontroll方式)

author author     2022-12-20     164

关键词:

1、第一阶段MapReduce任务程序

package cn.itcast.bigdata.index;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**

* 利用MapReduce实现输入多个文件中单词在每个文件中出现的次数,输出格式如下:

* hello (a.txt 2,b.txt 1,c.txt 4)

* tom (a.txt 5,b.txt 3)

* 实现方法:采用倒排索引算法并结合jobControll实现

* 本案例中所有的Mapper、Reducer、Job均采用匿名内部类实现

* @author songjq

*

*/

public class IndexStepOne

/**

* 第一阶段Mapper处理后输出数据格式为

* <k2> <v2>

* <hello:a.txt> <1>

* <hello:a.txt> <1>

* <hello:b.txt> <1>

* @author songjq

*

*/

static class IndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable>

/**

* 格式:<hello-->a.txt,1><helle-->b.txt,1>

*/

private Text tkey = new Text();

private IntWritable tvalue = new IntWritable(1);

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException

FileSplit inputSplit = (FileSplit) context.getInputSplit();

String fileName = inputSplit.getPath().getName();

String line = value.toString();

String[] split = line.split(" ");

for (String val : split)

tkey.set(val + "-->" + fileName);

context.write(tkey, tvalue);

/**

* 第一阶段Mapper输出数据格式为

* <k2> <v2>

* <hello:a.txt> <1>

* <hello:a.txt> <1>

* <hello:b.txt> <1>

* 第一阶段Reducer处理后输出到HDFS数据格式为

* <k3> <v3>

* <hello> <a.txt-->2>

* <hello> <b.txt-->1>

* @author songjq

*

*/

static class IndexStepOneReducer extends Reducer<Text, IntWritable, Text, LongWritable>

private LongWritable tvalue = new LongWritable(0);

@Override

protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)

throws IOException, InterruptedException

long count = 0;

for(IntWritable value:values)

count++;

tvalue.set(count);

ctx.write(key, tvalue);

2、第二阶段MapReduce任务程序

package cn.itcast.bigdata.index;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

/**

* 利用MapReduce实现输入多个文件中单词在每个文件中出现的次数,输出格式如下:

* hello (a.txt 2,b.txt 1,c.txt 4)

* tom (a.txt 5,b.txt 3)

* 实现方法:采用倒排索引算法并结合jobControll实现

* 本案例中所有的Mapper、Reducer、Job均采用匿名内部类实现

* @author songjq

*

*/

public class IndexStepTwo

/**

* 第二阶段Mapper

* 第二阶段Mapper输入数据为第一阶段Reducer输出到HDFS的数据,格式为

* hello a.txt-->2

* hello b.txt-->1

* 通过第二阶段Mapper处理,输出数据格式为

* <k2> <v2>

* <hello> <a.txt-->2,b.txt-->1>

* @author songjq

*

*/

static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>

private Text tkey = new Text();

private Text tvalue = new Text();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException

String line = value.toString();

String[] split = line.split(" ");

if(split.length>1)

String[] split2 = split[0].split("-->");

tkey.set(split2[0]);

if(split2.length>1)

tvalue.set(split2[1]+"-->"+split[1]);

context.write(tkey, tvalue);

/**

* 第二阶段Reducer

* 通过第二阶段Reducer处理后,为最终输出结果,输出格式为

* <k4> <v4>

* <hello> <(a.txt 2,b.txt 1)>

* @author songjq

*

*/

static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text>

private Text tval = new Text();

@Override

protected void reduce(Text key, Iterable<Text> values, Context ctx)

throws IOException, InterruptedException

StringBuffer sb = new StringBuffer();

for(Text value:values)

sb.append(value+" ");

tval.set(sb.toString());

ctx.write(key, tval);

3、利用jobControll来实现依赖任务的提交

package cn.itcast.bigdata.index;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.jobcontrol.JobControl;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.bigdata.index.IndexStepOne.IndexStepOneMapper;

import cn.itcast.bigdata.index.IndexStepOne.IndexStepOneReducer;

import cn.itcast.bigdata.index.IndexStepTwo.IndexStepTwoMapper;

import cn.itcast.bigdata.index.IndexStepTwo.IndexStepTwoReducer;

/**

* 简单的job串联可以使用jobControll来实现 更复杂的job的调度可以考虑用shell脚本来写,或者干脆用现成的任务调度工具oozie来做

* 这里使用简单的jobControll来实现两个阶段MapReduce任务依赖提交处理

* 由于第二阶段的Mapper输入需要依赖第一阶段Reducer的输出,因此可以利用jobControll来实现第二阶段Mapper的等待,直到

* 第一阶段Reducer输出后,第二阶段的job才开始提交处理

* 核心方法:

* controlledJob2.addDependingJob(controlledJob1);

* @author songjq

*

*/

public class OnceSubmitClient

public static void main(String[] args) throws Exception

// 构造第一阶段的基本job对象job1

Configuration conf1 = new Configuration();

Job job1 = Job.getInstance(conf1, "inexStepOne");

job1.setJarByClass(OnceSubmitClient.class);

job1.setMapperClass(IndexStepOneMapper.class);

job1.setReducerClass(IndexStepOneReducer.class);

job1.setMapOutputKeyClass(Text.class);

job1.setMapOutputValueClass(IntWritable.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(LongWritable.class);

FileInputFormat.setInputPaths(job1, new Path(args[0]));

FileOutputFormat.setOutputPath(job1, new Path(args[1]));

// 构造第二阶段的基本job对象job2

Configuration conf2 = new Configuration();

Job job2 = Job.getInstance(conf2, "inexStepTwo");

job2.setJarByClass(OnceSubmitClient.class);

job2.setMapperClass(IndexStepTwoMapper.class);

job2.setReducerClass(IndexStepTwoReducer.class);

job2.setMapOutputKeyClass(Text.class);

job2.setMapOutputValueClass(Text.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(Text.class);

// 第二个job的输出是第一个job的输入

FileInputFormat.setInputPaths(job2, new Path(args[1]));

FileOutputFormat.setOutputPath(job2, new Path(args[2]));

// ControlledJob是基本的job的封装

ControlledJob controlledJob1 = new ControlledJob(conf1);

// 将job1封装到controlledJob1中去

controlledJob1.setJob(job1);

ControlledJob controlledJob2 = new ControlledJob(conf2);

// 将job2封装到controlledJob2中去

controlledJob2.setJob(job2);

// 先构造一个job控制器

JobControl jobControl = new JobControl("index");

// 指定两个job之间的依赖关系

controlledJob2.addDependingJob(controlledJob1);

// 向job控制器中添加job

jobControl.addJob(controlledJob1);

jobControl.addJob(controlledJob2);

// 创建一个线程去启动jobControl

Thread thread = new Thread(jobControl);

thread.start();

// 如果job没有运行完,主线程就等等

while (!jobControl.allFinished())

thread.sleep(500);

int succeedSize = jobControl.getSuccessfulJobList().size();

//0正常退出 1异常退出

System.exit(succeedSize == 2 ? 0 : 1);


mapreduce高级案例倒排索引(代码片段)

理解【倒排索引】的功能熟悉mapreduce中的combine功能根据需求编码实现【倒排索引】的功能,旨在理解mapreduce的功能。一:理解【倒排索引】的功能1.1倒排索引:由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,... 查看详情

mapreduce编程之倒排索引

任务要求://输入文件格式186616294961101310770244611012345671202345678120987654110289783927418661629496//输出文件格式格式11018661629496|13107702446|987654|18661629496|13107702446|987654|1201234567|2345678|1234567|2345678 查看详情

mapreduce编程倒排索引构建

一、倒排索引简介倒排索引(英语:Invertedindex),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。它是文档检索系统中最常用的... 查看详情

mapreduce编程倒排索引构建

一、倒排索引简单介绍倒排索引(英语:Invertedindex),也常被称为反向索引、置入档案或反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。它是文档检索系统中最经... 查看详情

如何设计一个特殊的 MapReduce 倒排索引?

】如何设计一个特殊的MapReduce倒排索引?【英文标题】:HowtodesignaspecialMapReduceinvertedindex?【发布时间】:2015-11-1718:46:44【问题描述】:在这种情况下,我有很多日志。每条日志包含时间、ip、url、内容等。问题1:我要做的是确定... 查看详情

大数据讲课笔记5.5mapreduce经典案例——倒排索引(代码片段)

...现倒排索引一、导入新课通过上节课的学习,我们对MapReduce运行模 查看详情

第3节mapreduce高级:4倒排索引的建立(代码片段)

倒排索引建立需求分析需求:有大量的文本(文档、网页),需要建立搜索索引最终实现的结果就是哪个单词在哪个文章当中出现了多少次 思路分析:首选将文档的内容全部读取出来,加上文档的名字作为key,文档的value为1... 查看详情

自定义combiner实现文件倒排索引

...op.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.InputSplit;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop... 查看详情

海量数据处理-字典树和倒排索引(代码片段)

...字典树和倒排索引海量数据处理我们已经提到过分而治之mapreduce,和排序相关的专题,今天我们来看一下之前也有简单介绍过的字典树和倒排索引。倒排索引倒排索引是一种索引方法,常用在搜索引擎中,这个数... 查看详情

正排索引倒排索引

参考:正排索引和倒排索引倒排索引为什么叫倒排索引?倒排索引、正排索引系列一invertedindex,称为反向索引更为妥当。简单记为:正排索引:文档—》单词倒排索引:单词—》文档倒排索引有着广泛的... 查看详情

elasticsearch倒排索引的理解(代码片段)

文章目录倒排索引1.正向索引2.倒排索引3.正向和倒排区分4.正向和倒排优缺点:倒排索引首先倒排索引的概念是这样的:倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和... 查看详情

elasticsearch倒排索引的理解(代码片段)

文章目录倒排索引1.正向索引2.倒排索引3.正向和倒排区分4.正向和倒排优缺点:倒排索引首先倒排索引的概念是这样的:倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和... 查看详情

elasticsearch框架学习倒排索引详解(代码片段)

定义:倒排索引源于实际应用中需要根据属性的值来查找记录这种索引表中的每一项都包括一个属性值和具有该属性值的各记录的地址由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而称为倒排索引(invertedind... 查看详情

elasticsearch倒排索引

...个基于Lucene实现的分布式全文检索引擎,其实Elasticsearch倒排索引就是Lucene的倒排索引。数据检索是ES的一项核心功能,它的底层实现也是离不开倒排索引的,通过倒排索引技术可以提高数据的检索效率,理解倒排索引的原理很重... 查看详情

什么是倒排索引?

创建倒排索引,分为以下几步:1)创建文档列表:lucene首先对原始文档数据进行编号(DocID),形成列表,就是一个文档列表2)创建倒排索引列表然后对文档中数据进行分词,得 查看详情

什么是倒排索引?

创建倒排索引,分为以下几步:1)创建文档列表:lucene首先对原始文档数据进行编号(DocID),形成列表,就是一个文档列表2)创建倒排索引列表然后对文档中数据进行分词,得 查看详情

elastic之倒排索引

...关键词的映射过程(正向索引)缺点:费时便利全部文档倒排反向建立索引:  关键词--》文档的映射反向到倒排索引:将索引的关键词出现的文档的位置和出现频率通过文档的形式记录下来,以达到跟快更高速的查询检索&nbs... 查看详情

倒排索引

倒排索引(invertedindex)常被成为反向索引、置入文档和反向档案,是一种索引方法,被用来存储在全文搜索下某个单词在一个文档或者一组文档中的存储位置的映射。是文档检索系统中最常用的数据结构。 例如:下面是要... 查看详情