Flink SQL 也支持三种窗口类型,分别为 Tumble Windows、HOP Windows 和 Session Windows,其中 HOP Windows 对应 Table API 中的 Sliding Window,每种窗口都有各自相应的使用场景和用法。
案例(滚动窗口 Tumble Window):每 5 秒统计一次,计算该 5 秒内每个基站成功通话的总时长:
package tablesql
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.types.Row
import window.StationLog
/**
 * Flink SQL tumbling-window demo: for every 5-second event-time window, sums the
 * `duration` of successful calls (`callType = 'success'`) per base station (`sid`).
 *
 * Input lines are read from a socket and are split on `,` into six fields:
 * `sid,callOut,callInput,callType,callTime,duration`; the last two are parsed as `Long`.
 *
 * @Author yqq
 * @Date 2021/12/28 19:10
 * @Version 1.0
 */
object TestTumbleWindowBySQL {

  /**
   * Builds and runs the streaming job.
   *
   * @param args optional `host port` of the socket source; when absent the job
   *             connects to `node1:8888` as before
   */
  def main(args: Array[String]): Unit = {
    // Socket source location; defaults preserve the original hard-coded endpoint.
    val host: String = if (args.length > 0) args(0) else "node1"
    val port: Int = if (args.length > 1) args(1).toInt else 8888

    val streaEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // Streaming mode only needs to be selected once.
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    // Event time: windows are driven by the callTime carried in each record, not arrival time.
    streaEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streaEnv, settings)

    // Implicit TypeInformation for the Scala DataStream API and the 'symbol expression DSL.
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._

    val stream: DataStream[StationLog] = streaEnv.socketTextStream(host, port)
      .map(line => {
        val arr: Array[String] = line.split(",")
        new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })
      // Tolerate up to 3 seconds of out-of-order data: the watermark trails the
      // largest callTime seen by 3 s, so windows fire correspondingly later.
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(3)) {
        override def extractTimestamp(element: StationLog): Long = element.callTime
      })

    // Register the stream as a table; `.rowtime` marks callTime as the event-time attribute.
    // NOTE(review): registerDataStream is deprecated from Flink 1.10 in favor of createTemporaryView.
    tableEnv.registerDataStream(
      "t_station_log", stream, 'sid, 'callOut, 'callInput, 'callType, 'callTime.rowtime, 'duration)

    // Tumbling (non-overlapping) 5-second event-time window, grouped per station.
    val result: Table = tableEnv.sqlQuery(
      "select sid,sum(duration) as ds from t_station_log where callType='success' group by " +
        "tumble(callTime,interval '5' second),sid")

    // Print only accumulate messages (flag == true). Group-window aggregates are
    // append-only per the Flink docs, so this filter is defensive.
    tableEnv.toRetractStream[Row](result)
      .filter(_._1)
      .print()

    // The result is consumed as a DataStream, so the job must be started from the
    // StreamExecutionEnvironment; since Flink 1.11 TableEnvironment.execute does not
    // launch DataStream pipelines.
    streaEnv.execute("sql")
  }
}