最后,桶滑动统计流以桶计数流作为来源,按照步长为1、长度为设定的桶数(配置的滑动窗口桶数)的规则划分滑动窗口,并对滑动窗口内的所有桶数据按照各事件类型进行汇总,汇总成最终的窗口健康数据,并将其弹射出去,形成最终的桶滑动统计流,作为Hystrix熔断器进行状态转换的数据支撑。
以上介绍的Hystrix健康统计滑动窗口的执行流程如图5-13所示。
图5-13 Hystrix健康统计滑动窗口的执行流程
为了帮助大家学习Hystrix滑动窗口的执行流程,这里设计一个简单的Hystrix滑动窗口模拟实现用例,对Hystrix滑动窗口数据流的处理过程进行简化,只留下核心部分,简化的模拟执行流程如下:
首先,模拟HystrixCommand的事件发送机制,每100毫秒发送一个随机值(0或1),随机值为0代表失败,为1代表成功,模拟命令完成事件流。
其次,模拟HystrixCommand的桶计数流,以事件流作为来源,将事件流中的事件按照固定时间长度(300毫秒)划分成时间桶滚动窗口,并对时间桶滚动窗口内值为0的事件进行累积,完成之后将累积数据弹射出去,形成桶计数流。
最后,模拟桶计数流作为来源,按照步长为1、长度为设定的桶数
(3)的规则划分滑动窗口,并对滑动窗口内的所有桶数据进行汇总,汇总成最终的失败统计数据,并将其弹射出去,形成最终的桶滑动统计流。
以上模拟Hystrix健康统计滑动窗口的执行流程如图5-14所示。
图5-14 模拟的Hystrix健康统计滑动窗口简化版执行流程
简化的模拟Hystrix健康统计滑动窗口执行流程的实现代码如下:
package com.crazymaker.demo.rxJava.basic;
//省略import
@Slf4j
public class WindowDemo
{
/**
*演示模拟Hystrix的健康统计metric
*/
@Test
public void hystrixTimewindowDemo() throws InterruptedException
{
//创建Random类对象
Random random = new Random();
//模拟Hystrix event事件流,每100毫秒发送一个0或1随机值
//随机值为0代表失败,随机值为1代表成功
Observable eventStream = Observable
.interval(100, TimeUnit.MILLISECONDS) .map(i -> random.nextInt(2));
/**
*完成桶内0值计数的聚合函数
*/
Func1 reduceBucketToSummary =
new Func1<Observable, Observable>()
{
@Override
public Observable call(Observable eventBucket)
{
Observable<List> olist = eventBucket.toList();
Observable countValue = olist.map(list ->
{
long count = list.stream().filter(i -> i == 0).count();
log.info(“{} '0 count:{}”, list.toString(), count);
return count;
});
return countValue;
}
};
/**
*桶计数流
*/
Observable bucketedCounterStream = eventStream
.window(300, TimeUnit.MILLISECONDS)
.flatMap(reduceBucketToSummary); //将时间桶进行聚合,统计事件值为0 的个数
/**
*滑动窗口聚合函数
*/
Func1 reduceWindowToSummary = new Func1<Observable, Observable>()
{
@Override
public Observable call(Observable eventBucket)
{
return eventBucket.reduce(new Func2<Long, Long, Long>()
{
@Override
public Long call(Long bucket1, Long bucket2)
{
/**
*对窗口内的桶进行累加
*/
return bucket1 + bucket2;
}
});
}
};
/**
*桶滑动统计流
*/
Observable bucketedRollingCounterStream = bucketedCounterStream
.window(3, 1)
.flatMap(reduceWindowToSummary);//将滑动窗口进行聚合
bucketedRollingCounterStream.subscribe(sum -> log.info(“滑动窗口的和:{}”, sum));
Thread.sleep(Integer.MAX_VALUE);
}
}
运行这个示例程序,输出的结果部分节选如下:
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 0, 0] '0 count:3
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 1, 1] '0 count:1
[RxComputationScheduler-1] INFO c.c.d.r 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 xJava.basic.WindowDemo - [1, 0, 1] '0 count:1
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:5
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 1, 0] '0 count:2
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:4
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 1, 0] '0 count:2
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:5
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 1, 1] '0 count:0
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:4
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 1, 1] '0 count:1
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:3
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 0, 0] '0 count:2
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:3
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 1, 1] '0 count:0
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:3
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 1, 0] '0 count:1
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:3
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 1, 1] '0 count:0
[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:1
在这个示例程序的代码中,eventStream流通过interval操作符每100毫秒发送一个随机值(0或1),随机值为0代表失败,为1代表成功,模拟HystrixCommand的事件发送机制。
桶计数流bucketedCounterStream使用window操作符以300毫秒为一个时间桶窗口,将原始的事件流进行拆分,每个时间桶窗口的3事件聚合起来,输出一个新的Observable(子流)。然后,bucketedCounterStream通过flapMap操作将每一个Observable进行扁平化。
桶计数流bucketedCounterStream的处理过程如图5-15所示。
图5-15 模拟的桶计数流bucketedCounterStream的处理过程
bucketedCounterStream的flapMap扁平化操作是通过调用reduceBucketToSummary方法完成的,该方法首先将每一个时间桶窗口内的Observable子流内的元素序列转成一个列表(List),然后进行过滤(留下值为0事件)和统计,返回值为0的元素统计数量(失败数)。
接下来,需要对bucketedCounterStream桶计数进行汇总统计,形成滑动窗口的统计数据,这个工作由
bucketedRollingCounterStream桶滑动统计流完成。
桶滑动统计流仍然使用window和flatMap两个操作符,先在输入流中通过window操作符按照步长为1、长度为3的规则划分滑动窗口,每个滑动窗口的3统计数据被聚集起来,输出一个新的Observable。然后通过flatMap扁平化操作符对每一个Observable进行聚合,计算出各元素的累加值。
模拟的桶滑动统计流
bucketedRollingCounterStream的处理过程如图5-16所示。
图5-16 桶滑动统计流bucketedRollingCounterStream的处理过程
bucketedRollingCounterStream的flapMap扁平化操作是通过调用reduceWindowToSummary方法完成的,该方法通过RxJava的reduce操作符进行“聚合”操作,将Observable子流中的3事件的累加结果计算出来。
Hystrix滑动窗口的核心实现原理
===================
在Hystrix中,业务逻辑以命令模式封装成了一个个命令(HystrixCommand),每个命令执行完成后都会发送命令完成事件(HystrixCommandCompletion)到HystrixCommandCompletion Stream命令完成事件流。HystrixCommandCompletion是Hystrix中核心的事件,它可以代表某个命令执行成功、超时、异常等各种状态,与Hystrix熔断器的状态转换息息相关。
桶计数流BucketedCounterStream是一个抽象类,提供了基本的桶计数器实现。用户在使用Hystrix的时候一般都要配置两个值:timeInMilliseconds(滑动窗口的长度,时间间隔)和numBuckets(滑动窗口中的桶数),每个桶对应的时间长度就是bucketSizeInMs=
timeInMilliseconds/numBuckets,该时间长度可以记为一个时间桶窗口BucketedCounterStream每隔一个时间桶窗口就把这段时间内的所有调用事件聚合到一个累积桶内。下面来看一下它的实现。
protected BucketedCounterStream(final HystrixEventStream inputEventStream, final int numBuckets, final int bucketSizeInM
this.numBuckets = numBuckets;
this.reduceBucketToSummary = new Func1<Observable, Observable>() {
@Override
public Observable call(Observable eventBucket) {
return eventBucket.reduce(getEmptyBucketSummary(),
appendRawEventToBucket);
}
};
…
this.bucketedStream = Observable.defer(new Func0<Observable>() {
@Override
public Observable call() {
return inputEventStream
.observe()