MapReduce概念
1.MapReduce是一个基于集群的计算平台
它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。
2.MapReduce是一个简化分布式编程的计算框架
它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
3.MapReduce是一个将分布式计算抽象为Map和Reduce两个阶段的编程模型
它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。
MapReduce的执行流程
MapReduce4个独立的实体
1. JobClient:
运行于client node,负责将MapReduce程序打成Jar包存储到HDFS,并把Jar包的路径提交到Jobtracker,由Jobtracker进行任务的分配和监控。
2. JobTracker:
运行于name node,负责接收JobClient提交的Job,调度Job的每一个子task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。
3. TaskTracker:
运行于data node,负责主动与JobTracker通信,接收作业,并直接执行每一个任务。
4. HDFS:
用来与其它实体间共享作业文件。
MapReduce执行流程(看图理解)
1.RPC请求:
JobClient通过RPC协议向JobTracker请求一个新应用的ID,用于MapReduce作业的ID
2.获取新的jobID:
JobTracker检查作业的输出说明。例如,如果没有指定输出目录或目录已存在,作业就不提交,错误抛回给JobClient,否则,返回新的作业ID给JobClient
3.复制job资源:
JobClient将作业所需的资源(包括作业JAR文件、配置文件和计算所得得输入分片)复制到以作业ID命名的HDFS文件夹中
4.提交任务:
JobClient通过submitApplication()提交作业
5.初始化job:
JobTracker收到调用它的submitApplication()消息后,进行任务初始化
6.获取job资源并分配任务:
JobTracker读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个TaskTracker
7.heartbeat(返回任务状态):
TaskTracker通过心跳机制领取任务(任务的描述信息)
8.获取job资源:
TaskTracker读取HDFS上的作业资源(JAR包、配置文件等)
9.启动任务:
TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
10.计算结果写入HDFS:
TaskTracker将Reduce结果写入到HDFS当中
MapReduce工作原理
Map任务处理
1.读取HDFS中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数
2.重写map(),对第一步产生的<k,v>进行处理,转换为新的<k,v>输出
3.对输出的key、value进行分区
4.对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中
5.对分组后的数据进行归约
Reduce任务处理
1.多个map任务的输出,按照不同的分区,通过网络复制到不同的reduce节点上
2.对多个map的输出进行合并、排序
3.重写reduce函数实现自己的逻辑,对输入的key、value处理,转换成新的key、value输出
4.把reduce的输出保存到文件中
示例(统计某txt文件中的每个单词的个数):
依赖:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
Map层:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WcMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split("\\s+");
for (String word:words){
context.write(new Text(word),one);
}
}
}
Reduce层:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WcReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int num = 0;
for (IntWritable n:values){
num+=n.get();//获取InWritable中的基本类型的数据
}
context.write(key,new IntWritable(num));
}
}
测试类:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WcApp {
public static void main(String[] args) throws Exception {
//产生一个job任务
Job job = Job.getInstance(new Configuration(),"wc");
//设置当前类末jar包的主引导类
job.setJarByClass(WcApp.class);
//设置读取文件的位置
FileInputFormat.setInputPaths(job,new Path("C:/新建文本文档.txt"));
//设置结果存放路径文件夹的位置,一定要是空的,否则输出不了
FileOutputFormat.setOutputPath(job,new Path("C:/wcres"));
//把自己的mapper嵌入到mapReduce框架中
job.setMapperClass(WcMapper.class);
//把自己的Mapper类的输出类型通知框架
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//把自己的Reduce嵌入到MapReduce框架中
job.setReducerClass(WcReduce.class);
//把自己的Reduce的输出类型通知MapReduce框架
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//启动框架运行
job.waitForCompletion(true);
}
}