0
点赞
收藏
分享

微信扫一扫

大数据Hadoop之——Flink中的Window API+时间语义+Watermark

野见 2022-02-27 阅读 91

文章目录

一、window 概念

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

一般真实的流都是无界的,怎样处理无界的数据?

  • 可以把无限的数据流进行切分,得到有限的数据集进行处理 —— 也
    就是得到有界流
  • 窗口(window)就是将无限流切割为有限流的一种方式,它会将流
    数据分发到有限大小的桶(bucket)中进行分析

二、 时间窗口(Time Window)

官方文档

1)滚动窗口(Tumbling Windows)

【特点】

  • 将数据依据固定的窗口长度对数据进行切分
  • 时间对齐,窗口长度固定,没有重叠

【示例代码】

val input: DataStream[T] = ...

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

2)滑动窗口(Sliding Windows)

例如,您可以将大小为10分钟的窗口滑动5分钟。这样,每隔5分钟就会出现一个窗口,其中包含在最后10分钟内到达的事件,如下图所示:
在这里插入图片描述
【特点】

  • 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口
    长度和滑动间隔组成
  • 窗口长度固定,可以有重叠

【示例代码】

val input: DataStream[T] = ...

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

3)会话窗口(Session Windows)

【特点】

  • 由一系列事件组合一个指定时间长度的 timeout 间隙组成,也就是
    一段时间没有接收到新数据就会生成新的窗口
  • 时间无对齐
  • 窗口长度不固定,也不会重叠

【示例代码】

val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

三、window API

  • 我们可以用 .window() 来定义一个窗口,然后基于这个 window 去做一些聚
    合或者其它处理操作。注意 window () 方法必须在 keyBy 之后才能用
  • Flink 提供了更加简单的三种类型时间窗口用于定义时
    间窗口,也提供了countWindowAll来定义计数窗口

四、窗口分配器(window assigner)

1)增量聚合函数(incremental aggregation functions)

  • 每条数据到来就进行计算,保持一个简单的状态
  • ReduceFunction
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
  • AggregateFunction
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

2)全窗口函数(full window functions)

  • 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
  • ProcessWindowFunction

一个ProcessWindowFunction可以这样定义和使用:

val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

3)其它可选window API

  • .trigger() —— 触发器,定义 window 什么时候关闭,触发计算并输出结果
  • .evictor() —— 移除器,定义移除某些数据的逻辑
  • .allowedLateness() —— 允许处理迟到的数据
  • .sideOutputLateData() —— 将迟到的数据放入侧输出流
  • .getSideOutput() —— 获取侧输出流

五、Flink 中的时间语义

官方文档
Flink 明确支持以下三种时间语义:

  • 事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间

  • 摄取时间(ingestion time): 数据进入Flink的时间,Flink 读取事件时记录的时间

  • 处理时间(processing time):执行操作算子的本地系统时间,与机器相关

在这里插入图片描述

上面图片来源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/

六、设置 Event Time

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

七、水位线(Watermark)

官方文档

1)为什么需要水位线(Watermark)

2)如何利用Watermark处理乱序数据问题?

  • Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发
  • Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常
    Watermark 机制结合 window 来实现
  • 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,
    都已经到达了,因此,window 的执行也是由 Watermark 触发的;
  • watermark 用来让程序自己平衡延迟和结果正确性。

3)watermark 的特点

在这里插入图片描述

  • watermark 是一条特殊的数据记录
  • watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不
    是在后退
  • watermark 与数据的时间戳相关

4)watermark 的传递

在这里插入图片描述

5)watermark 策略与应用

1)Watermark 策略简介

使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。WatermarkStrategy 接口如下:

public interface WatermarkStrategy<T> 
    extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{

    /**
     * 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * 根据策略实例化一个 watermark 生成器。
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

【例如】你想要要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:

WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
    override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1
  })

2)使用 Watermark 策略应用

WatermarkStrategy 可以在 Flink 应用程序中的两处使用:

  • 第一种是直接在数据源上使用
  • 第二种是直接在非数据源的操作之后使用。

【示例】仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy):

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(<watermark strategy>)

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

【示例】处理空闲数据源

WatermarkStrategy
  .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
  .withIdleness(Duration.ofMinutes(1))

3)使用场景

  • 对于排好序的数据,不需要延迟触发,可以只指定时间戳就行了。
// 注意时间是毫秒,所以根据时间戳不同,可能需要乘以1000
dataStream.assignAscendingTimestamps(_.timestamp * 1000)
  • Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如
    何从事件数据中抽取时间戳和生成watermark。
// MyAssigner 可以有两种类型,都继承自 TimestampAssigner
dataStream.assignAscendingTimestamps(new MyAssigner())

4)TimestampAssigner

1、AssignerWithPeriodicWatermarks

  • 周期性的生成 watermark:系统会周期性的将 watermark 插入到流中
  • 默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval()
    方法进行设置
  • 升序和前面乱序的处理 BoundedOutOfOrderness ,都是基于周期性
    watermark 的。

2、AssignerWithPunctuatedWatermarks

  • 没有时间周期规律,可打断的生成 watermark

可以弃用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 了

未完待续~

举报

相关推荐

0 条评论