Hadoop Source Code Analysis:
IDE shortcut to navigate back to the previous source location: Ctrl+Alt+← (Left Arrow)
Data input:
InputFormat:
-
Method 1: getSplits (implemented by FileInputFormat)
public List<InputSplit> getSplits(JobContext job) throws IOException {
  // start a stopwatch to time split generation
  StopWatch sw = new StopWatch().start();
  // minimum split size: max(getFormatMinSplitSize(), getMinSplitSize(job))
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  // getMaxSplitSize = getLong(SPLIT_MAXSIZE, Long.MAX_VALUE), MAX_VALUE = 0x7fffffffffffffffL
  long maxSize = getMaxSplitSize(job);

  // generate splits
  // list that will hold the generated split objects
  List<InputSplit> splits = new ArrayList<InputSplit>();
  // list the FileStatus of every input file; file metadata is read through FileStatus
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    // file path
    Path path = file.getPath();
    // file length in bytes
    long length = file.getLen();
    // non-empty file
    if (length != 0) {
      BlockLocation[] blkLocations;
      // get the block locations of the file
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      // can this file (e.g. a compressed one) be split?
      if (isSplitable(job, path)) {
        // block size of the file
        long blockSize = file.getBlockSize();
        // computeSplitSize returns Math.max(minSize, Math.min(maxSize, blockSize))
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        // bytes that still need to be assigned to a split
        long bytesRemaining = length;
        // SPLIT_SLOP = 1.1: keep cutting while more than 1.1 * splitSize remains
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          // create one split of splitSize bytes
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          // advance the offset; once the remainder is at most 1.1 * splitSize
          // it is packed into a single final split
          bytesRemaining -= splitSize;
        }

        // remaining tail, smaller than 1.1 * splitSize (e.g. < 128 MB * 1.1 with the default block size)
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
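To make the two decisions above concrete, here is a small standalone sketch (not Hadoop source; the 300 MB file and the default 128 MB block size are assumed example values): splitSize = Math.max(minSize, Math.min(maxSize, blockSize)), plus the SPLIT_SLOP = 1.1 rule that packs a short tail into the last split instead of creating a tiny extra one.

// Standalone sketch of the split-size math used above; sizes are made-up examples.
public class SplitSizeDemo {
  static final double SPLIT_SLOP = 1.1;   // same constant as FileInputFormat

  // same formula as FileInputFormat.computeSplitSize()
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024L;
    long blockSize = 128 * mb, minSize = 1, maxSize = Long.MAX_VALUE;
    long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 128 MB by default

    long length = 300 * mb;              // hypothetical input file
    long bytesRemaining = length;
    int nSplits = 0;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      nSplits++;                         // a split of exactly splitSize bytes
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      nSplits++;                         // the tail (here 44 MB, which is < 1.1 * 128 MB)
    }
    // 300 MB -> 3 splits: 128 MB, 128 MB, 44 MB
    System.out.println(nSplits + " splits, last one = " + bytesRemaining / mb + " MB");
  }
}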
Summary:
1. By default one block maps to one split (splitSize defaults to the block size).
2. A split is only a logical partition; the file is never physically divided into smaller files.
3. The split size can be changed: it comes from long splitSize = computeSplitSize(blockSize, minSize, maxSize);, so adjusting minSize/maxSize changes how files are cut (see the driver sketch below).
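As noted in point 3, minSize and maxSize come from the job configuration, so the split size can be tuned from the driver. A hedged sketch using FileInputFormat's static helpers (the 64 MB and 256 MB values are arbitrary examples, not from the source):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeConfigDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();

    // To get splits SMALLER than one block, lower maxSize:
    // splitSize = max(minSize, min(64 MB, 128 MB)) = 64 MB
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

    // To get splits LARGER than one block, raise minSize instead
    // (kept as a comment here so the two settings are not combined):
    // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
  }
}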
-
Method 2: getSplits (implemented by CombineFileInputFormat)
[Figure: CombineFileInputFormat split-generation flow (CombineFileinputFromat.png, image unavailable)]
public List<InputSplit> getSplits(JobContext job) throws IOException {
  long minSizeNode = 0;
  long minSizeRack = 0;
  long maxSize = 0;
  Configuration conf = job.getConfiguration();

  // the values specified by setxxxSplitSize() takes precedence over the
  // values that might have been specified in the config
  if (minSplitSizeNode != 0) {
    minSizeNode = minSplitSizeNode;
  } else {
    minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
  }
  if (minSplitSizeRack != 0) {
    minSizeRack = minSplitSizeRack;
  } else {
    minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
  }
  if (maxSplitSize != 0) {
    maxSize = maxSplitSize;
  } else {
    maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
    // If maxSize is not configured, a single split will be generated per
    // node.
  }
  if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
    throw new IOException("Minimum split size pernode " + minSizeNode +
                          " cannot be larger than maximum split size " + maxSize);
  }
  if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
    throw new IOException("Minimum split size per rack " + minSizeRack +
                          " cannot be larger than maximum split size " + maxSize);
  }
  if (minSizeRack != 0 && minSizeNode > minSizeRack) {
    throw new IOException("Minimum split size per node " + minSizeNode +
                          " cannot be larger than minimum split " +
                          "size per rack " + minSizeRack);
  }

  // all the files in input set
  List<FileStatus> stats = listStatus(job);
  List<InputSplit> splits = new ArrayList<InputSplit>();
  if (stats.size() == 0) {
    return splits;
  }

  // In one single iteration, process all the paths in a single pool.
  // Processing one pool at a time ensures that a split contains paths
  // from a single pool only.
  for (MultiPathFilter onepool : pools) {
    ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();

    // pick one input path. If it matches all the filters in a pool,
    // add it to the output set
    for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
      FileStatus p = iter.next();
      if (onepool.accept(p.getPath())) {
        myPaths.add(p); // add it to my output set
        iter.remove();
      }
    }
    // create splits for all files in this pool.
    getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
  }

  // create splits for all files that are not in any pool.
  getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

  // free up rackToNodes map
  rackToNodes.clear();
  return splits;
}
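For comparison with the per-node/per-rack thresholds read above, a hedged driver-side sketch follows; CombineTextInputFormat is the concrete subclass normally used for text input, and the 4 MB / 2 MB values plus the explicit config keys are example assumptions, not taken from the source.

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class CombineSplitConfigDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance();

    // use the combining input format so many small files can share one split
    job.setInputFormatClass(CombineTextInputFormat.class);

    // the maxSize read above from "mapreduce.input.fileinputformat.split.maxsize";
    // 4 MB is just an example threshold
    CombineTextInputFormat.setMaxInputSplitSize(job, 4L * 1024 * 1024);

    // optional per-node / per-rack minimums (SPLIT_MINSIZE_PERNODE / SPLIT_MINSIZE_PERRACK)
    job.getConfiguration().setLong(
        "mapreduce.input.fileinputformat.split.minsize.per.node", 2L * 1024 * 1024);
    job.getConfiguration().setLong(
        "mapreduce.input.fileinputformat.split.minsize.per.rack", 2L * 1024 * 1024);
  }
}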
-
createRecordReader(): initialize() of the returned LineRecordReader
public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
  // offset at which this split starts
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();

  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);

  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null!=codec) {
    isCompressedInput = true;
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
        ((SplittableCompressionCodec)codec).createInputStream(
          fileIn, decompressor, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job,
          this.recordDelimiterBytes);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn;
    } else {
      // non-splittable codec: the whole compressed file is read as one split
      in = new SplitLineReader(codec.createInputStream(fileIn,
          decompressor), job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
  } else {
    fileIn.seek(start);
    in = new UncompressedSplitLineReader(
        fileIn, job, this.recordDelimiterBytes, split.getLength());
    filePosition = fileIn;
  }
  // If this is not the first split, we always throw away first record
  // because we always (except the last split) read one extra line in
  // next() method.
  if (start != 0) {
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}
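The final if (start != 0) block is what keeps records intact across split boundaries: every reader except the one for the last split reads one extra line past its split end, and every reader except the first discards its first (possibly partial) line, so each line is processed exactly once. A toy sketch of that rule using plain strings (not the Hadoop API; the data and split point are made up):

// Toy model of LineRecordReader's boundary rule; not Hadoop code.
public class SplitBoundaryDemo {

  public static void main(String[] args) {
    String data = "line-1\nline-2\nline-3\n";
    int splitPoint = 10;  // assumed split boundary, falls in the middle of "line-2"

    // reader of the first split [0, 10): reads one extra line past its end
    System.out.println("split1 -> " + readSplit(data, 0, splitPoint));
    // reader of the second split [10, end): skips its partial first line
    System.out.println("split2 -> " + readSplit(data, splitPoint, data.length()));
  }

  static String readSplit(String data, int start, int end) {
    int pos = start;
    // same idea as: if (start != 0) { start += in.readLine(...); }
    if (start != 0) {
      pos = data.indexOf('\n', pos) + 1;  // throw away the partial first line
    }
    StringBuilder out = new StringBuilder();
    // keep reading while a line STARTS before the split end; the last line
    // read may therefore run past 'end' (the "one extra line")
    while (pos < data.length() && pos <= end) {
      int nl = data.indexOf('\n', pos);
      if (nl < 0) nl = data.length();
      out.append(data, pos, nl).append(' ');
      pos = nl + 1;
    }
    return out.toString().trim();  // split1 -> "line-1 line-2", split2 -> "line-3"
  }
}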