0
点赞
收藏
分享

微信扫一扫

Flink提取EventTime并生成Watermark几种常用的方式

elvinyang 2022-02-26 阅读 98

设置eventTime的方式分两种情况:

  • 升序数据提取时间戳
    • 直接使用:.assignAscendingTimestamps(_.timestamp * 1000L)
  • 乱序数据提取时间戳,有三种种构造方式(1.10版本只有前两种,flink版本1.11以后建议使用方式三)
    • 方式一:AssignerWithPeriodicWatermarks
      • 周期性的生成 watermark,默认周期是200ms,也可以通过setAutoWatermarkInterval设置周期时间
      • 常用的实现类是:BoundedOutOfOrdernessTimestampExtractor(延时时间)
      • 【1.10及以前的版本,建议使用这种方式】
    • 方式二:AssignerWithPunctuatedWatermarks:
      • 阶段性的生成 watermark,即每来一条数据就生成一个wm
    • 方式三:WatermarkStrategy (Flink 1.11.0及版本以上)
      • Flink 1.11.0版本以上版本,建议使用这种方式生成watermark
      • 固定乱序长度策略(forBoundedOutOfOrderness)
        • 通过调用WatermarkStrategy对象上的forBoundedOutOfOrderness方法来实现,接收一个Duration类型的参数作为最大乱序(out of order)长度。WatermarkStrategy对象上的withTimestampAssigner方法为从事件数据中提取时间戳提供了接口
        • 一般使用这种策略
      • 单调递增策略(forMonotonousTimestamps)
        • 通过调用WatermarkStrategy对象上的forMonotonousTimestamps方法来实现,无需任何参数,相当于将forBoundedOutOfOrderness策略的最大乱序长度outOfOrdernessMillis设置为0。
      • 不生成策略(noWatermarks)
        • WatermarkStrategy.noWatermarks()

示例:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可
env.getConfig.setAutoWatermarkInterval(5000)

val sensorStream: DataStream[SensorReading] = env
    .readTextFile("source.txt")
    .map(new MyMapToSensorReading)

// 1、给一个没有乱序,时间为升序的流设置一个EventTime
val ascendingStream = sensorStream
.assignAscendingTimestamps(_.timestamp)
// 2、当流中存在时间乱序问题,引入watermark,并设置延迟时间
// 2.1 使用 AssignerWithPeriodicWatermarks 的常用实现类 BoundedOutOfOrdernessTimestampExtractor 设置EventTime
/**
 * 知识点:
 * 1、BoundedOutOfOrdernessTimestampExtractor中的泛型为流中数据的类型
 * 2、传入的参数为 watermark 的最大延迟时间(即允许数据迟到的时间)
 * 3、重写的extractTimestamp方法返回的是设置数据中EventTime的字段,单位为毫秒,需要将时间转换成Long(最近时间为13位的长整形)才能返回
 * 4、当我们能大约估计到流中的最大乱序时,建议使用此中方式,比较方便
 */ 
val watermarkStream: DataStream[SensorReading] = sensorStream
.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
    override def extractTimestamp(element: SensorReading): Long = {
        element.timestamp * 1000
    }
})

// 2.2 使用 AssignerWithPunctuatedWatermarks 设置EventTime 不常用,演示略

// 2.3 使用 WatermarkStrategy 设置EventTime (Flink 1.12.0版本以上)
val jsonObjDS = kafkaDS.map(line => GsonUtils.parserObject(line))
  // 提取 eventtime
  .assignTimestampsAndWatermarks(
     WatermarkStrategy
     .forBoundedOutOfOrderness(Duration.ofSeconds(2))
     .withTimestampAssigner(new SerializableTimestampAssigner[JsonObject] {
      override def extractTimestamp(jsonObj: JsonObject, l: Long): Long = {
        jsonObj.get("ts").getAsLong
      }
    }))
举报

相关推荐

0 条评论