[Spark Streaming] Inside the JobScheduler: Implementation and Deeper Thoughts

生活记录馆 2022-12-16


In this post:

1. Inside the JobScheduler implementation

2. Deeper thoughts on the JobScheduler

Summary: JobScheduler is the core of all scheduling in Spark Streaming; its role is comparable to that of the DAGScheduler at the heart of Spark Core's scheduler.


1. Inside the JobScheduler implementation

First, look at how the source code describes JobScheduler:


/**
* This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
* the jobs and runs them using a thread pool.
*/


Next, look at how the source code describes JobGenerator:

/**
* This class generates jobs from DStreams as well as drives checkpointing and cleaning
* up DStream metadata.
*/


 

Q: Where is the JobScheduler created?

A: The JobScheduler is created when the StreamingContext is instantiated, as can be seen in the StreamingContext source (around line 183):

      private[streaming] val scheduler = new JobScheduler(this)

 

Q: Why does Spark Streaming need at least two threads?

A: The thread count passed to setMaster means the program needs at least two threads at runtime: one thread runs the receiver in a continuous loop, and the remaining thread(s), however many we configure, process the generated jobs. A minimal driver sketch follows; after it comes StreamingContext's start() method, which launches the scheduler on its own internal thread.
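The sketch below is not from the original walkthrough; it is a minimal, hypothetical driver (the socket source, host, port, and batch interval are placeholders) that shows why local[2] is the practical minimum for a receiver-based job:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TwoThreadDemo {
  def main(args: Array[String]): Unit = {
    // local[2]: the receiver permanently occupies one thread, so at least one more
    // thread must remain free to run the jobs generated for each batch.
    val conf = new SparkConf().setMaster("local[2]").setAppName("TwoThreadDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Receiver-based input; the receiver runs in its own long-lived thread.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()            // the start() method examined next
    ssc.awaitTermination()
  }
}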

 
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          // This is the thread Spark Streaming starts internally to drive the scheduling
          // of the whole application's jobs.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

Now let's step into the JobScheduler source itself:

/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 *
 * In other words, JobScheduler owns the logical-level jobs and gets them physically
 * executed on top of Spark.
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {

  // Generated jobs are continually stored here, one JobSet per batch time.
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

  // Degree of parallelism for job execution, default 1. To change it, set this value in
  // spark-conf or in the application. Why change it? If the application has several output
  // operations, several jobs are produced within one batchDuration; those jobs do not need
  // to wait for each other, so raising this value lets them run concurrently. Jobs belonging
  // to different batches can also run concurrently, since the thread pool has many threads.
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

  // Turning the logical-level jobs into physically running jobs is done by this
  // newDaemonFixedThreadPool.
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

  // Instantiate the JobGenerator.
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()

  // The three fields below are only instantiated when the JobScheduler starts.
  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null

  private var eventLoop: EventLoop[JobSchedulerEvent] = null

  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }

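As a quick aside, the comment on numConcurrentJobs above can be made concrete. The sketch below is illustrative only (the names, values, and socket source are placeholders): with two output operations per batch, raising spark.streaming.concurrentJobs to 2 lets the two jobs of the same batch run in the streaming-job-executor pool at the same time.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConcurrentJobsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("ConcurrentJobsDemo")
      // The key read by JobScheduler above; its default value is 1.
      .set("spark.streaming.concurrentJobs", "2")

    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999) // hypothetical source

    lines.count().print()       // output operation 1 -> one job per batch
    lines.map(_.length).print() // output operation 2 -> a second job of the same batch

    ssc.start()
    ssc.awaitTermination()
  }
}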
2. Deeper thoughts on the JobScheduler

Let's start from the application's output operation, print(), and work backwards through how a Job is produced.

1. Clicking into the print() call in the application takes us to DStream.print():

/**
 * Print the first ten elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(): Unit = ssc.withScope {
  print(10)
}

2. Following the call into the parameterized print(num):

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

From the foreachRDD call at the end of print(num) we can conclude that, when Spark Streaming finally executes, it is still performing ordinary logical-level operations on RDDs.
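The same point can be seen from the application side. The following is only a sketch, not the library's implementation: a hand-written rough equivalent of dstream.print(10) built on the public foreachRDD API (HandPrint and its DStream[String] parameter are made-up names for illustration):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object HandPrint {
  // Roughly what print(10) does, expressed only through public API calls.
  def handPrint(dstream: DStream[String]): Unit = {
    dstream.foreachRDD { (rdd: RDD[String], time: Time) =>
      val firstNum = rdd.take(10 + 1) // an ordinary RDD action, executed by Spark core
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(10).foreach(println)
      if (firstNum.length > 10) println("...")
      println()
    }
  }
}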

3. Clicking into foreachRDD brings us to its definition:

/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 * @param foreachFunc foreachRDD function
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
 *                           only the scopes and callsites of `foreachRDD` will override those
 *                           of the RDDs on the display.
 */
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

4. Clicking into ForEachDStream, we find its generateJob method:

/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent        Parent DStream
 * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           by `foreachFunc` will be displayed in the UI; only the scope and
 *                           callsite of `DStream.foreachRDD` will be displayed.
 */
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  // A Job is generated here for every batch interval.
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        // rdd is the RDD generated for this batch time; since this is an output operation,
        // it is the last RDD in the chain. To see where the Job is finally produced, we only
        // need to find out who calls ForEachDStream.generateJob.
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

5. In the previous post we derived the following call chain:

StreamingContext.start() --> JobScheduler.start() --> ReceiverTracker.start() --> JobGenerator.start() --> EventLoop --> processEvent() --> generateJobs() --> jobScheduler.receiverTracker.allocateBlocksToBatch(time) --> graph.generateJobs(time)

The last step, graph.generateJobs, is a method of DStreamGraph, so let's step into it:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    // Each outputStream here is a ForEachDStream.
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

The output streams live in DStreamGraph as private val outputStreams = new ArrayBuffer[DStream[_]](). Looking at the subclass hierarchy of DStream together with the generateJob method of ForEachDStream shown above, we find that ForEachDStream is the only subclass that overrides DStream's generateJob. The conclusion:

The real Jobs are produced by ForEachDStream's generateJob. At that point a Job is still logical-level; it is invoked at the physical level from the JobGenerator:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

Now step into jobScheduler.submitJobSet:

// Turning the logical-level jobs into physically running jobs is done by this
// newDaemonFixedThreadPool.
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

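What happens inside the pool can be pictured with the heavily simplified sketch below. This is not the real JobScheduler.JobHandler (which also posts JobStarted/JobCompleted events on the eventLoop and sets thread-local properties such as the batch time); it only shows the essential handoff from the logical-level job to physical execution:

// Illustrative stand-in for the JobHandler passed to jobExecutor in submitJobSet above.
class SimplifiedJobHandler(job: Job) extends Runnable {
  def run(): Unit = {
    // job.run() invokes the jobFunc built in ForEachDStream.generateJob, i.e.
    // foreachFunc(rdd, time); the RDD action inside that closure is what finally
    // submits a job to Spark core's DAGScheduler.
    job.run()
  }
}

// Inside submitJobSet each job of the batch is then handed to the pool, roughly:
//   jobSet.jobs.foreach(job => jobExecutor.execute(new SimplifiedJobHandler(job)))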
At this point the whole path from Job generation to Job execution is clear. To summarize:

Once the ReceiverTracker starts, the Receivers running on the Executors start and receive data, and the ReceiverTracker records the metadata of the data the Receivers receive.

Once the JobGenerator starts, every batchDuration it has the DStreamGraph build the RDD graph and generate the Jobs.

The thread pool inside the JobScheduler then executes the submitted JobSet (batch time, Jobs, and the input sources' metadata). Each Job wraps the business logic, which triggers the action on the last RDD, and it is the DAGScheduler that actually schedules that Job to run on the Spark cluster.
