Flink SQL 1.14 Queries - Window Join (Translated)

By 一天清晨, 2022-02-24

Window Join

Streaming

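A window join adds the dimension of time to the join criteria themselves. In doing so, it joins the elements of two streams that share a common key and fall into the same window. The semantics of a window join are the same as those of the DataStream window join.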

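For streaming queries, a window join differs from other joins on continuous tables. It does not emit intermediate results and emits only final results at the end of each window. It also purges all intermediate state once that state is no longer needed.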
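The emit-at-window-end behavior can be modeled in a few lines of plain Python. This is a toy INNER window join, not Flink's operator. The class `InnerWindowJoin`, the rule that a window fires once the watermark reaches its end, the dropping of late rows, and the arrival order of the rows are all assumptions made for illustration. The rows and the `row_time - INTERVAL '1' SECOND` watermark come from the `LeftTable` / `RightTable` examples in this article. The model buffers rows per 5-minute tumbling window, emits a window's joined rows only when the watermark passes its end, and then deletes that window's state.

```python
from collections import defaultdict
from datetime import datetime, timedelta

SIZE = timedelta(minutes=5)   # tumbling window size used in the examples
DELAY = timedelta(seconds=1)  # watermark = row_time - INTERVAL '1' SECOND


def window_start(ts: datetime) -> datetime:
    """Start of the 5-minute tumbling window containing ts (aligned to midnight)."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return ts - (ts - midnight) % SIZE


class InnerWindowJoin:
    """Toy model (assumption, not Flink code): buffer per window, emit at window end, purge."""

    def __init__(self):
        # window_start -> ([left (num, id)], [right (num, id)])
        self.state = defaultdict(lambda: ([], []))
        self.watermark = datetime.min
        self.output = []

    def process(self, side: str, row_time: datetime, num: int, rid: str) -> None:
        start = window_start(row_time)
        if start + SIZE <= self.watermark:
            return  # late row: its window already fired, so this model drops it
        self.state[start][0 if side == "L" else 1].append((num, rid))
        self.watermark = max(self.watermark, row_time - DELAY)
        self._fire_ready_windows()

    def finish(self) -> None:
        # End of a bounded input: advance the watermark as far as possible.
        self.watermark = datetime.max
        self._fire_ready_windows()

    def _fire_ready_windows(self) -> None:
        for start in sorted(self.state):
            if start + SIZE > self.watermark:
                continue  # window still open: emit nothing yet
            left, right = self.state.pop(start)  # purge this window's state
            for lnum, lid in left:
                for rnum, rid in right:
                    if lnum == rnum:
                        self.output.append((lnum, lid, rnum, rid, start, start + SIZE))


def t(minute: int) -> datetime:
    return datetime(2020, 4, 15, 12, minute)


op = InnerWindowJoin()
# Hypothetical arrival order of the sample rows.
for side, ts, num, rid in [("L", t(2), 1, "L1"), ("R", t(1), 2, "R2"), ("L", t(3), 3, "L3"),
                           ("R", t(4), 3, "R3"), ("R", t(5), 4, "R4"), ("L", t(6), 2, "L2")]:
    op.process(side, ts, num, rid)
    print(f"after {rid}: watermark={op.watermark:%H:%M:%S}, emitted={len(op.output)}")
op.finish()
print(op.output, dict(op.state))
```

In this run nothing is emitted until L2 pushes the watermark to 12:05:59. Only then does the [12:00, 12:05) window fire (producing the L3/R3 pair) and lose its state. The [12:05, 12:10) window is flushed only by `finish()`.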

Window Join is usually used together with a Windowing TVF. It can also follow other operations based on Windowing TVFs, such as Window Aggregation, Window TopN, and another Window Join.

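Currently, Window Join requires the join ON condition to include both an equality between the window starts of the input tables and an equality between their window ends.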

Window Join supports INNER, LEFT, RIGHT, FULL OUTER, ANTI, and SEMI JOIN.

INNER / LEFT / RIGHT / FULL OUTER

The following shows the syntax of the INNER / LEFT / RIGHT / FULL OUTER Window Join statement.

SELECT ...
FROM L [LEFT|RIGHT|FULL OUTER] JOIN R -- L and R are relations to which a windowing TVF has been applied
ON L.window_start = R.window_start AND L.window_end = R.window_end AND ...

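The syntax of INNER / LEFT / RIGHT / FULL OUTER WINDOW JOIN is very similar, so we give only a FULL OUTER JOIN example here, and only for a Window Join applied to a Tumble Window TVF. When a window join is performed, all elements that share a common key and a common tumbling window are joined together. By scoping the region of time for the join into fixed five-minute intervals, we chop the datasets into two distinct time windows: [12:00, 12:05) and [12:05, 12:10). Rows L2 and R2 share the key num = 2 but cannot be joined, because they fall into separate windows.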
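The window assignment described above can be sketched in plain Python. The function `tumble` is hypothetical and only mimics the `window_start`, `window_end`, and `window_time` columns that the TUMBLE TVF adds. It assumes windows aligned to midnight, which for a 5-minute size yields the boundaries shown in this example. It also takes `window_time` to be `window_end` minus 1 ms, matching the values printed in the SEMI and ANTI outputs.

```python
from datetime import datetime, timedelta

SIZE = timedelta(minutes=5)  # INTERVAL '5' MINUTES


def tumble(row_time: datetime):
    """Return (window_start, window_end, window_time) for a 5-minute tumbling window.

    Assumption: windows are aligned to midnight; intervals are half-open [start, end).
    """
    midnight = row_time.replace(hour=0, minute=0, second=0, microsecond=0)
    start = row_time - (row_time - midnight) % SIZE
    end = start + SIZE
    return start, end, end - timedelta(milliseconds=1)  # window_time = end - 1 ms


rows = {"L1": 2, "L2": 6, "L3": 3, "R2": 1, "R3": 4, "R4": 5}  # id -> minute after 12:00
for rid, minute in rows.items():
    start, end, wtime = tumble(datetime(2020, 4, 15, 12, minute))
    print(rid, f"[{start:%H:%M}, {end:%H:%M})", f"{wtime:%H:%M:%S.%f}"[:-3])
```

Because the intervals are half-open, R4 at exactly 12:05 opens the second window, and L2 (12:06) lands in a different window from R2 (12:01).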

Flink SQL> desc LeftTable;
+----------+------------------------+------+-----+--------+----------------------------------+
|     name |                   type | null | key | extras |                        watermark |
+----------+------------------------+------+-----+--------+----------------------------------+
| row_time | TIMESTAMP(3) *ROWTIME* | true |     |        | `row_time` - INTERVAL '1' SECOND |
|      num |                    INT | true |     |        |                                  |
|       id |                 STRING | true |     |        |                                  |
+----------+------------------------+------+-----+--------+----------------------------------+

Flink SQL> SELECT * FROM LeftTable;
+------------------+-----+----+
|         row_time | num | id |
+------------------+-----+----+
| 2020-04-15 12:02 |   1 | L1 |
| 2020-04-15 12:06 |   2 | L2 |
| 2020-04-15 12:03 |   3 | L3 |
+------------------+-----+----+

Flink SQL> desc RightTable;
+----------+------------------------+------+-----+--------+----------------------------------+
|     name |                   type | null | key | extras |                        watermark |
+----------+------------------------+------+-----+--------+----------------------------------+
| row_time | TIMESTAMP(3) *ROWTIME* | true |     |        | `row_time` - INTERVAL '1' SECOND |
|      num |                    INT | true |     |        |                                  |
|       id |                 STRING | true |     |        |                                  |
+----------+------------------------+------+-----+--------+----------------------------------+

Flink SQL> SELECT * FROM RightTable;
+------------------+-----+----+
|         row_time | num | id |
+------------------+-----+----+
| 2020-04-15 12:01 |   2 | R2 |
| 2020-04-15 12:04 |   3 | R3 |
| 2020-04-15 12:05 |   4 | R4 |
+------------------+-----+----+

Flink SQL> SELECT L.num as L_Num, L.id as L_Id, R.num as R_Num, R.id as R_Id, L.window_start, L.window_end
           FROM (
               SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
           ) L
           FULL JOIN (
               SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
           ) R
           ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;
+-------+------+-------+------+------------------+------------------+
| L_Num | L_Id | R_Num | R_Id |     window_start |       window_end |
+-------+------+-------+------+------------------+------------------+
|     1 |   L1 |  null | null | 2020-04-15 12:00 | 2020-04-15 12:05 |
|  null | null |     2 |   R2 | 2020-04-15 12:00 | 2020-04-15 12:05 |
|     3 |   L3 |     3 |   R3 | 2020-04-15 12:00 | 2020-04-15 12:05 |
|     2 |   L2 |  null | null | 2020-04-15 12:05 | 2020-04-15 12:10 |
|  null | null |     4 |   R4 | 2020-04-15 12:05 | 2020-04-15 12:10 |
+-------+------+-------+------+------------------+------------------+

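Note: To make the windowing behavior easier to follow, the examples simplify the display of timestamp values by omitting trailing zeros. For example, if the type is TIMESTAMP(3), 2020-04-15 08:05 would be displayed as 2020-04-15 08:05:00.000 in the Flink SQL Client.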
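To check the FULL OUTER result above by hand, the join can be evaluated window by window in plain Python. The function `full_outer_window_join` is a hypothetical model of the semantics, not Flink's implementation. It assumes midnight-aligned 5-minute tumbling windows and joins on `num` only within each window. Unmatched rows from either side are padded with `None`, the counterpart of SQL `NULL`.

```python
from datetime import datetime, timedelta

SIZE = timedelta(minutes=5)


def window_start(ts: datetime) -> datetime:
    """Start of the 5-minute tumbling window containing ts (aligned to midnight)."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return ts - (ts - midnight) % SIZE


def t(minute: int) -> datetime:
    return datetime(2020, 4, 15, 12, minute)


LEFT = [(t(2), 1, "L1"), (t(6), 2, "L2"), (t(3), 3, "L3")]   # (row_time, num, id)
RIGHT = [(t(1), 2, "R2"), (t(4), 3, "R3"), (t(5), 4, "R4")]


def full_outer_window_join(left, right):
    """FULL OUTER window join on num, evaluated independently per tumbling window."""
    out = []
    for ws in sorted({window_start(ts) for ts, _, _ in left + right}):
        l_rows = [(n, i) for ts, n, i in left if window_start(ts) == ws]
        r_rows = [(n, i) for ts, n, i in right if window_start(ts) == ws]
        matched_right = set()
        for lnum, lid in l_rows:
            hit = False
            for j, (rnum, rid) in enumerate(r_rows):
                if lnum == rnum:
                    out.append((lnum, lid, rnum, rid, ws, ws + SIZE))
                    matched_right.add(j)
                    hit = True
            if not hit:  # left row without a partner in this window
                out.append((lnum, lid, None, None, ws, ws + SIZE))
        for j, (rnum, rid) in enumerate(r_rows):
            if j not in matched_right:  # right row without a partner in this window
                out.append((None, None, rnum, rid, ws, ws + SIZE))
    return out


for row in full_outer_window_join(LEFT, RIGHT):
    print(row[:4], f"[{row[4]:%H:%M}, {row[5]:%H:%M})")
```

The model yields the same five rows as the Flink output, possibly in a different order. Row order in a streaming result is not significant here.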

SEMI

A semi window join returns a row for a left-side record if there is at least one matching row on the right side within the same window.
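A semi window join can be modeled as a membership test on the pair (window start, key). The function `semi_window_join` below is a hypothetical plain-Python sketch, not Flink code. It assumes midnight-aligned 5-minute tumbling windows and uses the sample rows of `LeftTable` and `RightTable` from this article. Unlike an inner join, each left row appears at most once, however many right rows match it.

```python
from datetime import datetime, timedelta

SIZE = timedelta(minutes=5)


def window_start(ts: datetime) -> datetime:
    """Start of the 5-minute tumbling window containing ts (aligned to midnight)."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return ts - (ts - midnight) % SIZE


def t(minute: int) -> datetime:
    return datetime(2020, 4, 15, 12, minute)


LEFT = [(t(2), 1, "L1"), (t(6), 2, "L2"), (t(3), 3, "L3")]   # (row_time, num, id)
RIGHT = [(t(1), 2, "R2"), (t(4), 3, "R3"), (t(5), 4, "R4")]


def semi_window_join(left, right):
    """Left rows with at least one right row of the same num in the same window."""
    right_keys = {(window_start(ts), num) for ts, num, _ in right}
    return [
        (ts, num, lid, window_start(ts), window_start(ts) + SIZE)
        for ts, num, lid in left
        if (window_start(ts), num) in right_keys
    ]


print(semi_window_join(LEFT, RIGHT))
```

Only L3 survives. It is the sole left row whose key (num = 3) also appears on the right within [12:00, 12:05).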

Flink SQL> SELECT *
           FROM (
               SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
           ) L WHERE L.num IN (
             SELECT num FROM (   
               SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
+------------------+-----+----+------------------+------------------+-------------------------+
|         row_time | num | id |     window_start |       window_end |            window_time  |
+------------------+-----+----+------------------+------------------+-------------------------+
| 2020-04-15 12:03 |   3 | L3 | 2020-04-15 12:00 | 2020-04-15 12:05 | 2020-04-15 12:04:59.999 |
+------------------+-----+----+------------------+------------------+-------------------------+

Flink SQL> SELECT *
           FROM (
               SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
           ) L WHERE EXISTS (
             SELECT * FROM (
               SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
+------------------+-----+----+------------------+------------------+-------------------------+
|         row_time | num | id |     window_start |       window_end |            window_time  |
+------------------+-----+----+------------------+------------------+-------------------------+
| 2020-04-15 12:03 |   3 | L3 | 2020-04-15 12:00 | 2020-04-15 12:05 | 2020-04-15 12:04:59.999 |
+------------------+-----+----+------------------+------------------+-------------------------+

ANTI

Anti window joins are the inverse of inner window joins: they return all of the left-side rows that found no join partner within their window.
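The anti case is the complement of the semi case: keep the left rows whose (window start, key) pair never occurs on the right. The function `anti_window_join` is a hypothetical plain-Python sketch, not Flink code. It assumes midnight-aligned 5-minute tumbling windows and follows NOT EXISTS semantics. In standard SQL, NOT IN differs when the subquery returns a NULL, because the predicate can then never be true. The sample data contains no NULLs, so the NOT IN and NOT EXISTS queries return the same rows.

```python
from datetime import datetime, timedelta

SIZE = timedelta(minutes=5)


def window_start(ts: datetime) -> datetime:
    """Start of the 5-minute tumbling window containing ts (aligned to midnight)."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return ts - (ts - midnight) % SIZE


def t(minute: int) -> datetime:
    return datetime(2020, 4, 15, 12, minute)


LEFT = [(t(2), 1, "L1"), (t(6), 2, "L2"), (t(3), 3, "L3")]   # (row_time, num, id)
RIGHT = [(t(1), 2, "R2"), (t(4), 3, "R3"), (t(5), 4, "R4")]


def anti_window_join(left, right):
    """Left rows with NO right row of the same num in the same window (NOT EXISTS)."""
    right_keys = {(window_start(ts), num) for ts, num, _ in right}
    return [
        (ts, num, lid, window_start(ts), window_start(ts) + SIZE)
        for ts, num, lid in left
        if (window_start(ts), num) not in right_keys
    ]


print(anti_window_join(LEFT, RIGHT))
```

L1 has no right row with num = 1 in its window. L2 has a right row with num = 2 (R2), but only in the earlier window, so L2 is kept as well.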

Flink SQL> SELECT *
           FROM (
               SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
           ) L WHERE L.num NOT IN (
             SELECT num FROM (   
               SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) R WHERE L.window_start = R.window_start AND L.window_end = R.window_end);
+------------------+-----+----+------------------+------------------+-------------------------+
|         row_time | num | id |     window_start |       window_end |            window_time  |
+------------------+-----+----+------------------+------------------+-------------------------+
| 2020-04-15 12:02 |   1 | L1 | 2020-04-15 12:00 | 2020-04-15 12:05 | 2020-04-15 12:04:59.999 |
| 2020-04-15 12:06 |   2 | L2 | 2020-04-15 12:05 | 2020-04-15 12:10 | 2020-04-15 12:09:59.999 |
+------------------+-----+----+------------------+------------------+-------------------------+

Flink SQL> SELECT *
           FROM (
               SELECT * FROM TABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
           ) L WHERE NOT EXISTS (
             SELECT * FROM (
               SELECT * FROM TABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))
             ) R WHERE L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end);
+------------------+-----+----+------------------+------------------+-------------------------+
|         row_time | num | id |     window_start |       window_end |            window_time  |
+------------------+-----+----+------------------+------------------+-------------------------+
| 2020-04-15 12:02 |   1 | L1 | 2020-04-15 12:00 | 2020-04-15 12:05 | 2020-04-15 12:04:59.999 |
| 2020-04-15 12:06 |   2 | L2 | 2020-04-15 12:05 | 2020-04-15 12:10 | 2020-04-15 12:09:59.999 |
+------------------+-----+----+------------------+------------------+-------------------------+

Limitations

Limitation on the Join Clause

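Currently, a window join requires the join ON condition to include both window start equality and window end equality. In the future, the ON clause could be simplified to include only window start equality when the windowing TVF is TUMBLE or HOP.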
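One way to see why window start equality could suffice for TUMBLE and HOP: those windows all have the same length, so with the same TVF on both sides, `window_end = window_start + size`, and the start alone identifies the window (a TUMBLE window is a HOP window whose slide equals its size). CUMULATE does not have this property, because several of its windows share one start and differ only in their end. The functions below are a hypothetical plain-Python sketch of which windows contain a timestamp. They assume midnight-aligned windows and are not Flink's implementation.

```python
from datetime import datetime, timedelta


def _midnight(ts: datetime) -> datetime:
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


def hop_windows(ts: datetime, slide: timedelta, size: timedelta):
    """All [start, end) HOP windows containing ts (assumed aligned to midnight)."""
    start = ts - (ts - _midnight(ts)) % slide  # latest window start <= ts
    windows = []
    while start > ts - size:                   # this window still covers ts
        windows.append((start, start + size))
        start -= slide
    return windows[::-1]


def cumulate_windows(ts: datetime, step: timedelta, max_size: timedelta):
    """All [start, end) CUMULATE windows containing ts (assumed aligned to midnight)."""
    start = ts - (ts - _midnight(ts)) % max_size
    return [(start, start + k * step)
            for k in range(1, max_size // step + 1)
            if ts < start + k * step]


ts = datetime(2020, 4, 15, 12, 3)
hop = hop_windows(ts, timedelta(minutes=5), timedelta(minutes=10))
cum = cumulate_windows(ts, timedelta(minutes=5), timedelta(minutes=10))
print([(f"{s:%H:%M}", f"{e:%H:%M}") for s, e in hop])  # distinct starts, fixed length
print([(f"{s:%H:%M}", f"{e:%H:%M}") for s, e in cum])  # same start, different ends
```

For 12:03, the HOP windows [11:55, 12:10) style pairs have distinct starts and a fixed 10-minute length. The CUMULATE windows [12:00, 12:05) and [12:00, 12:10) share a start, so matching on start alone would conflate them.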

Limitation on the Windowing TVFs of the Inputs

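Currently, the left and right inputs must use the same windowing TVF. This may be extended in the future, for example to allow tumbling windows to be joined with sliding windows of the same window size.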

Limitation on a Window Join Directly Following a Windowing TVF

Currently, if a Window Join directly follows a Windowing TVF, that Windowing TVF must use Tumble, Hop, or Cumulate windows. Session windows are not supported.
