0
点赞
收藏
分享

微信扫一扫

基于Netty手写RPC框架

兵部尚输 2022-04-27 阅读 114

1. RPC 框架

什么是RPC框架?他的全称是remoto proceduer call,他旨在让我们开发者能够像调用本地服务一样调用远程服务。

最常见的就是DUBBO框架。

举一个栗子:

比如在我们的电商系统里面有一个用户服务和商品服务,然后在商品服务中有一个查询商品的接口方法,我们只需要在商品服务中暴露出一个接口,然后将其上传至maven仓库中,这样在我们的用户服务中就可以直接调用了。

image-20220421224847554

RPC框架可以让我们不需要去关心网络通信的复杂细节和复杂组成,只需要聚焦业务的处理即可,所以极大降低了我们开发的门槛。

从上述例子中可以看出使用RPC是很简单的,那如何手写一个RPC框架呢?我们需要遵循什么样的规范呢?我们可以看看下面这张图,了解RPC框架里面都都有哪些内容。

image-20220421224926146

2. 手写RPC框架

我们可以根据上图大致知道一个RPC框架大致分为三部分:消费者、提供者以及消息协议部分。那么接下来我们就可以首先将各个模块创建好。我这里还额外创建了一个netty-rpc-api模块,用来编写对应接口,供消费者及提供者调用。大致结构如下:

image-20220423202115457

我们首先按照整个结构进行初步的分析:

  • netty-rpc-api、netty-rpc-consumer、netty-rpc-provider

    这三个模块为我们的业务代码

  • netty-protocol

    该模块为我们的手写框架,供消费者和提供者调用,实现服务的远程调用。

    netty-rpc-provider 依赖他进行服务的发布,netty-rpc-consumer依赖他进行服务的消费

2.1 框架模块 netty-protocol

我们首先来编写我们的基础版RPC框架,以供客户端和服务端调用。

2.1.1 消息协议部分

在编写框架之前,我们首先定义好我们的消息协议。那么我们的消息头包含那些内容呢?魔数?序列化类型?请求类型?没错,这些都是我们需要的,需要我们定义如下实体类。

@Data
@AllArgsConstructor
public class Header implements Serializable {

    private short magic; // 魔数 2个字节

    private byte serialType; //序列化类型 1个字节

    private byte reqType; // 消息类型 1个字节

    private long requestId; // 请求ID 8个字节

    private int length; //消息体长度 4个字节
    
}

定义好请求头,我们还需要定义Request类,Response类以及消息体类。

@Data
public class RpcRequest implements Serializable {

    private String className; // 类名

    private String methodName; //请求目标方法名

    private Object[] params; // 请求参数

    private Class<?>[] paramsTypes; // 参数类型

}
@Data
public class RpcResponse  implements Serializable {

    private Object data;

    private String msg;
}
@Data
public class RpcProtocol<T> implements Serializable {

    private Header header;

    private T content;

}

以上便是我们初步的消息协议部分,做完这些后,从上图的内容可以看出,我们下一步应该开始编写序列化的相关部分。

这里顺带将后面需要用到的两个实体类贴上,主要处理客户端异步返回结果至指定的请求。

public class RequestHolder {

    /*
    原子性 请求ID
     */
    public static final AtomicLong REQUEST_ID = new AtomicLong();

    /*
    保存请求ID和返回数据的关系
     */
    public static final Map<Long, RpcFuture> REQUEST_MAP = new ConcurrentHashMap<>();
}
@Data
public class RpcFuture<T> {

    private Promise<T> promise;

    public RpcFuture(Promise<T> promise) {
        this.promise = promise;
    }

}

2.1.2 序列化部分

我们可以有多种序列化类型,所以我们首先创建一个序列化接口。

public interface ISerializer {

    /*
    序列化接口
     */
    <T> byte[] serializer(T obj);

    /*
    反序列化接口
     */
    <T> T deserializer(byte[] data, Class<T> clazz);

    /*
    序列化类型
     */
    byte getType();

}

我们这里主要采用两种序列化方式,一种是Java本身的序列化,一种是直接使用JSON进行序列化。

我们只需要创建类实现上方的接口,然后实现相关的方法即可。方法相对比较简单,就不过多讲述。

public class JavaSerializer implements ISerializer {

    @Override
    public <T> byte[] serializer(T obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = null;
        try {
            oos = new ObjectOutputStream(bos);
            oos.writeObject(obj); //序列化
            return bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
            return (T) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public byte getType() {
        return SerialType.JAVA_SERIAL.code();
    }
}
public class JsonSerializer implements ISerializer {

    @Override
    public <T> byte[] serializer(T obj) {
        return JSON.toJSONString(obj).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public <T> T deserializer(byte[] data, Class<T> clazz) {
        return JSON.parseObject(new String(data), clazz);
    }

    @Override
    public byte getType() {
        return SerialType.JSON_SERIAL.code();
    }
}

实现完序列化功能后,我们在编写一个管理器供外部调用,类似与我们的工厂模式。

在管理类花总初始化好两个序列化类,然后外部通过传递对应的key获取即可。

public class SerializerManager {

    private final static ConcurrentHashMap<Byte, ISerializer> serializer = new ConcurrentHashMap<>();

    static {
        ISerializer json = new JsonSerializer();
        ISerializer java = new JavaSerializer();
        serializer.put(json.getType(), json);
        serializer.put(java.getType(), java);
    }

    public static ISerializer getSerializer(byte key) {
        ISerializer iSerializer = serializer.get(key);
        if (serializer == null) {
            return new JavaSerializer();
        }
        return iSerializer;
    }
}

2.1.3 常量部分

由于有一些常量经常使用到,为了方便管理,我们定义相关的类存储信息。包括序列化信息,请求类型信息,魔数,header长度等。

public class RpcConstant {

    public final static short MAGIC = 0xca; // 魔树

    public final static int HEAD_TOTAL_LEN = 16; //header总的字节数量

}
public enum ReqType {

    REQUEST((byte) 1),
    RESPONSE((byte) 2),
    HEARTBEAT((byte) 3);


    private byte code;

    ReqType(byte code) {
        this.code = code;
    }

    public byte code() {
        return this.code;
    }

    public static ReqType findByCode(int code) {
        for (ReqType value : ReqType.values()) {
            if (value.code == code) {
                return value;
            }
        }
        return null;
    }
}
public enum SerialType {

    JSON_SERIAL((byte) 1),
    JAVA_SERIAL((byte) 2);


    private byte code;

    SerialType(byte code) {
        this.code = code;
    }

    public byte code() {
        return this.code;
    }
}

2.1.4 编解码器

序列化部分实现玩后,我们就可以编写编解码器了。

编码器

编码器我们需要实现MessageToByteEncoder类,然后重写encode方法。

在这个方法中,我们设置好请求头的相关信息,然后根据请求头中的序列化类型选择相应的方式对内容进行序列化,最后传输数据。

public class RpcEncoder extends MessageToByteEncoder<RpcProtocol<Object>> {
    @Override
    protected void encode(ChannelHandlerContext ctx, RpcProtocol<Object> msg, ByteBuf out) throws Exception {
        System.out.println("============begin RpcEncoder=========");
        Header header = msg.getHeader();
        out.writeShort(header.getMagic());
        out.writeByte(header.getSerialType());
        out.writeByte(header.getReqType());
        out.writeLong(header.getRequestId());
        // 序列化内容
        ISerializer serializer = SerializerManager.getSerializer(header.getSerialType());
        byte[] data = serializer.serializer(msg.getContent());
        out.writeInt(data.length);
        out.writeBytes(data);
    }
}

解码器

解码器我们需要实现ByteToMessageDecoder类,然后重写decode方法。

解码器这里我们做的操作比较多,这里逐步解释下。

  • 首先如果接受的数据长度小于头部的总长度则直接返回。

  • 这里使用markReaderIndex标记索引,后期可以还原数据。

    我们首先读取两个字节的魔数,判读是否相等;

    然后按照消息协议中定义好的顺序逐步读取序列化类型、请求类型以及消息体的内容。

  • 最后根据请求类型以及序列化类型对数据进行反序列化后返回

public class RpcDecoder extends ByteToMessageDecoder {


    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("========begin RpcDecoder==========");

        if (in.readableBytes() < RpcConstant.HEAD_TOTAL_LEN) {
            return;
        }
        in.markReaderIndex(); //标记读取开始索引
        short maci = in.readShort(); //读取2个字节的magic
        if (maci != RpcConstant.MAGIC) {
            throw new IllegalArgumentException("Illegal request parameter 'magic'," + maci);
        }

        byte serialType = in.readByte(); //读取一个字节的序列化类型
        byte reqType = in.readByte(); //读取一个字节的消息类型
        long requestId = in.readLong(); //读取请求id
        int dataLength = in.readInt(); //读取数据报文长度

        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex(); // 还原数据
            return;
        }
        //读取消息体的内容
        byte[] content = new byte[dataLength];
        in.readBytes(content);

        Header header = new Header(maci, serialType, reqType, requestId, dataLength);
        ISerializer serializer = SerializerManager.getSerializer(serialType);//获得序列化类型
        ReqType rt = ReqType.findByCode(reqType);//获得请求类型
        switch (rt) {
            case REQUEST:
                // 将内容反序列化
                RpcRequest request = serializer.deserializer(content, RpcRequest.class);
                // 最好的返回体
                RpcProtocol<RpcRequest> reqProtocol = new RpcProtocol<>();
                reqProtocol.setHeader(header);
                reqProtocol.setContent(request);
                // 传递
                out.add(reqProtocol);
                break;
            case RESPONSE:
                RpcResponse response = serializer.deserializer(content, RpcResponse.class);
                RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
                resProtocol.setHeader(header);
                resProtocol.setContent(response);
                out.add(resProtocol);
                break;
            case HEARTBEAT:
                //TODO
                break;
            default:
                break;
        }

    }
}

2.1.5 NETTY服务端

上述的基础内容编写完毕后,我们就可以开始编写网络通信部分了。

首先开始编写服务端,我们直接按照netty的基础使用进行编码。

NettyServer类主要是服务端代码,我们定义好主从节点,然后将其绑定在ServerBootstrap上,选择需要使用的模型,指定好事件处理器,最后绑定好IP及端口,注意使用sync阻塞。

public class NettyServer {

    private String serverAddress; //服务地址
    private int serverPort; //端口

    public NettyServer(String serverAddress, int serverPort) {
        this.serverAddress = serverAddress;
        this.serverPort = serverPort;
    }

    public void startNettyServer() {
        System.out.println("begin start Netty server");
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new RpcServerInitializer());
        try {
            ChannelFuture future = bootstrap.bind(this.serverAddress, this.serverPort).sync();
            System.out.println("Server started Success on Port" + this.serverPort);
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

初始化这里绑定好pipeline,由于我们的header总长度为16,所以需要设置偏移量为12以及length为4;同时绑定好编解码器以及指定事件处理器。

public class RpcServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().
                addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
                        12,
                        4,
                        0,
                        0))
                .addLast(new RpcDecoder())
                .addLast(new RpcEncoder())
                .addLast(new RpcServerHandler());
    }
}

事件处理器是我们这边的重点,主要需要通过反射去调用对应的方法。

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {
        RpcProtocol<RpcResponse> resProtocol = new RpcProtocol<>();
        // 处理服务端返回对象
        Header header = msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        // 通过反射调用对应方法
        Object result = invoke(msg.getContent());
        
        resProtocol.setHeader(header);
        
        RpcResponse response = new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        
        resProtocol.setContent(response);

        ctx.writeAndFlush(resProtocol);
    }

    // 通过反射进行调用
    private Object invoke(RpcRequest request) {
        try {
            // 反射加载
            Class<?> clazz = Class.forName(request.getClassName());
            // 加载实例
            Object bean = SpringBeanManager.getBean(clazz);
            // 加载实例调用的方法
            Method method = clazz.getDeclaredMethod(request.getMethodName(), request.getParamsTypes());
            // 通过反射调用
            return method.invoke(bean, request.getParams());
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }
}

这里我们直接将请求体中的主体内容传递到invoke方法中,这是主要是一些反射的常用操作,我们这里通过Spring容器来根据class获取相应的实体类,工具类如下:

@Component
public class SpringBeanManager implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringBeanManager.applicationContext = applicationContext;
    }

    /*
    根据类获取实例
     */
    public static <T> T getBean(Class<T> clzz) {
        return applicationContext.getBean(clzz);
    }
}

反射调用结束之后,将返回结果封装后返回即可,这就是我们服务端的所有操作,整体来说还是很简单的。

2.1.6 NETTY客户端

服务端代码完成后,我们只需要按照之前写客户端的方法编码就行。

总共是两部分,一部分是创建EventLoopGroup以及将其和Bootstrap绑定,然后设置好通信模型和事件处理器即可。最后就是服务端进行数据的发送了。

public class NettyClient {

    private final Bootstrap bootstrap;

    private final EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

    private String serviceAddress;

    private int servicePort;

    public NettyClient(String serviceAddress, int servicePort) {
        System.out.printf("begin init Netty Client,{},{}", serviceAddress, servicePort);
        bootstrap = new Bootstrap();

        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new RpcClientInitializer());

        this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;
    }

    // 发送数据包
    public void sendRequest(RpcProtocol<RpcRequest> protocol) throws InterruptedException {
        final ChannelFuture future = bootstrap.connect(this.serviceAddress, this.servicePort).sync();
        //监听是否连接成功
        future.addListener(listener -> {
            if (future.isSuccess()) {
                System.out.printf("connect rpc server {} success.", this.serviceAddress);
            } else {
                System.out.printf("connect rpc server {} failed. ", this.serviceAddress);
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();
            }
        });
        System.out.println("begin transfer data");
        future.channel().writeAndFlush(protocol);
    }
}
public class RpcClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("begin RpcClientInitializer");
        ch.pipeline().addLast(
                new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
                        12,
                        4,
                        0, 0))
                .addLast(new LoggingHandler()) // 日志处理
                .addLast(new RpcEncoder())
                .addLast(new RpcDecoder())
                .addLast(new RpcClientHandler());
    }
}

我们在编写客户端的事件处理器时,需要考虑到结果返回到对应的请求上,所以这里每一个请求都有一个对应的Promise

public class RpcClientHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcResponse>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcResponse> msg) throws Exception {
        System.out.println("receive Rpc Server Result");
        long requestId = msg.getHeader().getRequestId();
        //根据ID获得异步对象
        RpcFuture<RpcResponse> future = RequestHolder.REQUEST_MAP.remove(requestId);
        future.getPromise().setSuccess(msg.getContent()); //返回结果
    }
}

以上便是我们框架部分所有的代码部分,整体来说还算简单。

2.2 netty-rpc-api

该模块就是提供给消费端和生产者端一个可以调用的接口,并没有其他的代码。

public interface IUserService {

    /**
     * 保存用户信息
     */
    String saveUser(String name);

}

2.3 生产者 netty-rpc-provider

该模块需要实现的主要有两部分。

  • 实现接口

    @Service
    public class UserServiceImpl implements IUserService {
        @Override
        public String saveUser(String name) {
            System.out.println("begin save user:" + name);
            return "save user success:" + name;
        }
    }
    
  • 启动netty服务端,共消费端链接

    这里需要通过ComponentScan注解进行扫描,否则netty-protocol模块进行反射调用的时候会找不到对应的类

    @(basePackages = {"com.rpc.example.service", "com.rpc.example.spring"})
    @SpringBootApplication
    public class NettyRpcProvider {
    
        public static void main(String[] args) {
            SpringApplication.run(NettyRpcProvider.class, args);
            // 启动netty服务端
            new NettyServer("127.0.0.1", 8081).startNettyServer();
        }
    }
    

2.4 消费端 netty-rpc-consumer

那我们如何调用远程服务呢?没错,我们需要通过动态代理获取到某个类的实例,然后调用他的方法。

这里我们创建一个动态代理类。

public class RpcClientProxy {

    public <T> T clientProxy(final Class<T> interfaceCls, final String host, int port) {
        return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),
                new Class<?>[]{interfaceCls}, new RpcInvokerProxy(host, port));
    }
}

在执行方法中封装好指定的类型、方法名、参数,然后通过Netty客户端进行数据的发送,同时异步等待服务端返回数据。

public class RpcInvokerProxy implements InvocationHandler {

    private String host;

    private int port;

    public RpcInvokerProxy(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("begin invoke target server");
        RpcProtocol<RpcRequest> reqProtocol = new RpcProtocol<>();
        long requestId = RequestHolder.REQUEST_ID.incrementAndGet();
        Header header = new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(),
                ReqType.REQUEST.code(), requestId, 0);
        reqProtocol.setHeader(header);

        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParamsTypes(method.getParameterTypes());
        request.setParams(args);
        reqProtocol.setContent(request);

        //短链接
        NettyClient nettyClient = new NettyClient(host, port);
        // 通过设置DefaultEventLoop进行轮询获取结果
        RpcFuture<RpcResponse> future = new RpcFuture<>(new DefaultPromise<RpcResponse>(new DefaultEventLoop()));
        // 保存请求ID和返回数据的对应关系
        RequestHolder.REQUEST_MAP.put(requestId, future);
        nettyClient.sendRequest(reqProtocol);
        // 返回异步回调的数据
        return future.getPromise().get().getData();
    }
}

2.5 测试

首先我们启动生产者模块,

image-20220427153106802

然后在消费端编写测试代码。

public class MainTest {

    public static void main(String[] args) {
        RpcClientProxy rcp = new RpcClientProxy();
        IUserService userService = rcp.clientProxy(IUserService.class, "127.0.0.1", 8081);
        System.out.println(userService.saveUser("cc "));
    }
    
}

image-20220427153145682

image-20220427153201799

可以看到我们在消费端成功调用了生产端的接口实现,至此,我们就成功实现了一个简单的远程服务调用。

3. 项目地址

基于NETTY首先RPC框架

举报

相关推荐

0 条评论