flink1.19源码学习-RPC通信
RPC概念
RPC,即远程过程调用(Remote Procedure Call),是一种通过网络从远程计算机程序上请求服务的技术,而无需了解底层网络技术的协议。在RPC中,客户机和服务器位于不同的机器上,客户端通过网络调用在服务器端运行的过程,并将结果发送回客户机。这种技术允许程序像调用本地过程一样调用远程过程,使得跨平台、跨机器的服务调用成为可能。
1.两个进程间的相互调用
2.集群中不同节点服务的通信
在flink中RPC通信主要用的是Apache pekko框架。pekko是akka的一个分支。感兴趣的小伙伴自行了解
案例
通过一个小的案例,帮助我们迅速了解pekko该如何使用
pom.xml
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-actor_2.12</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.pekko</groupId>
<artifactId>pekko-remote_2.12</artifactId>
<version>1.0.1</version>
</dependency>
PekkoData
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PekkoData {
private String info;
}
PekkoRpcReceiverActor
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.japi.pf.ReceiveBuilder;
public class PekkoRpcReceiverActor extends AbstractActor {
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
/**接收到PekkoData消息交给handleMessage处理
*/
.match(PekkoData.class, this::handleMessage)
.build();
}
private void handleMessage(PekkoData message) {
/** 获取发送者,发送者对应的就是actorRef */
ActorRef sender = getSender();
ActorRef self = getSelf();
/** 打印 */
System.out.println("PekkoRpcReceiverActor类收到:" +sender + ":发送的=>" + message.getInfo());
/** 回复消息 向发送者sender 回复word 的消息 回复者是当前actorRef*/
/** 4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor */
sender.tell(new PekkoData("word"),self);
}
}
PekkoRpcSenderActor
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.japi.pf.ReceiveBuilder;
/**
* 继承AbstractActor定义自己的actor
* Actor可以发送和接收消息
*/
public class PekkoRpcSenderActor extends AbstractActor {
/**
* 实现接收消息
* @return
*/
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
/**接收到PekkoData消息交给handleMessage处理
* flink PekkoRpcActor 155行也是这样处理的
*/
.match(PekkoData.class, this::handleMessage)
.build();
}
private void handleMessage(final PekkoData message) {
/** 获取发送者,发送者对应的就是actorRef */
ActorRef sender = getSender();
/** 打印 */
System.out.println("PekkoRpcSenderActor类收到:" +sender + ":发送的=>" + message.getInfo());
}
}
Demo
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
public class Demo {
public static void main(String[] args) {
/**创建actorSystem*/
ActorSystem actorSystem = ActorSystem.create("flink");
/**构建PekkoRpcActor的ActorRef*/
ActorRef pekkoRpcRef = actorSystem.actorOf(Props.create(PekkoRpcReceiverActor.class), "PekkoRpcReceiverActor");
/**构建PekkoRpcSenderActor的ActorRef*/
ActorRef pekkoRpcSenderRef = actorSystem.actorOf(Props.create(PekkoRpcSenderActor.class), "PekkoRpcSenderActor");
/** pekkoRpcSenderActor作为发送者 向PekkoRpcActor发送 hello*/
pekkoRpcRef.tell(new PekkoData("hello"),pekkoRpcSenderRef);
}
}
结果
pekko特性如下:
1、ActorSystem 是管理 Actor 生命周期的组件,Actor 是负责进行通信的组件
2、每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。
3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用会阻塞的处理方法。
4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor
5、每一个 ActorSystem 和 Actor都在启动的时候会给定一个 name,如果要从 ActorSystem 中获取一个 Actor,则通过以下的方式来进行 Actor 的
获取:pekko.tcp://flink@localhost:6123/user/rpc/resourcemanager_* 来进行定位
6、如果一个 Actor 要和另外一个 Actor 进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
7、通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回到返回处理结果。
8.如果构建actor进行通信,Pekko版本中必须继承AbstractActor 实现createReceive()方法