Window概述
Streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。
Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
flink中的window可以理解为水桶,数据理解为水流,水流源源不断,对于DataStream来说是来一个数据处理一条,有了桶之后,将数据装进桶中再处理,在窗口关闭的时候将计算结果输出
结论: 窗口=数据桶 ;窗口本质=数据 union
Window类型
Flink中Window可以分成两大类
(1)CountWindow:按照指定的数据条数生成一个Window,与时间无关。
- 滚动计数窗口
- 滑动计数窗口
(2)TimeWindow: 按照时间生成Window。
- 滚动窗口
- 滑动窗口
- session Window
Spark中窗口只有时间窗口
1.滚动窗口(Tumbling Windows)
(1)将数据依据固定的窗口长度对数据进行切片。
(2)特点:窗口长度固定,窗口之间没有数据重叠;窗口是左闭右开;窗口对齐
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:
2. 滑动窗口(Sliding Windows)
特点:时间对齐,窗口长度固定,窗口之间有数据重叠;左闭右开;窗口对齐
例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:
一般情况下,窗口大小都是步长的整数倍,否则会造成有的数据属于n个窗口,有的数据属于n+1个窗口;步长越大,窗口之间重叠数据越少。
窗口1h,步长30min,每个数据属于两个窗口
窗口1h,步长5min,每个数据属于12个窗口
3. 会话窗口(Session Windows)
会话窗口属于时间窗口
特点:窗口的长度不固定,而是当隔一段时间没有数据产生,就关闭窗口;时间无对齐
- 需要指定session gap time 间隙时间,session gap time是窗口之间的最小时间间隔,当gap time内没有数据产生,就截取窗口
- 由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
- session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。
Window API
开窗操作都是建立在keyedStream,因为做聚合往往是以类聚合。普通的dataStream只有windowAll
1.窗口的创建
Flink提供了 .timeWindow
和.countWindow
方法,用于定义时间窗口和计数窗口
① 滚动时间窗口 Tumblingtimewindow
.timeWindow(Time.seconds(5))
② 滑动时间窗口 slidingtimewindow
.timeWindow(Time.seconds(5),Time.seconds(2))
③ 会话窗口 session window
.window(EventTimeSessionWindows.withGap(Time.seconds(5));)
④ 滚动计数窗口 Tumblingcountwindow
.countWindow(5)
⑤ 滑动计数窗口 slidingCountWindow
.countWindow(10,2)
1.1 TimeWindow
TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算
1.1.1 滚动窗口
Flink默认的时间窗口根据Processing Time 进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。
DataStream<Tuple2<String, Double>> minTempPerWindowStream = dataStream
.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
@Override
public Tuple2<String, Double> map(SensorReading value) throws Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
})
.keyBy(data -> data.f0)
.timeWindow( Time.seconds(15) )
.minBy(1);
(1) 时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。
(2) 滚动窗口,按照窗口大小时间计算、输出一次
1.1.2 滑动窗口
- 滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是
window_size
,一个是sliding_size
。 - 下面代码中的sliding_size设置为了5s,也就是说,每5s就计算输出结果一次,每一次计算的window范围是15s内的所有元素。
DataStream<SensorReading> minTempPerWindowStream = dataStream
.keyBy(SensorReading::getId)
.timeWindow( Time.seconds(15), Time.seconds(5) )
.minBy("temperature");
(1)滑动窗口,按照步长时间输出一次,一共输出 窗口/步长
次
(2)滚动窗口本质上也是滑动窗口,滚动窗口的步长和窗口大小一致,一次只输出一次,且输出间隔为窗口大小
1.1.3 Session Window
- 将连续访问的数据放在一个窗口中,当停止时间超过指定时间,开始切窗口计算。
- session window的创建没有简写,必须通过window()方法,参数是窗口分配器
执行效果:连续输入不会看到结果,停止5s,就能输出结果了。
1.2 CountWindow
CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果
1.2.1 滚动窗口
默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
DataStream<SensorReading> minTempPerWindowStream = dataStream
.keyBy(SensorReading::getId)
.countWindow( 5 )
.minBy("temperature");
注意: CountWindow是建立在消息个数上的,而且是keyedStream,所以CountWindow的window_size和step_size都是指的是相同Key的元素的个数,不是输入的所有元素的总数。
1.2.2 滑动窗口
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。
滑动步长:下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围是5个元素
DataStream<SensorReading> minTempPerWindowStream = dataStream
.keyBy(SensorReading::getId)
.countWindow( 5, 2 )
.minBy("temperature");
2. 基于窗口的聚合操作
1.窗口中收集数据的目的是为了做计算的,flink中提供了window function
用来定义要对窗口中收集的数据做的计算操作;
2.flink窗口的计算和输出可以独立进行;
也就是说可以只计算不输出,等到某一时刻才触发输出
2.1 增量聚合函数(incremental aggregation functions)
特点:来一条数据计算一次,但是不输出,当窗口达到闭合的条件的时候才会输出
典型的增量聚合函数有:
- ReduceFunction
- AggregateFunction(只能在keyedStream开窗后使用)
- Max、maxBy,min,minBy
2.2 全量窗口函数(full window functions)
特点:先把窗口所有数据收集起来不做计算,等到窗口关闭的时候,提供一个迭代器存储了窗口中所有的数据,等到触发计算的时候,会遍历所有数据,进行计算。
全窗口函数有:
- ProcessWindowFunction
- apply()
/
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
WindowFunction:是个接口,有四个参数。
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
* @param <KEY> The type of the key.
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value, 1);
}
});
KeyedStream<Tuple2<String, Integer>, Tuple> kStream = map.keyBy(0);
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> ttwindow = kStream.timeWindow(Time.seconds(5));
//todo 用全量函数做窗口的wordCount 全量函数有两种:apply和ProcessWindowFunction
// 此处展示apply()的使用
// apply的参数是windowFunction,windowFunction是一个接口,继承了Function和序列化
SingleOutputStreamOperator<Tuple2<String, Integer>> apply = ttwindow.apply(new myWindowFunction());
apply.print();
env.execute();
}
//todo 做wordCount windowFunction需要四个泛型
// * @param <IN> The type of the input value.
// * @param <OUT> The type of the output value.
// * @param <KEY> The type of the key.(就是keydStream的key类型)
// * @param <W> The type of {@code Window} that this window function can be applied on.
//
public static class myWindowFunction
implements WindowFunction<Tuple2<String, Integer>,Tuple2<String, Integer>,
Tuple,TimeWindow>{
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
//TODO 实现apply方法,四个参数:
// 第一个就是KeyedStream的key
// 第二个是window 可以获取窗口的信息,比如窗口的起始和结束时间
// 第三个是窗口所有数据组成的迭代器
// 第四个是Collector,用于写出数据的
//todo 1.获取key 改成String类型
String key = tuple.getField(0);
// todo 2.全量函数会将窗口key相同的数据装到一个迭代器中
// 遍历迭代器处理key相同的数据集
Iterator<Tuple2<String, Integer>> inputIter = input.iterator();
int count = 0;
while(inputIter.hasNext()){
count += inputIter.next().f1;
}
// todo 3.处理完毕 用collector进行输出 和 flatMap一样
out.collect(new Tuple2<String,Integer>(key,count));
}
额外说明:
- 增量函数比全量效率高
- 全量的应用场景为必须拿到所有数据才能做计算。
apply函数:参数是WindowFunction
2.3 其它可选API
trigger() 触发器
- 定义 window 什么时候关闭,触发计算并输出结果
- 控制窗口什么时候关闭,数据什么时候计算。
- 窗口的关闭,计算,输出三者都可以独立操作。
evitor() —— 移除器
- 定义移除某些数据的逻辑
- 一般不用,窗口前就已经过滤了
以下三个函数只能在事件时间语义下使用
- .allowedLateness() —— 允许处理迟到的数据
- .sideOutputLateData() —— 将迟到的数据放入侧输出流
- .getSideOutput() —— 获取侧输出流
如果waterMark已经达到了窗口闭合时间,但是允许迟到数据,就会将迟到数据放进侧输出流
完整的窗口调用流程
额外说明:图中中括号是可选项,聚合函数是必选项(开窗就是为了做聚合计算的)
(1) Keyed Windows
(2) Noe-Keyed Windows
- 对于Non-keyed WindowAll来说必须要有触发器,否则不计算,会一直收集数据。
- TimeWindowAll可以自己触发计算
- windowAll操作一般不用了
时间窗口的时间偏移量
Flink中时间窗口和Spark中的时间窗口都是整点,比如步长为3s,那么窗口的开始和结束为止必然都是3s的整数倍,原因如下:
在滚动窗口的窗口分配器中:
- 滚动窗口的窗口分配器中会有个默认参数offset为0
- 默认情况下,创建的时间窗口都是以整数时间的,比如窗口为5s,那么窗口范围就是:
[x分0s,x分5s)、[x分5s,x分10s)。 - 时间偏移量的作用就是调整窗口的偏移,比如offset设置为1s,那么窗口范围变为:[x分1s,x分6s)、[x分6s,x分11s)…
应用场景:底层时间用的是UTC,对于中国东8区,比UTC标准时间早8h,那么在中国,如果窗口大小开为:Time.days(1),窗口范围就是早晨8点到晚上8点,并非0点到24点。需要将offset设置为-8
时间窗口的起始时间
(1)滚动时间窗口
assignWindows方法用来分配窗口
其中:now是系统当前时间,start是窗口的起始时间,窗口的起始时间计算方式如下所示:
当前时间 - (当前时间- offset + 窗口大小)% 窗口大小
- 比如:窗口大小为:5s,当前时间为9:00:01
- 9:00:01 - (9:00:01 - 0 + 5s)% 5 = 01 - (06)%5 = 0,所以窗口范围为[9:00:00,9:00:05)
- 从上面也能看出偏移量能移动窗口的偏移量
+windowSize的目的: timestamp - offset 可能为负数,当时间为1970年的时候,timestamp就是0,这会导致timestamp - 负数,也就是说时刻A开的窗,起始位置会在开窗时刻A的后面
Spark中窗口的起始时间也是这么确定的
(3)滑动时间窗口
在slidingProcessingTimeWindows类中:
对于滑动窗口来说,是通过slide确定窗口起始时间点的。
- 起始时间 = 时间戳 - (时间戳-步长)% 步长
比如一条数据时间是9:01,窗口大小为15min,步长5min=>
1.window数组:Windows = new ArrayList<>(3)
2.最后一个窗口的起始时间:lastStart = 9:00,
3.利用窗口大小和步进遍历,创建新的窗口加入window数组中:
for(long start = lastStart;start>timestamp - size;start -= slide){
Windows.add(new TimeWindow(start,start+size))
}
窗口左闭右开的问题
TimeWindow类中:
获取当前窗口的最大时间:
将窗口的end减去了1ms,也就是说窗口大小为5s,但是窗口只接受9:00:0000到9:00:0499时间范围内的数据
滑动窗口的问题
(1)滑动窗口,一个数据属于多少个窗口的问题
List:对于滑动窗口来说,创建窗口实际上是创建多个,窗口个数=size/slide
(2)窗口大小和滑动步长不是整数倍的问题
假设时间为9:01,窗口大小5min,步长2min
从图中可以看出来00-01s之间的数据落在三个窗口,01-02的数据落在2个窗口
结论:滑动窗口窗口大小和步长不是整数倍,会导致有的数据会落在 n+1个窗口,有的会落在n个窗口; n = WindowSize / SlideSize