0
点赞
收藏
分享

微信扫一扫

【Flink】【第六章 Window】

橙子好吃吗 2022-01-14 阅读 200
flink

Window概述

Streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。
Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。
flink中的window可以理解为水桶,数据理解为水流,水流源源不断,对于DataStream来说是来一个数据处理一条,有了桶之后,将数据装进桶中再处理,在窗口关闭的时候将计算结果输出

结论: 窗口=数据桶 ;窗口本质=数据 union

Window类型

Flink中Window可以分成两大类

(1)CountWindow:按照指定的数据条数生成一个Window,与时间无关。

  • 滚动计数窗口
  • 滑动计数窗口

(2)TimeWindow: 按照时间生成Window。

  • 滚动窗口
  • 滑动窗口
  • session Window

Spark中窗口只有时间窗口

1.滚动窗口(Tumbling Windows)

(1)将数据依据固定的窗口长度对数据进行切片。
(2)特点:窗口长度固定,窗口之间没有数据重叠;窗口是左闭右开;窗口对齐
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:
在这里插入图片描述

2. 滑动窗口(Sliding Windows)

特点:时间对齐,窗口长度固定,窗口之间有数据重叠;左闭右开;窗口对齐
例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:
在这里插入图片描述
一般情况下,窗口大小都是步长的整数倍,否则会造成有的数据属于n个窗口,有的数据属于n+1个窗口;步长越大,窗口之间重叠数据越少。
窗口1h,步长30min,每个数据属于两个窗口
窗口1h,步长5min,每个数据属于12个窗口

3. 会话窗口(Session Windows)

会话窗口属于时间窗口
特点:窗口的长度不固定,而是当隔一段时间没有数据产生,就关闭窗口;时间无对齐

  • 需要指定session gap time 间隙时间,session gap time是窗口之间的最小时间间隔,当gap time内没有数据产生,就截取窗口
  • 由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
  • session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。
    在这里插入图片描述

Window API

开窗操作都是建立在keyedStream,因为做聚合往往是以类聚合。普通的dataStream只有windowAll

1.窗口的创建

Flink提供了 .timeWindow.countWindow方法,用于定义时间窗口和计数窗口
① 滚动时间窗口 Tumblingtimewindow

.timeWindow(Time.seconds(5))

② 滑动时间窗口 slidingtimewindow

.timeWindow(Time.seconds(5),Time.seconds(2))

③ 会话窗口 session window

.window(EventTimeSessionWindows.withGap(Time.seconds(5));)

④ 滚动计数窗口 Tumblingcountwindow

.countWindow(5)

⑤ 滑动计数窗口 slidingCountWindow

.countWindow(10,2)

1.1 TimeWindow

TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算

1.1.1 滚动窗口

Flink默认的时间窗口根据Processing Time 进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。

DataStream<Tuple2<String, Double>> minTempPerWindowStream = dataStream
                .map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow( Time.seconds(15) )
                .minBy(1);

(1) 时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。
(2) 滚动窗口,按照窗口大小时间计算、输出一次

1.1.2 滑动窗口

  • 滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size
  • 下面代码中的sliding_size设置为了5s,也就是说,每5s就计算输出结果一次,每一次计算的window范围是15s内的所有元素。
DataStream<SensorReading> minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId) 
                .timeWindow( Time.seconds(15), Time.seconds(5) )
                .minBy("temperature");

(1)滑动窗口,按照步长时间输出一次,一共输出 窗口/步长
(2)滚动窗口本质上也是滑动窗口,滚动窗口的步长和窗口大小一致,一次只输出一次,且输出间隔为窗口大小

1.1.3 Session Window

  1. 将连续访问的数据放在一个窗口中,当停止时间超过指定时间,开始切窗口计算。
  2. session window的创建没有简写,必须通过window()方法,参数是窗口分配器

执行效果:连续输入不会看到结果,停止5s,就能输出结果了。

1.2 CountWindow

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果

1.2.1 滚动窗口

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

DataStream<SensorReading> minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId) 
                .countWindow( 5 )
                .minBy("temperature");

注意: CountWindow是建立在消息个数上的,而且是keyedStream,所以CountWindow的window_size和step_size都是指的是相同Key的元素的个数,不是输入的所有元素的总数。
在这里插入图片描述

1.2.2 滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。
滑动步长:下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围是5个元素

DataStream<SensorReading> minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId) 
                .countWindow( 5, 2 )
                .minBy("temperature");

在这里插入图片描述

2. 基于窗口的聚合操作

1.窗口中收集数据的目的是为了做计算的,flink中提供了window function 用来定义要对窗口中收集的数据做的计算操作;

2.flink窗口的计算和输出可以独立进行;
也就是说可以只计算不输出,等到某一时刻才触发输出

2.1 增量聚合函数(incremental aggregation functions)

特点:来一条数据计算一次,但是不输出,当窗口达到闭合的条件的时候才会输出

典型的增量聚合函数有:

  • ReduceFunction
  • AggregateFunction(只能在keyedStream开窗后使用)
  • Max、maxBy,min,minBy

2.2 全量窗口函数(full window functions)

特点:先把窗口所有数据收集起来不做计算,等到窗口关闭的时候,提供一个迭代器存储了窗口中所有的数据,等到触发计算的时候,会遍历所有数据,进行计算。

全窗口函数有:

  • ProcessWindowFunction
  • apply()
/ 
* @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
WindowFunction:是个接口,有四个参数。
 
* @param <IN> The type of the input value.
 * @param <OUT> The type of the output value.
 * @param <KEY> The type of the key.
 * @param <W> The type of {@code Window} that this window function can be applied on.
 */
@Public
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value, 1);
            }
        });

        KeyedStream<Tuple2<String, Integer>, Tuple> kStream = map.keyBy(0);
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> ttwindow = kStream.timeWindow(Time.seconds(5));
       //todo  用全量函数做窗口的wordCount 全量函数有两种:apply和ProcessWindowFunction
       // 此处展示apply()的使用
       // apply的参数是windowFunction,windowFunction是一个接口,继承了Function和序列化
        SingleOutputStreamOperator<Tuple2<String, Integer>> apply = ttwindow.apply(new myWindowFunction());

        apply.print();
        
        env.execute();

    }
    //todo 做wordCount  windowFunction需要四个泛型
//        * @param <IN> The type of the input value.
//        * @param <OUT> The type of the output value.
//        * @param <KEY> The type of the key.(就是keydStream的key类型)
//        * @param <W> The type of {@code Window} that this window function can be applied on.
//
    public static class myWindowFunction
            implements WindowFunction<Tuple2<String, Integer>,Tuple2<String, Integer>,
            Tuple,TimeWindow>{

        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
             //TODO 实现apply方法,四个参数:
            //  第一个就是KeyedStream的key
            //  第二个是window  可以获取窗口的信息,比如窗口的起始和结束时间
            //  第三个是窗口所有数据组成的迭代器
            //  第四个是Collector,用于写出数据的

            //todo 1.获取key   改成String类型
            String key = tuple.getField(0);
            // todo 2.全量函数会将窗口key相同的数据装到一个迭代器中
            //  遍历迭代器处理key相同的数据集
            Iterator<Tuple2<String, Integer>> inputIter = input.iterator();
            int count = 0;
            while(inputIter.hasNext()){
                count += inputIter.next().f1;
            }
            // todo 3.处理完毕 用collector进行输出  和 flatMap一样
            out.collect(new Tuple2<String,Integer>(key,count));
        }



额外说明:

  • 增量函数比全量效率高
  • 全量的应用场景为必须拿到所有数据才能做计算。

apply函数:参数是WindowFunction

2.3 其它可选API

trigger() 触发器

  • 定义 window 什么时候关闭,触发计算并输出结果
  • 控制窗口什么时候关闭,数据什么时候计算。
  • 窗口的关闭,计算,输出三者都可以独立操作。

evitor() —— 移除器

  • 定义移除某些数据的逻辑
  • 一般不用,窗口前就已经过滤了

以下三个函数只能在事件时间语义下使用

  • .allowedLateness() —— 允许处理迟到的数据
  • .sideOutputLateData() —— 将迟到的数据放入侧输出流
  • .getSideOutput() —— 获取侧输出流
    如果waterMark已经达到了窗口闭合时间,但是允许迟到数据,就会将迟到数据放进侧输出流

完整的窗口调用流程

额外说明:图中中括号是可选项,聚合函数是必选项(开窗就是为了做聚合计算的)

(1) Keyed Windows

在这里插入图片描述

(2) Noe-Keyed Windows

在这里插入图片描述

  • 对于Non-keyed WindowAll来说必须要有触发器,否则不计算,会一直收集数据。
  • TimeWindowAll可以自己触发计算
  • windowAll操作一般不用了

时间窗口的时间偏移量

Flink中时间窗口和Spark中的时间窗口都是整点,比如步长为3s,那么窗口的开始和结束为止必然都是3s的整数倍,原因如下:

在滚动窗口的窗口分配器中:
在这里插入图片描述

  • 滚动窗口的窗口分配器中会有个默认参数offset为0
  • 默认情况下,创建的时间窗口都是以整数时间的,比如窗口为5s,那么窗口范围就是:
    [x分0s,x分5s)、[x分5s,x分10s)。
  • 时间偏移量的作用就是调整窗口的偏移,比如offset设置为1s,那么窗口范围变为:[x分1s,x分6s)、[x分6s,x分11s)…

应用场景:底层时间用的是UTC,对于中国东8区,比UTC标准时间早8h,那么在中国,如果窗口大小开为:Time.days(1),窗口范围就是早晨8点到晚上8点,并非0点到24点。需要将offset设置为-8

时间窗口的起始时间

(1)滚动时间窗口

assignWindows方法用来分配窗口
在这里插入图片描述
其中:now是系统当前时间,start是窗口的起始时间,窗口的起始时间计算方式如下所示:
在这里插入图片描述
当前时间 - (当前时间- offset + 窗口大小)% 窗口大小

  • 比如:窗口大小为:5s,当前时间为9:00:01
  • 9:00:01 - (9:00:01 - 0 + 5s)% 5 = 01 - (06)%5 = 0,所以窗口范围为[9:00:00,9:00:05)
  • 从上面也能看出偏移量能移动窗口的偏移量

+windowSize的目的: timestamp - offset 可能为负数,当时间为1970年的时候,timestamp就是0,这会导致timestamp - 负数,也就是说时刻A开的窗,起始位置会在开窗时刻A的后面

Spark中窗口的起始时间也是这么确定的

(3)滑动时间窗口

在这里插入图片描述
在slidingProcessingTimeWindows类中:
在这里插入图片描述
对于滑动窗口来说,是通过slide确定窗口起始时间点的。

  • 起始时间 = 时间戳 - (时间戳-步长)% 步长

比如一条数据时间是9:01,窗口大小为15min,步长5min=>
1.window数组:Windows = new ArrayList<>(3)
2.最后一个窗口的起始时间:lastStart = 9:00,
3.利用窗口大小和步进遍历,创建新的窗口加入window数组中:

for(long start = lastStart;start>timestamp - size;start -= slide){
Windows.add(new TimeWindow(start,start+size))
}  

窗口左闭右开的问题

TimeWindow类中:

获取当前窗口的最大时间:
在这里插入图片描述
将窗口的end减去了1ms,也就是说窗口大小为5s,但是窗口只接受9:00:0000到9:00:0499时间范围内的数据

滑动窗口的问题

(1)滑动窗口,一个数据属于多少个窗口的问题
在这里插入图片描述
List:对于滑动窗口来说,创建窗口实际上是创建多个,窗口个数=size/slide
(2)窗口大小和滑动步长不是整数倍的问题
假设时间为9:01,窗口大小5min,步长2min
在这里插入图片描述
从图中可以看出来00-01s之间的数据落在三个窗口,01-02的数据落在2个窗口
结论:滑动窗口窗口大小和步长不是整数倍,会导致有的数据会落在 n+1个窗口,有的会落在n个窗口; n = WindowSize / SlideSize

举报

相关推荐

第六章 容器

第六章:接口

第六章总结

PTA第六章

第六章 BOM

java第六章总结

第六章、FOR、IF和while

0 条评论