Contents
- Output Mode Use Cases
- Append Mode
- Complete Mode
- Update Mode
- File Sink
- Kafka Sink: Writing Data as a Stream
- Kafka Sink: Writing Data as a Batch
- Console Sink
- Memory Sink
- Foreach Sink
- ForeachBatch Sink
Output Mode Use Cases
Append Mode
The default output mode. It guarantees that each result row is written to the sink exactly once. If no watermark is defined, the query cannot contain aggregations; with a watermark, only aggregations over the event-time column (or event-time windows) are allowed.
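For illustration only, a minimal sketch of an event-time windowed count that Append mode accepts once a watermark is defined; the streaming DataFrame events with a timestamp column and a word column is assumed here and is not part of the original example:

import org.apache.spark.sql.functions.{col, window}

// assumed streaming DataFrame "events" with columns: timestamp (TimestampType), word (StringType)
val windowedCounts = events
  .withWatermark("timestamp", "10 minutes")                      // required for aggregation in Append mode
  .groupBy(window(col("timestamp"), "5 minutes"), col("word"))
  .count()

windowedCounts.writeStream
  .outputMode("append")   // each window is written exactly once, after the watermark passes its end
  .format("console")
  .start()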
Complete Mode
After every batch is processed, the complete result computed so far is written out, so a query running in Complete mode must contain an aggregation. Watermarks have no effect in this mode, because old aggregation state is never dropped.
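A minimal running word-count sketch for comparison, assuming the same socket-based lines DataFrame and import spark.implicits._ used in the examples that follow:

val runningCounts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()

runningCounts.writeStream
  .outputMode("complete")   // the whole aggregate table is rewritten after every batch
  .format("console")
  .start()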
Update Mode
Only the rows that changed compared with the previous batch are written out. Without aggregations, Update behaves the same as Append; with aggregations, a watermark can be used to clean up expired state.
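Reusing the windowedCounts aggregation from the Append sketch above (illustrative only), only the output mode changes:

windowedCounts.writeStream
  .outputMode("update")     // only the windows whose counts changed in this batch are written;
                            // state for windows older than the watermark is dropped
  .format("console")
  .start()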
File Sink
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("test")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val lines = spark
      .readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()

    val words = lines.as[String]
      .flatMap(_.split(" "))
      .map(s => (s, s.reverse))
      .toDF("原单词", "单词反转")

    val query = words.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(0))
      .format("json")
      .option("path", "G:\\BookData\\chapter9\\9_10_2")
      .option("checkpointLocation", "./checkpoint_chapter9_10_2")
      .start()

    query.awaitTermination()
  }
}
The file sink only supports Append mode; the supported output formats include ORC, JSON, and CSV.
Kafka Sink: Writing Data as a Stream
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
object test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("test")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val lines = spark
      .readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()

    val wordCount = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()
      .map(row => row.getString(0) + "," + row.getLong(1).toString)
      .toDF("value")

    val query = wordCount
      .writeStream
      .outputMode("update")
      // micro-batch trigger: continuous processing does not support aggregations such as groupBy/count
      .trigger(Trigger.ProcessingTime(0))
      .format("kafka")
      .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
      .option("topic", "test")
      .option("checkpointLocation", "./checkpoint")
      .start()

    query.awaitTermination()
  }
}
The data written to Kafka may contain only a key column and a value column; they do not both have to be present (the key is optional), and both must be of string (or binary) type.
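As an illustration, one way to shape an aggregated result into the columns the Kafka sink expects; the DataFrame counts with word and count columns is assumed here:

import org.apache.spark.sql.functions.{col, concat_ws}

// assumed aggregated DataFrame "counts" with columns: word (StringType), count (LongType)
val kafkaReady = counts.select(
  col("word").as("key"),                                                  // the key column is optional
  concat_ws(",", col("word"), col("count").cast("string")).as("value")   // the value column is required
)
// kafkaReady can then be written with format("kafka") exactly as in the example above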
Kafka Sink: Writing Data as a Batch
import org.apache.spark.sql.SparkSession

object test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("test")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val rdd = spark.sparkContext
      .parallelize(Seq("dog cat dog dog", "flink spark spark"))
      .flatMap(_.split(" "))

    val wordCount = rdd
      .toDF("word")
      .groupBy("word")
      .count()
      .map(row => row.getString(0) + "," + row.getLong(1).toString)
      .toDF("value")

    wordCount.write
      .format("kafka")
      .option("kafka.bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
      .option("topic", "test")
      .save()

    spark.stop()
  }
}
Console Sink
val query = wordCount
  .writeStream
  .outputMode("complete")              // wordCount is an aggregation, so Append cannot be used without a watermark
  .trigger(Trigger.ProcessingTime(0))
  .format("console")
  .start()
Memory Sink
import org.apache.spark.sql.SparkSession

object test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("test")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val lines = spark
      .readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()

    val words = lines.as[String]
      .flatMap(_.split(" "))

    val wordCounts = words
      .groupBy("value")
      .count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("memory")
      .queryName("t_memory_word_count")
      .start()

    // periodically query the in-memory table and print the current result
    while (true) {
      Thread.sleep(3 * 1000)
      spark.sql("SELECT * FROM t_memory_word_count").show()
    }
    query.awaitTermination()
  }
}
The memory sink only supports the Append and Complete output modes.
Foreach Sink
import java.sql.{Connection, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("Test")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val lines = spark
      .readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()

    val words = lines.as[String]
      .flatMap(_.split(" "))

    val wordCounts = words
      .groupBy("value")
      .count()
      .repartition(2)

    val query = wordCounts.writeStream
      .outputMode("update")
      .foreach(new ForeachWriter[Row] {
        var conn: Connection = _
        var preparedStatement: PreparedStatement = _
        var batchCount = 0

        override def open(partitionId: Long, epochId: Long): Boolean = {
          println("opening connection")
          println("partitionId:" + partitionId)
          println("epochId:" + epochId)
          // ConnectionPool is a user-defined JDBC connection pool helper, not part of Spark or the JDK
          conn = ConnectionPool.getConnection()
          conn.setAutoCommit(false) // explicit commits are issued below
          val sql = "INSERT INTO syllabus.t_word_count (`word`, `count`) VALUES (?, ?) ON DUPLICATE KEY UPDATE `word` = ?, `count` = ?;"
          preparedStatement = conn.prepareStatement(sql)
          conn != null && !conn.isClosed && conn.isValid(5) && preparedStatement != null
        }

        override def process(row: Row): Unit = {
          println("processing row")
          val word = row.getString(0)
          val count = row.getLong(1).toString
          println("word:" + word)
          println("count:" + count)
          preparedStatement.setString(1, word)
          preparedStatement.setString(2, count)
          preparedStatement.setString(3, word)
          preparedStatement.setString(4, count)
          preparedStatement.addBatch()
          batchCount += 1
          if (batchCount >= 100) {
            preparedStatement.executeBatch()
            conn.commit()
            batchCount = 0
          }
        }

        override def close(errorOrNull: Throwable): Unit = {
          println("committing remaining data and releasing resources")
          preparedStatement.executeBatch()
          conn.commit()
          batchCount = 0
          ConnectionPool.returnConnection(conn)
        }
      })
      .start()

    query.awaitTermination()
  }
}
ForeachWriter is an abstract class; when creating an instance you implement three methods:
open: initializes the connection to the external system. If open returns false, the data for that partition is skipped, so you can decide here, based on business logic, whether the partition's data should be written at all.
process: the concrete write logic, invoked once for each row in the partition.
close: called when processing throws an exception, and also after the partition's data has been processed without errors; commit outstanding work and release resources here.
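A minimal skeleton (illustrative only, not tied to any particular database) showing the lifecycle described above:

import org.apache.spark.sql.{ForeachWriter, Row}

val sketchWriter = new ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // acquire resources here; returning false skips this partition for this epoch
    true
  }
  override def process(row: Row): Unit = {
    // write a single row to the external system
    println(row)
  }
  override def close(errorOrNull: Throwable): Unit = {
    // called after processing succeeds (errorOrNull == null) or fails; release resources here
  }
}
// pass sketchWriter to writeStream.foreach(...) as in the full example above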
ForeachBatch Sink
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[*]")
      .appName("Test")
      .getOrCreate()
    import spark.implicits._
    spark.sparkContext.setLogLevel("WARN")

    val lines = spark
      .readStream
      .format("socket")
      .option("host", "linux01")
      .option("port", 9999)
      .load()

    val words = lines.as[String]
      .flatMap(_.split(" "))

    val wordCounts = words
      .groupBy("value")
      .count()
      .toDF("word", "count")
      .repartition(2)

    val properties = new java.util.Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")

    val query = wordCounts.writeStream
      .outputMode("complete")
      .foreachBatch { (ds: DataFrame, batchID: Long) =>
        println("BatchID:" + batchID)
        if (ds.count() != 0) {
          ds.cache()
          ds.write.json("G:\\BookData\\chapter9\\Test\\" + batchID)
          ds.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://linux01:3306/syllabus", "t_word_count", properties)
          ds.unpersist()
        }
      }
      .start()

    query.awaitTermination()
  }
}