Table of Contents
- RDD cleanup strategy
- How to cache an RDD
Essence: store data that is used repeatedly in memory or another storage medium.
Benefits: 1. fault tolerance; 2. for an RDD that is reused multiple times, caching avoids recomputation and improves efficiency.
persist vs. cache: cache is essentially a shorthand for persist.
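A minimal sketch of the idea (the input path and names here are illustrative, not from the original text): an RDD that feeds two different actions is cached once and reused, instead of being recomputed from the source for each action.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("cache-demo").setMaster("local[*]")
val sc = new SparkContext(conf)

// A hypothetical source RDD that is expensive to recompute.
val lines  = sc.textFile("/tmp/access.log")
val parsed = lines.filter(_.nonEmpty).map(_.toLowerCase)

parsed.cache()                                            // mark for caching

val total  = parsed.count()                               // 1st action: computes and caches the partitions
val errors = parsed.filter(_.contains("error")).count()   // 2nd action: served from the cache

println(s"total=$total, errors=$errors")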
RDD cleanup strategy
Before version 1.4, expiration was configured with a TTL (spark.cleaner.ttl), which is not a reasonable approach for a long-running Spark application; since 1.4 the ContextCleaner has been improved to handle cleanup instead.
Source code:
_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
When cached RDDs (and other objects) are no longer referenced by the driver, the ContextCleaner also cleans up shuffle data, broadcast data, accumulator data, and checkpoint data.
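A small sketch from the application side (the configuration key comes from the snippet above; the rest is illustrative): reference tracking is enabled by default, and turning it off leaves _cleaner as None, so no automatic cleanup is performed.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cleaner-demo")
  .setMaster("local[*]")
  // "true" is the default; set to "false" to disable the ContextCleaner entirely.
  .set("spark.cleaner.referenceTracking", "true")

val sc = new SparkContext(conf)
// With reference tracking enabled, cached RDDs, shuffle data, broadcast variables,
// accumulators and checkpoint data are cleaned up asynchronously once the driver
// no longer references them.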
How to cache an RDD
For both persist and cache, nothing is actually stored until an action runs; before that the RDD is only marked as to-be-cached. cache is essentially a call to persist with StorageLevel.MEMORY_ONLY:
/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist()

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
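A short sketch of the lazy behaviour described above (assumes a SparkContext sc as in the earlier sketch; all names are illustrative): cache() only records the desired storage level, and the data is actually stored the first time an action runs.

val nums    = sc.parallelize(1 to 1000000)
val squares = nums.map(n => n.toLong * n)

println(squares.getStorageLevel)   // StorageLevel.NONE: nothing requested yet
squares.cache()                    // only marks the RDD; no data is stored yet
println(squares.getStorageLevel)   // now MEMORY_ONLY

val sum = squares.sum()            // first action: partitions are computed and cached
val max = squares.max()            // reuses the cached partitions, no recomputation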
persist accepts a StorageLevel; the available levels are defined in the StorageLevel object:
/**
 * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
 * new storage levels.
 */
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

  /**
   * :: DeveloperApi ::
   * Return the StorageLevel object with the specified name.
   */
  @DeveloperApi
  def fromString(s: String): StorageLevel = s match {
    case "NONE" => NONE
    case "DISK_ONLY" => DISK_ONLY
    case "DISK_ONLY_2" => DISK_ONLY_2
    case "MEMORY_ONLY" => MEMORY_ONLY
    case "MEMORY_ONLY_2" => MEMORY_ONLY_2
    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
    case "MEMORY_AND_DISK" => MEMORY_AND_DISK
    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
    case "OFF_HEAP" => OFF_HEAP
    case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $s")
  }
}
case "OFF_HEAP" => OFF_HEAP 为缓存到内存文件系统中进行共享
If caching is used, the storage level of each cached RDD can be seen under the Storage tab of the Spark web UI.
When processing is finished, call unpersist to release the cache.
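Continuing the sketch above, the cache can be released explicitly once the RDD is no longer needed:

val rowCount = events.count()        // work that used the cached RDD

// Release the cached blocks. With blocking = true the call waits until the blocks
// are actually removed; with blocking = false it returns immediately.
events.unpersist(blocking = false)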