Spark version: 1.3+
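Sequence of the Worker launching an Executor and the Executor registering with the Driver:
1. launchExecutor: the Master sends a message telling the Worker to launch an Executor.
2. Worker, new(): LaunchExecutor is the message the Master sends to make the Worker launch an Executor. It is a case class that encapsulates the information about the Executor to be launched. new ExecutorRunner creates the ExecutorRunner, puts all the parameters into it, and the Executor is later launched through it. The runner is then registered in a map from executor ID to runner: executors(appId + "/" + execId) = manager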
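A minimal sketch of step 2, assuming simplified stand-ins: this LaunchExecutor omits fields the real message carries (such as the application description), and this ExecutorRunner only stores its parameters. It shows the pattern of matching the case class and registering the runner under the "appId/execId" key.

```scala
import scala.collection.mutable

// Simplified stand-in for the LaunchExecutor message (the real one has more fields).
case class LaunchExecutor(masterUrl: String, appId: String, execId: Int, cores: Int, memoryMb: Int)

// Simplified stand-in for ExecutorRunner: it only holds the launch parameters.
class ExecutorRunner(val appId: String, val execId: Int, val cores: Int, val memoryMb: Int) {
  def fullId: String = appId + "/" + execId
}

// Worker-side registry: "appId/execId" -> runner.
val executors = mutable.HashMap[String, ExecutorRunner]()

def handle(msg: Any): Unit = msg match {
  case LaunchExecutor(_, appId, execId, cores, memoryMb) =>
    val manager = new ExecutorRunner(appId, execId, cores, memoryMb)
    executors(appId + "/" + execId) = manager
    // The Worker would now call manager.start() (step 3).
  case other =>
    println(s"Unhandled message: $other")
}

handle(LaunchExecutor("spark://master:7077", "app-20150101000000-0000", 0, cores = 2, memoryMb = 1024))
println(executors.keys.mkString(", "))
```

Keying by appId and execId together keeps executor IDs from different applications on the same Worker from colliding.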
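3. Worker, start(): the Worker calls the ExecutorRunner's start method, which launches the Executor as a Java child process: manager.start()
4. ExecutorRunner, new(): the runner first creates a thread object, and that thread is what launches the Java child process: workerThread = new Thread("ExecutorRunner for " + fullId) { override def run() { fetchAndRunExecutor() } }
5. ExecutorRunner: starts the thread: workerThread.start()
6. ExecutorRunner: processes can die unexpectedly before their state has been saved properly, so some cleanup code must run when the JVM shuts down. The runner therefore registers a shutdown hook that kills the Executor child process when the Worker's JVM exits: shutdownHook = new Thread() { override def run() { killProcess(Some("Worker shutting down")) } }, then Runtime.getRuntime.addShutdownHook(shutdownHook)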
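Steps 4 to 6 can be sketched in plain Scala as follows. RunnerSketch is a hypothetical class: fetchAndRunExecutor only sets a flag and killProcess only prints, where Spark would start and destroy the child JVM. The point is the pattern of running the launch on a named thread and registering a JVM shutdown hook for cleanup.

```scala
// Hypothetical simplification of the thread + shutdown-hook pattern (not Spark's code).
class RunnerSketch(fullId: String) {
  @volatile var ran = false
  var workerThread: Thread = null
  var shutdownHook: Thread = null

  private def fetchAndRunExecutor(): Unit = {
    // Spark would build and start the child JVM here; we only record that we ran.
    ran = true
  }

  private def killProcess(message: Option[String]): Unit = {
    // Spark would destroy the child process here.
    System.err.println(s"Killing $fullId: ${message.getOrElse("")}")
  }

  def start(): Unit = {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run(): Unit = fetchAndRunExecutor()
    }
    workerThread.start()
    // Runs when this JVM shuts down, so the child process is not left orphaned.
    shutdownHook = new Thread() {
      override def run(): Unit = killProcess(Some("Worker shutting down"))
    }
    Runtime.getRuntime.addShutdownHook(shutdownHook)
  }
}

val runner = new RunnerSketch("app-1/0")
runner.start()
runner.workerThread.join()
```

Running the launch on its own thread keeps the Worker's message loop free while the child process runs.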
7. ExecutorRunner, fetchAndRunExecutor(): the thread calls this method to start the Java child process.
8. ExecutorRunner, fetchAndRunExecutor(): builds a ProcessBuilder from the saved parameters: val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory, sparkHome.getAbsolutePath, substituteVariables)
9. ExecutorRunner: starts the child process, whose entry point is the main method of CoarseGrainedExecutorBackend: process = builder.start()
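A sketch of steps 8 and 9 using java.lang.ProcessBuilder directly. buildProcessBuilder here is a hypothetical helper that assembles "java -cp <classpath> -Xmx<mem>m <mainClass> <args>". It is not Spark's CommandUtils.buildProcessBuilder, which also handles environment variables and argument substitution. The main class name is made up, so to keep the demo runnable the child JVM that actually starts only prints its version.

```scala
import java.io.File

// Path to the java binary under a given java.home.
def javaExecutable(javaHome: String): String =
  javaHome + File.separator + "bin" + File.separator + "java"

// Hypothetical helper: builds (but does not start) a child-JVM command.
def buildProcessBuilder(javaHome: String, classPath: String, memoryMb: Int,
                        mainClass: String, args: Seq[String]): ProcessBuilder = {
  val cmd = new java.util.ArrayList[String]()
  Seq(javaExecutable(javaHome), "-cp", classPath, s"-Xmx${memoryMb}m", mainClass)
    .foreach(c => cmd.add(c))
  args.foreach(a => cmd.add(a))
  new ProcessBuilder(cmd)
}

val javaHome = System.getProperty("java.home")
val builder = buildProcessBuilder(javaHome, System.getProperty("java.class.path"), 512,
  "org.example.HypotheticalExecutorMain", Seq("--executor-id", "0"))
println(builder.command()) // the command that would be started

// Start a real child JVM that only reports its version.
val probe = new ProcessBuilder(javaExecutable(javaHome), "-version")
probe.redirectErrorStream(true) // java -version writes to stderr
val process = probe.start()
val output = scala.io.Source.fromInputStream(process.getInputStream).mkString
val exitCode = process.waitFor()
```

The parent reads the child's output and waits for its exit code, which is roughly what a runner needs in order to report the Executor's final state.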
10. CoarseGrainedExecutorBackend, main(): the entry point of the Executor process.
11. CoarseGrainedExecutorBackend, run(): main calls run: run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
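A sketch of how an entry point like this might turn command-line flags into the arguments of run(...). The flag names, the ExecutorArgs case class, and the stubbed run are assumptions for illustration. The recursive pattern match over the argument list is a common Scala idiom, not a copy of Spark's parser.

```scala
// Hypothetical container for parsed options; fields mirror the run(...) parameters.
case class ExecutorArgs(driverUrl: String = null, executorId: String = null,
                        hostname: String = null, cores: Int = 0, appId: String = null,
                        workerUrl: Option[String] = None, userClassPath: Seq[String] = Nil)

def parse(args: List[String], acc: ExecutorArgs = ExecutorArgs()): ExecutorArgs = args match {
  case "--driver-url" :: value :: tail => parse(tail, acc.copy(driverUrl = value))
  case "--executor-id" :: value :: tail => parse(tail, acc.copy(executorId = value))
  case "--hostname" :: value :: tail => parse(tail, acc.copy(hostname = value))
  case "--cores" :: value :: tail => parse(tail, acc.copy(cores = value.toInt))
  case "--app-id" :: value :: tail => parse(tail, acc.copy(appId = value))
  case "--worker-url" :: value :: tail => parse(tail, acc.copy(workerUrl = Some(value)))
  case "--user-class-path" :: value :: tail =>
    parse(tail, acc.copy(userClassPath = acc.userClassPath :+ value))
  case Nil => acc
  case unknown => throw new IllegalArgumentException("Unrecognized options: " + unknown.mkString(" "))
}

// Stub: the real run(...) sets up the actor system and the backend actor.
def run(a: ExecutorArgs): String =
  s"run(${a.driverUrl}, ${a.executorId}, ${a.hostname}, ${a.cores}, ${a.appId})"

def executorMain(args: Array[String]): String = {
  val parsed = parse(args.toList)
  require(parsed.driverUrl != null && parsed.executorId != null, "missing required options")
  run(parsed)
}

// Example arguments; the driver URL is illustrative only.
val result = executorMain(Array(
  "--driver-url", "akka.tcp://sparkDriver@driver-host:51000/user/CoarseGrainedScheduler",
  "--executor-id", "1", "--hostname", "worker1", "--cores", "4", "--app-id", "app-1"))
println(result)
```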
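12. CoarseGrainedExecutorBackend, preStart(): a lifecycle method of the CoarseGrainedExecutorBackend actor, called when the actor starts and before it handles any message. It looks up the Driver to establish the connection: driver = context.actorSelection(driverUrl)
13. CoarseGrainedSchedulerBackend, RegisterExecutor: the Executor sends a RegisterExecutor message to the DriverActor (defined inside CoarseGrainedSchedulerBackend), which handles it in: case RegisterExecutor(executorId, hostPort, cores, logUrls) ...
14. DriverActor to CoarseGrainedExecutorBackend: the DriverActor replies to the Executor that registration succeeded, and the backend handles the reply in: case RegisteredExecutor => ...
15. DriverActor (CoarseGrainedSchedulerBackend): checks whether there are tasks to submit to the new Executor (DriverActor -> Executor): makeOffers()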
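The driver side of steps 13 to 15 can be modeled without Akka as a function from message to reply. The message names mirror the ones above, but these case classes and RegisterExecutorFailed are simplified assumptions, and makeOffers is a stub that only counts calls. The sketch rejects a duplicate executor ID, records the executor's cores, and tries to schedule work.

```scala
import scala.collection.mutable

// Simplified stand-ins for the registration messages.
sealed trait Message
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) extends Message
case object RegisteredExecutor extends Message
case class RegisterExecutorFailed(message: String) extends Message

// Plain-Scala model of the driver side: returns the reply it would send back.
class DriverSketch {
  val executorCores = mutable.HashMap[String, Int]()
  var totalCoreCount = 0
  var offersMade = 0

  def receive(msg: Message): Message = msg match {
    case RegisterExecutor(executorId, _, cores) =>
      if (executorCores.contains(executorId)) {
        RegisterExecutorFailed("Duplicate executor ID: " + executorId)
      } else {
        executorCores(executorId) = cores
        totalCoreCount += cores
        makeOffers() // step 15: see whether pending tasks can use the new cores
        RegisteredExecutor
      }
    case other => throw new IllegalArgumentException("Unexpected message: " + other)
  }

  // Stub: a real scheduler would match pending tasks to free cores and launch them.
  private def makeOffers(): Unit = offersMade += 1
}

val driver = new DriverSketch
val first = driver.receive(RegisterExecutor("1", "worker1:40000", 4))
val dup = driver.receive(RegisterExecutor("1", "worker1:40000", 4))
```

Calling makeOffers right after registration means queued tasks can start on the new Executor without waiting for the next scheduling round.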
16. CoarseGrainedExecutorBackend, new(): on receiving RegisteredExecutor, creates an Executor instance, which executes the business logic (the tasks): executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
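The executor side of the handshake can be sketched the same way. BackendSketch and ExecutorStub are hypothetical. The Executor is created only after the driver confirms registration, and a failure reply is recorded, where Spark's backend would log the error and exit the process. The hostname is taken from a "host:port" string, which this sketch splits by hand.

```scala
// Simplified stand-ins for the driver's replies.
sealed trait Message
case object RegisteredExecutor extends Message
case class RegisterExecutorFailed(message: String) extends Message

// Hypothetical stand-in for Executor: it only records its identity.
class ExecutorStub(val executorId: String, val hostname: String)

class BackendSketch(executorId: String, hostPort: String) {
  var executor: ExecutorStub = null
  var failure: Option[String] = None

  def receive(msg: Message): Unit = msg match {
    case RegisteredExecutor =>
      val hostname = hostPort.split(":")(0) // "worker1:40000" -> "worker1"
      executor = new ExecutorStub(executorId, hostname)
    case RegisterExecutorFailed(message) =>
      // A real backend would exit here; the sketch only records the reason.
      failure = Some(message)
  }
}

val backend = new BackendSketch("1", "worker1:40000")
backend.receive(RegisteredExecutor)
```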
17. Executor: initializes the thread pool that runs tasks: val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
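A sketch of what a "daemon cached thread pool" means, built only from java.util.concurrent. It is an assumption about the behavior the name implies, not Spark's Utils code. A cached pool creates threads on demand, reuses idle ones, and terminates threads that stay idle for 60 seconds. The custom ThreadFactory makes each thread a daemon, so it will not keep the JVM alive, and gives it a name prefix.

```scala
import java.util.concurrent.{Callable, ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

def newDaemonCachedThreadPool(prefix: String): ExecutorService = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, prefix + "-" + counter.getAndIncrement())
      t.setDaemon(true) // daemon threads do not block JVM exit
      t
    }
  }
  Executors.newCachedThreadPool(factory)
}

val threadPool = newDaemonCachedThreadPool("Executor task launch worker")
// Submit one "task" that reports which thread ran it.
val future = threadPool.submit(new Callable[(String, Boolean)] {
  override def call(): (String, Boolean) = {
    val t = Thread.currentThread()
    (t.getName, t.isDaemon)
  }
})
val (threadName, isDaemon) = future.get()
threadPool.shutdown()
```

A cached pool suits an executor whose number of concurrent tasks varies, since threads are added as tasks arrive instead of being fixed up front.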
18. Executor: starts sending periodic heartbeats to the Driver so that the Driver knows the Executor is still alive: startDriverHeartbeater()
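A sketch of a periodic heartbeat using a ScheduledExecutorService. The Heartbeat case class and the send callback are assumptions. send stands in for the message to the Driver, and Spark's real heartbeat also carries task metrics. In Spark 1.x the interval is configured with spark.executor.heartbeatInterval. Here it is 50 ms so the demo finishes quickly.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors,
  ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical heartbeat payload.
case class Heartbeat(executorId: String, timestampMs: Long)

// Calls `send` every intervalMs on a single daemon thread.
def startDriverHeartbeater(executorId: String, intervalMs: Long)
                          (send: Heartbeat => Unit): ScheduledExecutorService = {
  val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "heartbeater-sketch")
      t.setDaemon(true)
      t
    }
  })
  scheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = send(Heartbeat(executorId, System.currentTimeMillis()))
  }, 0L, intervalMs, TimeUnit.MILLISECONDS)
  scheduler
}

// Usage: collect heartbeats until three have arrived (or 5 s pass).
val latch = new CountDownLatch(3)
val received = new ConcurrentLinkedQueue[Heartbeat]()
val heartbeater = startDriverHeartbeater("1", 50L) { hb =>
  received.add(hb)
  latch.countDown()
}
val gotThree = latch.await(5, TimeUnit.SECONDS)
heartbeater.shutdownNow()
```

If the Driver stops receiving these heartbeats for too long, it can treat the Executor as lost.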