Spark Source Code Walkthrough (10): Container Requests and Executor Startup


1. Introduction

In yarn-cluster mode, the code that requests container resources from YARN and launches executors is executed once when the ApplicationMaster main thread initializes, and is then invoked repeatedly in a loop by the Reporter thread. The concrete implementation lives in YarnAllocator.scala.
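
The pattern is roughly the following. This is a minimal sketch, not Spark's actual ApplicationMaster code: the Allocator trait merely stands in for YarnAllocator, and the thread name and interval are made up.

object ReporterLoopSketch {
  trait Allocator { def allocateResources(): Unit }   // stand-in for YarnAllocator

  def launchReporterThread(allocator: Allocator, intervalMs: Long): Thread = {
    val t = new Thread("Reporter") {
      override def run(): Unit = {
        try {
          while (!Thread.currentThread().isInterrupted) {
            allocator.allocateResources()   // repeated calls double as the AM <-> RM heartbeat
            Thread.sleep(intervalMs)
          }
        } catch {
          case _: InterruptedException => // exit quietly when the AM shuts the reporter down
        }
      }
    }
    t.setDaemon(true)
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    val allocator = new Allocator { def allocateResources(): Unit = println("allocateResources()") }
    allocator.allocateResources()                       // first call, on the "main" thread
    val reporter = launchReporterThread(allocator, intervalMs = 1000L)
    Thread.sleep(3500L)                                 // let a few heartbeats run
    reporter.interrupt()
  }
}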

2. YarnAllocator Overview

This class talks to the YARN cluster through YARN's AMRMClient APIs. It is responsible for requesting containers from the YARN ResourceManager and for deciding what to do with containers once they are granted.
The class has three main responsibilities:
1. Keep track of the application's resource requirements.
2. Call allocate, which syncs the outstanding container requests to the ResourceManager and returns whatever containers have been allocated; repeated calls to allocate also serve as the heartbeat.
3. Process the containers granted to the application, possibly launching executors in them.

The public methods of this class are either thread-safe or internally synchronized.

2.1 Constructor

private[yarn] class YarnAllocator(
    driverUrl: String,
    driverRef: RpcEndpointRef,
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    amClient: AMRMClient[ContainerRequest],
    appAttemptId: ApplicationAttemptId,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resolver: SparkRackResolver,
    clock: Clock = new SystemClock)
  extends Logging 

sparkConf: SparkConf is the SparkConf object the user specifies when creating the SparkContext, with the spark-submit command-line options merged in.

3. Container Resource Requests

3.1 Key Concepts and Classes

1. Container: implemented by ContainerPBImpl; the basic unit of YARN resource allocation. Container objects are returned by the YARN ResourceManager to the YARN client (which here lives inside the ApplicationMaster) and represent resources granted to the application.
Key fields:
private ContainerId containerId = null;
private NodeId nodeId = null;     // uniquely identifies a node (hostname, port)
private Resource resource = null; // the resources inside the container (CPU, memory)
private Priority priority = null; // priority

2. ContainerRequest: the wrapper for a resource request submitted by the YARN client (a small construction sketch follows after the field list).
Key fields:

  final Resource capability;   // the requested resources (CPU, memory)
  final List<String> nodes;    // restrict the request to these nodes
  final List<String> racks;    // restrict the request to these racks
  final Priority priority;     // priority
  final boolean relaxLocality; // whether the locality constraint may be relaxed. By default the YARN
                               // scheduler tries to place containers at the requested locations
                               // (a specific node on a specific rack), but it may relax the constraint
                               // to satisfy the request faster: first to other nodes on the same rack,
                               // and then to anywhere in the cluster.
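
For orientation, here is a small, hypothetical example of building such a request with the YARN client API. The host and rack names are made up, and it assumes hadoop-yarn-client is on the classpath; YarnAllocator builds its requests internally via createContainerRequest rather than like this.

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object ContainerRequestSketch {
  def main(args: Array[String]): Unit = {
    // 2 GB of memory and 1 vCore per container, priority 1
    val capability = Resource.newInstance(2048, 1)
    val nodes = Array("worker-1.example.com")   // preferred hosts (hypothetical)
    val racks = Array("/default-rack")          // preferred racks (hypothetical)
    val request = new ContainerRequest(capability, nodes, racks, Priority.newInstance(1), true)
    println(request)
  }
}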

3.3 Call Flow

The entry point is the call in ApplicationMaster: after the AM registers with the ResourceManager, it calls YarnAllocator.allocateResources() once, and the Reporter thread then keeps calling it in a loop.

3.3.1 allocateResources(): Unit


/**
 * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
 * equal to maxExecutors.
 *
 * Deal with any containers YARN has granted to us by possibly launching executors in them.
 *
 * This must be synchronized because variables read in this method are mutated by other methods.
 */
def allocateResources(): Unit = synchronized {
  updateResourceRequests()


  val progressIndicator = 0.1f
  // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
  // requests.
  val allocateResponse = amClient.allocate(progressIndicator)


  val allocatedContainers = allocateResponse.getAllocatedContainers()
  allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)


  if (allocatedContainers.size > 0) {
    logDebug(("Allocated containers: %d. Current executor count: %d. " +
      "Launching executor count: %d. Cluster resources: %s.")
      .format(
        allocatedContainers.size,
        runningExecutors.size,
        numExecutorsStarting.get,
        allocateResponse.getAvailableResources))


    handleAllocatedContainers(allocatedContainers.asScala)
  }


  val completedContainers = allocateResponse.getCompletedContainersStatuses()
  if (completedContainers.size > 0) {
    logDebug("Completed %d containers".format(completedContainers.size))
    processCompletedContainers(completedContainers.asScala)
    logDebug("Finished processing %d completed containers. Current running executor count: %d."
      .format(completedContainers.size, runningExecutors.size))
  }
}

updateResourceRequests() updates the outstanding resource requests.


/**
 * Update the set of container requests that we will sync with the RM based on the number of
 * executors we have currently running and our target number of executors.
 *
 * Visible for testing.
 */
def updateResourceRequests(): Unit = {
  val pendingAllocate = getPendingAllocate
  val numPendingAllocate = pendingAllocate.size
  val missing = targetNumExecutors - numPendingAllocate -
    numExecutorsStarting.get - runningExecutors.size
  logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
    s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
    s"executorsStarting: ${numExecutorsStarting.get}")


  // Split the pending container request into three groups: locality matched list, locality
  // unmatched list and non-locality list. Take the locality matched container request into
  // consideration of container placement, treat as allocated containers.
  // For locality unmatched and locality free container requests, cancel these container
  // requests, since required locality preference has been changed, recalculating using
  // container placement strategy.
  val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
    hostToLocalTaskCounts, pendingAllocate)


  if (missing > 0) {
    if (log.isInfoEnabled()) {
      var requestContainerMessage = s"Will request $missing executor container(s), each with " +
          s"${resource.getVirtualCores} core(s) and " +
          s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)"
      if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
          executorResourceRequests.nonEmpty) {
        requestContainerMessage ++= s" with custom resources: " + resource.toString
      }
      logInfo(requestContainerMessage)
    }


    // cancel "stale" requests for locations that are no longer needed
    staleRequests.foreach { stale =>
      amClient.removeContainerRequest(stale)
    }
    val cancelledContainers = staleRequests.size
    if (cancelledContainers > 0) {
      logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
    }


    // consider the number of new containers and cancelled stale containers available
    val availableContainers = missing + cancelledContainers


    // to maximize locality, include requests with no locality preference that can be cancelled
    val potentialContainers = availableContainers + anyHostRequests.size


    val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
      potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
        allocatedHostToContainersMap, localRequests)


    val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
    containerLocalityPreferences.foreach {
      case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
        newLocalityRequests += createContainerRequest(resource, nodes, racks)
      case _ =>
    }


    if (availableContainers >= newLocalityRequests.size) {
      // more containers are available than needed for locality, fill in requests for any host
      for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
        newLocalityRequests += createContainerRequest(resource, null, null)
      }
    } else {
      val numToCancel = newLocalityRequests.size - availableContainers
      // cancel some requests without locality preferences to schedule more local containers
      anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
        amClient.removeContainerRequest(nonLocal)
      }
      if (numToCancel > 0) {
        logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
      }
    }


    newLocalityRequests.foreach { request =>
      amClient.addContainerRequest(request)
    }


    if (log.isInfoEnabled()) {
      val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
      if (anyHost.nonEmpty) {
        logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
      }
      localized.foreach { request =>
        logInfo(s"Submitted container request for host ${hostStr(request)}.")
      }
    }
  } else if (numPendingAllocate > 0 && missing < 0) {
    val numToCancel = math.min(numPendingAllocate, -missing)
    logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
      s"total $targetNumExecutors executors.")
    // cancel pending allocate requests by taking locality preference into account
    val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
    cancelRequests.foreach(amClient.removeContainerRequest)
  }
}

The number of executors (containers) still to be requested is computed here as: the total number needed, minus the number already requested but not yet granted, minus the number currently starting, minus the number currently running.
targetNumExecutors is the total number of executors the Spark application wants.
By default,

spark.dynamicAllocation.enabled=false

and targetNumExecutors is the value of spark.executor.instances (spark-defaults.conf) or of the --num-executors option to spark-submit; if neither is specified, the default is 2.
Production clusters usually set spark.dynamicAllocation.enabled=true, which lets the number of executors registered with the application scale with the workload. In that case the executor count is governed by:

spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.minExecutors

initialnum = max(spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors, spark.executor.instances (spark-defaults.conf) or --num-executors)

The initial count is initialnum; at runtime the count is adjusted dynamically within the range spark.dynamicAllocation.minExecutors ~ spark.dynamicAllocation.maxExecutors.
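
As a rough sketch of that rule (illustrative numbers only; the real logic is Utils.getDynamicAllocationInitialExecutors, used by getInitialTargetExecutorNumber shown below):

object InitialExecutorsSketch {
  // initial target = max(initialExecutors, minExecutors, spark.executor.instances), per the rule above
  def initialTarget(initialExecutors: Int, minExecutors: Int, instances: Int): Int =
    Seq(initialExecutors, minExecutors, instances).max

  def main(args: Array[String]): Unit = {
    // e.g. initialExecutors = 0 (unset), minExecutors = 2, --num-executors 10  =>  10
    println(initialTarget(initialExecutors = 0, minExecutors = 2, instances = 10))
  }
}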


val DEFAULT_NUMBER_EXECUTORS = 2


/**
 * Getting the initial target number of executors depends on whether dynamic allocation is
 * enabled.
 * If not using dynamic allocation it gets the number of executors requested by the user.
 */
def getInitialTargetExecutorNumber(
    conf: SparkConf,
    numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
  if (Utils.isDynamicAllocationEnabled(conf)) {
    val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
    val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
    val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
    require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
      s"initial executor number $initialNumExecutors must between min executor number " +
        s"$minNumExecutors and max executor number $maxNumExecutors")


    initialNumExecutors
  } else {
    conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
  }
}


// A map to store preferred hostname and possible task numbers running on it.
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty 

By default, hostToLocalTaskCounts is empty. It records, for each preferred host, how many tasks might run on it, for example Map("datanode-01" -> 3, "datanode-02" -> 1) (hypothetical hostnames). This information is used to decide which hosts in the YARN cluster to request containers on, and how many containers to request on each. How many tasks prefer a given host depends on the user code running in the driver thread (what data is read, and which hosts its blocks live on). When resources are requested for the first time, the driver thread has not started running yet, so hostToLocalTaskCounts = Map.empty; afterwards the value is updated from messages the driver thread sends over the RPC channel.

Reference: data locality (就近计算).

private def splitPendingAllocationsByLocality(
    hostToLocalTaskCount: Map[String, Int],
    pendingAllocations: Seq[ContainerRequest]
  ): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {
  val localityMatched = ArrayBuffer[ContainerRequest]()
  val localityUnMatched = ArrayBuffer[ContainerRequest]()
  val localityFree = ArrayBuffer[ContainerRequest]()


  val preferredHosts = hostToLocalTaskCount.keySet
  pendingAllocations.foreach { cr =>
    val nodes = cr.getNodes
    if (nodes == null) {
      localityFree += cr
    } else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) {
      localityMatched += cr
    } else {
      localityUnMatched += cr
    }
  }


  (localityMatched, localityUnMatched, localityFree)
}

splitPendingAllocationsByLocality splits the pending ContainerRequests into three groups based on the hosts the user code prefers:

localRequests (localityMatched): requests with a locality preference that matches the currently preferred hosts
staleRequests (localityUnMatched): requests with a locality preference that no longer matches the preferred hosts
anyHostRequests (localityFree): requests with no locality preference

If more containers still need to be requested:
for the staleRequests, the client asks YARN to cancel those requests.

Next, the LocalityPreferredContainerPlacementStrategy (the locality-preferred container placement strategy) recomputes the containers that are currently needed, generates new ContainerRequest objects, and adds them to the request list (not yet submitted to YARN; they are stored in the ask collection inside AMRMClientImpl).
Why recompute?
Because the business code (the RDDs defined in the driver thread) and dynamic resource allocation (the Reporter thread) run concurrently in two threads. When a stage of a job finishes and a new stage is submitted, the locality requirements may change and hostToLocalTaskCounts is reassigned, so the ContainerRequest objects have to be regenerated.

If the number of pending requests is greater than zero and running + starting + pending exceeds the target, the excess pending requests are cancelled, preferring the requests with the weakest locality match.
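
A quick worked example of that bookkeeping, with hypothetical numbers:

object PendingCancellationSketch {
  // missing = target - pending - starting - running; if it is negative,
  // min(pending, -missing) pending requests are cancelled, worst locality match first.
  def numToCancel(target: Int, pending: Int, starting: Int, running: Int): Int = {
    val missing = target - pending - starting - running
    if (missing < 0) math.min(pending, -missing) else 0
  }

  def main(args: Array[String]): Unit =
    // target 10, pending 6, starting 2, running 5  =>  missing = -3, cancel 3
    println(numToCancel(target = 10, pending = 6, starting = 2, running = 5))
}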

3.3.2 Requesting Resources

val allocateResponse = amClient.allocate(progressIndicator)

@Override
public AllocateResponse allocate(float progressIndicator) 
    throws YarnException, IOException {
  Preconditions.checkArgument(progressIndicator >= 0,
      "Progress indicator should not be negative");
  AllocateResponse allocateResponse = null;
  List<ResourceRequest> askList = null;
  List<ContainerId> releaseList = null;
  AllocateRequest allocateRequest = null;
  List<String> blacklistToAdd = new ArrayList<String>();
  List<String> blacklistToRemove = new ArrayList<String>();
  
  try {
    synchronized (this) {
      askList = new ArrayList<ResourceRequest>(ask.size());
      for(ResourceRequest r : ask) {
        // create a copy of ResourceRequest as we might change it while the 
        // RPC layer is using it to send info across
        askList.add(ResourceRequest.newInstance(r.getPriority(),
            r.getResourceName(), r.getCapability(), r.getNumContainers(),
            r.getRelaxLocality(), r.getNodeLabelExpression()));
      }
      releaseList = new ArrayList<ContainerId>(release);
      // optimistically clear this collection assuming no RPC failure
      ask.clear();
      release.clear();


      blacklistToAdd.addAll(blacklistAdditions);
      blacklistToRemove.addAll(blacklistRemovals);
      
      ResourceBlacklistRequest blacklistRequest =
          ResourceBlacklistRequest.newInstance(blacklistToAdd,
              blacklistToRemove);
      
      allocateRequest =
          AllocateRequest.newInstance(lastResponseId, progressIndicator,
            askList, releaseList, blacklistRequest);
      // clear blacklistAdditions and blacklistRemovals before 
      // unsynchronized part
      blacklistAdditions.clear();
      blacklistRemovals.clear();
    }


    try {
      allocateResponse = rmClient.allocate(allocateRequest);
    } catch (ApplicationMasterNotRegisteredException e) {
      LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
          + " hence resyncing.");
      synchronized (this) {
        release.addAll(this.pendingRelease);
        blacklistAdditions.addAll(this.blacklistedNodes);
        for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
          .values()) {
          for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
            for (ResourceRequestInfo request : capabalities.values()) {
              addResourceRequestToAsk(request.remoteRequest);
            }
          }
        }
      }
      // re register with RM
      registerApplicationMaster();
      allocateResponse = allocate(progressIndicator);
      return allocateResponse;
    }


    synchronized (this) {
      // update these on successful RPC
      clusterNodeCount = allocateResponse.getNumClusterNodes();
      lastResponseId = allocateResponse.getResponseId();
      clusterAvailableResources = allocateResponse.getAvailableResources();
      if (!allocateResponse.getNMTokens().isEmpty()) {
        populateNMTokens(allocateResponse.getNMTokens());
      }
      if (allocateResponse.getAMRMToken() != null) {
        updateAMRMToken(allocateResponse.getAMRMToken());
      }
      if (!pendingRelease.isEmpty()
          && !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
        removePendingReleaseRequests(allocateResponse
            .getCompletedContainersStatuses());
      }
    }
  } finally {
    // TODO how to differentiate remote yarn exception vs error in rpc
    if(allocateResponse == null) {
      // we hit an exception in allocate()
      // preserve ask and release for next call to allocate()
      synchronized (this) {
        release.addAll(releaseList);
        // requests could have been added or deleted during call to allocate
        // If requests were added/removed then there is nothing to do since
        // the ResourceRequest object in ask would have the actual new value.
        // If ask does not have this ResourceRequest then it was unchanged and
        // so we can add the value back safely.
        // This assumes that there will no concurrent calls to allocate() and
        // so we dont have to worry about ask being changed in the
        // synchronized block at the beginning of this method.
        for(ResourceRequest oldAsk : askList) {
          if(!ask.contains(oldAsk)) {
            ask.add(oldAsk);
          }
        }
        
        blacklistAdditions.addAll(blacklistToAdd);
        blacklistRemovals.addAll(blacklistToRemove);
      }
    }
  }
  return allocateResponse;
}

Once containers have been granted, they are put to use. Finally, the containers that have already completed are fetched and shutdown notifications are sent for the corresponding executors.

3.3.3 handleAllocatedContainers

Once container resources are granted, they are used to launch executors.


/**
 * Handle containers granted by the RM by launching executors on them.
 *
 * Due to the way the YARN allocation protocol works, certain healthy race conditions can result
 * in YARN granting containers that we no longer need. In this case, we release them.
 *
 * Visible for testing.
 */
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)


  // Match incoming requests by host
  val remainingAfterHostMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- allocatedContainers) {
    matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
      containersToUse, remainingAfterHostMatches)
  }


  // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
  // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
  // a separate thread to perform the operation.
  val remainingAfterRackMatches = new ArrayBuffer[Container]
  if (remainingAfterHostMatches.nonEmpty) {
    var exception: Option[Throwable] = None
    val thread = new Thread("spark-rack-resolver") {
      override def run(): Unit = {
        try {
          for (allocatedContainer <- remainingAfterHostMatches) {
            val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
            matchContainerToRequest(allocatedContainer, rack, containersToUse,
              remainingAfterRackMatches)
          }
        } catch {
          case e: Throwable =>
            exception = Some(e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()


    try {
      thread.join()
    } catch {
      case e: InterruptedException =>
        thread.interrupt()
        throw e
    }


    if (exception.isDefined) {
      throw exception.get
    }
  }


  // Assign remaining that are neither node-local nor rack-local
  val remainingAfterOffRackMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- remainingAfterRackMatches) {
    matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
      remainingAfterOffRackMatches)
  }


  if (remainingAfterOffRackMatches.nonEmpty) {
    logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
      s"allocated to us")
    for (container <- remainingAfterOffRackMatches) {
      internalReleaseContainer(container)
    }
  }


  runAllocatedContainers(containersToUse)


  logInfo("Received %d containers from YARN, launching executors on %d of them."
    .format(allocatedContainers.size, containersToUse.size))
}

The key part to focus on is the executor launch.
A launcher thread pool (launcherPool in the source) runs one ExecutorRunnable per container; ExecutorRunnable wraps the NodeManager client, the executor launch command, and so on. Once a container has been started, the allocator's bookkeeping is updated.
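
The launching pattern looks roughly like this. It is a simplified stand-in, not Spark's runAllocatedContainers: the container ids are hypothetical and the pool here is a plain cached pool rather than Spark's daemon "ContainerLauncher" pool.

import java.util.concurrent.Executors

object LauncherPoolSketch {
  def main(args: Array[String]): Unit = {
    val launcherPool = Executors.newCachedThreadPool()
    val containers = Seq("container_0001_01_000002", "container_0001_01_000003") // hypothetical ids
    containers.foreach { id =>
      launcherPool.execute(new Runnable {
        override def run(): Unit = {
          // In Spark this is `new ExecutorRunnable(...).run()`, which contacts the NodeManager,
          // after which the allocator's state (running executors etc.) is updated.
          println(s"launching executor in $id")
        }
      })
    }
    launcherPool.shutdown()
  }
}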

3.3.4 ExecutorRunnable

Constructor:

private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resourceProfileId: Int) 

The run method executes first: it creates and starts the NodeManager client, then calls startContainer().

def run(): Unit = {
  logDebug("Starting Executor Container")
  nmClient = NMClient.createNMClient()
  nmClient.init(conf)
  nmClient.start()
  startContainer()
}
def startContainer(): java.util.Map[String, ByteBuffer] = {
  val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    .asInstanceOf[ContainerLaunchContext]
  val env = prepareEnvironment().asJava


  ctx.setLocalResources(localResources.asJava)
  ctx.setEnvironment(env)


  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
  val dob = new DataOutputBuffer()
  credentials.writeTokenStorageToStream(dob)
  ctx.setTokens(ByteBuffer.wrap(dob.getData()))


  val commands = prepareCommand()


  ctx.setCommands(commands.asJava)
  ctx.setApplicationACLs(
    YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)


  // If external shuffle service is enabled, register with the Yarn shuffle service already
  // started on the NodeManager and, if authentication is enabled, provide it with our secret
  // key for fetching shuffle files later
  if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
    val secretString = securityMgr.getSecretKey()
    val secretBytes =
      if (secretString != null) {
        // This conversion must match how the YarnShuffleService decodes our secret
        JavaUtils.stringToBytes(secretString)
      } else {
        // Authentication is not enabled, so just provide dummy metadata
        ByteBuffer.allocate(0)
      }
    ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
  }


  // Send the start request to the ContainerManager
  try {
    nmClient.startContainer(container.get, ctx)
  } catch {
    case ex: Exception =>
      throw new SparkException(s"Exception while starting container ${container.get.getId}" +
        s" on host $hostname", ex)
  }
}

In startContainer, the executor launch command is assembled and sent to the NodeManager through the NM client; the NodeManager then starts the YarnCoarseGrainedExecutorBackend process (the executor).
The launch command is essentially a java command whose main class is org.apache.spark.executor.YarnCoarseGrainedExecutorBackend, with arguments such as the driver URL, executor id, hostname, number of cores, and application id.

4. The YarnCoarseGrainedExecutorBackend Process

private[spark] class YarnCoarseGrainedExecutorBackend(
    rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    bindAddress: String,
    hostname: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv,
    resourcesFile: Option[String],
    resourceProfile: ResourceProfile)
  extends CoarseGrainedExecutorBackend(
    rpcEnv,
    driverUrl,
    executorId,
    bindAddress,
    hostname,
    cores,
    userClassPath,
    env,
    resourcesFile,
    resourceProfile)

When the YarnCoarseGrainedExecutorBackend process starts, its main method runs first.
It sets up an RPC endpoint named "Executor", whose lifecycle is constructor -> onStart -> receive* -> onStop.
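
As a simplified stand-in for that lifecycle (Spark's RpcEndpoint API is internal to Spark, so the Endpoint trait below is only an illustration of the calling order):

object EndpointLifecycleSketch {
  trait Endpoint {
    def onStart(): Unit = ()
    def receive: PartialFunction[Any, Unit]
    def onStop(): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val executorEndpoint = new Endpoint {
      override def onStart(): Unit = println("onStart: ask the driver to RegisterExecutor")
      override def receive: PartialFunction[Any, Unit] = {
        case "RegisteredExecutor" => println("receive: create the Executor, report LaunchedExecutor")
        case other                => println(s"receive: unhandled message $other")
      }
      override def onStop(): Unit = println("onStop")
    }
    executorEndpoint.onStart()                        // constructor -> onStart
    executorEndpoint.receive("RegisteredExecutor")    // -> receive*
    executorEndpoint.onStop()                         // -> onStop
  }
}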

Looking at the implementation of this lifecycle:
in onStart, the executor registration with the driver is performed.

receive:

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
        resources = _resources)
      driver.get.send(LaunchedExecutor(executorId))
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }


  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      taskResources(taskDesc.taskId) = taskDesc.resources
      executor.launchTask(this, taskDesc)
    }


  case KillTask(taskId, _, interruptThread, reason) =>
    if (executor == null) {
      exitExecutor(1, "Received KillTask command but executor was null")
    } else {
      executor.killTask(taskId, interruptThread, reason)
    }


  case StopExecutor =>
    stopping.set(true)
    logInfo("Driver commanded a shutdown")
    // Cannot shutdown here because an ack may need to be sent back to the caller. So send
    // a message to self to actually do the shutdown.
    self.send(Shutdown)


  case Shutdown =>
    stopping.set(true)
    new Thread("CoarseGrainedExecutorBackend-stop-executor") {
      override def run(): Unit = {
        // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
        // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
        // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
        // Therefore, we put this line in a new thread.
        executor.stop()
      }
    }.start()


  case UpdateDelegationTokens(tokenBytes) =>
    logInfo(s"Received tokens of ${tokenBytes.length} bytes")
    SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
}

receive is where the executor's core functionality is implemented; the part to focus on is task execution.

The Executor object is created when the endpoint receives the RegisteredExecutor message (see the receive handler above). It maintains a thread pool that runs TaskRunner instances. Looking at the implementation of TaskRunner's run method:
TaskRunner deserializes the Task object (a ShuffleMapTask or a ResultTask), calls task.run to execute the user's computation code and produce the results of the RDD operations, and finally sends the result back to the driver.
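
A highly simplified sketch of that flow (not Spark's TaskRunner: FakeTask and reportToDriver are stand-ins, and no real serialization is shown):

import java.util.concurrent.Executors

object TaskRunnerSketch {
  // Stand-ins for Spark's classes: a "task" that computes something, and a driver callback.
  final case class FakeTask(taskId: Long) { def run(): Int = (1 to 100).sum } // the "RDD computation"
  def reportToDriver(taskId: Long, result: Int): Unit = println(s"task $taskId finished: $result")

  def main(args: Array[String]): Unit = {
    val threadPool = Executors.newCachedThreadPool()   // Executor keeps a similar pool for TaskRunner
    val task = FakeTask(taskId = 0L)                   // in Spark this is deserialized from LaunchTask
    threadPool.execute(new Runnable {
      override def run(): Unit = {
        val result = task.run()                        // ShuffleMapTask / ResultTask logic runs here
        reportToDriver(task.taskId, result)            // Spark sends a serialized result via statusUpdate
      }
    })
    threadPool.shutdown()
  }
}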
