0
点赞
收藏
分享

微信扫一扫

Netty中序列化框架MessagePack的简单实现

小桥流水2016 2022-04-29 阅读 64

解码器

/**

  • @param ctx 上下文

  • @param msg 需要解码的数据

  • @param out 解码列表

*/

@Override

protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {

final byte[] array;

final int length = msg.readableBytes();

array = new byte[length];

// 获取需要解码的字节数组

msg.getBytes(msg.readerIndex(), array,0,length);

MessagePack msgpack = new MessagePack();

// 反序列化并将结果保存到了解码列表中

out.add(msgpack.read(array));

}

[](()3.客户端


EchoClient

/**

  • MsgPack 编解码器

  • @author 波波烤鸭

  • @email dengpbs@163.com

*/

public class EchoClient {

public static void main(String[] args) throws Exception {

int port = 8080;

if (args != null && args.length > 0) {

try {

port = Integer.valueOf(args[0]);

} catch (NumberFormatException e) {

// 采用默认值

}

}

new EchoClient().connector(port, “127.0.0.1”,10);

}

public void connector(int port, String host,final int sendNumber) throws Exception {

// 配置客户端NIO线程组

EventLoopGroup group = new NioEventLoopGroup();

try {

Bootstrap b = new Bootstrap();

b.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true)

.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

//这里设置通过增加包头表示报文长度来避免粘包

ch.pipeline().addLast(“frameDecoder”,new LengthFieldBasedFrameDecoder(1024, 0, 2,0,2));

//增加解码器

ch.pipeline().addLast(“msgpack decoder”,new MsgpackDecoder());

//这里设置读取报文的包头长度来避免粘包

ch.pipeline().addLast(“frameEncoder”,new LengthFieldPrepender(2));

//增加编码器

ch.pipeline().addLast(“msgpack encoder”,new MsgpackEncoder());

// 4.添加自定义的处理器

ch.pipeline().addLast(new EchoClientHandler(sendNumber));

}

});

// 发起异步连接操作

ChannelFuture f = b.connect(host, port).sync();

// 等待客户端链路关闭

f.channel().closeFuture().sync();

}catch(Exception e){

e.printStackTrace();

} finally {

// 优雅退出,释放NIO线程组

group.shutdownGracefully();

}

}

}

EchoClientHandler

/**

  • DelimiterBasedFrameDecoder 案例

  • 自定义处理器

  • @author 波波烤鸭

  • @email dengpbs@163.com

*/

public class EchoServerHandler extends ChannelHandlerAdapter{

@Override

public 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//UserInfo user = (UserInfo) msg;

System.out.println(“server receive the msgpack message :”+msg);

//ctx.writeAndFlush(user);

ctx.writeAndFlush(msg);

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

cause.printStackTrace();

ctx.close(); // 发生异常关闭链路

}

}

[](()4.服务端


EchoServer

/**

  • MsgPack 编解码器

  •   服务端
    
  • @author 波波烤鸭

  • @email dengpbs@163.com

*/

public class EchoServer {

public void bind(int port) throws Exception {

// 配置服务端的NIO线程组

// 服务端接受客户端的连接

NioEventLoopGroup bossGroup = new NioEventLoopGroup();

// 进行SocketChannel的网络读写

NioEventLoopGroup workerGroup = new NioEventLoopGroup();

try {

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 100)

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(“frameDecoder”,new LengthFieldBasedFrameDecoder(65535, 0, 2,0,2));

// 添加msgpack的编码和解码器

ch.pipeline().addLast(“msgpack decoder”,new MsgpackDecoder());

ch.pipeline().addLast(“frameEncoder”,new LengthFieldPrepender(2));

ch.pipeline().addLast(“msgpack encoder”,new MsgpackEncoder());

// 添加自定义的处理器

ch.pipeline().addLast(new EchoServerHandler());

}

});

// 绑定端口,同步等待成功

ChannelFuture f = b.bind(port).sync();

// 等待服务端监听端口关闭

f.channel().closeFuture().sync();

}catch(Exception e){

e.printStackTrace();

} finally {

// 优雅退出,释放线程池资源

bossGroup.shutdownGracefully();

workerGroup.shutdownGracefully();

}

}

public static void main(String[] args) throws Exception {

int port = 8080;

if(args!=null && args.length > 0){

try{

port = Integer.valueOf(args[0]);

}catch(NumberFormatException e){

// 采用默认值

}

}

new EchoServer().bind(port);

}

}

EchoServerHandler

/**

  • DelimiterBasedFrameDecoder 案例

  • 自定义处理器

  • @author 波波烤鸭

  • @email dengpbs@163.com

*/

public class EchoServerHandler extends ChannelHandlerAdapter{

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//UserInfo user = (UserInfo) msg;

举报

相关推荐

0 条评论