0
点赞
收藏
分享

微信扫一扫

flink学习记录之水位线

往复随安_5bb5 2022-02-23 阅读 97
flink

水位线的概念理解及工作原理:

watermark是用于处理由于网络、背压等原因产生的乱序事件,窗口结束时间+延迟时间=最大waterMark值,即当waterMark值大于的上述计算出的最大waterMark值,该窗口内的数据就属于迟到的数据,无法参与window计算;代码中生成水位线的时间(即调用assignTimestampsAndWatermarks(WatermarkStrategy<T>watermarkStrategy)方法的时间)可为source之后,也可为使用算子之后,。

真实的项目中不光要考虑延迟时间,还要考虑长时间无数据无法触发计算的情况。

生成水位线的源码解析(flink1.11版本之后):

WatermarkStrategy接口继承了接口TimestampAssignerSupplier<T>以及接口WatermarkGeneratorSupplier<T>。其中TimestampAssignerSupplier<T>中的一个方法createTimestampAssigner(Context context) 返回一个TimestampAssigner对象,这个对象中有一个longextractTimestamp方法,作用是从Flink消费的记录中抽取时间,既可以理解为我们如果要通过业务时间进行统计时,需要通过该方法来提取记录的业务时间。所以用到业务时间的话,一定要根据自己的业务场景对该方法进行具体的实现。否则Flink会提供一个默认的实现RecordTimestampAssigner<>()(默认直接返回记录里边的时间)。

 
 

其中继承自WatermarkGeneratorSupplier<T>的方法为createWatermarkGenerator(Context context),这个方法返回一个WatermarkGenerator对象,返回的对象WatermarkGenerator有两个方法:

1.onEvent()方法:每条记录进来都会调用一次这个方法,入参有3个,第一个是记录,第二个是记录携带的时间,如果注册了时间就会有,第三个参数时水印发射器WatermarkOutputoutput,可以通过这个参数对水印进行发射,用户可以根据自己的业务场景来编写自己的水印生成以及发射逻辑。该方法的重点是每条记录都会调用.

2.onPeriodicEmit():该方法是Flink提供的一个定时器方法,每隔一段时间会调用此方法,入参是WatermarkOutputoutput,用户可以通过这个方法每隔一段时间发送一次水印,当记录数过多时,每条记录都发送一次水印明显不合适,也影响性能,此时可以通过这个方法进行水印的定时发送,而onEvent只记录当前水印而选择不发射出去。该方法的参数配置为env.getConfig().setAutoWatermarkInterval(300L),入参是毫秒数,表示隔多少毫秒向下游算子发送一次水印。

flink也提供了三个WatermarkGenerator的实现类,即flink生成水印的默认三种策略:

1.BoundedOutOfOrdernessWatermark:它的onEvent方法每次从事件数据中提取时间戳,并计算最大时间戳maxTimestamp;在周期触发的onPeriodicEmit方法中,生成的Watermark等于最大时间戳maxTimestamp减去最大乱序长度outOfOrdernessMillis,再减1毫秒。

使用:WatermarkStrategy<Map<String,Object>>forBoundedOutOfOrderness(Duration.ofSeconds(1))

2.BoundedOutOfOrdernessWatermark的一个子类,即outOfOrdernessMillis设为0;

使用:与上类似;

3.WatermarkStrategy.noWatermarks():一个算子从多个上游算子中获取数据时,会取上游最小的Watermark作为自身的Watermark,并检测是否满足窗口触发条件。当达不到触发条件,窗口会在内存中缓存大量窗口数据,导致内存不足等问题。

注:flink提供了设置流状态为空闲的withIdleness方法。在设置的超时时间内,当某个数据流一直没有事件数据到达,就标记这个流为空闲。下游算子不需要等待这条数据流产生的Watermark,而取其他上游激活状态的Watermark,来决定是否需要触发窗口计算。

WatermarkStrategy.withIdleness(Duration.ofMillis(5))

上面代码设置超时时间5毫秒,超过这个时间,没有生成Watermark,将流状态设置空闲,当下次有新的Watermark生成并发送到下游时,重新设置为活跃。

FLINK水印算子简要流程:

首先TimestampsAndWatermarksOperator算子会在open方法中初始化用户定义的水印逻辑及方式,并且如果需要定时发送水印会,注册一个定时器触发水印定时发送。

其次,当元素到达算子后会调用processElement(StreamRecord<T>element),如果元素已经被注册了时间,就直接获取时间,或者设置为LONG.MIN_VALUE,然后根据用户定义的timestampAssigner.extractTimestamp从记录中抽取时间属性,然后再将时间写入元素中,最后调用用户定义的watermarkGenerator.onEvent方法,根据用户的逻辑选择刷新水印以及是否发射水印。

如果需要定时发送水印,则会注册一个定时器,通过onProcessingTime来触发定时器的内容,而内容也十分简单,先调用用户定义的watermarkGenerator.onPeriodicEmit方法发送水印,然后获取当前时间,最后注册当前时间加水印定时发送间隔的定时触发器,等待下次触发该方法。

flink sql中的水位线:

如果需要自定义flink sql中的水位线机制,可以自定义connector,将DynamicTableSource(创建动态源表)的子类中的applyWatermark()方法进行重写,使用自定义的WatermarkGenerator即可。

举报

相关推荐

0 条评论