Spark Big Data Analysis: Spark Structured Streaming (24) Inner Joins in Stream-Static and Stream-Stream Modes

悄然丝语 2022-01-31
Tags: spark, java, sql



Contents

  • Joining a stream with static data
  • Stream-Stream joins
  • Summary


Joining a stream with static data

To join streaming data with static data, simply join the two DataFrames directly:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession
  .builder
  .master("local[*]")
  .appName("Chapter9_8_1")
  .getOrCreate()

import spark.implicits._
spark.sparkContext.setLogLevel("WARN")

// Static reference data: (name, sex)
val javaList = new java.util.ArrayList[Row]()
javaList.add(Row("Alice", "Female"))
javaList.add(Row("Bob", "Male"))
javaList.add(Row("Thomas", "Male"))

val schema = StructType(List(
  StructField("name", StringType, nullable = false),
  StructField("sex", StringType, nullable = false)
))

val staticDataFrame = spark.createDataFrame(javaList, schema)

// Streaming source: lines of "name,age" read from a socket
val lines = spark.readStream
  .format("socket")
  .option("host", "linux01")
  .option("port", 9999)
  .load()

val streamDataFrame = lines.as[String].map(s => {
  val arr = s.split(",")
  (arr(0), arr(1).toInt)
}).toDF("name", "age")

// Inner equi-join on "name" between the stream and the static DataFrame
val joinResult = streamDataFrame.join(staticDataFrame, "name")

val query = joinResult.writeStream
  .outputMode("append")
  .trigger(Trigger.ProcessingTime(0))
  .format("console")
  .start()

query.awaitTermination()
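
For example, assuming the socket on linux01:9999 receives the line Alice,30 (e.g. typed into an nc -lk 9999 session on that host), the inner join matches it against the static row (Alice, Female) and the console sink prints (Alice, 30, Female); a name with no match in the static table produces no output under an inner join.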

Stream-Stream joins

Stream-stream inner joins are supported from Spark 2.3 onward, and the output mode must be Append. When two streams are joined, Structured Streaming maintains state for both streams so that newly arriving rows can be joined against rows that arrived earlier. Left unchecked, this state would grow without bound, so stream-stream joins rely on the watermark mechanism to clean up expired state. By default the global watermark is the minimum of the two streams' watermarks; since Spark 2.4 this can be changed via the multipleWatermarkPolicy setting.
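
A minimal sketch of switching the policy (Spark 2.4+), assuming the same SparkSession as in the example below:

// Default "min": the global watermark follows the slowest stream, so no data is
// dropped but state is retained longer. "max" follows the fastest stream: state
// is cleaned up sooner, but rows lagging behind the fast stream may be dropped as late.
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")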

package struct

import java.sql.Timestamp
import java.text.SimpleDateFormat

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object StructStream09 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("Chapter9_8_2")
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    // Stream 1: lines of "name,sex,timestamp" from port 9998
    val streamNameSex = spark.readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9998)
      .load()
      .as[String].map(s => {
        val arr = s.split(",")
        val date = sdf.parse(arr(2))
        (arr(0), arr(1), new Timestamp(date.getTime))
      }).toDF("name1", "sex", "ts1")

    // Stream 2: lines of "name,age,timestamp" from port 9999
    val streamNameAge = spark.readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()
      .as[String].map(s => {
        val arr = s.split(",")
        val date = sdf.parse(arr(2))
        (arr(0), arr(1).toInt, new Timestamp(date.getTime))
      }).toDF("name2", "age", "ts2")

    // Watermarks bound how long each side's state is retained
    val streamNameSexWithWatermark = streamNameSex.withWatermark("ts1", "2 minutes")
    val streamNameAgeWithWatermark = streamNameAge.withWatermark("ts2", "1 minutes")

    // Equality on name plus an event-time range constraint
    val joinResult = streamNameSexWithWatermark.join(
      streamNameAgeWithWatermark,
      expr(
        """
          name1 = name2 AND
          ts2 >= ts1 AND
          ts2 <= ts1 + interval 1 minutes
        """),
      joinType = "inner")

    val query = joinResult.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(0))
      .format("console")
      .start()

    query.awaitTermination()
  }
}

A pair of rows joins only when name1 = name2 and ts1 <= ts2 <= ts1 + 1 minute. If joinType is not set, it defaults to an inner join; left_outer / right_outer are also available, as sketched below.
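
A minimal sketch of the left_outer variant, reusing the two watermarked streams from the example above (only joinType changes):

// left_outer: a left-side row with no match is emitted with nulls on the right,
// but only once the watermark guarantees no matching right-side row can still arrive
val leftOuterResult = streamNameSexWithWatermark.join(
  streamNameAgeWithWatermark,
  expr(
    """
      name1 = name2 AND
      ts2 >= ts1 AND
      ts2 <= ts1 + interval 1 minutes
    """),
  joinType = "left_outer")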

Summary

Static join static supports all join types.

Stream join static supports inner joins and left outer joins.

Static join stream supports inner joins and right outer joins.

For stream-stream joins, a left outer join emits nulls on the right side when a left-side row finds no match. To make these results accurate, the right-hand stream must set a threshold via withWatermark; output may be delayed until the watermark passes, but it is then more meaningful.

A right outer join is the mirror case.


