0
点赞
收藏
分享

微信扫一扫

RDD的特性 ---- RDD的checkpoint

郝春妮 2022-08-12 阅读 98


RDD的特性 ---- RDD的checkpoint

一、Checkpoint的作用

Checkpooint的主要作用是斩断Rdd的依赖链,并将数据存储在引擎中,例如分布式存储和副本机制的HDFS中。本质上Checkpoint也是一种缓存机制。

1.Checkpoint的方式

RDD的特性  ----  RDD的checkpoint_赋值

2.什么是斩断依赖链

RDD的特性  ----  RDD的checkpoint_hdfs_02


RDD的特性  ----  RDD的checkpoint_spark_03

3.Checkpoint和cache的区别

RDD的特性  ----  RDD的checkpoint_hdfs_04


注意:

最本质的区别在于Checkpoint将RDD计算出来存储在可靠的HDFS上,并且可以斩断依赖链,若出错可以直接通过赋值HDFS中的文件实现容错。
Cache是将RDD存放至内存中,并且未斩断依赖链,若出错只能通过依赖链回溯上一级RDD,重放计算。

二、Checkpoint的使用

@Test
def test: Unit ={

val conf = new SparkConf().setAppName("ip统计").setMaster("local[6]")
val sc = new SparkContext(conf)

// 设置保存 checkPoint 的目录
sc.setCheckpointDir("checkpoint")

val resource = sc.textFile("src/main/scala/RDD的缓存/ip.txt")
val ipRDD = resource.map( item => ( item.split(",")(0) , 1) )
val cleanRDD = ipRDD.filter( item => StringUtils.isNotEmpty(item._1) )
var aggRDD = cleanRDD.reduceByKey( (curr,agg) => curr + agg )

// checkPoint
// aggRDD = aggRDD.cache()
// 不准确的说 checkPoint 是一个Action操作,
// 也就是说调用了 checkPoint 方法后会重新计算一遍RDD的值,然后把结果存在HDFS或本地目录中
// 所以因该在 checkPoint 之前进行一次 cache()
aggRDD = aggRDD.cache()
aggRDD.checkpoint()

val maxRDD = aggRDD.sortBy( item => item._2,ascending = true ).first() // Action操作1
val minRDD = aggRDD.sortBy( item => item._2,ascending = false ).first() // Action操作2
println("max:"+maxRDD,"min:"+minRDD)

}

RDD的特性  ----  RDD的checkpoint_赋值_05

不准确的说 checkPoint 是一个Action操作,也就是说调用了 checkPoint 方法后会重新计算一遍RDD的值,然后把结果存在HDFS或本地目录中,这个过程中就又涉及到资源内存消耗的问题。所以因该在 checkPoint 之前进行一次 cache(),存储的是最后一次转换操作的结果,若后面计算有误,直接去从存储路径的文件中查找文件重新计算。


举报

相关推荐

0 条评论