0
点赞
收藏
分享

微信扫一扫

Spark四种性能调优思路(四)——数据倾斜调优

洛茄 2022-01-26 阅读 101



文章目录


  • ​​优化一:提高shuffle并行度​​
  • ​​优化二:过滤key​​
  • ​​优化三:预处理​​
  • ​​优化四:两阶段聚合​​
  • ​​优化五:分拆进行join​​


数据倾斜,英文data skew,就是由于数据分布不均匀,造成的数据以及任务计算时间有差异,绝大多数task任务执行很快结束,个别task任务执行非常缓慢,如果在mr中接触过的就应该知道,dataskew的现象就是程序长时间停留在99%的阶段,但是不结束

表现形式

  • 个别task运行很慢
    绝大多数task任务执行很快结束,个别task任务执行非常缓慢。一个spark程序执行时间是由最慢的task所决定的。这也是数据倾斜中最常见的现象。

Spark四种性能调优思路(四)——数据倾斜调优_spark

  • 突然OOM(Out of Memory)
    正常运行的作业,突然某一天OOM,分析原因,是由于key的分布不均匀造成的。

数据倾斜成因

Spark四种性能调优思路(四)——数据倾斜调优_spark_02

处理数据倾斜的思路

发生数据倾斜的原因是由于在shuffle过程中key的分布不均匀造成

解决方法的本质就是让key变均


  1. 找到key
    使用sample算子可以进行抽样,用样本空间评估整体,比如抽取10% ,就可以计算出每一个key对应的次数 ,那么出现次数最多的那些key就是那些发生数据倾斜的key。
  2. 变均匀
    最常用,也是最有用的方法,给这些key加上一个随机数前缀,进行聚合操作。
  3. 去掉前缀
    基于第二步的结果进行再一次的聚合

优化一:提高shuffle并行度

发生数据倾斜之后,最初的尝试就是提高shuffle的并行度,shuffle算子有第二个参数,比如reduceByKey(func, numPartitions),这种处理方案不能从根本上解决数据倾斜,但是会在一定程度上减轻数据倾斜的压力,因为下游分区数变多,自然每一个分区中的数据,相比较原来就减少了,但是,相同key的数据还是回到一个分区中去,所以如果发生数据倾斜,这种情况下是不可能解决数据倾斜。但是提高shuffle并行度,是解决数据倾斜的第一次尝试!

Spark四种性能调优思路(四)——数据倾斜调优_spark_03

优化二:过滤key

如果把这些发生数据倾斜的key干掉,自然其余的key都是分布均匀的,分布均匀在运行的时候,task运行时间也是相对均匀的,也就不会发生数据倾斜了。但是这种方案没有从根本上解决数据倾斜,同时大多数倾斜下,发生数据倾斜的这部分key还是蛮有用的,不能直接过滤,大家需要和运营、产品、项目等相关人员进行确认之后才可进行过滤。

FilterSkewKey.scala

package dataskew

import org.apache.spark.{SparkConf, SparkContext}

/**
* @Author Daniel
* @Description 过滤key
**/
//过滤掉数据倾斜的key
object FilterSkewKey {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("_01FilterSkewKeyOps")
.setMaster("local[*]")
val sc = new SparkContext(conf)
//模拟数据倾斜的数据
val list = List(
"hive spark hadoop hadoop",
"spark hadoop hive spark",
"spark spark",
"spark spark",
"kafka streaming spark"
)
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(_.split("\\s+"))
//找到发生数据倾斜的key,使用抽样算子sample(true有放回抽样false为无返回抽样,抽样比例取值范围就是0~1)
val sampleRDD = wordsRDD.sample(true, 0.8)
//计数
val counts = sampleRDD.map((_, 1)).countByKey()
println("抽样数据为:")
counts.foreach(println)
//排序拿到出现次数最多的key,即发生数据倾斜的key
val firstArr = counts.toList
.sortWith { case ((k1, c1), (k2, c2)) => c1 > c2 }
.take(1)
//拿到第一个list中的第一个元素
val first = firstArr(0)._1
println("发生数据倾斜的key为:" + first)
//在原RDD中过滤掉发生数据倾斜的key
wordsRDD.filter(word => word != first)
.map((_, 1))
.reduceByKey(_ + _)
.foreach(println)
sc.stop()
}
}

优化三:预处理

在spark阶段发生的数据倾斜,是由于数据分布不均匀造成,而spark加载的数据是从外部的介质拉取过来的,要想让spark阶段不发生dataskew,得让拉取过来的数据本身就是已经处理完数据倾斜之后结果。

这种方案,对于类似一个java应用需要从spark计算的结果中拉取数据,所以就需要spark做快速的响应,所以如果有数据倾斜现象,就应该将这部分的操作转移到上游处理,在spark中就没有这部分shuffle操作,也就不会再有数据倾斜,此时spark相当于数据的快速查询引擎,通常比如spark从hive,或者hdfs查数据的时候可以使用这种方案,而且效果是非常明显。

这种方案,从根本上解决了spark阶段的数据倾斜,因为压根儿就没有shuffle操作,只不过是把对应的操作提前到前置阶段,此时spark就只是利用它的一个特点——快,直接加载外部结果给程序调用者来使用。

优化四:两阶段聚合

  • 分析过程:
    首先需要清楚,这种阶段方案主要是针对xxxByKey类的算子造成的数据倾斜。两阶段聚合=局部聚合+全局聚合。

以(hello, 1),(hello, 1),(hello, 1),(hello, 1)为例


  • 局部聚合:
    给key添加N以内的随机前缀,这里比如加2以内,(0_hello, 1),(1_hello, 1),(0_hello, 1),(1_hello, 1),此时进行聚合统计,结果就变为了(0_hello, 2),(1_hello, 2),这就得到了一个局部的统计结果。
  • 全局聚合:
    在局部聚合的基础之上,将随机的前缀干掉,(hello, 2),(hello, 2),再次进行聚合操作,(hello, 4)。

MergeTwoStage.scala

package dataskew

import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

/**
* @Author Daniel
* @Description 两阶段聚合
**/
object MergeTwoStage {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("MergeTwoStage")
.setMaster("local[*]")
val sc = new SparkContext(conf)
val list = List(
"hive spark hadoop hadoop",
"spark hadoop hive spark",
"spark spark",
"spark spark",
"kafka streaming spark"
)
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(_.split("\\s+"))
val pairsRDD = wordsRDD.map((_, 1))
//取样,计数,排序
val sortedKeys = pairsRDD.sample(true, 0.7)
.countByKey()
.toList
.sortWith(_._2 > _._2)
println("排序之后的抽样数据:")
sortedKeys.foreach(println)
//拿到排第一的List,取出第一个元素,即发生数据倾斜的key
val dataSkewKey = (sortedKeys.take(1)) (0)._1
//打散原始数据
val newPairsRDD = pairsRDD.map { case (word, count) => {
if (word == dataSkewKey) { //给数据倾斜的key加上随机数(0或1)
val random = new Random()
(random.nextInt(2) + "_" + word, count)
} else {
//其他的不变
(word, count)
}
}
}
println("打散之后的RDD数据:")
newPairsRDD.foreach(println)
//局部聚合
val partAggr = newPairsRDD.reduceByKey(_ + _)
println("局部聚合之后:")
partAggr.foreach(println)
//全局聚合
val fullAggr = partAggr.map { case (prefixKey, count) => {
if (prefixKey.contains("_")) {
//去掉前缀
(prefixKey.substring(prefixKey.indexOf("_") + 1), count)
} else {
(prefixKey, count)
}
}
}
.reduceByKey(_ + _)
println("全局聚合之后的结果:")
fullAggr.foreach(println)
sc.stop()
}
}

优化五:分拆进行join



join的分类

分为了map-join和reduce-join,这也是mr中的join分类。对于join的操作自然就从这两个出发点去处理。map-join适合处理大小表关联。reduce-join适合处理两张大表关联。

对于map-join,就是大小表关联,可以将小表加载到广播变量中,和大表是用map类的算子完成关联,这样在程序中就不会出现join操作,便不会有shuffle,因此就不会出现数据倾斜。

这个案例在Spark四种性能调优思路(一)——开发调优中优化四:量避免使用shuffle类算子出现过

MapJoin.scala

package optimization

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

/**
* @Author Daniel
* @Description 使用map+广播变量代替join操作
*
**/
object MapJoin {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(s"${MapJoin.getClass.getSimpleName}")
.setMaster("local[*]")

val sc = new SparkContext(conf)
unJoinOps(sc)

sc.stop()
}

//借助map类的算子和广播变量相当于mapreduce中的map join
def unJoinOps(sc: SparkContext): Unit = {
//sid, name, age, gender
val stuMap = List(
"1,Jacob,19,male",
"2,William,20,male",
"3,Emily,21,female",
"4,Daniel,20,male",
"5,Olivia,31,female"
).map(line => {
val index = line.indexOf(",")
(line.substring(0, index), line.substring(index + 1))
}).toMap

//转化为广播变量
val stuBC: Broadcast[Map[String, String]] = sc.broadcast(stuMap)

//sid, course, score
val scoreRDD = sc.parallelize(List(
"1,Math,88",
"2,Chinese,75",
"3,English,87",
"4,Math,100",
"6,Chinese,77"
))
scoreRDD.map(line => {
val index = line.indexOf(",")
//sid
val id = line.substring(0, index)
//course, score
val otherInfo = line.substring(index + 1)
//拿到广播变量中的id字段
val baseInfo = stuBC.value.get(id)
//如果id被定义(即有值)
if (baseInfo.isDefined) {
//拼接
(id, baseInfo.get + "," + otherInfo)
} else {
//否则设置为Null值
(id, null)
}
//过滤掉Null值
}).filter(_._2 != null).foreach(println)
}
}

如何进行大表关联



思路
上述的两阶段聚合,并不能够解决掉join类的shuffle,要想处理join类的shuffle,使用这种所谓的分拆数据倾斜key,并进行扩容join操作。



Spark四种性能调优思路(四)——数据倾斜调优_大数据_04

  • 代码实现
    ​package dataskew import java.util.Random import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer /** * @Author Daniel * @Description 分拆join表数据进行关联 **/ object SplitJoin { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName(s"${SplitJoin.getClass.getSimpleName}") .setMaster("local[*]") val sc = new SparkContext(conf) val random = new Random() val left = sc.parallelize(List( ("spark", "left0"), ("spark", "left1"), ("spark", "left2"), ("spark", "left3"), ("spark", "left4"), ("spark", "left5"), ("hadoop", "left"), ("mapreduce", "left"), ("rdd", "left") )) val right = sc.parallelize(List( ("spark", "right0"), ("spark", "right1"), ("hadoop", "right"), ("mapreduce", "right") )) //可以看到左表中key有倾斜,右表正常,所以首先在左表中找到倾斜的key val sortedKeys = left.sample(true, 0.7) .countByKey() .toList .sortWith(_._2 > _._2) println("排序之后的抽样数据:") sortedKeys.foreach(println) val dataSkewKey = (sortedKeys.take(1)) (0)._1 //拆分左右两张表,进行单独处理 val leftDSRDD = left.filter { case (key, value) => key == dataSkewKey } val leftNormalRDD = left.filter { case (key, value) => key != dataSkewKey } val rightDSRDD = right.filter { case (key, value) => key == dataSkewKey } val rightNormalRDD = right.filter { case (key, value) => key != dataSkewKey } //先处理normal数据 val normalJoinedRDD = leftNormalRDD.join(rightNormalRDD) println("正常数据进行join之后的结果: ") normalJoinedRDD.foreach(println) //打散左表异常数据,加上随机数 val leftPrefixDSRDD = leftDSRDD.map { case (key, value) => { val prefix = random.nextInt(3) + "_" (prefix + key, value) } } //使用flatMap将右表对应异常数据进行扩容 val rightPrefixDSRDD = rightDSRDD.flatMap { case (key, value) => { val ab = ArrayBuffer[(String, String)]() //使每个结果均匀分布 for (i <- 0 until 3) { ab.append((i + "_" + key, value)) } ab } } //异常数据进行join val prefixJoinedDSRDD = leftPrefixDSRDD.join(rightPrefixDSRDD) println("异常rdd进行join之后的结果:") prefixJoinedDSRDD.foreach(println) //去掉前缀 val dsJoinedRDD = prefixJoinedDSRDD.map { case (prefix, value) => { (prefix.substring(prefix.indexOf("_") + 1), value) } } //异常数据与正常数据union val finalRDD = dsJoinedRDD.union(normalJoinedRDD) println("最终的join结果:") finalRDD.foreach(println) sc.stop() } } ​上面的join操作,仅仅针对一张表正常,一张表少部分异常,大部分正常的情况
    如果加入左表大部分的key都有倾斜的情况,右表正常,此时的处理方式就不适用了。因为此时,有倾斜的数据占大部分,所以分拆的效果也不明显,左表就得全量添加随机前缀,右表全量扩容。显然对内存资源要求非常高,很容易出现OOM异常。

总结:

如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。比如我们可以同时在提高shuffle并行度的同时,过滤掉key这样双管齐下可以更好的解决开发中遇到的问题



举报

相关推荐

0 条评论