In MapReduce practice we often deal with many small files. Each file produces its own mapper, which wastes resources, and when there is no reduce stage the job in turn emits many small output files; the file count explodes and hurts downstream Hive jobs.
We therefore want the mapper stage to combine several files into a single split, and CombineFileInputFormat meets exactly this need.
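Before diving into the source, it helps to know the knobs that drive the combining: a maximum split size plus per-node and per-rack minimums. A minimal sketch, assuming the pre-YARN configuration key names this article uses elsewhere (newer Hadoop releases rename them to mapreduce.input.fileinputformat.split.maxsize / .minsize.per.node / .minsize.per.rack); the byte values are only illustrative:

import org.apache.hadoop.conf.Configuration;

public class CombineSplitSizeConfig {
    // Illustrative values only (not from the original job).
    public static Configuration configure(Configuration conf) {
        conf.set("mapred.max.split.size",          "268435456"); // 256 MB: upper bound for one combined split
        conf.set("mapred.min.split.size.per.node", "134217728"); // 128 MB: node leftovers below this wait for the rack pass
        conf.set("mapred.min.split.size.per.rack", "134217728"); // 128 MB: rack leftovers below this go to the final pass
        return conf;
    }
}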
How CombineFileInputFormat works (summarized from a write-up found online):
Pass 1: build splits from the blocks on the same DataNode (DN), as follows:
1. Iterate over nodeToBlocks to find out which blocks each DN holds.
2. Loop over that DN's block list.
3. Remove each block from blockToNodes so that the same block cannot end up in two different splits.
4. Add the block to a list of valid blocks; this list records which blocks have already been taken out of blockToNodes, so they can be put back later if necessary.
5. Add the block's size to the running counter curSplitSize.
6. Check whether curSplitSize has reached the configured maxSize:
   a) If it has, create a split, add it to the list, and reset curSplitSize and validBlocks.
   b) If it has not, keep looping over the block list (back to step 2).
7. Once this DN's block list is exhausted, decide whether the leftover blocks may still form a split (is their combined size at least the per-node minimum split size?):
   a) If yes, create a split and add it.
   b) If not, return these leftover blocks to blockToNodes.
8. Reset the accumulators.
9. Go back to step 1 for the next DN.
// process all nodes and create splits that are local
// to a node.
// create splits from blocks on the same DN
for (Iterator<Map.Entry<String,
     List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
     iter.hasNext();) {

  Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  nodes.add(one.getKey());
  List<OneBlockInfo> blocksInNode = one.getValue();

  // for each block, copy it into validBlocks. Delete it from
  // blockToNodes so that the same block does not appear in
  // two different splits.
  for (OneBlockInfo oneblock : blocksInNode) {
    if (blockToNodes.containsKey(oneblock)) {
      validBlocks.add(oneblock);
      blockToNodes.remove(oneblock);
      curSplitSize += oneblock.length;

      // if the accumulated split size exceeds the maximum, then
      // create this split.
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split and add it to the splits array
        // (combine the accumulated blocks into one split)
        addCreatedSplit(job, splits, nodes, validBlocks);
        // reset the accumulators
        curSplitSize = 0;
        validBlocks.clear();
      }
    }
  }
  // if there were any blocks left over and their combined size is
  // larger than minSplitNode, then combine them into one split.
  // Otherwise add them back to the unprocessed pool. It is likely
  // that they will be combined with other blocks from the same rack later on.
  // The comment above already explains this well; restated in my own words:
  /**
   * Two cases can occur here:
   * 1. This DN still has blocks that have not been placed into a split, and
   *    their combined size is at least the per-node minimum (without having
   *    reached the maximum): combine them into one split.
   * 2. The leftover blocks are still below that minimum: return them to
   *    blockToNodes so they can be handled together later.
   */
  if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, nodes, validBlocks);
  } else {
    for (OneBlockInfo oneblock : validBlocks) {
      blockToNodes.put(oneblock, oneblock.hosts);
    }
  }
  validBlocks.clear();
  nodes.clear();
  curSplitSize = 0;
}
Pass 2: combine blocks that are not on the same DN but are on the same rack (only the blocks left over from the previous pass):
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these overflow
// blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();

// Process all racks over and over again until there is no more work to do.
// From here on we are no longer dealing with blocks on a single DN --
// those were handled by the code above. What remains are the blocks
// that have not been processed yet.
while (blockToNodes.size() > 0) {

  // Create one split for this rack before moving over to the next rack.
  // Come back to this rack after creating a single split for each of the
  // remaining racks.
  // Process one rack location at a time, Combine all possible blocks that
  // reside on this rack as one split. (constrained by minimum and maximum
  // split size).

  // iterate over all racks
  // create splits from blocks on the same rack
  for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
       rackToBlocks.entrySet().iterator(); iter.hasNext();) {

    Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    racks.add(one.getKey());
    List<OneBlockInfo> blocks = one.getValue();

    // for each block, copy it into validBlocks. Delete it from
    // blockToNodes so that the same block does not appear in
    // two different splits.
    boolean createdSplit = false;
    for (OneBlockInfo oneblock : blocks) {
      // important: at this point blockToNodes contains exactly the blocks
      // that have not been placed into any split yet
      if (blockToNodes.containsKey(oneblock)) {
        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;

        // if the accumulated split size exceeds the maximum, then
        // create this split.
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          createdSplit = true;
          break;
        }
      }
    }

    // if we created a split, then just go to the next rack
    if (createdSplit) {
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
      continue;
    }

    // There are still blocks on this rack that have not been combined.
    // If their total size reaches the per-rack minimum split size, create
    // a split; otherwise keep these blocks for later processing.
    if (!validBlocks.isEmpty()) {
      if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
        // if there is a minimum size specified, then create a single split
        // otherwise, store these blocks into overflow data structure
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
      } else {
        // There were a few blocks in this rack that remained to be processed.
        // Keep them in 'overflow' block list. These will be combined later.
        overflowBlocks.addAll(validBlocks);
      }
    }
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}
Finally, blocks that are neither on the same DN nor on the same rack (whatever is left after the first two passes) are combined. There is nothing special in that part of the source, so it is not quoted here.
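For completeness, the final pass behaves roughly as sketched below. This is a paraphrase of the behavior, not the verbatim Hadoop source: the overflow blocks (and anything still left in blockToNodes) are accumulated up to maxSize, and one last split absorbs whatever remains.

// Sketch of the final pass (paraphrased): combine leftovers regardless of node or rack.
for (OneBlockInfo oneblock : overflowBlocks) {
  validBlocks.add(oneblock);
  curSplitSize += oneblock.length;

  // remember which racks these blocks live on, for the split's locations
  for (int i = 0; i < oneblock.racks.length; i++) {
    racks.add(oneblock.racks[i]);
  }

  // once the accumulated size reaches the maximum, emit a split
  if (maxSize != 0 && curSplitSize >= maxSize) {
    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

// whatever is still left becomes one last (possibly small) split
if (!validBlocks.isEmpty()) {
  addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}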
Source code summary:
Merging happens in three passes: same DN ----> same rack but different DN ----> different racks.
Blocks that can be combined are written into the same split.
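To make "several blocks written into the same split" concrete, here is a small sketch (not part of the original post) that prints the per-file chunks a CombineFileSplit carries, using the standard CombineFileSplit accessors:

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSplitInspector {
    // Each CombineFileSplit bundles several (path, offset, length) chunks,
    // which is how blocks from different small files end up in one mapper.
    public static void dump(CombineFileSplit split) throws IOException, InterruptedException {
        for (int i = 0; i < split.getNumPaths(); i++) {
            System.out.println(split.getPath(i) + " offset=" + split.getOffset(i)
                    + " length=" + split.getLength(i));
        }
        System.out.println("total=" + split.getLength() + " bytes, locations="
                + Arrays.toString(split.getLocations()));
    }
}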
Below is the code from practice:
The original inputs are SequenceFiles of roughly 70 MB each (some even smaller), so a RecordReader has to be implemented by hand (with plain Text it would be simpler); the key type is BytesWritable. The goal is to reduce the number of files so that each output file is close to the block size.
Custom CombineSequenceFileInputFormat:
package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader((CombineFileSplit) split, context, CombineSequenceFileRecordReader.class);
    }
}
Implementing CombineSequenceFileRecordReader:
package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.util.ReflectionUtils;

public class CombineSequenceFileRecordReader<K, V> extends RecordReader<K, V> {
    private CombineFileSplit split;
    private TaskAttemptContext context;
    private int index;
    private RecordReader<K, V> rr;

    @SuppressWarnings("unchecked")
    public CombineSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {
        this.index = index;
        this.split = (CombineFileSplit) split;
        this.context = context;

        this.rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
    }

    @SuppressWarnings("unchecked")
    @Override
    public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {
        this.split = (CombineFileSplit) curSplit;
        this.context = curContext;

        if (null == rr) {
            rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
        }

        // build a FileSplit for the single file chunk this reader is responsible for
        FileSplit fileSplit = new FileSplit(this.split.getPath(index),
                this.split.getOffset(index), this.split.getLength(index),
                this.split.getLocations());

        this.rr.initialize(fileSplit, this.context);
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return rr.getProgress();
    }

    @Override
    public void close() throws IOException {
        if (null != rr) {
            rr.close();
            rr = null;
        }
    }

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
        return rr.getCurrentKey();
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
        return rr.getCurrentValue();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return rr.nextKeyValue();
    }
}
Reference: http://sourceforge.net/p/openimaj/code/HEAD/tree/trunk/hadoop/core-hadoop/src/main/java/org/openimaj/hadoop/sequencefile/combine/CombineSequenceFileRecordReader.java
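One detail worth calling out about the reader above: CombineFileRecordReader builds one delegate reader per chunk through reflection, so the wrapped class must expose a constructor taking exactly (CombineFileSplit, TaskAttemptContext, Integer), where the Integer is the index of the chunk inside the combined split. That is why CombineSequenceFileRecordReader declares that constructor.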
The main function is straightforward; it is included here as well for future reference:
package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MergeFiles extends Configured implements Tool {
    public static class MapClass extends Mapper<BytesWritable, Text, BytesWritable, Text> {

        // identity map: just pass every record through
        public void map(BytesWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    } // END: MapClass

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.max.split.size", "157286400");   // 150 MB upper bound per combined split
        conf.setBoolean("mapred.output.compress", true);
        Job job = new Job(conf);
        job.setJobName("MergeFiles");
        job.setJarByClass(MergeFiles.class);

        job.setMapperClass(MapClass.class);
        job.setInputFormatClass(CombineSequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(0);   // map-only: just rewrite the records into bigger files

        return job.waitForCompletion(true) ? 0 : 1;
    } // END: run

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new MergeFiles(), args);
        System.exit(ret);
    } // END: main
} // END: MergeFiles
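For completeness, the job can be launched in the usual way; the jar name and paths below are placeholders:

hadoop jar combine-input.jar com.hadoop.combineInput.MergeFiles /path/to/small-files /path/to/merged-output

Note that run() builds a fresh Configuration instead of reusing getConf(), so generic -D options passed through ToolRunner will not reach this job; the split size has to be changed in the code.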
Performance test: 2,000 compressed SequenceFiles of about 70 MB each were merged into roughly 700 compressed SequenceFiles averaging about 200 MB each (this is controllable), with a block size of 256 MB. The job took about 2.5 to 3 minutes.
Remaining issues:
- After combining, a mapper's input is no longer fully data-local, which adds extra overhead per mapper; this trade-off has to be weighed.