Spark Big Data Analysis: Spark Structured Streaming (21) Stream Processing

中间件小哥 2022-01-31



Contents


  • Creating a stream from files
  • Creating a stream from Kafka
  • Reading from Kafka in batch mode
  • Creating a stream at a specified rate


Creating a stream from files

When subdirectories are named in the "key=value" form, Structured Streaming automatically recurses into them when listing input files and derives a partition column from the directory names, partitioning the data accordingly.

package struct

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object StructStream02 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("Chapter9_4_1")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    // A streaming file source requires an explicit schema.
    val userSchema = StructType(List(
      StructField("name", StringType, nullable = false),
      StructField("sex", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)
    ))
    val userDataFrame = spark.readStream.format("csv")
      .schema(userSchema)
      .load("D://a.txt")

    // Register the stream as a temporary view so it can be queried with SQL.
    userDataFrame.createTempView("t_user")

    val result = spark.sql("SELECT sex, AVG(age) FROM t_user GROUP BY sex")

    val query = result.writeStream
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime(0))
      .format("console")
      .start()

    query.awaitTermination()
  }
}
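
To make the partition-discovery behavior described above concrete, here is a minimal sketch; the directory layout, path, and schema are hypothetical. When the partition column is included in the user-provided schema, the file source fills it in from each file's path rather than from the file contents.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical layout (these paths are assumptions for illustration):
//   /data/users/sex=male/part-0.csv    e.g. "tom,18"
//   /data/users/sex=female/part-0.csv  e.g. "lily,20"
// Because "sex" is part of the user-provided schema, its value comes from
// the sex=... directory name of each file.
val partSchema = StructType(List(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("sex", StringType, nullable = false)
))

// `spark` is the SparkSession built in the example above.
val users = spark.readStream
  .format("csv")
  .schema(partSchema)
  .load("/data/users")

users.printSchema() // name, age, sex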

Creating a stream from Kafka

With Kafka as the source you normally do not manage offsets yourself. Without a checkpoint configured, the query starts reading from the latest offsets by default; once a checkpointLocation is set, a restarted query resumes consuming from the offsets recorded at the last checkpoint.

package struct

import org.apache.spark.sql.SparkSession

object StructStream03 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("Chapter9_4_3")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    val inputDataFrame = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "note01:9092,note02:9092,note03:9092")
      .option("subscribe", "StructStream03")
      .load()

    // Kafka records arrive as binary key/value; cast both to strings.
    val keyValueDataset = inputDataFrame
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    // Each value is a comma-separated record of a city plus two stations;
    // emit one (city, station) row per station.
    val subwayDataFrame = keyValueDataset.flatMap(t => {
      val arr = t._2.split(",")
      Array((arr(0), arr(1)), (arr(0), arr(2)))
    }).toDF("city", "station_in_or_out")

    subwayDataFrame.createTempView("t_subway")

    val result = spark.sql("SELECT city, station_in_or_out, count(1) AS hot FROM t_subway GROUP BY city, station_in_or_out ORDER BY city, hot DESC")

    // The checkpoint records consumed offsets, so a restarted query
    // resumes where the previous run left off.
    val query = result.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "./StructStream03")
      .start()

    query.awaitTermination()
  }
}
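
If a fresh query (one with no checkpoint yet) should start from the beginning of the topic instead of the latest offsets, the Kafka source accepts a startingOffsets option. A minimal sketch, reusing the brokers and topic from the example above:

// `spark` is the SparkSession built in the example above.
val fromEarliest = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "note01:9092,note02:9092,note03:9092")
  .option("subscribe", "StructStream03")
  // "earliest" replays the topic from the beginning on the first run;
  // once a checkpoint exists, the checkpointed offsets take precedence.
  .option("startingOffsets", "earliest")
  .load()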

Reading from Kafka in batch mode

This mode usually requires setting the starting and ending offsets for the read; if they are not set, startingOffsets defaults to earliest and endingOffsets to latest. The result is a one-shot batch job, not a continuously running query.

package struct

import org.apache.spark.sql.SparkSession

object StructStream04 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("StructStream04")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    // Batch mode: `read` instead of `readStream`.
    val inputDataFrame = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "note01:9092,note02:9092,note03:9092")
      .option("subscribe", "StructStream04")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    val keyValueDataset = inputDataFrame
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]

    val subwayDataFrame = keyValueDataset.flatMap(t => {
      val arr = t._2.split(",")
      Array((arr(0), arr(1)), (arr(0), arr(2)))
    }).toDF("city", "station_in_or_out")

    subwayDataFrame.createTempView("t_subway")

    val result = spark.sql("SELECT city, station_in_or_out, count(1) AS hot FROM t_subway GROUP BY city, station_in_or_out ORDER BY city, hot DESC")

    // A one-shot batch write: `write`/`save` instead of `writeStream`/`start`.
    result.write
      .format("console")
      .save()
  }
}
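
Beyond "earliest" and "latest", startingOffsets and endingOffsets also accept a JSON form that pins exact per-partition offsets. A minimal sketch, where the partition numbers and offset values are hypothetical (-2 stands for earliest; -1 for latest, which is not allowed in startingOffsets for a given partition's end):

// `spark` is the SparkSession built in the example above; partition
// numbers and offsets below are hypothetical.
val rangedDataFrame = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "note01:9092,note02:9092,note03:9092")
  .option("subscribe", "StructStream04")
  // partition 0: start at offset 100; partition 1: start from earliest (-2)
  .option("startingOffsets", """{"StructStream04":{"0":100,"1":-2}}""")
  // partition 0: stop at offset 200; partition 1: read through latest (-1)
  .option("endingOffsets", """{"StructStream04":{"0":200,"1":-1}}""")
  .load()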

Creating a stream at a specified rate

The built-in rate source generates rows at a fixed rate, which makes it convenient for testing a pipeline without an external data source.

package struct

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object StructStream05 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("Chapter9_4_5")
      .getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")
    val rows = spark
      .readStream
      .format("rate")
      // rows generated per second
      .option("rowsPerSecond", 10)
      // seconds to ramp up before reaching the full rate
      .option("rampUpTime", 2)
      // number of partitions for the generated rows
      .option("numPartitions", 2)
      .load()

    val query = rows.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(2000))
      .format("console")
      .start()

    query.awaitTermination()
  }
}
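
The rate source emits two columns, timestamp (the event time) and value (a monotonically increasing Long), so it pairs naturally with event-time operations. A minimal sketch counting the generated rows per 5-second window, assuming the same session setup as above:

import org.apache.spark.sql.functions.{count, window}

// `spark` is the SparkSession built in the example above.
import spark.implicits._

val rates = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// Count the generated rows in 5-second event-time windows.
val counted = rates
  .groupBy(window($"timestamp", "5 seconds"))
  .agg(count("*").as("cnt"))

counted.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()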


