0
点赞
收藏
分享

微信扫一扫

Spark SQL编程之RDD-RDD转换

Mezereon 2022-04-29 阅读 143

  def testCreate(spark: SparkSession) = {
    val sc: SparkContext = spark.sparkContext
    // 使用makeRDD从集合中创建
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 2), (2, 3), (3, 4), (4, 5)))

    // 使用parallelize从集合中创建
    val rdd2: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5))

    // 从外部文件中获取(还可以从各种数据库、HDFS等等外部资源中获取)
    val rdd3: RDD[String] = sc.textFile("***/RddDemo.scala")

    //从rdd2中获取新的rdd
    val rdd4: RDD[Int] = rdd2.map(x => x + 2)
  }
    // List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val rdd1: RDD[Int] = spark.sparkContext.parallelize(1 to 10)
    //     可以简写成这个样子rdd1.map(_ + 2)
    //List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
    val rdd2: RDD[Int] = rdd1.map(x => x + 2)
        
    //List(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
    val rdd3: RDD[Int] = rdd1.mapPartitions(x => x.map(e => e + 2))
    //List((0,1), (1,2), (2,3), (3,4), (3,5), (4,6), (5,7), (6,8), (7,9), (7,10))
    val rdd4: RDD[(Int, Int)] = rdd1.mapPartitionsWithIndex((index, x) => x.map(e => (index, e)))
    //List(-2, -1, 0, 1, -1, 0, 1, 2, 0, 1, 2, 3, 1, 2, 3, 4, 2, 3, 4, 5, 3, 4, 5, 6, 4, 5, 6, 7, 5, 6, 7, 8, 6, 7, 8, 9, 7, 8, 9, 10)
    val rdd5: RDD[Int] = rdd1.flatMap(x => x - 3 to x)
    //List(List(3), List(4, 5), List(7), List(1), List(6), List(2), List(9, 10), List(8))
    // 没有指定指定分片数量
    val rdd6: RDD[Array[Int]] = rdd1.glom()
    
    val r1: RDD[Int] = sc.parallelize(1 to 10, 3)
    //List(List(1, 2, 3), List(7, 8, 9, 10), List(4, 5, 6))
    // 指定分片数量
    val rdd61: RDD[Array[Int]] = r1.glom()
    //List((0,CompactBuffer(3, 6, 9)), (1,CompactBuffer(1, 4, 7, 10)), (2,CompactBuffer(2, 5, 8)))
    //每个元素对3取余分组
    val rdd7: RDD[(Int, Iterable[Int])] = rdd1.groupBy(x => x % 3)
    //List(2, 4, 6, 8, 10)
    //求全部偶数集合
    val rdd8: RDD[Int] = rdd1.filter(x => x % 2 == 0)
    val distinctRdd: RDD[Int] = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4, 5))
    // 可以指定并行度 得到的结果顺序可能不一致
    // List(1, 2, 3, 4, 5)
    val rdd9: RDD[Int] = distinctRdd.distinct()
    // List(3, 4, 1, 5, 2)
    val rdd91: RDD[Int] = distinctRdd.distinct(3)
    //排序 -倒序
    val sorted1: RDD[Int] = rdd1.sortBy(x => x, false, 4)
    println("操作后数据: " + sorted1.collect().toList)

    val s1: RDD[(Int, Int, Int)] = sc.makeRDD(Array((1, 2, 6), (6, 5, 3), (1, 2, 3), (9, 6, 6), (6, 7, 3)))
    // 按照第三个参数 第二个参数 第一个参数顺序排序
    val sorted2: RDD[(Int, Int, Int)] = s1.sortBy(x => (x._3, x._2, x._1), false)

    val sorted3: Array[(Int, Int, Int)] = s1.top(10)(Ordering[(Int, Int, Int)].on(x => (x._3, x._2, x._1)))

//整数排序:1: List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
//多参数排序2: List((9,6,6), (1,2,6), (6,7,3), (6,5,3), (1,2,3))
//多参数排序3: List((9,6,6), (1,2,6), (6,7,3), (6,5,3), (1,2,3))
    val r2: RDD[Int] = sc.parallelize(1 to 10, 4)
    val rdd10: RDD[Int] = r2.coalesce(1)
    println("之前分区数量: " + r2.partitions.length + " ;调整之后:" + rdd10.partitions.length)
    val rdd11: RDD[Int] = r2.repartition(3)
    println("之前分区数量: " + r2.partitions.length + " ;调整之后:" + rdd11.partitions.length)
//之前分区数量: 4 ;coalesce调整之后:1
//之前分区数量: 4 ;repartition调整之后:3
  //放回抽样
    val rdd12: RDD[Int] = rdd1.sample(true, 0.5, 2)
    println("放回抽样:" + rdd12.collect().toList)
    val rdd13: RDD[Int] = rdd1.sample(false, 0.5, 2)
    println("不放回抽样: " + rdd13.collect().toList)
//放回抽样:List(3, 5, 7, 9, 9)
//不放回抽样: List(2, 3, 5, 8, 10)
    // 可以是编译器内的绝对路径,window环境不能执行 linux的shell脚本
    val rdd14: RDD[String] = rdd1.pipe("***\\pip.bat")
    println("管道执行:" + rdd14.collect().toList)
 def testTransferDoubleValue(spark: SparkSession) = {
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[Int] = sc.parallelize(1 to 4)
    val rdd2: RDD[Int] = sc.parallelize(2 to 5)
    //求并集
    val union: RDD[Int] = rdd1.union(rdd2)
    println("数据源1: " + rdd1.collect().toList)
    println("数据源2: " + rdd2.collect().toList)
    println("union(): " + union.collect().toList)
    //差集
    val subtract: RDD[Int] = rdd1.subtract(rdd2)
    println("subtract(): " + subtract.collect().toList)
    //交集
    val intersection: RDD[Int] = rdd1.intersection(rdd2)
    println("intersection(): " + intersection.collect().toList)
    //笛卡尔积
    val cartesian: RDD[(Int, Int)] = rdd1.cartesian(rdd2)
    println("cartesian(): " + cartesian.collect().toList)
    //zip 两个rdd的元素个数及分片个数必须是一样的,否则会报错
    val zip: RDD[(Int, Int)] = rdd1.zip(rdd2)
    println("zip(): " + zip.collect().toList)
  }
// 数据源1: List(1, 2, 3, 4)
// 数据源2: List(2, 3, 4, 5)
// union(): List(1, 2, 3, 4, 2, 3, 4, 5)
// subtract(): List(1)
// intersection(): List(2, 3, 4)
// cartesian(): List((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (2,4), (2,5), (3,2), (3,3), (3,4), (3,5), (4,2), (4,3), (4,4), (4,5))
// zip(): List((1,2), (2,3), (3,4), (4,5))
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("AA", 13), ("BB", 2), ("BB", 5), ("CC", 7)))
    //分组之后求key对应的数量 指定分片数量之后输出的顺序或会改变
    val groupRdd: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
    //分组之后进行聚合
    val rdd2: RDD[(String, Int)] = groupRdd.map(x => (x._1, x._2.sum))
    // 可以使用分片参数
    val rdd21: RDD[(String, Int)] = rdd1.groupByKey(3).map(x => (x._1, x._2.sum))
    val rdd22: RDD[(String, Int)] = rdd1.groupByKey(new HashPartitioner(3)).map(x => (x._1, x._2.sum))
    println("分组求和: " + rdd2.collect().toList)
    println("分组求和: " + rdd21.collect().toList)
    println("分组求和: " + rdd22.collect().toList)

// 分组求和: List((BB,7), (CC,7), (AA,14))
// 分组求和: List((BB,7), (AA,14), (CC,7))
// 分组求和: List((BB,7), (AA,14), (CC,7))
    // 等同上一个计算的结果
    val rdd3: RDD[(String, Int)] = rdd1.reduceByKey((a, b) => (a + b))
    val rdd31: RDD[(String, Int)] = rdd1.reduceByKey((a, b) => (a + b), 3)
    val rdd32: RDD[(String, Int)] = rdd1.reduceByKey(new HashPartitioner(3), (a, b) => (a + b))
    println("根据key计算:" + rdd3.collect().toList)
    println("根据key计算:" + rdd31.collect().toList)
    println("根据key计算:" + rdd32.collect().toList)

// 根据key计算:List((BB,7), (CC,7), (AA,14))
// 根据key计算:List((BB,7), (AA,14), (CC,7))
// 根据key计算:List((BB,7), (AA,14), (CC,7))
// 结果和上一个分组求和完全一致
    // 按照key的倒序排列
    val rdd4: RDD[(String, Int)] = rdd1.sortByKey(false)
    println("根据key计算:" + rdd4.collect().toList)

// 根据key计算:List((CC,7), (BB,2), (BB,5), (AA,1), (AA,13))
    val j1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("BB", 1), ("CC", 1), ("DD", 1)))
    val j2: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 2), ("BB", 2), ("CC", 2)))
    val j3: RDD[(String, (Int, Int))] = j1.join(j2, 3)
    println("形成新元组操作: " + j3.collect().toList)
// 形成新元组操作: List((BB,(1,2)), (AA,(1,2)), (CC,(1,2)))
val sc: SparkContext = spark.sparkContext

val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("AA", 13), ("BB", 2), ("BB", 5), ("CC", 7)))    

val rdd5: RDD[(String, String)] = rdd1.mapValues(x => x + "->Value")
println("仅操作value: " + rdd5.collect().toList)

//仅操作value: List((AA,1->Value), (AA,13->Value), (BB,2->Value), (BB,5->Value), (CC,7->Value))
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("AA", 13), ("BB", 2), ("BB", 5), ("CC", 7)))
    val value: RDD[(String, (Int, Int))] = rdd1.combineByKey(i => (i, 1), (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int))
    => (acc1._1 + acc2._1, acc1._2 + acc2._2))
    println("结果: " + value.collect().toList)
//               List(key,(sum,count))  
// key联合计算: List((BB,(7,2)), (CC,(7,1)), (AA,(14,2)))
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("AA", 13), ("BB", 2), ("BB", 10), ("BB", 5), ("CC", 7)))
    
    val rdd6: RDD[(String, Int)] = rdd1.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)
    println("key合计算: " + rdd6.collect().toList)

    val rdd7: RDD[(String, Int)] = rdd1.foldByKey(0)((x, y) => x + y)
    println("key合计算: " + rdd7.collect().toList)

// key合计算: List((BB,17), (CC,7), (AA,14))
val sc: SparkContext = spark.sparkContext    
val j1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("BB", 1), ("BB", 3), ("CC", 1), ("DD", 1)))
val j2: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 2), ("BB", 2), ("CC", 2)))
val groupRdd1: RDD[(String, (Iterable[Int], Iterable[Int]))] = j1.cogroup(j2)
println("根据key进行分组:    " + groupRdd1.collect().toList)

// 根据key进行分组:    
//List((DD,(CompactBuffer(1),CompactBuffer())),
//     (BB,(CompactBuffer(1, 3),CompactBuffer(2))),
//     (CC,(CompactBuffer(1),CompactBuffer(2))),
//     (AA,(CompactBuffer(1),CompactBuffer(2))))
    val sc: SparkContext = spark.sparkContext
    val rdd1: RDD[(String, Int)] = sc.makeRDD(Array(("AA", 1), ("BB", 5), ("BB", 3), ("CC", 7), ("DD", 2)))
    println("collect(): " + rdd1.collect().toList)
    println("count(): " + rdd1.count())
    println("first(): " + rdd1.first())
    println("take(3): " + rdd1.take(3).toList)

    val countByKey: collection.Map[String, Long] = rdd1.countByKey()
    println("countByKey(): " + countByKey)

    // 输入的类型和输出的类型要一致
    val sum: Int = rdd1.values.reduce((x: Int, y: Int) => x + y)
    val sum1: (String, Int) = rdd1.reduce((x: (String, Int), y: (String, Int)) => (x._1 + "-" + y._1, x._2 + y._2))
    println("求和的结果: " + sum)
    println("求和的结果:+" + sum1)

    // 按照key的hashcode值为第一排序规则 value值倒序作为第二排序规则
    val takeOrdered: Array[(String, Int)] = rdd1.takeOrdered(3)(Ordering[(Int, Int)].on((x) => (-x._1.hashCode, -x._2)))
    println("原数据takeOrdered(3): " + takeOrdered.toList)

//  collect(): List((AA,1), (BB,5), (BB,3), (CC,7), (DD,2))
//  count(): 5
//  first(): (AA,1)
//  take(3): List((AA,1), (BB,5), (BB,3))
//  countByKey(): Map(DD -> 1, BB -> 2, CC -> 1, AA -> 1)
//  求和的结果: 18
//  求和的结果:+(DD-AA-BB-BB-CC,18)
//  原数据takeOrdered(3): List((DD,2), (CC,7), (BB,5))

 

    //1,2,3,4,5。6.7.8.9.10
    val rdd2: RDD[Int] = sc.parallelize(1 to 10)
    val aggregate: Int = rdd2.aggregate(0)((x, y) => x + y, (x, y) => x + y)
    println("aggregate(0): " + aggregate)
    val fold: Int = rdd2.fold(0)((x, y) => x + y)
    println("fold(0): " + fold)

//  输出
    // aggregate(0): 55
    // fold(0): 55
    val rdd2: RDD[Int] = sc.parallelize(1 to 10)
    rdd2.foreach(x => print(x + ", "))

  // 1, 3, 7, 6, 8, 2, 4, 5, 9, 10,
    val rdd2: RDD[Int] = sc.parallelize(1 to 10)
    rdd2.saveAsTextFile("test.txt")
    rdd2.saveAsObjectFile("obj.txt")
//  可以指定文件生成路径,当前是生成在项目的根目录  

 

举报

相关推荐

0 条评论