WordCount in Java and Scala

幸甚至哉歌以咏志 · 2022-02-22 · 75 reads

Tags: java, spark, ide



Table of Contents

  • 1. Scala WordCount
    • Full version
    • Condensed version
  • 2. Java version


1. Scala WordCount

Full version

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("wc")
  val context = new SparkContext(conf)

  // Read the input file, one element per line
  val lines: RDD[String] = context.textFile("./wc.txt")
  // Split each line into words
  val words: RDD[String] = lines.flatMap(_.split(" "))
  // Pair each word with an initial count of 1
  val pair: RDD[(String, Int)] = words.map((_, 1))
  // Sum the counts for each word
  val result: RDD[(String, Int)] = pair.reduceByKey(_ + _)

  result.foreach(println)
  context.stop()
}

Condensed version

new SparkContext(new SparkConf().setMaster("local").setAppName("wc")).textFile("./wc.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)

2. Java version

Approach: split each line into words with flatMap, map each word to a Tuple2<word, 1> with mapToPair, aggregate the counts with reduceByKey, and finally print with foreach.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("wc");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> lineRDD = context.textFile("./wc.txt");

// Split each line into words (Spark 1.x API, where call returns an
// Iterable; Spark 2.x+ expects an Iterator instead)
JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) throws Exception {
        return Arrays.asList(s.split(" "));
    }
});

// Map each word to a (word, 1) pair
JavaPairRDD<String, Integer> wordAndOne = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<>(s, 1);
    }
});

// Sum the counts for each word
JavaPairRDD<String, Integer> resultRDD = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

// Print each (word, count) pair
resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> result) throws Exception {
        System.out.println(result);
    }
});

Custom sort order for the Java WordCount

Approach

Use mapToPair to swap key and value so the count becomes the key, call sortByKey on it, then mapToPair once more to swap back to (word, count).

sortByKey(false): descending

sortByKey(true): ascending
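The same swap → sort → swap-back idea can be sketched outside Spark with plain Java collections; this is only an illustration of the technique (the sortByCountDesc helper is hypothetical, and in Spark the steps would be mapToPair → sortByKey(false) → mapToPair as described above):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SortByCount {
    // Same steps as the Spark pipeline: swap (word, n) -> (n, word),
    // sort descending on the count (like sortByKey(false)), swap back.
    static List<Map.Entry<String, Integer>> sortByCountDesc(Map<String, Integer> counts) {
        return counts.entrySet().stream()
                .map(e -> Map.entry(e.getValue(), e.getKey()))   // swap: count becomes the key
                .sorted((a, b) -> b.getKey() - a.getKey())       // descending, like sortByKey(false)
                .map(e -> Map.entry(e.getValue(), e.getKey()))   // swap back to (word, count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("hello", 5, "spark", 3, "world", 1);
        sortByCountDesc(counts).forEach(e -> System.out.println(e.getKey() + " " + e.getValue()));
    }
}
```

Passing true-style ascending order would just flip the comparator, mirroring sortByKey(true).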



