Spark Big Data Analysis: Spark Streaming (21) Program Shutdown

小a草 2022-01-31





There are four ways to shut down a Spark Streaming application:

1. Kill the driver process directly; data in flight may be lost.

2. Shut down via a shutdown hook, triggered from a shell script; this works but the scripting is cumbersome (a minimal sketch follows this list).

3. Run an HTTP service inside the program and stop the context when an external request arrives; this requires a fair amount of extra code.

4. Use an HDFS directory as a marker: periodically check whether the directory exists and, if it does, stop the program. Simple and convenient, and the approach shown below.
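For reference, a minimal sketch of option 2 (the object name HookShutdownSketch is illustrative, not from the original; the socket source mirrors the example below): with spark.streaming.stopGracefullyOnShutdown enabled, Spark registers its own JVM shutdown hook, so the external shell script only has to find the driver PID and send it SIGTERM.

package stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HookShutdownSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("HookShutdownSketch")
      // With this setting, Spark installs a shutdown hook that stops the
      // StreamingContext gracefully when the driver receives SIGTERM.
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.socketTextStream("linux01", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    // Blocks until the hook (triggered by `kill <driver-pid>` from the script)
    // stops the context.
    ssc.awaitTermination()
  }
}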

Dependency

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.4</version>
</dependency>
package stream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Test08 {
  var hadoopConf: Configuration = _
  // Marker directory on HDFS; once it exists, the application shuts down
  val shutdownMarkerPath = new Path("hdfs://linux01:8020/user/admin/tmp/spark_shutdown_marker")
  var stopMarker: Boolean = false

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter8_10_2")
    //  .set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    hadoopConf = ssc.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.defaultFS", "hdfs://linux01:8020")

    // Simple word count over a socket stream
    val lines = ssc.socketTextStream("linux01", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(x => (x, 1))
    val wordCounts = wordMap.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    val checkIntervalMillis = 2 * 1000  // poll every 2 seconds
    var isStopped = false

    // Poll the termination state and the HDFS marker until the context stops
    while (!isStopped) {
      println("Checking shutdown status: ")
      isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
      if (isStopped)
        println("Spark Streaming Chapter8_10 has stopped.")
      else
        println("Spark Streaming Chapter8_10 is running...")
      checkShutdownMarker()
      if (!isStopped && stopMarker) {
        println("Stopping Spark Streaming...")
        // Stop the SparkContext as well, and let in-flight batches finish first
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
  }

  // Set stopMarker to true once the marker directory exists on HDFS
  def checkShutdownMarker(): Unit = {
    if (!stopMarker) {
      val fs = FileSystem.get(hadoopConf)
      stopMarker = fs.exists(shutdownMarkerPath)
    }
  }
}
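To trigger a shutdown while the job is running, create the marker directory on HDFS, e.g. hdfs dfs -mkdir -p /user/admin/tmp/spark_shutdown_marker (the path configured above). At the next check of the loop, ssc.stop is called and the application exits gracefully. Remember to delete the marker before the next start, otherwise the new run will stop itself at its first check.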

The awaitTermination method blocks the driver's main thread, so no code after it would run; awaitTerminationOrTimeout is used here instead, since it returns after the given timeout and lets the loop check the HDFS marker between waits.
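A minimal illustration of the difference (the 2000 ms timeout is arbitrary):

// Blocks the driver thread until stop() is called from somewhere else:
// ssc.awaitTermination()

// Returns after at most 2 seconds: true once the context has actually
// stopped, false if the timeout elapsed first.
val stopped: Boolean = ssc.awaitTerminationOrTimeout(2000)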


