本期内容:
1、Spark Streaming Job生成深度思考
2、Spark Streaming Job生成源码解析
一、Spark Streaming Job生成深度思考
源码解析:
1. 在大数据处理场景中,如果不是流处理的话,一般会有定时任务。例如10分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理,一切处理终将被流处理统一!!
DStreams其实有三种类型:
第一种是输入的DStrams,可以有各种不同的数据来源构建的Stream,例如来自Socket,Kafka,Flume等;
第二种是输出的DStrams,outputStreams 是逻辑级别的Action,由于还是Spark Streaming框架级别的,底层还是会翻译成物理级别的Action,就是ADD的Action;
第三种是Transforms操作从一种DStream转变为另一种DStream,即基于其他DStream产生的。其中DStreamGraph 类记录了数据来源的DStream,和输出类型的DStream
产生DStreams有两种方式:
DSTreams要么基于数据源产生,要么基于其它的DStreams产生;
SparkStreaming流处理基于时间作为触发器,Storm基于事件作为触发器,基于一个又一个的Record!!
二、Spark Streaming Job生成源码解析
Spark 作业动态生成三大核心:
JobGenerator: 负责Job生成。 基于源数据生成;
JobSheduler: 负责Job调度。 基于源数据调度;
ReceiverTracker: 获取元数据。
JobGenerator和ReceiverTracker是JobScheduler的成员,从JobScheduler的start方法可以看出!!
跟踪源码得到如下运行流程:streamingcontext.start-->jobscheduler.start-->receiverTracker.start()-->JobGenterator.start()-->EventLoop-->processEvent()-->generateJobs()-->jobScheduler.receiverTracker.allocateBlocksToBatch(time)-->graph.generateJobs(time)-->jobScheduler.inputInfoTracker.getInfo(time)-->jobScheduler.submitJobSet-->startFirstTime()-->graph.start()-->timer.start()
具体逻辑图如下:
说明:此图引用了spark版本定制班成员http://lqding.blog.51cto.com/9123978/1772958的成果,表示深深的感谢
下面结合调式过程和job生成结构图,逐步追踪源码:
1.val ssc = new StreamingContext(conf, Seconds(5))
ssc.start()程序运行的入口;
2.进入jobscheduler.start()
|
3.进入receiverTracker.start()方法:
|
4.进入jobGenerator.start()方法:
|
首先看EventLoop类
|
进入最重要的processEvent方法:
|
进入generateJobs方法:
|
进入DStreamGraph的generateJobs方法:
|
进入onReceive方法:
|
其中submitJobSet方法,只是把JobSet放到ConcurrentHashMap中,把Job封装为JobHandler提交到jobExecutor线程池中
|
JobHandler对象为实现Runnable 接口,job的run方法导致了func的调用,即基于DStream的业务逻辑
|
最后总结两点:
- Action RDD触发作业的执行,这个时候作为Runnable接口封装,它会定义一个方法,方法里面是基于DStream的依赖关系生成的RDD。翻译的时候是将DStream的依赖关系翻译成RDD的依赖关系,由于DStream的依赖关系最后一个是action级别的,翻译成RDD的时候,RDD的最后一个操作也应该是action级别的,如果翻译的时候直接执行的话,就直接生成了Job,就没有所谓的队列,所以会将翻译的事件放到一个函数中或者一个方法中,因此,如果这个函数没有指定的action触发作业是执行不了的。
- Spark Streaming根据时间不断的去管理我们生成的作业,这个时候我们每个作业又有action级别的操作,这个action操作是对DStream进行逻辑级别的操作,它生成每个Job放到队列的时候,一定会被翻译为RDD的操作,那基于RDD操作的最后一个一定是action级别的,如果翻译的话直接就是触发action的话整个Spark Streaming的Job就不受管理了。因此我们既要保证它的翻译,又要保证对它的管理,把DStream之间的依赖关系转变为RDD之间的依赖关系,最后一个DStream使得action的操作,翻译成一个RDD之间的action操作,整个翻译后的内容它是一块内容,这一块内容是放在一个函数体中的,这个函数体,就是函数的定义,这个函数由于它只是定义还没有执行,所以它里面的RDD的action不会执行,不会触发Job,当我们的JobScheduler要调度Job的时候,转过来在线程池中拿出一条线程执行刚才的封装的方法。