import org.apache.spark._
object rdd_test {
System.setProperty("hadoop.home.dir", "C:\\hadoop_home\\")
def main(args: Array[String]) {
/*
* Spark算子:RDD基本转换操作之 map、flatMap、distinct学习笔记
*
*/
val sparkConf = new SparkConf().setMaster("local").setAppName("RDD_TEST_chenxun")
val sc = new SparkContext(sparkConf)
val lines = sc.textFile("D:\\spark_data\\data.txt")
for (v <-lines) println(v)
/*
hello chen
hello spark
hello hadoop
*/
//map操作
val mapresult1 = lines.map(line => line.split(" ")).collect()
//Array[Array[String]] = Array(Array(hello, chen), Array(hello, spark), Array(hello, hadoop))
/*
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区
mapresult.collect()结果是:(后面会比较flatMap的结果有什么不同)
Array[Array[String]] = Array(Array(hello, chen), Array(hello, spark), Array(hello, hadoop))
可以用下面的方法输出
for (elem <- lines.map(line => line.split(" ")).collect()) {
for(v <- elem){
print(v + " ")
}
println()
}
*/
//flatMap 操作
val mapresult2 = lines.flatMap(line => line.split(" ")).collect()
//Array[String] = Array(hello, chen, hello, spark, hello, hadoop)
/* 属于Transformation算子,第一步和map一样,最后将所有的输出分区合并成一个。
Array[String] = Array(hello, chen, hello, spark, hello, hadoop)
for(elem <- mapresult2){
println(elem)
}
*/
//计算每一行的长度
val lineLengths = lines.map(s => s.length)
for( v <- lineLengths){
println(v)
}
val totalLength = lineLengths.reduce((a, b) =>