一. 切片与MapTask并行度决定机制
现有如下的问题: 1G的数据, 启动8个MapTask, 可以提高集群的并发处理能力. 那么1K的数据, 如果也启动8个MapTask, 会提高集群性能吗? MapTask并行任务是否是越多越好呢? 哪些因素影响了MapTask并行度?
MapTask并行度决定机制
首先需要区分两个概念:
1. 数据块: 数据块(Blocks)是HDFS物理上把数据分成不同的块. 数据块是HDFS的存储数据单位
2. 数据切片: 数据切片只是在逻辑上对输入进行分片, 并不会在磁盘上将其切分进行存储. 数据切片是MapRudece程序计算输入数据的单位, 一个切片会对应启动一个MapTask.
需要注意的是, 默认情况下, 切片大小=数据块大小. 这是由HDFS的"数据本地化优化"的特性决定的, 也即在存储输入数据的节点上运行map任务, 无需集群带宽资源, 便可获得最佳性能. 如果分片跨越2个数据块,对于任何一个HDFS节点(基本不可能同时存储这2个数据块), 分片中的另外一块数据就需要通过网络传输到map任务节点, 与使用本地数据运行map任务相比, 效率则更低. 同时, 对数据进行切片时不会考虑从数据整体, 而是会逐个针对每一个文件进行单独的切片.
二. Job提交流程源码和切片源码详解
Job提交流程源码
FileInputFormat切片源码解析(input.getSplits(job))
(1) 先找到数据存储的目录
(2) 遍历处理目录下的每一个文件, 每个文件中:
a) 获取文件大小fs.sizeOf(ss.txt)
b) 计算切片大小
> minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
getFormatMinSplitSize()方法返回一个long类型的1
getMinSplitSiez(JobContext job)方法返回SPLIT_MINSIZE常量. 这个常量参数由mapred-default.xml参数配置文件决定, 默认为0.
因此, minSize返回一个1L
> maxSize = getMaxSplitSize(job)
getMaxSplitSize(JobContext context) 方法返回SPLIT_MAXSIZE常量, 这个常量参数同样由mapred-default.xml参数配置文件决定. 但默认情况下没有配置此常量的参数, 因此在方法返回参数时, 由getLong()方法提供默认值: Long.MAX_VALUE.
c) 判断这个文件是否可以进行切割. 若可进行切割, 则获取块大小
long blockSize = file.getBlockSize();
本地模式下运行时, 块大小默认为32M
d) 计算切片大小
long splitSize = this.computeSplitSize(blockSIze, minSize, maxSize);
在computeSplitSize()方法中, 返回值如下:
return Math.max(minSize, Math.min(maxSize, blockSize));
在默认情况下, 切片大小 = 块大小
e) 进行切片, 每次完成切片后, 都需要判断剩下部分的大小是否大于块的1.1倍, 如果小于1.1倍, 则就将剩下的部分划为一块切片
while(((double) bytesRemaining)/splitSize > SPLIT_SLOP)
f) 将切片信息写入一个切片规划文件中, 暂时存储在.staging文件夹
(3) 提交切片规划文件到YARN上, YARN上的MrAppMaster根据切片规划文件计算需要开启MapTask的个数
**************************************************************************************************************
如何调整切片的大小?
如果需要调大切片, 则需要将minSize调大; 如果需要更小的切片, 则需要将maxSize调小
**************************************************************************************************************
获取切片信息API
// 获取切片信息API
String name = inputSplit.getPath().getName();
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
二. TextInputFormat
在运行MapReduce程序时, 输入的文件格式多种多样, 包括: 基于行的日志文件, 二进制格式文件, 数据库表等等, 那么针对不同的数据类型, MapReduce也同样提供了很多不同种类的数据格式. FileInputFormat常见的接口实现类包括: TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, 以及自定义InputFormat等.
TextInputFormat
TextInputFormat是FileInputFormat的默认实现类, 按行读取每条记录, 其键值对是<LongWritable, Text>
示例:
CombineTextInputFormat
框架默认的TextInputFormat切片机制是对每个文件进行单独的切片规划, 不管文件多小, 都会产生一个单独的切片, 且都会交给一个MapTask. 如果一个任务中包含多个小文件, 那么就会产生大量的MapTask, 导致处理效率低下. 因此, 在这种情况下, 需要使用CombineTextInputFormat切片机制进行处理.
1) 虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M
注意: 虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置
2) 切片机制
三. MapReduce工作流程
> 关于环形缓冲区内的元数据(Meta data), 其中包含以下几类数据:
index: 记录数据开始的位置
partition: 记录数据存放于哪个分区
keystart: 记录数据的key值开始的索引
valuestart: 记录数据的value值开始的索引
> 环形缓冲区默认大小是100M, 如果数据占到磁盘空间总大小的80%时, 则会开始反向写入. 这么做的原因是: 可以在向文件溢写的过程中, 同时开启向环形缓冲区写入的线程. 若新数据写入的速度较快, 则会等待旧数据溢写完成后继续进行写入. 如果直到缓冲区写满才进行溢写, 则需要等待溢写完成之后才可以重新对缓冲区进行写入.
缓冲区的大小可以通过参数调整: mapreduce.task.is.sort.mb 默认100M
四. Shuffle机制(混洗)
Map方法之后, Reduce方法之前的数据处理过程称之为Shuffle.
Shuffle机制
具体Shuffle过程如下:
1) MapTask收集map()方法输出的kv对, 放到内存缓冲区中.
2) 从内存缓冲区不断溢写到本地磁盘文件, 可能会溢出多个文件
3) 多个溢出文件会被合并成大的溢出文件
4) 在溢出过程即合并的过程中, 都需要调用Partitioner进行分区和针对key的排序(按字典进行快速排序)
5) ReduceTask根据自己的分区号, 从各个MapTask机器上拉取相应的结果分区数据
6) ReduceTask会抓取到来自不同MapTask同一分区的结果文件, ReduceTask会将这些文件再进行合并(Merge Sort)
7) 合并成大文件后, Shuffle的过程也就结束了, 后面就要进入ReduceTask的逻辑运算过程
Partition分区
默认的Partitioner分区代码如下:
public class HashPartitioner<K, V> extends Partitioner<K, V>{
public int getPartition(K key, V value, int numReduceTasks){
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
默认分区是根据key的hashCode对ReudceTasks个数取模得到的. 用户无法指定key存储在哪个分区.
自定义Partitioner
1)自定义Partitioner方法需要重写getPartition()方法
public class CustomPartitioner exxtends Partitioner<TExt, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions)
// 控制分区代码逻辑
... ...
return partition;
}
}
2) 在Job驱动中, 设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
3) 自定义Partition后, 要根据自定义Partition的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
分区总结
1) 如果ReduceTask的数量 > getPartition的结果数, 则会多产生几个空的输出文件part-r-000xx;
2) 如果1 < ReduceTask的数量 < getPartition的结果数, 则有一部分分区数据无法存储, 会产生Exception;
3) 如果ReduceTask的数量 = 1, 则不管MapTask端输出多少个分区文件, 最终结果都会交给这一个ReduceTask, 最终也就只会产生一个结果文件part-r-00000. 因为在源代码中, 当用户把ReduceTask的个数设置为1时, 会进入程序本身已经定义好的getPartitioner匿名内部类;
if (this.partitions > 1) {
this.partitioner = (Partitioner)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
this.partitioner = new Partitioner<K, V>() {
public int getPartition(K key, V value, int numPartitions) {
return NewOutputCollector.this.partitions - 1;
}
};
}
4) 分区号必须从零开始, 逐一累加