0
点赞
收藏
分享

微信扫一扫

spark大数据分析:spark Struct Strreaming(30)程序优化

鱼板番茄 2022-01-31 阅读 63



文章目录


  • ​​程序优化​​

  • ​​尽可能减少和避免shuffle​​
  • ​​使用Kryo 作为序列化方案​​

  • ​​程序中使用​​
  • ​​对比原始序列化方案与调优方案​​

  • ​​尽可能批量操作数据​​
  • ​​合理设置分区数​​
  • ​​合理设置批处理间隔​​

  • ​​数据优化​​

  • ​​自定义partitioner缓解数据倾斜​​
  • ​​数据补全​​

  • ​​资源优化​​


程序优化

尽可能减少和避免shuffle

(1) map task中,将内存中的数据以文件形式写到磁盘中

(2) reduce task 中,通过网络I/O读取map task中溢写的文件,进行聚合,由于join操作前后分区策略不一致造成shuffle,数据量较少(一般低于3G)可以使用广播变量机制在同一个stage中完成join操作

未优化前

val rddData1 = sc.parallelize(Array(("Alice", 15), ("Bob", 18), ("Thomas", 20), ("Catalina", 25)))
val rddData2 = sc.parallelize(Array(("Alice", "Female"), ("Thomas", "Male"), ("Tom", "Male")))

val rddData3 = rddData1.join(rddData2, 3)
println(rddData3.collect.mkString(","))

优化后,避免shuffle

val data1 = Map(("Alice", 15), ("Bob", 18), ("Thomas", 20), ("Catalina", 25))
val rddData2 = sc.parallelize(Array(("Alice", "Female"), ("Thomas", "Male"), ("Tom", "Male")))

val rddData1Broadcast = sc.broadcast(data1)
val rddData3 = rddData2.map(t => {
val data1Map = rddData1Broadcast.value
if(data1Map.contains(t._1)){
(t._1, (data1Map(t._1), t._2))
}else{
null
}
}).filter(_ != null)
println(rddData3.collect().mkString(","))

使用Kryo 作为序列化方案

在编程中,涉及到跨进程通信(例如节点之间数据传输),通信时传输数据必须进行序列化

程序中使用

(1)在RDD中的各类转换操作内引用外部变量必须能够序列化

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer
class Person(_name: String) extends Serializable {
def name = _name
override def toString: String = _name
}
object Chapter10_1_2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Chapter10_1_2")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[Person]))

val sc = new SparkContext(conf)

val arrPerson = new ArrayBuffer[Person]()

for(i <- 1 to 999999){
arrPerson += new Person("姓名" + i)
}

val rddData1 = sc.parallelize(arrPerson, 2)
rddData1.persist(StorageLevel.MEMORY_ONLY_SER)
rddData1.collect()

Thread.sleep(3600 * 1000)
}
}

(2)自定义类型作为RDD泛型

val visitorRDD = sc.parallelize[Person](
Array(
("Bob", 15),
("Thomas", 28),
("Tom", 18),
("Galen", 35),
("Catalina", 12),
("Karen", 9),
("Boris", 20)),
3)
对比原始序列化方案与调优方案

使用ObjectOutputStream,ObjectInputStream API 实现序列化与反序列化,spark默认采用这种方案,但是不高效,因此spark提供了基于Kryo方案

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer


object Test{
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("Test")
.set("spark.executor.memory", "6g")
.set("spark.driver.memory", "6g")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[Person]))
val sc = new SparkContext(conf)

val rdd = sc.parallelize(1 to 10)

rdd.foreach(i => {
//单条处理数据
})

rdd.foreachPartition(iterator => {
for(i <- iterator){
//批量处理数据
}
})

}
}

尽可能批量操作数据

在spark中,无论是转换操作还是输出操作,遍历RDD,DataFrame,DataSet中每个元素时,尽量先按照分区遍历,然后批量处理当前分区中数据,用来提升处理效率,例如rdd.ForeachPartition代替rdd.foreach,用rdd.mapPartition代替 rdd.map ,用ForeachBatch Sink代替 Foreach Sink

但是直接面向分区操作,一次性将分区中所有数据加载到内存中,若分区数据量较大,则GC无法回收,会出现内存溢出

合理设置分区数

在spark输出数据时,每一个分区产生一个文件,若某些分区数据不存在,会产生空文件,在适当时机使用coalesce操作将多个分区合并,有利于文件管理

合理设置批处理间隔

在spark页面查看数据的total delay ,选择合适的批处理时间

数据优化

对于数据的分布不均,到处单个节点的算力不均衡,程序长时间运行,可能会拖垮节点,降低处理效率

自定义partitioner缓解数据倾斜

import org.apache.spark.util.Utils
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}

import scala.util.Random


class UserPartitioner(partitions: Int) extends Partitioner{
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

val random = new Random()

def numPartitions: Int = partitions

override def getPartition(key: Any): Int = key match {
case null => nonNegativeMod((random.nextFloat() * 100000).toInt.hashCode, numPartitions)
case _ => nonNegativeMod((key.toString + "_" + (random.nextFloat() * 100000).toInt).hashCode, numPartitions)
}

def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
}

object Test {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("Test")
val sc = new SparkContext(conf)

val arr = Array(
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户A", "www.baidu.com"),
("用户B", "www.baidu.com"),
("用户B", "www.baidu.com")
)

val rddData = sc
.parallelize(arr)
.partitionBy(new UserPartitioner(2))

val result = rddData.map(t => {
(t._1, "http://" + t._2)
})

println(result.collect.mkString(","))

Thread.sleep(3600 * 1000)
}
}

数据补全

1.缓存维度数据在spark中(维度数据较少情况)

2.数据补全.先将数据补全,避免频繁和外部系统交互

资源优化

spark.driver.cores: 设置Driver进程数,只在cluster 模式下有效

spark.driver.memory: 设置Driver进程内存,建议通过脚本提交 --driver-memory方式配置



举报

相关推荐

0 条评论