0
点赞
收藏
分享

微信扫一扫

Hadoop MapReduce编程模型

江南北 2022-05-03 阅读 81

MapReduce编程模型

一、MapReduce编程模型简介

二、什么是MapReduce

  • MapReduce可以用除Java以外的其他语言来实现

三、MapReduce的优缺点

四、MapReduce程序设计方法

由图可知:
输入文本(可以不只一个),按行提取文本文档的单词,形成行<k1,v1>键值对(具体形式很多,例如<行数,字符偏移>);
通过Spliting将<k1,v1>细化为单词键值对<k2,v2>,Map分发到各个节点,同时将<k2,v2>归结为list(<k2,v2>);
在进行计算统计前,先用Shuffing将相同主键k2归结在一起形成<k2, list(v2)>
Reduce阶段直接对<k2, list(v2)> 进行合计得到list(<k3,v3>)并将结果返回主节点。

五、WordCount编程实例

  • 测试说明
    以下是测试样例:

    测试输入样例数据集:文本文档test1.txt, test2.txt。

    文档test1.txt中的内容为:

tale as old as time  
true as it can be  
beauty and the beast

文档test2.txt中的内容为:

ever just the same  
ever as before  
beauty and the beast  

词频统计代码:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCount {
	/*
 	* MapReduceBase类:实现Mapper和Reducer接口的基类    
 	* Mapper接口: 
 	* WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类要实现此接口。
 	* LongWritable表示每一行起始偏移量  
    * 第一个Text是用来接受文件中的内容,  
    * 第二个Text是用来输出给Reduce类的key,  
    * IntWritable是用来输出给Reduce类的value    
 	*/ 
 	
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
 	/*
	*LongWritable,IntWritable,Text是Hadoop中实现的用于封装Java数据类型的类,这些类实现了WritableComparable接口,
	*都能够被串行化,便于在分布式环境中进行数据交换,可以视为long,int,String数据类型的替代。
	*/
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();//Text实现了BinaryComparable类,可以作为key值

	/*
	* Mapper接口中的map方法: 
	* void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
	* 映射一个单个的输入<K1,V1>对到一个中间输出<K2,V2>对 
	* 中间输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。 	
	* OutputCollector接口:收集Mapper和Reducer输出的<K,V>对。 
	* OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output 
	* Reporter 用于报告整个应用的运行进度
     */  

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
	    /** 原始数据(以test1.txt为例):
   	    *tale as old as time
		true as it can be
		beauty and the beast
		map阶段,数据如下形式作为map的输入值:key为偏移量
			<0  tale as old as time>
 			<21 world java hello>
 			<39 you me too>	
    	*/
    	 
    	/**
	    * 解析(Spliting)后以得到键值对<K2,V2>(仅以test1.txt为例)
        * 格式如下:前者是键值,后者数字是值
        * tale 1
        * as 1
        * old 1
        * as 1
        * time 1
        * true 1
        * as 1
        * it 1
        * can 1
        * be 1
        * beauty 1
        * and 1
        * the 1
        * beast 1
        * 这些键值对作为map的输出数据
        */
	    StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
      }
    }
  
 
    public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
	 /*
	 * reduce过程是对输入键值对洗牌(Shuffing)形成<K2,list(V2)>格式数据(仅以test1.txt为例):
	 * (tablie [1])
	 * (as [1,1,1])
	 * (old [1])
	 * (time [1])
	 * (true [1])
	 * (it [1])
	 * (can [1])
	 * (be [1])
	 * (beauty [1])
	 * (and [1])
	 * (the [1])
	 * (beast [1])
	 * 作为reduce的输入
	 * 
	 */
        public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException {
        
            //****请补全reduce对<k2, list(v2)> 进行合计得到list(<k3,v3>)过程****//
            /*********begin*********/
            int count=0;  
			for(IntWritable itr:values)  
			{  
			count+=itr.get();  /*循环统计*/  
			}  
			/*统计完成后,将结果输出.....*/  
			result.set(count);
			context.write(key,result);
            /*********end**********/
        }
    }
 
    public static void main(String[] args) throws Exception {
	  /**
       * JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作 
       * 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等 
       */  
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        /*
        * 需要配置输入和输出的HDFS的文件路径参数
        * 可以使用"Usage: wordcount <in> <out>"实现程序运行时动态指定输入输出
        */
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");//Job(Configuration conf,String jobName)设置job名称
        job.setJarByClass(WordCount.class);//为job设置Mapper类 
        job.setCombinerClass(IntSumReducer.class);//为job设置Combiner类

       
        job.setMapperClass(TokenizerMapper.class); //为设置map函数
        job.setReducerClass(IntSumReducer.class);  //为设置reduce函数
        job.setOutputKeyClass(Text.class);//为设置输出的key类型
        job.setOutputValueClass(IntWritable.class);//为设置输出的value类型
        
       

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//为map-reduce任务设置InputFormat实现类,设置输入路径
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//为map-reduce任务设置OutputFormat实现类,设置输出路径
        System.exit(job.waitForCompletion(true) ? 0 : 1);//如果程序正常运行成功,程序就会正常退出
    }
}

执行程序

在程序编写完成后,还需要将代码打包成jar文件并运行。用eclipse自带的打包工具
导出为wordcount.jar文件,上传至集群任一节点,并在该节点执行命令:

hadoop jar wordcount.jar hellohadoop .WordCount /user/test/input /user/test/output

Hellohadoop为程序的包名,WordCount为程序的类名,/user/test/input为HDFS存放文本的目录(如果指定一个目录为MapReduce输人路径,则MapReduce会将该路径下的所有文件作为输入文件;如果指定一个文件,则MapReduce只会将该文件作为输人),/user/test/output为作业的输出路径(该路径在作业运行前必须不存在)。

命令执行后,屏幕会打出有关任务进度的日志:

22/04/22 00:53:23 INFO mapreduce .Job: Running job: job 1442977437282 0466
22/04/22 00:53:34 INFO mapreduce .Job: Job job_ 1442977437282 0466 running in uber mode : false
15/12/14 00:53:34 INFO mapreduce.Job: map 0%reduce 0%

22/04/22 00:53:46 INFO mapreduce.Job: map 100%reduce 0%
22/04/22 00:53:54 INFO mapreduce .Job: map 100%reduce 100%
...

当任务完成后,屏幕会输出相应的日志:

22/04/22 00:53:55 INFO mapreduce.Job: Job job_1442977437282_0466
completed successfully
22/04/22 00:53:55 INFO mapreduce.Job: Counters: 49
...

接下来我们查看输出目录:

hadoop fs- 1s/user/test/output

会看到:

-rw-r--r-- 2 zkpk supergroup 0 2022-04-22 00:53/user/test/ output/_ SUCCESS
一rw-r--r-- 2 zkpk supergroup 3721 2022-04-22 00:53/user/test/output3/part- r-0000

其中,SUCCESS文件是一个空的标志文件,它标志该作业成功完成,而结果则存放在part-00000。执行命令:

hadoop fs- cat/user/test/output/part- r- 0000

得到结果:

and 2
as 4
be 1
beast 2
beauty 2
before 1
can 1
ever 2
it 1
just 1
old 1
same 1
tale 1
the 3
time 1
true 1

六、Hadoop MapReduce架构

以下是hadoop1.0的架构,2.0参考(略)
在这里插入图片描述

Map Task 执行过程如下图 所示。由该图可知,Map Task 先将对应的split 迭代解析成一个个key/value 对,依次调用用户自定义的map() 函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个partition,每个partition 将被一个Reduce Task 处理。
在这里插入图片描述
Reduce Task 执行过程如下图所示。该过程分为三个阶段:
①从远程节点上读取MapTask 中间结果(称为“Shuffle 阶段”);
②按照key 对key/value 对进行排序(称为“Sort 阶段”);
③依次读取<key, value list>,调用用户自定义的reduce() 函数处理,并将最终结果存到HDFS 上(称为“Reduce 阶段”)。

七、MapReduce实战开发

文件内容合并去重

输入文件file1的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x

输入文件file2的样例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y

案例代码如下:

import java.io.IOException;

import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Merge {

	/**
	 * @param args
	 * 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C
	 */
	//在这重载map函数,直接将输入中的value复制到输出数据的key上 注意在map方法中要抛出异常:throws IOException,InterruptedException
	public static class Map  extends Mapper<Object, Text, Text, Text>{
	
    /********** Begin **********/

        public void map(Object key, Text value, Context content) 
            throws IOException, InterruptedException {  
            Text text1 = new Text();
            Text text2 = new Text();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                text1.set(itr.nextToken());
                text2.set(itr.nextToken());
                content.write(text1, text2);
            }
        }  
	/********** End **********/
	} 
		
	//在这重载reduce函数,直接将输入中的key复制到输出数据的key上  注意在reduce方法上要抛出异常:throws IOException,InterruptedException
	public static class  Reduce extends Reducer<Text, Text, Text, Text> {
    /********** Begin **********/
        
        public void reduce(Text key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
            Set<String> set = new TreeSet<String>();
            for(Text tex : values){
                set.add(tex.toString());
            }
            for(String tex : set){
                context.write(key, new Text(tex));
            }
        }  
    
	/********** End **********/

	}
	
	public static void main(String[] args) throws Exception{

		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		conf.set("fs.default.name","hdfs://localhost:9000");
		
		Job job = Job.getInstance(conf,"Merge and duplicate removal");
		job.setJarByClass(Merge.class);
		job.setMapperClass(Map.class);
		job.setCombinerClass(Reduce.class);
		job.setReducerClass(Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		String inputPath = "/user/tmp/input/";  //在这里设置输入路径
		String outputPath = "/user/tmp/output/";  //在这里设置输出路径

		FileInputFormat.addInputPath(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(outputPath));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

根据输入文件file1和file2合并得到的输出文件file3的样例如下:

20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x

举报

相关推荐

0 条评论