Spark Big Data Analysis: Spark Streaming (20) Broadcast Variables and Accumulators

時小白 2022-02-15





Broadcast variables and accumulators cannot be recovered from a checkpoint: if the program is interrupted and restarted, their data is lost. To keep using them across a restart, they have to be re-created lazily (for example, through a singleton) when the streaming context is rebuilt.
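The same lazy, thread-safe singleton pattern used for the accumulator below also works for broadcast variables: re-create the broadcast inside a getInstance method instead of relying on the checkpoint. Here is a minimal sketch, modeled on Spark's RecoverableNetworkWordCount example; the WordBlacklist name and its contents are illustrative assumptions, not part of this article's code.

package stream

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Lazily (re)creates the broadcast variable after a restart from a checkpoint,
// because the broadcast itself is not recovered from the checkpoint directory.
object WordBlacklist {
  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          // Illustrative contents; in practice these would be loaded from config or a file
          instance = sc.broadcast(Seq("a", "b", "c"))
        }
      }
    }
    instance
  }
}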


Implementing a fully real-time word count with a custom accumulator

package stream

import org.apache.spark.SparkContext
import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

/**
 * Thread-safe singleton used to (re)create and register the accumulator,
 * so it can be restored after the streaming context is recovered from a checkpoint.
 */
object WordCountAccumulator {
  @volatile private var instance: WordCountAccumulator[(String, Int)] = null

  def getInstance(sc: SparkContext): WordCountAccumulator[(String, Int)] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = new WordCountAccumulator[(String, Int)]()
        }
      }
    }
    if (!instance.isRegistered) sc.register(instance)
    instance
  }
}

/**
 * T is the input type accepted by the accumulator;
 * mutable.Map[String, Int] is the type of the value it returns.
 *
 * @tparam T
 */
class WordCountAccumulator[T] private extends AccumulatorV2[T, mutable.Map[String, Int]] {
  private val _map: mutable.Map[String, Int] = mutable.Map()

  // Whether the accumulator is still in its initial (empty) state
  override def isZero: Boolean = _map.isEmpty

  // Copy this accumulator and its internal data into a new accumulator and return it
  override def copy(): AccumulatorV2[T, mutable.Map[String, Int]] = {
    val newAcc = new WordCountAccumulator[T]
    for (t <- _map) {
      newAcc._map += t
    }
    newAcc
  }

  // Reset the accumulator's data
  override def reset(): Unit = {
    _map.clear()
  }

  // Add one (word, count) pair to the running totals
  override def add(v: T): Unit = {
    val (word, count) = v.asInstanceOf[(String, Int)]
    _map(word) = count + _map.getOrElse(word, 0)
  }

  /**
   * Merge the counts accumulated in another partition's accumulator into this one.
   *
   * @param other
   */
  override def merge(other: AccumulatorV2[T, mutable.Map[String, Int]]): Unit = {
    val o = other.asInstanceOf[WordCountAccumulator[T]]
    // Add the other accumulator's counts key by key, without mutating `other`
    // or modifying _map while iterating over it
    for ((word, count) <- o._map) {
      _map(word) = count + _map.getOrElse(word, 0)
    }
  }

  // Return the current value of the accumulator
  override def value: mutable.Map[String, Int] = {
    _map
  }
}

package stream

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Test05 {
  val checkpointPath = "./checkpoint_Chapter8_9_2"

  def createContext(host: String, port: Int, checkpointDirectory: String): StreamingContext = {
    println("Creating a new StreamingContext")
    val conf = new SparkConf().setMaster("local[*]").setAppName("Chapter8_9_2")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    ssc.checkpoint(checkpointDirectory)

    val lines = ssc.socketTextStream(host, port)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(x => (x, 1))
    val wordCounts = wordMap.reduceByKey(_ + _)

    // Checkpoint the DStream every 50 seconds (10 batches at a 5-second batch interval)
    wordCounts.checkpoint(Seconds(5 * 10))
    wordCounts.foreachRDD(rdd => {
      // Re-obtain (and, if necessary, re-register) the accumulator through the singleton,
      // since it is not restored from the checkpoint itself
      val wordCountAccumulator = WordCountAccumulator.getInstance(rdd.sparkContext)
      rdd.foreachPartition(p => {
        p.foreach(t => {
          wordCountAccumulator.add(t)
        })
      })
      // Print the global word counts accumulated so far on the driver
      wordCountAccumulator.value.foreach(println(_))
    })

    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover the StreamingContext from the checkpoint if one exists,
    // otherwise create a fresh one
    val ssc = StreamingContext.getOrCreate(
      checkpointPath,
      () => createContext("linux01", 9999, checkpointPath))
    ssc.start()
    ssc.awaitTermination()
  }
}
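
To sanity-check the accumulator on its own, outside of the streaming job, a small batch job can run through the same register/add/value cycle. This is a minimal sketch; the AccumulatorSmokeTest object name and the sample words are assumptions added for illustration.

package stream

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorSmokeTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("AccumulatorSmokeTest"))
    // getInstance registers the accumulator with this SparkContext on first use
    val acc = WordCountAccumulator.getInstance(sc)

    sc.parallelize(Seq("spark", "streaming", "spark"))
      .map((_, 1))
      .foreach(t => acc.add(t))

    // Expected counts: spark -> 2, streaming -> 1 (exact formatting depends on the Map implementation)
    println(acc.value)
    sc.stop()
  }
}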


