0
点赞
收藏
分享

微信扫一扫

Flink细粒度滑动窗口优化

1. 细粒度滑动窗口

在 Apache Flink 中,细粒度滑动窗口(Fine-grained Sliding Window)是处理数据流的一种窗口机制。滑动窗口允许对连续流数据进行批处理,能够对时间窗口内的数据进行聚合、分析或计算。细粒度滑动窗口特别适合需要频繁更新结果的场景。

1.1 滑动窗口的基本概念

滑动窗口由两个关键参数决定:

l 窗口大小(Window Size):每个窗口覆盖的数据范围。

l 滑动步长(Slide Interval):窗口每次移动的距离,即窗口的更新频率。

1.2 细粒度滑动窗口的特点

细粒度滑动窗口是指滑动步长相对窗口大小较小的滑动窗口。也就是说,窗口每次滑动的步长很小,可能只有一个事件或很短的时间间隔。这种窗口允许对流数据进行更细致的分析和更频繁的更新。

1.3 细粒度滑动的影响

当使用细粒度的滑动窗口(窗口长度远远大于滑动步长)时,重叠的窗口过多,一个数

据会属于多个窗口,性能会急剧下降。

Flink细粒度滑动窗口优化_细粒度

根据Flink的窗口分配机制,每一条数据进入缓存时会计算一个开始时间以及一个结束时间,也就是这条数据的窗口。但是从图中可以看到一条数据会出现在多个窗口中,而每个窗口又是不同的对象,这个对象需要将数据存储在自己状态中才能实现对数据的计算,也就是一条数据被几个窗口重叠,就需要计算几次。

重叠次数=窗口大小/滑动步长

假设我们以 3 分钟的频率实时计算 App 内各个子模块近 24 小时的PV 和 UV。我们需要用粒度为 1440 / 3 = 480 的滑动窗口来实现它,细粒度的滑动窗口会带来性能问题,有两点:

1. 状态

对于一个元素,会将其写入对应的(key, window)二元组所圈定的 windowState 状态

中。如果粒度为 480,那么每个元素到来,更新 windowState 时都要遍历 480 个窗口并写

入,开销是非常大的。在采用 RocksDB 作为状态后端时,checkpoint 的瓶颈也尤其明显。

2. 定时器

每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于

决定窗口数据何时输出;二是 registerCleanupTimer()方法注册的清理定时器,用于在窗

口彻底过期(如 allowedLateness 过期)之后及时清理掉窗口的内部状态。细粒度滑动窗

口会造成维护的定时器增多,内存负担加重。

2.优化思路

官方并未做出针对问题的解决方案。

Flink细粒度滑动窗口优化_滑动窗口_02

2.1滚动窗口+在线存储+读时聚合的解决方案

(1) 从业务的视角来看,往往窗口的长度是可以被步长所整除的,可以找到窗口长度和窗口步长的最小公约数作为时间分片(一个滚动窗口的长度)。

(2) 每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储(内存数据库如 Redis,LSM-based NoSQL 存储如 HBase)。

(3) 扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。

参考的是Spark窗口的实现sparkstreaming滚动窗口和滑动窗口,也就是在Flink上实现类似Spark的功能。

2.2测试案例

2.2.1提交案例:统计最近 1 小时的 uv,1 秒更新一次(滑动窗口)

bin/flink run \ 
-t yarn-per-job \ 
-d \ 
-p 5 \ 
-Drest.flamegraph.enabled=true \
-Dyarn.application.queue=test \ 
-Djobmanager.memory.process.size=1024mb \ 
-Dtaskmanager.memory.process.size=2048mb \ 
-Dtaskmanager.numberOfTaskSlots=2 \ 
-c com.atguigu.flink.tuning.SlideWindowDemo \ 
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \ 
--sliding-split false

2.2.2执行结果分析

Flink细粒度滑动窗口优化_细粒度_03

通过在Flink Dashboar中查看资源使用情况可以看见,滑动窗口是最耗费资源的,其达到一个顶峰。

Flink细粒度滑动窗口优化_细粒度_04

2.2.3核心代码

if (isSlidingSplit) {
            uvOneDS
                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                    .reduce(
                            (value1, value2) -> value1 + value2,	//聚合
                            new SplitTumbleWindowPAWF()	//设置滚动窗口的状态(开始时间结束时间)
                    )
                    .keyBy(r -> 1)
                    .process(new SplitWindowAggFunction()).setParallelism(1)	//设置窗口的并行度为1
                    .print().setParallelism(1);

}

其中我们假设滑动窗口的步长为1秒,`TumblingProcessingTimeWindows.of(Time.seconds(1))`,这条语句将滑动窗口切割成很多个一秒钟的滚动窗口,每一秒聚合一次。

2.2.4滚动窗口的并行度

针对的数据量较小的情况,我们需要设置并行度为1,也就是滚动窗口按顺序、一条线的聚合起来。

public class SplitWindowAggFunction extends KeyedProcessFunction<Integer, Tuple3<String, String, Long>, Tuple3<String, String, Long>> {
    ValueState<List<Tuple3<String, String, Long>>> splitListState;
    List<Tuple3<String, String, Long>> splitList;
    int splitNum;
    Long windowUvCount;
    Tuple3<String, String, Long> windowAggResult;

    @Override
    public void open(Configuration parameters) throws Exception {
        splitListState = getRuntimeContext()
                .getState(
                        new ValueStateDescriptor<List<Tuple3<String, String, Long>>>(
                                "splitListState",
                                Types.LIST(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)),
                                new ArrayList<Tuple3<String, String, Long>>()
                        )
                );
        // 需求:60分钟的窗口,1秒的滑动 ===》 分片数=60分钟/1秒
        splitNum = 60 * 60;
        windowAggResult = new Tuple3<>();
    }

    @Override
    public void processElement(Tuple3<String, String, Long> value, Context ctx, Collector<Tuple3<String, String, Long>> out) throws Exception {
        // 每个时间分片的结果来,先清空统计值
        windowAggResult.f2 = 0L;
        // 将新的时间分片结果添加到List,删除第一个过期的时间分片
        splitList = splitListState.value();
        splitList.add(value);
        if (splitList.size() >= splitNum) {
            if (splitList.size() == (splitNum + 1)) {
                splitList.remove(0);
            }
            // 对时间分片聚合(复用Tuple3对象,减小开销)
            for (int i = 0; i < splitList.size(); i++) {
                // 累加时间分片的统计结果
                windowAggResult.f2 += splitList.get(i).f2;
                // windowStart取第一个元素的 开始时间
                if (i == 0) {
                    windowAggResult.f0 = splitList.get(i).f0;
                }
                // windowEnd取最后一个元素的 结束时间
                if (i == (splitList.size() - 1)) {
                    windowAggResult.f1 = splitList.get(i).f1;
                }
            }
            out.collect(windowAggResult);
        }

    }

}

Flink细粒度滑动窗口优化_Flink_05

2.2.5提交案例:统计最近 1 小时的 uv,1 秒更新一次(滚动窗口+状态存储)

bin/flink run \ 
-t yarn-per-job \ 
-d \ 
-p 5 \ 
-Drest.flamegraph.enabled=true \ 
-Dyarn.application.queue=test \ 
-Djobmanager.memory.process.size=1024mb \ 
-Dtaskmanager.memory.process.size=2048mb \ 
-Dtaskmanager.numberOfTaskSlots=2 \ 
-c com.atguigu.flink.tuning.SlideWindowDemo \ 
/opt/module/flink-1.13.1/myjar/flink-tuning-1.0-SNAPSHOT.jar \ 
--sliding-split true

Flink细粒度滑动窗口优化_Flink_06

可见当我们实现的由滑动窗口到滚动窗口的转化,Flink计算资源会得到极大改善。


3.总结

此项技术基于Flink 1.13,其对 SQL 模块的 Window TVF 进行了一系列的性能优化,可以自动对滑动窗口进行切片解决细粒度滑动问题。

Flink细粒度滑动窗口优化_细粒度_07

举报

相关推荐

0 条评论