代码调优
- spark调优
- 对多次使用的RDD进行持久化
- 对多次使用的RDD进行持久化
- 使用高性能的算子
- 1. 使用reduceByKey/aggregateByKey替代groupByKey
- 2. 使用mapPartitions替代普通map Transformation算子
- 3. 使用foreachPartitions替代foreach Action算子
- 4.使用filter之后进行coalesce操作
- 5.使用repartitionAndSortWithinPartitions替代repartition与sort类操作代码
- 6.repartition:coalesce(numPartitions,true) 增多分区使用这个
- 7.coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition
- 广播大变量
- 使用Kryo优化序列化性能
- 优化数据结构
- 使用高性能的库fastutil
spark调优
- 避免创建重复的RDD
- 尽可能复用同一个RDD
- 对多次使用的RDD进行持久化
- 尽量避免使用shuffle类的算子
- 使用map-side预聚合的shuffle操作
- 使用高性能的算子
- 广播大变量,如果变量很小,不广播也可以
- 使用Kryo优化序列化性能
- 优化数据结构
- 使用高性能的库fastutil
对多次使用的RDD进行持久化
如何选择一种最合适的持久化策略
1. 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大, 可以绰绰有余地存放下整个RDD的所有数据。因为==不进行序列化与反序列化==操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作 ,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种 策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。
2. 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个 partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算 子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上, 如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
对多次使用的RDD进行持久化
如何选择一种最合适的持久化策略
-
如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无 法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优 先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。
-
通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写 ,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将 所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性 能开销,除非是要求作业的高可用性,否则不建议使用。
使用高性能的算子
1. 使用reduceByKey/aggregateByKey替代groupByKey
reduceByKey/aggregateByKey替代groupByKey计算同一组内最大值以及平均值
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* @author 郭帅帅
* @2022-01-08-21:25
*
*/
object Demo5TestKyro {
/**
* 使用kryo序列化方式代替默认序列化方式(objectOutPutStream/objectInPutStream)
* 性能提高10倍
*
*
* spark 三个地方涉及到序列化
*
* 1、算子里面用到可外部变量
* 2、RDD 类型为自定义类型,同时使用checkpoint 或者 使用shuffle类算子的时候会产生序列化
* 3、 cache SER
*/
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("app")
//序列化方式
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
//指定注册序列化的类,自定义
.set("spark.kryo.registrator", "com.shujia.optimize.MyRegisterKryo")
val sc: SparkContext = new SparkContext(conf)
sc.setCheckpointDir("Spark/data/stu/checkpoint")
val data: RDD[String] = sc.textFile("Spark/data/stu/students.txt")
/**
* 自定义对象比字符串赵勇内存更多
* 因为自定义对象由对象头信息
*
*/
var stuRDD: RDD[Student] = data
.map(_.split(","))
.map(line => Student(line(0), line(1), line(2).toInt, line(3), line(4)))
///checkpoint 产生序列化
stuRDD.checkpoint()
//shuffle 类算子产生序列化
stuRDD.map(s => (s.id, s)).groupByKey().foreach(println)
// //对RDD 持久化会产生序列化
// stuRDD = stuRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
//
//
// stuRDD.foreach(println)
while (true) {
}
}
case class Student(id: String, name: String, age: Int, gender: String, clazz: String)
}
-- MyRegisterKryo
import com.twitter.chill.Kryo
import optimize.Demo5TestKyro.Student
import org.apache.spark.serializer.KryoRegistrator
class MyRegisterKryo {
class MyRegisterKryo extends KryoRegistrator {
//注册类
override def registerClasses(kryo: Kryo): Unit = {
//注册Student类
//注册之后student类序列化的时候就会使用kryo
//classOf 获取类对象
kryo.register(classOf[Student])
kryo.register(classOf[Int])
kryo.register(classOf[String])
//可以同时注册多个
// kryo.register()
}
}
优化数据结构
Java中,有三种类型比较耗费内存:
-
对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
-
字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
-
集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来 封装集合元素,比如Map.Entry。
-
因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用 ,从而降低GC频率,提升性能。
使用高性能的库fastutil
fastutil介绍:
- fastutil是扩展了Java标准集合框架(Map、List、Set;HashMap、ArrayList、 HashSet)的类库,提供了特殊类型的map、set、list和queue;
- fastutil能够提供更小的内存占用,更快的存取速度;我们使用fastutil提供的集合类,来 替代自己平时使用的JDK的原生的Map、List、Set,好处在于,fastutil集合类,可以减 小内存的占用,并且在进行集合的遍历、根据索引(或者key)获取元素的值和设置元素 的值的时候,提供更快的存取速度;
- fastutil最新版本要求Java 7以及以上版本;
- fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的map,实 现了Java的Map接口),因此可以直接放入已有系统的任何代码中。
- fastutil的每一种集合类型,都实现了对应的Java中的标准接口(比如fastutil的 map,实现了Java的Map接口),因此可以直接放入已有系统的任何代码中。
使用?