0
点赞
收藏
分享

微信扫一扫

SpringBoot使用Netty实现远程调2.0


SpringBoot使用Netty实现远程调2.0

前言

不久之前溪源我发布了一篇博客时关于使用如何使用​​Netty​​​实现远程调用,之前做的只是一个简单的​​demo​​,最近在此基础上,进行了进一步的扩展:包括使用反射机制获取类、异常的统一处理等,虽然有一定程度上的改进,但还是有一定程度上的不足,我会持续更新继续改进的。

SpringBoot使用Netty实现远程调用可参考我的博客:​​SpringBoot使用Netty实现远程调用​​

正文

SpringBoot使用Netty实现远程调2.0

在​​SpringBoot使用Netty实现远程调​​用中我们已经可以实现了通过​​Netty​​实现远程调用的功能了,但是之前做的只是点对点的调用,现在本篇博客是在之前的基础做出了一些改动:

  • 利用​​Java​​反射机制,实现传递接口路径和方法名调用对应的方法
  • 借助​​validation-api​​对请求枚举类参数进行校验:可参考我的博客 SpringBoot使用validation-api实现对枚举类参数校验
  • 通过对自定义注解完成日志的打印:可参考我的博客 SpringBoot通过自定义注解实现日志打印
  • 使用​​knife4j​​对接口进行测试 :可参考我的博客 SpringBoot使用knife4j进行在线接口调试

客户端的部分优化

NettyClientUtil:Netty客户端工具类

/**
* Netty客户端
**/
@Slf4j
public class NettyClientUtil {

/**
* 用于RPC远程调用
* @param commandPOJO
* @return
*/
public static ResponseResult rpcNetty(CommandPOJO commandPOJO) {
String commandPOJOStr = FastJsonUtils.convertObjectToJSON(commandPOJO);
NettyClientHandler nettyClientHandler = new NettyClientHandler();
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(group)
//该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast("decoder", new StringDecoder());
socketChannel.pipeline().addLast("encoder", new StringEncoder());
socketChannel.pipeline().addLast(nettyClientHandler);
}
});
try {
ChannelFuture future = bootstrap.connect("127.0.0.1", 8082).sync();
log.info("客户端发送成功....");
//发送消息
future.channel().writeAndFlush(commandPOJOStr);
// 等待连接被关闭
future.channel().closeFuture().sync();
return nettyClientHandler.getResponseResult();
} catch (Exception e) {
log.error("客户端Netty失败", e);
throw new BusinessException(CouponTypeEnum.OPERATE_ERROR);
} finally {
//以一种优雅的方式进行线程退出
group.shutdownGracefully();
}
}

}

NettyClientHandler:客户端处理类

/**
* 客户端处理器
**/
@Slf4j
@Setter
@Getter
public class NettyClientHandler extends ChannelInboundHandlerAdapter {


private ResponseResult responseResult;


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端Active .....");
}


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("客户端收到消息: {}", msg.toString());
this.responseResult = (ResponseResult) FastJsonUtils.convertJsonToObject(msg.toString(),ResponseResult.class);
ctx.close();
}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

服务端部分优化

NettyServer:服务启动监听器

/**
* 服务启动监听器
**/
@Slf4j
public class NettyServer {

public void start() {
InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8082);
//new 一个主线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//new 一个工作线程组
EventLoopGroup workGroup = new NioEventLoopGroup(200);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ServerChannelInitializer())
.localAddress(socketAddress)
//设置队列大小
.option(ChannelOption.SO_BACKLOG, 1024)
// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true);
//绑定端口,开始接收进来的连接
try {
ChannelFuture future = bootstrap.bind(socketAddress).sync();
log.info("服务器启动开始监听端口: {}", socketAddress.getPort());
future.channel().closeFuture().sync();
} catch (Exception e) {
log.error("服务器开启失败", e);
} finally {
//关闭主线程组
bossGroup.shutdownGracefully();
//关闭工作线程组
workGroup.shutdownGracefully();
}
}
}

NettyServerHandler:服务端处理器

/**
* netty服务端处理器
**/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {


/**
* 用于获取SpringBean
*/
private static ApplicationContext context = SpringUtil.getApplicationContext();


/**
* 客户端连接会触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel active......");
}


/**
* 客户端发消息会触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("服务器收到消息: {}", msg.toString());
CommandPOJO commandPOJO = (CommandPOJO) FastJsonUtils.convertJsonToObject(msg.toString(),CommandPOJO.class);
IInvokeService invokeService =context.getBean(IInvokeService.class);
ResponseResult responseResult =invokeService.invokeMethod(commandPOJO);
ctx.write(FastJsonUtils.convertObjectToJSON(responseResult));
ctx.flush();
}


/**
* 发生异常触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

InvokeServiceImpl:远程调用的服务类

@Service
@Slf4j
public class InvokeServiceImpl implements IInvokeService {

/**
* 调用方法(带参数)
*
* @param commandPOJO
* @return
*/
@Override
public ResponseResult invokeMethod(CommandPOJO commandPOJO) {
ResponseResult responseResult = ResponseResult.error();
Class<?> tClass = null;
Class<?> rClass = null;
try {
tClass = Class.forName(commandPOJO.getInvokeClass());
try {
rClass = Class.forName(commandPOJO.getRequestPojoPath());
} catch (ClassNotFoundException e) {
log.error("请求类{}不存在,异常{}", commandPOJO.getRequestPojoPath(), e);
throw new BusinessException(CouponTypeEnum.OPERATE_ERROR, "请求类:" + commandPOJO.getRequestPojoPath() + "不存在");
}
Method method = tClass.getMethod(commandPOJO.getInvokeMethod(), rClass);
return ResponseResult.success(method.invoke(tClass.newInstance(), FastJsonUtils.convertJsonToObject(commandPOJO.getRequestParamsJson(), rClass)), CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDesc());
} catch (ClassNotFoundException classBiz) {
log.error("服务端接口{}不存在,异常{}", commandPOJO.getInvokeClass(), classBiz);
responseResult.setErrorMsg("服务端该接口:" + commandPOJO.getInvokeClass() + "不存在");
} catch (NoSuchMethodException nbiz) {
log.error("服务端接口{}没有方法{},异常{}", commandPOJO.getInvokeClass(), commandPOJO.getInvokeMethod(), nbiz);
responseResult.setErrorMsg("服务端接口:" + commandPOJO.getInvokeClass() + "没有该方法:" + commandPOJO.getInvokeMethod());
} catch (JSONException jsonException) {
log.error("请求json字符串违法{},无法转换成对应的请求类{}", commandPOJO.getRequestParamsJson(), commandPOJO.getRequestPojoPath(), jsonException);
responseResult.setErrorMsg("请求json字符串违法:" + commandPOJO.getRequestParamsJson() + ",无法转换成对应的请求类:" + commandPOJO.getRequestPojoPath());
} catch (BusinessException biz) {
log.error("调用方法业务层异常", biz);
responseResult.setErrorMsg("调用方法业务层异常" + biz.getMessage());
} catch (Exception e) {
log.error("调用方法异常", e);
responseResult.setErrorMsg("调用方法异常" + e.getMessage());
}
return responseResult;
}
}

RpcInvokeServiceImpl:业务方法

@Slf4j
@Service
public class RpcInvokeServiceImpl implements IRpcInvokeService {


/**
* 用于测试远程调用
* @param helloReq
* @return
*/
@Override
public String sysHello(SysHelloReq helloReq) {
return helloReq.getMsg()+",你也好哇";
}
}

验证

验证接口

@RestController
@Slf4j
@Api(value = "远程调用模块")
@RequestMapping("/xiyuanrpc")
public class RPCController {
@PostMapping("/rpcNettybyInvoke")
@ApiOperation(value = "rpc远程调用")
@InvokeParameterCheck
@MethodLogPrint
public ResponseResult rpcNettybyInvoke(@Valid @RequestBody CommandPOJO pojo) {
return NettyClientUtil.rpcNetty(pojo);
}
}

通过​​knife4j​​访问测试接口

SpringBoot使用Netty实现远程调2.0_java

源码

项目源码可从的我的github中获取:​​github源码地址​​

SpringBoot使用Netty实现远程调2.0_spring boot_02


举报

相关推荐

0 条评论