文章目录
一、window 概念
Keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
一般真实的流都是无界的,怎样处理无界的数据?
- 可以把无限的数据流进行切分,得到有限的数据集进行处理 —— 也
就是得到有界流 - 窗口(window)就是将无限流切割为有限流的一种方式,它会将流
数据分发到有限大小的桶(bucket)中进行分析
二、 时间窗口(Time Window)
官方文档
1)滚动窗口(Tumbling Windows)
【特点】
- 将数据依据固定的窗口长度对数据进行切分
- 时间对齐,窗口长度固定,没有重叠
【示例代码】
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
2)滑动窗口(Sliding Windows)
例如,您可以将大小为10分钟的窗口滑动5分钟。这样,每隔5分钟就会出现一个窗口,其中包含在最后10分钟内到达的事件,如下图所示:
【特点】
- 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口
长度和滑动间隔组成 - 窗口长度固定,可以有重叠
【示例代码】
val input: DataStream[T] = ...
// sliding event-time windows
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
3)会话窗口(Session Windows)
【特点】
- 由一系列事件组合一个指定时间长度的 timeout 间隙组成,也就是
一段时间没有接收到新数据就会生成新的窗口 - 时间无对齐
- 窗口长度不固定,也不会重叠
【示例代码】
val input: DataStream[T] = ...
// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
// processing-time session windows with static gap
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// processing-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
override def extract(element: String): Long = {
// determine and return session gap
}
}))
.<windowed transformation>(<window function>)
三、window API
- 我们可以用
.window()
来定义一个窗口,然后基于这个 window 去做一些聚
合或者其它处理操作。注意 window () 方法必须在keyBy
之后才能用。 - Flink 提供了更加简单的三种类型时间窗口用于定义时
间窗口,也提供了countWindowAll
来定义计数窗口。
四、窗口分配器(window assigner)
1)增量聚合函数(incremental aggregation functions)
- 每条数据到来就进行计算,保持一个简单的状态
- ReduceFunction
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
- AggregateFunction
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate)
2)全窗口函数(full window functions)
- 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据
- ProcessWindowFunction
一个ProcessWindowFunction可以这样定义和使用:
val input: DataStream[(String, Long)] = ...
input
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.process(new MyProcessWindowFunction())
/* ... */
class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window ${context.window} count: $count")
}
}
3)其它可选window API
- .trigger() —— 触发器,定义 window 什么时候关闭,触发计算并输出结果
- .evictor() —— 移除器,定义移除某些数据的逻辑
- .allowedLateness() —— 允许处理迟到的数据
- .sideOutputLateData() —— 将迟到的数据放入侧输出流
- .getSideOutput() —— 获取侧输出流
五、Flink 中的时间语义
官方文档
Flink 明确支持以下三种时间语义:
-
事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间
-
摄取时间(ingestion time): 数据进入Flink的时间,Flink 读取事件时记录的时间
-
处理时间(processing time):执行操作算子的本地系统时间,与机器相关
上面图片来源:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/concepts/time/
六、设置 Event Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
var env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
七、水位线(Watermark)
官方文档
1)为什么需要水位线(Watermark)
2)如何利用Watermark处理乱序数据问题?
- Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发;
- Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用
Watermark 机制结合 window 来实现; - 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,
都已经到达了,因此,window 的执行也是由 Watermark 触发的; - watermark 用来让程序自己平衡延迟和结果正确性。
3)watermark 的特点
- watermark 是一条特殊的数据记录
- watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不
是在后退 - watermark 与数据的时间戳相关
4)watermark 的传递
5)watermark 策略与应用
1)Watermark 策略简介
使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。WatermarkStrategy 接口如下:
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
/**
* 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* 根据策略实例化一个 watermark 生成器。
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
【例如】你想要要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:
WatermarkStrategy
.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
.withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1
})
2)使用 Watermark 策略应用
WatermarkStrategy 可以在 Flink 应用程序中的两处使用:
- 第一种是直接在数据源上使用
- 第二种是直接在非数据源的操作之后使用。
【示例】仅当无法直接在数据源上设置策略时,才应该使用第二种方式(在任意转换操作之后设置 WatermarkStrategy):
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(<watermark strategy>)
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce( (a, b) => a.add(b) )
.addSink(...)
【示例】处理空闲数据源
WatermarkStrategy
.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1))
3)使用场景
- 对于排好序的数据,不需要延迟触发,可以只指定时间戳就行了。
// 注意时间是毫秒,所以根据时间戳不同,可能需要乘以1000
dataStream.assignAscendingTimestamps(_.timestamp * 1000)
- Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如
何从事件数据中抽取时间戳和生成watermark。
// MyAssigner 可以有两种类型,都继承自 TimestampAssigner
dataStream.assignAscendingTimestamps(new MyAssigner())
4)TimestampAssigner
1、AssignerWithPeriodicWatermarks
- 周期性的生成 watermark:系统会周期性的将 watermark 插入到流中
- 默认周期是200毫秒,可以使用 ExecutionConfig.setAutoWatermarkInterval()
方法进行设置 - 升序和前面乱序的处理 BoundedOutOfOrderness ,都是基于周期性
watermark 的。
2、AssignerWithPunctuatedWatermarks
- 没有时间周期规律,可打断的生成 watermark
可以弃用 AssignerWithPeriodicWatermarks 和 AssignerWithPunctuatedWatermarks 了
未完待续~