Grouped TopN on a Spark RDD: Examples and Optimizations

吃面多放酱 2022-02-17



Contents

  • Grouped TopN with Scala List sorting, which has a performance bottleneck and can run out of memory
  • Calling the RDD sortBy method to sort the data of each filtered RDD, using memory plus disk
  • A custom partitioner that partitions by subject
  • Optimizing the custom partitioning to reduce shuffles


Grouped TopN with Scala List sorting, which has a performance bottleneck and can run out of memory

package rdd

import java.net.URL

import org.apache.spark.{SparkConf, SparkContext}

object FavObjTeacher {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("FavObjTeacher")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("C:\\Users\\admin\\Desktop\\test\\teacher.log")
    // each line looks like: http://bigdata.edu360.cn/laozhang
    val subjectTeacherAndOne = lines.map(x => {
      val index = x.lastIndexOf("/")
      val teacher = x.substring(index + 1)
      val httpHost = x.substring(0, index)
      val subject = new URL(httpHost).getHost.split("[.]")(0)
      // emit ((subject, teacher), 1) directly in the map
      ((subject, teacher), 1)
    })
    // aggregate, using (subject, teacher) as the combined key
    val reduced = subjectTeacherAndOne.reduceByKey(_ + _)
    // group by subject for the per-subject sort
    val grouped = reduced.groupBy(_._1._1)

    // convert each group to a List and sort it with the Scala collection API
    // (the whole group is pulled into memory, hence the OOM risk)
    val sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(3))

    val r = sorted.collect()
    println(r.toBuffer)
    sc.stop()
  }
}

Calling the RDD sortBy method to sort the data of each filtered RDD, using memory plus disk

package rdd

import java.net.URL

import org.apache.spark.{SparkConf, SparkContext}

object FavObjTeacher {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("FavObjTeacher")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("C:\\Users\\admin\\Desktop\\test\\teacher.log")
    // each line looks like: http://bigdata.edu360.cn/laozhang
    val subjectTeacherAndOne = lines.map(x => {
      val index = x.lastIndexOf("/")
      val teacher = x.substring(index + 1)
      val httpHost = x.substring(0, index)
      val subject = new URL(httpHost).getHost.split("[.]")(0)
      // emit ((subject, teacher), 1) directly in the map
      ((subject, teacher), 1)
    })
    // aggregate, using (subject, teacher) as the combined key
    val reduced = subjectTeacherAndOne.reduceByKey(_ + _)

    val subjects = Array("bigdata", "javaee", "php")

    for (sb <- subjects) {
      // filter so that the resulting RDD only contains the data of one subject
      val filtered = reduced.filter(_._1._1 == sb)
      // call sortBy in descending order and take the top 3
      val favTeacher = filtered.sortBy(_._2, false).take(3)
      println(favTeacher.toBuffer)
    }
    sc.stop()
  }
}

A custom partitioner that partitions by subject

This version uses a custom Partitioner together with partitionBy and mapPartitions; the function passed to mapPartitions receives an iterator over one partition and must return an iterator.
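As a quick illustration of that iterator-in / iterator-out contract, here is a minimal sketch of my own (not part of the original code), assuming the same SparkContext sc as in the examples:

// one value per partition: the function gets an Iterator[Int] and returns an Iterator[Int]
val nums = sc.parallelize(1 to 10, 2)
val partitionSums = nums.mapPartitions(it => Iterator(it.sum))
println(partitionSums.collect().toBuffer) // e.g. ArrayBuffer(15, 40)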

import java.net.URL
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import scala.collection.mutable

object FavTeacher2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("FavObjTeacher")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("C:\\Users\\admin\\Desktop\\test\\teacher.log")
    // each line looks like: http://bigdata.edu360.cn/laozhang
    val subjectTeacherAndOne = lines.map(x => {
      val index = x.lastIndexOf("/")
      val teacher = x.substring(index + 1)
      val httpHost = x.substring(0, index)
      val subject = new URL(httpHost).getHost.split("[.]")(0)
      // emit ((subject, teacher), 1) directly in the map
      ((subject, teacher), 1)
    })
    // aggregate, using (subject, teacher) as the combined key
    val reduced = subjectTeacherAndOne.reduceByKey(_ + _)

    // find out how many subjects there are
    val subjects = reduced.map(_._1._1).distinct().collect()

    val sbPartitioner = new SubjectPartitioner(subjects)

    // call partitionBy to repartition the data according to the custom rule
    val partitionedRDD = reduced.partitionBy(sbPartitioner)

    // mapPartitions processes one whole partition at a time
    val sorted = partitionedRDD.mapPartitions(it => {
      it.toList.sortBy(_._2).reverse.take(3).iterator
    })
    val r = sorted.collect()
    println(r.toBuffer)
    sc.stop()
  }
}

// custom partitioner
class SubjectPartitioner(sbs: Array[String]) extends Partitioner {
  // this code runs in the primary constructor

  // number of partitions, i.e. how many partitions the next RDD will have
  override def numPartitions: Int = sbs.length

  // the rule: map each subject to a partition id
  val rules = new mutable.HashMap[String, Int]()
  var i = 0 // partition id
  for (elem <- sbs) {
    rules.put(elem, i)
    i += 1
  }

  override def getPartition(key: Any): Int = {
    // extract the subject from the (subject, teacher) key
    val subject = key.asInstanceOf[(String, String)]._1
    // look up the partition id; in Scala, map(key) returns the value directly
    rules(subject)
  }
}
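To sanity-check which subject ended up in which partition, one option is mapPartitionsWithIndex; the sketch below is my own addition and reuses partitionedRDD from the code above:

// emit (partition id, (subject, teacher)) pairs for inspection
val check = partitionedRDD.mapPartitionsWithIndex((idx, it) => it.map(t => (idx, t._1)))
println(check.collect().toBuffer)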

Optimizing the custom partitioning to reduce shuffles

Both reduceByKey and partitionBy produce a shuffle.

By passing the custom partitioner to reduceByKey, the data is already aggregated per subject partition during the reduce, so a single shuffle does the work of both steps; within each partition a fixed-length collection can then be used to keep the top N (see the sketch after the code below).

package rdd

import java.net.URL

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

import scala.collection.mutable

object FavTeacher3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("FavObjTeacher")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("C:\\Users\\admin\\Desktop\\test\\teacher.log")
    // each line looks like: http://bigdata.edu360.cn/laozhang
    val subjectTeacherAndOne = lines.map(x => {
      val index = x.lastIndexOf("/")
      val teacher = x.substring(index + 1)
      val httpHost = x.substring(0, index)
      val subject = new URL(httpHost).getHost.split("[.]")(0)
      // emit ((subject, teacher), 1) directly in the map
      ((subject, teacher), 1)
    })

    val subjects = subjectTeacherAndOne.map(_._1._1).distinct().collect()

    val sbPartitioner = new SubjectPartitioner(subjects)

    // pass the custom partitioner to reduceByKey: aggregation and subject
    // partitioning happen in the same, single shuffle
    val reduced2 = subjectTeacherAndOne.reduceByKey(sbPartitioner, _ + _)

    // mapPartitions processes one whole partition at a time
    val sorted = reduced2.mapPartitions(it => {
      it.toList.sortBy(_._2).reverse.take(3).iterator
    })
    val r = sorted.collect()
    // sorted.saveAsTextFile("")
    println(r.toBuffer)
    sc.stop()
  }
}

// custom partitioner
class SubjectPartitioner(sbs: Array[String]) extends Partitioner {
  // this code runs in the primary constructor

  // number of partitions, i.e. how many partitions the next RDD will have
  override def numPartitions: Int = sbs.length

  // the rule: map each subject to a partition id
  val rules = new mutable.HashMap[String, Int]()
  var i = 0 // partition id
  for (elem <- sbs) {
    rules.put(elem, i)
    i += 1
  }

  override def getPartition(key: Any): Int = {
    // extract the subject from the (subject, teacher) key
    val subject = key.asInstanceOf[(String, String)]._1
    // look up the partition id; in Scala, map(key) returns the value directly
    rules(subject)
  }
}
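The mapPartitions call above still materializes each partition with toList before sorting. The fixed-length collection mentioned earlier can avoid that; the sketch below is my own interpretation of that idea, reusing reduced2 and the imports from the code above, and keeps at most the current top 3 per partition in a small sorted set:

val top3 = reduced2.mapPartitions(it => {
  // order by count descending; include the teacher so that equal counts are not
  // collapsed as duplicates by the TreeSet
  implicit val ord: Ordering[((String, String), Int)] = Ordering.by(t => (-t._2, t._1._2))
  val buffer = new mutable.TreeSet[((String, String), Int)]()
  it.foreach(t => {
    buffer += t
    if (buffer.size > 3) buffer -= buffer.last // evict the entry with the smallest count
  })
  buffer.iterator
})
println(top3.collect().toBuffer)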


