文章目录
- 基于事件时间的窗口操作
- 事件时间窗口方式
- 事件时间窗口生成规则
基于事件时间的窗口操作
在 Struct Strreaming中,可以按照事件真实发生时间对附近范围内的数据进行聚合操作,即基于事件时间窗口进行操作,在这种机制下,不必考虑事件到达顺序与事件发生顺序一致,大大减少了开发者工作量
一条数据可以被称为一个事件,在生成数据时携带的时间可以称为事件时间
案例
package struct
import java.text.SimpleDateFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.FileStreamSource.Timestamp
import org.apache.spark.sql.streaming.Trigger
object StructStream06 {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.master("local[*]")
.appName("StructStream06")
.getOrCreate()
import org.apache.spark.sql.functions._
import spark.implicits._
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val lines = spark.readStream.format("socket").option("host", "note01").option("port", 9999).load()
val words = lines.as[String].map(s => {
val arr = s.split(",")
val date = sdf.parse(arr(0))
(new Timestamp(date.getTime, arr(1)))
}).toDF("ts", "word")
val wordCounts = words.groupBy(
window($"ts", "10 minutes", "2 minutes"), $"word"
).count()
val query = wordCounts.writeStream.outputMode("complete").trigger(Trigger.ProcessingTime(0)).format("console")
.start()
query.awaitTermination()
}
}
事件时间窗口方式
val wordCounts = words.groupBy(
window($"ts", "10 minutes", "2 minutes"), $"word"
).count()
$“ts” 指定存放时间的列,10 minutes窗口宽度,2 minutes滑动宽度
事件时间窗口生成规则
package struct
import java.sql.Date
import java.text.SimpleDateFormat
object StructStream07 {
def main(args: Array[String]): Unit = {
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
def main(args: Array[String]): Unit = {
//事件时间
val eventTime = sdf.parse("2019-03-08 12:00:00")
val eventTimestamp = eventTime.getTime
println(eventTimestamp)
//起始时间
val startTime = 0L
//窗口时间宽度: minutes
val windowDuration = 10 * 60 * 1000L
//窗口滑动长度: minutes
val slideDuration = 2 * 60 * 1000L
if(slideDuration > windowDuration) {
println("slideDuration必须小于或等于windowDuration")
return
}
//根据窗口宽度与滑动宽度,计算窗口连续滑动多少次,滑动长度的总和才等于窗口宽度。如果滑动次数为小数,则进位。
val overlappingWindows = math.ceil(windowDuration * 1.0 / slideDuration).toInt
//生成每一个window的起始时间和结束时间
for(i <- 0 until overlappingWindows){
//根据事件时间,按照slideDuration宽度,计算供需多少次滚动,或者说是需要连续生成多少个window,才可以将窗口的起始时间滚动至即将脱离事件时间的位置
val division = (eventTimestamp - startTime) / slideDuration.toDouble
//如果无法整除,则意味着需要一个新的窗口容纳事件,即,小数部分直接进位。
val ceil = math.ceil(division)
//如果恰巧整除,即,移动次数恰巧为整数,则需要新增一个窗口,这是因为window的起始时间和结束时间的区间为“前闭后开”,即[startTime, endTime)
//如果没有被整除,则已经进位,无需再+1。最后将该值作为windowId。
val windowId = if(ceil == division) ceil + 1 else ceil
//计算窗口起始时间 = ()
val windowStart = (windowId + i - overlappingWindows) * slideDuration + startTime
//计算窗口的结束时间 = 当前窗口的起始时间 + 滑动长度
val windowEnd = windowStart + windowDuration
if(eventTimestamp >= windowStart.toLong && eventTimestamp < windowEnd.toLong){
val startTimeString = sdf.format(new Date(windowStart.toLong))
val endTimeString = sdf.format(new Date(windowEnd.toLong))
println("[" + startTimeString + ", " + endTimeString + ")")
}
}
}
}
}