0
点赞
收藏
分享

微信扫一扫

Hadoop学习笔记: MapReduce(2)

小_北_爸 2022-02-07 阅读 76

一. 切片与MapTask并行度决定机制

现有如下的问题: 1G的数据, 启动8个MapTask, 可以提高集群的并发处理能力. 那么1K的数据, 如果也启动8个MapTask, 会提高集群性能吗? MapTask并行任务是否是越多越好呢? 哪些因素影响了MapTask并行度?

MapTask并行度决定机制

首先需要区分两个概念:
1. 数据块: 数据块(Blocks)是HDFS物理上把数据分成不同的块. 数据块是HDFS的存储数据单位
2. 数据切片: 数据切片只是在逻辑上对输入进行分片, 并不会在磁盘上将其切分进行存储. 数据切片是MapRudece程序计算输入数据的单位, 一个切片会对应启动一个MapTask.
需要注意的是, 默认情况下, 切片大小=数据块大小. 这是由HDFS的"数据本地化优化"的特性决定的, 也即在存储输入数据的节点上运行map任务, 无需集群带宽资源, 便可获得最佳性能. 
如果分片跨越2个数据块,对于任何一个HDFS节点(基本不可能同时存储这2个数据块), 分片中的另外一块数据就需要通过网络传输到map任务节点, 与使用本地数据运行map任务相比, 效率则更低. 同时, 对数据进行切片时不会考虑从数据整体, 而是会逐个针对每一个文件进行单独的切片.

二. Job提交流程源码和切片源码详解

Job提交流程源码

FileInputFormat切片源码解析(input.getSplits(job))

(1) 先找到数据存储的目录
(2) 遍历处理目录下的每一个文件, 每个文件中:
        a) 获取文件大小fs.sizeOf(ss.txt)
        b) 计算切片大小
        > minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        getFormatMinSplitSize()方法返回一个long类型的1
        getMinSplitSiez(JobContext job)方法返回SPLIT_MINSIZE常量. 这个常量参数由mapred-default.xml参数配置文件决定, 默认为0.
        因此, minSize返回一个1L
        > maxSize = getMaxSplitSize(job)
        getMaxSplitSize(JobContext context) 方法返回SPLIT_MAXSIZE常量, 这个常量参数同样由mapred-default.xml参数配置文件决定. 但默认情况下没有配置此常量的参数, 因此在方法返回参数时, 由getLong()方法提供默认值: Long.MAX_VALUE.
        c) 判断这个文件是否可以进行切割. 若可进行切割, 则获取块大小
        long blockSize = file.getBlockSize();
        本地模式下运行时, 块大小默认为32M
        d) 计算切片大小
        long splitSize = this.computeSplitSize(blockSIze, minSize, maxSize);
        在computeSplitSize()方法中, 返回值如下:

return Math.max(minSize, Math.min(maxSize, blockSize));

        在默认情况下, 切片大小 = 块大小
        e) 进行切片, 每次完成切片后, 都需要判断剩下部分的大小是否大于块的1.1倍, 如果小于1.1倍, 则就将剩下的部分划为一块切片

while(((double) bytesRemaining)/splitSize > SPLIT_SLOP)

        f) 将切片信息写入一个切片规划文件中, 暂时存储在.staging文件夹

(3) 提交切片规划文件到YARN上, YARN上的MrAppMaster根据切片规划文件计算需要开启MapTask的个数
**************************************************************************************************************
如何调整切片的大小?
如果需要调大切片, 则需要将minSize调大; 如果需要更小的切片, 则需要将maxSize调小
**************************************************************************************************************
获取切片信息API

// 获取切片信息API
String name = inputSplit.getPath().getName();
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();

二. TextInputFormat

在运行MapReduce程序时, 输入的文件格式多种多样, 包括: 基于行的日志文件, 二进制格式文件, 数据库表等等, 那么针对不同的数据类型, MapReduce也同样提供了很多不同种类的数据格式. FileInputFormat常见的接口实现类包括: TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, 以及自定义InputFormat等.

TextInputFormat

TextInputFormat是FileInputFormat的默认实现类, 按行读取每条记录, 其键值对是<LongWritable, Text>
示例:

CombineTextInputFormat

 框架默认的TextInputFormat切片机制是对每个文件进行单独的切片规划, 不管文件多小, 都会产生一个单独的切片, 且都会交给一个MapTask. 如果一个任务中包含多个小文件, 那么就会产生大量的MapTask, 导致处理效率低下. 因此, 在这种情况下, 需要使用CombineTextInputFormat切片机制进行处理.
1) 虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M
注意: 虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置
2) 切片机制

三. MapReduce工作流程

> 关于环形缓冲区内的元数据(Meta data), 其中包含以下几类数据:
index: 记录数据开始的位置
partition: 记录数据存放于哪个分区
keystart: 记录数据的key值开始的索引
valuestart: 记录数据的value值开始的索引

> 环形缓冲区默认大小是100M, 如果数据占到磁盘空间总大小的80%时, 则会开始反向写入. 这么做的原因是: 可以在向文件溢写的过程中, 同时开启向环形缓冲区写入的线程. 若新数据写入的速度较快, 则会等待旧数据溢写完成后继续进行写入.  如果直到缓冲区写满才进行溢写, 则需要等待溢写完成之后才可以重新对缓冲区进行写入.
缓冲区的大小可以通过参数调整: mapreduce.task.is.sort.mb 默认100M

 四. Shuffle机制(混洗)

Map方法之后, Reduce方法之前的数据处理过程称之为Shuffle.

Shuffle机制

 具体Shuffle过程如下:
1) MapTask收集map()方法输出的kv对, 放到内存缓冲区中.
2) 从内存缓冲区不断溢写到本地磁盘文件, 可能会溢出多个文件
3) 多个溢出文件会被合并成大的溢出文件
4) 在溢出过程即合并的过程中, 都需要调用Partitioner进行分区和针对key的排序(按字典进行快速排序)
5) ReduceTask根据自己的分区号, 从各个MapTask机器上拉取相应的结果分区数据
6) ReduceTask会抓取到来自不同MapTask同一分区的结果文件, ReduceTask会将这些文件再进行合并(Merge Sort)
7) 合并成大文件后, Shuffle的过程也就结束了, 后面就要进入ReduceTask的逻辑运算过程

Partition分区

 默认的Partitioner分区代码如下:

public class HashPartitioner<K, V> extends Partitioner<K, V>{

    public int getPartition(K key, V value, int numReduceTasks){
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

}

默认分区是根据keyhashCodeReudceTasks个数取模得到的. 用户无法指定key存储在哪个分区.

自定义Partitioner

1)自定义Partitioner方法需要重写getPartition()方法

public class CustomPartitioner exxtends Partitioner<TExt, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions)
        // 控制分区代码逻辑
        ... ...
        return partition;
    }
}

2) 在Job驱动中, 设置自定义Partitioner

job.setPartitionerClass(CustomPartitioner.class);

3) 自定义Partition后, 要根据自定义Partition的逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);

分区总结

1) 如果ReduceTask的数量 > getPartition的结果数, 则会多产生几个空的输出文件part-r-000xx;
2) 如果1 < ReduceTask的数量 < getPartition的结果数, 则有一部分分区数据无法存储, 会产生Exception;
3) 如果ReduceTask的数量 = 1, 则不管MapTask端输出多少个分区文件, 最终结果都会交给这一个ReduceTask, 最终也就只会产生一个结果文件part-r-00000. 因为在源代码中, 当用户把ReduceTask的个数设置为1时, 会进入程序本身已经定义好的getPartitioner匿名内部类;

if (this.partitions > 1) {
                this.partitioner = (Partitioner)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
            } else {
                this.partitioner = new Partitioner<K, V>() {
                    public int getPartition(K key, V value, int numPartitions) {
                        return NewOutputCollector.this.partitions - 1;
                    }
                };
            }

 4) 分区号必须从零开始, 逐一累加

举报

相关推荐

0 条评论