大数据中按照某个 Key 进行分组,找出每个组内数据的 topN 时,这种情况就 是分组取 topN 问题
解决分组取 TopN 问题有两种方式,第一种就是直接分组,对分组内的数据进 行排序处理。第二种方式就是直接使用定长数组的方式解决分组取 topN 问题。
- 数据源
湖人 24
湖人 8
凯尔特人 0
凯尔特人 21
公牛 23
公牛 1
凯尔特人 34
湖人 23
凯尔特人 88
公牛 13
湖人 6
公牛 10
- scala
package action
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.control.Breaks
/**
* @Author yqq
* @Date 2021/12/10 20:08
* @Version 1.0
*/
object GroupByKeySortTest2 {
def main(args: Array[String]): Unit = {
val context = new SparkContext(
new SparkConf()
.setMaster("local")
.setAppName("group")
)
context.setLogLevel("Error")
context.textFile("data/data.txt")
.map(line=>{
val team = line.split("\t")(0)
val num = line.split("\t")(1).toInt
(team,num)
}).groupByKey().foreach(e=>{
/**
* 定长数组
*/
val key = e._1
val ite = e._2.iterator
val top3 = new Array[Int](3)
val break = new Breaks
while (ite.hasNext){
val num = ite.next()
break.breakable{
for (i<-0 until 3){
if(top3(i)==0){
top3(i)=num
break.break()
}else if (num>top3(i)){
for (j<-2 until (i,-1)){
top3(j)=top3(j-1)
}
top3(i)=num
break.break()
}
}
}
}
println(s"key = $key,num = ${top3.toBuffer}")
/**
* 原生集合排序
*/
// val key = e._1
// val list = e._2.iterator.toList
// val ints = list.sortWith(_>_)
// println(s"key = $key,num = $ints")
// if (ints.length>3){
// for (i<-0 until 3)
// println(s"key = $key,num = ${list(i)}")
// }else{
// for(num<-list)
// println(s"key = $key,num = $num")
// }
})
}
}
3. java
package action;
import org.apache.commons.collections.IteratorUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
/**
* @Author yqq
* @Date 2021/12/10 18:08
* @Version 1.0
*/
public class GroupByKeySortTest {
public static void main(String[] args) {
JavaSparkContext context = new JavaSparkContext(
new SparkConf()
.setMaster("local")
.setAppName("group")
);
context.setLogLevel("Error");
context.textFile("data/data.txt")
.mapToPair(e->{
String team = e.split("\t")[0];
Integer num = Integer.valueOf(e.split("\t")[1]);
return new Tuple2<>(team,num);
}).groupByKey()
.foreach(e->{
/**
* 定长数组方式
*/
String team = e._1;
Iterator<Integer> ite = e._2.iterator();
Integer[] top3 = new Integer[3];
while (ite.hasNext()){
Integer num = ite.next();
for (int i = 0; i < top3.length; i++) {
if (top3[i]==null){
top3[i]=num;
break;
}else if (num>top3[i]){
for (int j = 2; j > i; j--) {
top3[j] = top3[j-1];
}
top3[i] = num;
break;
}
}
}
for (Integer num:top3){
System.out.println("Team:"+team+",num:"+num);
}
/**
* 原生集合排序,数据处理发生在spark集群节点executor里,很危险
*/
// String team = e._1;
// List<Integer> list = IteratorUtils.toList(e._2().iterator());
// Collections.sort(list, new Comparator<Integer>() {
// @Override
// public int compare(Integer o1, Integer o2) {
// return o2-o1;
// }
// });
// if (list.size()>3){
// for (int i = 0; i < 3; i++) {
// System.out.println("Team:"+team+",num:"+list.get(i));
// }
// }else
// for (Integer num:list)
// System.out.println("Team:"+team+",num:"+num);
});
}
}