0
点赞
收藏
分享

微信扫一扫

Spark2.11 任务划分以及执行流程

诗与泡面 2022-03-30 阅读 68

1、spark Application中可以由不同的action触发job,也就是说一个Application里可以有很多的job,每个job是由一个或者多个stage构成的,后面的stage依赖前面的stage,只有前面依赖的stage计算完成后面的stage才会计算; 2、stage划分的就是根据宽依赖如:reduceByKey、groupByKey等前后就需要划分为两个stage; 3、由action(如collect)导致了SparkContext.runJob的执行,最终导致了DAGScheduler中的submitJob的执行向自己发送 JobSubmitted消息(解耦合),

Spark2.11 任务划分以及执行流程_先进先出

当自己接收到JobSubmitted消息后出发handleJobSubmitted方法的执行,在其方法中会创建finalStage;

Spark2.11 任务划分以及执行流程_spark_02

利用createResultStage方法找到parents

Spark2.11 任务划分以及执行流程_先进先出_03

在寻找parents(List<Stage>)的过程中采用广度优先的算法。

Spark2.11 任务划分以及执行流程_spark_04


Spark2.11 任务划分以及执行流程_任务分配_05

计算好依赖链条后开始执行submitStage开始计算最左侧的stage(MissStage)

Spark2.11 任务划分以及执行流程_先进先出_06

这里递归从后向前查找依赖stage,找到第一个stage后执行submitMissingTasks开始计算

Spark2.11 任务划分以及执行流程_任务分配_07

submitMissingTasks里需要计算任务本地性,是根据rdd.getPreferedLocations来计算数据本地性,也说明在任务分配之前就已经确定了任务发往哪个executor了。

Spark2.11 任务划分以及执行流程_任务分配_08

Spark2.11 任务划分以及执行流程_spark_09


然后调用taskScheduleImplement的submitTasks方法

Spark2.11 任务划分以及执行流程_任务分配_10


这里会创建taskSetManager加入调度器(先进先出、公平调度)中,然后开始调用CoarseGrainedSchedulerBackend的reviveOffers方法开始调度。

Spark2.11 任务划分以及执行流程_任务分配_11

然后DAGScheduler开始调度任务执行。

Spark2.11 任务划分以及执行流程_spark_12

CoarseGrainedSchedulerBackend的reviveOffers方法是给自己发送消息ReviveOffers,当接收到消息后会执行makeOffers方法执行launchTask发送任务给Executor.

Spark2.11 任务划分以及执行流程_先进先出_13

当Executor接收到任务后会通过线程池复用的方式执行任务。

当executor执行到runTask时会有ShuffleMapTask和ResultTask,我们以ShuffleMapTask为例看看最后是怎样执行到RDD的compute方法的。

Spark2.11 任务划分以及执行流程_先进先出_14

会执行rdd.iterator

Spark2.11 任务划分以及执行流程_spark_15

如果存储基本不是NONE就执行getOrCompute

Spark2.11 任务划分以及执行流程_spark_16


Spark2.11 任务划分以及执行流程_spark_17

程序会根据运行时的RDD来执行对应的compute方法。

Spark2.11 任务划分以及执行流程_spark_18

Spark2.11 任务划分以及执行流程_先进先出_19

Spark2.11 任务划分以及执行流程_先进先出_20




1、spark Application中可以由不同的action触发job,也就是说一个Application里可以有很多的job,每个job是由一个或者多个stage构成的,后面的stage依赖前面的stage,只有前面依赖的stage计算完成后面的stage才会计算; 2、stage划分的就是根据宽依赖如:reduceByKey、groupByKey等前后就需要划分为两个stage; 3、由action(如collect)导致了SparkContext.runJob的执行,最终导致了DAGScheduler中的submitJob的执行向自己发送 JobSubmitted消息(解耦合),

Spark2.11 任务划分以及执行流程_先进先出

当自己接收到JobSubmitted消息后出发handleJobSubmitted方法的执行,在其方法中会创建finalStage;

Spark2.11 任务划分以及执行流程_spark_02

利用createResultStage方法找到parents

Spark2.11 任务划分以及执行流程_先进先出_03

在寻找parents(List<Stage>)的过程中采用广度优先的算法。

Spark2.11 任务划分以及执行流程_spark_04


Spark2.11 任务划分以及执行流程_任务分配_05

计算好依赖链条后开始执行submitStage开始计算最左侧的stage(MissStage)

Spark2.11 任务划分以及执行流程_先进先出_06

这里递归从后向前查找依赖stage,找到第一个stage后执行submitMissingTasks开始计算

Spark2.11 任务划分以及执行流程_任务分配_07

submitMissingTasks里需要计算任务本地性,是根据rdd.getPreferedLocations来计算数据本地性,也说明在任务分配之前就已经确定了任务发往哪个executor了。

Spark2.11 任务划分以及执行流程_任务分配_08

Spark2.11 任务划分以及执行流程_spark_09


然后调用taskScheduleImplement的submitTasks方法

Spark2.11 任务划分以及执行流程_任务分配_10


这里会创建taskSetManager加入调度器(先进先出、公平调度)中,然后开始调用CoarseGrainedSchedulerBackend的reviveOffers方法开始调度。

Spark2.11 任务划分以及执行流程_任务分配_11

然后DAGScheduler开始调度任务执行。

Spark2.11 任务划分以及执行流程_spark_12

CoarseGrainedSchedulerBackend的reviveOffers方法是给自己发送消息ReviveOffers,当接收到消息后会执行makeOffers方法执行launchTask发送任务给Executor.

Spark2.11 任务划分以及执行流程_先进先出_13

当Executor接收到任务后会通过线程池复用的方式执行任务。

当executor执行到runTask时会有ShuffleMapTask和ResultTask,我们以ShuffleMapTask为例看看最后是怎样执行到RDD的compute方法的。

Spark2.11 任务划分以及执行流程_先进先出_14

会执行rdd.iterator

Spark2.11 任务划分以及执行流程_spark_15

如果存储基本不是NONE就执行getOrCompute

Spark2.11 任务划分以及执行流程_spark_16


Spark2.11 任务划分以及执行流程_spark_17

程序会根据运行时的RDD来执行对应的compute方法。

Spark2.11 任务划分以及执行流程_spark_18

Spark2.11 任务划分以及执行流程_先进先出_19

Spark2.11 任务划分以及执行流程_先进先出_20




举报

相关推荐

0 条评论