文章目录
- 程序优化
 
- 尽可能减少和避免shuffle
 - 使用Kryo 作为序列化方案
 
- 程序中使用
 - 对比原始序列化方案与调优方案
 
- 尽可能批量操作数据
 - 合理设置分区数
 - 合理设置批处理间隔
 
- 数据优化
 
- 自定义partitioner缓解数据倾斜
 - 数据补全
 
- 资源优化
 
程序优化
尽可能减少和避免shuffle
(1) map task中,将内存中的数据以文件形式写到磁盘中
(2) reduce task 中,通过网络I/O读取map task中溢写的文件,进行聚合,由于join操作前后分区策略不一致造成shuffle,数据量较少(一般低于3G)可以使用广播变量机制在同一个stage中完成join操作
未优化前
val rddData1 = sc.parallelize(Array(("Alice", 15), ("Bob", 18), ("Thomas", 20), ("Catalina", 25)))
   val rddData2 = sc.parallelize(Array(("Alice", "Female"), ("Thomas", "Male"), ("Tom", "Male")))
   val rddData3 = rddData1.join(rddData2, 3)
   println(rddData3.collect.mkString(","))优化后,避免shuffle
val data1 = Map(("Alice", 15), ("Bob", 18), ("Thomas", 20), ("Catalina", 25))
    val rddData2 = sc.parallelize(Array(("Alice", "Female"), ("Thomas", "Male"), ("Tom", "Male")))
    val rddData1Broadcast = sc.broadcast(data1)
    val rddData3 = rddData2.map(t => {
      val data1Map = rddData1Broadcast.value
      if(data1Map.contains(t._1)){
        (t._1, (data1Map(t._1), t._2))
      }else{
        null
      }
    }).filter(_ != null)
    println(rddData3.collect().mkString(","))使用Kryo 作为序列化方案
在编程中,涉及到跨进程通信(例如节点之间数据传输),通信时传输数据必须进行序列化
程序中使用
(1)在RDD中的各类转换操作内引用外部变量必须能够序列化
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
class Person(_name: String) extends Serializable {
  def name = _name
  override def toString: String = _name
}
object Chapter10_1_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Chapter10_1_2")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Person]))
    val sc = new SparkContext(conf)
    val arrPerson = new ArrayBuffer[Person]()
    for(i <- 1 to 999999){
      arrPerson += new Person("姓名" + i)
    }
    val rddData1 = sc.parallelize(arrPerson, 2)
    rddData1.persist(StorageLevel.MEMORY_ONLY_SER)
    rddData1.collect()
    Thread.sleep(3600 * 1000)
  }
}(2)自定义类型作为RDD泛型
val visitorRDD = sc.parallelize[Person](
      Array(
        ("Bob", 15),
        ("Thomas", 28),
        ("Tom", 18),
        ("Galen", 35),
        ("Catalina", 12),
        ("Karen", 9),
        ("Boris", 20)),
      3)对比原始序列化方案与调优方案
使用ObjectOutputStream,ObjectInputStream API 实现序列化与反序列化,spark默认采用这种方案,但是不高效,因此spark提供了基于Kryo方案
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
object Test{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("Test")
      .set("spark.executor.memory", "6g")
      .set("spark.driver.memory", "6g")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Person]))
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)
    rdd.foreach(i => {
      //单条处理数据
    })
    rdd.foreachPartition(iterator => {
      for(i <- iterator){
        //批量处理数据
      }
    })
  }
}尽可能批量操作数据
在spark中,无论是转换操作还是输出操作,遍历RDD,DataFrame,DataSet中每个元素时,尽量先按照分区遍历,然后批量处理当前分区中数据,用来提升处理效率,例如rdd.ForeachPartition代替rdd.foreach,用rdd.mapPartition代替 rdd.map ,用ForeachBatch Sink代替 Foreach Sink
但是直接面向分区操作,一次性将分区中所有数据加载到内存中,若分区数据量较大,则GC无法回收,会出现内存溢出
合理设置分区数
在spark输出数据时,每一个分区产生一个文件,若某些分区数据不存在,会产生空文件,在适当时机使用coalesce操作将多个分区合并,有利于文件管理
合理设置批处理间隔
在spark页面查看数据的total delay ,选择合适的批处理时间
数据优化
对于数据的分布不均,到处单个节点的算力不均衡,程序长时间运行,可能会拖垮节点,降低处理效率
自定义partitioner缓解数据倾斜
import org.apache.spark.util.Utils
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import scala.util.Random
class UserPartitioner(partitions: Int) extends Partitioner{
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  val random = new Random()
  def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = key match {
    case null => nonNegativeMod((random.nextFloat() * 100000).toInt.hashCode, numPartitions)
    case _ => nonNegativeMod((key.toString + "_" + (random.nextFloat() * 100000).toInt).hashCode, numPartitions)
  }
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }
}
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Test")
    val sc = new SparkContext(conf)
    val arr = Array(
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户A", "www.baidu.com"),
      ("用户B", "www.baidu.com"),
      ("用户B", "www.baidu.com")
    )
    val rddData = sc
      .parallelize(arr)
      .partitionBy(new UserPartitioner(2))
    val result = rddData.map(t => {
      (t._1, "http://" + t._2)
    })
    println(result.collect.mkString(","))
    Thread.sleep(3600 * 1000)
  }
}数据补全
1.缓存维度数据在spark中(维度数据较少情况)
2.数据补全.先将数据补全,避免频繁和外部系统交互
资源优化
spark.driver.cores: 设置Driver进程数,只在cluster 模式下有效
spark.driver.memory: 设置Driver进程内存,建议通过脚本提交 --driver-memory方式配置










