0
点赞
收藏
分享

微信扫一扫

Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略


目录

类的关系图


RatioBasedCompactionPolicy

selectCompaction 方法

getCurrentEligibleFiles方法

skipLargeFiles方法

createCompactionRequest方法

filterBulk方法

applyCompactionPolicy方法

removeExcessFiles方法

setIsMajor方法

 

其他相关文章

Hbase Compaction 源码分析 - CompactionChecker

Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略

Hbase Compaction 源码分析 - CompactSplitThread 线程池选择

之前介绍 CompactionChecker 执行时机,这回接着介绍具体的策略

类的关系图

RatioBasedCompactionPolicy

该类在

org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy

 

 

selectCompaction 方法

调用过程如下

Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略_hbase

我们看一下RatioBasedCompactionPolicy 的 selectCompaction 实现方法,实际是在父类  SortedCompactionPolicy 中

//candidateFiles 候选文件,并且按照seqId从最早到最新的排序
  //filesCompacting 正在Compcation的文件
  //mayUseOffPeak 是否为高峰期
  //forceMajor 是否为MajorCompaction,该值在 CompactionChecker 中会设置为true
  //返回 符合Compaction的候选列表
  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
    // Preliminary compaction subject to filters
    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
    // Stuck and not compacting enough (estimate). It is not guaranteed that we will be
    // able to compact more if stuck and compacting, because ratio policy excludes some
    // non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
    int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
    //如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),
//   blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。
    boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
        >= storeConfigInfo.getBlockingFileCount();

    //删除正在合并的文件
    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
    LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
        filesCompacting.size() + " compacting, " + candidateSelection.size() +
        " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");

    // If we can't have all files, we cannot do major anyway
    //判断是否包含全部文件,如果没有正在合并的文件则为true
    boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
    //如果是全部文件,并且是MajorCompaction,则不进行文件过滤,否则进行文件过滤,过滤掉大于hbase.hstore.compaction.max.size值的文件
    if (!(forceMajor && isAllFiles)) {
      //排除大于hbase.hstore.compaction.max.size值的数据,默认Long.MAX_VALUE
      //hbase.hstore.compaction.max.size 表示文件大小大于该值的store file 一定会被minor compaction排除
      candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
      isAllFiles = candidateFiles.size() == candidateSelection.size();
    }

    // Try a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested as a major compaction
    //isTryingMajor判断条件有两种
    // 1、Major合并为True,且包含所有问文件,且是一个用户合并
    // 2、Major合并为True,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目
    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
        || (((forceMajor && isAllFiles) || shouldPerformMajorCompaction(candidateSelection))
          && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
    // Or, if there are any references among the candidates.
    //判断是否包含分裂后的文件
    boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);

    //如果不是isTryingMajor且不包含分裂后的文件,则 createCompactionRequest 方法中进行进一步文件过滤
    CompactionRequest result = createCompactionRequest(candidateSelection,
      isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);

    ArrayList<StoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
    //过滤掉多余最大合并的文件数量
    removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
    result.updateFiles(filesToCompact);

    isAllFiles = (candidateFiles.size() == filesToCompact.size());
    result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak);
    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);

    return result;
  }

该方法主要流程:

 

1.传入参数与返回类型

candidateFiles 压缩候选文件,并且按照seqId从最早到最新的排序

filesCompacting 正在压缩的文件

mayUseOffPeak 是否为高峰期

forceMajor 是否为MajorCompaction,该值在 CompactionChecker 中会设置为true

isUserCompaction 是否为用户压缩

返回 CompactionRequest ,符合Compaction的候选列表

2.判断是否阻塞,等待合并的文件数量大于blockingStoreFiles,认为是阻塞

如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),

hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。

3.从候选列表中candidateSelection删除正在Compaction的文件

4.判断是否包含全部文件,如果没有正在合并的文件isAllFiles则为true

5.如果是全部文件,并且是MajorCompaction,则不进行文件过滤,否则进行文件过滤;

文件过滤方法:skipLargeFiles,过滤掉大于hbase.hstore.compaction.max.size值的文件,该方法后面介绍

6.判断isTryingMajor(判断后续是否为Major使用),判断条件有两种,满足一个即为true:

a.Major(forceMajor)合并为true,且包含所有文件,且是一个用户合并

b.Major(forceMajor)合并为true,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目

7.判断candidateSelection是否包含分裂后的文件

8.如果不是isTryingMajor且不包含分裂后的文件,则执行 createCompactionRequest 方法中进行进一步文件过滤,createCompactionRequest方法后面介绍

9.执行removeExcessFiles方法,如果大于最大合并的文件数量,则过滤掉多余的数量,否则不处理;执行removeExcessFiles方法下一步介绍

10.在返回的result中设置本次Compcation的类型(Major或者Minor),调用方法 setIsMajor,下面介绍

getCurrentEligibleFiles方法

protected ArrayList<StoreFile> getCurrentEligibleFiles(ArrayList<StoreFile> candidateFiles,
      final List<StoreFile> filesCompacting) {
    // candidates = all storefiles not already in compaction queue
    if (!filesCompacting.isEmpty()) {
      // exclude all files older than the newest file we're currently
      // compacting. this allows us to preserve contiguity (HBASE-2856)
      StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
      int idx = candidateFiles.indexOf(last);
      Preconditions.checkArgument(idx != -1);
      candidateFiles.subList(0, idx + 1).clear();
    }
    return candidateFiles;
  }

该方法主要流程:

1.从候选文件列表中删除正在Compaction的文件

skipLargeFiles方法

/**
   * @param candidates pre-filtrate
   * @return filtered subset exclude all files above maxCompactSize
   *   Also save all references. We MUST compact them
   */
  protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
    boolean mayUseOffpeak) {
    int pos = 0;
    //候选文件大于0 且文件不是分裂后的文件 且文件大小大于配置最大文件大小maxCompactSize时,该文件会被剔除
    while (pos < candidates.size() && !candidates.get(pos).isReference()
      && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
      ++pos;
    }
    if (pos > 0) {
      LOG.debug("Some files are too large. Excluding " + pos
          + " files from compaction candidates");
      candidates.subList(0, pos).clear();
    }
    return candidates;
  }

该方法主要流程:

1.判断候选文件大于0 且文件不是分裂后的文件(如果是split后的文件,是需要进行Compaction,不会剔除) 且文件大小大于配置最大文件大小maxCompactSize时,执行++pos

2.pos大于0,清除大的数据

createCompactionRequest方法

protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
    boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (!tryingMajor) {
        //进入这里则为minorCompaction
      //过滤掉BulkLoad到HBase的文件
      candidateSelection = filterBulk(candidateSelection);
      //过滤掉不应该Minor的文件
      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
      candidateSelection = checkMinFilesCriteria(candidateSelection,
        comConf.getMinFilesToCompact());
    }
    return new CompactionRequest(candidateSelection);
  }

该方法主要流程:

如果不是isTryingMajor且不包含分裂后的文件,则为MinorCompaction 进行进一步文件过滤,否则直接返回

filterBulk方法

/**
   * @param candidates pre-filtrate
   * @return filtered subset exclude all bulk load files if configured
   */
  protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
    candidates.removeAll(Collections2.filter(candidates, new Predicate<StoreFile>() {
      @Override
      public boolean apply(StoreFile input) {
   //判断该文件是否需要执行MinorCompaction
        return input.excludeFromMinorCompaction();
      }
    }));
    return candidates;
  }

该方法主要作用:

判断StoreFIle是否设置excludeFromMinorCompaction,也就是过滤掉BulkLoad到HBase的文件

applyCompactionPolicy方法

protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
    boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
    if (candidates.isEmpty()) {
      return candidates;
    }
//前提:,选择待合并的文件按时间排序,最旧的文件排最前。
    // we're doing a minor compaction, let's see what files are applicable
    int start = 0;
    //hbase.hstore.compaction.ratio 1.2
    double ratio = comConf.getCompactionRatio();
    //判断是否为高峰期,高峰期 ratio 值为5,非高峰期为1.2
    if (mayUseOffPeak) {
      //获取hbase.hstore.compaction.ratio.offpeak值,默认是5
      ratio = comConf.getCompactionRatioOffPeak();
      LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
    }

    // get store file sizes for incremental compacting selection.
    //https://blog.csdn.net/bryce123phy/article/details/56003628
    //获取待Compaction文件数量
    final int countOfFiles = candidates.size();
    long[] fileSizes = new long[countOfFiles];//每个file大小
    long[] sumSize = new long[countOfFiles];//前几个file大小总和
    for (int i = countOfFiles - 1; i >= 0; --i) {
      StoreFile file = candidates.get(i);
      fileSizes[i] = file.getReader().length();
      // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
      //getMaxFilesToCompact 获取最大文件压缩数,默认为10
      int tooFar = i + comConf.getMaxFilesToCompact() - 1;
      sumSize[i] = fileSizes[i]
        + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
        - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
    }

//getMinFilesToCompact : hbase.hstore.compactionThreshold
//    如在任意一个 HStore 中有超过此数量的 HStoreFiles,
//    则将运行压缩以将所有 HStoreFiles 文件作为一个 HStoreFile 重新写入。
//    (每次 memstore 刷新写入一个 HStoreFile)您可通过指定更大数量延长压缩,
//    但压缩将运行更长时间。在压缩期间,更新无法刷新到磁盘。长时间压缩需要足够的内存,
//    以在压缩的持续时间内记录所有更新。如太大,压缩期间客户端会超时。
    //getMinCompactSize 最小合并大小
    //也就是说,当待合并文件数量大于最小合并数量  并且
    // 文件大小大于Math.max(comConf.getMinCompactSize(),(long) (sumSize[start + 1] * ratio)值
    // 该端代码意思是过滤比较大的文件,默认认为最早的StoreFile文件大小最大(之前合并过)
//这里高峰期满足条件的数量小于等于非高峰期数量
    while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
      fileSizes[start] > Math.max(comConf.getMinCompactSize(),
          (long) (sumSize[start + 1] * ratio))) {
      ++start;
    }
    if (start < countOfFiles) {
      //从 countOfFiles 个候选文件中选取 start 个文件进行Compaction
      LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
        + " files from " + countOfFiles + " candidates");
    } else if (mayBeStuck) {
      //mayBeStuck判断规则:
          //如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),
          //hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。
      // We may be stuck. Compact the latest files if we can.
      int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
      if (filesToLeave >= 0) {
        start = filesToLeave;
      }
    }
    candidates.subList(0, start).clear();
    return candidates;
  }

该方法主要流程

1.判断是否为高峰期,并确认ratio的值
2.计算文件大小
3.增加start变量,过滤掉文件较大的文件
4.判断
    a.判断如果不是所有文件都被过滤,则从候选列表清空比较大的文件,我们认为越老的文件(在变量的最前面,所以可以通过++start方式可以过滤大文件),文件占用空间越大
    b.判断如果所有文件都被过滤,继续判断是否阻塞(如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7)则视为阻塞),如果阻塞则将start调整,保证Compaction压缩 

removeExcessFiles方法

protected void removeExcessFiles(ArrayList<StoreFile> candidates,
      boolean isUserCompaction, boolean isMajorCompaction) {
    //如果待合并的文件大于配置的最大合并文件数量
    int excess = candidates.size() - comConf.getMaxFilesToCompact();
    if (excess > 0) {
      //如果isMajorCompaction为true并且是用户合并则不过滤
      if (isMajorCompaction && isUserCompaction) {
        LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact()
            + " files because of a user-requested major compaction");
      } else {
        //过滤掉多余最大合并文件数量的文件
        LOG.debug("Too many admissible files. Excluding " + excess
            + " files from compaction candidates");
        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
      }
    }
  }

setIsMajor方法

public void setIsMajor(boolean isMajor, boolean isAllFiles) {
    //如果不是全部文件,并且是major压缩,抛异常,也就是说如果有正在Compaction的文件,就不能执行MajorCompaction
    assert isAllFiles || !isMajor;
    //不是全部文件:则为Minor压缩
    //是全部文件:并且是isTryingMajor为true,则为MAJOR,否则则为ALL_FILES
    this.isMajor = !isAllFiles ? DisplayCompactionType.MINOR
        : (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
  }

该方法主要流程:

1.如果不是全部文件,并且是major压缩,抛异常;也就是说如果有正在Compaction的文件,就不能执行MajorCompaction

2. 不是全部文件:则为Minor压缩
    是全部文件:并且是isTryingMajor为true,则为MAJOR,否则则为ALL_FILES

DisplayCompactionType 枚举有三个值 MINOR, ALL_FILES, MAJOR
但是判断是否为Major的条件只有
LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
+ (request.isAllFiles() ? " (all files)" : ""));
public boolean isMajor() {
return this.isMajor == DisplayCompactionType.MAJOR;
}
我们从代码中可以看出当类型为ALL_FILES或者MINOR都是MinorCompcation

涉及到的配置参数


说明

默认值

hbase.hregion.majorcompaction

在一个区域中所有 HStoreFiles Major 压缩之间的时间(以毫秒为单位)。要禁用自动的Major压缩,请将此值设置为 0。

7天

hbase.hregion.majorcompaction.jitter

抖动外边界以进行最大化压缩。在每个 RegionServer 上,hbase.region.majorcompaction 间隔与此最大边界内的随机分数相乘。在即将运行下一个最大化压缩时加入该 + 或 - 乘积。最大化压缩不应同时发生在各 RegionServer 上。该数越小,压缩越紧密。

所以 major compact的时间间隔 = [7-7*0.5,7+7.0.5]

0.5

hbase.server.thread.wakefrequency

搜索工作时暂停的时间段(以毫秒为单位)。服务线程如 META 扫描仪、日志滚轮、Major Compcation 线程使用的睡眠间隔。

10秒

hbase.server.compactchecker.interval.multiplier

hbase后台线程检查因子,hbase.server.compactchecker.interval.multiplier*

hbase.server.thread.wakefrequency 就是Compaction Major 检查的周期,比如1000*10秒≈2.77小时

1000

hbase.hstore.compaction.ratio

这个ratio参数的作用是判断文件大小 > hbase.hstore.compaction.min.size的StoreFile是否也是适合进行minor compaction的,默认值1.2。更大的值将压缩产生更大的StoreFile,建议取值范围在1.0~1.4之间。大多数场景下也不建议调整该参数。

1.2

hbase.hstore.compaction.ratio.offpeak

此参数与compaction ratio参数含义相同,是在原有文件选择策略基础上增加了一个非高峰期的ratio控制,默认值5.0。这个参数受另外两个参数 hbase.offpeak.start.hour 与 hbase.offpeak.end.hour 控制,这两个参数值为[0, 23]的整数,用于定义非高峰期时间段,默认值均为-1表示禁用非高峰期ratio设置。

5

 

 

 

 

 

 

 

举报

相关推荐

0 条评论