上一篇 <<<Netty的粘包和拆包问题分析
下一篇 >>>序列化与反序列化知识点汇总
socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10)); //参数为一次接受的数据长度
服务端代码:
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设置我们分割最大长度为1024
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 获取数据的结果为string类型,客户端加密,服务端解密
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new ServerHandler());
}
});
客户端代码:
ctx.writeAndFlush(Unpooled.copiedBuffer("平均突破3万月薪\n", CharsetUtil.UTF_8));
服务端代码:
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ByteBuf byteBuf = Unpooled.copiedBuffer("\t".getBytes());
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, byteBuf));
// 获取数据的结果为string类型,客户端加密,服务端解密
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new ServerHandler());
}
});
客户端代码:
ctx.writeAndFlush(Unpooled.copiedBuffer("我是测试代码\t", CharsetUtil.UTF_8));
// 服务端代码
public class NettyServer {
private static int port = 9090;
public static void main(String[] args) {
/**
* Channel——Socket,降低了直接使用Socket的复杂度
* EventLoop——控制流、多线程处理、并发
* 一个EventLoopGroup包含一个或者多个EventLoop;
* 一个EventLoop在它的生命周期内只和一个Thread绑定
* 一个Channel在它的生命周期内只注册一个EventLoop;
* 一个EventLoop可能会被分配给一个或多个Channel。
* ChannelFuture——异步通知
* 其addListener()方法注册了一个ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。
*/
/**boss线程组:用于接收客户端连接的请求*/
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
/**work线程组:用于处理客户端连接的读写操作*/
NioEventLoopGroup workGroup = new NioEventLoopGroup();
/**创建ServerBootstrap*/
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**设置分组、channel管道及Handler处理器*/
serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
/**向pipeline中添加编码、解码、业务处理的handler*/
/**定义消息分割器,解决粘包、拆包问题,设置我们分割最大长度为1024,发送消息的时候必须以\n结尾才可以识别*/
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
/**处理自定义处理器*/
socketChannel.pipeline().addLast(new ServerHandler());
}
});
try {
/**绑定端口,开启监听*/
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("服务器启动成功:" + port);
/**
* closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,如果未来监听到channel关闭了,子线程才会释放
* sync() 让主线程同步等待子线程结果
*/
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
/**优雅的关闭连接*/
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
//客户端代码(和服务端相比就bootstrap和channel定义不一样)
public class NettyClient {
public static void main(String[] args) {
//创建nioEventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress("127.0.0.1", 9090))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new ClientHandler());
}
});
try {
// 发起同步连接
ChannelFuture sync = bootstrap.connect().sync();
sync.channel().closeFuture().sync();
} catch (Exception e) {
} finally {
group.shutdownGracefully();
}
}
}
//消息的发送与接收
public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
/**
* 发送时必须设置标记边界\n或\r\n ,否则会存在粘包拆包的情况
*/
for(int i=0;i<10;i++) {
// 发送数据
ctx.writeAndFlush(Unpooled.copiedBuffer("每特教育平均月薪突破多少?\n", CharsetUtil.UTF_8));
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
String result = msg.toString(CharsetUtil.UTF_8);
/**
* 方式一:启动时设置LineBasedFrameDecoder,则直接读取数据,不存在粘包拆包问题
*/
System.out.println("resp:"+result);
/**
* 方式二:手动根据边界写代码
*/
String[] sp = result.split("\n");
for(String s:sp) {
System.out.println("resp:" + s);
}
/**
* 可以在读取的时候直接响应结果
* a、格式得ByteBuf,否则会出现乱码问题
* b、响应也属于发送,后面也得加\n或\r\n
*/
ctx.writeAndFlush(Unpooled.copiedBuffer("结果响应\n", CharsetUtil.UTF_8));
}
}
推荐阅读:
<<<OSI七层模型与层上协议
<<<TCP的三次握手建立链接和四次挥手释放链接
<<<TCP、UDP及Socket代码示例
<<<Https的1.0、2.0协议及长短链接区别
<<<Linux系统的五种IO模型
<<<BIO和NIO区别
<<<BIO模型的缺陷
<<<NIO模式的IO多路复用底层原理
<<<select、poll、epoll的区别
<<<Redis为什么单线程能够支持高并发
<<<Netty初识
<<<Netty的粘包和拆包问题分析
<<<序列化与反序列化知识点汇总
<<<MessagePack反序列化使用示例
<<<Marshalling在Netty中的使用