Table of Contents
- 1. Scala WordCount
- Full version
- Condensed version
- Java version
1. Scala WordCount
Full version
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.setMaster("local").setAppName("wc")
  val context = new SparkContext(conf)
  // Read the file, split lines into words, pair each word with 1, then sum by key
  val lines: RDD[String] = context.textFile("./wc.txt")
  val words: RDD[String] = lines.flatMap(x => x.split(" "))
  val pair: RDD[(String, Int)] = words.map(x => (x, 1))
  val result: RDD[(String, Int)] = pair.reduceByKey((x, y) => x + y)
  result.foreach(println)
}
Condensed version
new SparkContext(new SparkConf().setMaster("local").setAppName("wc")).textFile("./wc.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)
Java version
Approach: split each line into words with flatMap, convert each word to a Tuple2<word, 1> with mapToPair, aggregate the counts with reduceByKey, then print with foreach.
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("wc");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> lineRDD = context.textFile("./wc.txt");
// Split each line into words (Spark 1.x API: call() returns Iterable; Spark 2.x expects Iterator)
JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String s) throws Exception {
        return Arrays.asList(s.split(" "));
    }
});
// Pair each word with 1
JavaPairRDD<String, Integer> wordAndOne = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String s) throws Exception {
        return new Tuple2<>(s, 1);
    }
});
// Sum the counts for each word
JavaPairRDD<String, Integer> resultRDD = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});
resultRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
    @Override
    public void call(Tuple2<String, Integer> result) throws Exception {
        System.out.println(result);
    }
});
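Since the Spark pipeline above needs a cluster (or at least a local Spark runtime) to execute, here is a plain-Java sketch of the same flatMap → mapToPair → reduceByKey logic on an in-memory list, just to make each step concrete. The class name and sample input are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LocalWordCount {
    // Mirrors the Spark pipeline: flatMap lines into words, emit (word, 1), merge by key
    static Map<String, Integer> wordCount(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))          // flatMap: line -> words
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum)); // (word, 1) pairs summed per key
    }

    public static void main(String[] args) {
        // Sample lines standing in for wc.txt
        System.out.println(wordCount(Arrays.asList("a b a", "b c")));
    }
}
```

The `Collectors.toMap` merge function plays the role of `reduceByKey`: when the same word appears again, the two partial counts are combined with `Integer::sum`.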
Sorting rules for a custom-sorted WordCount in Java
Approach
Use mapToPair to swap key and value so the count becomes the key, call sortByKey on it, then mapToPair again to swap key and value back.
sortByKey(false): descending
sortByKey(true): ascending
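The swap → sortByKey → swap-back idea can be checked without Spark using plain Java streams. This is only a local sketch of the ordering logic (class name and sample counts are made up); on Spark the same three steps would be mapToPair, sortByKey(false), and mapToPair again.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SortByCountSketch {
    // Swap (word, count) -> (count, word), sort by the count key, then swap back
    static List<Map.Entry<String, Integer>> sortByCountDesc(Map<String, Integer> counts) {
        return counts.entrySet().stream()
                .map(e -> new AbstractMap.SimpleEntry<>(e.getValue(), e.getKey())) // swap to (count, word)
                .sorted(Map.Entry.<Integer, String>comparingByKey().reversed())    // like sortByKey(false): descending
                .<Map.Entry<String, Integer>>map(e ->
                        new AbstractMap.SimpleEntry<>(e.getValue(), e.getKey()))   // swap back to (word, count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("a", 2);
        counts.put("b", 5);
        counts.put("c", 1);
        System.out.println(sortByCountDesc(counts)); // [b=5, a=2, c=1]
    }
}
```

The double swap is needed because sortByKey only orders by the key, so the count must temporarily sit in the key position.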