// On Spark's intra-stage pipeline: how narrow transformations are chained (pipelined) within a single Stage
/**
 * Demonstrates Spark's pipelining of narrow transformations within a Stage.
 *
 * rdd1 (map) -> rdd2 (filter) -> rdd3 (map) form Stage 1: each element flows
 * through all three operators before the next element is processed, which the
 * per-element println output makes visible. reduceByKey introduces a shuffle
 * boundary, so rdd4/rdd5 run in Stage 2 (with 3 partitions).
 */
object Pipeline {
  def main(args: Array[String]): Unit = {
    // Local mode with a single worker thread; app name appears in the Spark UI.
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("PPLine")
    val sc = new SparkContext(conf)

    // 8 elements split across 2 partitions (pid 0 and pid 1).
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8), 2)

    // Narrow transformation: identity map, logging each element as it passes.
    val rdd1: RDD[Int] = rdd.map(x => {
      println(s"rdd1----[pid${TaskContext.get.partitionId}]----$x")
      x
    })

    // Narrow transformation: keeps everything; exists only to show pipelining.
    val rdd2: RDD[Int] = rdd1.filter(x => {
      println(s"rdd2----[pid${TaskContext.get.partitionId}]----$x")
      true
    })

    // Narrow transformation: key each element by x % 3 ("yjx0"/"yjx1"/"yjx2").
    val rdd3: RDD[(String, Int)] = rdd2.map(x => {
      println(s"rdd3----[pid${TaskContext.get.partitionId}]----$x")
      ("yjx" + x % 3, x)
    })

    // Wide transformation: shuffle boundary — ends Stage 1, starts Stage 2
    // with 3 output partitions. The combiner may run on the map side, so
    // rdd4's println can appear in both stages' tasks.
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((sum: Int, value: Int) => {
      println(s"rdd4----[pid${TaskContext.get.partitionId}]----$sum---$value")
      sum + value
    }, 3)

    // Narrow transformation in Stage 2, pipelined after the shuffle read.
    val rdd5: RDD[(String, Int)] = rdd4.map(x => {
      println(s"rdd5----[pid${TaskContext.get.partitionId}]----$x")
      x
    })

    // Action: triggers the whole DAG; nothing above executes until here.
    rdd5.count
    sc.stop()
  }
}

