0
点赞
收藏
分享

微信扫一扫

[spark streaming] 架构和运行机制

i奇异 2022-08-09 阅读 112


本期内容:

  1. Spark Streaming架构

  2. Spark Streaming运行机制

       3.解密Spark Streaming Job架构和运行机制

       4.解密Spark Streaming 容错架构和运行机制

  Spark​​大数据​​分析框架的核心部件: spark Core、spark  Streaming流计算、GraphX图计算、MLlib机器学习、Spark SQL、Tachyon文件系统、SparkR计算引擎等主要部件.

  

[spark streaming] 架构和运行机制_时间间隔

  Spark Streaming 其实是构建在spark core之上的一个应用程序,要构建一个强大的Spark应用程序 ,spark  Streaming是一个值得借鉴的参考,spark  Streaming涉及多个job交叉配合,基本涉及到了spark的所有的核心组件,精通掌握spark streaming是至关重要的。

 

  Spark Streaming基础概念理解:

    1. 离散流:(Discretized Stream ,DStream):这是spark streaming对内部的持续的实时数据流的抽象描述,也即我们处理的一个实时数据流,在spark streaming中对应一个DStream ;

    2. 批数据:将实时流时间以时间为单位进行分批,将数据处理转化为时间片数据的批处理;

    3. 时间片或者批处理时间间隔:逻辑级别的对数据进行定量的标准,以时间片作为拆分流数据的依据;

    4. 窗口长度:一个窗口覆盖的流数据的时间长度。比如说要每隔5分钟统计过去30分钟的数据,窗口长度为6,因为30分钟是batch interval 的6倍;

    5. 滑动时间间隔:比如说要每隔5分钟统计过去30分钟的数据,窗口时间间隔为5分钟;

    6. input DStream :一个inputDStream是一个特殊的DStream 将spark streaming连接到一个外部数据源来读取数据。

    7. Receiver :长时间(可能7*24小时)运行在Excutor之上,每个Receiver负责一个inuptDStream (比如读取一个kafka消息的输入流)。每个Receiver,加上inputDStream 会占用一个core/slot ;

    

  Spark Core处理的每一步都是基于RDD的,RDD之间有依赖关系。下图中的RDD的DAG显示的是有3个Action,会触发3个job,RDD自下向上依赖,RDD产生job就会具体的执行。从DSteam Graph中可以看到,DStream的逻辑与RDD基本一致,它就是在RDD的基础上加上了时间的依赖。RDD的DAG又可以叫空间维度,也就是说整个Spark Streaming多了一个时间维度,也可以成为时空维度。

  

[spark streaming] 架构和运行机制_数据_02

 

  从这个角度来讲,可以将Spark Streaming放在坐标系中。其中Y轴就是对RDD的操作,RDD的依赖关系构成了整个job的逻辑,而X轴就是时间。随着时间的流逝,固定的时间间隔(Batch Interval)就会生成一个job实例,进而在集群中运行。

[spark streaming] 架构和运行机制_时间间隔_03

  对于Spark Streaming来说,当不同的数据来源的数据流进来的时候,基于固定的时间间隔,会形成一系列固定不变的数据集或event集合(例如来自flume和kafka)。而这正好与RDD基于固定的数据集不谋而合,事实上,由DStream基于固定的时间间隔行程的RDD Graph正是基于某一个batch的数据集的。

  从上图中可以看出,在每一个Batch上,空间维度的RDD依赖关系都是一样的,不同的是这个五个Batch流入的数据规模和内容不一样,所以说生成的是不同的RDD依赖关系的实例,所以说RDD的Graph脱胎于DStream的Graph,也就是说DStream就是RDD的模板,不同的时间间隔,生成不同的RDD Graph实例。

 

  从源码解读DStream :

  

[spark streaming] 架构和运行机制_时间间隔_04

  从这里可以看出,DStream就是Spark Streaming的核心,就想Spark Core的核心是RDD,它也有dependency和compute。更为关键的是下面的代码:

[spark streaming] 架构和运行机制_数据_05

  这是一个HashMap,以时间为key,以RDD为Value,这也正应证了随着时间流逝,不断的生成RDD,产生依赖关系的job,并通过JbScheduler在集群上运行。再次验证了DStream就是RDD的模版。

  DStream可以说是逻辑级别的,RDD就是物理级别的,DStream所表达的最终都是通过RDD的转化实现的。前者是更高级别的抽象,后者是底层的实现。DStream实际上就是在时间维度上对RDD集合的封装,DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。

 

  总结:

    在空间维度上的业务逻辑作用于DStream,随着时间的流逝,每个Batch Interval形成了具体的数据集,产生了RDD,对RDD进行Transform操作,进而形成了RDD的依赖关系RDD DAG,形成Job。然后JobScheduler根据时间调度,基于RDD的依赖关系,把作业发布到Spark Cluster上去运行,不断的产生Spark作业。

一、解密Spark Streaming Job架构和运行机制

通过代码洞察Job的执行过程:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

​object​​​ ​​OnlineForeachRDD​​​​2​​​​DB {​

​def​​​ ​​main(args​​​​:​​​ ​​Array[String]){​

​/*​

​* 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息​

​*/​

​val​​​ ​​conf ​​​​=​​​ ​​new​​​ ​​SparkConf() ​​​​//创建SparkConf对象​

​conf.setAppName(​​​​"OnlineForeachRDD"​​​​) ​​​​//设置应用程序的名称​

​//    conf.setMaster("spark://Master:7077") //此时,程序在Spark集群​

​conf.setMaster(​​​​"local[6]"​​​​)​

 

​//设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口​

​val​​​ ​​ssc ​​​​=​​​ ​​new​​​ ​​StreamingContext(conf, Seconds(​​​​5​​​​))​

 

​val​​​ ​​lines ​​​​=​​​ ​​ssc.socketTextStream(​​​​"Master"​​​​, ​​​​9999​​​​)​

 

​val​​​ ​​words ​​​​=​​​ ​​lines.flatMap(​​​​_​​​​.split(​​​​" "​​​​))​

​val​​​ ​​wordCounts ​​​​=​​​ ​​words.map(x ​​​​=​​​​> (x, ​​​​1​​​​)).reduceByKey(​​​​_​​​ ​​+ ​​​​_​​​​)​

​wordCounts.foreachRDD { rdd ​​​​=​​​​>​

​rdd.foreachPartition { partitionOfRecords ​​​​=​​​​> {​

​// ConnectionPool is a static, lazily initialized pool of connections​

​val​​​ ​​connection ​​​​=​​​ ​​ConnectionPool.getConnection()​

​partitionOfRecords.foreach(record ​​​​=​​​​> {​

​val​​​ ​​sql ​​​​=​​​ ​​"insert into streaming_itemcount(item,count) values('"​​​ ​​+ record.​​​​_​​​​1​​​ ​​+ ​​​​"',"​​​ ​​+ record.​​​​_​​​​2​​​ ​​+ ​​​​")"​

​val​​​ ​​stmt ​​​​=​​​ ​​connection.createStatement();​

​stmt.executeUpdate(sql);​

 

​})​

​ConnectionPool.returnConnection(connection)  ​​​​// return to the pool for future reuse​

​}​

​}​

​}​

​ssc.start()​

​ssc.awaitTermination()​

​}​

​}​

通过观察Job在Spark集群上运行的Log和结合源代码分析出如下流程:

  1. 创建SparkConf,设置Spark程序运行时的配置信息;
  2. 创建StreamingContext,设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口;
  3. 在创建StreamingContext的过程中,会实例化JobScheduler和JobGenerator,调用StreamingContext的start方法时,在JobScheduler.start()内部实例化EventLoop,并执行EventLoop.start()进行消息循环,在JobScheduler的start方法内部会构造JobGenerator和ReveiverTracker,并分别调用它们的start方法;
  4. JobGenerator启动后会不断的根据batchDuration生成一个个的Job;
  5. ReceiverTracker启动后首先在Spark Cluster中启动Receiver,其实是在Executor中首先启动ReceiverSupervisor,Receiver收到数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息(元数据:数据存储的位置、索引等)。
  6. 时间不断的流动,job怎么产生的?每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD 的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行);

      为什么使用线程池呢?

  1. 作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;
  2. 有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持;

二、解密Spark Streaming 容错架构和运行机制

        

      我们知道DStream与RDD的关系就是随着时间流逝不断的产生RDD,对DStream的操作就是在固定时间上操作RDD。所以从某种意义上而言,Spark Streaming的基于DStream的容错机制,实际上就是划分到每一次形成的RDD的容错机制,

      这也是Spark Streaming的高明之处。

      Spark Streaming的容错要考虑两个方面:

  1. Driver运行失败时的恢复
    使用Checkpoint,记录Driver运行时的状态,失败后可以读取Checkpoint并恢复Driver状态。
  2. 具体的每次Job运行失败时的恢复
    要考虑到Receiver的失败恢复,也要考虑到RDD计算失败的恢复。Receiver可以采用写wal日志的方式。RDD的容错是spark core天生提供的,基于RDD的特性,它的容错机制主要就是两种:

      01. 基于checkpoint

              在stage之间,是宽依赖,产生了shuffle操作,lineage链条过于复杂和冗长,这时候就需要做checkpoint。

     02. 基于lineage(血统)的容错:

          一般而言,spark选择血统容错,因为对于大规模的数据集,做检查点的成本很高。考虑到RDD的依赖关系,每个stage内部都是窄依赖,此时一般基于lineage容错,方便高效。

举报

相关推荐

0 条评论