0
点赞
收藏
分享

微信扫一扫

RDD的特性 ---- RDD的缓存


RDD的特性二 : RDD的缓存

一、RDD缓存的意义

首先让我们来看一个小案例

查看数据集

RDD的特性  ----  RDD的缓存_scala

需求:

  1. 统计访问最多的IP,以及最少的IP

步骤

  1. 创建sc
  2. 读取文件获取数据集
  3. 取出IP相关信息
  4. 简单清洗数据
  5. 统计IP出现的系数
  6. 统计出现系数最少的IP
  7. 统计出现次数最多的IP

代码

package SparkRDD.RDD的缓存

import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}

object IP {

def main(args: Array[String]): Unit = {
// 1. 创建sc
val conf = new SparkConf().setAppName("ip").setMaster("local[6]")
val sc = new SparkContext(conf)

// 2. 读取文件获取数据集
val data = sc.textFile("dataset/Dataset-Unicauca-Version2-87Atts.csv")
//data.take(5).foreach(println(_))

// 3. 取出IP相关信息
val ip = data.map(item => (item.split(",")(1),1) )
//ip.take(5).foreach(println(_))

// 4. 简单清洗数据
val clean = ip.filter( item => StringUtils.isNotEmpty(item._1)) // 导入lang3的包
//clean.take(20).foreach(println(_))

// 5. 统计IP出现的系数
val reduce = clean.reduceByKey( (curr,agg )=> curr+agg )
reduce.take(20).foreach(println(_))

// 6. 统计出现系数最多的IP
val max = reduce.sortBy(item => item._2,ascending = false).first()
println(max) // (10.200.7.218,295431)

// 7. 统计出现次数最少的IP
val min = reduce.sortBy(item => item._2,ascending = true).first()
println(min) // (65.52.108.190,1)
}
}

RDD的特性  ----  RDD的缓存_scala_03


在以上的案例中我们对数据集中的IP相关信息进行了提取,并且操作分析。在Spark-core中转换算子的作用是生成新的RDD,以及RDD之间的依赖关系(逻辑执行计划),行动算子的作用是生成job,有后续的执行端去执行物理计划,每一次行动算子的执行都会重新执行一次所有转换操作。在统计IP出现次数最多和最少的时候,我们进行了两次first的action操作,而在每一次的action操作中,又包含有两次shuffle操作(reducebykey、sortby),所以共会执行4次shuffle操作,而shuffle操作会在集群内对数据进行大量的拷贝,这样一来对于内存压力就会很大,例如上面的数据集下载下来有1G多,当然这样的数据量相对还是很少的,假如说是按照T级来运行的数据集,那就会有些慢了。这时就需要用到RDD的缓存,缓存的意义就在于在执行多个job的时候将转换操作(逻辑计划)缓存下来,直接使用,不需要重复的进行计算。

补充:

RDD的特性  ----  RDD的缓存_缓存_04

二、RDD缓存的API

1. cache()

RDD的特性  ----  RDD的缓存_缓存_05

def test: Unit ={

val conf = new SparkConf().setAppName("ip统计").setMaster("local[6]")
val sc = new SparkContext(conf)

val resource = sc.textFile("src/main/scala/SparkRDD/RDD的缓存/ip.txt")
print(resource.take(5))
val ipRDD = resource.map( item => ( item.split(",")(0) , 1) )
val cleanRDD = ipRDD.filter( item => StringUtils.isNotEmpty(item._1) )
var aggRDD = cleanRDD.reduceByKey( (curr,agg) => curr + agg )

// 调用cache方法将Transformation操作缓存
aggRDD = aggRDD.cache()

val maxRDD = aggRDD.sortBy( item => item._2,ascending = true ).first() // Action操作1
val minRDD = aggRDD.sortBy( item => item._2,ascending = false ).first() // Action操作2
println("max:"+maxRDD,"min:"+minRDD)

/** 使用缓存
* ResultStage 9 (first at cacheTest.scala:25) finished in 0.006 s
* Job 3 finished: first at cacheTest.scala:25, took 0.032388 s
*/

/** 不使用缓存
* ResultStage 9 (first at test.scala:31) finished in 0.006 s
* Job 3 finished: first at test.scala:31, took 0.048808 s
*/
}

由上面案例可以看出使用了cache()方法进行缓存之后,代码的执行时间会有所缩短,当数据量十分庞大的时候就会十分有效。

2. persist()

RDD的特性  ----  RDD的缓存_scala_06

@Test
def test: Unit ={

val conf = new SparkConf().setAppName("ip统计").setMaster("local[6]")
val sc = new SparkContext(conf)

val resource = sc.textFile("src/main/scala/RDD的缓存/ip.txt")
val ipRDD = resource.map( item => ( item.split(",")(0) , 1) )
val cleanRDD = ipRDD.filter( item => StringUtils.isNotEmpty(item._1) )
var aggRDD = cleanRDD.reduceByKey( (curr,agg) => curr + agg )

// 调用 persist方法 将Transformation操作缓存,并设置缓存级别
aggRDD = aggRDD.persist(StorageLevel.MEMORY_ONLY) // 默认
println(aggRDD.getStorageLevel) // 获取当前缓存级别

/**
* Total input paths to process : 1
* StorageLevel(memory, deserialized, 1 replicas)
*/

val maxRDD = aggRDD.sortBy( item => item._2,ascending = true ).first() // Action操作1
val minRDD = aggRDD.sortBy( item => item._2,ascending = false ).first() // Action操作2
println("max:"+maxRDD,"min:"+minRDD)
}

三、存储的级别:

1.使用缓存的作用

 是否缓存在磁盘上?
稳定!

是否缓存使用内存上?
提高效率!

是否缓存使用堆外内存?
在Java管辖之外的,不安全,由spark管理

是否在缓存前序列化?
取决于数据量

是否需要有副本?
将数据分发给多个worker,提供多个副本

缓存级别就是spark内设的参数属性,每一个缓存级别对应了上面5个内容

底层API:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}

* 每个缓存级别就是一个StorageLevel对象

2.缓存级别的设置与查看

// 调用 persist方法 将Transformation操作缓存,并设置缓存级别
aggRDD = aggRDD.persist(StorageLevel.MEMORY_ONLY) // 默认
println(aggRDD.getStorageLevel) // 获取当前缓存级别

/**
* Total input paths to process : 1
* StorageLevel(memory, deserialized, 1 replicas)
*/

3.缓存级别

源码:

RDD的特性  ----  RDD的缓存_缓存_07


RDD的特性  ----  RDD的缓存_缓存_08


补充:

RDD的特性  ----  RDD的缓存_scala_09


举报

相关推荐

0 条评论