0
点赞
收藏
分享

微信扫一扫

spark源码系列02-sparkcontext_李孟_新浪博客

用的spark版本1.3+

spark源码系列02-sparkcontext_李孟_新浪博客_spark​​

1.SparkSubmit bin/spark-submit --class xx.WordCount  --master spark://ip:7077 --executor-memory 2g --total-executor-cores 4

2.WordCount new()

3.SparkContext 该方法创建一个ActorSystem createSparkEnv 

4.SparkContext  创建Driver的运行时环境,注意这里的numDriverCores是local模式下用来执行计算的cores的个数,如果不是本地模式的话就是0 SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)) 为driver创建一个SparkEnv conf:SparkConf conf 是对SparkConf的复制 listenerBus 才用监听器模式维护各类事件处理 private[spark] def createDriverEnv(       conf: SparkConf,       isLocal: Boolean,       listenerBus: LiveListenerBus,       numCores: Int,       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None) ...


5.SparkEnv createDriverEnv createDriverEnv 最终调用的是create方法创建SparkEnv create()

6.SparkEnv  createActorSyetem()  利用AkkaUtils这个工具类创建ActorSystem  AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)

7.AkkaUtils startServiceOnPort()

8.AkkaUtils doCreateActorSystem

9.AkkaUtils  创建ActorSystem val actorSystem = ActorSystem(name, akkaConf) ActorSystem = apply()

10.SparkContext createTaskScheduler() 根据提交任务时指定url创建相应的TaskScheduler,创建一个TaskScheduler


11.SparkContext new() 创建spark的StandAlone模式 case SPARK_REGEX(sparkUrl) => val scheduler = new TaskSchedulerImpl(sc)

12.SparkContext  new() 创建一个SparkDeploySchedulerBackend  val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)

 13.SparkContext  initialize  调用initialize创建调度器  scheduler.initialize(backend)

 14.SparkContext new()  创建一个DAGScheduler,以后用来把DAG切分成Stage  dagScheduler = new DAGScheduler(this)

 15.SparkContext start()  启动taskScheduler  taskScheduler.start()

举报

相关推荐

0 条评论