Netty 三
Netty Handler的调用机制
- ChannelHandler充当处理入站& 出站数据的应用程序逻辑的容器. 例如, 实现 ChannelInboundHandler接口(或继承 ChannelInboundHandlerAdapter), 可以接收入站事件& 数据
- ChannelPipeline提供 ChannelHandler链的容器. 如图所示
Netty的编& 解码
- Netty提供了一系列实用的编解码器, 解码器都实现了 ChannelInboundHandler接口, 编码器都实现了 ChannelOutboundHandler接口. 以入站为例, 每次有数据入站时 channelRead方法会被调用. 随后, 调用解码器的 decode()方法进行解码, 并将已经解码的数据转发给 ChannelPipeline中的下一个 ChannelInboundHandler
ByteToMessageDecoder解码器(继承自 ChannelInboundHandlerAdapter)
- 由于无从得知远程节点是否会一次性发送一个完整的信息, tcp有可能出现粘包拆包的问题, 这个类会对入站数据进行缓冲, 直到它准备好被处理
- 使用实例片段:
/**
 * Byte-to-message decoder that turns inbound bytes into {@code int} values,
 * consuming four bytes per decoded value.
 */
public class ToIntDecoder extends ByteToMessageDecoder {

    /**
     * Reads a single int from {@code in} and appends it to {@code out}, but only
     * when at least four bytes are readable.
     *
     * @param ctx the handler context (unused here)
     * @param in  the inbound buffer
     * @param out the list receiving decoded values
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Fewer than four readable bytes: leave the buffer untouched.
        if (in.readableBytes() < 4) {
            return;
        }
        int decoded = in.readInt();
        // The int is auto-boxed to Integer when added to the list.
        out.add(decoded);
    }
}
// Obtain the pipeline of the channel being initialized
ChannelPipeline pipeline = ch.pipeline();
// Register the custom decoder at the end of the pipeline
pipeline.addLast(new ToIntDecoder());
说明:
1) 这个例子中, 每次入站从 ByteBuf中读取4字节, 将其解码为一个 int, 然后将它添加到下一个 List中. int在被添加到List中时, 会被自动装箱为 Integer. 在调用 readInt()方法前必须验证所输入的 ByteBuf是否具有足够的数据
2) 当没有更多元素添加到该 List中时, 它的内容将会被发送给下一个 ChannelInboundHandler
*注: 不论是解码器 Handler还是编码器 Handler, 接收的消息类型与待处理的消息类型必须一致, 否则该 Handler不会被执行
对数据解码时, 需要判断缓存区(ByteBuf)的数据是否足够, 否则接收到的结果可能会和期望结果不一致
ReplayingDecoder解码器(继承自 ByteToMessageDecoder)
*特点是这个解码器无需调用 readableBytes()方法, 也就是该判断已在内部做了处理
- 使用实例片段:
/**
 * Replaying decoder that turns inbound bytes into {@code long} values.
 * No {@code readableBytes()} check is made before reading.
 */
public class MyReplayingDecoder extends ReplayingDecoder<Void> {

    /**
     * Reads one long from {@code in} and appends it to {@code out}.
     *
     * @param ctx the handler context (unused here)
     * @param in  the inbound buffer
     * @param out the list receiving decoded values
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        long decoded = in.readLong();
        out.add(decoded);
    }
}
// Obtain the pipeline of the channel being initialized
ChannelPipeline pipeline = ch.pipeline();
// Register the replaying decoder at the end of the pipeline
pipeline.addLast(new MyReplayingDecoder());
- 例子:
/**
 * Server entry point for the Long codec example. Binds port 7000 and configures
 * every accepted channel through {@link MyServerInitializer}.
 */
public class MyServer {

    public static void main(String[] args) throws Exception {
        // Boss group is constructed with 1; worker group uses the default constructor.
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            // Custom initializer that sets up the pipeline of each child channel.
            server.childHandler(new MyServerInitializer());

            // Wait for the bind to finish, then wait until the server channel closes.
            ChannelFuture bound = server.bind(7000).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Builds the server-side pipeline: a named decoder, a named encoder and the
 * business handler, in that order.
 */
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Decoder for inbound data, registered as "decoder".
                .addLast("decoder", new MyReplayingDecoder())
                // Encoder for outbound data, registered as "encoder".
                .addLast("encoder", new MyLongToByteEncoder())
                // Custom business handler.
                .addLast(new MyServerHandler());
    }
}
/**
 * Outbound encoder that writes a {@link Long} message into the output buffer
 * as a long, logging each invocation.
 */
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {

    /**
     * Logs the call and the message, then writes the message to {@code out}.
     *
     * @param ctx the handler context (unused here)
     * @param msg the value to encode
     * @param out the buffer receiving the encoded bytes
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
        System.out.println("MyLongToByteEncoder被调用");
        System.out.println("msg=" + msg);
        long payload = msg.longValue();
        out.writeLong(payload);
    }
}
/**
 * Replaying decoder that logs each invocation and decodes one {@code long}
 * from the inbound buffer.
 */
public class MyReplayingDecoder extends ReplayingDecoder<Void> {

    /**
     * Logs the call, reads one long from {@code in} and appends it to {@code out}.
     *
     * @param ctx the handler context (unused here)
     * @param in  the inbound buffer
     * @param out the list receiving decoded values
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyReplayingDecoder被调用");
        long decoded = in.readLong();
        out.add(decoded);
    }
}
/**
 * Server business handler for {@link Long} messages: prints each received value
 * together with the sender address and answers with a fixed long.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {

    /** Prints the received value and replies to the client. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        String sender = String.valueOf(ctx.channel().remoteAddress());
        System.out.println("客户端 地址=" + sender + " 接收到long类型数据=" + msg);

        // Reply to the client with 9876543210.
        long reply = 9876543210L;
        ctx.writeAndFlush(reply);
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Client entry point for the Long codec example. Connects to localhost:7000 with
 * a pipeline configured by {@link MyClientInitializer}.
 */
public class MyClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(ioGroup);
            client.channel(NioSocketChannel.class);
            // Custom initializer that configures the handlers.
            client.handler(new MyClientInitializer());

            // Wait for the connection, then wait until the channel closes.
            ChannelFuture connected = client.connect("localhost", 7000).sync();
            connected.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Builds the client-side pipeline: encoder, decoder and business handler, in
 * that order.
 */
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Encoder for outbound data.
                .addLast(new MyLongToByteEncoder())
                // Decoder for inbound data. The original note kept
                // "new MyReplayingDecoder()" as a commented-out alternative here.
                .addLast(new MyByteToLongDecoder())
                // Custom business handler.
                .addLast(new MyClientHandler());
    }
}
/**
 * Client business handler for {@link Long} messages: sends one long when the
 * channel becomes active and prints every long received from the server.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {

    /** Prints the server address and the received value. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
        String server = String.valueOf(ctx.channel().remoteAddress());
        System.out.println("服务器的 地址=" + server + " 收到消息=" + msg);
    }

    /** Logs and sends 123456 once the channel is active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("MyClientHandler发送数据");
        long greeting = 123456L;
        ctx.writeAndFlush(greeting);
    }
}
/**
 * Byte-to-message decoder that logs each invocation and decodes one
 * {@code long} when enough bytes are available.
 */
public class MyByteToLongDecoder extends ByteToMessageDecoder {

    /**
     * Per the original note: decode is invoked repeatedly for the received data
     * until no new element is added to the list or the ByteBuf has no more
     * readable bytes. When {@code out} is not empty its content is passed on to
     * the next inbound handler.
     *
     * @param ctx the handler context
     * @param in  the inbound ByteBuf
     * @param out list whose decoded entries are handed to the next handler
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyByteToLongDecoder被调用");
        // A long is 8 bytes; without 8 readable bytes nothing can be decoded yet.
        if (in.readableBytes() < 8) {
            return;
        }
        long decoded = in.readLong();
        out.add(decoded);
    }
}
整合 Log4j到 Netty便于排查问题
- 添加依赖 pom.xml
<!-- log4j 1.2 logging implementation -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- SLF4J API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- SLF4J to log4j 1.2 binding (test scope) -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<!-- SLF4J simple binding (test scope) -->
<!-- NOTE(review): slf4j-log4j12 and slf4j-simple both look like SLF4J bindings; confirm that having two on the test classpath, and the test scope itself, are intended -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
- 配置 resources/log4j.properties
# Root logger: DEBUG level, output goes to the appender named "stdout"
log4j.rootLogger=DEBUG, stdout
# "stdout" is a console appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
# Lines are formatted with a pattern layout
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Pattern of each line (presumably "[level] class - message"; confirm against the PatternLayout docs)
log4j.appender.stdout.layout.ConversionPattern=[%p] %C{1} - %m%n
Protobuf(全称为 Google Protocol Buffers)
- 是 Google开源, 可用于结构化数据串行化(序列化). 很适合做数据存储或 RPC(远程过程调用 Remote Procedure Call)数据交换格式
- 优点是轻便, 高效, 可靠的结构化数据存储格式
- 参考文档 : https://developers.google.com/protocol-buffers/docs/proto
- Protobuf是以 message的方式来管理数据的
- 支持跨平台, 跨语言, 它支持目前绝大多数语言 如 C++, Java, Python, Go, C#, Objective C, JavaScript, Ruby, PHP, and Dart等
- Protobuf包引入 pom.xml
<!-- Protobuf Java library, version 3.6.1 (the protoc commands in these notes use protoc-3.6.1) -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
- Protobuf定义单 message的 User.proto文件:
syntax = "proto3"; // proto syntax version
option java_outer_classname = "UserPOJO"; // Name of the generated outer class, which is also the generated file name
// protobuf manages data as messages
message User { // Generated as the inner class User of the outer class UserPOJO; this is the POJO that carries the business data
int32 id = 1; // A field of the class. int32 is the protobuf type; 1 is the field number, not a value
string name = 2;
}
- Protobuf生成 UserPOJO.java文件:
dos> E:\protoc-3.6.1-win32\bin\protoc.exe --java_out=. User.proto
- 使用实例:
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
/**
 * Protobuf example server. Binds port 6668 and installs a Protobuf encoder, a
 * Protobuf decoder for {@code UserPOJO.User} and {@link NettyServerHandler} on
 * every accepted channel.
 */
public class NettyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Encoder
                            .addLast("encoder", new ProtobufEncoder())
                            // Decoder, given the default User instance as prototype
                            .addLast("decoder", new ProtobufDecoder(UserPOJO.User.getDefaultInstance()))
                            // Custom business handler
                            .addLast(new NettyServerHandler());
                }
            });

            // Wait for the bind, then wait until the server channel closes.
            ChannelFuture bound = server.bind(6668).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Server handler for {@code UserPOJO.User} messages: prints each received user
 * and, when a read completes, answers with a fixed user message.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<UserPOJO.User> {

    /** Prints the id and name of the received user. */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, UserPOJO.User msg) throws Exception {
        int id = msg.getId();
        String name = msg.getName();
        System.out.println("客户端发送的数据 id=" + id + " 名字=" + name);
    }

    /** Called when reading has completed: sends a reply back to the client. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        UserPOJO.User reply = UserPOJO.User.newBuilder()
                .setId(2)
                .setName("服务器")
                .build();
        ctx.writeAndFlush(reply);
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Protobuf example client. Connects to 127.0.0.1:6668 with a Protobuf encoder,
 * a Protobuf decoder for {@code UserPOJO.User} and {@link NettyClientHandler}.
 */
public class NettyClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(ioGroup);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Encoder
                            .addLast("encoder", new ProtobufEncoder())
                            // Decoder, given the default User instance as prototype
                            .addLast("decoder", new ProtobufDecoder(UserPOJO.User.getDefaultInstance()))
                            // Custom business handler
                            .addLast(new NettyClientHandler());
                }
            });

            // Wait for the connection, then wait until the channel closes.
            ChannelFuture connected = client.connect("127.0.0.1", 6668).sync();
            connected.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Client handler for the Protobuf example: sends one {@code UserPOJO.User} when
 * the channel becomes active and prints every user message read afterwards.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /** Builds a user message and sends it as soon as the channel is active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        UserPOJO.User greeting = UserPOJO.User.newBuilder()
                .setId(1)
                .setName("客户端")
                .build();
        ctx.writeAndFlush(greeting);
    }

    /** Casts the inbound message to {@code UserPOJO.User} and prints its fields. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UserPOJO.User reply = (UserPOJO.User) msg;
        int id = reply.getId();
        String name = reply.getName();
        System.out.println("服务器发送的数据 id=" + id + " 名字=" + name);
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
- Protobuf定义多 message(多个类)的 UserSet.proto文件
syntax = "proto3";
option optimize_for = SPEED; // Speed up parsing
//option java_package="com.netty.test"; // Package in which the generated file is placed
option java_outer_classname="UserMessages"; // Outer class name, also the file name
// A protobuf message can be used to manage other messages
message UserSet {
// Define an enum type
enum DataType {
UserType = 0; // proto3 requires enum numbering to start at 0
WalletType = 1;
}
// data_type identifies which one of the messages is carried
DataType data_type = 1; // This number is the field number
// Only one of the following messages can be set at a time, to save space
oneof dataBody {
User user = 2;
Wallet wallet = 3;
}
}
message User {
int32 id = 1; // This number is the field number
int32 age = 2;
string name = 3;
}
message Wallet {
int32 point = 1;
double balace = 2;
}
- Protobuf生成 UserMessages.java文件:
dos> E:\protoc-3.6.1-win32\bin\protoc.exe --java_out=. UserSet.proto
- 使用实例:
/**
 * Multi-message Protobuf example server. Binds port 7000 and installs a Protobuf
 * decoder for {@code UserMessages.UserSet} plus {@link NettyServerHandler} on
 * every accepted channel.
 */
public class NettyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            // Anonymous channel initializer that sets up the pipeline handlers.
            server.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Decoder, given the default UserSet instance as prototype
                            .addLast("decoder", new ProtobufDecoder(UserMessages.UserSet.getDefaultInstance()))
                            // Custom business handler
                            .addLast(new NettyServerHandler());
                }
            });

            // Wait for the bind, then wait until the server channel closes.
            ChannelFuture bound = server.bind(7000).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Server handler for {@code UserMessages.UserSet}: prints the carried User or
 * Wallet depending on the data type and replies with a text buffer once a read
 * completes.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<UserMessages.UserSet> {

    /** Prints different information depending on the message's data type. */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, UserMessages.UserSet msg) throws Exception {
        switch (msg.getDataType()) {
            case UserType: {
                UserMessages.User user = msg.getUser();
                System.out.println("用户 id=" + user.getId() + " age=" + user.getAge() + " name=" + user.getName());
                break;
            }
            case WalletType: {
                UserMessages.Wallet wallet = msg.getWallet();
                System.out.println("钱包 point=" + wallet.getPoint() + " balace=" + wallet.getBalace());
                break;
            }
            default:
                System.out.println("传输的类型不正确");
                break;
        }
    }

    /** Called when reading has completed: sends a text reply to the client. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(
                Unpooled.copiedBuffer("hello, this is servier!", CharsetUtil.UTF_8));
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Multi-message Protobuf example client. Connects to 127.0.0.1:7000 with a
 * Protobuf encoder and {@link NettyClientHandler}.
 */
public class NettyClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(ioGroup);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Encoder
                            .addLast("encoder", new ProtobufEncoder())
                            // Custom business handler
                            .addLast(new NettyClientHandler());
                }
            });

            // Wait for the connection, then wait until the channel closes.
            ChannelFuture connected = client.connect("127.0.0.1", 7000).sync();
            connected.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Client handler for the multi-message Protobuf example. On activation it sends
 * one {@code UserMessages.UserSet} carrying either a User or a Wallet; inbound
 * replies are read as {@link ByteBuf} text and printed.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Randomly sends a User or a Wallet wrapped in a UserSet.
     * {@code nextInt(3)} yields 0, 1 or 2 and only 0 selects the User branch.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int random = new Random().nextInt(3);
        UserMessages.UserSet userSet;
        if (0 == random) {
            // Send a User object
            userSet = UserMessages.UserSet.newBuilder()
                    .setDataType(UserMessages.UserSet.DataType.UserType)
                    .setUser(
                            UserMessages.User.newBuilder()
                                    .setId(1)
                                    .setAge(10)
                                    .setName("客户端用户")
                                    .build()
                    ).build();
        } else {
            // Send a Wallet object
            userSet = UserMessages.UserSet.newBuilder()
                    .setDataType(UserMessages.UserSet.DataType.WalletType)
                    .setWallet(
                            UserMessages.Wallet.newBuilder()
                                    .setPoint(100)
                                    .setBalace(100.99)
                                    .build()
                    ).build();
        }
        ctx.writeAndFlush(userSet);
    }

    /**
     * Prints the server reply as UTF-8 text.
     *
     * <p>This handler consumes the message without forwarding it, so the buffer
     * is released here once it has been printed; otherwise it would never be
     * released.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        try {
            System.out.println("服务器回复 地址=" + ctx.channel().remoteAddress() + " 消息=" + buf.toString(CharsetUtil.UTF_8));
        } finally {
            // Release even if printing fails.
            buf.release();
        }
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
TCP粘包和拆包基本介绍
- TCP是面向连接, 面向流的. 收发两端都有一一成对的 socket, 发送端为了更有效地发数据到接收方, 使用了 Nagle算法, 将时间间隔较小且数据量小的连续请求合并成一个大的数据块, 然后进行封包, 并发送数据. 这种方式虽提高了传输效率, 但接收方难以分辨出完整的数据包, 因为面向流的通信是无消息保护边界的
- TCP粘包& 拆包图示:
通过 Netty编写的程序, 如果未做处理, 就会发生粘包和拆包的问题
- 产生粘包和拆包的问题实例:
/**
 * Server entry point for the sticky/split packet demonstration. Binds port 7000
 * and configures accepted channels through {@link MyServerInitializer}.
 */
public class MyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new MyServerInitializer());

            // Wait for the bind, then wait until the server channel closes.
            ChannelFuture bound = server.bind(7000).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Installs only {@link MyServerHandler} on each channel; no codec is added in
 * this example.
 */
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new MyServerHandler());
    }
}
/**
 * Server handler for raw {@link ByteBuf} input: prints every received chunk as
 * UTF-8 text with a running sequence number and replies with a random UUID.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Running count of reads handled by this handler instance. */
    private int count;

    /** Decodes the readable bytes as text, prints them and sends a reply. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] received = new byte[msg.readableBytes()];
        msg.readBytes(received);
        String text = new String(received, Charset.forName("utf-8"));

        StringBuilder line = new StringBuilder();
        line.append("客户端 地址=").append(ctx.channel().remoteAddress());
        line.append(" 发包序号(").append(++this.count).append(") 接收到数据=").append(text);
        System.out.println(line.toString());

        // Send a random id back to the client.
        String replyText = UUID.randomUUID().toString() + " ";
        ByteBuf reply = Unpooled.copiedBuffer(replyText, Charset.forName("utf-8"));
        ctx.writeAndFlush(reply);
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Client entry point for the sticky/split packet demonstration. Connects to
 * localhost:7000 with a pipeline configured by {@link MyClientInitializer}.
 */
public class MyClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(ioGroup);
            client.channel(NioSocketChannel.class);
            client.handler(new MyClientInitializer());

            // Wait for the connection, then wait until the channel closes.
            ChannelFuture connected = client.connect("localhost", 7000).sync();
            connected.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Installs only {@link MyClientHandler} on the channel; no codec is added in
 * this example.
 */
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new MyClientHandler());
    }
}
/**
 * Client handler for raw {@link ByteBuf} traffic: sends ten text messages when
 * the channel becomes active and prints every chunk received from the server.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Running count of reads handled by this handler instance. */
    private int count;

    /** Sends ten "hello, server N " messages, each with its own write-and-flush. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int seq = 0;
        while (seq < 10) {
            String text = "hello, server " + seq + " ";
            ByteBuf outgoing = Unpooled.copiedBuffer(text, Charset.forName("utf-8"));
            ctx.writeAndFlush(outgoing);
            seq++;
        }
    }

    /** Decodes the readable bytes as text and prints them with a sequence number. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] received = new byte[msg.readableBytes()];
        msg.readBytes(received);
        String text = new String(received, Charset.forName("utf-8"));

        int sequence = ++this.count;
        System.out.println("服务端发包序号(" + sequence + ") 接收到数据=" + text);
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
执行客户端 1: 服务端的打印
客户端 地址=/127.0.0.1:63964 发包序号(1) 接收到数据=hello, server 0 hello, server 1 hello, server 2 hello, server 3 hello, server 4 hello, server 5 hello, server 6 hello, server 7 hello, server 8 hello, server 9
执行客户端 2: 服务端的打印
客户端 地址=/127.0.0.1:64050 发包序号(1) 接收到数据=hello, server 0
客户端 地址=/127.0.0.1:64050 发包序号(2) 接收到数据=hello, server 1
客户端 地址=/127.0.0.1:64050 发包序号(3) 接收到数据=hello, server 2 hello, server 3 hello, server 4 hello, server 5
客户端 地址=/127.0.0.1:64050 发包序号(4) 接收到数据=hello, server 6 hello, server 7
客户端 地址=/127.0.0.1:64050 发包序号(5) 接收到数据=hello, server 8 hello, server 9
执行客户端 3: 服务端的打印
客户端 地址=/127.0.0.1:64092 发包序号(1) 接收到数据=hello, server 0
客户端 地址=/127.0.0.1:64092 发包序号(2) 接收到数据=hello, server 1
客户端 地址=/127.0.0.1:64092 发包序号(3) 接收到数据=hello, server 2 hello, server 3 hello, server 4
客户端 地址=/127.0.0.1:64092 发包序号(4) 接收到数据=hello, server 5 hello, server 6 hello, server 7
客户端 地址=/127.0.0.1:64092 发包序号(5) 接收到数据=hello, server 8 hello, server 9
*可以看到未处理粘包& 拆包
- TCP粘包和拆包解决方案
- 使用自定义协议 + 编解码器来解决
- 每次传输数据时附加数据长度 + 字节码数组, 以此避免服务端多读或少读的问题(也就是解决粘包& 拆包的问题)
- 解决案例 1:
// 协议包
/**
 * Protocol packet: a content byte array together with a separately stored
 * length value.
 */
public class MessageProtocol {

    /** Length value of the packet, as set by the caller. */
    private int len;

    /** Content bytes; stored and returned by reference, without copying. */
    private byte[] content;

    /** @return the content array previously set, or {@code null} if none was set */
    public byte[] getContent() {
        return this.content;
    }

    /** @param content the content array to store */
    public void setContent(byte[] content) {
        this.content = content;
    }

    /** @return the stored length value */
    public int getLen() {
        return this.len;
    }

    /** @param len the length value to store */
    public void setLen(int len) {
        this.len = len;
    }
}
/**
 * Server entry point for the custom-protocol solution. Binds port 8000 and
 * configures accepted channels through {@link MyServerInitializer}.
 */
public class MyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new MyServerInitializer());

            // Wait for the bind, then wait until the server channel closes.
            ChannelFuture bound = server.bind(8000).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Builds the server pipeline for the custom protocol: message encoder, message
 * decoder and business handler, in that order.
 */
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new MyMessageEncoder())
                .addLast(new MyMessageDecoder())
                .addLast(new MyServerHandler());
    }
}
/**
 * Server handler for {@link MessageProtocol} packets: prints each packet with a
 * running sequence number and answers with a packet carrying a random UUID.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    /** Running count of packets handled by this handler instance. */
    private int count;

    /** Prints the received packet and sends a reply packet. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        Charset utf8 = Charset.forName("utf-8");
        int declaredLength = msg.getLen();
        byte[] payload = msg.getContent();

        StringBuilder line = new StringBuilder();
        line.append("客户端 地址=").append(ctx.channel().remoteAddress());
        line.append(" 发包序号(").append(++this.count).append(")");
        line.append(" 长度=").append(declaredLength);
        line.append(" 接收到数据=").append(new String(payload, utf8));
        System.out.println(line.toString());

        // Reply: wrap a random UUID in a protocol packet.
        byte[] replyBytes = UUID.randomUUID().toString().getBytes(utf8);
        MessageProtocol reply = new MessageProtocol();
        reply.setLen(replyBytes.length);
        reply.setContent(replyBytes);
        ctx.writeAndFlush(reply);
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Client entry point for the custom-protocol solution. Connects to
 * localhost:8000 with a pipeline configured by {@link MyClientInitializer}.
 */
public class MyClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(ioGroup);
            client.channel(NioSocketChannel.class);
            client.handler(new MyClientInitializer());

            // Wait for the connection, then wait until the channel closes.
            ChannelFuture connected = client.connect("localhost", 8000).sync();
            connected.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Builds the client pipeline for the custom protocol: message encoder, message
 * decoder and business handler, in that order.
 */
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast(new MyMessageEncoder())
                .addLast(new MyMessageDecoder())
                .addLast(new MyClientHandler());
    }
}
/**
 * Client handler for {@link MessageProtocol} packets: sends ten packets when the
 * channel becomes active and prints every packet received from the server.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {

    /** Running count of packets handled by this handler instance. */
    private int count;

    /** Sends ten test messages, each wrapped in its own protocol packet. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int index = 0; index < 10; index++) {
            String text = "客户端: 测试消息" + index;
            byte[] payload = text.getBytes(Charset.forName("utf-8"));

            // Build one protocol packet: length plus content.
            MessageProtocol packet = new MessageProtocol();
            packet.setLen(payload.length);
            packet.setContent(payload);
            ctx.writeAndFlush(packet);
        }
    }

    /** Prints the length and the UTF-8 text of the received packet. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
        int declaredLength = msg.getLen();
        byte[] payload = msg.getContent();

        int sequence = ++this.count;
        String text = new String(payload, Charset.forName("utf-8"));
        System.out.println("服务端 发包序号(" + sequence + ")" + " 长度=" + declaredLength + " 接收到数据=" + text);
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Outbound encoder for {@link MessageProtocol}: writes the packet's length as
 * an int followed by its content bytes.
 */
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {

    /**
     * Logs the call, then writes length and content to {@code out}.
     *
     * @param ctx the handler context (unused here)
     * @param msg the packet to encode
     * @param out the buffer receiving the encoded bytes
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
        System.out.println("MyMessageEncoder.encode被调用");
        int declaredLength = msg.getLen();
        out.writeInt(declaredLength);
        byte[] payload = msg.getContent();
        out.writeBytes(payload);
    }
}
/**
 * Inbound decoder for {@link MessageProtocol}: reads an int length prefix,
 * then that many content bytes, and emits one packet per frame.
 */
public class MyMessageDecoder extends ReplayingDecoder<Void> {

    /**
     * Largest content length accepted from the wire. The prefix comes from the
     * remote peer, so it must be bounded before it is used to size an array.
     * NOTE(review): 1 MiB is an assumed limit; adjust it to the protocol's real maximum.
     */
    private static final int MAX_CONTENT_LENGTH = 1024 * 1024;

    /**
     * Decodes one packet from {@code in} and appends it to {@code out}.
     *
     * @param ctx the handler context (unused here)
     * @param in  the inbound buffer
     * @param out the list receiving the decoded packet
     * @throws IllegalArgumentException if the length prefix is negative or
     *                                  larger than {@link #MAX_CONTENT_LENGTH}
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        System.out.println("MyMessageDecoder.decode被调用");
        int length = in.readInt();
        // Reject a corrupt or hostile prefix before allocating: a negative value would
        // throw NegativeArraySizeException and a huge one could exhaust memory.
        if (length < 0 || length > MAX_CONTENT_LENGTH) {
            throw new IllegalArgumentException(
                    "invalid content length " + length + ", expected 0.." + MAX_CONTENT_LENGTH);
        }
        byte[] content = new byte[length];
        in.readBytes(content);
        // Wrap the data in a MessageProtocol and pass it to the next handler via out.
        MessageProtocol messageProtocol = new MessageProtocol();
        messageProtocol.setLen(length);
        messageProtocol.setContent(content);
        out.add(messageProtocol);
    }
}
客户端传输10条数据时, 服务端会将产生的每个粘包都正常拆包, 并按序打印
MyMessageEncoder.encode被调用
MyMessageEncoder.encode被调用
MyMessageEncoder.encode被调用
MyMessageEncoder.encode被调用
MyMessageEncoder.encode被调用
MyMessageEncoder.encode被调用
MyMessageEncoder.encode被调用
MyMessageEncoder.encode被调用
MyMessageEncoder.encode被调用
MyMessageEncoder.encode被调用
MyMessageDecoder.decode被调用
服务端 发包序号(1) 长度=36 接收到数据=64ba40a3-7cd3-45a6-bcee-43ace469a223
MyMessageDecoder.decode被调用
服务端 发包序号(2) 长度=36 接收到数据=f0104982-2738-485c-bf78-bea539cdc248
MyMessageDecoder.decode被调用
服务端 发包序号(3) 长度=36 接收到数据=ef9421d6-6b08-4c11-8c90-dd6dc758327b
MyMessageDecoder.decode被调用
服务端 发包序号(4) 长度=36 接收到数据=9217a873-a3a4-473c-8c0c-99d19c81bdd8
MyMessageDecoder.decode被调用
服务端 发包序号(5) 长度=36 接收到数据=39e1806a-b6fb-4979-9712-eeffd478b844
MyMessageDecoder.decode被调用
服务端 发包序号(6) 长度=36 接收到数据=86f70794-eb6c-4004-9658-3ba7db7012c3
MyMessageDecoder.decode被调用
服务端 发包序号(7) 长度=36 接收到数据=41bfa0ed-7569-4e46-b46a-62e2b7f3b993
MyMessageDecoder.decode被调用
服务端 发包序号(8) 长度=36 接收到数据=f3133ae8-82b1-4da9-9bc7-ff3855ee6251
MyMessageDecoder.decode被调用
服务端 发包序号(9) 长度=36 接收到数据=815c87df-a42e-4b8e-9516-af8c3628138f
MyMessageDecoder.decode被调用
服务端 发包序号(10) 长度=36 接收到数据=8b1d2a40-1291-4755-ae91-46b2b56c5bea
- 解决案例 2: 使用 Google Protobuf的 Netty程序
/**
 * Length-framed Protobuf example server. Binds port 8000; each accepted channel
 * gets a length prepender, a Protobuf encoder, a length-field frame decoder, a
 * Protobuf decoder and {@link NettyServerHandler}.
 */
public class NettyServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            // Anonymous channel initializer that sets up the pipeline handlers.
            server.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Encoders
                            .addLast(new LengthFieldPrepender(4))
                            .addLast(new ProtobufEncoder())
                            // Decoders
                            .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
                            .addLast(new ProtobufDecoder(UserMessages.UserSet.getDefaultInstance()))
                            // Custom business handler
                            .addLast(new NettyServerHandler());
                }
            });

            // Wait for the bind, then wait until the server channel closes.
            ChannelFuture bound = server.bind(8000).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Server handler for length-framed {@code UserMessages.UserSet} messages: prints
 * each message with a running sequence number and, when a read completes, sends
 * twenty User replies.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<UserMessages.UserSet> {

    /** Running count of messages handled by this handler instance. */
    private int count;

    /** Prints the sender and sequence number, then the carried User or Wallet. */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, UserMessages.UserSet msg) throws Exception {
        System.out.println("客户端 地址=" + ctx.channel().remoteAddress() + " 发包序号(" + (++this.count) + ")");

        // Print different information depending on the data type.
        switch (msg.getDataType()) {
            case UserType: {
                UserMessages.User user = msg.getUser();
                System.out.println("用户 id=" + user.getId() + " age=" + user.getAge() + " name=" + user.getName());
                break;
            }
            case WalletType: {
                UserMessages.Wallet wallet = msg.getWallet();
                System.out.println("钱包 point=" + wallet.getPoint() + " balace=" + wallet.getBalace());
                break;
            }
            default:
                System.out.println("传输的类型不正确");
                break;
        }
    }

    /** Called when reading has completed: sends twenty User replies to the client. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        int sent = 0;
        while (sent < 20) {
            UserMessages.User user = UserMessages.User.newBuilder()
                    .setId(2)
                    .setAge(999)
                    .setName("服务端用户")
                    .build();
            UserMessages.UserSet reply = UserMessages.UserSet.newBuilder()
                    .setDataType(UserMessages.UserSet.DataType.UserType)
                    .setUser(user)
                    .build();
            ctx.writeAndFlush(reply);
            sent++;
        }
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Length-framed Protobuf example client. Connects to 127.0.0.1:8000 with a
 * length prepender, a Protobuf encoder, a length-field frame decoder, a Protobuf
 * decoder and {@link NettyClientHandler}.
 */
public class NettyClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap();
            client.group(ioGroup);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            // Encoders
                            .addLast(new LengthFieldPrepender(4))
                            .addLast(new ProtobufEncoder())
                            // Decoders
                            .addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4))
                            .addLast(new ProtobufDecoder(UserMessages.UserSet.getDefaultInstance()))
                            // Custom business handler
                            .addLast(new NettyClientHandler());
                }
            });

            // Wait for the connection, then wait until the channel closes.
            ChannelFuture connected = client.connect("127.0.0.1", 8000).sync();
            connected.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }
}
/**
 * Client handler for the length-framed Protobuf example. On activation it sends
 * ten {@code UserMessages.UserSet} messages, each carrying either a User or a
 * Wallet; every inbound message is read as a UserSet and its User is printed.
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Sends ten messages, choosing User or Wallet at random for each one.
     * {@code nextInt(3)} yields 0, 1 or 2 and only 0 selects the User branch.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // One generator for the whole loop instead of a new Random per iteration.
        Random generator = new Random();
        for (int i = 0; i < 10; i++) {
            int random = generator.nextInt(3);
            UserMessages.UserSet userSet;
            if (0 == random) {
                // Send a User object
                userSet = UserMessages.UserSet.newBuilder()
                        .setDataType(UserMessages.UserSet.DataType.UserType)
                        .setUser(
                                UserMessages.User.newBuilder()
                                        .setId(1)
                                        .setAge(10)
                                        .setName("客户端用户" + i)
                                        .build()
                        ).build();
            } else {
                // Send a Wallet object
                userSet = UserMessages.UserSet.newBuilder()
                        .setDataType(UserMessages.UserSet.DataType.WalletType)
                        .setWallet(
                                UserMessages.Wallet.newBuilder()
                                        .setPoint(100 + i)
                                        .setBalace(100.99)
                                        .build()
                        ).build();
            }
            ctx.writeAndFlush(userSet);
        }
    }

    /** Casts the inbound message to a UserSet and prints the User it carries. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UserMessages.UserSet userSet = (UserMessages.UserSet) msg;
        UserMessages.User user = userSet.getUser();
        System.out.println("服务器回复 id=" + user.getId() + " age=" + user.getAge() + " name=" + user.getName());
    }

    /** Prints the stack trace and closes the context on any pipeline error. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}