0
点赞
收藏
分享

微信扫一扫

Spark算子:RDD基本转换操作map、flatMap


import org.apache.spark._

object rdd_test {

System.setProperty("hadoop.home.dir", "C:\\hadoop_home\\")
def main(args: Array[String]) {

/*
* Spark算子:RDD基本转换操作之 map、flatMap、distinct学习笔记
*
*/
val sparkConf = new SparkConf().setMaster("local").setAppName("RDD_TEST_chenxun")
val sc = new SparkContext(sparkConf)
val lines = sc.textFile("D:\\spark_data\\data.txt")
for (v <-lines) println(v)
/*
hello chen
hello spark
hello hadoop
*/

//map操作
val mapresult1 = lines.map(line => line.split(" ")).collect()
//Array[Array[String]] = Array(Array(hello, chen), Array(hello, spark), Array(hello, hadoop))
/*
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区

mapresult.collect()结果是:(后面会比较flatMap的结果有什么不同)
Array[Array[String]] = Array(Array(hello, chen), Array(hello, spark), Array(hello, hadoop))

可以用下面的方法输出
for (elem <- lines.map(line => line.split(" ")).collect()) {
for(v <- elem){
print(v + " ")
}
println()
}
*/

//flatMap 操作
val mapresult2 = lines.flatMap(line => line.split(" ")).collect()
//Array[String] = Array(hello, chen, hello, spark, hello, hadoop)

/* 属于Transformation算子,第一步和map一样,最后将所有的输出分区合并成一个。
Array[String] = Array(hello, chen, hello, spark, hello, hadoop)
for(elem <- mapresult2){
println(elem)
}
*/

//计算每一行的长度
val lineLengths = lines.map(s => s.length)
for( v <- lineLengths){
println(v)
}

val totalLength = lineLengths.reduce((a, b) =>

举报

相关推荐

0 条评论