简介:
MapReduce是一个基于集群的高性能并行计算平台,MapReduce是一个并行计算与运行的软件框架,MapReduce是一个并行程序设计模型与方法.
特点:
①分布可靠,对数据集的操作分发给集群中的多个节点实现可靠性,每个节点周期性返回它完成的任务和最新的状态
②封装了实现细节,基于框架API编程,面向业务展开分布式编码
③提供跨语言编程的能力
MapReduce的主要功能:
1.1数据划分和计算任务调度
1.2数据/代码互相定位
1.3系统优化
1.4出错检测和恢复
MapReduce的运行流程:
由上图可以看到MapReduce执行下来主要包含这样几个步骤:
1) 首先正式提交作业代码,并对输入数据源进行切片
2) master调度worker执行map任务
3) worker当中的map任务读取输入源切片
4) worker执行map任务,将任务输出保存在本地
5) master调度worker执行reduce任务,reduce worker读取map任务的输出文件
6) 执行reduce任务,将任务输出保存到HDFS
运行流程详解
MAP:HDFS目录的数据输入进来,然后切块,将切好的块分给不同的计算机,各计算机将块按照本地规约分区,将不同的区按照key排序,然后将任务输出保存在本地.
Reduce:将数据从远程拷贝下来,然后按照key将数据合并并处理最后输出
Map阶段由一定数量的Map Task组成,流程如下:
■ 输入数据格式解析:InputFormat
■ 输入数据处理:Mapper
■ 数据分区:Partitioner
■ 数据按照key排序
■ 本地规约:Combiner(相当于local reducer,可选)
■ 将任务输出保存在本地
Reduce阶段由一定数量的Reduce Task组成,流程如下:
■ 数据远程拷贝
■ 数据按照key排序和文件合并merge
■ 数据处理:Reducer
■ 数据输出格式:OutputFormat
通常我们把从Mapper阶段输出数据到Reduce阶段的reduce计算之间的过程称之为shuffle
MapReduce Java API应用
1、MapReduce开发流程
➢ 搭建开发环境,参考HDFS环境搭建,基本一致
➢ 基于MapReduce框架编写代码,Map、Reduce、Driver三部分组成。
➢ 编译打包,将源代码打成的包和依赖jar包打成一个包
➢ 上传至运行环境
➢ 运行hadoop jar命令,现已由yarn jar替代,建议使用新命令提交执行
具体提交命令为:
yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3
➢ 通过yarn web ui查看执行过程
➢ 查看执行结果
2、WordCount代码实现
Mapper:是MapReduce计算框架中Map过程的封装
Text:Hadoop对Java String类的封装,适用于Hadoop对文本字符串的处理
IntWritable:Hadoop对Java Integer类的封装,适用于Hadoop整型的处理
Context:Hadoop环境基于上下文的操作对象,如Map中key/value的输出、分布式缓存数据、分布式参数传递等
StringTokenizer:对String对象字符串的操作类,做基于空白字符的切分操作工具类
-
2.1 Map类编写
-
package com.tianliangedu.mapper; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyTokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { // 暂存每个传过来的词频计数,均为1,省掉重复申请空间 private final static IntWritable one = new IntWritable(1); // 暂存每个传过来的词的值,省掉重复申请空间 private Text word = new Text(); // 核心map方法的具体实现,逐个<key,value>对去处理 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 用每行的字符串值初始化StringTokenizer StringTokenizer itr = new StringTokenizer(value.toString()); // 循环取得每个空白符分隔出来的每个元素 while (itr.hasMoreTokens()) { // 将取得出的每个元素放到word Text对象中 word.set(itr.nextToken()); // 通过context对象,将map的输出逐个输出 context.write(word, one); } } }
2.2 Reduce类编写
-
Reducer:是MapReduce计算框架中Reduce过程的封装
-
package com.tianliangedu.reducer; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; //reduce类,实现reduce函数 public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); //核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //暂存每个key组中计算总和 int sum = 0; //加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值 for (IntWritable val : values) { //将key组中的每个词频数值sum到一起 sum += val.get(); } //将该key组sum完成的值放到result IntWritable中,使可以序列化输出 result.set(sum); //将计算结果逐条输出 context.write(key, result); } }
2.3 Driver类编写
-
➢ Configuration:与HDFS中的Configuration一致,负责参数的加载和传递 ➢ Job:作业,是对一轮MapReduce任务的抽象,即一个MapReduce的执行全过程的管理类 ➢ FileInputFormat:指定输入数据的工具类,用于指定任务的输入数据路径 ➢ FileOutputFormat:指定输出数据的工具类,用于指定任务的输出数据路径
package com.tianliangedu.driver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import com.tianliangedu.mapper.MyTokenizerMapper; import com.tianliangedu.reducer.IntSumReducer; public class WordCountDriver { // 启动mr的driver方法 public static void main(String[] args) throws Exception { // 得到集群配置参数 Configuration conf = new Configuration(); // 设置到本次的job实例中 Job job = Job.getInstance(conf, "天亮WordCount"); // 指定本次执行的主类是WordCount job.setJarByClass(WordCountDriver.class); // 指定map类 job.setMapperClass(MyTokenizerMapper.class); // 指定combiner类,要么不指定,如果指定,一般与reducer类相同 job.setCombinerClass(IntSumReducer.class); // 指定reducer类 job.setReducerClass(IntSumReducer.class); // 指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 指定输入数据的路径 FileInputFormat.addInputPath(job, new Path(args[0])); // 指定输出路径,并要求该输出路径一定是不存在的 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出! System.exit(job.waitForCompletion(true) ? 0 : 1); } }
2.4本地模拟分布式计算环境运行mapreduce
鉴于远程运行进行代码测试的复杂性,以及其它新框架均开始支持本地local环境模拟分布式计算运行, 故mapreduce从2.x开始也已经支持本地环境
具体做法请参见辅助资料集” 06-本地local环境模拟mapreduce并行计算的操作步骤”。
2.5 Maven打包
使用Maven命令,基于配置的Maven插件实现代码打包。
2.6 上传到运行环境
使用rz命令将打好的运行包上传到集群环境中。
2.7 运行WordCount程序
具体提交命令为:
具体提交命令为: yarn jar testhdfs-jar-with-dependencies.jar com.tianliangedu.driver.WordCount /tmp/tianliangedu/input /tmp/tianliangedu/output3
2.8 查看执行过程
Web访问地址为:http://cluster1.hadoop:8088/ui2/#/yarn-apps/apps
-
2.9 查看执行结果
3、标准代码实现
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//启动mr的driver类
public class WordCountDriver {
//map类,实现map函数
public static class MyTokenizerMapper extends
Mapper<Object, Text, Text, IntWritable> {
//暂存每个传过来的词频计数,均为1,省掉重复申请空间
private final static IntWritable one = new IntWritable(1);
//暂存每个传过来的词的值,省掉重复申请空间
private Text word = new Text();
//核心map方法的具体实现,逐个<key,value>对去处理
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
//用每行的字符串值初始化StringTokenizer
StringTokenizer itr = new StringTokenizer(value.toString());
//循环取得每个空白符分隔出来的每个元素
while (itr.hasMoreTokens()) {
//将取得出的每个元素放到word Text对象中
word.set(itr.nextToken());
//通过context对象,将map的输出逐个输出
context.write(word, one);
}
}
}
//reduce类,实现reduce函数
public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
//核心reduce方法的具体实现,逐个<key,List(v1,v2)>去处理
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//暂存每个key组中计算总和
int sum = 0;
//加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值
for (IntWritable val : values) {
//将key组中的每个词频数值sum到一起
sum += val.get();
}
//将该key组sum完成的值放到result IntWritable中,使可以序列化输出
result.set(sum);
//将计算结果逐条输出
context.write(key, result);
}
}
//启动mr的driver方法
public static void main(String[] args) throws Exception {
//得到集群配置参数
Configuration conf = new Configuration();
//设置到本次的job实例中
Job job = Job.getInstance(conf, "天亮WordCount");
//通过指定相关字节码对象,找到所属的主jar包
job.setJarByClass(WordCountDriver.class);
//指定map类
job.setMapperClass(MyTokenizerMapper.class);
//指定combiner类,要么不指定,如果指定,一般与reducer类相同
job.setCombinerClass(IntSumReducer.class);
//指定reducer类
job.setReducerClass(IntSumReducer.class);
//指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定输入数据的路径
FileInputFormat.addInputPath(job, new Path(args[0]));
//指定输出路径,并要求该输出路径一定是不存在的
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
MapReduce Shell应用
1、MapReduce的二级命令
mapred称为一级命令,直接输入mapred回车,即可查看二级命令:
2、MapReduce的三级命令
输入一级命令mapred后,再任意输入一个二级命令,即可查看三级命令:
3、MapReduce shell应用
-
查看当前正在执行的job任务
-
先提交一个WordCount任务,然后使用mapred job -list查看任务列表
-
-
终止(kill)一个任务的执行
由于某种原因,要立即终止某任务的执行,则使用mapred job -kill job-id。
构造场景:先提交一个WordCount job,然后通过kill job-id来终止任务:
-
查看一个job的日志
使用mapred shell命令,通过job-id可以查看job的工作日志。
命令格式为:mapred job -logs job-id:
MapReduce技术特征
1、向“外”横向扩展,而非向“上”纵向扩展
➢ 集群的构建完全选用价格便宜、易于扩展的低端商用服务器,而非价格昂贵不易扩展的商用服务
➢ 大规模数据处理和大规模数据存储的需要,讲求集群综合能力,而非单台机器处理能力,横向增加机器节点数据量
2、失效被认为是常态
➢ 使用大量普通服务器,节点硬件和软件出错是常态
➢ 具备多种有效的错误检测和恢复机制,在某个计算节点失效后会自动转移到别的计算节点。某个任务节点失败后其他节点能够无缝接管失效节点的计算任务
➢ 当失效节点恢复后自动无缝加入集群,不需要管理员人工进行系统配置
3、移动计算,把处理向数据迁移(数据本地性)
➢ 采用代码/数据互定位的功能,计算和数据在同一个机器节点或者是同一个机架中,发挥数据本地化特点
➢ 可避免跨机器节点或是机架传输数据,提高运行效率
4、顺序处理数据、避免随机访问数据
➢ 磁盘的顺序访问远比随机访问快得多,因此MapReduce设计为面向顺序式大规模数据的磁盘访问处理
➢ 利用集群中的大量数据存储节点同时访问数据,实现面向大数据集批处理的高吞吐量的并行处理
5、推测执行
➢ 一个作业由若干个Map任务和Reduce任务构成,整个作业完成的时间取决于最慢的任务的完成时间。由于节点硬件、软件问题,某些任务可能运行很慢
➢ 采用推测执行机制,发现某个任务的运行速度远低于任务平均速度,会为慢的任务启动一个备份任务,同时运行。哪个先运行完,采用哪个结果。
6、平滑无缝的可扩展性
➢ 可弹性的增加或减少集群计算节点来调节计算能力
➢ 计算的性能随着节点数的增加保持接近线性程度的增长
7、为应用开发隐藏系统底层细节
➢ 并行编程有很多困难,需要考虑多线程中复杂繁琐的细节,诸如分布式存储管理、数据分发、数据通信和同步、计算结果收集等细节问题。
➢ MapReduce提供了一种抽象机制将程序员与系统层细节隔离开,程序员只需关注业务,其他具体执行交由框架处理即可。