Analysis of the Spark Application Execution Mechanism
Basic Concepts of a Spark Application
First, here are some of the basic concepts involved in a Spark application:
| Term | Meaning |
| --- | --- |
| Application | A user program built on Spark, consisting of a driver program and executors on the cluster. |
| Application jar | A jar containing the user's Spark application. In some cases users will want to create an "uber jar" containing their application along with its dependencies. The jar should not include Hadoop or Spark libraries; those are added at runtime. |
| Driver program | The process that runs the application's main() function and creates the SparkContext. |
| Cluster manager | An external service for acquiring resources on the cluster (e.g. Standalone, Mesos, YARN). |
| Deploy mode | Distinguishes where the driver process runs: in "cluster" mode the driver is launched inside the cluster; in "client" mode the submitter launches the driver outside the cluster. |
| Worker node | Any node in the cluster that can run application code. |
| Executor | A process launched for an application on a worker node; it runs tasks and keeps data in memory or on disk. Each application has its own executors. |
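To make these roles concrete, here is a minimal driver program (a sketch only; the object name MinimalApp and the numbers are illustrative). Its main() runs in the driver process and creates the SparkContext, while the closures passed to RDD operations are shipped to the executors on the worker nodes.
import org.apache.spark.sql.SparkSession

// Minimal sketch of a driver program (illustrative names and values).
object MinimalApp {
  def main(args: Array[String]): Unit = {
    // Built in the driver process; master URL and deploy mode usually come from spark-submit.
    val spark = SparkSession.builder()
      .appName("minimal-app")
      .getOrCreate()
    val sc = spark.sparkContext      // the SparkContext created by the driver

    // The closure passed to map() runs inside the executor JVMs, not in the driver.
    val n = sc.parallelize(1 to 1000, numSlices = 8).map(_ * 2).count()
    println(s"count = $n")

    spark.stop()
  }
}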
Next, the concepts of the individual components involved while a Spark application executes (a short example follows the list):
1. Task: each partition of an RDD corresponds to one Task; a Task is the smallest processing unit on a single partition.
2. TaskSet: a set of related Tasks that have no shuffle dependencies among themselves.
3. Stage: each Spark job is split, according to the dependencies between its RDDs, into groups of tasks called TaskSets; each Stage contains exactly one TaskSet.
4. Job: a computation made up of one or more Stages, triggered by an action operator on an RDD.
5. DAGScheduler: the stage-oriented scheduler. It receives jobs submitted to Spark, divides them into Stages according to the RDD dependencies, and submits each Stage to the TaskScheduler.
6. TaskScheduler: the task-oriented scheduler. It receives the Stages (as TaskSets) submitted by the DAGScheduler, distributes the Tasks to Worker Nodes to run, and returns the results.
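As a concrete illustration of these terms (a sketch; it assumes an existing SparkContext sc and a hypothetical input path): reduceByKey introduces a shuffle, so the single job triggered by collect() is split into two stages, and each stage runs one task per partition of its RDD.
// Word-count sketch: one action -> one job -> two stages (ShuffleMapStage + ResultStage).
val lines  = sc.textFile("hdfs:///tmp/input", minPartitions = 4)
val counts = lines
  .flatMap(_.split("\\s+"))     // narrow dependency: stays in the same stage
  .map(word => (word, 1))       // narrow dependency: same stage, pipelined
  .reduceByKey(_ + _)           // wide (shuffle) dependency: stage boundary
val result = counts.collect()   // action: triggers the job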
The Spark Scheduling Mechanism
Executing an application involves the following steps:
(1) RDD dependencies
(2) Stage division
(3) Task splitting
(4) Task scheduling
(5) Task execution
A Spark application is essentially a series of operations on RDDs. These operations fall into transformations and actions. Transformations are lazy: they only define the logical operation and do not execute anything; execution is triggered only when an action is encountered. The Spark scheduling flow is roughly as follows (a small way to inspect the resulting DAG is shown after this list):
1. The application performs its transformations, and an action triggers a job. On submission, a DAG (directed acyclic graph) is built from the dependencies between the RDDs; this DAG is then parsed by the DAGScheduler.
2. The DAGScheduler splits the DAG into stages. The split is based on whether there is a shuffle between RDDs, i.e. on the wide/narrow dependency relationship. Each stage contains one or more tasks, which are then handed to the TaskScheduler to be scheduled and run.
3. The TaskScheduler receives the task sets from the DAGScheduler and distributes them to the Executors on the Worker nodes. If a task fails, the TaskScheduler is responsible for retrying it.
4. When an Executor on a Worker receives tasks from the TaskScheduler, it runs them with multiple threads, one task per thread, and returns the results to the TaskScheduler when they finish.
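The DAG that step 1 hands to the DAGScheduler can be inspected from user code with RDD.toDebugString (this reuses the counts RDD from the word-count sketch above; the exact RDD ids and partition counts in the output depend on your job):
println(counts.toDebugString)
// Abridged shape of the output -- the indentation step marks the shuffle
// boundary, i.e. where a new stage will start:
// (4) ShuffledRDD[4] at reduceByKey ...
//  +-(4) MapPartitionsRDD[3] at map ...
//     |  MapPartitionsRDD[2] at flatMap ...
//     |  hdfs:///tmp/input MapPartitionsRDD[1] at textFile ...
//     |  hdfs:///tmp/input HadoopRDD[0] at textFile ...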
Job Submission
Take the count operator as an example; the source of the count function is:
// Return the number of elements in the RDD
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
Here sc is the SparkContext object: the count method in the RDD source triggers SparkContext.runJob to submit the job. This happens implicitly inside Spark and requires no explicit submission by the user. After passing through several overloaded runJob calls, dagScheduler.runJob is eventually invoked:
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
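runJob is also part of SparkContext's public API, so the same path that count() takes can be exercised directly (a sketch; rdd is just an example RDD). This is essentially what Utils.getIteratorSize does for count(): it measures one partition's iterator, and the per-partition results come back to the driver as an array.
val rdd = sc.parallelize(1 to 100, 4)

// One task per partition; the per-partition results are returned to the driver.
val sizes: Array[Long] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.size.toLong)
println(sizes.sum)   // same value that rdd.count() would return
runJob itself just blocks on the JobWaiter returned by submitJob, whose definition follows: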
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
// SPARK-23626: `RDD.getPartitions()` can be slow, so we eagerly compute
// `.partitions` on every RDD in the DAG to ensure that `getPartitions()`
// is evaluated outside of the DAGScheduler's single-threaded event loop:
eagerlyComputePartitionsForRddAndAncestors(rdd)
val jobId = nextJobId.getAndIncrement()
if (partitions.isEmpty) {
val clonedProperties = Utils.cloneProperties(properties)
if (sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) == null) {
clonedProperties.setProperty(SparkContext.SPARK_JOB_DESCRIPTION, callSite.shortForm)
}
val time = clock.getTimeMillis()
listenerBus.post(
SparkListenerJobStart(jobId, time, Seq.empty, clonedProperties))
listenerBus.post(
SparkListenerJobEnd(jobId, time, JobSucceeded))
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.nonEmpty)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
waiter
}
DAGScheduler.runJob calls submitJob, which generates a Job ID for the job and creates a JobWaiter to monitor its execution. A job is made up of multiple tasks, so it is only marked as successful when all of its tasks complete successfully; if any task fails, the whole job is marked as failed. The JobSubmitted event is then posted to the DAGSchedulerEventProcessLoop; its onReceive method delegates to doOnReceive, which pattern-matches on the JobSubmitted case class and calls DAGScheduler.handleJobSubmitted to submit the job, after which stage division begins.
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
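DAGSchedulerEventProcessLoop is built on Spark's internal EventLoop: post() puts an event onto a queue, and a single daemon thread takes events off the queue and feeds them to onReceive, which keeps all of the DAGScheduler's bookkeeping single-threaded. A stripped-down sketch of that pattern (illustrative only, not the real Spark class):
import java.util.concurrent.LinkedBlockingDeque

// Stripped-down sketch of the event-loop pattern behind DAGSchedulerEventProcessLoop.
abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped) onReceive(queue.take())   // blocks until an event is posted
      } catch {
        case _: InterruptedException => ()         // woken up by stop()
      }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def post(event: E): Unit = queue.put(event)      // what eventProcessLoop.post(JobSubmitted(...)) does
  protected def onReceive(event: E): Unit          // pattern-matches on the event, as doOnReceive does
}
In the real class, onReceive delegates to the doOnReceive shown above.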
Stage Division
Once a job has been submitted, the DAGScheduler starts from the end of the RDD dependency chain and traverses the whole chain to divide it into stages. The division is based mainly on whether a dependency is a wide dependency (ShuffleDependency): everything before a ShuffleDependency forms one stage, and the RDD produced after the shuffle starts a new stage.
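The dependency kinds the DAGScheduler looks at are also visible from user code (a quick check, again assuming the word-count RDDs from the earlier sketch):
import org.apache.spark.{NarrowDependency, ShuffleDependency}

counts.dependencies.foreach {
  case _: ShuffleDependency[_, _, _] => println("wide dependency -> stage boundary here")
  case n: NarrowDependency[_]        => println(s"narrow dependency, pipelined into the same stage: $n")
}
// counts (the reduceByKey output) reports a ShuffleDependency, while the
// flatMap/map RDDs upstream only have narrow dependencies.
The entry point that drives the stage building is handleJobSubmitted: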
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
// Trace back from the final RDD to create the last stage of the job, finalStage
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
// If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
(_: Int, value: Int) => value + 1)
logWarning(s"Barrier stage in job $jobId requires ${e.requiredConcurrentTasks} slots, " +
s"but only ${e.maxConcurrentTasks} are available. " +
s"Will retry up to ${maxFailureNumTasksCheck - numCheckFailures + 1} more times")
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
// Create the job (ActiveJob) from the final stage, finalStage
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,
Utils.cloneProperties(properties)))
submitStage(finalStage)
}
Once the job has been submitted, stage division begins. The implementation starts in handleJobSubmitted, which first creates a ResultStage from the final RDD. From finalRDD, getMissingParentStages then checks whether any ancestor RDD involves a shuffle. If there is no shuffle, the job consists of a single ResultStage with no parent stages; if there is a shuffle, the job has one ResultStage and at least one ShuffleMapStage.
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ListBuffer[RDD[_]]
waitingForVisit += stage.rdd
def visit(rdd: RDD[_]): Unit = {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
// Walk the RDD dependency chain
for (dep <- rdd.dependencies) {
dep match {
// When a ShuffleDependency is encountered, a stage boundary is drawn here; the parent (shuffle map) stage is added to the missing set
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.prepend(narrowDep.rdd)
}
}
}
}
}
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.remove(0))
}
missing.toList
}
Stage Scheduling
During stage division, one or more interrelated stages are produced. The stage containing the RDD on which the action was actually invoked is called the final stage, and the DAGScheduler creates the job instance from this final stage.
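From there, submitStage walks back up the DAG: a stage is only turned into tasks once all of its parent stages have produced their shuffle output; otherwise the parents are submitted first and the stage is parked in the waiting set. A toy, self-contained paraphrase of that logic (not the Spark source):
// Toy paraphrase of DAGScheduler.submitStage (illustrative, not the real code).
case class ToyStage(id: Int, parents: Seq[ToyStage])

def submitToyStage(stage: ToyStage,
                   available: Set[Int],                             // stages whose shuffle output already exists
                   waiting: scala.collection.mutable.Set[ToyStage]): Unit = {
  val missing = stage.parents.filterNot(p => available(p.id)).sortBy(_.id)
  if (missing.isEmpty) {
    println(s"submitMissingTasks(stage ${stage.id})")               // build and submit the TaskSet now
  } else {
    missing.foreach(p => submitToyStage(p, available, waiting))     // parents go first
    waiting += stage                                                // resubmitted once the parents complete
  }
}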
Task Submission
Once a stage is ready to run, DAGScheduler.submitMissingTasks creates one task per partition of the stage; these tasks form a TaskSet, which is submitted to the TaskScheduler for processing. For a ResultStage (the final stage of the job) it creates ResultTasks; for a ShuffleMapStage it creates ShuffleMapTasks. The submitMissingTasks method is as follows:
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
logDebug("submitMissingTasks(" + stage + ")")
// Before find missing partition, do the intermediate state clean work first.
// The operation here can make sure for the partially completed intermediate stage,
// `findMissingPartitions()` returns all partitions every time.
stage match {
case sms: ShuffleMapStage if stage.isIndeterminate && !sms.isAvailable =>
mapOutputTracker.unregisterAllMapAndMergeOutput(sms.shuffleDep.shuffleId)
sms.shuffleDep.newShuffleMergeState()
case _ =>
}
// Figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties
addPySparkConfigsToProperties(stage, properties)
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
// Only generate merger location for a given shuffle dependency once.
if (s.shuffleDep.shuffleMergeAllowed) {
if (!s.shuffleDep.isShuffleMergeFinalizedMarked) {
prepareShuffleServicesForShuffleMapStage(s)
} else {
// Disable Shuffle merge for the retry/reuse of the same shuffle dependency if it has
// already been merge finalized. If the shuffle dependency was previously assigned
// merger locations but the corresponding shuffle map stage did not complete
// successfully, we would still enable push for its retry.
s.shuffleDep.setShuffleMergeAllowed(false)
logInfo(s"Push-based shuffle disabled for $stage (${stage.name}) since it" +
" is already shuffle merge finalized")
}
}
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
Utils.cloneProperties(properties)))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
// If there are tasks to execute, record the submission time of the stage. Otherwise,
// post the even without the submission time, which indicates that this stage was
// skipped.
if (partitionsToCompute.nonEmpty) {
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo,
Utils.cloneProperties(properties)))
// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
var taskBinaryBytes: Array[Byte] = null
// taskBinaryBytes and partitions are both effected by the checkpoint status. We need
// this synchronization in case another concurrent job is checkpointing this RDD, so we get a
// consistent view of both variables.
RDDCheckpointData.synchronized {
taskBinaryBytes = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
partitions = stage.rdd.partitions
}
if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
logWarning(s"Broadcasting large task binary with size " +
s"${Utils.bytesToString(taskBinaryBytes.length)}")
}
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage
// Abort execution
return
case e: Throwable =>
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
// Abort execution
return
}
val artifacts = jobIdToActiveJob(jobId).artifacts
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber(), taskBinary,
part, stage.numPartitions, locs, artifacts, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber(),
taskBinary, part, stage.numPartitions, locs, id, artifacts, properties,
serializedTaskMetrics, Option(jobId), Option(sc.applicationId),
sc.applicationAttemptId, stage.rdd.isBarrier())
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
if (tasks.nonEmpty) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
val shuffleId = stage match {
case s: ShuffleMapStage => Some(s.shuffleDep.shuffleId)
case _: ResultStage => None
}
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber(), jobId, properties,
stage.resourceProfileId, shuffleId))
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
stage match {
case stage: ShuffleMapStage =>
logDebug(s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})")
if (!stage.shuffleDep.isShuffleMergeFinalizedMarked &&
stage.shuffleDep.getMergerLocs.nonEmpty) {
checkAndScheduleShuffleMergeFinalize(stage)
} else {
processShuffleMapStageCompletion(stage)
}
case stage : ResultStage =>
logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
markStageAsFinished(stage)
submitWaitingChildStages(stage)
}
}
}
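The two task types built above differ mainly in what they hand back: a ShuffleMapTask writes its partition's shuffle output and returns a MapStatus describing where the blocks live, while a ResultTask applies the job's function to its partition and returns that value to the driver (for count(), the per-partition element count). A toy contrast (not the real org.apache.spark.scheduler classes):
// Toy contrast of the two task flavours (illustrative only).
sealed trait ToyTask[+T] { def runTask(partition: Seq[Int]): T }

case object ToyShuffleMapTask extends ToyTask[String] {
  // Would write shuffle files and report their location (a MapStatus in Spark).
  def runTask(partition: Seq[Int]): String = s"wrote shuffle output for ${partition.size} records"
}

case object ToyResultTask extends ToyTask[Long] {
  // Applies the user function to the partition and returns the value to the driver.
  def runTask(partition: Seq[Int]): Long = partition.size.toLong
}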
The key part of DAGScheduler.submitMissingTasks is this fragment:
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber(), jobId, properties,
stage.resourceProfileId, shuffleId))
Here the DAGScheduler hands the tasks, wrapped in a TaskSet object, over to the TaskScheduler. In the TaskScheduler's submitTasks implementation, the final step is a call to backend.reviveOffers() to request compute resources:
override def submitTasks(taskSet: TaskSet): Unit = {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks "
+ "resource profile " + taskSet.resourceProfileId)
this.synchronized {
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets.foreach { case (_, ts) =>
ts.isZombie = true
}
stageTaskSets(taskSet.stageAttemptId) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run(): Unit = {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
}
hasReceivedTask = true
}
backend.reviveOffers()
}
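One detail worth noting in submitTasks above: schedulableBuilder.addTaskSetManager places the TaskSetManager into a scheduling pool, and which pool structure is built is controlled by the spark.scheduler.mode configuration (FIFO by default, FAIR for fair sharing between concurrent jobs). For example:
import org.apache.spark.SparkConf

// Switch the TaskScheduler's root pool from the default FIFO to FAIR scheduling.
val conf = new SparkConf()
  .setAppName("fair-scheduling-example")
  .set("spark.scheduler.mode", "FAIR")
// Optionally point at a pool definition file:
// .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
The reviveOffers() call at the end simply forwards a message to the driver endpoint: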
override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
driverEndpoint.send(ReviveOffers)
}
After driverEndpoint sends the message, the receive method of the DriverEndpoint inside CoarseGrainedSchedulerBackend handles it, and makeOffers() processes the ReviveOffers message. At the end of that path, launchTasks sends messages to launch the tasks; CoarseGrainedExecutorBackend receives the LaunchTask message and calls the launchTask method in Executor.scala, which creates a TaskRunner and submits it to a thread pool for execution.
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
val taskId = taskDescription.taskId
val tr = createTaskRunner(context, taskDescription)
runningTasks.put(taskId, tr)
val killMark = killMarks.get(taskId)
if (killMark != null) {
tr.kill(killMark._1, killMark._2)
killMarks.remove(taskId)
}
threadPool.execute(tr)
if (decommissioned) {
log.error(s"Launching a task while in decommissioned state.")
}
}
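Inside the Executor, each TaskRunner is just a Runnable: roughly, it deserializes the task from the serialized task description, runs it on the thread-pool thread, and reports the result back to the driver through the ExecutorBackend's statusUpdate. A stripped-down sketch of that loop (illustrative; the real Executor.TaskRunner also handles metrics, kill requests and shuffle cleanup):
import java.nio.ByteBuffer

// Stripped-down sketch of what a TaskRunner does (not the real Executor.TaskRunner).
class ToyTaskRunner(
    taskId: Long,
    serializedTask: ByteBuffer,
    deserialize: ByteBuffer => () => Array[Byte],      // stands in for the closure serializer
    statusUpdate: (Long, String, Array[Byte]) => Unit  // stands in for ExecutorBackend.statusUpdate
) extends Runnable {

  override def run(): Unit = {
    try {
      val task   = deserialize(serializedTask)         // 1. deserialize the task
      val result = task()                              // 2. run it on this pool thread
      statusUpdate(taskId, "FINISHED", result)         // 3. send the result back to the driver
    } catch {
      case t: Throwable =>
        statusUpdate(taskId, "FAILED", t.toString.getBytes("UTF-8"))
    }
  }
}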