0
点赞
收藏
分享

微信扫一扫

spark大数据分析:spark core(7) RDD 的检查点CheckPoint

zhongjh 2022-02-10 阅读 79



文章目录


  • ​​原理​​
  • ​​CheckPoint与缓存区别​​
  • ​​工作机制​​
  • ​​初始化阶段​​
  • ​​处理阶段​​
  • ​​完成阶段​​


原理

由于单台机器资源原因,一些RDD缓存内部不现实,需要借助外部机器共同承担资源问题引入了checkPoint

CheckPoint与缓存区别

缓存不会切断RDD的依赖链,如果持久化缓存失效,依赖链重新计算恢复RDD中数据

checkPoint是将RDD存储本地磁盘或HDFS可以通过直接读取检查点恢复对应RDD

RDD调用checkPoint后 checkPointRDD会成为下游RDD的上游依赖

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object TestSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Chapter5_2_1_8")
val sc = new SparkContext(conf)
sc.setCheckpointDir("./checkPint")

val rddData1 = sc.parallelize(Array(5, 5, 15, 15), 2)
val r2 = rddData1.map(_ + 10)
r2.checkpoint()
r2.collect()

sc.stop()
}
}

必须提前指定 sc.setCheckpointDir("./checkPint") 否则报错

Exception in thread "main" org.apache.spark.SparkException: Checkpoint directory has not been set in the SparkContext
at org.apache.spark.rdd.RDD.checkpoint(RDD.scala:1548)
at TestSpark$.main(WriteSpark.scala:13)
at TestSpark.main(WriteSpark.scala)

工作机制

初始化阶段

设置checkPoint存储路径,实例化RDDCheckPointData对象,并对RDD状态进行标记,标记为已初始化

处理阶段

对于每一个job执行结束,job依赖链最后一个RDD会调用doCheckPoint方法,通过递归方式从依赖链最顶层RDD向下遍历,如果当前RDD需要进行checkPoint,则将其状态由初始化转换为执行中

完成阶段

将RDD以序列化方式保存到检查点目录中,并设置其状态为已完成,最后执行clearDependencies清理该RDD上游依赖



举报

相关推荐

0 条评论