0
点赞
收藏
分享

微信扫一扫

Flink时间概念


时间概念类型

Flink根据时间产生的位置不同:将时间区分为三种时间概念

  1. 事件生成时间
  2. 事件接入时间
  3. 事件处理时间

Flink流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,考虑其他时间属性

Flink中默认使用Process Time的时间概念,如果需要使用EventTime的时间属性,需要进行设置

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

不同的事件时间设置方式

  def test15(): Unit ={
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
}

EventTime和Watermark

       通常情况下,由于网络或系统等外部因素影响,事件数据不能及时传输至Flink系统中,导致数据乱序到达或延迟到达等问题, 因此徐璈一种机制能控制能控制数据处理的过程和进度,比如基于事件时间的window创建后,具体该如何确定属于该Window的数据元素已经全部到达,如果确定全部到达,就可以对Window的所有数据做窗口机损操作(如汇总,分组等),如果没有到达,则继续等待该窗口中的数据全部到达才开始处理,这种情况下就需要用到水位线(WaterMarks)机制,他能衡量数据处理进度,保证时间数据全部到达Flink系统,或者在乱序及延迟到达时,也能像预期一样计算正确并连续的结果

WaterMark

将用读取进入系统的最新事件时间减去固定的时间间隔作为Watermark, 时间间隔为用户外部配置的支持最大延迟到达的时间长度, 理论上不会有时间超过该时间间隔到达,否则就认为是迟到事件或异常事件

原理

       当事件接入Flink系统时,会在Sources Operator中根据当前事件时间产生Watermarks时间戳,记为X, 进入到Flink系统的数据时间,记为Y,如果Y< X, 则代表WaterMark X时间戳之前的所有事件均已到达

带有Timestamp和Watermark的原函数(Source Funtion with TImestamps And Watermarks)

数据流源矿业直接为他们产生的数据元素分配timestamp,并且他们也能发送水印,这样,就没必要再定义timestamp分配器了,需要注意的是:如果一个timestamp分配器被使用的话,由源提供的任何timestamp和watermark都会被重写

override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)

if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}

时间戳分配器/水印生成器(TImestamp Assigners / Watermark Generators)

Timestamp分配器获取一个流并生成一个新的带有Timestamp元素和水印的流。如果原始流已经有时间戳和/或水印,则Timestamp分配程序将覆盖它们

Timestamp分配器通常在数据源之后立即指定,但这并不是严格要求的。通常是在timestamp分配器之前先解析(MapFunction)和过滤(FilterFunction)。在任何情况下,都需要在事件时间上的第一个操作(例如第一个窗口操作)之前指定timestamp分配程序。有一个特殊情况,当使用Kafka作为流作业的数据源时,Flink允许在源内部指定timestamp分配器和watermark生成器。更多关于如何进行的信息请参考Kafka Connector的文档。 接下来的部分展示了要创建自己的timestamp 抽取器和watermark发射器,程序员需要实现的主要接口。想要查看Flink预定义的抽取器,

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
.keyBy( _.getGroup )
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...)

周期性水印(With Periodic Watermarks)

AssignerWithPeriodicWatermarks分配时间戳并定期生成水印(这可能依赖于流元素,或者纯粹基于处理时间)。 watermark生成的时间间隔(每n毫秒)是通过ExecutionConfig.setAutoWatermarkInterval(…)定义的。每次调用分配器的getCurrentWatermark()方法时,如果返回的watermark非空且大于前一个watermark,则会发出新的watermark。 这里我们展示了两个使用周期性水印生成的时间戳分配器的简单示例。请注意,Flink附带了一个BoundedOutOfOrdernessTimestampExtractor,类似于下面所示的BoundedOutOfOrdernessGenerator,您可以在这里阅读相关内容。

/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

val maxOutOfOrderness = 3500L // 3.5 seconds

var currentMaxTimestamp: Long = _

override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}

override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}

/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
*/
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

val maxTimeLag = 5000L // 5 seconds

override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}

override def getCurrentWatermark(): Watermark = {
// return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}

每个Kafka分区的Timestamp(TimeStamps per Kafka Partion)

当使用Apache Kafka作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(递增timestamp或有界的无序)。然而,当使用来自Kafka的流时,多个分区通常是并行使用的,将事件与分区交叉,破坏了每个分区的数据模型(这是Kafka消费者客户端所固有的工作方式)

在这种情况下,您可以使用Flink支持Kafka-partition-aware生成水印。该特性可以在Kafka消费者内部生成watermarks,每个分区的watermarks合并方式与流shuffles时合并watermarks的方式相同。

例如,如果事件时间戳严格按照Kafka分区递增排列,那么使用升序时间戳水印生成器生成每个分区的水印将产生完美的整体水印。

下图展示了如何使用每个kafka分区生成水印,以及在这种情况下水印如何通过流数据传播。

val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)

Flink时间概念_数据

参考链接: ​​https://www.jianshu.com/p/e6c7957d76d9​​

举报

相关推荐

0 条评论