0
点赞
收藏
分享

微信扫一扫

Springboot使用Netty集成protobuf开发高性能服务器 (附源码下载)

君之言之 2022-04-18 阅读 47

Springboot-cli 开发脚手架系列

Netty系列:Springboot使用Netty集成protobuf开发高性能服务器


文章目录


前言

首先我们需要使用Netty搭建基础的tcp框架,参考Springboot使用Netty优雅的创建高性能TCP服务器,接下来我们开始集成protubuf。
结尾有完整源码下载地址

1. 下载protoc.exe

官网下载地址

2. 编写.proto文件

编写规则参考官网中文文档
创建message.proto

// 声明使用proto3
syntax = "proto3";
// 包名
option java_package = "com.netty.client.procotol";
// 类名
option java_outer_classname = "MessageBuf";
 
// 消息整合,便于netty导入编码解码器
message Message {
  // 包类型
  PackType packType = 1;
  // 根据包类型多选1
  oneof Pack{
      LoginRequest loginRequest = 2;
	  LoginResponse loginResponse = 3;
	  MessageRequest messageRequest = 4;
	  MessageResponse messageResponse = 5;
  }
  // 包类型枚举
  enum PackType {
     LOGIN_REQ = 0;
	 LOGIN_RESP = 1;
	 MESSAGE_REQ = 2;
	 MESSAGE_RESP = 3;
  }
}
 
// 登录请求,包含用户名
message LoginRequest {
  string username = 1;
  string password = 2;
}

// 登录响应
message LoginResponse {
  int32 code = 1;
  string message = 2;
}
 
// 消息请求
message MessageRequest {
  int32 messageId = 1;
  int32 type = 2;
  string data = 3;
}

// 消息响应
message MessageResponse {
  int32 messageId = 1;
  int32 code = 2;
  string message = 3;
}

3. 生成.java 的协议包

找到第一步下载的protoc.exe,在protoc.exe同目录下执行

# --java_out 输出路径、message.proto 要执行的文件
protoc.exe --java_out=E:\lqd\protoc-3.20.1-rc-1-win64\bin message.proto

在这里插入图片描述

4. netty引入协议文件

  • 在我们tcp项目的基础上加入protobuf依赖
    pom.xml
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.20.1-rc-1</version>
        </dependency>

复制第三步生成的.java文件到我们的tcp项目中,把netty的编码器换成我们第三步生成的协议包即可。
这里的项目开头有介绍搭建教程地址,点开连接根据教程搭建基础框架即可,非常简单,这里就不重复搭建了。
在这里插入图片描述

  • 我们修改channel包下的ChannelInit .java,编码解码器改为Protubuf
@Component
@RequiredArgsConstructor
public class ChannelInit extends ChannelInitializer<SocketChannel> {

    private final MessageHandler messageHandler;

    @Override
    protected void initChannel(SocketChannel channel) {
        channel.pipeline()
                // 心跳时间
                .addLast("idle", new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS))
                // 添加编码解码器
                .addLast(new ProtobufVarint32FrameDecoder())
                // 此处引用我们生成的协议类MessageBuf
                .addLast(new ProtobufDecoder(MessageBuf.Message.getDefaultInstance()))
                .addLast(new ProtobufVarint32LengthFieldPrepender())
                .addLast(new ProtobufEncoder())
                // 添加消息处理器
                .addLast("messageHandler", messageHandler);
    }
}
  • 新增消息构建器,方便我们构建消息体,MessageBuilder .java
/**
 * protobuf 消息构建
 *
 * @author qiding
 */
public class MessageBuilder {

    /**
     * 登录请求
     *
     * @param username 用户名
     * @param password 密码
     */
    public static MessageBuf.Message.Builder newLogin(String username, String password) {
        MessageBuf.LoginRequest.Builder loginMes = MessageBuf.LoginRequest.newBuilder().setUsername(username).setPassword(password);
        return MessageBuf.Message.newBuilder().setLoginRequest(loginMes).setPackType(MessageBuf.Message.PackType.LOGIN_REQ);
    }

    /**
     * 登录响应
     *
     * @param msg  提示消息
     * @param code 错误码
     */
    public static MessageBuf.Message.Builder newLoginResp(String msg, Integer code) {
        MessageBuf.LoginResponse.Builder loginResp = MessageBuf.LoginResponse.newBuilder().setMessage(msg).setCode(code);
        return MessageBuf.Message.newBuilder().setLoginResponse(loginResp).setPackType(MessageBuf.Message.PackType.LOGIN_RESP);
    }

    /**
     * 业务消息请求
     *
     * @param msgId 消息id 请求的id和响应的id需一致,便于判断服务器响应的是哪个业务请求
     * @param data  请求内容
     * @param type  业务类型
     */
    public static MessageBuf.Message.Builder newMessageReq(Integer msgId, String data, Integer type) {
        MessageBuf.MessageRequest.Builder messageReq = MessageBuf.MessageRequest.newBuilder().setMessageId(msgId).setData(data).setType(type);
        return MessageBuf.Message.newBuilder().setMessageRequest(messageReq).setPackType(MessageBuf.Message.PackType.MESSAGE_REQ);
    }

    /**
     * 业务消息响应
     *
     * @param msgId 消息id 请求的id和响应的id需一致,便于判断服务器响应的是哪个业务请求
     * @param msg   提示消息
     * @param code  错误码
     */
    public static MessageBuf.Message.Builder newMessageResp(Integer msgId, String msg, Integer code) {
        MessageBuf.MessageResponse.Builder messageResp = MessageBuf.MessageResponse.newBuilder().setMessageId(msgId).setMessage(msg).setCode(code);
        return MessageBuf.Message.newBuilder().setMessageResponse(messageResp).setPackType(MessageBuf.Message.PackType.MESSAGE_RESP);
    }

}
  • 修改handle包下的消息处理器,MessageHandler .java
/**
 * Protubuf消息处理,单例启动
 *
 * @author qiding
 */
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MessageHandler extends SimpleChannelInboundHandler<MessageBuf.Message> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageBuf.Message message) throws Exception {
        log.debug("\n");
        log.debug("channelId:" + ctx.channel().id());
        log.debug("消息类型:{}", message.getPackType().name());
        switch (message.getPackType()) {
            case LOGIN_REQ:
                log.debug("收到登录消息\n{}", message.getLoginRequest());
                // 回复客户端
                ctx.writeAndFlush(MessageBuilder.newLoginResp("login successfully", 200));
                break;
            case MESSAGE_REQ:
                log.debug("收到普通消息{}", message.getMessageRequest());
                // 回复客户端
                ctx.writeAndFlush(MessageBuilder.newMessageResp(message.getMessageRequest().getMessageId(), "ok", 200));
                break;
            default:
                log.error("不支持的消息类型");
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.debug("\n");
        log.debug("断开连接");
        ChannelStore.closeAndClean(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.debug("\n");
        log.debug("成功建立连接,channelId:{}", ctx.channel().id());
        super.channelActive(ctx);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.debug("心跳事件时触发");
    }
}

5. 效果演示

这里需要配合client进行测试
protobuf-client搭建教程参考
在这里插入图片描述
在这里插入图片描述

6. 源码分享

  • Springboot-cli开发脚手架,集合各种常用框架使用案例,完善的文档,致力于让开发者快速搭建基础环境并让应用跑起来。
  • 项目源码github地址
  • 项目源码国内gitee地址
举报

相关推荐

0 条评论