MapReduce编程模型
一、MapReduce编程模型简介
二、什么是MapReduce
- MapReduce可以用除Java以外的其他语言来实现
三、MapReduce的优缺点
四、MapReduce程序设计方法
由图可知:
输入文本(可以不只一个),按行提取文本文档的单词,形成行<k1,v1>
键值对(具体形式很多,例如<行数,字符偏移>);
通过Spliting
将<k1,v1>细化为单词键值对<k2,v2>
,Map分发到各个节点,同时将<k2,v2>
归结为list(<k2,v2>);
在进行计算统计前,先用Shuffing
将相同主键k2归结在一起形成<k2, list(v2)>
;
Reduce阶段直接对<k2, list(v2)>
进行合计得到list(<k3,v3>)
并将结果返回主节点。
五、WordCount编程实例
-
测试说明
以下是测试样例:测试输入样例数据集:文本文档test1.txt, test2.txt。
文档test1.txt中的内容为:
tale as old as time
true as it can be
beauty and the beast
文档test2.txt中的内容为:
ever just the same
ever as before
beauty and the beast
词频统计代码:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
/*
* MapReduceBase类:实现Mapper和Reducer接口的基类
* Mapper接口:
* WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类要实现此接口。
* LongWritable表示每一行起始偏移量
* 第一个Text是用来接受文件中的内容,
* 第二个Text是用来输出给Reduce类的key,
* IntWritable是用来输出给Reduce类的value
*/
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
/*
*LongWritable,IntWritable,Text是Hadoop中实现的用于封装Java数据类型的类,这些类实现了WritableComparable接口,
*都能够被串行化,便于在分布式环境中进行数据交换,可以视为long,int,String数据类型的替代。
*/
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();//Text实现了BinaryComparable类,可以作为key值
/*
* Mapper接口中的map方法:
* void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
* 映射一个单个的输入<K1,V1>对到一个中间输出<K2,V2>对
* 中间输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。
* OutputCollector接口:收集Mapper和Reducer输出的<K,V>对。
* OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output
* Reporter 用于报告整个应用的运行进度
*/
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
/** 原始数据(以test1.txt为例):
*tale as old as time
true as it can be
beauty and the beast
map阶段,数据如下形式作为map的输入值:key为偏移量
<0 tale as old as time>
<21 world java hello>
<39 you me too>
*/
/**
* 解析(Spliting)后以得到键值对<K2,V2>(仅以test1.txt为例)
* 格式如下:前者是键值,后者数字是值
* tale 1
* as 1
* old 1
* as 1
* time 1
* true 1
* as 1
* it 1
* can 1
* be 1
* beauty 1
* and 1
* the 1
* beast 1
* 这些键值对作为map的输出数据
*/
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
/*
* reduce过程是对输入键值对洗牌(Shuffing)形成<K2,list(V2)>格式数据(仅以test1.txt为例):
* (tablie [1])
* (as [1,1,1])
* (old [1])
* (time [1])
* (true [1])
* (it [1])
* (can [1])
* (be [1])
* (beauty [1])
* (and [1])
* (the [1])
* (beast [1])
* 作为reduce的输入
*
*/
public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {
//****请补全reduce对<k2, list(v2)> 进行合计得到list(<k3,v3>)过程****//
/*********begin*********/
int count=0;
for(IntWritable itr:values)
{
count+=itr.get(); /*循环统计*/
}
/*统计完成后,将结果输出.....*/
result.set(count);
context.write(key,result);
/*********end**********/
}
}
public static void main(String[] args) throws Exception {
/**
* JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作
* 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等
*/
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
/*
* 需要配置输入和输出的HDFS的文件路径参数
* 可以使用"Usage: wordcount <in> <out>"实现程序运行时动态指定输入输出
*/
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");//Job(Configuration conf,String jobName)设置job名称
job.setJarByClass(WordCount.class);//为job设置Mapper类
job.setCombinerClass(IntSumReducer.class);//为job设置Combiner类
job.setMapperClass(TokenizerMapper.class); //为设置map函数
job.setReducerClass(IntSumReducer.class); //为设置reduce函数
job.setOutputKeyClass(Text.class);//为设置输出的key类型
job.setOutputValueClass(IntWritable.class);//为设置输出的value类型
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//为map-reduce任务设置InputFormat实现类,设置输入路径
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//为map-reduce任务设置OutputFormat实现类,设置输出路径
System.exit(job.waitForCompletion(true) ? 0 : 1);//如果程序正常运行成功,程序就会正常退出
}
}
执行程序
在程序编写完成后,还需要将代码打包成jar文件并运行。用eclipse自带的打包工具
导出为wordcount.jar文件,上传至集群任一节点,并在该节点执行命令:
hadoop jar wordcount.jar hellohadoop .WordCount /user/test/input /user/test/output
Hellohadoop为程序的包名,WordCount为程序的类名,/user/test/input为HDFS存放文本的目录
(如果指定一个目录为MapReduce输人路径,则MapReduce会将该路径下的所有文件作为输入文件;如果指定一个文件,则MapReduce只会将该文件作为输人
),/user/test/output为作业的输出路径(该路径在作业运行前必须不存在
)。
命令执行后,屏幕会打出有关任务进度的日志:
22/04/22 00:53:23 INFO mapreduce .Job: Running job: job 1442977437282 0466
22/04/22 00:53:34 INFO mapreduce .Job: Job job_ 1442977437282 0466 running in uber mode : false
15/12/14 00:53:34 INFO mapreduce.Job: map 0%reduce 0%
22/04/22 00:53:46 INFO mapreduce.Job: map 100%reduce 0%
22/04/22 00:53:54 INFO mapreduce .Job: map 100%reduce 100%
...
当任务完成后,屏幕会输出相应的日志:
22/04/22 00:53:55 INFO mapreduce.Job: Job job_1442977437282_0466
completed successfully
22/04/22 00:53:55 INFO mapreduce.Job: Counters: 49
...
接下来我们查看输出目录:
hadoop fs- 1s/user/test/output
会看到:
-rw-r--r-- 2 zkpk supergroup 0 2022-04-22 00:53/user/test/ output/_ SUCCESS
一rw-r--r-- 2 zkpk supergroup 3721 2022-04-22 00:53/user/test/output3/part- r-0000
其中,SUCCESS文件是一个空的标志文件,它标志该作业成功完成,而结果则存放在part-00000。执行命令:
hadoop fs- cat/user/test/output/part- r- 0000
得到结果:
and 2
as 4
be 1
beast 2
beauty 2
before 1
can 1
ever 2
it 1
just 1
old 1
same 1
tale 1
the 3
time 1
true 1
六、Hadoop MapReduce架构
以下是hadoop1.0的架构,2.0参考(略)
Map Task 执行过程如下图 所示。由该图可知,Map Task 先将对应的split 迭代解析成一个个key/value 对,依次调用用户自定义的map() 函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个partition,每个partition 将被一个Reduce Task 处理。
Reduce Task 执行过程如下图所示。该过程分为三个阶段:
①从远程节点上读取MapTask 中间结果(称为“Shuffle 阶段”);
②按照key 对key/value 对进行排序(称为“Sort 阶段”);
③依次读取<key, value list>,调用用户自定义的reduce() 函数处理,并将最终结果存到HDFS 上(称为“Reduce 阶段”)。
七、MapReduce实战开发
文件内容合并去重
输入文件file1的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
输入文件file2的样例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
案例代码如下:
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Merge {
/**
* @param args
* 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C
*/
//在这重载map函数,直接将输入中的value复制到输出数据的key上 注意在map方法中要抛出异常:throws IOException,InterruptedException
public static class Map extends Mapper<Object, Text, Text, Text>{
/********** Begin **********/
public void map(Object key, Text value, Context content)
throws IOException, InterruptedException {
Text text1 = new Text();
Text text2 = new Text();
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
text1.set(itr.nextToken());
text2.set(itr.nextToken());
content.write(text1, text2);
}
}
/********** End **********/
}
//在这重载reduce函数,直接将输入中的key复制到输出数据的key上 注意在reduce方法上要抛出异常:throws IOException,InterruptedException
public static class Reduce extends Reducer<Text, Text, Text, Text> {
/********** Begin **********/
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Set<String> set = new TreeSet<String>();
for(Text tex : values){
set.add(tex.toString());
}
for(String tex : set){
context.write(key, new Text(tex));
}
}
/********** End **********/
}
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
Job job = Job.getInstance(conf,"Merge and duplicate removal");
job.setJarByClass(Merge.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
String inputPath = "/user/tmp/input/"; //在这里设置输入路径
String outputPath = "/user/tmp/output/"; //在这里设置输出路径
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
根据输入文件file1和file2合并得到的输出文件file3的样例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x