1. 细粒度滑动窗口
在 Apache Flink 中,细粒度滑动窗口(Fine-grained Sliding Window)是处理数据流的一种窗口机制。滑动窗口允许对连续流数据进行批处理,能够对时间窗口内的数据进行聚合、分析或计算。细粒度滑动窗口特别适合需要频繁更新结果的场景。
1.1 滑动窗口的基本概念
滑动窗口由两个关键参数决定:
l 窗口大小(Window Size):每个窗口覆盖的数据范围。
l 滑动步长(Slide Interval):窗口每次移动的距离,即窗口的更新频率。
1.2 细粒度滑动窗口的特点
细粒度滑动窗口是指滑动步长相对窗口大小较小的滑动窗口。也就是说,窗口每次滑动的步长很小,可能只有一个事件或很短的时间间隔。这种窗口允许对流数据进行更细致的分析和更频繁的更新。
1.3 细粒度滑动的影响
当使用细粒度的滑动窗口(窗口长度远远大于滑动步长)时,重叠的窗口过多,一个数
据会属于多个窗口,性能会急剧下降。
根据Flink的窗口分配机制,每一条数据进入缓存时会计算一个开始时间以及一个结束时间,也就是这条数据的窗口。但是从图中可以看到一条数据会出现在多个窗口中,而每个窗口又是不同的对象,这个对象需要将数据存储在自己状态中才能实现对数据的计算,也就是一条数据被几个窗口重叠,就需要计算几次。
重叠次数=窗口大小/滑动步长
假设我们以 3 分钟的频率实时计算 App 内各个子模块近 24 小时的PV 和 UV。我们需要用粒度为 1440 / 3 = 480 的滑动窗口来实现它,细粒度的滑动窗口会带来性能问题,有两点:
1. 状态
对于一个元素,会将其写入对应的(key, window)二元组所圈定的 windowState 状态
中。如果粒度为 480,那么每个元素到来,更新 windowState 时都要遍历 480 个窗口并写
入,开销是非常大的。在采用 RocksDB 作为状态后端时,checkpoint 的瓶颈也尤其明显。
2. 定时器
每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于
决定窗口数据何时输出;二是 registerCleanupTimer()方法注册的清理定时器,用于在窗
口彻底过期(如 allowedLateness 过期)之后及时清理掉窗口的内部状态。细粒度滑动窗
口会造成维护的定时器增多,内存负担加重。
2.优化思路
官方并未做出针对问题的解决方案。
2.1滚动窗口+在线存储+读时聚合的解决方案
(1) 从业务的视角来看,往往窗口的长度是可以被步长所整除的,可以找到窗口长度和窗口步长的最小公约数作为时间分片(一个滚动窗口的长度)。
(2) 每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储(内存数据库如 Redis,LSM-based NoSQL 存储如 HBase)。
(3) 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。
参考的是Spark窗口的实现sparkstreaming滚动窗口和滑动窗口,也就是在Flink上实现类似Spark的功能。
2.2测试案例
2.2.1提交案例:统计最近 1 小时的 uv,1 秒更新一次(滑动窗口)
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SlideWindowDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--sliding-split false
2.2.2执行结果分析
通过在Flink Dashboar中查看资源使用情况可以看见,滑动窗口是最耗费资源的,其达到一个顶峰。
2.2.3核心代码
if (isSlidingSplit) {
uvOneDS
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.reduce(
(value1, value2) -> value1 + value2, //聚合
new SplitTumbleWindowPAWF() //设置滚动窗口的状态(开始时间结束时间)
)
.keyBy(r -> 1)
.process(new SplitWindowAggFunction()).setParallelism(1) //设置窗口的并行度为1
.print().setParallelism(1);
}
其中我们假设滑动窗口的步长为1秒,`TumblingProcessingTimeWindows.of(Time.seconds(1))`,这条语句将滑动窗口切割成很多个一秒钟的滚动窗口,每一秒聚合一次。
2.2.4滚动窗口的并行度
针对的数据量较小的情况,我们需要设置并行度为1,也就是滚动窗口按顺序、一条线的聚合起来。
public class SplitWindowAggFunction extends KeyedProcessFunction<Integer, Tuple3<String, String, Long>, Tuple3<String, String, Long>> {
ValueState<List<Tuple3<String, String, Long>>> splitListState;
List<Tuple3<String, String, Long>> splitList;
int splitNum;
Long windowUvCount;
Tuple3<String, String, Long> windowAggResult;
@Override
public void open(Configuration parameters) throws Exception {
splitListState = getRuntimeContext()
.getState(
new ValueStateDescriptor<List<Tuple3<String, String, Long>>>(
"splitListState",
Types.LIST(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)),
new ArrayList<Tuple3<String, String, Long>>()
)
);
// 需求:60分钟的窗口,1秒的滑动 ===》 分片数=60分钟/1秒
splitNum = 60 * 60;
windowAggResult = new Tuple3<>();
}
@Override
public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Tuple3<String, String, Long>> out) throws Exception {
// 每个时间分片的结果来,先清空统计值
windowAggResult.f2 = 0L;
// 将新的时间分片结果添加到List,删除第一个过期的时间分片
splitList = splitListState.value();
splitList.add(value);
if (splitList.size() >= splitNum) {
if (splitList.size() == (splitNum + 1)) {
splitList.remove(0);
}
// 对时间分片聚合(复用Tuple3对象,减小开销)
for (int i = 0; i < splitList.size(); i++) {
// 累加时间分片的统计结果
windowAggResult.f2 += splitList.get(i).f2;
// windowStart取第一个元素的 开始时间
if (i == 0) {
windowAggResult.f0 = splitList.get(i).f0;
}
// windowEnd取最后一个元素的 结束时间
if (i == (splitList.size() - 1)) {
windowAggResult.f1 = splitList.get(i).f1;
}
}
out.collect(windowAggResult);
}
}
}
2.2.5提交案例:统计最近 1 小时的 uv,1 秒更新一次(滚动窗口+状态存储)
bin/flink run \
-t yarn-per-job \
-d \
-p 5 \
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-c com.atguigu.flink.tuning.SlideWindowDemo \
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \
--sliding-split true
可见当我们实现的由滑动窗口到滚动窗口的转化,Flink计算资源会得到极大改善。
3.总结
此项技术基于Flink 1.13,其对 SQL 模块的 Window TVF 进行了一系列的性能优化,可以自动对滑动窗口进行切片解决细粒度滑动问题。