0
点赞
收藏
分享

微信扫一扫

sentinel源码分析第十二篇一核心流程一FlowSlot流控


文章目录

  • ​​源码分析一入口checkFlow​​
  • ​​源码分析一canPassCheck​​
  • ​​源码分析一集群流控passClusterCheck​​
  • ​​源码分析一clusterService.requestToken​​
  • ​​源码分析一单机流控passLocalCheck​​
  • ​​快速失败DefaultController​​
  • ​​扩展点一外部配置图​​

源码分析一入口checkFlow

  • 获取所有的流控规则 通过flowRuleManager 获取资源的所有规则
  • 没有通过流控则抛出异常 流控规则和node中的统计信息比较

public class FlowRuleChecker {

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
获取所有的流控规则 通过flowRuleManager 获取资源的所有规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
没有通过流控则抛出异常 流控规则和node中的统计信息比较
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
}

源码分析一canPassCheck

  • 集群模式则处理集群流控
  • 单机模式处理单机流控

public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {

// 来源 从规则中获取限定的来源 一般为default 来源null说明是发出请求 不是别人请求我的资源
String limitApp = rule.getLimitApp();
// 流控规则 flowRule默认default 其他规则基本没有默认值
if (limitApp == null) {
return true;
}
// 集群流控处理
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// 单机流控处理
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

源码分析一集群流控passClusterCheck

  • 直接或远程调用TokenService获取限流结果
  • 所有特殊情况则直接通过或者调用单机限流

private static boolean passClusterCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
try {
TokenService clusterService = pickClusterService();
if (clusterService == null) {
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}
long flowId = rule.getClusterConfig().getFlowId();
如果clusterService是集群server 则直接调用
如果clusterService是集群client 则远程调用
TokenResult result = clusterService.requestToken(flowId, acquireCount, prioritized);
/*
若用户未引入集群限流 client 相关依赖,或者 client 未开启/连接失败/通信失败,则对于开启了集群模式的规则:
根据配置直接通过限流或者退化到 local 模式的限流
*/
return applyTokenResult(result, rule, context, node, acquireCount, prioritized);
} catch (Throwable ex) {
}
根据配置直接通过限流或者退化到 local 模式的限流
return fallbackToLocalOrPass(rule, context, node, acquireCount, prioritized);
}

源码分析一clusterService.requestToken

  • 通过ClusterFlowChecker.acquireClusterToken判断是否被限流或者排队

public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) {
if (notValidRequest(ruleId, acquireCount)) {
return badRequest();
}
FlowRule rule = ClusterFlowRuleManager.getFlowRuleById(ruleId);
if (rule == null) {
return new TokenResult(TokenResultStatus.NO_RULE_EXISTS);
}
通过ClusterFlowChecker获取流控结果
return ClusterFlowChecker.acquireClusterToken(rule, acquireCount, prioritized);
}

  • 获取相关限流信息
  • 剩余token树大于0 返回成功并处理通过的流量统计信息
  • 如果需要等待返回等待时间
  • 如果发生block则返回被限流,并处理未通过的流量统计信息

final class ClusterFlowChecker {
static TokenResult acquireClusterToken(/*@Valid*/ FlowRule rule, int acquireCount, boolean prioritized) {
Long id = rule.getClusterConfig().getFlowId();
// qps保护
if (!allowProceed(id)) {
return new TokenResult(TokenResultStatus.TOO_MANY_REQUEST);
}
// 获取度量信息
ClusterMetric metric = ClusterMetricStatistics.getMetric(id);
if (metric == null) {
return new TokenResult(TokenResultStatus.FAIL);
}
// 获取相关限流信息
double latestQps = metric.getAvg(ClusterFlowEvent.PASS);
double globalThreshold = calcGlobalThreshold(rule) * ClusterServerConfigManager.getExceedCount();
double nextRemaining = globalThreshold - latestQps - acquireCount;
// 剩余token树大于0
if (nextRemaining >= 0) {
// 处理已经通过的流量统计信息
metric.add(ClusterFlowEvent.PASS, acquireCount);
metric.add(ClusterFlowEvent.PASS_REQUEST, 1);
if (prioritized) {
// Add prioritized pass.
metric.add(ClusterFlowEvent.OCCUPIED_PASS, acquireCount);
}
// Remaining count is cut down to a smaller integer.
// 返回限流结果
return new TokenResult(TokenResultStatus.OK)
.setRemaining((int) nextRemaining)
.setWaitInMs(0);
} else {
if (prioritized) {
// 需要排队则返回排队等待时间
double occupyAvg = metric.getAvg(ClusterFlowEvent.WAITING);
if (occupyAvg <= ClusterServerConfigManager.getMaxOccupyRatio() * globalThreshold) {
int waitInMs = metric.tryOccupyNext(ClusterFlowEvent.PASS, acquireCount, globalThreshold);
// waitInMs > 0 indicates pre-occupy incoming buckets successfully.
if (waitInMs > 0) {
ClusterServerStatLogUtil.log("flow|waiting|" + id);
return new TokenResult(TokenResultStatus.SHOULD_WAIT)
.setRemaining(0)
.setWaitInMs(waitInMs);
}
}
}
// 处理未通过的流量的统计信息
metric.add(ClusterFlowEvent.BLOCK, acquireCount);
metric.add(ClusterFlowEvent.BLOCK_REQUEST, 1);
ClusterServerStatLogUtil.log("flow|block|" + id, acquireCount);
ClusterServerStatLogUtil.log("flow|block_request|" + id, 1);
if (prioritized) {
// Add prioritized block.
metric.add(ClusterFlowEvent.OCCUPIED_BLOCK, acquireCount);
ClusterServerStatLogUtil.log("flow|occupied_block|" + id, 1);
}
// 返回被限流结果
return blockedResult();
}
}
}

源码分析一单机流控passLocalCheck

  • 根据策略选择不同的单机限流controller处理

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
根据规则[来源 限流策略等]获取正确的统计node
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
根据规则进行逐项检查 流控效果
DefaultController 快速失败
RateLimiterController
WarmUpController 令牌桶
WarmUpRateLimiterController
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

快速失败DefaultController

  • 获取当前时间窗中已经统计的数据
  • 超过阈值则限流

public class DefaultController implements TrafficShapingController {

public boolean canPass(Node node, int acquireCount, boolean prioritized) {

获取当前时间窗中已经统计的数据
int curCount = avgUsedTokens(node);
超过阈值 限流
if (curCount + acquireCount > count) {

// 有优先权
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
// 判断是不是要sleep一下, 可能提前获取了以后时间窗口的令牌,要sleep平滑下流量
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);

// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
}

扩展点一外部配置图

sentinel源码分析第十二篇一核心流程一FlowSlot流控_sentinel


举报

相关推荐

0 条评论