CombineFileInputFormat in Hadoop Explained


 


 

In MapReduce practice we often deal with many small files. Each file gets its own mapper, which wastes resources, and if there is no reduce stage downstream the job emits just as many small files, so the file count explodes and hurts subsequent Hive jobs.

We therefore want the mapper side to combine several files into a single split, and CombineFileInputFormat does exactly that.
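For reference, a combined split is a CombineFileSplit that records, for every packed file, its path, start offset and length. The following minimal sketch is not from the original article; it only uses the standard CombineFileSplit accessors and shows how the files inside one combined split can be inspected:

import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

// Sketch: walk through the files packed into a single CombineFileSplit.
public class SplitInspector {
    public static void dump(CombineFileSplit split) {
        for (int i = 0; i < split.getNumPaths(); i++) {
            System.out.println("file "   + split.getPath(i)     // path of the i-th packed file
                             + " offset " + split.getOffset(i)   // start offset inside that file
                             + " length " + split.getLength(i)); // bytes taken from that file
        }
    }
}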

How CombineFileInputFormat works (summarized from a write-up found online):

 

Pass 1: build splits from blocks that live on the same DataNode (DN), as follows:

1. Iterate over nodeToBlocks to see which blocks live on each DN.

2. Loop over that DN's block list.

3. Remove each block from blockToNodes so that the same block cannot end up in two different splits.

4. Add the block to validBlocks, a list that records which blocks have been taken out of blockToNodes so they can be put back later if needed.

5. Add the block's size to the running total curSplitSize.

6. Check whether curSplitSize has reached the configured maxSize (how maxSize, minSizeNode and minSizeRack are set is sketched after this list):

a) if it has, create the split, add it to the split list, and reset curSplitSize and validBlocks;

b) if not, keep looping over the block list (back to step 2).

7. Once the current DN's block list is exhausted, check whether the leftover blocks may still form a split (i.e. whether their combined size reaches the per-node minimum split size):

a) if so, create and add the split;

b) if not, return these leftover blocks to blockToNodes.

8. Reset curSplitSize, validBlocks and nodes.

9. Go back to step 1 for the next DN.
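The three thresholds referenced above come from the job configuration. Here is a minimal sketch (not part of the original article) of setting them; the property names below are the ones used by the old mapred.* API that this article's MergeFiles job also uses, so treat them as an assumption and check your Hadoop version (newer releases expose the same knobs under mapreduce.input.fileinputformat.split.*):

import org.apache.hadoop.conf.Configuration;

// Sketch: configure the split-size thresholds read by CombineFileInputFormat
// (property names assumed from the pre-2.x API).
Configuration conf = new Configuration();
conf.setLong("mapred.max.split.size",          256L * 1024 * 1024); // maxSize: upper bound per split
conf.setLong("mapred.min.split.size.per.node", 128L * 1024 * 1024); // minSizeNode: per-DN lower bound
conf.setLong("mapred.min.split.size.per.rack", 128L * 1024 * 1024); // minSizeRack: per-rack lower bound

The corresponding part of the CombineFileInputFormat source for this first pass is shown below.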

 

// process all nodes and create splits that are local to a node.
// (build the splits made of blocks on the same DN)
for (Iterator<Map.Entry<String,
         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
     iter.hasNext();) {

  Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  nodes.add(one.getKey());
  List<OneBlockInfo> blocksInNode = one.getValue();

  // for each block, copy it into validBlocks. Delete it from
  // blockToNodes so that the same block does not appear in
  // two different splits.
  for (OneBlockInfo oneblock : blocksInNode) {
    if (blockToNodes.containsKey(oneblock)) {
      validBlocks.add(oneblock);
      blockToNodes.remove(oneblock);
      curSplitSize += oneblock.length;

      // if the accumulated split size exceeds the maximum, then
      // create this split.
      if (maxSize != 0 && curSplitSize >= maxSize) {
        // create an input split from these combined blocks and add it to the splits list
        addCreatedSplit(job, splits, nodes, validBlocks);
        // reset
        curSplitSize = 0;
        validBlocks.clear();
      }
    }
  }
  // if there were any blocks left over and their combined size is
  // larger than minSplitNode, then combine them into one split.
  // Otherwise add them back to the unprocessed pool. It is likely
  // that they will be combined with other blocks from the same rack later on.
  //
  // In other words, two cases are possible here:
  // 1. this DN still has blocks that were not placed into a split, and their
  //    total size reaches the per-node minimum (but not the maximum):
  //    combine them into one split;
  // 2. the leftover blocks are still too small: return them to blockToNodes
  //    to be handled later together with blocks from other nodes.
  if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
    // create an input split and add it to the splits array
    addCreatedSplit(job, splits, nodes, validBlocks);
  } else {
    for (OneBlockInfo oneblock : validBlocks) {
      blockToNodes.put(oneblock, oneblock.hosts);
    }
  }
  validBlocks.clear();
  nodes.clear();
  curSplitSize = 0;
}



Pass 2: combine blocks that are not on the same DN but are on the same rack (only the blocks still left over from pass 1):

 

 



 


// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these overflow
// blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
ArrayList<String> racks = new ArrayList<String>();

// Process all racks over and over again until there is no more work to do.
// Note that node-local blocks are no longer handled here: those were dealt
// with above; what remains in blockToNodes are the blocks that have not yet
// been placed into any split.
while (blockToNodes.size() > 0) {

  // Create one split for this rack before moving over to the next rack.
  // Come back to this rack after creating a single split for each of the
  // remaining racks.
  // Process one rack location at a time. Combine all possible blocks that
  // reside on this rack as one split (constrained by minimum and maximum
  // split size).

  // iterate over all racks (build the rack-local splits)
  for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
         rackToBlocks.entrySet().iterator(); iter.hasNext();) {

    Map.Entry<String, List<OneBlockInfo>> one = iter.next();
    racks.add(one.getKey());
    List<OneBlockInfo> blocks = one.getValue();

    // for each block, copy it into validBlocks. Delete it from
    // blockToNodes so that the same block does not appear in
    // two different splits.
    boolean createdSplit = false;
    for (OneBlockInfo oneblock : blocks) {
      // important: at this point blockToNodes tells us which blocks
      // have not yet been assigned to any split
      if (blockToNodes.containsKey(oneblock)) {
        validBlocks.add(oneblock);
        blockToNodes.remove(oneblock);
        curSplitSize += oneblock.length;

        // if the accumulated split size exceeds the maximum, then
        // create this split.
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(job, splits, getHosts(racks), validBlocks);
          createdSplit = true;
          break;
        }
      }
    }

    // if we created a split, then just go to the next rack
    if (createdSplit) {
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
      continue;
    }

    // there are still blocks on this rack that were not put into a split:
    // if their combined size reaches the per-rack minimum, create a split,
    // otherwise keep them for later processing
    if (!validBlocks.isEmpty()) {
      if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
        // if there is a minimum size specified, then create a single split
        // otherwise, store these blocks into overflow data structure
        addCreatedSplit(job, splits, getHosts(racks), validBlocks);
      } else {
        // There were a few blocks in this rack that remained to be processed.
        // Keep them in 'overflow' block list. These will be combined later.
        overflowBlocks.addAll(validBlocks);
      }
    }
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}


Finally, blocks that are neither on the same DN nor on the same rack are combined (whatever is left after the first two passes). That part of the source adds nothing new, so it is not quoted in full here; a short sketch of what it does follows.
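For completeness, here is a sketch of that final pass, paraphrased from the same getSplits() logic (variable names as in the snippets above; treat it as a paraphrase rather than a verbatim quote):

// Sketch of the final pass: pack the 'overflow' blocks (neither node-local nor
// rack-local enough) into splits, again bounded by maxSize, and flush whatever
// remains as one last split.
for (OneBlockInfo oneblock : overflowBlocks) {
  validBlocks.add(oneblock);
  curSplitSize += oneblock.length;

  // remember every rack this block lives on so the split still carries locality hints
  for (int i = 0; i < oneblock.racks.length; i++) {
    racks.add(oneblock.racks[i]);
  }

  // once the accumulated size reaches the maximum, emit a split and reset
  if (maxSize != 0 && curSplitSize >= maxSize) {
    addCreatedSplit(job, splits, getHosts(racks), validBlocks);
    curSplitSize = 0;
    validBlocks.clear();
    racks.clear();
  }
}

// anything still left over becomes the final split
if (!validBlocks.isEmpty()) {
  addCreatedSplit(job, splits, getHosts(racks), validBlocks);
}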

 

 

 

Summary of the source:

 

The merging happens in three passes: same DN → same rack (but different DNs) → different racks.

Blocks that can be combined end up in the same split.

Below is the code from my own practice:

The original data consists of small sequence files of about 70 MB each (some even smaller). A custom RecordReader has to be implemented (with plain Text input it would be simpler), and the key type is BytesWritable. The goal is to reduce the number of files so that each output file is close to the block size.

The custom CombineSequenceFileInputFormat:

 


 


package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombineSequenceFileInputFormat<K, V> extends CombineFileInputFormat<K, V> {
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
        return new CombineFileRecordReader((CombineFileSplit) split, context, CombineSequenceFileRecordReader.class);
    }
}
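Note that CombineFileRecordReader here is only a dispatcher: for each file packed into the CombineFileSplit it instantiates one reader of the class passed as the third argument (our CombineSequenceFileRecordReader) and hands it the index of that file within the split. This is why the reader implemented next must provide a (CombineFileSplit, TaskAttemptContext, Integer) constructor.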


Implementing CombineSequenceFileRecordReader:

 

 


 


package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.util.ReflectionUtils;

public class CombineSequenceFileRecordReader<K, V> extends RecordReader<K, V> {
    private CombineFileSplit split;
    private TaskAttemptContext context;
    private int index;
    private RecordReader<K, V> rr;

    @SuppressWarnings("unchecked")
    public CombineSequenceFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) throws IOException, InterruptedException {
        this.index = index;
        this.split = (CombineFileSplit) split;
        this.context = context;

        this.rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
    }

    @SuppressWarnings("unchecked")
    @Override
    public void initialize(InputSplit curSplit, TaskAttemptContext curContext) throws IOException, InterruptedException {
        this.split = (CombineFileSplit) curSplit;
        this.context = curContext;

        if (null == rr) {
            rr = ReflectionUtils.newInstance(SequenceFileRecordReader.class, context.getConfiguration());
        }

        // build a FileSplit for the single file at position 'index' inside the combined split
        FileSplit fileSplit = new FileSplit(this.split.getPath(index),
            this.split.getOffset(index), this.split.getLength(index),
            this.split.getLocations());

        this.rr.initialize(fileSplit, this.context);
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return rr.getProgress();
    }

    @Override
    public void close() throws IOException {
        if (null != rr) {
            rr.close();
            rr = null;
        }
    }

    @Override
    public K getCurrentKey() throws IOException, InterruptedException {
        return rr.getCurrentKey();
    }

    @Override
    public V getCurrentValue() throws IOException, InterruptedException {
        return rr.getCurrentValue();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return rr.nextKeyValue();
    }
}


Reference: http://sourceforge.net/p/openimaj/code/HEAD/tree/trunk/hadoop/core-hadoop/src/main/java/org/openimaj/hadoop/sequencefile/combine/CombineSequenceFileRecordReader.java

 

The main function is fairly simple; it is included here as well for future reference:

 



 

package com.hadoop.combineInput;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MergeFiles extends Configured implements Tool {
    // identity mapper: just pass every (key, value) pair through
    public static class MapClass extends Mapper<BytesWritable, Text, BytesWritable, Text> {

        public void map(BytesWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    } // END: MapClass

    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("mapred.max.split.size", "157286400");  // 150 MB per combined split
        conf.setBoolean("mapred.output.compress", true);

        Job job = new Job(conf);
        job.setJobName("MergeFiles");
        job.setJarByClass(MergeFiles.class);

        // note: the exact set of setter calls below is inferred from the imports;
        // the original listing was incomplete at this point
        job.setMapperClass(MapClass.class);
        job.setInputFormatClass(CombineSequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(BytesWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setNumReduceTasks(0); // map-only job: no reduce stage

        return job.waitForCompletion(true) ? 0 : 1;
    } // END: run

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new MergeFiles(), args);
        System.exit(ret);
    } // END: main
}
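A note on running it: the job expects two arguments, args[0] being the input path(s), which FileInputFormat.addInputPaths accepts as a comma-separated list, and args[1] the output directory. Assuming the classes are packed into a jar named, say, combineInput.jar (a hypothetical name), the invocation would look like: hadoop jar combineInput.jar com.hadoop.combineInput.MergeFiles <input-paths> <output-dir>.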


 




Performance test: 2,000 compressed sequence files of about 70 MB each (roughly 140 GB in total) were merged into about 700 compressed sequence files averaging 200 MB each (the target size is controllable); with a 256 MB block size the job took about 2.5 to 3 minutes.

 

Remaining issues:

1. After combining, a mapper can no longer be fully data-local, which adds extra overhead per mapper; this trade-off needs to be weighed.