0
点赞
收藏
分享

微信扫一扫

[spark streaming]Job动态生成和深度思考


本期内容:

1、Spark Streaming Job生成深度思考

2、Spark Streaming Job生成源码解析

一、Spark Streaming Job生成深度思考

源码解析:

1. 在大数据处理场景中,如果不是流处理的话,一般会有定时任务。例如10分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理,一切处理终将被流处理统一!!

DStreams其实有三种类型:

第一种是输入的DStrams,可以有各种不同的数据来源构建的Stream,例如来自Socket,Kafka,Flume等;

第二种是输出的DStrams,outputStreams 是逻辑级别的Action,由于还是Spark Streaming框架级别的,底层还是会翻译成物理级别的Action,就是ADD的Action;

第三种是Transforms操作从一种DStream转变为另一种DStream,即基于其他DStream产生的。其中DStreamGraph 类记录了数据来源的DStream,和输出类型的DStream 

产生DStreams有两种方式:

DSTreams要么基于数据源产生,要么基于其它的DStreams产生;

SparkStreaming流处理基于时间作为触发器,Storm基于事件作为触发器,基于一个又一个的Record!!

 

二、Spark Streaming Job生成源码解析

Spark 作业动态生成三大核心: 
JobGenerator: 负责Job生成。 基于源数据生成;
JobSheduler: 负责Job调度。 基于源数据调度;
ReceiverTracker: 获取元数据。

JobGenerator和ReceiverTracker是JobScheduler的成员,从JobScheduler的start方法可以看出!!

跟踪源码得到如下运行流程:streamingcontext.start-->jobscheduler.start-->receiverTracker.start()-->JobGenterator.start()-->EventLoop-->processEvent()-->generateJobs()-->jobScheduler.receiverTracker.allocateBlocksToBatch(time)-->graph.generateJobs(time)-->jobScheduler.inputInfoTracker.getInfo(time)-->jobScheduler.submitJobSet-->startFirstTime()-->graph.start()-->timer.start()


具体逻辑图如下:

说明:此图引用了spark版本定制班成员​​http://lqding.blog.51cto.com/9123978/1772958​​的成果,表示深深的感谢

 

[spark streaming]Job动态生成和深度思考_Streaming

下面结合调式过程和job生成结构图,逐步追踪源码:

1.val ssc = new StreamingContext(conf, Seconds(5))

   ssc.start()程序运行的入口;

2.进入jobscheduler.start()

​def​​​ ​​start()​​​​:​​​ ​​Unit ​​​​=​​​ ​​synchronized {    ​​​​if​​​ ​​(eventLoop !​​​​=​​​ ​​null​​​​) ​​​​return​​​ ​​// scheduler has already been started​

​logDebug(​​​​"Starting JobScheduler"​​​​)​

​eventLoop ​​​​=​​​ ​​new​​​ ​​EventLoop[JobSchedulerEvent](​​​​"JobScheduler"​​​​) {​

​override​​​ ​​protected​​​ ​​def​​​ ​​onReceive(event​​​​:​​​ ​​JobSchedulerEvent)​​​​:​​​ ​​Unit ​​​​=​​​ ​​processEvent(event)​

 

​override​​​ ​​protected​​​ ​​def​​​ ​​onError(e​​​​:​​​ ​​Throwable)​​​​:​​​ ​​Unit ​​​​=​​​ ​​reportError(​​​​"Error in job scheduler"​​​​, e)​

​}    ​​​​//启动消息循环处理线程​

​eventLoop.start()​

 

​// attach rate controllers of input streams to receive batch completion updates​

​for​​​ ​​{​

​inputDStream <- ssc.graph.getInputStreams​

​rateController <- inputDStream.rateController​

​} ssc.addStreamingListener(rateController)​

 

​listenerBus.start(ssc.sparkContext)​

​receiverTracker ​​​​=​​​ ​​new​​​ ​​ReceiverTracker(ssc)​

​inputInfoTracker ​​​​=​​​ ​​new​​​ ​​InputInfoTracker(ssc)    ​​​​//接收源数据​

​receiverTracker.start()    ​​​​//基于源数据生成Job​

​jobGenerator.start()​

​logInfo(​​​​"Started JobScheduler"​​​​)​

​}​

3.进入receiverTracker.start()方法:

​/** Start the endpoint and receiver execution thread. */​

​def​​​ ​​start()​​​​:​​​ ​​Unit ​​​​=​​​ ​​synchronized {​

​if​​​ ​​(isTrackerStarted) {​

​throw​​​ ​​new​​​ ​​SparkException(​​​​"ReceiverTracker already started"​​​​)​

​}​

 

​if​​​ ​​(!receiverInputStreams.isEmpty) {​

​endpoint ​​​​=​​​ ​​ssc.env.rpcEnv.setupEndpoint(        ​​​​//ReceiverTracker接收到源数据后保存在ReceiverTrackerEndpoint中​

​"ReceiverTracker"​​​​, ​​​​new​​​ ​​ReceiverTrackerEndpoint(ssc.env.rpcEnv))​

​if​​​ ​​(!skipReceiverLaunch) launchReceivers()​

​logInfo(​​​​"ReceiverTracker started"​​​​)​

​trackerState ​​​​=​​​ ​​Started​

​}​

​}​

4.进入jobGenerator.start()方法:

​//checkpoint的初始化操作,实例化并启动消息循环体EventLoop,开启定时生成Job的定时器。/** Start generation of jobs */​

​def​​​ ​​start()​​​​:​​​ ​​Unit ​​​​=​​​ ​​synchronized {​

​if​​​ ​​(eventLoop !​​​​=​​​ ​​null​​​​) ​​​​return​​​ ​​// generator has already been started​

 

​// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.​

​// See SPARK-10125​

​checkpointWriter​

 

​eventLoop ​​​​=​​​ ​​new​​​ ​​EventLoop[JobGeneratorEvent](​​​​"JobGenerator"​​​​) {      ​​​​//匿名内部类重写onReceive方法​

​override​​​ ​​protected​​​ ​​def​​​ ​​onReceive(event​​​​:​​​ ​​JobGeneratorEvent)​​​​:​​​ ​​Unit ​​​​=​​​ ​​processEvent(event)​

 

​override​​​ ​​protected​​​ ​​def​​​ ​​onError(e​​​​:​​​ ​​Throwable)​​​​:​​​ ​​Unit ​​​​=​​​ ​​{​

​jobScheduler.reportError(​​​​"Error in job generator"​​​​, e)​

​}​

​}​

​eventLoop.start()​

 

​if​​​ ​​(ssc.isCheckpointPresent) {​

​restart()​

​} ​​​​else​​​ ​​{​

​startFirstTime()​

​}​

​}​

首先看EventLoop类

​/**​

​* An event loop to receive events from the caller and process all events in the event thread. It​

​* will start an exclusive event thread to process all events.​

​*​

​* Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can​

​* handle events in time to avoid the potential OOM. * EvenLoop类中有,后台线程从队列中获取消息,然后调用onReceive方法对该消息进行处理,这里的onReceive方法即匿名内部类中重写onReceive方法的processEvent方法。​

​*/​

​private​​​​[spark] ​​​​abstract​​​ ​​class​​​ ​​EventLoop[E](name​​​​:​​​ ​​String) ​​​​extends​​​ ​​Logging {​

​//存储消息的LinkedBlockingDeque和后台线程​

​private​​​ ​​val​​​ ​​eventQueue​​​​:​​​ ​​BlockingQueue[E] ​​​​=​​​ ​​new​​​ ​​LinkedBlockingDeque[E]()​

 

​private​​​ ​​val​​​ ​​stopped ​​​​=​​​ ​​new​​​ ​​AtomicBoolean(​​​​false​​​​)​

 

​private​​​ ​​val​​​ ​​eventThread ​​​​=​​​ ​​new​​​ ​​Thread(name) {​

​setDaemon(​​​​true​​​​)​​​​//后台线程​

 

​override​​​ ​​def​​​ ​​run()​​​​:​​​ ​​Unit ​​​​=​​​ ​​{​

​try​​​ ​​{​

​while​​​ ​​(!stopped.get) {          ​​​​//后台线程从队列中获取消息​

​val​​​ ​​event ​​​​=​​​ ​​eventQueue.take()​

​try​​​ ​​{​

​onReceive(event)​​​​//对消息进行处理,这里的onReceive方法即匿名内部类中重写onReceive方法的processEvent方法​

​} ​​​​catch​​​ ​​{​

​case​​​ ​​NonFatal(e) ​​​​=​​​​> {​

​try​​​ ​​{​

​onError(e)​

​} ​​​​catch​​​ ​​{​

​case​​​ ​​NonFatal(e) ​​​​=​​​​> logError(​​​​"Unexpected error in "​​​ ​​+ name, e)​

​}​

​}​

​}​

​}​

​} ​​​​catch​​​ ​​{​

​case​​​ ​​ie​​​​:​​​ ​​InterruptedException ​​​​=​​​​> ​​​​// exit even if eventQueue is not empty​

​case​​​ ​​NonFatal(e) ​​​​=​​​​> logError(​​​​"Unexpected error in "​​​ ​​+ name, e)​

​}​

​}​

 

​}​

 

​def​​​ ​​start()​​​​:​​​ ​​Unit ​​​​=​​​ ​​{​

​if​​​ ​​(stopped.get) {​

​throw​​​ ​​new​​​ ​​IllegalStateException(name + ​​​​" has already been stopped"​​​​)​

​}​

​// Call onStart before starting the event thread to make sure it happens before onReceive​

​onStart()​

​eventThread.start()​

​}​

进入最重要的processEvent方法:

​//processEvent方法是对消息类型进行模式匹配,然后路由到对应处理该消息的方法中。消息的处理一般是发给另外一个线程来处理的,消息循环器不处理耗时的业务逻辑/** Processes all events */​

​private​​​ ​​def​​​ ​​processEvent(event​​​​:​​​ ​​JobGeneratorEvent) {​

​logDebug(​​​​"Got event "​​​ ​​+ event)​

​event ​​​​match​​​ ​​{​

​case​​​ ​​GenerateJobs(time) ​​​​=​​​​> generateJobs(time)​

​case​​​ ​​ClearMetadata(time) ​​​​=​​​​> clearMetadata(time)​

​case​​​ ​​DoCheckpoint(time, clearCheckpointDataLater) ​​​​=​​​​>​

​doCheckpoint(time, clearCheckpointDataLater)​

​case​​​ ​​ClearCheckpointData(time) ​​​​=​​​​> clearCheckpointData(time)​

​}​

​}​

进入generateJobs方法:

​在获取到数据后调用DStreamGraph的generateJobs方法来生成Job,具体如下步骤所示:  ​​​​/** Generate jobs and perform checkpoint for the given `time`.  */​

​private​​​ ​​def​​​ ​​generateJobs(time​​​​:​​​ ​​Time) {​

​// Set the SparkEnv in this thread, so that job generation code can access the environment​

​// Example: BlockRDDs are created in this thread, and it needs to access BlockManager​

​// Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.​

​SparkEnv.set(ssc.env)​

​Try {      ​​​​//第一步:获取当前时间段里面的数据。根据分配的时间来分配具体要处理的数据。​

​jobScheduler.receiverTracker.allocateBlocksToBatch(time) ​​​​// allocate received blocks to batch      //第二步:生成Job,获取RDD的DAG依赖关系。在此基于DStream生成了RDD实例​

​graph.generateJobs(time) ​​​​// generate jobs using allocated block​

​} ​​​​match​​​ ​​{​

​case​​​ ​​Success(jobs) ​​​​=​​​​>        ​​​​//第三步:获取streamIdToInputInfos的信息。BacthDuractions要处理的数据,以及我们要处理的业务逻辑​

​val​​​ ​​streamIdToInputInfos ​​​​=​​​ ​​jobScheduler.inputInfoTracker.getInfo(time)        ​​​​//第四步:将生成的Job交给jobScheduler​

​jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))​

​case​​​ ​​Failure(e) ​​​​=​​​​>​

​jobScheduler.reportError(​​​​"Error generating jobs for time "​​​ ​​+ time, e)​

​}    ​​​​//第五步:进行checkpoint​

​eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater ​​​​=​​​ ​​false​​​​))​

​}​

进入DStreamGraph的generateJobs方法:

​def​​​ ​​generateJobs(time​​​​:​​​ ​​Time)​​​​:​​​ ​​Seq[Job] ​​​​=​​​ ​​{​

​logDebug(​​​​"Generating jobs for time "​​​ ​​+ time)​

​val​​​ ​​jobs ​​​​=​​​ ​​this​​​​.synchronized {​

​outputStreams.flatMap { outputStream ​​​​=​​​​>      ​​​​//这里的outputStreams是整个DStream中的最后一个DStream。outputStream.generateJob(time)类似于RDD中从后往前推​

​val​​​ ​​jobOption ​​​​=​​​ ​​outputStream.generateJob(time)​

​jobOption.foreach(​​​​_​​​​.setCallSite(outputStream.creationSite))​

​jobOption​

​}​

​}​

​logDebug(​​​​"Generated "​​​ ​​+ jobs.length + ​​​​" jobs for time "​​​ ​​+ time)​

​jobs​

​}​

  

进入onReceive方法:

​/**​

​* Invoked in the event thread when polling events from the event queue.​

​*​

​* Note: Should avoid calling blocking actions in `onReceive`, or the event thread will be blocked​

​* and cannot process events in time. If you want to call some blocking actions, run them in​

​* another thread.<br>不断的从消息队列中获得消息,一旦获得消息就会处理。 <br>不要在onReceive中添加阻塞的消息,如果这样的话会不断的阻塞消息。 <br>消息循环器一般都不会处理具体的业务逻辑,一般消息循环器发现消息以后都会将消息路由给其他的线程去处理​

​*/​

​protected​​​ ​​def​​​ ​​onReceive(event​​​​:​​​ ​​E)​​​​:​​​ ​​Unit​

其中submitJobSet方法,只是把JobSet放到ConcurrentHashMap中,把Job封装为JobHandler提交到jobExecutor线程池中

​def​​​ ​​submitJobSet(jobSet​​​​:​​​ ​​JobSet) {​

​if​​​ ​​(jobSet.jobs.isEmpty) {​

​logInfo(​​​​"No jobs added for time "​​​ ​​+ jobSet.time)​

​} ​​​​else​​​ ​​{​

​listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))​

​jobSets.put(jobSet.time, jobSet)​

​jobSet.jobs.foreach(job ​​​​=​​​​> jobExecutor.execute(​​​​new​​​ ​​JobHandler(job)))​

​logInfo(​​​​"Added jobs for time "​​​ ​​+ jobSet.time)​

​}​

​}​

 

 

​private​​​ ​​val​​​ ​​jobSets​​​​:​​​ ​​java.util.Map[Time, JobSet] ​​​​=​​​ ​​new​​​ ​​ConcurrentHashMap[Time, JobSet]​

  

JobHandler对象为实现Runnable 接口,job的run方法导致了func的调用,即基于DStream的业务逻辑

​def​​​ ​​submitJobSet(jobSet​​​​:​​​ ​​JobSet) {​

​if​​​ ​​(jobSet.jobs.isEmpty) {​

​logInfo(​​​​"No jobs added for time "​​​ ​​+ jobSet.time)​

​} ​​​​else​​​ ​​{​

​listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))​

​jobSets.put(jobSet.time, jobSet)​

​jobSet.jobs.foreach(job ​​​​=​​​​> jobExecutor.execute(​​​​new​​​ ​​JobHandler(job)))​

​logInfo(​​​​"Added jobs for time "​​​ ​​+ jobSet.time)​

​}​

​}​

 

 

​private​​​ ​​val​​​ ​​jobSets​​​​:​​​ ​​java.util.Map[Time, JobSet] ​​​​=​​​ ​​new​​​ ​​ConcurrentHashMap[Time, JobSet]​

  

最后总结两点:

  1. Action RDD触发作业的执行,这个时候作为Runnable接口封装,它会定义一个方法,方法里面是基于DStream的依赖关系生成的RDD。翻译的时候是将DStream的依赖关系翻译成RDD的依赖关系,由于DStream的依赖关系最后一个是action级别的,翻译成RDD的时候,RDD的最后一个操作也应该是action级别的,如果翻译的时候直接执行的话,就直接生成了Job,就没有所谓的队列,所以会将翻译的事件放到一个函数中或者一个方法中,因此,如果这个函数没有指定的action触发作业是执行不了的。
  2. Spark Streaming根据时间不断的去管理我们生成的作业,这个时候我们每个作业又有action级别的操作,这个action操作是对DStream进行逻辑级别的操作,它生成每个Job放到队列的时候,一定会被翻译为RDD的操作,那基于RDD操作的最后一个一定是action级别的,如果翻译的话直接就是触发action的话整个Spark Streaming的Job就不受管理了。因此我们既要保证它的翻译,又要保证对它的管理,把DStream之间的依赖关系转变为RDD之间的依赖关系,最后一个DStream使得action的操作,翻译成一个RDD之间的action操作,整个翻译后的内容它是一块内容,这一块内容是放在一个函数体中的,这个函数体,就是函数的定义,这个函数由于它只是定义还没有执行,所以它里面的RDD的action不会执行,不会触发Job,当我们的JobScheduler要调度Job的时候,转过来在线程池中拿出一条线程执行刚才的封装的方法。
举报

相关推荐

0 条评论