import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

def testCreate(spark: SparkSession) = {
val sc: SparkContext = spark.sparkContext
// Create an RDD from a collection with makeRDD
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 2), (2, 3), (3, 4), (4, 5)))
// Create an RDD from a collection with parallelize
val rdd2: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))
// Create from an external file (HDFS, databases and other external sources work as well)
val rdd3: RDD[String] = sc.textFile("***/RddDemo.scala")
// Derive a new RDD from rdd2
val rdd4: RDD[Int] = rdd2.map(x => x + 2)
}
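// The free-standing snippets below assume an existing SparkSession named `spark` and a
// SparkContext named `sc`. A minimal sketch of such a setup (the local master URL and
// app name are illustrative assumptions, not taken from the original) might look like this:
val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("RddDemo")
  .getOrCreate()
val sc: SparkContext = spark.sparkContext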
// List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd1: RDD[Int] = spark.sparkContext.parallelize(1 to 10)
// Can be shortened to rdd1.map(_ + 2)
//List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
val rdd2: RDD[Int] = rdd1.map(x => x + 2)
//List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
val rdd3: RDD[Int] = rdd1.mapPartitions(x => x.map(e => e + 2))
//List((0,1), (1,2), (2,3), (3,4), (3,5), (4,6), (5,7), (6,8), (7,9), (7,10))
val rdd4: RDD[(Int, Int)] = rdd1.mapPartitionsWithIndex((index, x) => x.map(e => (index, e)))
//List(-2, -1, 0, 1, -1, 0, 1, 2, 0, 1, 2, 3, 1, 2, 3, 4, 2, 3, 4, 5, 3, 4, 5, 6, 4, 5, 6, 7, 5, 6, 7, 8, 6, 7, 8, 9, 7, 8, 9, 10)
val rdd5: RDD[Int] = rdd1.flatMap(x => x - 3 to x)
//List(List(3), List(4, 5), List(7), List(1), List(6), List(2), List(9, 10), List(8))
// No partition count specified (the default parallelism is used)
val rdd6: RDD[Array[Int]] = rdd1.glom()
val r1: RDD[Int] = sc.parallelize(1 to 10, 3)
//List(List(1, 2, 3), List(7, 8, 9, 10), List(4, 5, 6))
// Partition count specified explicitly
val rdd61: RDD[Array[Int]] = r1.glom()
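// A small follow-up sketch (not from the original): glom() is also handy for checking
// how many elements each partition actually holds.
val partitionSizes: Array[Int] = r1.glom().map(_.length).collect()
// Consistent with the glom() output above: Array(3, 3, 4)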
//List((0,CompactBuffer(3, 6, 9)), (1,CompactBuffer(1, 4, 7, 10)), (2,CompactBuffer(2, 5, 8)))
// Group elements by their remainder mod 3
val rdd7: RDD[(Int, Iterable[Int])] = rdd1.groupBy(x => x % 3)
//List(2, 4, 6, 8, 10)
// Keep only the even numbers
val rdd8: RDD[Int] = rdd1.filter(x => x % 2 == 0)
val distinctRdd: RDD[Int] = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4, 5))
// The parallelism can be specified; the order of the results may differ
// List(1, 2, 3, 4, 5)
val rdd9: RDD[Int] = distinctRdd.distinct()
// List(3, 4, 1, 5, 2)
val rdd91: RDD[Int] = distinctRdd.distinct(3)
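// Side note (a sketch, not from the original): distinct() boils down to a shuffle-based
// de-duplication; the same effect can be written by hand with map + reduceByKey.
val manualDistinct: RDD[Int] = distinctRdd.map(x => (x, null)).reduceByKey((a, _) => a).map(_._1)
// Collecting it yields 1, 2, 3, 4, 5 in some partition-dependent order.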
// Sort in descending order
val sorted1: RDD[Int] = rdd1.sortBy(x => x, false, 4)
println("操作后数据: " + sorted1.collect().toList)
val s1: RDD[(Int, Int, Int)] = sc.makeRDD(Array((1, 2, 6), (6, 5, 3), (1, 2, 3), (9, 6, 6), (6, 7, 3)))
// Sort by the third, then the second, then the first field
val sorted2: RDD[(Int, Int, Int)] = s1.sortBy(x => (x._3, x._2, x._1), false)
val sorted3: Array[(Int, Int, Int)] = s1.top(10)(Ordering[(Int, Int, Int)].on(x => (x._3, x._2, x._1)))
// Integer sort 1: List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
// Multi-field sort 2: List((9,6,6), (1,2,6), (6,7,3), (6,5,3), (1,2,3))
// Multi-field sort 3: List((9,6,6), (1,2,6), (6,7,3), (6,5,3), (1,2,3))
val r2: RDD[Int] = sc.parallelize(1 to 10, 4)
val rdd10: RDD[Int] = r2.coalesce(1)
println("之前分区数量: " + r2.partitions.length + " ;调整之后:" + rdd10.partitions.length)
val rdd11: RDD[Int] = r2.repartition(3)
println("之前分区数量: " + r2.partitions.length + " ;调整之后:" + rdd11.partitions.length)
//之前分区数量: 4 ;coalesce调整之后:1
//之前分区数量: 4 ;repartition调整之后:3
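// Sketch (standard API, not from the original): without a shuffle, coalesce() can only
// lower the partition count; to raise it, either call repartition() as above or pass
// shuffle = true, which is what repartition() does under the hood.
val rdd101: RDD[Int] = r2.coalesce(6, shuffle = true)
// rdd101.partitions.length is 6 even though r2 started with 4 partitions.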
// Sampling with replacement
val rdd12: RDD[Int] = rdd1.sample(true, 0.5, 2)
println("放回抽样:" + rdd12.collect().toList)
val rdd13: RDD[Int] = rdd1.sample(false, 0.5, 2)
println("不放回抽样: " + rdd13.collect().toList)
//放回抽样:List(3, 5, 7, 9, 9)
//不放回抽样: List(2, 3, 5, 8, 10)
// 可以是编译器内的绝对路径,window环境不能执行 linux的shell脚本
val rdd14: RDD[String] = rdd1.pipe("***\\pip.bat")
println("管道执行:" + rdd14.collect().toList)
def testTransferDoubleValue(spark: SparkSession) = {
val sc: SparkContext = spark.sparkContext
val rdd1: RDD[Int] = sc.parallelize(1 to 4)
val rdd2: RDD[Int] = sc.parallelize(2 to 5)
// Union
val union: RDD[Int] = rdd1.union(rdd2)
println("数据源1: " + rdd1.collect().toList)
println("数据源2: " + rdd2.collect().toList)
println("union(): " + union.collect().toList)
// Difference (elements in rdd1 but not in rdd2)
val subtract: RDD[Int] = rdd1.subtract(rdd2)
println("subtract(): " + subtract.collect().toList)
// Intersection
val intersection: RDD[Int] = rdd1.intersection(rdd2)
println("intersection(): " + intersection.collect().toList)
// Cartesian product
val cartesian: RDD[(Int, Int)] = rdd1.cartesian(rdd2)
println("cartesian(): " + cartesian.collect().toList)
// zip: both RDDs must have the same number of elements and the same number of partitions, otherwise an error is thrown
val zip: RDD[(Int, Int)] = rdd1.zip(rdd2)
println("zip(): " + zip.collect().toList)
}
// Source 1: List(1, 2, 3, 4)
// Source 2: List(2, 3, 4, 5)
// union(): List(1, 2, 3, 4, 2, 3, 4, 5)
// subtract(): List(1)
// intersection(): List(2, 3, 4)
// cartesian(): List((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5), (4,2), (4,3), (4,4), (4,5))
// zip(): List((1,2), (2,3), (3,4), (4,5))
val sc: SparkContext = spark.sparkContext
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("AA", 13), ("BB", 2), ("BB", 5), ("CC", 7)))
// Group the values by key and then aggregate per key; when a partition count is specified the output order may change
val groupRdd: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
// Aggregate after grouping
val rdd2: RDD[(String, Int)] = groupRdd.map(x => (x._1, x._2.sum))
// A partition count or a Partitioner can be passed as well
val rdd21: RDD[(String, Int)] = rdd1.groupByKey(3).map(x => (x._1, x._2.sum))
val rdd22: RDD[(String, Int)] = rdd1.groupByKey(new HashPartitioner(3)).map(x => (x._1, x._2.sum))
println("分组求和: " + rdd2.collect().toList)
println("分组求和: " + rdd21.collect().toList)
println("分组求和: " + rdd22.collect().toList)
// 分组求和: List((BB,7), (CC,7), (AA,14))
// 分组求和: List((BB,7), (AA,14), (CC,7))
// 分组求和: List((BB,7), (AA,14), (CC,7))
// 等同上一个计算的结果
val rdd3: RDD[(String, Int)] = rdd1.reduceByKey((a, b) => (a + b))
val rdd31: RDD[(String, Int)] = rdd1.reduceByKey((a, b) => (a + b), 3)
val rdd32: RDD[(String, Int)] = rdd1.reduceByKey(new HashPartitioner(3), (a, b) => (a + b))
println("根据key计算:" + rdd3.collect().toList)
println("根据key计算:" + rdd31.collect().toList)
println("根据key计算:" + rdd32.collect().toList)
// 根据key计算:List((BB,7), (CC,7), (AA,14))
// 根据key计算:List((BB,7), (AA,14), (CC,7))
// 根据key计算:List((BB,7), (AA,14), (CC,7))
// 结果和上一个分组求和完全一致
// 按照key的倒序排列
val rdd4: RDD[(String, Int)] = rdd1.sortByKey(false)
println("根据key计算:" + rdd4.collect().toList)
// 根据key计算:List((CC,7), (BB,2), (BB,5), (AA,1), (AA,13))
val j1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("BB", 1), ("CC", 1), ("DD", 1)))
val j2: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 2), ("BB", 2), ("CC", 2)))
val j3: RDD[(String, (Int, Int))] = j1.join(j2, 3)
println("形成新元组操作: " + j3.collect().toList)
// 形成新元组操作: List((BB,(1,2)), (AA,(1,2)), (CC,(1,2)))
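// A related sketch (standard pair-RDD API, not from the original): join() drops keys that
// are missing on either side ("DD" above); leftOuterJoin() keeps every key from the left
// RDD and wraps the right-hand value in an Option.
val j4: RDD[(String, (Int, Option[Int]))] = j1.leftOuterJoin(j2)
// ("DD",(1,None)) is retained, the other keys carry Some(2).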
val sc: SparkContext = spark.sparkContext
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("AA", 13), ("BB", 2), ("BB", 5), ("CC", 7)))
val rdd5: RDD[(String, String)] = rdd1.mapValues(x => x + "->Value")
println("仅操作value: " + rdd5.collect().toList)
//仅操作value: List((AA,1->Value), (AA,13->Value), (BB,2->Value), (BB,5->Value), (CC,7->Value))
val sc: SparkContext = spark.sparkContext
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("AA", 13), ("BB", 2), ("BB", 5), ("CC", 7)))
val value: RDD[(String, (Int, Int))] = rdd1.combineByKey(
  i => (i, 1),                                       // createCombiner: the first value of a key becomes (value, 1)
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),  // mergeValue: fold further values into the partition-local (sum, count)
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) // mergeCombiners: merge (sum, count) pairs across partitions
println("Result: " + value.collect().toList)
// Format: (key, (sum, count))
// Result: List((BB,(7,2)), (CC,(7,1)), (AA,(14,2)))
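// Follow-up sketch (not from the original): the (sum, count) pairs built by combineByKey
// are the usual stepping stone to a per-key average.
val avgByKey: RDD[(String, Double)] = value.mapValues { case (sum, count) => sum.toDouble / count }
// Yields AA -> 7.0, BB -> 3.5, CC -> 7.0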
val sc: SparkContext = spark.sparkContext
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("AA", 13), ("BB", 2), ("BB", 10), ("BB", 5), ("CC", 7)))
val rdd6: RDD[(String, Int)] = rdd1.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)
println("key合计算: " + rdd6.collect().toList)
val rdd7: RDD[(String, Int)] = rdd1.foldByKey(0)((x, y) => x + y)
println("key合计算: " + rdd7.collect().toList)
// key合计算: List((BB,17), (CC,7), (AA,14))
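// Clarifying sketch (not from the original): aggregateByKey applies the first function
// within each partition (math.max above) and the second across partitions (+ above), so
// its result depends on how the keys happen to be partitioned. When both functions are
// the same, it collapses to foldByKey / reduceByKey:
val rdd71: RDD[(String, Int)] = rdd1.aggregateByKey(0)(_ + _, _ + _)
// rdd71 yields the same per-key sums as rdd1.foldByKey(0)(_ + _) above.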
val sc: SparkContext = spark.sparkContext
val j1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("BB", 1), ("BB", 3), ("CC", 1), ("DD", 1)))
val j2: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 2), ("BB", 2), ("CC", 2)))
val groupRdd1: RDD[(String, (Iterable[Int], Iterable[Int]))] = j1.cogroup(j2)
println("根据key进行分组: " + groupRdd1.collect().toList)
// 根据key进行分组:
//List((DD,(CompactBuffer(1),CompactBuffer())),
// (BB,(CompactBuffer(1, 3),CompactBuffer(2))),
// (CC,(CompactBuffer(1),CompactBuffer(2))),
// (AA,(CompactBuffer(1),CompactBuffer(2))))
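// Sketch (not from the original): cogroup is the primitive underneath join; keeping only
// the keys where both sides have values and pairing them reproduces j1.join(j2).
val joinViaCogroup: RDD[(String, (Int, Int))] =
  groupRdd1.flatMapValues { case (left, right) => for (l <- left; r <- right) yield (l, r) }
// ("DD", ...) disappears because its right-hand CompactBuffer is empty.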
val sc: SparkContext = spark.sparkContext
val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("BB", 5), ("BB", 3), ("CC", 7), ("DD", 2)))
println("collect(): " + rdd1.collect().toList)
println("count(): " + rdd1.count())
println("first(): " + rdd1.first())
println("take(3): " + rdd1.take(3).toList)
val countByKey: collection.Map[String, Long] = rdd1.countByKey()
println("countByKey(): " + countByKey)
// For reduce, the input type and the output type must be the same
val sum: Int = rdd1.values.reduce((x: Int, y: Int) => x + y)
val sum1: (String, Int) = rdd1.reduce((x: (String, Int), y: (String, Int)) => (x._1 + "-" + y._1, x._2 + y._2))
println("求和的结果: " + sum)
println("求和的结果:+" + sum1)
// 按照key的hashcode值为第一排序规则 value值倒序作为第二排序规则
val takeOrdered: Array[(String, Int)] = rdd1.takeOrdered(3)(Ordering[(Int, Int)].on((x) => (-x._1.hashCode, -x._2)))
println("原数据takeOrdered(3): " + takeOrdered.toList)
// collect(): List((AA,1), (BB,5), (BB,3), (CC,7), (DD,2))
// count(): 5
// first(): (AA,1)
// take(3): List((AA,1), (BB,5), (BB,3))
// countByKey(): Map(DD -> 1, BB -> 2, CC -> 1, AA -> 1)
// Sum of values: 18
// Sum over pairs: (DD-AA-BB-BB-CC,18)
// takeOrdered(3): List((DD,2), (CC,7), (BB,5))
// List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd2: RDD[Int] = sc.parallelize(1 to 10)
val aggregate: Int = rdd2.aggregate(0)((x, y) => x + y, (x, y) => x + y)
println("aggregate(0): " + aggregate)
val fold: Int = rdd2.fold(0)((x, y) => x + y)
println("fold(0): " + fold)
// Output
// aggregate(0): 55
// fold(0): 55
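// Sketch (not from the original): unlike fold(), aggregate() may return a different type
// than the elements; here it computes sum and count in a single pass.
val sumCount: (Int, Int) = rdd2.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1), // fold one element into the partition-local accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)) // merge accumulators from different partitions
// sumCount == (55,10)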
val rdd2: RDD[Int] = sc.parallelize(1 to 10)
rdd2.foreach(x => print(x + ", "))
// 1, 3, 7, 6, 8, 2, 4, 5, 9, 10,
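// The numbers above come out unordered because foreach runs on the executors, partition by
// partition. To print in driver-side order (a sketch that is only sensible for small data):
rdd2.collect().foreach(x => print(x + ", "))
// 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,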
val rdd2: RDD[Int] = sc.parallelize(1 to 10)
rdd2.saveAsTextFile("test.txt")
rdd2.saveAsObjectFile("obj.txt")
// Any output path can be specified; here the output lands in the project root directory
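// Sketch (not from the original): both calls create a directory (test.txt / obj.txt)
// containing part-* files, and the object file can be read back with sc.objectFile.
val restored: RDD[Int] = sc.objectFile[Int]("obj.txt")
// Collecting restored gives back the ten numbers, in partition-dependent order.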