0
点赞
收藏
分享

微信扫一扫

spark源码系列01-Woker启动Executor过程并向Driver注册_李孟_新浪博客

宁静的猫 2022-02-22 阅读 57

spark版本是1.3+

spark源码系列01-Woker启动Executor过程并向Driver注册_李孟_新浪博客_java​​


Woker启动Executor过程并向Driver注册时序图: 1.launchExecutor Master发送消息让Worker启动Executor

2.Worker new() Master 发送给Worker的消息,让Worker启动Execitor,LaunchExecutor是一个Case Class,里面封装以后要启动的Executor的信息 new ExecutorRunner 创建ExcutorRunner,将参数都放到其中,然后在通过他启动Executor 注册ExecutorID -> Executor放到一个map中,对应关系 executors(appId + "/" + execId) = manager

3.Worker start() 调用ExecutorRunner的start方法来启动Executor Java子进程 manager.start() 

4.ExecutorRunner new() 先创建一个线程对象,然后通过一个线程来启动一个java子进程 workerThread = new Thread("ExecutorRunner for " + fullId) {       override def run() { fetchAndRunExecutor() }     }

5.ExecutorRunner  workerThread.start()

6.ExecutorRunner  程序经常也会遇到进程挂掉的情况,一些状态没有正确的保存下来,这时候就需要在JVM关掉的时候执行一些清理现场的代码 Runtime.getRuntime.addShutdownHook(shutdownHook) shutdownHook = new Thread() {       override def run() {         killProcess(Some("Worker shutting down"))       }     }


7.ExecutorRunner fetchAndRunExecutor() 线程对象调用该方法启动java子进程

8.ExecutorRunner fetchAndRunExecutor() 保存参数 val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,         sparkHome.getAbsolutePath, substituteVariables)


9.ExecutorRunner  启动一个子进程 -> CoarseGrainedExecutorBackend的main方法 process = builder.start()


10.CoarseGrainedExecutorBackend main() Executor进程执行的入口


11.CoarseGrainedExecutorBackend run() 调用RUN方法 run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)

12.CoarseGrainedExecutorBackend preStart() CoarseGrainedExecutorBackend的生命周期方法 跟Driver建立连接 driver = context.actorSelection(driverUrl)

13.CoarseGrainedSchedulerBackend RegisterExecutor Executor向DriverActor发送消息  case RegisterExecutor(executorId, hostPort, cores, logUrls) ...

14.DriverActor CoarseGrainedExecutorBackend DriverActor发送给Executor的消息,告诉他已经注册成功 case RegisteredExecutor => ...


15.DriverActor CoarseGrainedSchedulerBackend 查看是否有任务需要提交(DriverActor -> Executor) makeOffers()

16.CoarseGrainedExecutorBackend new() 创建一个Executor实例,用来执行业务逻辑 executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)


17.Executor  初始化线程池 val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

18.Executor Executor向DriverActor发送心跳,是否存活 startDriverHeartbeater()

举报

相关推荐

0 条评论