目录
一、概述
在Spark中,很多地方都涉及网络通信,比如Spark各个组件间的消息互通
、用户文件与Jar包的上传
、节点间的Shuffle过程数据传输
、Block数据的复制与备份
等。Spark1.6之前,Spark Rpc是基于Akka实现的,Akka是基于Scala语言的异步消息框架,但由于Akka不适合大文件的传输;所以,Spark1.6之前Rpc通过Akka来实现,而大文件是基于Jetty实现的HttpFileServer。然而,Spark1.6移除了Akka(https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-5293),原因概括为:
- 使用Spark用户很多,但是由于Akka不同版本之间无法互相通信,这就要求用户必须使用跟Spark完全匹配的Akka版本,导致用户无法升级Akka;
- Spark的Akka配置是针对Spark自身来调优的,很可能与用户自定义开发的Akka配置冲突;
- Spark使用的Akka特性很少,这部分特性很容易自己实现。同时,这部分代码量相比Akka来说少很多,Debug比较容易。如果遇到什么Bug,也可以自己Fix,不需要等Akka上游发布新版本。而且,Spark升级Akka本身又因为第一点会强制要求用户升级使用的Akka,对于某些用户来说是不现实的;
- 在Spark2.0.0中,移除了Jetty,同时借鉴于Akka的设计,重构了基于Netty的Rpc框架体系,其中Rpc和大文件传输都是使用Netty。
注:如下涉及的源码版本为Spark2.1.0,下面部分图片摘之网上大佬博客。
二、RPC架构
2.1 简略类图
2.2 详细类图
2.3 组件简介
组件 | 描述 |
---|---|
RpcEnv | 全称RPC Environment,管理整个RpcEndpoint的生命周期,也就是每个Rpc端点运行时依赖的环境 |
NettyRpcEnv | RpcEnv的唯一实现类,也就是Netty通信实现 |
RpcEndpoint | RPC端点,也就是Spark通信实体,每个通信实体为一个通信Rpc端点。而且,均需要实现RpcEndpoint接口,比如DriverEndpoint,MasterEndpont,内部根据不同端点的需求,设计不同的消息和不同的业务处理 |
Dispatcher | 消息分发器(来自netty的概念),负责将RpcMessage分发至对应的RpcEndpoint。Dispatcher中包含一个 MessageLoop,它读取LinkedBlockingQueue投递的RpcMessage,根据客户端指定的Endpoint标识,找到Endpoint的 Inbox,然后投递进去,由于是阻塞队列,当没有消息的时候自然阻塞,一旦有消息,就开始工作。Dispatcher的 ThreadPool负责消费这些 Message |
EndpointData | 每个Endpoint都有一个对应的EndpointData,EndpointData内部包含了RpcEndpoint、NettyRpcEndpointRef信息,以及一个Inbox。收信箱Inbox内部有一个InboxMessage链表,发送到该endpoint的消息,就是添加到该链表,同时将整个EndpointData添加Dispatcher到阻塞队列receivers中,由Dispatcher线程异步处理 |
Inbox | 一个本地端点对应一个收件箱 ,Inbox里面有一个InboxMessage的链表,InboxMessage有很多子类,可以是远程调用过来的RpcMessage,可以是远程调用过来的fire-and-forget的单向消息OneWayMessage,还可以是各种服务启动,链路建立断开等的Message,这些Message都会在Inbox内部方法做模式匹配,调用相应的RpcEndpoint的函数 |
RpcEndPointRef | RpcEndpointRef是一个对RpcEndpoint远程引用对象,通过它可以向远程RpcEndpoint端发送消息以进行通信 |
NettyRpcEndpointRef | RpcEndpointRef的唯一实现类,RpcEndpointRef的NettyRpcEnv版本。此类的行为取决于它的创建位置。在“拥有”RpcEndpoint的节点上,它是RpcEndpointAddress实例的简单包装器 |
RpcEndpointAddress | 主要包含了RpcAddress (host和port) 和 rpc endpoint name的信息 |
Outbox | 一个远程端点对应一个发件箱 ,NettyRpcEnv中包含一个 ConcurrentHashMap[RpcAddress, Outbox]。当消息放入 Outbox 后,紧接着将消息通过 TransportClient 发送出去 |
TransportContext | 主要用于创建TransportServer和TransportClientFactory的上下文,以及使用TransportChannelHandler建立netty channel pipeline的上下文。TransportClient 提供了两种通信协议:控制层面的RPC,以及数据层面的 chunk抓取。用户通过构造方法传入的rpcHandler负责处理RPC请求。并且,rpcHandler负责设置流,可以使用零拷贝IO以数据块的形式流式传输流。TransportServer和TransportClientFactory都为每一个channel创建一个 TransportChannelHandler对象。每一个TransportChannelHandler包含一个TransportClient,这使服务器进程能够在现有通道上将消息发送回客户端 |
TransportServer | TransportServer是RPC框架的服务端,可提供高效的、低级别的流服务 |
TransportServerBootstrap | 定义了服务端引导程序的规范,服务端引导程序旨在当客户端与服务端建立连接之后,在服务端持有的客户端管道上执行的引导程序。用于初始化TransportServer |
TransportClientFactory | 创建传输客户端(TransportClient)的传输客户端工厂类 |
TransportClient | RPC框架的客户端,用于获取预先协商好的流中的连续块。TransportClient旨在允许有效传输大量数据,这些数据将被拆分成几百KB到几MB的块。简言之,可以认为TransportClient就是Spark Rpc最底层的基础客户端类。主要用于向server端发送Rpc请求和从server端获取流的chunk块 |
TransportClientBootstrap | 是在TransportClient上执行的客户端引导程序,主要对连接建立时进行一些初始化的准备(例如验证、加密)。TransportClientBootstrap所作的操作往往是昂贵的,好在建立的连接可以重用。用于初始化TransportClient |
TransportChannelHandler | 传输层的handler,负责委托请求给TransportRequestHandler,委托响应给TransportResponseHandler。在传输层中创建的所有通道都是双向的。当客户端使用RequestMessage启动Netty通道(由服务器的RequestHandler处理)时,服务器将生成ResponseMessage(由客户端的ResponseHandler处理)。但是,服务器也会在同一个Channel上获取句柄,因此它可能会开始向客户端发送RequestMessages。这意味着客户端还需要一个RequestHandler,而Server需要一个ResponseHandler,用于客户端对服务器请求的响应。此类还处理来自io.netty.handler.timeout.IdleStateHandler 的超时。如果存在未完成的提取或RPC请求,并且通道闲置超过requestTimeoutMs,我们认为连接超时。当TransportChannelHandler读取到的request是RequestMessage类型时,则将此消息的处理进一步交给TransportRequestHandler,当request是ResponseMessage时,则将此消息的处理进一步交给TransportResponseHandler |
TransportResponseHandler | 用于处理服务端的响应,并且对发出请求的客户端进行响应 |
TransportRequestHandler | 用于处理客户端的请求并在写完块数据后返回 |
MessageEncoder | 在将消息放入管道前,先对消息内容进行编码,防止管道另一端读取时丢包和解析错误 |
MessageDecoder | 对从管道中读取的ByteBuf进行解析,防止丢包和解析错误 |
TransportFrameDecoder | 对从管道中读取的ByteBuf按照数据帧进行解析 |
StreamManager | 处理ChunkFetchRequest和StreamRequest请求 |
RpcHandler | 处理RpcRequest和OneWayMessage请求 |
Message | Message是消息的抽象接口,消息实现类都直接或间接的实现了RequestMessage或ResponseMessage接口 |
三、组件原理
3.1 Message消息
协议是应用层通信的基础,它提供了应用层通信的数据表示,以及编码和解码的能力。在Spark Network Common中,继承Akka中的定义,将协议命名为Message,它继承Encodable,提供了Encode的能力。
其中,RequestMessage具体实现有四种,分别是:
StreamRequest
:此类消息表示向远程服务发起请求,以获取流式数据。Stream消息主要用于Driver到executor传输jar、file文件等。RpcRequest
:此类消息主要是远程Rpc服务端需要处理的消息,是一种需要服务端向客户端回复的Rpc请求信息类型。ChunkFetchRequest
:请求获取流单个块的序列。ChunkFetch消息用于抽象所有Spark中涉及到数据拉取操作时需要传输的消息。OneWayMessage
:此类消息也需要由远程RPC服务端处理,与RpcRequest不同的是不需要服务端向客户端回复。
由于OneWayMessage不需要响应,所以ResponseMessage对于成功或失败状态的实现各有两种,分别是:
StreamResponse
:处理StreamRequest成功后返回的消息;StreamFailure
:处理StreamRequest失败后返回的消息;RpcResponse
:处理RpcRequest成功后返回的消息;RpcFailure
:处理RpcRequest失败后返回的消息;ChunkFetchSuccess
:处理ChunkFetchRequest成功后返回的消息;ChunkFetchFailure
:处理ChunkFetchRequest失败后返回的消息;
3.2 通信架构
Spark的Rpc框架是基于Actor模型,各个组件可以认为是一个独立实体,各个实体之间通过消息来进行通信。具体各个组件之间的关系图如下:
RpcEnv
:为RpcEndpoint提供处理消息的环境。RpcEnv负责RpcEndpoint整个生命周期的管理,包括:Endpoint注册、Endpoint之间消息的路由,以及Endpoint停止,而NettyRpcEnv目前是其唯一实现。RpcEndpoint
:服务端,是根据接收的消息类型进行对应处理,一个RpcEndpoint经历的过程依次是:create -> onStart -> receive -> onStop
。其中,onStart
在接收任务消息前调用(在注册时候做为第一个自己处理的消息调用),receive
和receiveAndReply
分别用来接收send
和ask
过来的消息。RpcEndpointRef
:客户端,是对远程RpcEndpoint的一个引用。当我们需要向一个具体的RpcEndpoint发送消息时,一般需要获取到该RpcEndpoint的引用,然后通过该引用发送消息,提供了send
(单向发送,提供fire-and-forget语义)和ask
(带返回的请求,提供请求响应的语义)的消息发送方式。其中,需要返回response的ask方式,带有超时机制,可以同步阻塞等待,也可以返回一个Future句柄,不阻塞发起请求的工作线程。另外,RpcEndpointRef能够自动的区分做到本地调用或者远程Rpc调用。RpcAddress
:表示远程的RpcEndpointRef的地址,包含host、port。
3.3 SparkEnv初始化
SparkEnv保存Application运行时的环境信息,包括 RpcEnv、Serializer、Block Manager和ShuffleManager 等,并且为Driver端和Executor端分别提供不同的创建方式。其中,RpcEnv维持着Spark 节点间的通信,并负责将传递过来的消息转发给RpcEndpoint。
可以知道Executor启动是CoarseGrainedExecutorBackend
入口类负责的,以sparkExecutor的RpcEnv初始化说明为例,看一下createExecutorEnv()的代码逻辑:
private[spark] object CoarseGrainedExecutorBackend extends Logging {
private def run(
driverUrl: String,
executorId: String,
hostname: String,
cores: Int,
appId: String,
workerUrl: Option[String],
userClassPath: Seq[URL]) {
...
// 创建SparkEnv
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
// 注册CoarseGrainedExecutorBackend
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
// 注册WorkerWatcher
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
SparkHadoopUtil.get.stopCredentialUpdater()
}
}
def main(args: Array[String]) {
...
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
}
}
SparkEnv#createExecutorEnv
代码逻辑
object SparkEnv extends Logging {
private[spark] val driverSystemName = "sparkDriver"
private[spark] val executorSystemName = "sparkExecutor"
/**
* 创建一个coarse-grained(粗粒度)的Executor SparkEnv
*/
private[spark] def createExecutorEnv(
conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
numCores: Int,
ioEncryptionKey: Option[Array[Byte]],
isLocal: Boolean): SparkEnv = {
val env = create(
conf,
executorId,
hostname,
hostname,
port,
isLocal,
numCores,
ioEncryptionKey
)
SparkEnv.set(env)
env
}
private def create(
conf: SparkConf,
executorId: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
isLocal: Boolean,
numUsableCores: Int,
ioEncryptionKey: Option[Array[Byte]],
listenerBus: LiveListenerBus = null,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
// 判断是否是Driver
val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
// Driver必须有ListenerBus
if (isDriver) {
assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
}
// 创建SecurityManager
val securityManager = new SecurityManager(conf, ioEncryptionKey)
ioEncryptionKey.foreach { _ =>
if (!securityManager.isSaslEncryptionEnabled()) {
logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
"wire.")
}
}
// 启动应用名称
val systemName = if (isDriver) driverSystemName else executorSystemName
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
securityManager, clientMode = !isDriver)
// 指定RpcEnv实际绑定端口号,防止端口为0,或者被占用。
// 在非驱动模式下,可能因为不会监听传入的链接,导致RpcEnv地址可能为空。
if (isDriver) {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
}
...
// 创建序列化器
val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
val closureSerializer = new JavaSerializer(conf)
def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {
if (isDriver) {
logInfo("Registering " + name)
rpcEnv.setupEndpoint(name, endpointCreator)
} else {
RpcUtils.makeDriverRef(name, conf, rpcEnv)
}
}
// 创建BroadcastManager
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
val mapOutputTracker = if (isDriver) {
new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
} else {
new MapOutputTrackerWorker(conf)
}
// Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
// 指定ShuffleManager
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
// 指定内存管理器
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
val blockManagerPort = if (isDriver) {
conf.get(DRIVER_BLOCK_MANAGER_PORT)
} else {
conf.get(BLOCK_MANAGER_PORT)
}
val blockTransferService =
new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
blockManagerPort, numUsableCores)
// 创建BlockManagerMaster
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
BlockManagerMaster.DRIVER_ENDPOINT_NAME,
new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
conf, isDriver)
// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
blockTransferService, securityManager, numUsableCores)
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
// 创建OutputCommitCoordinator
val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
new OutputCommitCoordinator(conf, isDriver)
}
val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
// 创建SparkEnv
val envInstance = new SparkEnv(
executorId,
rpcEnv,
serializer,
closureSerializer,
serializerManager,
mapOutputTracker,
shuffleManager,
broadcastManager,
blockManager,
securityManager,
metricsSystem,
memoryManager,
outputCommitCoordinator,
conf)
// Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
// called, and we only need to do it for driver. Because driver may run as a service, and if we
// don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
if (isDriver) {
val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
envInstance.driverTmpDir = Some(sparkFilesDir)
}
envInstance
}
3.4 RpcEnv
RpcEnv不仅从外部接口与Akka基本一致,在内部的实现上,也基本差不多,都是按照MailBox的设计思路来实现的;
RpcEnv Server模型:
RpcEnv Client模型:
如上图所示,RpcEnv即充当着Server,同时也提供内部实现Client。 当作为Server时,RpcEnv会初始化一个Server,并注册NettyRpcHandler,一般情况下,简单业务可以在RpcHandler直接完成请求的处理,但是考虑一个RpcEnv的Server上会挂载了很多个RpcEndpoint,每个RpcEndpoint的RPC请求频率不可控,因此需要对一定的分发机制和队列来维护这些请求,其中Dispatcher为分发器,InBox即为请求队列;
在将RpcEndpoint注册到RpcEnv过程中,也间接的将RpcEnv注册到Dispatcher分发器中,Dispatcher针对每个RpcEndpoint维护一个InBox,在Dispatcher维持一个线程池(线程池大小默认为系统可用的核数,当然也可以通过spark.rpc.netty.dispatcher.numThreads进行配置),线程针对每个InBox里面的请求进行处理。当然实际的处理过程是由RpcEndpoint来完成。
其次RpcEnv也完成Client的功能实现,RpcEndpointRef是以RpcEndpoint为单位,即如果一个进程需要和远程机器上N个RpcEndpoint服务进行通信,就对应N个RpcEndpointRef(后端的实际的网络连接是公用,这个是TransportClient内部提供了连接池来实现的),当调用一个RpcEndpointRef的ask/send等接口时候,会将把“消息内容+RpcEndpointRef+本地地址”一起打包为一个RequestMessage,交由RpcEnv进行发送。注意这里打包的消息里面包括RpcEndpointRef本身是很重要的,从而可以由Server端识别出这个消息对应的是哪一个RpcEndpoint。
和发送端一样,在RpcEnv中,针对每个remote端的host:port维护一个队列,即OutBox,RpcEnv的发送仅仅是把消息放入到相应的队列中,但是和发送端不一样的是:在OutBox中没有维护一个所谓的线程池来定时清理OutBox,而是通过一堆synchronized来实现的
下面我们看下RpcEnv相关类图: