Container Requests and Executor Launch
1. Introduction
In yarn-cluster mode, the code that requests container resources for Spark and launches executors runs once during initialization on the ApplicationMaster main thread, and is then executed repeatedly in a loop by the Reporter thread. The concrete implementation lives in YarnAllocator.scala.
2. YarnAllocator Overview
This class talks to the YARN cluster through YARN's AMRMClient APIs. It is responsible for requesting containers from the YARN ResourceManager and for deciding what to do with containers once they have been granted.
The class has three main responsibilities:
1. Track the application's resource requirements.
2. Call allocate to sync container requests with the ResourceManager and receive the containers that have been granted; because it is invoked repeatedly, allocate also doubles as a heartbeat.
3. Process containers granted to the application, possibly launching executors in them.
The public methods of this class are either thread-safe or internally synchronized.
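The allocation loop that drives all of this lives in ApplicationMaster.launchReporterThread, which calls allocateResources() on every iteration. Below is a minimal, self-contained sketch of that pattern; the Allocator trait and the interval handling are simplified placeholders, not the real Spark types (the real loop also handles failure counting and back-off).
// A minimal sketch of the reporter-thread heartbeat pattern described above.
object ReporterLoopSketch {
  trait Allocator { def allocateResources(): Unit }   // placeholder for YarnAllocator

  def launchReporterThread(allocator: Allocator, heartbeatIntervalMs: Long): Thread = {
    val t = new Thread("Reporter") {
      override def run(): Unit = {
        while (!Thread.currentThread().isInterrupted) {
          allocator.allocateResources()      // sync requests with the RM; doubles as heartbeat
          Thread.sleep(heartbeatIntervalMs)  // e.g. spark.yarn.scheduler.heartbeat.interval-ms
        }
      }
    }
    t.setDaemon(true)
    t.start()
    t
  }
}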
2.1 Class constructor
private[yarn] class YarnAllocator(
driverUrl: String,
driverRef: RpcEndpointRef,
conf: YarnConfiguration,
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resolver: SparkRackResolver,
clock: Clock = new SystemClock)
extends Logging
sparkConf: SparkConf is the SparkConf object specified by the user when creating the SparkContext, augmented with the spark-submit command-line options.
3. Container Resource Requests
3.1 Key concepts and classes
1. Container (concrete implementation: ContainerPBImpl) is the basic unit of resource allocation in YARN. Container objects are returned by the YARN ResourceManager to the YARN client (which runs inside the ApplicationMaster) and represent resources allocated to the application.
Key fields:
private ContainerId containerId = null;
private NodeId nodeId = null;      // uniquely identifies a node (hostname, port)
private Resource resource = null;  // the resources held by the container (CPU, memory)
private Priority priority = null;  // priority
2. ContainerRequest wraps a resource request submitted by the YARN client.
Key fields:
final Resource capability;    // the resources requested per container (CPU, memory)
final List<String> nodes;     // request containers only on these nodes
final List<String> racks;     // request containers only on these racks
final Priority priority;      // priority
final boolean relaxLocality;  // whether locality constraints may be relaxed. By default the YARN scheduler tries to allocate containers exactly where requested (a specific node on a specific rack), but it may relax the constraint to satisfy the request faster: first to the same rack as the requested node, and then to anywhere in the cluster.
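As a hedged illustration of how these fields fit together, the snippet below builds a ContainerRequest with the public Hadoop YARN client API; the host and rack names are hypothetical.
import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// 4 GB of memory and 2 vCores per container, preferably on node-01/node-02 or rack-a,
// with relaxLocality = true so the scheduler may fall back to other locations.
val capability = Resource.newInstance(4096, 2)
val request = new ContainerRequest(
  capability,
  Array("node-01", "node-02"),   // nodes
  Array("/rack-a"),              // racks
  Priority.newInstance(1),
  true)                          // relaxLocality
// amClient.addContainerRequest(request) would queue it for the next allocate() call.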
3.3 Call flow
The entry point is the call made from the ApplicationMaster.
3.3.1 allocateResources(): Unit
/**
* Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
* equal to maxExecutors.
*
* Deal with any containers YARN has granted to us by possibly launching executors in them.
*
* This must be synchronized because variables read in this method are mutated by other methods.
*/
def allocateResources(): Unit = synchronized {
updateResourceRequests()
val progressIndicator = 0.1f
// Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
// requests.
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
if (allocatedContainers.size > 0) {
logDebug(("Allocated containers: %d. Current executor count: %d. " +
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
runningExecutors.size,
numExecutorsStarting.get,
allocateResponse.getAvailableResources))
handleAllocatedContainers(allocatedContainers.asScala)
}
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, runningExecutors.size))
}
}
updateResourceRequests() updates the set of outstanding resource requests:
/**
* Update the set of container requests that we will sync with the RM based on the number of
* executors we have currently running and our target number of executors.
*
* Visible for testing.
*/
def updateResourceRequests(): Unit = {
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - runningExecutors.size
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
s"pending: $numPendingAllocate, running: ${runningExecutors.size}, " +
s"executorsStarting: ${numExecutorsStarting.get}")
// Split the pending container request into three groups: locality matched list, locality
// unmatched list and non-locality list. Take the locality matched container request into
// consideration of container placement, treat as allocated containers.
// For locality unmatched and locality free container requests, cancel these container
// requests, since required locality preference has been changed, recalculating using
// container placement strategy.
val (localRequests, staleRequests, anyHostRequests) = splitPendingAllocationsByLocality(
hostToLocalTaskCounts, pendingAllocate)
if (missing > 0) {
if (log.isInfoEnabled()) {
var requestContainerMessage = s"Will request $missing executor container(s), each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)"
if (ResourceRequestHelper.isYarnResourceTypesAvailable() &&
executorResourceRequests.nonEmpty) {
requestContainerMessage ++= s" with custom resources: " + resource.toString
}
logInfo(requestContainerMessage)
}
// cancel "stale" requests for locations that are no longer needed
staleRequests.foreach { stale =>
amClient.removeContainerRequest(stale)
}
val cancelledContainers = staleRequests.size
if (cancelledContainers > 0) {
logInfo(s"Canceled $cancelledContainers container request(s) (locality no longer needed)")
}
// consider the number of new containers and cancelled stale containers available
val availableContainers = missing + cancelledContainers
// to maximize locality, include requests with no locality preference that can be cancelled
val potentialContainers = availableContainers + anyHostRequests.size
val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
potentialContainers, numLocalityAwareTasks, hostToLocalTaskCounts,
allocatedHostToContainersMap, localRequests)
val newLocalityRequests = new mutable.ArrayBuffer[ContainerRequest]
containerLocalityPreferences.foreach {
case ContainerLocalityPreferences(nodes, racks) if nodes != null =>
newLocalityRequests += createContainerRequest(resource, nodes, racks)
case _ =>
}
if (availableContainers >= newLocalityRequests.size) {
// more containers are available than needed for locality, fill in requests for any host
for (i <- 0 until (availableContainers - newLocalityRequests.size)) {
newLocalityRequests += createContainerRequest(resource, null, null)
}
} else {
val numToCancel = newLocalityRequests.size - availableContainers
// cancel some requests without locality preferences to schedule more local containers
anyHostRequests.slice(0, numToCancel).foreach { nonLocal =>
amClient.removeContainerRequest(nonLocal)
}
if (numToCancel > 0) {
logInfo(s"Canceled $numToCancel unlocalized container requests to resubmit with locality")
}
}
newLocalityRequests.foreach { request =>
amClient.addContainerRequest(request)
}
if (log.isInfoEnabled()) {
val (localized, anyHost) = newLocalityRequests.partition(_.getNodes() != null)
if (anyHost.nonEmpty) {
logInfo(s"Submitted ${anyHost.size} unlocalized container requests.")
}
localized.foreach { request =>
logInfo(s"Submitted container request for host ${hostStr(request)}.")
}
}
} else if (numPendingAllocate > 0 && missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")
// cancel pending allocate requests by taking locality preference into account
val cancelRequests = (staleRequests ++ anyHostRequests ++ localRequests).take(numToCancel)
cancelRequests.foreach(amClient.removeContainerRequest)
}
}
Here the number of executors (containers) still to be requested is computed as: missing = total target number of executors - pending requests (submitted but not yet granted) - executors currently starting - executors currently running (a small worked example follows below).
targetNumExecutors is the total number of executors the Spark application needs.
By default spark.dynamicAllocation.enabled=false, and targetNumExecutors is the value of spark.executor.instances (spark-defaults.conf) or of the --num-executors option (spark-submit); if neither is set, it defaults to 2.
Production clusters usually set spark.dynamicAllocation.enabled=true, which lets the number of executors registered with the application be adjusted according to the workload. In that case the executor count is governed by:
spark.dynamicAllocation.initialExecutors
spark.dynamicAllocation.maxExecutors
spark.dynamicAllocation.minExecutors
initialNum = max(spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.initialExecutors, spark.executor.instances (spark-defaults.conf) or --num-executors)
The initial count is initialNum; at runtime the count is adjusted dynamically, but it always stays within the range [spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors].
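A small worked example of the missing formula, with hypothetical numbers:
// target = 10, pending (requested but not yet granted) = 3, starting = 2, running = 4
val missing = 10 - 3 - 2 - 4   // = 1, so only one more container request is submitted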
val DEFAULT_NUMBER_EXECUTORS = 2
/**
* Getting the initial target number of executors depends on whether dynamic allocation is
* enabled.
* If not using dynamic allocation it gets the number of executors requested by the user.
*/
def getInitialTargetExecutorNumber(
conf: SparkConf,
numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
if (Utils.isDynamicAllocationEnabled(conf)) {
val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
s"initial executor number $initialNumExecutors must between min executor number " +
s"$minNumExecutors and max executor number $maxNumExecutors")
initialNumExecutors
} else {
conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
}
}
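A hedged configuration example: with the settings below, getInitialTargetExecutorNumber would take the dynamic-allocation branch and return 4 (the largest of the three executor-count settings), and the runtime count would then float between 2 and 20.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.initialExecutors", "4")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.executor.instances", "3")   // still allowed; the largest of the three is used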
// A map to store preferred hostname and possible task numbers running on it.
private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
By default the hostToLocalTaskCounts field is empty. It records, for each preferred host, how many tasks may run there; this information is used to decide which cluster nodes to request from YARN and how many containers to request on each. How many tasks a host will run depends on the user code running in the Driver thread (what data is read, and which hosts its blocks live on). When resources are requested for the first time, the Driver thread has not started yet, so hostToLocalTaskCounts = Map.empty; later the value is updated from the locality information the Driver thread sends over RPC.
See also: data locality (computing close to the data).
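For illustration only (the host names are hypothetical), a snapshot after the driver has reported its locality preferences might look like this:
// three pending tasks prefer node-01, one prefers node-02
val hostToLocalTaskCounts: Map[String, Int] = Map("node-01" -> 3, "node-02" -> 1)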
private def splitPendingAllocationsByLocality(
hostToLocalTaskCount: Map[String, Int],
pendingAllocations: Seq[ContainerRequest]
): (Seq[ContainerRequest], Seq[ContainerRequest], Seq[ContainerRequest]) = {
val localityMatched = ArrayBuffer[ContainerRequest]()
val localityUnMatched = ArrayBuffer[ContainerRequest]()
val localityFree = ArrayBuffer[ContainerRequest]()
val preferredHosts = hostToLocalTaskCount.keySet
pendingAllocations.foreach { cr =>
val nodes = cr.getNodes
if (nodes == null) {
localityFree += cr
} else if (nodes.asScala.toSet.intersect(preferredHosts).nonEmpty) {
localityMatched += cr
} else {
localityUnMatched += cr
}
}
(localityMatched, localityUnMatched, localityFree)
}
splitPendingAllocationsByLocality splits the pending ContainerRequests into three groups, based on the host preferences coming from the user code (a short sketch of the classification follows the list):
localRequests (localityMatched): requests with a locality preference that matches the current preferred hosts;
staleRequests (localityUnMatched): requests with a locality preference that no longer matches;
anyHostRequests (localityFree): requests with no locality preference.
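The following self-contained sketch restates the classification rule with plain strings instead of ContainerRequest objects; the host names are hypothetical.
// Assume the driver currently prefers only node-01.
val preferredHosts = Set("node-01")
val pending: Seq[Option[Seq[String]]] =
  Seq(Some(Seq("node-01", "node-03")), Some(Seq("node-05")), None)  // None = no locality preference

pending.foreach {
  case None => println("localityFree -> anyHostRequests")
  case Some(nodes) if nodes.toSet.intersect(preferredHosts).nonEmpty =>
    println("localityMatched -> localRequests")
  case Some(_) => println("localityUnMatched -> staleRequests")
}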
If more containers are still needed:
For staleRequests, the client asks YARN to cancel those resource requests.
It then uses LocalityPreferredContainerPlacementStrategy (the locality-preferred container placement strategy) to recompute the containers that are currently needed, generates new ContainerRequest objects, and adds them to the request list (not yet submitted to YARN; they are stored in the ask collection inside AMRMClientImpl).
Why recompute?
Because the business code (RDDs are defined in the Driver thread) and the dynamic resource requests (Reporter thread) run concurrently in two threads. When one stage of a job finishes and the next stage is submitted, the locality requirements may change and hostToLocalTaskCounts is reassigned, so the ContainerRequest objects have to be regenerated.
If the number of pending requests is greater than 0 and running + starting + pending exceeds the target, the surplus pending requests are cancelled, starting with the requests whose locality match is poorest.
3.3.2 Requesting resources
val allocateResponse = amClient.allocate(progressIndicator)
@Override
public AllocateResponse allocate(float progressIndicator)
throws YarnException, IOException {
Preconditions.checkArgument(progressIndicator >= 0,
"Progress indicator should not be negative");
AllocateResponse allocateResponse = null;
List<ResourceRequest> askList = null;
List<ContainerId> releaseList = null;
AllocateRequest allocateRequest = null;
List<String> blacklistToAdd = new ArrayList<String>();
List<String> blacklistToRemove = new ArrayList<String>();
try {
synchronized (this) {
askList = new ArrayList<ResourceRequest>(ask.size());
for(ResourceRequest r : ask) {
// create a copy of ResourceRequest as we might change it while the
// RPC layer is using it to send info across
askList.add(ResourceRequest.newInstance(r.getPriority(),
r.getResourceName(), r.getCapability(), r.getNumContainers(),
r.getRelaxLocality(), r.getNodeLabelExpression()));
}
releaseList = new ArrayList<ContainerId>(release);
// optimistically clear this collection assuming no RPC failure
ask.clear();
release.clear();
blacklistToAdd.addAll(blacklistAdditions);
blacklistToRemove.addAll(blacklistRemovals);
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(blacklistToAdd,
blacklistToRemove);
allocateRequest =
AllocateRequest.newInstance(lastResponseId, progressIndicator,
askList, releaseList, blacklistRequest);
// clear blacklistAdditions and blacklistRemovals before
// unsynchronized part
blacklistAdditions.clear();
blacklistRemovals.clear();
}
try {
allocateResponse = rmClient.allocate(allocateRequest);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
synchronized (this) {
release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes);
for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
.values()) {
for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
for (ResourceRequestInfo request : capabalities.values()) {
addResourceRequestToAsk(request.remoteRequest);
}
}
}
}
// re register with RM
registerApplicationMaster();
allocateResponse = allocate(progressIndicator);
return allocateResponse;
}
synchronized (this) {
// update these on successful RPC
clusterNodeCount = allocateResponse.getNumClusterNodes();
lastResponseId = allocateResponse.getResponseId();
clusterAvailableResources = allocateResponse.getAvailableResources();
if (!allocateResponse.getNMTokens().isEmpty()) {
populateNMTokens(allocateResponse.getNMTokens());
}
if (allocateResponse.getAMRMToken() != null) {
updateAMRMToken(allocateResponse.getAMRMToken());
}
if (!pendingRelease.isEmpty()
&& !allocateResponse.getCompletedContainersStatuses().isEmpty()) {
removePendingReleaseRequests(allocateResponse
.getCompletedContainersStatuses());
}
}
} finally {
// TODO how to differentiate remote yarn exception vs error in rpc
if(allocateResponse == null) {
// we hit an exception in allocate()
// preserve ask and release for next call to allocate()
synchronized (this) {
release.addAll(releaseList);
// requests could have been added or deleted during call to allocate
// If requests were added/removed then there is nothing to do since
// the ResourceRequest object in ask would have the actual new value.
// If ask does not have this ResourceRequest then it was unchanged and
// so we can add the value back safely.
// This assumes that there will no concurrent calls to allocate() and
// so we dont have to worry about ask being changed in the
// synchronized block at the beginning of this method.
for(ResourceRequest oldAsk : askList) {
if(!ask.contains(oldAsk)) {
ask.add(oldAsk);
}
}
blacklistAdditions.addAll(blacklistToAdd);
blacklistRemovals.addAll(blacklistToRemove);
}
}
}
return allocateResponse;
}
Once containers have been granted, they are put to use.
Finally, the executors that have already finished are collected and the corresponding shutdown notifications are sent.
3.3.3 handleAllocatedContainers
After containers are granted, executors are launched in them.
/**
* Handle containers granted by the RM by launching executors on them.
*
* Due to the way the YARN allocation protocol works, certain healthy race conditions can result
* in YARN granting containers that we no longer need. In this case, we release them.
*
* Visible for testing.
*/
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
// Match incoming requests by host
val remainingAfterHostMatches = new ArrayBuffer[Container]
for (allocatedContainer <- allocatedContainers) {
matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
containersToUse, remainingAfterHostMatches)
}
// Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
// (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
// a separate thread to perform the operation.
val remainingAfterRackMatches = new ArrayBuffer[Container]
if (remainingAfterHostMatches.nonEmpty) {
var exception: Option[Throwable] = None
val thread = new Thread("spark-rack-resolver") {
override def run(): Unit = {
try {
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
} catch {
case e: Throwable =>
exception = Some(e)
}
}
}
thread.setDaemon(true)
thread.start()
try {
thread.join()
} catch {
case e: InterruptedException =>
thread.interrupt()
throw e
}
if (exception.isDefined) {
throw exception.get
}
}
// Assign remaining that are neither node-local nor rack-local
val remainingAfterOffRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterRackMatches) {
matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
remainingAfterOffRackMatches)
}
if (remainingAfterOffRackMatches.nonEmpty) {
logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
s"allocated to us")
for (container <- remainingAfterOffRackMatches) {
internalReleaseContainer(container)
}
}
runAllocatedContainers(containersToUse)
logInfo("Received %d containers from YARN, launching executors on %d of them."
.format(allocatedContainers.size, containersToUse.size))
}
The key part is launching the executors.
A launcher thread pool is used here: each granted container is handed to an ExecutorRunnable, which wraps the NodeManager client, the executor launch command, and so on. After the launch completes, the allocator's internal state is updated (see the sketch below).
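A simplified, self-contained sketch of that hand-off follows; the pool construction, the ContainerLauncher trait, and the callback are placeholders, while the real code is YarnAllocator.runAllocatedContainers, which uses Spark's own daemon thread-pool helper and the private ExecutorRunnable class.
import java.util.concurrent.Executors

object LauncherPoolSketch {
  // Placeholder for ExecutorRunnable: anything that knows how to start one container.
  trait ContainerLauncher { def run(): Unit }

  private val launcherPool = Executors.newCachedThreadPool()

  // For each granted container, submit one launcher to the pool; the launcher talks to the
  // container's NodeManager, and the allocator then updates its bookkeeping (running
  // executor count, host-to-container map, ...).
  def launchAll(launchers: Seq[ContainerLauncher])(updateInternalState: () => Unit): Unit = {
    launchers.foreach { launcher =>
      launcherPool.execute(new Runnable {
        override def run(): Unit = {
          launcher.run()
          updateInternalState()
        }
      })
    }
  }
}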
3.3.4 ExecutorRunnable
Constructor:
private[yarn] class ExecutorRunnable(
container: Option[Container],
conf: YarnConfiguration,
sparkConf: SparkConf,
masterAddress: String,
executorId: String,
hostname: String,
executorMemory: Int,
executorCores: Int,
appId: String,
securityMgr: SecurityManager,
localResources: Map[String, LocalResource],
resourceProfileId: Int)
The run method executes first: it creates and starts a NodeManager client and then calls startContainer().
def run(): Unit = {
logDebug("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
startContainer()
}
def startContainer(): java.util.Map[String, ByteBuffer] = {
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
.asInstanceOf[ContainerLaunchContext]
val env = prepareEnvironment().asJava
ctx.setLocalResources(localResources.asJava)
ctx.setEnvironment(env)
val credentials = UserGroupInformation.getCurrentUser().getCredentials()
val dob = new DataOutputBuffer()
credentials.writeTokenStorageToStream(dob)
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand()
ctx.setCommands(commands.asJava)
ctx.setApplicationACLs(
YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)
// If external shuffle service is enabled, register with the Yarn shuffle service already
// started on the NodeManager and, if authentication is enabled, provide it with our secret
// key for fetching shuffle files later
if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
val secretString = securityMgr.getSecretKey()
val secretBytes =
if (secretString != null) {
// This conversion must match how the YarnShuffleService decodes our secret
JavaUtils.stringToBytes(secretString)
} else {
// Authentication is not enabled, so just provide dummy metadata
ByteBuffer.allocate(0)
}
ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
}
// Send the start request to the ContainerManager
try {
nmClient.startContainer(container.get, ctx)
} catch {
case ex: Exception =>
throw new SparkException(s"Exception while starting container ${container.get.getId}" +
s" on host $hostname", ex)
}
}
In startContainer, the executor launch command is assembled and sent to the NodeManager through the NM client; the NodeManager then starts the YarnCoarseGrainedExecutorBackend process (the executor).
The Java process launch command looks roughly as follows:
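(The exact flags depend on the Spark version and configuration; the shape below is an approximation of what ExecutorRunnable.prepareCommand assembles, with angle brackets marking values filled in at runtime.)
{{JAVA_HOME}}/bin/java -server -Xmx<executorMemory>m <extra java options> \
  org.apache.spark.executor.YarnCoarseGrainedExecutorBackend \
  --driver-url <driverUrl> \
  --executor-id <executorId> \
  --hostname <hostname> \
  --cores <executorCores> \
  --app-id <appId> \
  --resourceProfileId <resourceProfileId> \
  1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr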
4. The YarnCoarseGrainedExecutorBackend Process
private[spark] class YarnCoarseGrainedExecutorBackend(
rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
bindAddress: String,
hostname: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv,
resourcesFile: Option[String],
resourceProfile: ResourceProfile)
extends CoarseGrainedExecutorBackend(
rpcEnv,
driverUrl,
executorId,
bindAddress,
hostname,
cores,
userClassPath,
env,
resourcesFile,
resourceProfile)
After the YarnCoarseGrainedExecutorBackend process starts, its main method runs first and sets up an RPC endpoint named "Executor", whose lifecycle is constructor -> onStart -> receive* -> onStop.
Looking at the implementation of this lifecycle:
In onStart, the backend registers the executor with the driver (a simplified sketch follows).
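A simplified sketch of the registration handshake in CoarseGrainedExecutorBackend.onStart; the field list of the RegisterExecutor message is abbreviated here and differs between Spark versions.
// (imports assumed: scala.util.{Success, Failure}, org.apache.spark.util.ThreadUtils)
// 1. resolve the driver endpoint from driverUrl
// 2. ask the driver to register this executor
// 3. on success, send RegisteredExecutor to self, which is handled in receive below
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
  driver = Some(ref)
  ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores /* , ... */))
}(ThreadUtils.sameThread).onComplete {
  case Success(_) => self.send(RegisteredExecutor)
  case Failure(e) => exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
}(ThreadUtils.sameThread)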
receive
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
driver.get.send(LaunchedExecutor(executorId))
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
executor.launchTask(this, taskDesc)
}
case KillTask(taskId, _, interruptThread, reason) =>
if (executor == null) {
exitExecutor(1, "Received KillTask command but executor was null")
} else {
executor.killTask(taskId, interruptThread, reason)
}
case StopExecutor =>
stopping.set(true)
logInfo("Driver commanded a shutdown")
// Cannot shutdown here because an ack may need to be sent back to the caller. So send
// a message to self to actually do the shutdown.
self.send(Shutdown)
case Shutdown =>
stopping.set(true)
new Thread("CoarseGrainedExecutorBackend-stop-executor") {
override def run(): Unit = {
// executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
// However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
// stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
// Therefore, we put this line in a new thread.
executor.stop()
}
}.start()
case UpdateDelegationTokens(tokenBytes) =>
logInfo(s"Received tokens of ${tokenBytes.length} bytes")
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
}
receive is where the executor's core functionality is implemented; the most important case is task execution.
The Executor object is created when the RegisteredExecutor message is received (see the code above); it maintains a thread pool that executes TaskRunner instances. Next, look at the implementation of TaskRunner's run method.
As can be seen, the TaskRunner deserializes the Task object (a ShuffleMapTask or a ResultTask), calls task.run to execute the user's computation over the RDD, and finally returns the result to the driver (a rough sketch of the dispatch path follows).
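For reference, Executor.launchTask is roughly the following; TaskRunner.run itself then deserializes the Task, calls task.run, serializes the result, and reports it back to the driver via statusUpdate. This is a sketch of the internal method, not a standalone API.
// Wrap the task description in a TaskRunner, record it as running,
// and hand it to the executor's task thread pool.
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}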