目录
类的关系图
RatioBasedCompactionPolicy
selectCompaction 方法
getCurrentEligibleFiles方法
skipLargeFiles方法
createCompactionRequest方法
filterBulk方法
applyCompactionPolicy方法
removeExcessFiles方法
setIsMajor方法
其他相关文章
Hbase Compaction 源码分析 - CompactionChecker
Hbase Compaction 源码分析 - RatioBasedCompactionPolicy 策略
Hbase Compaction 源码分析 - CompactSplitThread 线程池选择
之前介绍 CompactionChecker 执行时机,这回接着介绍具体的策略
类的关系图
RatioBasedCompactionPolicy
该类在
org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy
selectCompaction 方法
调用过程如下
我们看一下RatioBasedCompactionPolicy 的 selectCompaction 实现方法,实际是在父类 SortedCompactionPolicy 中
//candidateFiles 候选文件,并且按照seqId从最早到最新的排序
//filesCompacting 正在Compcation的文件
//mayUseOffPeak 是否为高峰期
//forceMajor 是否为MajorCompaction,该值在 CompactionChecker 中会设置为true
//返回 符合Compaction的候选列表
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be
// able to compact more if stuck and compacting, because ratio policy excludes some
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
//如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),
// blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
//删除正在合并的文件
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
filesCompacting.size() + " compacting, " + candidateSelection.size() +
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
// If we can't have all files, we cannot do major anyway
//判断是否包含全部文件,如果没有正在合并的文件则为true
boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
//如果是全部文件,并且是MajorCompaction,则不进行文件过滤,否则进行文件过滤,过滤掉大于hbase.hstore.compaction.max.size值的文件
if (!(forceMajor && isAllFiles)) {
//排除大于hbase.hstore.compaction.max.size值的数据,默认Long.MAX_VALUE
//hbase.hstore.compaction.max.size 表示文件大小大于该值的store file 一定会被minor compaction排除
candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
isAllFiles = candidateFiles.size() == candidateSelection.size();
}
// Try a major compaction if this is a user-requested major compaction,
// or if we do not have too many files to compact and this was requested as a major compaction
//isTryingMajor判断条件有两种
// 1、Major合并为True,且包含所有问文件,且是一个用户合并
// 2、Major合并为True,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目
boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
|| (((forceMajor && isAllFiles) || shouldPerformMajorCompaction(candidateSelection))
&& (candidateSelection.size() < comConf.getMaxFilesToCompact()));
// Or, if there are any references among the candidates.
//判断是否包含分裂后的文件
boolean isAfterSplit = StoreUtils.hasReferences(candidateSelection);
//如果不是isTryingMajor且不包含分裂后的文件,则 createCompactionRequest 方法中进行进一步文件过滤
CompactionRequest result = createCompactionRequest(candidateSelection,
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
ArrayList<StoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
//过滤掉多余最大合并的文件数量
removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
result.updateFiles(filesToCompact);
isAllFiles = (candidateFiles.size() == filesToCompact.size());
result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak);
result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
return result;
}
该方法主要流程:
1.传入参数与返回类型
candidateFiles 压缩候选文件,并且按照seqId从最早到最新的排序
filesCompacting 正在压缩的文件
mayUseOffPeak 是否为高峰期
forceMajor 是否为MajorCompaction,该值在 CompactionChecker 中会设置为true
isUserCompaction 是否为用户压缩
返回 CompactionRequest ,符合Compaction的候选列表
2.判断是否阻塞,等待合并的文件数量大于blockingStoreFiles,认为是阻塞
如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),
hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。
3.从候选列表中candidateSelection删除正在Compaction的文件
4.判断是否包含全部文件,如果没有正在合并的文件isAllFiles则为true
5.如果是全部文件,并且是MajorCompaction,则不进行文件过滤,否则进行文件过滤;
文件过滤方法:skipLargeFiles,过滤掉大于hbase.hstore.compaction.max.size值的文件,该方法后面介绍
6.判断isTryingMajor(判断后续是否为Major使用),判断条件有两种,满足一个即为true:
a.Major(forceMajor)合并为true,且包含所有文件,且是一个用户合并
b.Major(forceMajor)合并为true,且包含所有问文件,或者本身就是一个Major合并,同时,必须是candidateSelection的数目小于配置的达到合并条件的最大文件数目
7.判断candidateSelection是否包含分裂后的文件
8.如果不是isTryingMajor且不包含分裂后的文件,则执行 createCompactionRequest 方法中进行进一步文件过滤,createCompactionRequest方法后面介绍
9.执行removeExcessFiles方法,如果大于最大合并的文件数量,则过滤掉多余的数量,否则不处理;执行removeExcessFiles方法下一步介绍
10.在返回的result中设置本次Compcation的类型(Major或者Minor),调用方法 setIsMajor,下面介绍
getCurrentEligibleFiles方法
protected ArrayList<StoreFile> getCurrentEligibleFiles(ArrayList<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting) {
// candidates = all storefiles not already in compaction queue
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = candidateFiles.indexOf(last);
Preconditions.checkArgument(idx != -1);
candidateFiles.subList(0, idx + 1).clear();
}
return candidateFiles;
}
该方法主要流程:
1.从候选文件列表中删除正在Compaction的文件
skipLargeFiles方法
/**
* @param candidates pre-filtrate
* @return filtered subset exclude all files above maxCompactSize
* Also save all references. We MUST compact them
*/
protected ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
boolean mayUseOffpeak) {
int pos = 0;
//候选文件大于0 且文件不是分裂后的文件 且文件大小大于配置最大文件大小maxCompactSize时,该文件会被剔除
while (pos < candidates.size() && !candidates.get(pos).isReference()
&& (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
++pos;
}
if (pos > 0) {
LOG.debug("Some files are too large. Excluding " + pos
+ " files from compaction candidates");
candidates.subList(0, pos).clear();
}
return candidates;
}
该方法主要流程:
1.判断候选文件大于0 且文件不是分裂后的文件(如果是split后的文件,是需要进行Compaction,不会剔除) 且文件大小大于配置最大文件大小maxCompactSize时,执行++pos
2.pos大于0,清除大的数据
createCompactionRequest方法
protected CompactionRequest createCompactionRequest(ArrayList<StoreFile> candidateSelection,
boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
if (!tryingMajor) {
//进入这里则为minorCompaction
//过滤掉BulkLoad到HBase的文件
candidateSelection = filterBulk(candidateSelection);
//过滤掉不应该Minor的文件
candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak, mayBeStuck);
candidateSelection = checkMinFilesCriteria(candidateSelection,
comConf.getMinFilesToCompact());
}
return new CompactionRequest(candidateSelection);
}
该方法主要流程:
如果不是isTryingMajor且不包含分裂后的文件,则为MinorCompaction 进行进一步文件过滤,否则直接返回
filterBulk方法
/**
* @param candidates pre-filtrate
* @return filtered subset exclude all bulk load files if configured
*/
protected ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
candidates.removeAll(Collections2.filter(candidates, new Predicate<StoreFile>() {
@Override
public boolean apply(StoreFile input) {
//判断该文件是否需要执行MinorCompaction
return input.excludeFromMinorCompaction();
}
}));
return candidates;
}
该方法主要作用:
判断StoreFIle是否设置excludeFromMinorCompaction,也就是过滤掉BulkLoad到HBase的文件
applyCompactionPolicy方法
protected ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
if (candidates.isEmpty()) {
return candidates;
}
//前提:,选择待合并的文件按时间排序,最旧的文件排最前。
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
//hbase.hstore.compaction.ratio 1.2
double ratio = comConf.getCompactionRatio();
//判断是否为高峰期,高峰期 ratio 值为5,非高峰期为1.2
if (mayUseOffPeak) {
//获取hbase.hstore.compaction.ratio.offpeak值,默认是5
ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
}
// get store file sizes for incremental compacting selection.
//https://blog.csdn.net/bryce123phy/article/details/56003628
//获取待Compaction文件数量
final int countOfFiles = candidates.size();
long[] fileSizes = new long[countOfFiles];//每个file大小
long[] sumSize = new long[countOfFiles];//前几个file大小总和
for (int i = countOfFiles - 1; i >= 0; --i) {
StoreFile file = candidates.get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
//getMaxFilesToCompact 获取最大文件压缩数,默认为10
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
sumSize[i] = fileSizes[i]
+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
//getMinFilesToCompact : hbase.hstore.compactionThreshold
// 如在任意一个 HStore 中有超过此数量的 HStoreFiles,
// 则将运行压缩以将所有 HStoreFiles 文件作为一个 HStoreFile 重新写入。
// (每次 memstore 刷新写入一个 HStoreFile)您可通过指定更大数量延长压缩,
// 但压缩将运行更长时间。在压缩期间,更新无法刷新到磁盘。长时间压缩需要足够的内存,
// 以在压缩的持续时间内记录所有更新。如太大,压缩期间客户端会超时。
//getMinCompactSize 最小合并大小
//也就是说,当待合并文件数量大于最小合并数量 并且
// 文件大小大于Math.max(comConf.getMinCompactSize(),(long) (sumSize[start + 1] * ratio)值
// 该端代码意思是过滤比较大的文件,默认认为最早的StoreFile文件大小最大(之前合并过)
//这里高峰期满足条件的数量小于等于非高峰期数量
while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
fileSizes[start] > Math.max(comConf.getMinCompactSize(),
(long) (sumSize[start + 1] * ratio))) {
++start;
}
if (start < countOfFiles) {
//从 countOfFiles 个候选文件中选取 start 个文件进行Compaction
LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+ " files from " + countOfFiles + " candidates");
} else if (mayBeStuck) {
//mayBeStuck判断规则:
//如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7),
//hbase.hstore.blockingStoreFiles: 如在任意 HStore 中有超过此数量的 HStoreFiles,则会阻止对此 HRegion 的更新,直到完成压缩或直到超过为 'hbase.hstore.blockingWaitTime' 指定的值。
// We may be stuck. Compact the latest files if we can.
int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
if (filesToLeave >= 0) {
start = filesToLeave;
}
}
candidates.subList(0, start).clear();
return candidates;
}
该方法主要流程
1.判断是否为高峰期,并确认ratio的值
2.计算文件大小
3.增加start变量,过滤掉文件较大的文件
4.判断
a.判断如果不是所有文件都被过滤,则从候选列表清空比较大的文件,我们认为越老的文件(在变量的最前面,所以可以通过++start方式可以过滤大文件),文件占用空间越大
b.判断如果所有文件都被过滤,继续判断是否阻塞(如果候选文件大于 文件阻塞个数(hbase.hstore.blockingStoreFiles 值,默认为7)则视为阻塞),如果阻塞则将start调整,保证Compaction压缩
removeExcessFiles方法
protected void removeExcessFiles(ArrayList<StoreFile> candidates,
boolean isUserCompaction, boolean isMajorCompaction) {
//如果待合并的文件大于配置的最大合并文件数量
int excess = candidates.size() - comConf.getMaxFilesToCompact();
if (excess > 0) {
//如果isMajorCompaction为true并且是用户合并则不过滤
if (isMajorCompaction && isUserCompaction) {
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact()
+ " files because of a user-requested major compaction");
} else {
//过滤掉多余最大合并文件数量的文件
LOG.debug("Too many admissible files. Excluding " + excess
+ " files from compaction candidates");
candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
}
}
}
setIsMajor方法
public void setIsMajor(boolean isMajor, boolean isAllFiles) {
//如果不是全部文件,并且是major压缩,抛异常,也就是说如果有正在Compaction的文件,就不能执行MajorCompaction
assert isAllFiles || !isMajor;
//不是全部文件:则为Minor压缩
//是全部文件:并且是isTryingMajor为true,则为MAJOR,否则则为ALL_FILES
this.isMajor = !isAllFiles ? DisplayCompactionType.MINOR
: (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
}
该方法主要流程:
1.如果不是全部文件,并且是major压缩,抛异常;也就是说如果有正在Compaction的文件,就不能执行MajorCompaction
2. 不是全部文件:则为Minor压缩
是全部文件:并且是isTryingMajor为true,则为MAJOR,否则则为ALL_FILES
DisplayCompactionType 枚举有三个值 MINOR, ALL_FILES, MAJOR
但是判断是否为Major的条件只有
LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName()
+ ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction"
+ (request.isAllFiles() ? " (all files)" : ""));
public boolean isMajor() {
return this.isMajor == DisplayCompactionType.MAJOR;
}
我们从代码中可以看出当类型为ALL_FILES或者MINOR都是MinorCompcation
涉及到的配置参数
值 | 说明 | 默认值 |
hbase.hregion.majorcompaction | 在一个区域中所有 HStoreFiles Major 压缩之间的时间(以毫秒为单位)。要禁用自动的Major压缩,请将此值设置为 0。 | 7天 |
hbase.hregion.majorcompaction.jitter | 抖动外边界以进行最大化压缩。在每个 RegionServer 上,hbase.region.majorcompaction 间隔与此最大边界内的随机分数相乘。在即将运行下一个最大化压缩时加入该 + 或 - 乘积。最大化压缩不应同时发生在各 RegionServer 上。该数越小,压缩越紧密。 所以 major compact的时间间隔 = [7-7*0.5,7+7.0.5] | 0.5 |
hbase.server.thread.wakefrequency | 搜索工作时暂停的时间段(以毫秒为单位)。服务线程如 META 扫描仪、日志滚轮、Major Compcation 线程使用的睡眠间隔。 | 10秒 |
hbase.server.compactchecker.interval.multiplier | hbase后台线程检查因子,hbase.server.compactchecker.interval.multiplier* hbase.server.thread.wakefrequency 就是Compaction Major 检查的周期,比如1000*10秒≈2.77小时 | 1000 |
hbase.hstore.compaction.ratio | 这个ratio参数的作用是判断文件大小 > hbase.hstore.compaction.min.size的StoreFile是否也是适合进行minor compaction的,默认值1.2。更大的值将压缩产生更大的StoreFile,建议取值范围在1.0~1.4之间。大多数场景下也不建议调整该参数。 | 1.2 |
hbase.hstore.compaction.ratio.offpeak | 此参数与compaction ratio参数含义相同,是在原有文件选择策略基础上增加了一个非高峰期的ratio控制,默认值5.0。这个参数受另外两个参数 hbase.offpeak.start.hour 与 hbase.offpeak.end.hour 控制,这两个参数值为[0, 23]的整数,用于定义非高峰期时间段,默认值均为-1表示禁用非高峰期ratio设置。 | 5 |