Python微信订餐小程序课程视频
https://edu.csdn.net/course/detail/36074
Python实战量化交易理财系统
https://edu.csdn.net/course/detail/35475
手写RPC框架(六)整合Netty
Netty简介:
Netty是一个基于NIO的,提供异步,事件驱动的网络应用工具,具有高性能高可靠性等特点。
使用传统的Socket来进行网络通信,服务端每一个连接都要新建一个线程,请求处理完成后通过输出流返回给客户端。而Netty通过NIO的方式,服务端实现为一个请求一个线程,客户端发送的连接请求会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才会启动一个线程进行处理。
这次我们通过Netty来实现网络通信,替代Socket,提高框架性能。
- 引入Netty
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.73.Final</version>
</dependency>
- netty服务端
/**
 * Netty-based RPC server.
 *
 * <p>Binds to the configured address and port, decodes inbound bytes to strings and hands
 * every message to a {@link RpcServerHandler}. {@link #startNettyServer()} blocks the calling
 * thread until the server channel is closed.
 */
public class NettyServer {
    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    /** Address the server binds to. */
    private final String serverAddress;
    /** Port the server binds to. */
    private final int serverPort;

    // Written by the thread that starts the server and read by the thread that runs the
    // @PreDestroy hook, hence volatile.
    private volatile EventLoopGroup boss = null;
    private volatile EventLoopGroup worker = null;

    /**
     * @param serverAddress address to bind to
     * @param serverPort    port to bind to
     */
    public NettyServer(String serverAddress, int serverPort) {
        this.serverAddress = serverAddress;
        this.serverPort = serverPort;
    }

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * <p>The event loop groups are stored in the instance fields so that {@link #destory()}
     * can shut them down; both groups are also shut down when this method returns.
     *
     * @throws Exception if binding fails
     */
    public void startNettyServer() throws Exception {
        // Group that accepts incoming connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Group that handles I/O of accepted connections.
        EventLoopGroup workGroup = new NioEventLoopGroup();
        this.boss = bossGroup;
        this.worker = workGroup;

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Inbound bytes -> String
                        pipeline.addLast(new StringDecoder());
                        // Outbound String -> bytes
                        pipeline.addLast(new StringEncoder());
                        // Business logic
                        pipeline.addLast(new RpcServerHandler());
                    }
                });
        try {
            // Bind, then park this thread until the server channel is closed.
            ChannelFuture bindFuture = bootstrap.bind(serverAddress, serverPort).sync();
            bindFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            logger.warn("Netty server on {}:{} was interrupted", serverAddress, serverPort, e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    /**
     * Shuts down both event loop groups and waits for the shutdown to complete.
     * Does nothing for a group that was never created (server not started).
     *
     * @throws InterruptedException if interrupted while waiting for the shutdown
     */
    @PreDestroy
    public void destory() throws InterruptedException {
        EventLoopGroup bossGroup = boss;
        EventLoopGroup workGroup = worker;
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
        logger.info("关闭Netty");
    }
}
在这里通过startNettyServer方法启动Netty服务端,当有请求时,会进入**RpcServerHandler**中进行处理。
/**
 * Server-side handler: decodes a JSON request into an {@code RpcRegisterEntity}, invokes the
 * requested method via reflection and writes an {@code RpcResponse} back as JSON.
 */
@ChannelHandler.Sharable
public class RpcServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Handles one decoded request message.
     *
     * <p>Any failure (malformed request, unknown class/method, exception thrown by the target
     * method) is reported through {@code RpcResponse.setException} instead of being returned
     * as the call result.
     *
     * @param ctx channel context used to write the response
     * @param msg request as a JSON string
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        RpcResponse response = new RpcResponse();
        try {
            // Decode the request.
            RpcRegisterEntity rpcRegisterEntity = JSON.parseObject(msg, RpcRegisterEntity.class);
            // Locate the target through reflection and execute it.
            response.setResult(invoke(rpcRegisterEntity));
        } catch (Exception exception) {
            exception.printStackTrace();
            response.setException(exception);
        }
        // Write the response back to the caller.
        ctx.writeAndFlush(JSON.toJSONString(response));
    }

    /**
     * Instantiates the implementation class named in the request and invokes the requested
     * method on it.
     *
     * @param entity decoded request
     * @return the value returned by the target method
     * @throws Exception if the class or method cannot be resolved or instantiated, or the
     *                   exception thrown by the target method itself
     */
    private Object invoke(RpcRegisterEntity entity) throws Exception {
        // NOTE(review): the class name comes straight from the remote peer, so any class with
        // a public no-arg constructor on the classpath can be instantiated. Consider resolving
        // it through a whitelist/registry (the original code had such a lookup commented out).
        String implClassName = entity.getServiceImplClassFullName();
        Class<?> clazz = Class.forName(implClassName);
        Method method = clazz.getMethod(entity.getMethodName(), entity.getParameterTypes());
        Object target = clazz.getDeclaredConstructor().newInstance();
        try {
            return method.invoke(target, entity.getParameters());
        } catch (InvocationTargetException e) {
            // Surface the exception thrown by the target method rather than the reflection wrapper.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    /** Called when the channel becomes active; delegates to the default behaviour. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /** Called when an exception reaches this handler; delegates to the default behaviour. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}
- netty消费端
/**
 * Netty-based RPC client bound to a single provider address.
 *
 * <p>The constructor starts an asynchronous connect with exponential back-off retries.
 * {@link #send(String)} writes one request and blocks until the handler receives a response.
 */
public class RpcClient {
    private static final Logger logger = LoggerFactory.getLogger(RpcClient.class);

    /** Number of reconnect attempts made after the initial connect fails. */
    private static final int MAX_RETRY = 5;

    private EventLoopGroup group;
    // Assigned by the constructing thread and, on retries, by the event loop thread.
    private volatile Channel channel;
    private String ip;
    private int port;
    private final RpcConsumerHandler rpcConsumerHandler = new RpcConsumerHandler();
    private ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * Creates the client and immediately starts connecting.
     *
     * @param ip   provider host
     * @param port provider port
     */
    public RpcClient(String ip, int port) {
        this.ip = ip;
        this.port = port;
        initClient();
    }

    /**
     * Builds the bootstrap and starts connecting to {@code ip:port}. On failure the channel
     * and event loop group are released.
     */
    public void initClient() {
        try {
            // Re-create the worker pool if the client is re-initialised after close().
            if (executorService == null || executorService.isShutdown()) {
                executorService = Executors.newCachedThreadPool();
            }
            // 1. Event loop group
            group = new NioEventLoopGroup();
            // 2. Bootstrap helper
            Bootstrap bootstrap = new Bootstrap();
            // 3. Configuration
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new StringDecoder());
                            // Client-side business handler
                            pipeline.addLast(rpcConsumerHandler);
                        }
                    });
            // 4. Connect to the Netty server
            connect(bootstrap, ip, port, MAX_RETRY);
        } catch (Exception exception) {
            logger.error("Failed to initialise Netty client for {}:{}", ip, port, exception);
            releaseNetworkResources();
        }
    }

    /**
     * Connects asynchronously; on failure schedules another attempt after {@code 2^n} seconds
     * (n = attempt number) until {@code retry} reaches zero.
     *
     * @param retry remaining reconnect attempts
     */
    private void connect(Bootstrap bootstrap, String host, int port, int retry) {
        ChannelFuture channelFuture = bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                logger.info("连接服务端成功");
            } else if (retry == 0) {
                logger.error("重试次数已用完,放弃连接");
            } else {
                // Which reconnect attempt this is (1-based).
                int order = (MAX_RETRY - retry) + 1;
                // Back-off delay for this attempt, in seconds.
                int delay = 1 << order;
                logger.error("{} : 连接失败,第 {} 重连....", new Date(), order);
                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
            }
        });
        channel = channelFuture.channel();
    }

    /** Closes the channel and shuts down the event loop group, if they exist. */
    private void releaseNetworkResources() {
        Channel current = channel;
        if (current != null) {
            current.close();
        }
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /**
     * Releases all resources held by this client: the channel, the event loop group and the
     * worker pool used by {@link #send(String)}. Callers still blocked in {@code send} are
     * interrupted, since no response can arrive once the channel is closed.
     */
    public void close() {
        releaseNetworkResources();
        executorService.shutdownNow();
    }

    /**
     * Sends one request and blocks until the response arrives.
     *
     * <p>Synchronized because the request is staged on the single shared handler before the
     * task is submitted; concurrent callers would otherwise overwrite each other's message.
     *
     * @param msg request payload
     * @return the response string received from the server
     * @throws ExecutionException   if sending or waiting for the response failed
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized Object send(String msg) throws ExecutionException, InterruptedException {
        rpcConsumerHandler.setRequestMsg(msg);
        Future<?> pending = executorService.submit(rpcConsumerHandler);
        return pending.get();
    }

    /** Same as {@link #close()}; kept for callers that use this name. */
    public void destroy() throws Exception {
        close();
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}
客户端创建好之后,发送请求时数据会交由**rpcConsumerHandler**处理:
public class RpcConsumerHandler extends SimpleChannelInboundHandler<String> implements Callable {
ChannelHandlerContext context;
//发送的消息
String requestMsg;
//服务端返回的消息
String responseMsg;
public void setRequestMsg(String requestMsg) {
this.requestMsg = requestMsg;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
//接收由服务端返回的数据
@Override
protected synchronized void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
System.out.println("客户端结果:"+msg);
responseMsg = msg;
//唤醒等待的线程
notify();
}
//发送数据
@Override
public synchronized Object call() throws Exception {
//消息发送
context.writeAndFlush(requestMsg);
//线程等待
wait();
return responseMsg;
}
}
- 调用
// Obtain a client for the provider's host and port (original note: "create a new connection";
// whether getClient reuses an existing client is not visible here)
RpcClient rpcClient = getClient(rpcRegisterEntity.getHost(), rpcRegisterEntity.getPort());
// Send the request entity serialized as JSON; send() blocks until a response is returned
Object responseMsg = rpcClient.send(JSON.toJSONString(rpcRegisterEntity));
// Parse the returned data into an RpcResponse
RpcResponse rpcResponse = JSON.parseObject(responseMsg.toString(), RpcResponse.class);