Introduction to Windows
DataStream:
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/datastream/operators/windows/
SQL:
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sql/queries/window-tvf/
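1. Why do we need windows?
A stream is unbounded, so an aggregation such as count or sum over the whole stream would never finish. A window cuts the stream into finite buckets, and the aggregation emits one result per bucket.
2. Which properties control a window?
The two main ones are the window length (size) and the window interval (slide), i.e. how often a new window starts.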
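3. Time-based sliding and tumbling windows
Window length (size) > window interval: e.g. every 5 s, compute over the last 10 s of data [sliding window]
Window length (size) = window interval: e.g. every 10 s, compute over the last 10 s of data [tumbling window]
Window length (size) < window interval: e.g. every 15 s, compute over the last 10 s of data [no name, not used: 5 s out of every 15 s fall into no window and are ignored]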
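The three cases can be checked with a small sketch. It is a plain-Java illustration, not Flink code: the class `TimeWindowSketch` and its `assign` method are hypothetical, time is a bare `long` assumed to be >= 0, and window starts are assumed to be aligned to multiples of the slide with no offset, which is roughly how Flink's sliding and tumbling assigners align windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Flink class: returns every window
// [start, start + size) that contains ts, with starts aligned to
// multiples of `slide` (no offset, timestamps assumed >= 0).
class TimeWindowSketch {

    static List<long[]> assign(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Latest aligned window start that is <= ts.
        long lastStart = ts - (ts % slide);
        // Walk back one slide at a time while [start, start + size) still covers ts.
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    static String format(List<long[]> windows) {
        StringBuilder sb = new StringBuilder();
        for (long[] w : windows) {
            sb.append('[').append(w[0]).append(',').append(w[1]).append(')');
        }
        return windows.size() + " window(s) " + sb;
    }

    public static void main(String[] args) {
        // size 10 > slide 5 (sliding): ts = 12 is in [10,20) and [5,15)
        System.out.println(format(assign(12, 10, 5)));
        // size 10 = slide 10 (tumbling): ts = 12 is only in [10,20)
        System.out.println(format(assign(12, 10, 10)));
        // size 10 < slide 15: windows are [0,10), [15,25), ... so ts = 12 is dropped
        System.out.println(format(assign(12, 10, 15)));
    }
}
```

In the sliding case one record belongs to size / slide = 2 windows, in the tumbling case to exactly one, and in the third case a record can belong to none.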
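Tumbling window: TUMBLE (DataStream: Tumbling Window)
Sliding window: HOP (DataStream: Sliding Window)
Cumulative window: CUMULATE (no built-in DataStream equivalent)
CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL '1' HOURS, INTERVAL '1' DAYS)
Here we define, on table EventTable and based on the time attribute ts, a cumulative window with a statistics period of 1 day and a cumulative step of 1 hour. Note that the third argument is the step and the fourth is the maximum window size. Within one day all windows share the same start and their ends advance one hour at a time: [00:00, 01:00), [00:00, 02:00), ..., [00:00, 24:00).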
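A minimal sketch of how one timestamp maps to cumulative windows. `CumulateSketch` is hypothetical and not Flink's implementation; time is counted in plain hours from 0 (time zones ignored), and `maxSize` is assumed to be a multiple of `step`, which Flink also requires for CUMULATE.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: every cumulative window in a period
// starts at the period start and ends at a multiple of `step`, up to maxSize.
// Time is plain hours from 0 (time zones ignored); maxSize % step == 0 assumed.
class CumulateSketch {

    static List<long[]> assign(long ts, long step, long maxSize) {
        List<long[]> windows = new ArrayList<>();
        long start = ts - (ts % maxSize);          // start of the enclosing period (day)
        long firstEnd = ts - (ts % step) + step;   // first step boundary after ts
        for (long end = firstEnd; end <= start + maxSize; end += step) {
            windows.add(new long[] {start, end});  // each of these windows contains ts
        }
        return windows;
    }

    public static void main(String[] args) {
        // step = 1 hour, maxSize = 24 hours, as in the CUMULATE call above.
        // An event at hour 21 is counted in [0,22), [0,23) and [0,24).
        for (long[] w : assign(21, 1, 24)) {
            System.out.println("[" + w[0] + "," + w[1] + ")");
        }
    }
}
```

An event late in the day lands in only a few windows, while an event at hour 0 lands in all 24, because every later window of that day still contains it.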
4. Processing-time windows
/**
* Sample JSON messages sent to Kafka topic topic1:
* {"username":"zs","price":20}
* {"username":"lisi","price":15}
* {"username":"lisi","price":20}
* {"username":"zs","price":20}
* {"username":"zs","price":20}
* {"username":"zs","price":20}
* {"username":"zs","price":20}
*
*/
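-- Firing conditions for a processing-time window: 1. the system time is greater than or equal to the window end time; 2. the window contains data.
-- Tumbling window syntax: TUMBLE(TABLE KafkaTable, DESCRIPTOR(event_time), INTERVAL '10' SECOND)
-- Tumbling window: every 10 seconds, compute over the last 10 seconds of data, i.e. count each user's purchases and sum the amount spent in the last 10 seconds.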
CREATE TABLE KafkaTable (
`username` STRING,
`price` INT,
`event_time` as proctime() -- computed column (a processing-time attribute despite its name)
) WITH (
'connector' = 'kafka',
'topic' = 'topic1',
'properties.bootstrap.servers' = 'hadoop11:9092,hadoop12:9092,hadoop13:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
select username,window_start,window_end,count(*) cnt,sum(price) total_price
from table(TUMBLE(TABLE KafkaTable, DESCRIPTOR(event_time), INTERVAL '10' SECOND))
group by username,window_start,window_end;
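To make the query's output concrete, the sketch below replays the seven sample messages through a hypothetical in-memory tumbling aggregation (`TumbleAggSketch`, not Flink code). The arrival times are invented for illustration, since processing time depends on when messages actually reach Flink. It also models the two firing conditions: a window is emitted only once the clock reaches its end, and a window that never received data is never created, so it produces no row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation, not Flink code, of the TUMBLE query:
// 10 s tumbling windows, COUNT(*) and SUM(price) per (username, window).
class TumbleAggSketch {
    static final long SIZE = 10; // window size in seconds

    // window start -> (username -> {count, sum}); only windows with data exist
    private final TreeMap<Long, TreeMap<String, long[]>> open = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    void onRecord(long now, String username, int price) {
        long start = now - (now % SIZE);
        long[] acc = open.computeIfAbsent(start, k -> new TreeMap<>())
                         .computeIfAbsent(username, k -> new long[2]);
        acc[0] += 1;     // count(*)
        acc[1] += price; // sum(price)
    }

    // Clock reached `now`: fire every open window whose end <= now.
    void onTime(long now) {
        while (!open.isEmpty() && open.firstKey() + SIZE <= now) {
            Map.Entry<Long, TreeMap<String, long[]>> w = open.pollFirstEntry();
            for (Map.Entry<String, long[]> u : w.getValue().entrySet()) {
                // username,window_start,window_end,cnt,total_price
                emitted.add(u.getKey() + "," + w.getKey() + "," + (w.getKey() + SIZE)
                        + "," + u.getValue()[0] + "," + u.getValue()[1]);
            }
        }
    }

    static List<String> run(long[] times, String[] users, int[] prices, long endTime) {
        TumbleAggSketch job = new TumbleAggSketch();
        for (int i = 0; i < times.length; i++) {
            job.onTime(times[i]); // the clock advances before the record is handled
            job.onRecord(times[i], users[i], prices[i]);
        }
        job.onTime(endTime);      // let the clock pass the last window end
        return job.emitted;
    }

    public static void main(String[] args) {
        // The seven sample messages with assumed arrival times in seconds.
        long[] times = {1, 3, 7, 9, 12, 15, 25};
        String[] users = {"zs", "lisi", "lisi", "zs", "zs", "zs", "zs"};
        int[] prices = {20, 15, 20, 20, 20, 20, 20};
        run(times, users, prices, 30).forEach(System.out::println);
    }
}
```

With these assumed times the rows are lisi,0,10,2,35 and zs,0,10,2,40, then zs,10,20,2,40 and zs,20,30,1,20. If the clock stops at 29 instead of 30, the last window has not reached its end and is not emitted yet.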
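-- Sliding window: every 30 seconds, compute each user's purchase count and total amount over the last 1 minute (HOP takes the slide first, then the window size).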
CREATE TABLE KafkaTable (
`username` STRING,
`price` INT,
`event_time` as proctime() -- computed column (a processing-time attribute despite its name)
) WITH (
'connector' = 'kafka',
'topic' = 'topic1',
'properties.bootstrap.servers' = 'hadoop11:9092,hadoop12:9092,hadoop13:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
select username,window_start,window_end,count(*) cnt,sum(price) total_price
from table(HOP(TABLE KafkaTable, DESCRIPTOR(event_time), INTERVAL '30' SECOND, INTERVAL '60' SECOND))
group by username,window_start,window_end;
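The same kind of hypothetical replay for the HOP query (`HopAggSketch`, not Flink code). The arrival times are invented; users and prices are the first five sample messages. Because the size (60 s) is twice the slide (30 s), every record is counted in two overlapping windows. Firing is left out here and all windows are printed at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical batch simulation, not Flink code, of the HOP query:
// slide = 30 s, size = 60 s, so each record lands in size / slide = 2 windows.
class HopAggSketch {
    static final long SLIDE = 30;
    static final long SIZE = 60;

    static List<String> run(long[] times, String[] users, int[] prices) {
        // window start -> (username -> {count, sum}); TreeMaps keep output sorted
        TreeMap<Long, TreeMap<String, long[]>> windows = new TreeMap<>();
        for (int i = 0; i < times.length; i++) {
            long lastStart = times[i] - (times[i] % SLIDE); // assumes times >= 0
            for (long start = lastStart; start > times[i] - SIZE; start -= SLIDE) {
                long[] acc = windows.computeIfAbsent(start, k -> new TreeMap<>())
                                    .computeIfAbsent(users[i], k -> new long[2]);
                acc[0] += 1;          // count(*)
                acc[1] += prices[i];  // sum(price)
            }
        }
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Long, TreeMap<String, long[]>> w : windows.entrySet()) {
            for (Map.Entry<String, long[]> u : w.getValue().entrySet()) {
                // username,window_start,window_end,cnt,total_price
                rows.add(u.getKey() + "," + w.getKey() + "," + (w.getKey() + SIZE)
                        + "," + u.getValue()[0] + "," + u.getValue()[1]);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Assumed arrival times in seconds.
        long[] times = {65, 80, 100, 110, 130};
        String[] users = {"zs", "lisi", "lisi", "zs", "zs"};
        int[] prices = {20, 15, 20, 20, 20};
        run(times, users, prices).forEach(System.out::println);
    }
}
```

With these inputs the counts over all rows add up to 10, twice the 5 input records, which is exactly the overlap between consecutive sliding windows.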