0
点赞
收藏
分享

微信扫一扫

Flink Table&SQL 时间属性

Flink Table&SQL 时间属性_字段


Flink can process data based on different notions of time.

  • Processing time refers to the machine’s system time (also known as epoch time, e.g. Java’s System.currentTimeMillis()) that is executing the respective operation.
  • Event time refers to the processing of streaming data based on timestamps that are attached to each row. The timestamps can encode when an event happened


基于时间的操作(比如时间窗口),需要定义相关的时间语义和时间数据来源的信息。


在Table API 和 SQL 中,会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。


时间属性的数据类型为 TIMESTAMP,它的行为类似于常规时间戳,可以直接访问并且进行计算。


每种类型的表都可以有时间属性:

  • 可以在用CREATE TABLE DDL创建表的时候指定
  • 可以在 DataStream 中指定
  • 可以在定义 TableSource 时指定(过时)

一旦时间属性定义好,它就可以像普通列一样使用,也可以在时间相关的操作中使用。


Table API 程序需要在 streaming environment 中指定时间属性

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default

// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

1、事件时间

事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。


除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。


事件时间语义最大的用途就是处理乱序事件或者延迟事件的场景。我们通过设置水位线(watermark)来表示事件时间的进展,而水位线可以根据数据的最大时间戳设置一个延迟时间。这样即使在出现乱序的情况下,对数据的处理也可以获得正确的结果。


为了处理无序事件,并区分流中的迟到事件。Flink 需要从事件数据中提取时间戳,并生成水位线,用来推进事件时间的进展


事件时间属性也有类似于处理时间的二种定义方式:

  • 在DDL中定义
  • 在 DataStream 到 Table 转换时定义

1.1 在 DDL 中定义

事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段


Flink 支持和在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定义事件时间


(1)如果源数据中的时间戳数据表示为年-月-日-时-分-秒,则通常为不带时区信息的字符串值,例如 ​​2020-04-15 20:13:40.564​​,建议将事件时间属性定义在 TIMESTAMP 列上:

CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

(2)源数据中的时间戳数据表示为一个纪元 (epoch) 时间,通常是一个 long 值,

例如:​​1618989564564​​​,建议将事件时间属性定义在 ​TIMESTAMP_LTZ​ 列上:

CREATE TABLE user_actions ( user_name STRING, data STRING, ts BIGINT, time_ltz AS TO_TIMESTAMP_LTZ(ts, 3), -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND) WITH ( ...);SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)FROM user_actionsGROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);

1.2 在 DataStream 到 Table 转换时定义

事件时间属性可以在将 DataStream 转换为表的时候来定义。我们调用fromDataStream()方法创建表时,可以追加参数来定义表中的字段结构;这时可以给某个字段加上.rowtime() 后缀,就表示将当前字段指定为事件时间属性

这个字段:

  • 可以是数据中本不存在、额外追加上去的“逻辑字段”,就像之前 DDL 中定义的第二种情况;
  • 可以是本身固有的字段,那么这个字段就会被事件时间属性所覆盖,类型也会被转换为 TIMESTAMP


事件时间属性可以用 ​​.rowtime​​​ 后缀在定义 ​​DataStream​​​ schema 的时候来定义。时间戳和 watermark 在这之前一定是在 ​​DataStream​​ 上已经定义好了。在从 DataStream 转换到 Table 时,由于 DataStream 没有时区概念,因此 Flink 总是将 ​rowtime​ 属性解析成 ​TIMESTAMP WITHOUT TIME ZONE​ 类型,并且将所有事件时间的值都视为 UTC 时区的值。


在从 ​​DataStream​​​ 到 ​​Table​​​ 转换时定义事件时间属性有两种方式。取决于用 ​​.rowtime​​ 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:

  • 在 schema 的结尾追加一个新的字段
  • 替换一个已经存在的字段

不管在哪种情况下,事件时间字段都表示 ​​DataStream​​ 中定义的事件的时间戳。

package table.time;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.typeinfo.Types;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;import java.time.ZoneId;import static org.apache.flink.table.api.Expressions.$;public class EventTimeAttr {    public static void main(String[] args) throws Exception {        // 获取流环境        StreamExecutionEnvironment env =                StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        // 获取表环境        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);        // 设置为上海时区        tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));        SingleOutputStreamOperator<Tuple3<String, String,Long>> streamSource = env.addSource(                        new DataGeneratorSource<Tuple3<String, String,Long>>(                                new RandomGenerator<Tuple3<String, String,Long>>() {                                    @Override                                    public Tuple3<String, String,Long> next() {                                        return new Tuple3<String, String,Long>(                                                random.nextHexString(5),                                                random.nextHexString(3),                                                System.currentTimeMillis()                                        );                                    }                                }                        ))                .returns(Types.TUPLE(Types.STRING, Types.STRING,Types.LONG));        // 基于 stream 中的事件产生时间戳和 watermark        SingleOutputStreamOperator<Tuple3<String, String,Long>> stream = streamSource.assignTimestampsAndWatermarks(                WatermarkStrategy.<Tuple3<String, String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(1))                        .withTimestampAssigner(                                new SerializableTimestampAssigner<Tuple3<String, String,Long>>() {                                    @Override                                    public long extractTimestamp(Tuple3<String, String,Long> element, long recordTimestamp) {                                        return element.f2;                                    }                                }                        )        );        // 声明一个额外的逻辑字段作为事件时间属性        Table table = tableEnv.fromDataStream(stream,                $("user_name"),                $("data"),             //   $("timestamp"),                $("ts").rowtime());        table.printSchema();        tableEnv.executeSql("select * from " + table).print();        env.execute();    }}

(
`user_name` STRING,
`data` STRING,
`ts` TIMESTAMP(3) *ROWTIME*
)

1.3 使用 TableSource 定义(已过时)

事件时间属性可以在实现了 ​​DefinedRowTimeAttributes​​​ 的 ​​TableSource​​​ 中定义。​​getRowtimeAttributeDescriptors()​​​ 方法返回 ​​RowtimeAttributeDescriptor​​ 的列表,包含了描述事件时间属性的字段名字、如何计算事件时间、以及 watermark 生成策略等信息。


同时需要确保 ​​getDataStream​​​ 返回的 ​​DataStream​​​ 已经定义好了时间属性。只有在定义了​​StreamRecordTimestamp​​​ 时间戳分配器的时候,才认为 ​​DataStream​​​是有时间戳信息的。只有定义了 ​​PreserveWatermarks​​​ watermark 生成策略的 ​​DataStream​​ 的 watermark 才会被保留。反之,则只有时间字段的值是生效的。

// 定义一个有事件时间属性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

@Override
public TypeInformation<Row> getReturnType() {
String[] names = new String[] {"user_name", "data", "user_action_time"};
TypeInformation[] types =
new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
return Types.ROW(names, types);
}

@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
// 构造 DataStream
// ...
// 基于 "user_action_time" 定义 watermark
DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
return stream;
}

@Override
public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
// 标记 "user_action_time" 字段是事件时间字段
// 给 "user_action_time" 构造一个时间属性描述符
RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
"user_action_time",
new ExistingField("user_action_time"),
new AscendingTimestamps());
List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
return listRowtimeAttrDescr;
}
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
.from("user_actions")
.window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

2、处理时间

处理时间是基于机器的本地时间来处理数据,它是最简单的一种时间概念,但是它不能提供确定性。它既不需要从数据里获取时间,也不需要生成 watermark。

2.1 在创建表的 DDL 中定义

处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 ​​PROCTIME()​​ 就可以定义处理时间,函数 PROCTIME() 的返回类型是 TIMESTAMP_LTZ 

CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
) WITH (
...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

这里的时间属性,其实是以“计算列”(computed column)的形式定义出来的。所谓的计算列是 Flink SQL 中引入的特殊概念,可以用一个 AS 语句来在表中产生数据中不存在的列,并且可以利用原有的列、各种运算符及内置函数。在前面事件时间属性的定义中,将 ts 字段转换成 TIMESTAMP_LTZ 类型的 ts_ltz,也是计算列的定义方式。


2.2 在 DataStream 到 Table 转换时定义

处 理 时 间 属 性 同 样 可 以 在 将 DataStream 转 换 为 表 的 时 候 来 定 义 。我 们 调 用fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现

Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

举报

相关推荐

0 条评论