0
点赞
收藏
分享

微信扫一扫

flink时间窗口

扒皮狼 2022-01-06 阅读 78

Flink 窗口应用大致骨架

窗口

窗口是flink中的一类算子,是DataStream的逻辑边界,用于将许多事件按照时间或者其他特征分组,从而将每一组作为整体进行分析。在开发中最常用的是时间窗口和计数窗口。

窗口应用大致骨架
在这里插入图片描述

滚动窗口

在这里插入图片描述
可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。

//滚动窗口
// tumbling event-time windows
     input
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
     <window function>(...)
// tumbling processing-time windows
input
    .keyBy(...)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<window function>(...)
// 1 hour tumbling event-time windows offset by 15 minutes.
input
    .keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
    .<window function>(...)
   
 input
.timeWindow(Time.seconds(30))

滑动窗口

滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。
在这里插入图片描述

val input: DataStream[T] = ...
 
// sliding event-time windows
input
    .keyBy(...)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)
 
// sliding processing-time windows
input
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<window function>(...)
 
// sliding processing-time windows offset by -8 hours
input
    .keyBy(<...>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<window function>(...)

窗口函数

1. ReduceFunction
使用reduce算子时,我们要重写一个ReduceFunction。ReduceFunction接受两个相同类型的输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。在窗口上进行reduce的原理与之类似,只不过多了一个窗口状态数据,这个状态数据的数据类型和输入的数据类型是一致的,是之前两两计算的中间结果数据。当数据流中的新元素流入后,ReduceFunction将中间结果和新流入数据两两合一,生成新的数据替换之前的状态数据。

2. AggregateFunction
AggregateFunction也是一种增量计算窗口函数,也只保存了一个中间状态数据,但AggregateFunction使用起来更复杂一些。

在这里插入图片描述

输入类型是IN,输出类型是OUT,中间状态数据是ACC,这样复杂的设计主要是为了解决输入类型、中间状态和输出类型不一致的问题,同时ACC可以自定义,我们可以在ACC里构建我们想要的数据结构。比如我们要计算一个窗口内某个字段的平均值,那么ACC中要保存总和以及个数。

case class StockPrice(symbol: String, price: Double)
// IN: StockPrice
// ACC:(String, Double, Int) - (symbol, sum, count)
// OUT: (String, Double) - (symbol, average)
class AverageAggregate extends AggregateFunction[StockPrice, (String, Double, Int), (String, Double)] {
  override def createAccumulator() = ("", 0, 0)
  override def add(item: StockPrice, accumulator: (String, Double, Int)) =
  (item.symbol, accumulator._2 + item.price, accumulator._3 + 1)
 
  override def getResult(accumulator:(String, Double, Int)) = (accumulator._1 ,accumulator._2 / accumulator._3)
  override def merge(a: (String, Double, Int), b: (String, Double, Int)) =(a._1 ,a._2 + b._2, a._3 + b._3)}
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val input: DataStream[StockPrice] = ...
val average = input
      .keyBy(s => s.symbol)
      .timeWindow(Time.seconds(10))
      .aggregate(new AverageAggregate)

3. ProcessWindowFunction
与前两种方法不同,ProcessWindowFunction要对窗口内的全量数据都缓存。在Flink所有API中,process算子以及其对应的函数是最底层的实现,使用这些函数能够访问一些更加底层的数据,比如,直接操作状态等。

ProcessWindowFunction相比AggregateFunction和ReduceFunction的应用场景更广,能解决的问题也更复杂。但ProcessWindowFunction需要将窗口中所有元素作为状态存储起来,这将占用大量的存储资源,尤其是在数据量大窗口多的场景下,使用不慎可能导致整个程序宕机。

4. ProcessWindowFunction与增量计算相结合
当我们既想访问窗口里的元数据,又不想缓存窗口里的所有数据时,可以将ProcessWindowFunction与增量计算函数相reduce和aggregate结合。对于一个窗口来说,Flink先增量计算,窗口关闭前,将增量计算结果发送给ProcessWindowFunction作为输入再进行处理。

举报

相关推荐

0 条评论