url: spark源码系列03-任务提交01
- DAGScheduler getShuffleMapStage
Creates the parent stage:
val stage = newOrUsedStage(……
- DAGScheduler newOrUsedStage
Recursion: newStage in turn creates any missing ancestor stages (a sketch of this recursion follows the next line).
val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
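To make the recursion concrete, here is a minimal, self-contained Scala sketch (illustrative types, not Spark's private API): a stage boundary is cut at every shuffle dependency, while narrow dependencies stay inside the current stage.
// Toy lineage: each node lists its narrow and shuffle parents separately.
case class Lineage(name: String, narrowParents: Seq[Lineage], shuffleParents: Seq[Lineage])
case class SketchStage(name: String, parents: Seq[SketchStage])
// A shuffle parent starts a new stage; a narrow parent's stages are folded into ours.
def parentStages(rdd: Lineage): Seq[SketchStage] =
  rdd.shuffleParents.map(p => SketchStage(p.name, parentStages(p))) ++
    rdd.narrowParents.flatMap(parentStages)
val words = Lineage("words", Nil, Nil)
val reduced = Lineage("reduced", Nil, Seq(words))            // shuffle boundary
println(parentStages(Lineage("mapped", Seq(reduced), Nil)))  // one parent stage, cut at the shuffle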
- DAGScheduler handleJobSubmitted
……
if (shouldRunLocally) {
  // Compute very short actions like first() or take() with no parent stages locally.
  // TODO: run the job locally on the driver
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
  runLocally(job)
} else {
  // TODO: cluster mode
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.resultOfJob = Some(job)
  val stageIds = jobIdToStageIds(jobId).toArray
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  // TODO: start submitting stages
  submitStage(finalStage)
}
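For reference, a small driver program that takes the cluster branch above (file path and app name are made up for illustration): count() on a shuffled RDD submits a job whose final stage has a shuffle parent, so it cannot run locally.
import org.apache.spark.{SparkConf, SparkContext}

object SubmitDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("submit-demo").setMaster("local[2]"))
    val counts = sc.textFile("README.md")   // any readable file
      .flatMap(_.split(" "))
      .map(w => (w, 1))
      .reduceByKey(_ + _)
    counts.count()  // runJob -> handleJobSubmitted -> submitStage(finalStage)
    sc.stop()
  }
}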
- DAGScheduler submitStage
Submits stages recursively, starting from the final stage. It first fetches the stage's parent stages that have not yet been submitted (a runnable sketch of this recursion follows the snippet):
val missing = getMissingParentStages(stage).sortBy(_.id)
// TODO: check whether any parent stages are still missing
if (missing == Nil) {
  logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
  // TODO: submit front to back, so the earliest stage is submitted first
  submitMissingTasks(stage, jobId.get)
} else {
  for (parent <- missing) {
    submitStage(parent)
  }
  // Parked here until its parents finish, then resubmitted.
  waitingStages += stage
}
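As referenced above, a runnable sketch of this recursion (illustrative bookkeeping, not Spark's actual fields):
import scala.collection.mutable

case class SimpleStage(id: Int, parents: Seq[SimpleStage])

val waiting = mutable.Set[SimpleStage]()
val submittedIds = mutable.Set[Int]()

def submit(stage: SimpleStage): Unit = {
  val missing = stage.parents.filterNot(p => submittedIds(p.id)).sortBy(_.id)
  if (missing.isEmpty) {
    println(s"submitting stage ${stage.id}")  // stands in for submitMissingTasks
    submittedIds += stage.id
  } else {
    missing.foreach(submit)  // depth-first: ancestors are submitted first
    waiting += stage         // in Spark, resubmitted when its parents' tasks finish
  }
}

val s0 = SimpleStage(0, Nil)
val s1 = SimpleStage(1, Seq(s0))
submit(SimpleStage(2, Seq(s1)))  // submits stage 0; stages 1 and 2 wait, mirroring submitStage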
- DAGScheduler getMissingParentStages
Finds the parent stages that have not yet been submitted (whose shuffle output is not yet available); the traversal mirrors getParentStages.
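Reusing the SimpleStage sketch above, the "missing" check can be pictured as filtering parents whose output is not yet available (the availability predicate is an illustrative stand-in for Spark's shuffle-output tracking):
def getMissingParents(stage: SimpleStage, outputAvailable: SimpleStage => Boolean): Seq[SimpleStage] =
  stage.parents.filterNot(outputAvailable).sortBy(_.id)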
- DAGScheduler submitMissingTasks
The DAGScheduler turns the stage into tasks and hands them to the TaskScheduler.
One task is created per partition, so the task count equals the partition count (see the example after this block):
val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
  partitionsToCompute.map { id =>
    // TODO: pick a preferred location; moving computation is cheaper than moving data
    val locs = getPreferredLocs(stage.rdd, id)
    val part = stage.rdd.partitions(id)
    // TODO: ShuffleMapTask writes shuffle output for downstream stages to pull
    new ShuffleMapTask(stage.id, taskBinary, part, locs)
  }
} else {
  val job = stage.resultOfJob.get
  partitionsToCompute.map { id =>
    val p: Int = job.partitions(id)
    val part = stage.rdd.partitions(p)
    val locs = getPreferredLocs(stage.rdd, p)
    // TODO: ResultTask writes the final results out, e.g. to HDFS or a NoSQL store
    new ResultTask(stage.id, taskBinary, part, locs, id)
  }
}
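A quick way to see the one-task-per-partition rule (assuming the sc from the earlier driver example):
val pairs = sc.parallelize(1 to 100, 4).map(i => (i % 2, i))  // 4 partitions
pairs.reduceByKey(_ + _, 2).collect()
// stage 0: 4 ShuffleMapTasks (one per map-side partition)
// stage 1: 2 ResultTasks (one per reduce-side partition)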
- ResultTask runTask
Gets the closure serializer:
val ser = SparkEnv.get.closureSerializer.newInstance()
Deserializes the task binary to recover the RDD and the function to apply to it:
val (rdd, func) = ser.deserialize……
Invokes the function on this task's partition, for example the word-count step that emits ("tom", 1) pairs:
func(context, rdd.iterator(partition, context))
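For a concrete func: count() passes a function that consumes one partition's iterator and returns that partition's size, and the driver sums the per-partition results. A simplified sketch (ignoring the TaskContext argument):
val countFunc: Iterator[(String, Int)] => Long = iter => {
  var n = 0L
  while (iter.hasNext) { iter.next(); n += 1 }
  n
}
println(countFunc(Iterator(("tom", 1), ("jerry", 1))))  // 2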
- ShuffleMapTask runTask
A ShuffleMapTask's job is to compute the input data for the downstream stage: it computes its partition and writes the result to disk via shuffle write (managed by the BlockManager), where it waits for the downstream stage to fetch it via shuffle read.
// TODO: get the ShuffleManager
val manager = SparkEnv.get.shuffleManager
// TODO: get a writer that persists this partition's output to the file system
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// TODO: trigger the RDD computation and write the results through the writer
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
// TODO: stop the writer and return the result
return writer.stop(success = true).get
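Conceptually, the shuffle write routes each map-side record to a reducer bucket via the partitioner and persists the buckets. A toy, in-memory version (not Spark's actual sort- or hash-based writer):
def shuffleWrite[K, V](records: Iterator[(K, V)], numReducers: Int): Map[Int, Seq[(K, V)]] =
  records.toSeq.groupBy { case (k, _) => math.abs(k.hashCode) % numReducers }

shuffleWrite(Iterator(("tom", 1), ("cat", 1), ("tom", 1)), 2)
  .foreach { case (reducer, recs) => println(s"bucket $reducer -> $recs") }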