Day 50 of Learning Big Data (MapReduce): Hadoop Source Code Analysis

工程与房产肖律师 · 2022-04-13

Hadoop source code analysis:

Shortcut to jump back to the previous source location in the IDE: Ctrl+Alt+Left Arrow

Data input:

InputFormat:
  • Method 1: getSplits (concretely implemented by FileInputFormat)

    public List<InputSplit> getSplits(JobContext job) throws IOException {
      //start a stopwatch to time split generation
      StopWatch sw = new StopWatch().start();
        //minimum split size (1 by default)
      long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        //maximum split size: getLong(SPLIT_MAXSIZE, Long.MAX_VALUE)
        //where Long.MAX_VALUE = 0x7fffffffffffffffL
      long maxSize = getMaxSplitSize(job);
    
      // generate splits
        //list that will hold the generated splits
      List<InputSplit> splits = new ArrayList<InputSplit>();
        //get the FileStatus of every input file; file metadata (path, length, blocks) comes from it
      List<FileStatus> files = listStatus(job);
      for (FileStatus file: files) {
          //file path
        Path path = file.getPath();
          //file length in bytes
        long length = file.getLen();
          //if the file is not empty
        if (length != 0) {
          BlockLocation[] blkLocations;
            //get the block locations of the file
          if (file instanceof LocatedFileStatus) {
            blkLocations = ((LocatedFileStatus) file).getBlockLocations();
          } else {
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            blkLocations = fs.getFileBlockLocations(file, 0, length);
          }
            //check whether the file can be split (compressed files may not be splittable)
          if (isSplitable(job, path)) {
              //block size of this file
            long blockSize = file.getBlockSize();
              //splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
              //compute the split size (equals the block size with default min/max)
            long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
              //bytes of the file not yet assigned to a split
            long bytesRemaining = length;
              //SPLIT_SLOP = 1.1: keep splitting while the remainder exceeds 1.1 * splitSize
            while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                //create one split of splitSize at the current offset
              splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                          blkLocations[blkIndex].getHosts(),
                          blkLocations[blkIndex].getCachedHosts()));
                //advance past the split just created; once the remainder is at most 1.1 * splitSize the loop exits and it is packed into one final split
              bytesRemaining -= splitSize;
            }
    
              //the remainder (at most 1.1 * splitSize, i.e. less than about 128 MB * 1.1 by default) becomes the last split
            if (bytesRemaining != 0) {
              int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
              splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                         blkLocations[blkIndex].getHosts(),
                         blkLocations[blkIndex].getCachedHosts()));
            }
          } else { // not splitable
            splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                        blkLocations[0].getCachedHosts()));
          }
        } else { 
          //Create empty hosts array for zero length files
          splits.add(makeSplit(path, 0, length, new String[0]));
        }
      }
      // Save the number of input files for metrics/loadgen
      job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
      sw.stop();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()
            + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
      }
      return splits;
    }
    

    Summary:

    1. By default, one block corresponds to one split.

    2. A split is a logical partition; the input file is not physically divided into smaller files.

    3. The split size can be changed; it is computed as long splitSize = computeSplitSize(blockSize, minSize, maxSize), i.e. Math.max(minSize, Math.min(maxSize, blockSize)); see the configuration sketch below.
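
    Based on point 3, a minimal driver-side sketch of how the split size could be tuned (the job name, input path and the 64 MB value are illustrative assumptions, not from the original post):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SplitSizeDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "split-size-demo");   // illustrative job name
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // splitSize = Math.max(minSize, Math.min(maxSize, blockSize)):
        // lowering maxSize below the block size shrinks splits (more map tasks),
        // raising minSize above the block size enlarges splits (fewer map tasks)
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);  // 64 MB, illustrative
        FileInputFormat.setMinInputSplitSize(job, 1L);
      }
    }

    With the defaults (minSize = 1, maxSize = Long.MAX_VALUE, 128 MB blocks), a 260 MB file produces two splits of 128 MB and 132 MB: after the first split, 132 / 128 ≈ 1.03 ≤ SPLIT_SLOP (1.1), so the remainder is packed into one final split.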

  • Method 2: getSplits (concretely implemented by CombineFileInputFormat)

    (Figure: CombineFileInputFormat split generation)

    public List<InputSplit> getSplits(JobContext job) 
        throws IOException {
        long minSizeNode = 0;
        long minSizeRack = 0;
        long maxSize = 0;
        Configuration conf = job.getConfiguration();
    
        // the values specified by setxxxSplitSize() takes precedence over the
        // values that might have been specified in the config
        if (minSplitSizeNode != 0) {
          minSizeNode = minSplitSizeNode;
        } else {
          minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
        }
        if (minSplitSizeRack != 0) {
          minSizeRack = minSplitSizeRack;
        } else {
          minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
        }
        if (maxSplitSize != 0) {
          maxSize = maxSplitSize;
        } else {
          maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
          // If maxSize is not configured, a single split will be generated per
          // node.
        }
        if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
          throw new IOException("Minimum split size pernode " + minSizeNode +
                                " cannot be larger than maximum split size " +
                                maxSize);
        }
        if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
          throw new IOException("Minimum split size per rack " + minSizeRack +
                                " cannot be larger than maximum split size " +
                                maxSize);
        }
        if (minSizeRack != 0 && minSizeNode > minSizeRack) {
          throw new IOException("Minimum split size per node " + minSizeNode +
                                " cannot be larger than minimum split " +
                                "size per rack " + minSizeRack);
        }
    
        // all the files in input set
        List<FileStatus> stats = listStatus(job);
        List<InputSplit> splits = new ArrayList<InputSplit>();
        if (stats.size() == 0) {
          return splits;    
        }
    
        // In one single iteration, process all the paths in a single pool.
        // Processing one pool at a time ensures that a split contains paths
        // from a single pool only.
        for (MultiPathFilter onepool : pools) {
          ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
          
          // pick one input path. If it matches all the filters in a pool,
          // add it to the output set
          for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
            FileStatus p = iter.next();
            if (onepool.accept(p.getPath())) {
              myPaths.add(p); // add it to my output set
              iter.remove();
            }
          }
          // create splits for all files in this pool.
          getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
        }
    
        // create splits for all files that are not in any pool.
        getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
    
        // free up rackToNodes map
        rackToNodes.clear();
        return splits;    
      }
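
    CombineFileInputFormat packs multiple small files (grouped per pool, node and rack) into a single split, so a job with many small files does not launch one map task per file. A minimal usage sketch with its concrete subclass CombineTextInputFormat (the 4 MB / 128 MB values and the job name are illustrative assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class CombineSplitDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // these keys correspond to minSizeNode / minSizeRack read in getSplits() above
        conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 4L * 1024 * 1024);
        conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 4L * 1024 * 1024);

        Job job = Job.getInstance(conf, "combine-split-demo");   // illustrative job name
        job.setInputFormatClass(CombineTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        // maxSize caps a combined split; if it stays 0, the code above
        // generates one split per node
        CombineTextInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);  // 128 MB
      }
    }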
    
  • createRecordReader() (returns a LineRecordReader; its initialize() method is shown below)

public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
  	//starting byte offset of this split within the file
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();

  // open the file and seek to the start of the split
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);
  
  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null!=codec) {
    isCompressedInput = true;  
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
        ((SplittableCompressionCodec)codec).createInputStream(
          fileIn, decompressor, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job,
          this.recordDelimiterBytes);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn;
    } else {
        //non-splittable codec: the compressed stream is read as a whole (the split spans the entire file)
      in = new SplitLineReader(codec.createInputStream(fileIn,
          decompressor), job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
  } else {
    fileIn.seek(start);
    in = new UncompressedSplitLineReader(
        fileIn, job, this.recordDelimiterBytes, split.getLength());
    filePosition = fileIn;
  }
  // If this is not the first split, we always throw away first record
  // because we always (except the last split) read one extra line in
  // next() method.
  if (start != 0) {
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}
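
The codec branch above is also what decides splittability in method 1: a file is splittable when it is uncompressed or compressed with a splittable codec such as bzip2. A standalone sketch of that check (the helper name isSplittableFile is hypothetical, modeled on TextInputFormat.isSplitable):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittableCheck {
  // hypothetical helper mirroring TextInputFormat.isSplitable(JobContext, Path)
  public static boolean isSplittableFile(Configuration conf, Path file) {
    CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
    if (codec == null) {
      return true;                 // uncompressed files are always splittable
    }
    // only codecs such as bzip2 implement SplittableCompressionCodec
    return codec instanceof SplittableCompressionCodec;
  }
}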