RDD的特性二 : RDD的缓存
一、RDD缓存的意义
首先让我们来看一个小案例
查看数据集
需求:
- 统计访问最多的IP,以及最少的IP
步骤
- 创建sc
- 读取文件获取数据集
- 取出IP相关信息
- 简单清洗数据
- 统计IP出现的系数
- 统计出现系数最少的IP
- 统计出现次数最多的IP
代码
package SparkRDD.RDD的缓存
import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
object IP {
def main(args: Array[String]): Unit = {
// 1. 创建sc
val conf = new SparkConf().setAppName("ip").setMaster("local[6]")
val sc = new SparkContext(conf)
// 2. 读取文件获取数据集
val data = sc.textFile("dataset/Dataset-Unicauca-Version2-87Atts.csv")
//data.take(5).foreach(println(_))
// 3. 取出IP相关信息
val ip = data.map(item => (item.split(",")(1),1) )
//ip.take(5).foreach(println(_))
// 4. 简单清洗数据
val clean = ip.filter( item => StringUtils.isNotEmpty(item._1)) // 导入lang3的包
//clean.take(20).foreach(println(_))
// 5. 统计IP出现的系数
val reduce = clean.reduceByKey( (curr,agg )=> curr+agg )
reduce.take(20).foreach(println(_))
// 6. 统计出现系数最多的IP
val max = reduce.sortBy(item => item._2,ascending = false).first()
println(max) // (10.200.7.218,295431)
// 7. 统计出现次数最少的IP
val min = reduce.sortBy(item => item._2,ascending = true).first()
println(min) // (65.52.108.190,1)
}
}
在以上的案例中我们对数据集中的IP相关信息进行了提取,并且操作分析。在Spark-core中转换算子的作用是生成新的RDD,以及RDD之间的依赖关系(逻辑执行计划),行动算子的作用是生成job,有后续的执行端去执行物理计划,每一次行动算子的执行都会重新执行一次所有转换操作。在统计IP出现次数最多和最少的时候,我们进行了两次first的action操作,而在每一次的action操作中,又包含有两次shuffle操作(reducebykey、sortby),所以共会执行4次shuffle操作,而shuffle操作会在集群内对数据进行大量的拷贝,这样一来对于内存压力就会很大,例如上面的数据集下载下来有1G多,当然这样的数据量相对还是很少的,假如说是按照T级来运行的数据集,那就会有些慢了。这时就需要用到RDD的缓存,缓存的意义就在于在执行多个job的时候将转换操作(逻辑计划)缓存下来,直接使用,不需要重复的进行计算。
补充:
二、RDD缓存的API
1. cache()
def test: Unit ={
val conf = new SparkConf().setAppName("ip统计").setMaster("local[6]")
val sc = new SparkContext(conf)
val resource = sc.textFile("src/main/scala/SparkRDD/RDD的缓存/ip.txt")
print(resource.take(5))
val ipRDD = resource.map( item => ( item.split(",")(0) , 1) )
val cleanRDD = ipRDD.filter( item => StringUtils.isNotEmpty(item._1) )
var aggRDD = cleanRDD.reduceByKey( (curr,agg) => curr + agg )
// 调用cache方法将Transformation操作缓存
aggRDD = aggRDD.cache()
val maxRDD = aggRDD.sortBy( item => item._2,ascending = true ).first() // Action操作1
val minRDD = aggRDD.sortBy( item => item._2,ascending = false ).first() // Action操作2
println("max:"+maxRDD,"min:"+minRDD)
/** 使用缓存
* ResultStage 9 (first at cacheTest.scala:25) finished in 0.006 s
* Job 3 finished: first at cacheTest.scala:25, took 0.032388 s
*/
/** 不使用缓存
* ResultStage 9 (first at test.scala:31) finished in 0.006 s
* Job 3 finished: first at test.scala:31, took 0.048808 s
*/
}
由上面案例可以看出使用了cache()方法进行缓存之后,代码的执行时间会有所缩短,当数据量十分庞大的时候就会十分有效。
2. persist()
@Test
def test: Unit ={
val conf = new SparkConf().setAppName("ip统计").setMaster("local[6]")
val sc = new SparkContext(conf)
val resource = sc.textFile("src/main/scala/RDD的缓存/ip.txt")
val ipRDD = resource.map( item => ( item.split(",")(0) , 1) )
val cleanRDD = ipRDD.filter( item => StringUtils.isNotEmpty(item._1) )
var aggRDD = cleanRDD.reduceByKey( (curr,agg) => curr + agg )
// 调用 persist方法 将Transformation操作缓存,并设置缓存级别
aggRDD = aggRDD.persist(StorageLevel.MEMORY_ONLY) // 默认
println(aggRDD.getStorageLevel) // 获取当前缓存级别
/**
* Total input paths to process : 1
* StorageLevel(memory, deserialized, 1 replicas)
*/
val maxRDD = aggRDD.sortBy( item => item._2,ascending = true ).first() // Action操作1
val minRDD = aggRDD.sortBy( item => item._2,ascending = false ).first() // Action操作2
println("max:"+maxRDD,"min:"+minRDD)
}
三、存储的级别:
1.使用缓存的作用
是否缓存在磁盘上?
稳定!
是否缓存使用内存上?
提高效率!
是否缓存使用堆外内存?
在Java管辖之外的,不安全,由spark管理
是否在缓存前序列化?
取决于数据量
是否需要有副本?
将数据分发给多个worker,提供多个副本
缓存级别就是spark内设的参数属性,每一个缓存级别对应了上面5个内容
底层API:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
* 每个缓存级别就是一个StorageLevel对象
2.缓存级别的设置与查看
// 调用 persist方法 将Transformation操作缓存,并设置缓存级别
aggRDD = aggRDD.persist(StorageLevel.MEMORY_ONLY) // 默认
println(aggRDD.getStorageLevel) // 获取当前缓存级别
/**
* Total input paths to process : 1
* StorageLevel(memory, deserialized, 1 replicas)
*/
3.缓存级别
源码:
补充: