0
点赞
收藏
分享

微信扫一扫

Spark源码之Rpc架构

夏木之下 2022-01-11 阅读 42

目录

一、概述

在Spark中,很多地方都涉及网络通信,比如Spark各个组件间的消息互通用户文件与Jar包的上传节点间的Shuffle过程数据传输Block数据的复制与备份等。Spark1.6之前,Spark Rpc是基于Akka实现的,Akka是基于Scala语言的异步消息框架,但由于Akka不适合大文件的传输;所以,Spark1.6之前Rpc通过Akka来实现,而大文件是基于Jetty实现的HttpFileServer。然而,Spark1.6移除了Akka(https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-5293),原因概括为:

  • 使用Spark用户很多,但是由于Akka不同版本之间无法互相通信,这就要求用户必须使用跟Spark完全匹配的Akka版本,导致用户无法升级Akka;
  • Spark的Akka配置是针对Spark自身来调优的,很可能与用户自定义开发的Akka配置冲突;
  • Spark使用的Akka特性很少,这部分特性很容易自己实现。同时,这部分代码量相比Akka来说少很多,Debug比较容易。如果遇到什么Bug,也可以自己Fix,不需要等Akka上游发布新版本。而且,Spark升级Akka本身又因为第一点会强制要求用户升级使用的Akka,对于某些用户来说是不现实的;
  • 在Spark2.0.0中,移除了Jetty,同时借鉴于Akka的设计,重构了基于Netty的Rpc框架体系,其中Rpc和大文件传输都是使用Netty。

注:如下涉及的源码版本为Spark2.1.0,下面部分图片摘之网上大佬博客。

二、RPC架构

2.1 简略类图

在这里插入图片描述

2.2 详细类图

在这里插入图片描述

2.3 组件简介

组件描述
RpcEnv全称RPC Environment,管理整个RpcEndpoint的生命周期,也就是每个Rpc端点运行时依赖的环境
NettyRpcEnvRpcEnv的唯一实现类,也就是Netty通信实现
RpcEndpointRPC端点,也就是Spark通信实体,每个通信实体为一个通信Rpc端点。而且,均需要实现RpcEndpoint接口,比如DriverEndpoint,MasterEndpont,内部根据不同端点的需求,设计不同的消息和不同的业务处理
Dispatcher消息分发器(来自netty的概念),负责将RpcMessage分发至对应的RpcEndpoint。Dispatcher中包含一个 MessageLoop,它读取LinkedBlockingQueue投递的RpcMessage,根据客户端指定的Endpoint标识,找到Endpoint的 Inbox,然后投递进去,由于是阻塞队列,当没有消息的时候自然阻塞,一旦有消息,就开始工作。Dispatcher的 ThreadPool负责消费这些 Message
EndpointData每个Endpoint都有一个对应的EndpointData,EndpointData内部包含了RpcEndpoint、NettyRpcEndpointRef信息,以及一个Inbox。收信箱Inbox内部有一个InboxMessage链表,发送到该endpoint的消息,就是添加到该链表,同时将整个EndpointData添加Dispatcher到阻塞队列receivers中,由Dispatcher线程异步处理
Inbox一个本地端点对应一个收件箱,Inbox里面有一个InboxMessage的链表,InboxMessage有很多子类,可以是远程调用过来的RpcMessage,可以是远程调用过来的fire-and-forget的单向消息OneWayMessage,还可以是各种服务启动,链路建立断开等的Message,这些Message都会在Inbox内部方法做模式匹配,调用相应的RpcEndpoint的函数
RpcEndPointRefRpcEndpointRef是一个对RpcEndpoint远程引用对象,通过它可以向远程RpcEndpoint端发送消息以进行通信
NettyRpcEndpointRefRpcEndpointRef的唯一实现类,RpcEndpointRef的NettyRpcEnv版本。此类的行为取决于它的创建位置。在“拥有”RpcEndpoint的节点上,它是RpcEndpointAddress实例的简单包装器
RpcEndpointAddress主要包含了RpcAddress (host和port) 和 rpc endpoint name的信息
Outbox一个远程端点对应一个发件箱,NettyRpcEnv中包含一个 ConcurrentHashMap[RpcAddress, Outbox]。当消息放入 Outbox 后,紧接着将消息通过 TransportClient 发送出去
TransportContext主要用于创建TransportServer和TransportClientFactory的上下文,以及使用TransportChannelHandler建立netty channel pipeline的上下文。TransportClient 提供了两种通信协议:控制层面的RPC,以及数据层面的 chunk抓取。用户通过构造方法传入的rpcHandler负责处理RPC请求。并且,rpcHandler负责设置流,可以使用零拷贝IO以数据块的形式流式传输流。TransportServer和TransportClientFactory都为每一个channel创建一个 TransportChannelHandler对象。每一个TransportChannelHandler包含一个TransportClient,这使服务器进程能够在现有通道上将消息发送回客户端
TransportServerTransportServer是RPC框架的服务端,可提供高效的、低级别的流服务
TransportServerBootstrap定义了服务端引导程序的规范,服务端引导程序旨在当客户端与服务端建立连接之后,在服务端持有的客户端管道上执行的引导程序。用于初始化TransportServer
TransportClientFactory创建传输客户端(TransportClient)的传输客户端工厂类
TransportClientRPC框架的客户端,用于获取预先协商好的流中的连续块。TransportClient旨在允许有效传输大量数据,这些数据将被拆分成几百KB到几MB的块。简言之,可以认为TransportClient就是Spark Rpc最底层的基础客户端类。主要用于向server端发送Rpc请求和从server端获取流的chunk块
TransportClientBootstrap是在TransportClient上执行的客户端引导程序,主要对连接建立时进行一些初始化的准备(例如验证、加密)。TransportClientBootstrap所作的操作往往是昂贵的,好在建立的连接可以重用。用于初始化TransportClient
TransportChannelHandler传输层的handler,负责委托请求给TransportRequestHandler,委托响应给TransportResponseHandler。在传输层中创建的所有通道都是双向的。当客户端使用RequestMessage启动Netty通道(由服务器的RequestHandler处理)时,服务器将生成ResponseMessage(由客户端的ResponseHandler处理)。但是,服务器也会在同一个Channel上获取句柄,因此它可能会开始向客户端发送RequestMessages。这意味着客户端还需要一个RequestHandler,而Server需要一个ResponseHandler,用于客户端对服务器请求的响应。此类还处理来自io.netty.handler.timeout.IdleStateHandler的超时。如果存在未完成的提取或RPC请求,并且通道闲置超过requestTimeoutMs,我们认为连接超时。

当TransportChannelHandler读取到的request是RequestMessage类型时,则将此消息的处理进一步交给TransportRequestHandler,当request是ResponseMessage时,则将此消息的处理进一步交给TransportResponseHandler
TransportResponseHandler用于处理服务端的响应,并且对发出请求的客户端进行响应
TransportRequestHandler用于处理客户端的请求并在写完块数据后返回
MessageEncoder在将消息放入管道前,先对消息内容进行编码,防止管道另一端读取时丢包和解析错误
MessageDecoder对从管道中读取的ByteBuf进行解析,防止丢包和解析错误
TransportFrameDecoder对从管道中读取的ByteBuf按照数据帧进行解析
StreamManager处理ChunkFetchRequest和StreamRequest请求
RpcHandler处理RpcRequest和OneWayMessage请求
MessageMessage是消息的抽象接口,消息实现类都直接或间接的实现了RequestMessage或ResponseMessage接口

三、组件原理

3.1 Message消息

协议是应用层通信的基础,它提供了应用层通信的数据表示,以及编码和解码的能力。在Spark Network Common中,继承Akka中的定义,将协议命名为Message,它继承Encodable,提供了Encode的能力。
在这里插入图片描述
其中,RequestMessage具体实现有四种,分别是:

  • StreamRequest:此类消息表示向远程服务发起请求,以获取流式数据。Stream消息主要用于Driver到executor传输jar、file文件等。
  • RpcRequest:此类消息主要是远程Rpc服务端需要处理的消息,是一种需要服务端向客户端回复的Rpc请求信息类型。
  • ChunkFetchRequest:请求获取流单个块的序列。ChunkFetch消息用于抽象所有Spark中涉及到数据拉取操作时需要传输的消息。
  • OneWayMessage:此类消息也需要由远程RPC服务端处理,与RpcRequest不同的是不需要服务端向客户端回复。

由于OneWayMessage不需要响应,所以ResponseMessage对于成功或失败状态的实现各有两种,分别是:

  • StreamResponse:处理StreamRequest成功后返回的消息;
  • StreamFailure:处理StreamRequest失败后返回的消息;
  • RpcResponse:处理RpcRequest成功后返回的消息;
  • RpcFailure:处理RpcRequest失败后返回的消息;
  • ChunkFetchSuccess:处理ChunkFetchRequest成功后返回的消息;
  • ChunkFetchFailure:处理ChunkFetchRequest失败后返回的消息;

3.2 通信架构

Spark的Rpc框架是基于Actor模型,各个组件可以认为是一个独立实体,各个实体之间通过消息来进行通信。具体各个组件之间的关系图如下:
在这里插入图片描述

  • RpcEnv:为RpcEndpoint提供处理消息的环境。RpcEnv负责RpcEndpoint整个生命周期的管理,包括:Endpoint注册、Endpoint之间消息的路由,以及Endpoint停止,而NettyRpcEnv目前是其唯一实现。
  • RpcEndpoint:服务端,是根据接收的消息类型进行对应处理,一个RpcEndpoint经历的过程依次是:create -> onStart -> receive -> onStop。其中,onStart在接收任务消息前调用(在注册时候做为第一个自己处理的消息调用),receivereceiveAndReply分别用来接收sendask过来的消息。
  • RpcEndpointRef:客户端,是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般需要获取到该RpcEndpoint的引用,然后通过该引用发送消息,提供了send(单向发送,提供fire-and-forget语义)和ask(带返回的请求,提供请求响应的语义)的消息发送方式。其中,需要返回response的ask方式,带有超时机制,可以同步阻塞等待,也可以返回一个Future句柄,不阻塞发起请求的工作线程。另外,RpcEndpointRef能够自动的区分做到本地调用或者远程Rpc调用。
  • RpcAddress:表示远程的RpcEndpointRef的地址,包含host、port。

3.3 SparkEnv初始化

SparkEnv保存Application运行时的环境信息,包括 RpcEnv、Serializer、Block Manager和ShuffleManager 等,并且为Driver端和Executor端分别提供不同的创建方式。其中,RpcEnv维持着Spark 节点间的通信,并负责将传递过来的消息转发给RpcEndpoint。
在这里插入图片描述
可以知道Executor启动是CoarseGrainedExecutorBackend入口类负责的,以sparkExecutor的RpcEnv初始化说明为例,看一下createExecutorEnv()的代码逻辑:

private[spark] object CoarseGrainedExecutorBackend extends Logging {
  private def run(
      driverUrl: String,
      executorId: String,
      hostname: String,
      cores: Int,
      appId: String,
      workerUrl: Option[String],
      userClassPath: Seq[URL]) {
      ...
      // 创建SparkEnv
      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
      // 注册CoarseGrainedExecutorBackend
      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      // 注册WorkerWatcher
      workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()
      SparkHadoopUtil.get.stopCredentialUpdater()
    }
  }

  def main(args: Array[String]) {
  	...
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }
}

SparkEnv#createExecutorEnv代码逻辑

object SparkEnv extends Logging {
    private[spark] val driverSystemName = "sparkDriver"
  private[spark] val executorSystemName = "sparkExecutor"
  
  /**
   * 创建一个coarse-grained(粗粒度)的Executor SparkEnv
   */
  private[spark] def createExecutorEnv(
      conf: SparkConf,
      executorId: String,
      hostname: String,
      port: Int,
      numCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      isLocal: Boolean): SparkEnv = {
    val env = create(
      conf,
      executorId,
      hostname,
      hostname,
      port,
      isLocal,
      numCores,
      ioEncryptionKey
    )
    SparkEnv.set(env)
    env
  }
  
  private def create(
      conf: SparkConf,
      executorId: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      isLocal: Boolean,
      numUsableCores: Int,
      ioEncryptionKey: Option[Array[Byte]],
      listenerBus: LiveListenerBus = null,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    // 判断是否是Driver
    val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER

    // Driver必须有ListenerBus
    if (isDriver) {
      assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
    }
    
    // 创建SecurityManager
    val securityManager = new SecurityManager(conf, ioEncryptionKey)
    ioEncryptionKey.foreach { _ =>
      if (!securityManager.isSaslEncryptionEnabled()) {
        logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
          "wire.")
      }
    }
    // 启动应用名称
    val systemName = if (isDriver) driverSystemName else executorSystemName
    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
      securityManager, clientMode = !isDriver)

    // 指定RpcEnv实际绑定端口号,防止端口为0,或者被占用。
    // 在非驱动模式下,可能因为不会监听传入的链接,导致RpcEnv地址可能为空。
    if (isDriver) {
      conf.set("spark.driver.port", rpcEnv.address.port.toString)
    } else if (rpcEnv.address != null) {
      conf.set("spark.executor.port", rpcEnv.address.port.toString)
      logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
    }
    
    ...
    
    // 创建序列化器
    val serializer = instantiateClassFromConf[Serializer](
      "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    logDebug(s"Using serializer: ${serializer.getClass}")

    val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)

    val closureSerializer = new JavaSerializer(conf)

    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }
    
    // 创建BroadcastManager
    val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

    val mapOutputTracker = if (isDriver) {
      new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
    } else {
      new MapOutputTrackerWorker(conf)
    }

    // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
    // requires the MapOutputTracker itself
    mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
      new MapOutputTrackerMasterEndpoint(
        rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

    // 指定ShuffleManager
    val shortShuffleMgrNames = Map(
      "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
      "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
    
    // 指定内存管理器
    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
    val memoryManager: MemoryManager =
      if (useLegacyMemoryManager) {
        new StaticMemoryManager(conf, numUsableCores)
      } else {
        UnifiedMemoryManager(conf, numUsableCores)
      }

    val blockManagerPort = if (isDriver) {
      conf.get(DRIVER_BLOCK_MANAGER_PORT)
    } else {
      conf.get(BLOCK_MANAGER_PORT)
    }

    val blockTransferService =
      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
        blockManagerPort, numUsableCores)
        
    // 创建BlockManagerMaster
    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

    // NB: blockManager is not valid until initialize() is called later.
    val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
      serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
      blockTransferService, securityManager, numUsableCores)

    val metricsSystem = if (isDriver) {
      // Don't start metrics system right now for Driver.
      // We need to wait for the task scheduler to give us an app ID.
      // Then we can start the metrics system.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
    } else {
      // We need to set the executor ID before the MetricsSystem is created because sources and
      // sinks specified in the metrics configuration file will want to incorporate this executor's
      // ID into the metrics they report.
      conf.set("spark.executor.id", executorId)
      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
      ms.start()
      ms
    }
    
    // 创建OutputCommitCoordinator
    val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
      new OutputCommitCoordinator(conf, isDriver)
    }
    val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
      new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
    outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
    
    // 创建SparkEnv
    val envInstance = new SparkEnv(
      executorId,
      rpcEnv,
      serializer,
      closureSerializer,
      serializerManager,
      mapOutputTracker,
      shuffleManager,
      broadcastManager,
      blockManager,
      securityManager,
      metricsSystem,
      memoryManager,
      outputCommitCoordinator,
      conf)

    // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
    // called, and we only need to do it for driver. Because driver may run as a service, and if we
    // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
    if (isDriver) {
      val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
      envInstance.driverTmpDir = Some(sparkFilesDir)
    }

    envInstance
  }

3.4 RpcEnv

RpcEnv不仅从外部接口与Akka基本一致,在内部的实现上,也基本差不多,都是按照MailBox的设计思路来实现的;
RpcEnv Server模型
在这里插入图片描述
RpcEnv Client模型
在这里插入图片描述
如上图所示,RpcEnv即充当着Server,同时也提供内部实现Client。 当作为Server时,RpcEnv会初始化一个Server,并注册NettyRpcHandler,一般情况下,简单业务可以在RpcHandler直接完成请求的处理,但是考虑一个RpcEnv的Server上会挂载了很多个RpcEndpoint,每个RpcEndpoint的RPC请求频率不可控,因此需要对一定的分发机制和队列来维护这些请求,其中Dispatcher为分发器,InBox即为请求队列;
在将RpcEndpoint注册到RpcEnv过程中,也间接的将RpcEnv注册到Dispatcher分发器中,Dispatcher针对每个RpcEndpoint维护一个InBox,在Dispatcher维持一个线程池(线程池大小默认为系统可用的核数,当然也可以通过spark.rpc.netty.dispatcher.numThreads进行配置),线程针对每个InBox里面的请求进行处理。当然实际的处理过程是由RpcEndpoint来完成。
其次RpcEnv也完成Client的功能实现,RpcEndpointRef是以RpcEndpoint为单位,即如果一个进程需要和远程机器上N个RpcEndpoint服务进行通信,就对应N个RpcEndpointRef(后端的实际的网络连接是公用,这个是TransportClient内部提供了连接池来实现的),当调用一个RpcEndpointRef的ask/send等接口时候,会将把“消息内容+RpcEndpointRef+本地地址”一起打包为一个RequestMessage,交由RpcEnv进行发送。注意这里打包的消息里面包括RpcEndpointRef本身是很重要的,从而可以由Server端识别出这个消息对应的是哪一个RpcEndpoint。
和发送端一样,在RpcEnv中,针对每个remote端的host:port维护一个队列,即OutBox,RpcEnv的发送仅仅是把消息放入到相应的队列中,但是和发送端不一样的是:在OutBox中没有维护一个所谓的线程池来定时清理OutBox,而是通过一堆synchronized来实现的
下面我们看下RpcEnv相关类图:

举报

相关推荐

0 条评论