一、问题引出
 
/**
 * Demonstrates the problem that motivates RDD persistence.
 *
 * `mapRdd` feeds two separate actions: the `collect()` on the `reduceByKey`
 * result and the `collect()` on the `groupByKey` result. Because `mapRdd` is
 * not persisted, its lineage (makeRDD -> flatMap -> map) is evaluated again
 * for each action. The "@@@@@@@@@@" marker printed inside `map` therefore
 * shows up once per word for EACH action.
 */
object TestRDDPersist {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
        val sc = new SparkContext(conf)
        try {
            val rdd = sc.makeRDD(List(
                "hello world", "hello spark"
            ))

            val flatRdd = rdd.flatMap(_.split(" "))

            // The println is a side-effect marker that reveals every evaluation of this map.
            val mapRdd = flatRdd.map(word => {
                println("@@@@@@@@@@")
                (word, 1)
            })

            // Action 1: evaluates mapRdd's full lineage.
            val reduceRdd = mapRdd.reduceByKey(_ + _)
            reduceRdd.collect().foreach(println)

            println("**********")

            // Action 2: mapRdd was not persisted, so its lineage is evaluated again.
            val groupRdd = mapRdd.groupByKey()
            groupRdd.collect().foreach(println)
        } finally {
            // Always release the SparkContext, even if one of the actions above throws.
            sc.stop()
        }
    }
}
 
二、RDD Cache
 
/**
 * Same pipeline as the un-persisted example, but `mapRdd` is persisted before
 * it is used by two actions.
 *
 * The first action computes `mapRdd` and keeps its partitions. The second
 * action reuses them instead of re-running `flatMap`/`map`, so the
 * "@@@@@@@@@@" marker is printed only during the first action.
 */
object TestRDDPersist {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
        val sc = new SparkContext(conf)
        try {
            val rdd = sc.makeRDD(List(
                "hello world", "hello spark"
            ))

            val flatRdd = rdd.flatMap(_.split(" "))

            // The println is a side-effect marker that reveals every evaluation of this map.
            val mapRdd = flatRdd.map(word => {
                println("@@@@@@@@@@")
                (word, 1)
            })

            // persist() with no argument uses Spark's default storage level
            // (MEMORY_ONLY, the same level cache() uses). It is lazy: partitions
            // are stored when the first action below computes them.
            mapRdd.persist()

            // Action 1: computes mapRdd and stores its partitions.
            val reduceRdd = mapRdd.reduceByKey(_ + _)
            reduceRdd.collect().foreach(println)

            println("**********")

            // Action 2: reads mapRdd's persisted partitions instead of recomputing them.
            val groupRdd = mapRdd.groupByKey()
            groupRdd.collect().foreach(println)
        } finally {
            // Always release the SparkContext, even if one of the actions above throws.
            sc.stop()
        }
    }
}
/**
 * Predefined storage levels that can be passed to `RDD.persist`.
 *
 * The positional constructor arguments, as the constants below show, are
 * (useDisk, useMemory, useOffHeap, deserialized[, replication]):
 *  - `*_SER` levels pass deserialized = false (data kept in serialized form);
 *  - `*_2` levels pass a replication factor of 2 (otherwise the default is used).
 */
object StorageLevel {
    // Not stored anywhere.
    val NONE: StorageLevel = new StorageLevel(false, false, false, false)

    // Disk only.
    val DISK_ONLY: StorageLevel = new StorageLevel(true, false, false, false)
    val DISK_ONLY_2: StorageLevel = new StorageLevel(true, false, false, false, 2)

    // Memory only, stored deserialized.
    val MEMORY_ONLY: StorageLevel = new StorageLevel(false, true, false, true)
    val MEMORY_ONLY_2: StorageLevel = new StorageLevel(false, true, false, true, 2)

    // Memory only, stored serialized.
    val MEMORY_ONLY_SER: StorageLevel = new StorageLevel(false, true, false, false)
    val MEMORY_ONLY_SER_2: StorageLevel = new StorageLevel(false, true, false, false, 2)

    // Memory and disk, stored deserialized.
    val MEMORY_AND_DISK: StorageLevel = new StorageLevel(true, true, false, true)
    val MEMORY_AND_DISK_2: StorageLevel = new StorageLevel(true, true, false, true, 2)

    // Memory and disk, stored serialized.
    val MEMORY_AND_DISK_SER: StorageLevel = new StorageLevel(true, true, false, false)
    val MEMORY_AND_DISK_SER_2: StorageLevel = new StorageLevel(true, true, false, false, 2)

    // Off-heap: disk, memory and off-heap flags set, serialized, replication 1.
    val OFF_HEAP: StorageLevel = new StorageLevel(true, true, true, false, 1)
}
 
三、RDD CheckPoint
 
/**
 * Same pipeline as the other examples, but `mapRdd` is checkpointed.
 *
 * Checkpointing writes `mapRdd`'s data to the checkpoint directory and cuts
 * its lineage. Later jobs (here, the `groupByKey` action) read the saved data
 * instead of the original source.
 */
object TestRDDPersist {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("persist")
        val sc = new SparkContext(conf)
        try {
            // Relative local path: fine for local[*]. When running on a cluster,
            // Spark requires an HDFS (shared file system) path here.
            sc.setCheckpointDir("checkpoint")

            val rdd = sc.makeRDD(List(
                "hello world", "hello spark"
            ))

            val flatRdd = rdd.flatMap(_.split(" "))

            // The println is a side-effect marker that reveals every evaluation of this map.
            val mapRdd = flatRdd.map(word => {
                println("@@@@@@@@@@")
                (word, 1)
            })

            // Only marks mapRdd for checkpointing; it must be called before any job
            // runs on mapRdd. The data is saved after the first action, by a separate
            // job. Because mapRdd is not cached, that job recomputes it from the
            // source, so the marker prints again. Calling mapRdd.cache() before
            // checkpoint() would avoid this recomputation.
            mapRdd.checkpoint()

            // Action 1: computes mapRdd, then triggers the checkpoint-writing job.
            val reduceRdd = mapRdd.reduceByKey(_ + _)
            reduceRdd.collect().foreach(println)

            println("**********")

            // Action 2: reads mapRdd from the checkpoint files (lineage has been cut).
            val groupRdd = mapRdd.groupByKey()
            groupRdd.collect().foreach(println)
        } finally {
            // Always release the SparkContext, even if one of the actions above throws.
            sc.stop()
        }
    }
}
 
四、缓存和检查点区别
 
- cache 和 persist 不会切断原有的血缘关系，而是在其中添加新的依赖；一旦数据出错，仍可沿血缘关系从头重新计算数据。checkpoint 检查点会切断原有的血缘关系，重新建立新的血缘关系，相当于改变了数据源。
- cache 是将数据临时存储在 JVM 堆内存中，性能较高，但安全性低。persist 可以指定存储级别，例如将数据临时存储在磁盘文件中，此时涉及磁盘 IO，性能较低，作业执行完毕后临时文件会被删除。checkpoint 是将数据长久地存储在分布式文件系统（如 HDFS）中，安全性较高；但它涉及 IO，且会独立开启一个作业，从数据源开始重新计算数据，所以性能较低。因此一般在 checkpoint 之前先进行 cache，这样 checkpoint 作业只需从缓存中读取数据即可，可以提高性能。