0
点赞
收藏
分享

微信扫一扫

TCP的沾包与拆包及基于Netty的解决方案


什么是粘包和拆包

在网络发送中,如果不对发送的数据包进行处理,就可能导致客户端像服务端发送数据粘在一起或者被拆开了变成不是一个完整的数据包的情况。这就是沾包与拆包

演示粘包和拆包的程序

下面会贴上netty入门的一个简单程序,如果觉得本文看不清的也可以去​​git下载​​。

服务端

package com.bxoon.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel agr0) throws Exception {
agr0.pipeline().addLast(new TimeServerHandler());
}

public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {

}
}
new TimeServer().bind(port);
}
}
package com.bxoon.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {
public void bind(int port) throws Exception {
//配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());

//绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

//等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
// 优雅退出,释放线程资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

}
package com.bxoon.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

public class TimeServerHandler extends ChannelHandlerAdapter {

private int counter;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
System.out.println("The time server receive order:" + body+";the counter is :" + ++counter);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
currentTime = currentTime + System.getProperty("line.separator");
ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
ctx.writeAndFlush(resp);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}


}

客户端

package com.bxoon.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
public void connect(int port, String host) throws Exception {

// 配置客户端NIO线程组
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});

// 发起异步链接操作
ChannelFuture f = b.connect(host, port).sync();

//等待客户端链路关闭
f.channel().closeFuture().sync();
} finally {
//优雅退出,释放NIO线程组
group.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (Exception ex) {

}
}
new TimeClient().connect(port, "127.0.0.1");
}

}
package com.bxoon.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.logging.Logger;

public class TimeClientHandler extends ChannelHandlerAdapter {

private static final Logger logger = Logger.getLogger(TimeClientHandler.class.getName());

private int counter;

private byte[] req;

// private final ByteBuf firstMessage;

public TimeClientHandler() {
req = ("QUERY TIME ORDER"+System.getProperty("line.separator")).getBytes();
// firstMessage = Unpooled.buffer(req.length);
// firstMessage.writeBytes(req);
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf messgae = null;
for (int i=0;i<100;i++) {
messgae = Unpooled.buffer(req.length);
messgae.writeBytes(req);
ctx.writeAndFlush(messgae);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println("Now is:" + body+";the counter is :"+ ++counter);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.warning("Unexpected excetion fronm downstream:" + cause.getMessage());
ctx.close();
}
}

服务端输出结果

TCP的沾包与拆包及基于Netty的解决方案_.net

客户端输出结果

TCP的沾包与拆包及基于Netty的解决方案_netty_02

上面这个demo有个问题,我们客户端给服务端发送了100次请求,期待服务端的counter将等于100,客户端的counter也等于100.但是从输出结果来看我们服务端counter只为2,而客户端的counter为1。那么这是为什么呢?其实这就是粘包了,客户端在给服务端发送请求的时候由于粘包的问题,这100个请求被合并成2个请求发送到了服务端,而服务端的两个响应又被合并为一个响应响应给了客户端,这就是TCP的粘包问题,而实际上还有可能存在的情况就是通讯过程中的数据包被拆成不是一个完整的包发送,那么这个时候称之为拆包。粘包和拆包会造成收到的数据有误或者不完整。那么沾包和拆包是如何产生的呢?netty又该如果解决TCP的粘包和拆包的问题呢?

粘包和拆包问题产生的原因

这里摘抄《Netty权威指南》上的一段话

TCP的沾包与拆包及基于Netty的解决方案_sed_03

原因很多,列出以下几点供参考


  1. 要发送的数据大于TCP发送缓冲区剩余空间大小,将会发生拆包。
  2. 待发送数据大于MSS(最大报文长度),TCP在传输前将进行拆包。
  3. 要发送的数据小于TCP发送缓冲区的大小,TCP将多次写入缓冲区的数据一次发送出去,将会发生粘包。
  4. 接收数据端的应用层没有及时读取接收缓冲区中的数据,将发生粘包。

解决方案


  • 发送端给每个数据包添加包首部,首部中应该至少包含数据包的长度,这样接收端在接收到数据后,通过读取包首部的长度字段,便知道每一个数据包的实际长度了。
  • 发送端将每个数据包封装为固定长度(不够的可以通过补0填充),这样接收端每次从接收缓冲区中读取固定长度的数据就自然而然的把每个数据包拆分开来。
  • 可以在数据包之间设置边界,如添加特殊符号,这样,接收端通过这个边界就可以将不同的数据包拆分开。比如如下代码,我们通过在每个请求后面加上特定的标识,比如“\r\n”来标识这个请求完成了,然后服务端根据具体的标识来拆分。就可以解决粘包的问题了。
    TCP的沾包与拆包及基于Netty的解决方案_sed_04

通过Netty半包解码器解决TCP的粘包与拆包的问题

这部分内容过多,我直接摘抄​​这篇文章​​,如有侵权请告知,立删。

Netty半包解码器有以下几种


  • LineBasedFrameDecoder
  • DelimiterBasedFrameDecoder(添加特殊分隔符报文来分包)
  • FixedLengthFrameDecoder(使用定长的报文来分包)
  • engthFieldBasedFrameDecoder

LineBasedFrameDecoder解码器

LineBasedFrameDecoder是回车换行解码器,如果用户发送的消息以回车换行符作为消息结束的标识,则可以直接使用Netty的LineBasedFrameDecoder对消息进行解码,只需要在初始化Netty服务端或者客户端时将LineBasedFrameDecoder正确的添加到ChannelPipeline中即可,不需要自己重新实现一套换行解码器。

LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断是否有“\n”或“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连接读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。防止由于数据报没有携带换行符导致接收到 ByteBuf 无限制积压,引起系统内存溢出。

通常LineBasedFrameDecoder会和StringDecoder搭配使用。StringDecoder的功能非常简单,就是将接收到的对象转换成字符串,然后继续调用后面的Handler。LineBasedFrameDecoder+StringDecoder组合就是按行切换的文本解码器,它本设计用来支持TCP的粘包和拆包。对于文本类协议的解析,文本换行解码器非常实用,例如对 HTTP 消息头的解析、FTP 协议消息的解析等。

使用起来十分简单,只需要在ChannelPipeline 中添加即可,如下所示:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline p = ch.pipeline();
p.addLast(new LineBasedFrameDecoder(1024));
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());

p.addLast(new LineServerHandler());
}
});

DelimiterBasedFrameDecoder解码器

DelimiterBasedFrameDecoder是分隔符解码器,用户可以指定消息结束的分隔符,它可以自动完成以分隔符作为码流结束标识的消息的解码。回车换行解码器实际上是一种特殊的DelimiterBasedFrameDecoder解码器。使用方式如下

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(Constants.DELIMITER.getBytes())));
p.addLast(new StringDecoder());
p.addLast(new StringEncoder());
p.addLast(new DelimiterServerHandler());
}
});

首先将“$_”转换成 ByteBuf 对象,作为参数构造 DelimiterBasedFrameDecoder,将其添加到 ChannelPipeline 中,然后依次添加字符串解码器(通常用于文本解码)和用户 Handler,请注意解码器和 Handler 的添加顺序,如果顺序颠倒,会导致消息解码失败。

FixedLengthFrameDecoder解码器

FixedLengthFrameDecoder是固定长度解码器,它能够按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包/拆包等问题,非常实用。

对于定长消息,如果消息实际长度小于定长,则往往会进行补位操作,它在一定程度上导致了空间和资源的浪费。但是它的优点也是非常明显的,编解码比较简单,因此在实际项目中仍然有一定的应用场景。

使用起来十分简单,只需要在ChannelPipeline 中添加即可,如下所示:

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))//配置日志输出
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new FixedLengthFrameDecoder(1<<5));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());

ch.pipeline().addLast(new ServerHandler());
}
});

利用 FixedLengthFrameDecoder 解码器,无论一次接收到多少数据报,它都会按照构造函数中设置的固定长度进行解码,如果是半包消息,FixedLengthFrameDecoder 会缓存半包消息并等待下个包到达后进行拼包,直到读取到一个完整的包。

LengthFieldBasedFrameDecoder解码器

大多数的协议(私有或者公有),协议头中会携带长度字段,用于标识消息体或者整包消息的长度,例如SMPP、HTTP协议等。由于基于长度解码需求的通用性,以及为了降低用户的协议开发难度,Netty提供了LengthFieldBasedFrameDecoder,自动屏蔽TCP底层的拆包和粘包问题,只需要传入正确的参数,即可轻松解决“读半包“问题。

下面我们看看如何通过参数组合的不同来实现不同的“半包”读取策略。第一种常用的方式是消息的第一个字段是长度字段,后面是消息体,消息头中只包含一个长度字段。它的消息结构定义如图所示:

TCP的沾包与拆包及基于Netty的解决方案_sed_05

解码前的字节缓冲区(14 字节)

使用以下参数组合进行解码:


  1. lengthFieldOffset = 0;
  2. lengthFieldLength = 2
  3. lengthAdjustment = 0
  4. initialBytesToStrip = 0

解码后的字节缓冲区内容如图所示:

TCP的沾包与拆包及基于Netty的解决方案_粘包和拆包_06

通过 ByteBuf.readableBytes() 方法我们可以获取当前消息的长度,所以解码后的字节缓冲区可以不携带长度字段,由于长度字段在起始位置并且长度为 2,所以将 initialBytesToStrip 设置为 2,参数组合修改为:


  1. lengthFieldOffset = 0
  2. lengthFieldLength = 2
  3. lengthAdjustment = 0
  4. initialBytesToStrip = 2

解码后的字节缓冲区内容如图所示:

TCP的沾包与拆包及基于Netty的解决方案_.net_07

解码后的字节缓冲区丢弃了长度字段,仅仅包含消息体,对于大多数的协议,解码之后消息长度没有用处,因此可以丢弃。



举报

相关推荐

0 条评论