文章目录
- 原理
- CheckPoint与缓存区别
- 工作机制
- 初始化阶段
- 处理阶段
- 完成阶段
原理
由于单台机器资源原因,一些RDD缓存内部不现实,需要借助外部机器共同承担资源问题引入了checkPoint
CheckPoint与缓存区别
缓存不会切断RDD的依赖链,如果持久化缓存失效,依赖链重新计算恢复RDD中数据
checkPoint是将RDD存储本地磁盘或HDFS可以通过直接读取检查点恢复对应RDD
RDD调用checkPoint后 checkPointRDD会成为下游RDD的上游依赖
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object TestSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Chapter5_2_1_8")
val sc = new SparkContext(conf)
sc.setCheckpointDir("./checkPint")
val rddData1 = sc.parallelize(Array(5, 5, 15, 15), 2)
val r2 = rddData1.map(_ + 10)
r2.checkpoint()
r2.collect()
sc.stop()
}
}
必须提前指定 sc.setCheckpointDir("./checkPint") 否则报错
Exception in thread "main" org.apache.spark.SparkException: Checkpoint directory has not been set in the SparkContext
at org.apache.spark.rdd.RDD.checkpoint(RDD.scala:1548)
at TestSpark$.main(WriteSpark.scala:13)
at TestSpark.main(WriteSpark.scala)
工作机制
初始化阶段
设置checkPoint存储路径,实例化RDDCheckPointData对象,并对RDD状态进行标记,标记为已初始化
处理阶段
对于每一个job执行结束,job依赖链最后一个RDD会调用doCheckPoint方法,通过递归方式从依赖链最顶层RDD向下遍历,如果当前RDD需要进行checkPoint,则将其状态由初始化转换为执行中
完成阶段
将RDD以序列化方式保存到检查点目录中,并设置其状态为已完成,最后执行clearDependencies清理该RDD上游依赖