Table of Contents
- Grouping data with mapGroupsWithState, maintaining group state manually
- Grouping data with flatMapGroupsWithState, maintaining group state manually
Structured Streaming provides two operators for custom per-group aggregation, mapGroupsWithState and flatMapGroupsWithState, which let developers run stateful stream computations with timeouts based on either event time or processing time.
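Both operators hand the user function a GroupState[S] handle that holds the per-group state. Below is a minimal sketch of the GroupState calls that the examples in this post rely on (the handler and its types are hypothetical; the GroupState methods are the real API):

import org.apache.spark.sql.streaming.GroupState

// Hypothetical handler whose state S is a Long running count.
def handle(key: String, values: Iterator[String], state: GroupState[Long]): (String, Long) = {
  if (state.hasTimedOut) {          // called because the configured timeout fired
    state.remove()                  // drop the expired state
    (key, -1L)
  } else {
    val count = state.getOption.getOrElse(0L) + values.size // read old state, add this trigger's rows
    state.update(count)                                      // write the new state back
    // schedule an event-time timeout (requires EventTimeTimeout and a watermark on the input)
    state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 60 * 1000)
    (key, count)
  }
}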
Grouping data with mapGroupsWithState, maintaining group state manually
Given input words, count how many times each word occurs within the same event-time minute, and in the process use mapGroupsWithState to group the data and maintain the per-group state manually.
package struct

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.streaming.{GroupStateTimeout, Trigger}

object StructStream10 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("StructStream10")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm")

    val wordsDataFrame = spark.readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()
      .as[String].map(s => {
        val arr = s.split(",")
        val date = sdf1.parse(arr(0))
        (new Timestamp(date.getTime), arr(1))
      }).toDF("ts", "word")

    val result = wordsDataFrame
      .withWatermark("ts", "3 minutes") // set the watermark delay to 3 minutes
      .groupByKey[String]((row: Row) => { // groupByKey builds a custom grouping key from a function; mapGroupsWithState can only be called after groupByKey
        val timestamp = row.getTimestamp(0)
        val currentEventTimeMinute = sdf2.format(new Date(timestamp.getTime))
        currentEventTimeMinute + "," + row.getString(1)
      })
      .mapGroupsWithState[(String, Long), (String, String, Long)](GroupStateTimeout.EventTimeTimeout())((timeAndWord, iterator, groupState) => {
        println("current key: " + timeAndWord)
        println("current watermark: " + groupState.getCurrentWatermarkMs())
        println("state exists: " + groupState.exists)
        println("state timed out: " + groupState.hasTimedOut)
        var count = 0L
        if (groupState.hasTimedOut) {
          // invoked because the event-time timeout fired: drop the state
          groupState.remove()
        } else if (groupState.exists) {
          val groupCount = groupState.get._2
          if (groupCount >= 10) {
            groupState.remove()
          } else {
            count = groupState.getOption.getOrElse((timeAndWord, 0L))._2 + iterator.size
            groupState.update((timeAndWord, count))
          }
        } else {
          // first time this key is seen: initialize the state and its timeout
          count = iterator.size
          groupState.update((timeAndWord, count))
          val arr = timeAndWord.split(",")
          val timeoutTimestamp = sdf2.parse(arr(0)).getTime
          groupState.setTimeoutTimestamp(timeoutTimestamp)
        }
        if (count != 0) {
          val arr = timeAndWord.split(",")
          (arr(0), arr(1), count)
        } else {
          null // filtered out below
        }
      }).filter(_ != null).toDF("time", "word", "count")

    val query = result.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime(0))
      .format("console")
      .start()
    query.awaitTermination()
  }
}
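To try the example, one option (not part of the original post) is to start a socket server on linux01 with nc -lk 9999 and type lines in the yyyy-MM-dd HH:mm:ss,word format that the map above expects, for instance the following made-up input:

2021-08-01 10:01:05,hadoop
2021-08-01 10:01:40,hadoop
2021-08-01 10:02:10,spark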
Source code of mapGroupsWithState:
@Experimental
@InterfaceStability.Evolving
def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
  val flatMapFunc = (key: K, it: Iterator[V], s: GroupState[S]) => Iterator(func(key, it, s))
  Dataset[U](
    sparkSession,
    FlatMapGroupsWithState[K, V, S, U](
      flatMapFunc.asInstanceOf[(Any, Iterator[Any], LogicalGroupState[Any]) => Iterator[Any]],
      groupingAttributes,
      dataAttributes,
      OutputMode.Update,
      isMapGroupsWithState = true,
      timeoutConf,
      child = logicalPlan))
}
mapGroupsWithState takes two type parameters: S fixes the data type of the state, and U fixes the data type returned by the user function func.
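For instance, in a minimal sketch (a hypothetical helper that assumes a KeyValueGroupedDataset already produced by groupByKey), S = Long is a running count and U = (String, Long) is the single output row per group:

import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

def runningCount(spark: SparkSession,
                 grouped: KeyValueGroupedDataset[String, String]): Dataset[(String, Long)] = {
  import spark.implicits._ // supplies the Encoder[S] and Encoder[U] required by the call
  grouped.mapGroupsWithState[Long, (String, Long)](GroupStateTimeout.NoTimeout()) {
    (key: String, values: Iterator[String], state: GroupState[Long]) =>
      val newCount = state.getOption.getOrElse(0L) + values.size // S = Long: the running count
      state.update(newCount)
      (key, newCount)                                            // U = (String, Long): one row per group
  }
}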
Grouping data with flatMapGroupsWithState, maintaining group state manually
flatMapGroupsWithState is similar to mapGroupsWithState, but its user function returns an Iterator, so a single group can emit multiple output rows (a one-to-many, explode-like mapping), as sketched below.
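Before the full example, here is a minimal sketch (hypothetical helper, same assumptions as above) of that one-to-many shape, where the user function returns an Iterator[U] and one group emits one row per "_"-separated gift name:

import org.apache.spark.sql.{Dataset, KeyValueGroupedDataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

def explodeCounts(spark: SparkSession,
                  grouped: KeyValueGroupedDataset[String, String]): Dataset[(String, Long)] = {
  import spark.implicits._
  grouped.flatMapGroupsWithState[Long, (String, Long)](
      OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
    (key: String, values: Iterator[String], state: GroupState[Long]) =>
      val newCount = state.getOption.getOrElse(0L) + values.size
      state.update(newCount)
      key.split("_").iterator.map(gift => (gift, newCount)) // several output rows from one group
  }
}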
package chapter9

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, Trigger}
import org.apache.spark.sql.{Row, SparkSession}

object test10 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("test10")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm")

    val wordsDataFrame = spark.readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()
      .as[String].map(s => {
        val arr = s.split(",")
        val date = sdf1.parse(arr(0))
        (new Timestamp(date.getTime), arr(1))
      }).toDF("ts", "gift")

    val result = wordsDataFrame
      .withWatermark("ts", "3 minutes")
      .groupByKey[String]((row: Row) => {
        val timestamp = row.getTimestamp(0)
        val currentEventTimeMinute = sdf2.format(new Date(timestamp.getTime))
        currentEventTimeMinute + "," + row.getString(1)
      }).flatMapGroupsWithState[(String, Long), (String, String, Long)](OutputMode.Update(), GroupStateTimeout.EventTimeTimeout())((giftAndTime, iterator, groupState) => {
        println("current key: " + giftAndTime)
        println("current watermark: " + groupState.getCurrentWatermarkMs())
        println("state exists: " + groupState.exists)
        println("state timed out: " + groupState.hasTimedOut)
        var count = 0L
        if (groupState.hasTimedOut) {
          groupState.remove()
        } else if (groupState.exists) {
          val groupCount = groupState.get._2
          if (groupCount >= 10) {
            groupState.remove()
          } else {
            count = groupState.getOption.getOrElse((giftAndTime, 0L))._2 + iterator.size
            groupState.update((giftAndTime, count))
          }
        } else {
          count = iterator.size
          groupState.update((giftAndTime, count))
          val arr = giftAndTime.split(",")
          val timeoutTimestamp = sdf2.parse(arr(0)).getTime
          groupState.setTimeoutTimestamp(timeoutTimestamp)
        }
        // one group key can yield several output rows: split the "_"-joined gift names
        val rows = collection.mutable.ArrayBuffer[(String, String, Long)]()
        if (count != 0) {
          val arr1 = giftAndTime.split(",")
          val arr2 = arr1(1).split("_")
          for (s <- arr2) {
            rows.append((arr1(0).trim, s.trim, count))
          }
        }
        rows.iterator
      }).toDF("ts", "gift", "count")
      // Note: chaining another aggregation here is only allowed when flatMapGroupsWithState
      // runs in Append operation mode, so it stays commented out in this Update-mode query.
      // .withWatermark("ts", "3 minutes")
      // .groupBy($"gift")
      // .count()

    val query = result.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime(0))
      .format("console")
      .start()
    query.awaitTermination()
  }
}