Flink can process data based on different notions of time.
- Processing time refers to the machine’s system time (also known as epoch time, e.g. Java’s
System.currentTimeMillis()
) that is executing the respective operation. - Event time refers to the processing of streaming data based on timestamps that are attached to each row. The timestamps can encode when an event happened
基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。
在Table API 和 SQL 中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。
时间属性的数据类型为 TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。
每种类型的表都可以有时间属性:
- 可以在用CREATE TABLE DDL创建表的时候指定
- 可以在
DataStream
中指定 - 可以在定义
TableSource
时指定(过时)
一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。
Table API 程序需要在 streaming environment 中指定时间属性
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
1、事件时间
事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。
除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。
事件时间语义最大的用途就是处理乱序事件或者延迟事件的场景。我们通过设置水位线(watermark)来表示事件时间的进展,而水位线可以根据数据的最大时间戳设置一个延迟时间。这样即使在出现乱序的情况下,对数据的处理也可以获得正确的结果。
为了处理无序事件,并区分流中的迟到事件。Flink 需要从事件数据中提取时间戳,并生成水位线,用来推进事件时间的进展
事件时间属性也有类似于处理时间的二种定义方式:
- 在DDL中定义
- 在 DataStream 到 Table 转换时定义
1.1 在 DDL 中定义
事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段
Flink 支持和在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义事件时间
(1)如果源数据中的时间戳数据表示为年-月-日-时-分-秒,则通常为不带时区信息的字符串值,例如 2020-04-15 20:13:40.564
,建议将事件时间属性定义在 TIMESTAMP
列上:
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
(2)源数据中的时间戳数据表示为一个纪元 (epoch) 时间,通常是一个 long 值,
例如:1618989564564
,建议将事件时间属性定义在 TIMESTAMP_LTZ
列上:
CREATE TABLE user_actions ( user_name STRING, data STRING, ts BIGINT, time_ltz AS TO_TIMESTAMP_LTZ(ts, 3), -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND) WITH ( ...);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)FROM user_actionsGROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
1.2 在 DataStream 到 Table 转换时定义
事件时间属性可以在将 DataStream 转换为表的时候来定义。我们调用fromDataStream()方法创建表时,可以追加参数来定义表中的字段结构;这时可以给某个字段加上.rowtime() 后缀,就表示将当前字段指定为事件时间属性。
这个字段:
- 可以是数据中本不存在、额外追加上去的“逻辑字段”,就像之前 DDL 中定义的第二种情况;
- 可以是本身固有的字段,那么这个字段就会被事件时间属性所覆盖,类型也会被转换为 TIMESTAMP
事件时间属性可以用 .rowtime
后缀在定义 DataStream
schema 的时候来定义。时间戳和 watermark 在这之前一定是在 DataStream
上已经定义好了。在从 DataStream 转换到 Table 时,由于 DataStream
没有时区概念,因此 Flink 总是将 rowtime
属性解析成 TIMESTAMP WITHOUT TIME ZONE
类型,并且将所有事件时间的值都视为 UTC 时区的值。
在从 DataStream
到 Table
转换时定义事件时间属性有两种方式。取决于用 .rowtime
后缀修饰的字段名字是否是已有字段,事件时间字段可以是:
- 在 schema 的结尾追加一个新的字段
- 替换一个已经存在的字段
不管在哪种情况下,事件时间字段都表示 DataStream
中定义的事件的时间戳。
package table.time;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import java.time.ZoneId;import static org.apache.flink.table.api.Expressions.$;public class EventTimeAttr { public static void main(String[] args) throws Exception { // 获取流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 获取表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 设置为上海时区 tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai")); SingleOutputStreamOperator<Tuple3<String, String,Long>> streamSource = env.addSource( new DataGeneratorSource<Tuple3<String, String,Long>>( new RandomGenerator<Tuple3<String, String,Long>>() { @Override public Tuple3<String, String,Long> next() { return new Tuple3<String, String,Long>( random.nextHexString(5), random.nextHexString(3), System.currentTimeMillis() ); } } )) .returns(Types.TUPLE(Types.STRING, Types.STRING,Types.LONG)); // 基于 stream 中的事件产生时间戳和 watermark SingleOutputStreamOperator<Tuple3<String, String,Long>> stream = streamSource.assignTimestampsAndWatermarks( WatermarkStrategy.<Tuple3<String, String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1)) .withTimestampAssigner( new SerializableTimestampAssigner<Tuple3<String, String,Long>>() { @Override public long extractTimestamp(Tuple3<String, String,Long> element, long recordTimestamp) { return element.f2; } } ) ); // 声明一个额外的逻辑字段作为事件时间属性 Table table = tableEnv.fromDataStream(stream, $("user_name"), $("data"), // $("timestamp"), $("ts").rowtime()); table.printSchema(); tableEnv.executeSql("select * from " + table).print(); env.execute(); }}
(
`user_name` STRING,
`data` STRING,
`ts` TIMESTAMP(3) *ROWTIME*
)
1.3 使用 TableSource 定义(已过时)
事件时间属性可以在实现了 DefinedRowTimeAttributes
的 TableSource
中定义。getRowtimeAttributeDescriptors()
方法返回 RowtimeAttributeDescriptor
的列表,包含了描述事件时间属性的字段名字、如何计算事件时间、以及 watermark 生成策略等信息。
同时需要确保 getDataStream
返回的 DataStream
已经定义好了时间属性。只有在定义了StreamRecordTimestamp
时间戳分配器的时候,才认为 DataStream
是有时间戳信息的。只有定义了 PreserveWatermarks
watermark 生成策略的 DataStream
的 watermark 才会被保留。反之,则只有时间字段的值是生效的。
// 定义一个有事件时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {
@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"user_name", "data", "user_action_time"};
TypeInformation[] types =
new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
return Types.ROW(names, types);
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// 构造 DataStream
// ...
// 基于 "user_action_time" 定义 watermark
DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
return stream;
}
@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
// 标记 "user_action_time" 字段是事件时间字段
// 给 "user_action_time" 构造一个时间属性描述符
RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
"user_action_time",
new ExistingField("user_action_time"),
new AscendingTimestamps());
List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
return listRowtimeAttrDescr;
}
}
// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());
WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));
2、处理时间
处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性。它既不需要从数据里获取时间,也不需要生成 watermark。
2.1 在创建表的 DDL 中定义
处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 PROCTIME()
就可以定义处理时间,函数 PROCTIME()
的返回类型是 TIMESTAMP_LTZ
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
这里的时间属性,其实是以“计算列”(computed column)的形式定义出来的。所谓的计算列是 Flink SQL 中引入的特殊概念,可以用一个 AS 语句来在表中产生数据中不存在的列,并且可以利用原有的列、各种运算符及内置函数。在前面事件时间属性的定义中,将 ts 字段转换成 TIMESTAMP_LTZ 类型的 ts_ltz,也是计算列的定义方式。
2.2 在 DataStream 到 Table 转换时定义
处 理 时 间 属 性 同 样 可 以 在 将 DataStream 转 换 为 表 的 时 候 来 定 义 。我 们 调 用fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());