设置eventTime的方式分两种情况:
- 升序数据提取时间戳
- 直接使用:.assignAscendingTimestamps(_.timestamp * 1000L)
- 乱序数据提取时间戳,有三种种构造方式(1.10版本只有前两种,flink版本1.11以后建议使用方式三)
- 方式一:AssignerWithPeriodicWatermarks
- 周期性的生成 watermark,默认周期是200ms,也可以通过setAutoWatermarkInterval设置周期时间
- 常用的实现类是:BoundedOutOfOrdernessTimestampExtractor(延时时间)
- 【1.10及以前的版本,建议使用这种方式】
- 方式二:AssignerWithPunctuatedWatermarks:
- 阶段性的生成 watermark,即每来一条数据就生成一个wm
- 方式三:WatermarkStrategy (Flink 1.11.0及版本以上)
- Flink 1.11.0版本以上版本,建议使用这种方式生成watermark
- 固定乱序长度策略(forBoundedOutOfOrderness)
- 通过调用WatermarkStrategy对象上的forBoundedOutOfOrderness方法来实现,接收一个Duration类型的参数作为最大乱序(out of order)长度。WatermarkStrategy对象上的withTimestampAssigner方法为从事件数据中提取时间戳提供了接口
- 一般使用这种策略
- 单调递增策略(forMonotonousTimestamps)
- 通过调用WatermarkStrategy对象上的forMonotonousTimestamps方法来实现,无需任何参数,相当于将forBoundedOutOfOrderness策略的最大乱序长度outOfOrdernessMillis设置为0。
- 不生成策略(noWatermarks)
- WatermarkStrategy.noWatermarks()
- 方式一:AssignerWithPeriodicWatermarks
示例:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可
env.getConfig.setAutoWatermarkInterval(5000)
val sensorStream: DataStream[SensorReading] = env
.readTextFile("source.txt")
.map(new MyMapToSensorReading)
// 1、给一个没有乱序,时间为升序的流设置一个EventTime
val ascendingStream = sensorStream
.assignAscendingTimestamps(_.timestamp)
// 2、当流中存在时间乱序问题,引入watermark,并设置延迟时间
// 2.1 使用 AssignerWithPeriodicWatermarks 的常用实现类 BoundedOutOfOrdernessTimestampExtractor 设置EventTime
/**
* 知识点:
* 1、BoundedOutOfOrdernessTimestampExtractor中的泛型为流中数据的类型
* 2、传入的参数为 watermark 的最大延迟时间(即允许数据迟到的时间)
* 3、重写的extractTimestamp方法返回的是设置数据中EventTime的字段,单位为毫秒,需要将时间转换成Long(最近时间为13位的长整形)才能返回
* 4、当我们能大约估计到流中的最大乱序时,建议使用此中方式,比较方便
*/
val watermarkStream: DataStream[SensorReading] = sensorStream
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = {
element.timestamp * 1000
}
})
// 2.2 使用 AssignerWithPunctuatedWatermarks 设置EventTime 不常用,演示略
// 2.3 使用 WatermarkStrategy 设置EventTime (Flink 1.12.0版本以上)
val jsonObjDS = kafkaDS.map(line => GsonUtils.parserObject(line))
// 提取 eventtime
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner[JsonObject] {
override def extractTimestamp(jsonObj: JsonObject, l: Long): Long = {
jsonObj.get("ts").getAsLong
}
}))