0
点赞
收藏
分享

微信扫一扫

手写RPC框架(六)整合Netty

sullay 2022-02-19 阅读 83

Python微信订餐小程序课程视频

https://edu.csdn.net/course/detail/36074

Python实战量化交易理财系统

https://edu.csdn.net/course/detail/35475

手写RPC框架(六)整合Netty

Netty简介:

Netty是一个基于NIO的,提供异步,事件驱动的网络应用工具,具有高性能高可靠性等特点。

使用传统的Socket来进行网络通信,服务端每一个连接都要新建一个线程,请求处理完成后通过输出流返回给客户端。而Netty通过NIO的方式,服务端实现为一个请求一个线程,客户端发送的连接请求会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才会启动一个线程进行处理。

这次我们通过Netty来实现网络通信,替代Socket,提高框架性能。

  1. 引入Netty


<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.73.Final</version>
</dependency>


  2. netty服务端
/**
 * Netty-based RPC server: binds to the configured address/port and hands every
 * accepted connection to a {@code RpcServerHandler}.
 *
 * <p>{@link #startNettyServer()} blocks the calling thread until the server
 * channel is closed; {@link #destory()} may be invoked from another thread
 * (for example by a container honouring {@code @PreDestroy}) to stop it.
 */
public class NettyServer{
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
    private final String serverAddress; // address to bind to
    private final int serverPort; // port to bind to

    // Published by startNettyServer() so that destory() can shut down the very
    // same groups. volatile because start and destroy may run on different threads.
    private volatile EventLoopGroup boss = null;
    private volatile EventLoopGroup worker = null;

    /**
     * @param serverAddress address the server binds to
     * @param serverPort    port the server binds to
     */
    public NettyServer(String serverAddress, int serverPort) {
        this.serverAddress = serverAddress;
        this.serverPort = serverPort;
    }

    /**
     * Starts the server and blocks until the server channel is closed.
     * Both event loop groups are always shut down before this method returns.
     *
     * @throws Exception if the bind fails for a reason other than interruption
     */
    public void startNettyServer() throws Exception {
        // Group that accepts incoming connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Group that handles I/O of accepted connections
        EventLoopGroup workGroup = new NioEventLoopGroup();
        // Keep references in the fields; previously they stayed null and
        // destory() failed with a NullPointerException.
        boss = bossGroup;
        worker = workGroup;

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workGroup);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // NOTE(review): only String codecs are installed, no frame decoder, so a
                // large JSON message may arrive split across reads - confirm message
                // sizes or add length/delimiter framing on both client and server.
                pipeline.addLast(new StringDecoder());
                pipeline.addLast(new StringEncoder());
                // Business logic: performs the requested invocation
                pipeline.addLast(new RpcServerHandler());
            }
        });

        try {
            // Bind, then wait until the server channel is closed
            ChannelFuture bindFuture = bootstrap.bind(serverAddress, serverPort).sync();
            bindFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption
            Thread.currentThread().interrupt();
            logger.error("Netty server interrupted, shutting down", e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }


    /**
     * Shuts down both event loop groups and waits for them to terminate.
     * Safe to call when the server was never started.
     *
     * @throws InterruptedException if interrupted while waiting for shutdown
     */
    @PreDestroy
    public void destory() throws InterruptedException {
        EventLoopGroup bossGroup = boss;
        EventLoopGroup workGroup = worker;
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
        logger.info("关闭Netty");
    }
}

在这里通过startNettyServer方法启动netty服务端,当有请求时,会交由**RpcServerHandler**的channelRead0方法进行处理。

/**
 * Server-side handler: decodes a JSON request into a {@code RpcRegisterEntity},
 * performs the requested method call via reflection and writes the JSON-encoded
 * {@code RpcResponse} back to the client.
 *
 * <p>A response is written for every request. Any failure (malformed request,
 * unknown class/method, exception thrown by the target method) is reported
 * through the response's exception field instead of being returned as the result.
 */
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Handles one request message.
     *
     * @param ctx channel context used to write the response
     * @param msg JSON text of the request
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        RpcResponse response = new RpcResponse();
        try {
            // Decoding happens inside the try so that a malformed request still gets a reply
            RpcRegisterEntity rpcRegisterEntity = JSON.parseObject(msg, RpcRegisterEntity.class);
            response.setResult(invoke(rpcRegisterEntity));
        } catch (Exception exception) {
            // This class has no logger; keep the failure visible on the server console
            exception.printStackTrace();
            response.setException(exception);
        }
        ctx.writeAndFlush(JSON.toJSONString(response));
    }

    /**
     * Instantiates the implementation class named by the request and invokes
     * the requested method on it.
     *
     * @param entity decoded request
     * @return the value returned by the invoked method
     * @throws Exception if the class or method cannot be resolved or instantiated,
     *                   or the exception thrown by the invoked method itself
     */
    private Object invoke(RpcRegisterEntity entity) throws Exception {
        // NOTE(review): the class name comes straight from the network, so a client can
        // make the server instantiate any class with a public no-arg constructor.
        // Consider restricting it to a whitelist of registered services.
        String implClassName = entity.getServiceImplClassFullName();
        Class<?> clazz = Class.forName(implClassName);
        Method method = clazz.getMethod(entity.getMethodName(), entity.getParameterTypes());
        Object target = clazz.getDeclaredConstructor().newInstance();
        try {
            return method.invoke(target, entity.getParameters());
        } catch (InvocationTargetException e) {
            // Report the exception raised by the service method, not the reflection wrapper
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    /** Called when the channel becomes active; delegates to the default behaviour. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /** Called when an exception reaches this handler; delegates to the default behaviour. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

  3. netty消费端
/**
 * Netty-based RPC client. Connects to the given server on construction
 * (retrying with exponential back-off) and offers a blocking {@link #send(String)}.
 *
 * <p>Only one request is in flight at a time: {@code send} is synchronized because
 * the underlying handler holds a single request/response slot.
 */
public class RpcClient{
    private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);

    /** Maximum number of reconnect attempts after the first connect fails. */
    private static final int MAX_RETRY = 5;

    private EventLoopGroup group;

    // Reassigned from the event loop on every connect attempt, read by caller threads
    private volatile Channel channel;

    private String ip;

    private int port;

    // Handler of the most recently initialised channel. A fresh instance is created per
    // channel because the handler is not @Sharable and Netty refuses to add the same
    // non-sharable instance to a second pipeline, which made every reconnect fail.
    private volatile RpcConsumerHandler rpcConsumerHandler=new RpcConsumerHandler();

    private final ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * Creates the client and immediately starts connecting.
     *
     * @param ip   server host
     * @param port server port
     */
    public RpcClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
        initClient();
    }

    /**
     * Builds the Netty bootstrap and starts the (asynchronous) connect.
     * On a synchronous failure the channel and event loop group are released.
     */
    public void initClient() {
        try {
            // 1. Event loop group
            group = new NioEventLoopGroup();
            // 2. Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            // 3. Configuration
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            // One handler per channel; publish it for send()
                            RpcConsumerHandler handler = new RpcConsumerHandler();
                            pipeline.addLast(handler);
                            rpcConsumerHandler = handler;
                        }
                    });
            // 4. Connect to the server
            connect(bootstrap, ip, port, MAX_RETRY);
        } catch (Exception exception) {
            logger.error("Failed to initialise Netty client for {}:{}", ip, port, exception);
            releaseNetworkResources();
        }
    }

    /**
     * Connects asynchronously; on failure schedules another attempt with a delay of
     * 2^n seconds (n = attempt number) until {@code retry} reaches zero.
     */
    private void connect(Bootstrap bootstrap, String host, int port, int retry) {
        ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                logger.info("连接服务端成功");
            } else if (retry == 0) {
                logger.error("重试次数已用完,放弃连接");
            } else {
                // Number of this reconnect attempt (1-based)
                int order = (MAX_RETRY - retry) + 1;
                // Delay before this attempt, in seconds
                int delay = 1 << order;
                logger.error("{} : 连接失败,第 {} 重连....", new Date(), order);
                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
        channel = channelFuture.channel();
    }

    /** Closes the channel and shuts down the event loop group, if present. */
    private void releaseNetworkResources() {
        Channel current = channel;
        if (current != null) {
            current.close();
        }
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /**
     * Lets the caller release all resources held by this client: the channel,
     * the event loop group and the executor used by {@link #send(String)}.
     */
    public void close() {
        releaseNetworkResources();
        // Interrupts senders still waiting for a response so their threads do not leak
        executorService.shutdownNow();
    }

    /**
     * Sends a message and blocks until the response arrives.
     *
     * @param msg request text
     * @return the response text received from the server
     * @throws ExecutionException   if sending or waiting for the response failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized Object send(String msg) throws ExecutionException, InterruptedException {
        // Read the field once so request and task use the same handler instance
        RpcConsumerHandler handler = rpcConsumerHandler;
        handler.setRequestMsg(msg);
        Future<?> pending = executorService.submit(handler);
        return pending.get();
    }

    /** Same as {@link #close()}. */
    public void destroy() throws Exception {
        close();
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}

客户端创建好之后,发送请求时数据会交由**rpcConsumerHandler**处理:

/**
 * Client-side handler that sends one request and blocks the calling thread until
 * the matching response has been read from the channel.
 *
 * <p>The handler holds a single request/response slot guarded by its own monitor,
 * so only one {@link #call()} can be in progress at a time.
 */
public class RpcConsumerHandler extends SimpleChannelInboundHandler<String> implements Callable<Object> {
    // Written by the event loop in channelActive, read by the thread running call()
    volatile ChannelHandlerContext context;
    // Message to send
    String requestMsg;

    // Message returned by the server; null while no response is pending delivery
    String responseMsg;

    // True once the channel has gone inactive; guarded by this handler's monitor
    private boolean inactive;

    public void setRequestMsg(String requestMsg) {
        this.requestMsg = requestMsg;
    }

    /** Remembers the context so that {@link #call()} can write to the channel. */
    @Override
    public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
        inactive = false;
    }

    /** Wakes up a waiting {@link #call()} so it fails instead of blocking forever. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        synchronized (this) {
            inactive = true;
            notifyAll();
        }
        super.channelInactive(ctx);
    }

    /** Receives the data returned by the server and wakes up the waiting sender. */
    @Override
    protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        System.out.println("客户端结果:"+msg);
        responseMsg = msg;
        notifyAll();
    }

    /**
     * Sends the current request message and waits for the response.
     *
     * @return the response text received from the server
     * @throws IllegalStateException if the channel is not active, or becomes
     *                               inactive before a response arrives
     * @throws InterruptedException  if interrupted while waiting
     */
    @Override
    public synchronized Object call() throws Exception {
        ChannelHandlerContext ctx = context;
        if (ctx == null || inactive) {
            throw new IllegalStateException("channel is not active; cannot send request");
        }
        // Drop any response left over from a previous call
        responseMsg = null;
        ctx.writeAndFlush(requestMsg);
        // Loop on the condition: Object.wait() may wake up spuriously
        while (responseMsg == null && !inactive) {
            wait();
        }
        if (responseMsg == null) {
            throw new IllegalStateException("channel closed before a response was received");
        }
        return responseMsg;
    }
}

  4. 调用
// Obtain a client for the host/port carried by the request entity
// (getClient is defined elsewhere; presumably it creates or reuses a connection - confirm)
RpcClient rpcClient = getClient(rpcRegisterEntity.getHost(), rpcRegisterEntity.getPort());
// Serialize the request entity to JSON and send it through the client
Object responseMsg = rpcClient.send(JSON.toJSONString(rpcRegisterEntity));
// Parse the string form of the returned value into an RpcResponse
RpcResponse rpcResponse = JSON.parseObject(responseMsg.toString(), RpcResponse.class);

举报

相关推荐

0 条评论