Spark Basics 6: Key-Value Pair Operations sortByKey, groupByKey, groupBy, cogroup


sortByKey

As the name suggests, sortByKey sorts a pair RDD by key. For a PairRDD like ["A":1, "C":4, "B":3, "B":5], sorting by key yields the order A, B, C. Note that this method only sorts the keys; the values are not sorted.

Here is the code:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

/**
 * Sorts a pair RDD by key.
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("A", 10));
        data.add(new Tuple2<>("B", 1));
        data.add(new Tuple2<>("A", 6));
        data.add(new Tuple2<>("C", 5));
        data.add(new Tuple2<>("B", 3));

        JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);
        // true = ascending, false = descending
        System.out.println(originRDD.sortByKey(true).collect());
        System.out.println(originRDD.sortByKey(false).collect());
    }
}

The output is:

[(A,10), (A,6), (B,1), (B,3), (C,5)]

[(C,5), (B,1), (B,3), (A,10), (A,6)]
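If you want to experiment with sortByKey's semantics without a Spark environment, the same behavior can be sketched in plain Java. This is an illustrative stand-in, not Spark code: a stable sort by key leaves the relative order of values under equal keys untouched, which matches the outputs above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of sortByKey's semantics (illustrative, not Spark code).
public class SortByKeySketch {
    // Stable sort by key; values under equal keys keep their input order.
    static List<Map.Entry<String, Integer>> sortByKey(
            List<Map.Entry<String, Integer>> pairs, boolean ascending) {
        Comparator<Map.Entry<String, Integer>> byKey = Map.Entry.comparingByKey();
        List<Map.Entry<String, Integer>> out = new ArrayList<>(pairs);
        out.sort(ascending ? byKey : byKey.reversed());
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> data = List.of(
                Map.entry("A", 10), Map.entry("B", 1),
                Map.entry("A", 6), Map.entry("C", 5), Map.entry("B", 3));
        System.out.println(sortByKey(data, true));   // [A=10, A=6, B=1, B=3, C=5]
        System.out.println(sortByKey(data, false));  // [C=5, B=1, B=3, A=10, A=6]
    }
}
```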

groupByKey

Similar to GROUP BY in MySQL, groupByKey groups records by key, producing an RDD[key, Iterable[value]]; that is, the values for each key are collected into a single iterable.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

/**
 * Groups a pair RDD by key.
 * @author wuweifeng wrote on 2018/4/18.
 */
public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Tuple2<String, Integer>> data = new ArrayList<>();
        data.add(new Tuple2<>("A", 10));
        data.add(new Tuple2<>("B", 1));
        data.add(new Tuple2<>("A", 6));
        data.add(new Tuple2<>("C", 5));
        data.add(new Tuple2<>("B", 3));

        JavaPairRDD<String, Integer> originRDD = javaSparkContext.parallelizePairs(data);

        System.out.println(originRDD.groupByKey().collect());
    }
}

The output is [(B,[1, 3]), (A,[10, 6]), (C,[5])]
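As a point of comparison, the same grouping can be expressed in plain Java with Collectors.groupingBy. Again, this is only a semantic sketch of what groupByKey does, not Spark code:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java sketch of groupByKey's semantics using streams.
public class GroupByKeySketch {
    // Collect all values that share a key into one list per key.
    static Map<String, List<Integer>> groupByKey(List<Map.Entry<String, Integer>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
                Map.Entry::getKey,
                Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> data = List.of(
                Map.entry("A", 10), Map.entry("B", 1),
                Map.entry("A", 6), Map.entry("C", 5), Map.entry("B", 3));
        System.out.println(groupByKey(data)); // e.g. {A=[10, 6], B=[1, 3], C=[5]}
    }
}
```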

groupBy

groupBy is similar to groupByKey, but groupByKey always groups by the key, so it can only be applied to a pair RDD. groupBy, by contrast, has no built-in grouping rule, so we must supply one ourselves: groupBy takes a function as its argument, and that function's return value is used as the key.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());
        List<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(1);
        data.add(6);
        data.add(5);
        data.add(3);

        JavaRDD<Integer> originRDD = javaSparkContext.parallelize(data);
        // the function's return value ("even" or "odd") becomes the key
        Map<String, Iterable<Integer>> map = originRDD.groupBy(x -> {
            if (x % 2 == 0) {
                return "even";
            } else {
                return "odd";
            }
        }).collectAsMap();

        System.out.println(map);
    }
}

The output is {odd=[1, 5, 3], even=[10, 6]}

The function passed in simply classifies each number as odd or even.
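The same even/odd grouping can be written in plain Java, which makes the "rule as a function" idea concrete: the classifier's return value becomes the map key. This is a sketch of the semantics, not Spark code:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java sketch of groupBy: the classifier function's return value is the key.
public class GroupBySketch {
    static Map<String, List<Integer>> groupBy(List<Integer> numbers) {
        return numbers.stream().collect(
                Collectors.groupingBy(x -> x % 2 == 0 ? "even" : "odd"));
    }

    public static void main(String[] args) {
        System.out.println(groupBy(List.of(10, 1, 6, 5, 3))); // e.g. {even=[10, 6], odd=[1, 5, 3]}
    }
}
```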

cogroup

cogroup is an upgraded groupByKey. groupByKey combines the values sharing a key within a single RDD into one collection; cogroup does this across multiple RDDs, producing a collection of collections per key. For example, RDD1.cogroup(RDD2, RDD3, …, RDDN) yields (key, Iterable[value1], Iterable[value2], Iterable[value3], …, Iterable[valueN]).

Here is the code:

import java.util.Arrays;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;
import scala.Tuple3;

public class Test {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("JavaWordCount").master("local").getOrCreate();
        // obtain a JavaSparkContext from the SparkSession
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkSession.sparkContext());

        JavaRDD<Tuple2<String, Integer>> rdd1 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 10),
                new Tuple2<>("B", 20),
                new Tuple2<>("A", 30),
                new Tuple2<>("B", 40)));
        JavaRDD<Tuple2<String, Integer>> rdd2 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 100),
                new Tuple2<>("B", 200),
                new Tuple2<>("A", 300),
                new Tuple2<>("B", 400)));
        JavaRDD<Tuple2<String, Integer>> rdd3 = javaSparkContext.parallelize(Arrays.asList(
                new Tuple2<>("A", 1000),
                new Tuple2<>("B", 2000),
                new Tuple2<>("A", 3000),
                new Tuple2<>("B", 4000)));

        JavaPairRDD<String, Integer> pairRDD1 = JavaPairRDD.fromJavaRDD(rdd1);
        JavaPairRDD<String, Integer> pairRDD2 = JavaPairRDD.fromJavaRDD(rdd2);
        JavaPairRDD<String, Integer> pairRDD3 = JavaPairRDD.fromJavaRDD(rdd3);
        JavaPairRDD<String, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>> pairRDD = pairRDD1.cogroup(pairRDD2, pairRDD3);
        System.out.println(pairRDD.collect());
    }
}

The output is:

[(B,([20, 40],[200, 400],[2000, 4000])), (A,([10, 30],[100, 300],[1000, 3000]))]
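To see why cogroup produces one value collection per input RDD, here is a plain-Java sketch of the combining step. It is illustrative only; the real cogroup shuffles records by key across partitions rather than scanning lists:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of cogroup: for every key, collect one list of values
// per input, so N inputs yield N collections under each key.
public class CogroupSketch {
    @SafeVarargs
    static Map<String, List<List<Integer>>> cogroup(List<Map.Entry<String, Integer>>... inputs) {
        Map<String, List<List<Integer>>> result = new TreeMap<>();
        for (int i = 0; i < inputs.length; i++) {
            for (Map.Entry<String, Integer> e : inputs[i]) {
                // first time we see a key, create one empty slot per input
                List<List<Integer>> slots = result.computeIfAbsent(e.getKey(), k -> {
                    List<List<Integer>> s = new ArrayList<>();
                    for (int j = 0; j < inputs.length; j++) s.add(new ArrayList<>());
                    return s;
                });
                slots.get(i).add(e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> rdd1 = List.of(
                Map.entry("A", 10), Map.entry("B", 20), Map.entry("A", 30), Map.entry("B", 40));
        List<Map.Entry<String, Integer>> rdd2 = List.of(
                Map.entry("A", 100), Map.entry("B", 200), Map.entry("A", 300), Map.entry("B", 400));
        List<Map.Entry<String, Integer>> rdd3 = List.of(
                Map.entry("A", 1000), Map.entry("B", 2000), Map.entry("A", 3000), Map.entry("B", 4000));
        System.out.println(cogroup(rdd1, rdd2, rdd3));
        // {A=[[10, 30], [100, 300], [1000, 3000]], B=[[20, 40], [200, 400], [2000, 4000]]}
    }
}
```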

