0
点赞
收藏
分享

微信扫一扫

关于Spark中Stage的传输Pipeline

灯火南山 2022-02-14 阅读 72
sparkscala

关于Spark中Stage的传输Pipeline



object Pipeline {
  def main(args: Array[String]): Unit = {
    //创建连接
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("PPLine");
    val sc = new SparkContext(conf)
    //设置一个两个并发数的数组(2个分区数)
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)
  
    val rdd1: RDD[Int] = rdd.map(x => {
      println("rdd1----[pid" + TaskContext.get.partitionId + "]----" + x)
      x
    })
    val rdd2: RDD[Int] = rdd1.filter(x => {
      println("rdd2----[pid" + TaskContext.get.partitionId + "]----" + x)
      true
    })
    val rdd3: RDD[(String, Int)] = rdd2.map(x => {
      println("rdd3----[pid" + TaskContext.get.partitionId + "]----" + x)
      Tuple2("yjx" + x % 3, x)
    })

    //在RDD4中进行一个分区运算,在RDD4算子之后会被单独划分出一个stage用于计算(2——>4遇到宽依赖)
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((sum: Int, value: Int) => {
      println("rdd4----[pid" + TaskContext.get.partitionId + "]----" + sum + "---" + value)
      sum + value
    }, 3)

    val rdd5: RDD[(String, Int)] = rdd4.map(x => {
      println("rdd5----[pid" + TaskContext.get.partitionId + "]----" + x)
      x
    })
    //启动算子
    rdd5.count
    sc.stop()
  }
}

//含义如下:
  /*
      stage1(
        for(i=0;i<2;i++){
          new Thread(
            textFile-->rdd1()
            rdd2()
            rdd3()-->ShuffleWrite
          ).start()
        }
      )
      stage2(
        for(i=0;i<1;i++){
          new Thread(
            shuffleRead--->rdd4()
            rdd5()
          ).start()
        }
      )
    */


在这里插入图片描述


在这里插入图片描述

举报

相关推荐

0 条评论