MapReduce
MapReduce详细知识汇总
MapReduce定义
MapReduce是一个分布式运算程序的编程框架,
是用户开发“基于Hadoop的数据分析应用”的核心框架
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件
整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上
MapReduce核心编程思想
1、分布式的运算程序往往需要分成至少2个阶段
2、第一个阶段的map task 并发实例,完全并行运行,互不相干
3、第二个阶段的reduce task并发实例互不相干,
但是他们的数据依赖于上一个阶段的所有maptask并发实例的输出
4、MapReduce编程模型只能包含一个map阶段和一个reduce阶段,
如果用户的业务逻辑非常复杂,那么就只能多个mapreduce程序,串行运行
MapReduce进程
1、MrAppMaster:负责整个程序的过程调度及状态协调
2、MapTask:负责Map阶段额度整个数据处理流程
3、ReduceTask:负责Reduce阶段的整个数据处理流程
MapReduce程序运行流程分析
1、上传待处理文本
2、客户端submit前,获取待处理数据的信息,然后根据参数配置,形成一个任务分配的规则
3、将数据进行切片,获得切片信息
4、根据切片信息计算出MapTask数量
5、MapTask读取待处理文件的数据
6、对数据进行逻辑计算,形成map数据
7、收集map的kv信息,加载到缓存
8、按照k分区排序后写入磁盘
9、所有MapTask任务完成后,启动相应数量的reduceTask,
并告知reduceTask处理数据范围(数据分区)
10、reduceTask获取数据,并运算
11、输出结果到文件
数据切片及MapTask并行度决定机制
1、一个job的map阶段并行度由客户端在提交job时决定
2、每一个split切片分配一个mapTask并行实例处理
3、默认情况下,切片大小=blocksize
4、切片时不考虑数据集整体,而是逐个针对每个文件单独切片
MapTask工作机制
1、Read阶段:MapTask通过用户编写的RecordReader从输入InputSplit中解析出一个个KV
2、Map阶段:该节点主要是讲解析出的KV交给用户编写map()函数处理,并产生一系列新的KV
3、Collect阶段:在用户编写map()函数中,当数据处理完成后,
一般会调用OutputCollector.collect()输出结果,在该函数内部,
它会将生成的KV分区,并写入一个环形内存缓冲区中
4、溢写阶段:当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,
生成一个临时文件,需要注意的是,将数据写入本地磁盘之前,
先要对数据进行依次本地排序,并在必要时对数据进行合并、压缩等操作
5、Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行依次合并,
以确保最终只会产生一个数据文件
Shuffle机制
Map方法之后,Reduce方法之前的数据处理过程称为Shuffle
1、MapTask方法执行完成之后,收集输出的KV对,放入内存缓冲区
2、从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3、多个溢出文件会被合并成大的溢出文件
4、在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对Key的索引进行排序
5、ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
6、ReduceTask会取到同一个分区的来自不同MapTask的结果文件,
ReduceTask会将这些文件再进行合并(归并排序)
7、合并成大文件后,shuffle的过程也就结束了,数据写到磁盘,
后面进入ReduceTask的逻辑运算过程
缓冲区的大小可以通过参数调整,参数:io.sort.mb 默认100M
ReduceTask工作机制
1、Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,
如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中
2、Merge阶段:在远程拷贝数据得同时,
ReduceTask启动了两个后台线程对内存和磁盘上得文件进行合并,
以防止内存使用过多或磁盘文件过多
3、Sort阶段:按照MapReduce语义,
用户编写reduce函数输入数据是按Key进行聚集得一组数据。
为了将Key相同得数据聚在一起,Hadoop采用了基于排序的策略。
由于各个MapTask已经实现对自己的处理结果进行了局部排序,
因此,ReduceTask只需要对所有数据进行一次归并排序
4、Reduce阶段:计算结果输出文件
设置ReduceTask并行度(个数)
ReduceTask的并行度同样影响整个job的执行并发度和执行效率,
但于MapTask的并发数由切片数决定不同,
ReduceTask数量的决定是可以直接手动设置:job.setNumReduceTasks(4);
ReduceTask默认值是1,所以输出文件个数为一个
排序概述
对于MapTask,它会将处理的结果展示放到一个缓冲区中,
当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,
并将这些有序数据写道磁盘上,而当数据处理完毕后,
它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,
如果文件大小超过一定阈值,则放到磁盘上,否则放到内存中,
如果自盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件,
如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上,
当所有数据拷贝完毕后,ReduceTask同意对内存和磁盘上的所有数据进行一次归并排序
排序的分类
1、部分排序:MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序
2、全排序:最终输出结果只有一个文件,且文件内部有序。实现方法是设置一个ReduceTask。
但该方法再处理大型文件时效率极低,因为一台机器处理所有文件,
完全丧失了MapReduce所提供的并行架构
3、辅助排序:MapReduce框架在记录到达Reducer之前按键对记录排序,
但键所对应的值并没有被排序,
一般来说,大多数MapReduce程序会避免让Reduce函数依赖于值得排序,
但是,有时也需要通过特定得方法对键进行排序和分组等以实现对值得排序
4、二次排序:在自定义排序过程中,如果compareTo中得判断条件为两个即为二次排序
压缩策略和原则
压缩式提高Hadoop运行效率的一致优化策略
通过对Mapper、Reducer运行过程的数据进行压缩,以减少磁盘IO,提高MR程序运行速度。
压缩基本原则:
1、运行密集型的job,少用压缩
2、IO密集型的job,多用压缩
压缩方式
Gzip
优点:压缩率比较高,而且压缩/解压速度也比较快
缺点:不支持Spilt
应用场景:当每个文件压缩之后再130M以内,都可以考虑用Gzip压缩格式
Bzip2
优点:支持Split,具有很高的压缩率,比Gzip还高
缺点:压缩/解压速度慢,不支持Native
应用场景:适合速度要求不高,但需要较高的压缩率的适合
Lzo压缩
优点:压缩/解压速度比较快,支持Split
缺点:压缩率比Gzip要低一些,Hadoop本身不支持,需要安装
应用场景:一个很大的文件,压缩之后大于200M以后三个的可以考虑,
而且单个问价越大,Lzo优点约明显
Snappy
优点:高速压缩速度和合理的压缩率
缺点:不支持Spilt,压缩率比Gzip要低,Hadoop不支持,需要安装
应用场景:当MapReduce作业的Map输出的数据较大的适合,
作为Map到Reduce的中间数据的压缩格式,
或者作为MapReduce作业的输出和另一个MapReduce作业的输入
MapReduce跑的慢的原因
1、计算机性能
CPU、内存、此怕健康、网络
2、I/O操作优化
1、数据倾斜
2、Map和Reduce数设置不合理
3、Map运行时间太长,导致Reduce等待过久
4、小文件过多
5、大量的不可分块的超大文件
6、Spill次数过多
7、Merge次数过多等
MapReduce优化方法
数据输入阶段
1、合并小文件:在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,
增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢
2、采用CombineTextInputFormat来作为输入,解决输入端大量小文件场景
MAP阶段
1、减少益写次数:通过调整io.sort.mb及sort.spill.percent参数值,
增大触发Spill的内存上线,
减少Spill次数,从而减少磁盘IO
2、减少合并次数:通过调正io.sort.factor参数,增大Merge的文件数目,减少Merge的次数
从而缩短MR处理时间
3、在Map之后,不影响业务逻辑前提下,先进行Combine处理,减少IO
Reduce阶段
1、合理设置Map和Reduce数:两个都不能设置太少,也不能设置太多,太少会导致Task等待,
延长处理时间;太多会导致Map、Reduce任务间竞争资源,造成处理超时等错误
2、设置Map、Reduce共存:调整slowstart.completedmaps参数,使Map运行到一定程度后
Reduce也开始运行,减少Reduce的等待时间
3、规避使用Reduce:因为Reduce在用于连接数据集的时候将会产生大量的网络消耗
4、合理设置Reduce端的buffer:默认情况下,数据达到一个阈值的时候,
buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据,
也就是说,buffer和Reduce是没有直接关联的,中间多次写磁盘、读磁盘的过程。
既然由这个弊端,那就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到Reduce
从而减少IO开销
IO传输
1、采用数据压缩的方式,减少网络IO的时间,安装Snappy和Lzo压缩编辑器
2、使用SequenceFile二进制文件
数据倾斜问题
1、可以通过对原始数据进行抽样得到的结果集来预设分区边界值
2、基于输出键的背景自定义分区
3、使用Combine可以大量地减少数据倾斜
4、采用Map Join ,尽量避免Reduce Join
常用的参数调优
(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)
配置参数 | 参数说明 |
---|---|
mapreduce.map.memory.mb | 一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。 |
mapreduce.reduce.memory.mb | 一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。 |
mapreduce.map.cpu.vcores | 每个MapTask可使用的最多cpu core数目,默认值: 1 |
mapreduce.reduce.cpu.vcores | 每个ReduceTask可使用的最多cpu core数目,默认值: 1 |
mapreduce.reduce.shuffle.parallelcopies | 每个Reduce去Map中取数据的并行数。默认值是5 |
mapreduce.reduce.shuffle.merge.percent | Buffer中的数据达到多少比例开始写入磁盘。默认值0.66 |
mapreduce.reduce.shuffle.input.buffer.percent | Buffer大小占Reduce可用内存的比例。默认值0.7 |
mapreduce.reduce.input.buffer.percent | 指定多少比例的内存用来存放Buffer中的数据,默认值是0.0 |
(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)
配置参数 | 参数说明 |
---|---|
yarn.scheduler.minimum-allocation-mb | 给应用程序Container分配的最小内存,默认值:1024 |
yarn.scheduler.maximum-allocation-mb | 给应用程序Container分配的最大内存,默认值:8192 |
yarn.scheduler.minimum-allocation-vcores | 每个Container申请的最小CPU核数,默认值:1 |
yarn.scheduler.maximum-allocation-vcores | 每个Container申请的最大CPU核数,默认值:32 |
yarn.nodemanager.resource.memory-mb | 给Containers分配的最大物理内存,默认值:8192 |
(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)
表4-14
配置参数 | 参数说明 |
---|---|
mapreduce.task.io.sort.mb | Shuffle的环形缓冲区大小,默认100m |
mapreduce.map.sort.spill.percent | 环形缓冲区溢出的阈值,默认80% |
2.容错相关参数(MapReduce性能优化)
配置参数 | 参数说明 |
---|---|
mapreduce.map.maxattempts | 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
mapreduce.reduce.maxattempts | 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 |
mapreduce.task.timeout | Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。 |
小文件弊端
HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,
这样当小文件比较多的时候,就会产生很多的索引文件,
一方面会大量占用NameNode的内存空间,
另一方面就是索引文件过大使得索引速度变慢
小文件解决方案
1、在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS
2、在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并
3、在MapReduce处理时,可采用CombineTextInputFormat提高效率