滑动时间窗口
先不说sentinel的算法实现,先说什么是滑动时间窗口,
我们在进行限流的时候,比如通过QPS进行限流,那假如我们以秒为单位,举个例子:
我设置了限流规则,qps是10
如果不使用滑动窗口算法,在统计qps的时候,就有可能会有问题,比如:
在0 - 500ms中的请求数是0,但是在500ms - 1000 ms的请求数是10;在1000ms - 1500ms 之间又进来了5个请求,此时只判断了1000ms - 2000 ms之间的请求数
因为没有把1S进行拆分多个窗口,所以在1000ms之后,请求进来的时候,统计的是1000ms - 2000ms之间的请求数
场景1:如果在1100ms的时候,进来了5个请求,此时统计1000ms - 2000ms 之间的QPS,满足10这个阈值,就会让请求进来;但是如果是从1100ms 往前算1000ms的话,实际上应该统计的是 100ms - 1100ms之间 这1S的请求数,此时会发现,100 - 1100ms之间的请求数,早就大于10了
所以,如果不使用滑动窗口算法,其实也没太大影响,只是统计的请求数会不是特别准确,对系统会有一定的压力
下面再来说,使用滑动窗口算法的情况
假如我们把2S拆分为4个窗口,每个窗口是500ms
还是上面的假设:在0 - 500ms中的请求数是0,但是在500ms - 1000 ms的请求数是10
场景1:如果当前1100ms请求进来,有3次请求,滑动时间窗口算法会统计500ms - 1500ms 这1S之内的值,会发现此时这个时间段内,请求数量已经达到10了,就会限流,具体的演变过程是这样的:
在1100ms,请求进来的时候,会计算一下,1100ms应该在哪个窗口中,因为只有两个窗口:
公式1:
1100ms / 500ms = 2
2 % 窗口数量2 = 0
得到当前应该在第0个窗口
接着会比较两个窗口的起始时间
现在0号窗口的起始时间是0ms,那1100ms所在窗口的起始时间怎么计算?
公式2:
1100ms - 1100ms % 500 = 1000ms
此时就会得到:1100ms所在窗口的起始时间是1000ms,但是实际0号窗口的起始时间是0ms
1000 > 0,此时就会把0-500ms这个窗口进行reset,然后将0号窗口的起始时间设置为1000ms
此时再来统计请求数量,就把两个窗口的请求数量累加即可,1100ms在请求进来的时候,会发现,已经达到了10的阈值,就会进行限流
这里只是把1S拆分为两个窗口来举例,如果我们把窗口拆的越多,理论上而言,统计会越准确,但是成本也会越大,因为这些数据存储在内存中,会有一定的消耗
sentinel滑动窗口算法的实现
StatisticNode在初始化的时候,有这么一行代码
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
第一个参数默认是2,第二个参数是1000ms;
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
在ArrayMetric的构造函数中,会初始化一个LeapArray对象,在这个对象内部,维护了一个atomicReferenceArray,这个数组,存放的就是我们所说的窗口
我们可以看到,sentinel默认把1000ms分为了两个时间窗口,每个窗口占500ms
除了上面之外,还维护了一个分钟维度的,一分钟分为了60个窗口,每个窗口1S
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
这是sentinel所维护的统计类型枚举,比如:时间窗口内请求通过数量、被限流的数量、异常的梳理、成功的数量等
public enum MetricEvent {
/**
* Normal pass.
*/
PASS,
/**
* Normal block.
*/
BLOCK,
EXCEPTION,
SUCCESS,
RT,
/**
* Passed in future quota (pre-occupied, since 1.5.0).
*/
OCCUPIED_PASS
}
我们举其中一个例子:维护当前时间窗口内通过请求的数量;其他类型都是类似的代码
com.alibaba.csp.sentinel.slots.statistic.metric.Metric#addPass
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
在这个方法中,就两行代码,我们分开说
获取当前时间所在的窗口
data.currentWindow()
这个方法有两层含义:
1.如果当前时间,在目前所维护的窗口内,就返回当期那所维护的窗口
2.如果当前时间,不在所维护的窗口内,就会替换现在的窗口
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 一、计算当前时间应该所处的窗口对应的下标
int idx = calculateTimeIdx(timeMillis);
// 二、根据当前时间,按照每个窗口500ms,计算当前时间所处窗口的起始时间
long windowStart = calculateWindowStart(timeMillis);
// 三、while循环中,有四个场景
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
在这段代码中,有两个计算方法是至关重要的,分别是我标注注释一和二的位置
这两个公式,在上面举例的时候, 有说到过,就不再说明了
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
针对while(true)中的四种场景,我们分别举例,来看下是如何处理的
前提:1000ms划分为两个窗口
场景一:在300ms的时候,第一次请求进来,根据上面两个公式,计算得到idx为0,windowStart为0ms,此时进入while循环,符合第一个判断,对窗口进行初始化
场景二:在400ms的时候,又进来一次请求,此时根据上面两个公式,计算得到idx为0,windowStart为0ms,此时进入while循环,符合第二个判断,return
场景三:在1300ms的时候,进来一次请求,根据上面两个公式,计算得到idx为0,windowStart为1000ms,此时进入while循环,符合第三个判断,对现在idx为0的窗口进行reset,然后设置起始时间是1000ms
场景四:这个场景,在网上看了下,认可比较多的是这样的,在场景三,重新设置了起始时间是1000之后,在1400ms的时候,又进来一个请求,但此时机器的时间被回调了,回调到1S之前,那此时,实际拿到的当前时间是400ms,重新计算之后,发现idx是0,但是起始时间是 0ms,就符合场景四;场景四我理解是一种异常场景,因为场景四是自己new了一个窗口,这个窗口并没有放到维护的窗口数组中,理论上,这个请求不会被统计到
统计数量
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
这个方法就简单了,就是在当前窗口对象中,维护对应类型的count + 1,即可,这里是通过LongAdder来维护的
统计其他类型的数量也是类似的,只是在最后add(MetricEvent.PASS, n);这个方法中,第一个参数不一样而已
总结
在没有看sentinel源码之前,我理解的滑动时间窗口算法,是不停的在数组中新增一个新的窗口,然后remove第一个窗口,但是看了下sentinel的源码,发现他的处理方式是直接覆盖idx位置的窗口
对时间窗口算法也有一个问题:
举例:
如果在300ms的时候,进来5个请求,然后在600ms的时候,进来了3个请求;第三次请求进来,是在1600ms,那此时岂不是会覆盖idx为1的窗口,在统计的时候,不就统计成 0-500ms 和1000ms - 1500ms两个窗口的请求数量了吗?这块不知道会不会有问题
针对这个问题,我再看源码确认下,哪位大哥有结论,也可以一起探讨下