Netty源码分析3:创建一个简单的Netty服务并测试
- 创建并启动一个Netty服务端
- 创建客户端并连接到客户端
- 服务端boss的ChannelHandler
- 服务端worker和客户端的ChannelHander
- 启动测试
- 启动客户端链接到客户端
- 服务端处理客户端消息
创建并启动一个Netty服务端
如何创建并启动一个Netty服务端?通过如下的代码将启动一个netty服务端并监听在8888端口。
public class MyNettyServer {
private static Logger logger = LogManager.getLogger(MyNettyServer.class);
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
new ServerBootstrap()
.group(boss, worker)//该方法返回ServerBootstrap
.option(ChannelOption.SO_BACKLOG, 128)//该方法返回ServerBootstrap
.channel(NioServerSocketChannel.class)//该方法返回ServerBootstrap
.handler(new MyServerChannelDuplexHandler())//该方法返回ServerBootstrap
.childHandler(new MyChannelInitializer("客户端"))//该方法返回ServerBootstrap
.bind(8888)//返回ChannelFuture
.sync()//返回ChannelFuture
.addListener(future -> logger.info("服务启动成功:" + future.isSuccess()))//返回ChannelFuture
.channel()//返回Channel
.closeFuture()//返回ChannelFuture
.sync()//返回ChannelFuture
.addListener(future -> logger.info("端口已关闭,如有异常,则异常原因为:"+future.cause()));//返回ChannelFuture
} finally {
//优雅的关闭boss和worker
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
- NioEventLoopGroup boss:相当于循环监听端口操作,当有客户端发起连接时会得到一个与客户端对应的socket操作用于对端双方通信
- NioEventLoopGroup worker:上述boss在创建好与客户端的对应的socket后会将socket交给worker,由worker来执行具体的通信操作
- NioEventLoopGroup:异步事件循环执行器组,内部定义了一组异步事件执行器NioEventLoop,暂时可以简单的理解为线程池与线程的关系,后续会进行专门分析
- ServerBootstrap:服务启动引导类,通过该类可以快速的构建一个Netty应用,同时该类提供了一个链式编程
- group():配置boss和worker,简单的让当前ServerBootstrap持有boss和work。
- option(ChannelOption.SO_BACKLOG, 128):配置channel的相关参数,该示例配置链接请求待处理队列数量为128,超过128的链接请求将被拒绝
- channel():配置channel的类型,除了NioServerSocketChannel还可以使用OIOServerSocketChannel以及其他channel
- hander():配置boss的channel处理器,即如何处理连接请求,通常是将该连接请求分发给worker,因此boss拥有一个固定请求转发handler,该handler将所有链接请求转发到worker中。
- childHandler():配置worker的channel处理器,channel上的读写操作最终都是通过channelHander来实现的
- bind():上述配置信息中group和channel必须配置,通过bind方法将启动netty服务并监听对应的地址和端口
- sync():由于bind()方法内部是基于异步执行的,sync()方法等待bind()异步结果
- addListener():给异步结果添加监听器,将在结果执行完成后得到通知
- channel():获取异步执行结果中的channel
- closeFuture:获取channel的异步关闭结果,对于ServerSocketChannel端口而言,不可能被关闭,因此closeFure不会完成,对于SocketChannel而言,当对端调用channel.close()方法时将被关闭,因此此时closeFuture将被赋值
- sync():在closeFuture上等待通道关闭结果
- addListener():当通道关闭时,注册在通道上的监听器将被通知
创建客户端并连接到客户端
基于Netty框架创建好客户端逻辑与客户端基本一致,区别在于客户端不需要boss监听端口,使用Bootstrap引导创建
class MyNettyClient {
private static Logger logger = LogManager.getLogger(MyNettyServer.class);
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup g = new NioEventLoopGroup(1);
try {
SocketChannel channel = (SocketChannel) new Bootstrap()
.group(g)
.channel(NioSocketChannel.class)
.handler(new MyChannelInitializer("服务端"))
.connect(new InetSocketAddress(8888))
.sync()
.addListener(future -> logger.info("客户端启动是否成功:" + future.isSuccess()))
.channel();
channel.write("第一条消息");
channel.write(Stream.of("这是第二条消息", "第三条", "最后一条消息,非常长的长消息").collect(Collectors.toList()));
channel.closeFuture()
.sync()
.addListener(future -> logger.info("端口已关闭,如有异常,则异常原因为:" + future.cause()));
} finally {
g.shutdownGracefully();
}
}
}
服务端boss的ChannelHandler
该ChannelHandler会在服务端boss的不同节点打印日志输入
class MyServerChannelDuplexHandler extends ChannelDuplexHandler {
private static Logger logger = LogManager.getLogger(MyServerChannelDuplexHandler.class);
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
logger.info("NioServerSocketChannel:" + ctx.channel() + "成功注册到执行器:" + ctx.executor());
ctx.fireChannelRegistered();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
logger.info("NioServerSocketChannel:" + ctx.channel() + "成功添加ChannelHandler:" + ctx.handler());
}
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
logger.info("NioServerSocketChannel:" + ctx.channel() + "执行绑定到地址:" + localAddress);
ctx.bind(localAddress, promise);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("NioServerSocketChannel:" + ctx.channel() + "已激活,准备监听端口事件");
ctx.fireChannelActive();
}
}
服务端worker和客户端的ChannelHander
class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
private static Log log = LogFactory.getLog(MyChannelInitializer.class);
private static final String delimiterStr = "$_-$";
private static final int maxFrameLen = 1024 * 1024;//1M
private static final ByteBuf delimiter = ByteBufAllocator.DEFAULT.directBuffer(4).writeBytes(delimiterStr.getBytes());
private String peer;
public MyChannelInitializer(String peer) {
this.peer = peer;
}
private void check(ChannelHandlerContext ctx, String msg) throws Exception {
if (msg.length() > maxFrameLen) {
exceptionCaught(ctx, new TooLongFrameException("发送消息内容过长"));
}
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new DelimiterBasedFrameDecoder(maxFrameLen, delimiter));
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
pipeline.addLast(new ChannelDuplexHandler() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
StringBuilder res = new StringBuilder();
if (msg instanceof String) {
String str = (String) msg;
check(ctx, str);
res.append(str).append(delimiterStr);
} else if (msg instanceof List && ((List) msg).get(0) instanceof String) {
for (String str : (List<String>) msg) {
check(ctx, str);
res.append(str).append(delimiterStr);
}
} else {
exceptionCaught(ctx, new Error("当前只允许发送String和List<String>类型"));
}
ctx.write(res);
ctx.flush();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("收到服务端【" + ctx.channel().remoteAddress() + "】消息【" + msg + "】");
if (peer == "客户端") {
ctx.pipeline().write("已成功处理消息");
ctx.flush();
}
}
});
}
}
启动测试
- 先启动服务端,将得到如下日志信息,根据日志顺序执行顺序依次是
- 向channel中注册channelHandler
- 将channel注册到执行器中
- 端口绑定
- 执行端口绑定监听器
- 通道激活开始监听端口事件
21:59:34.993 [nioEventLoopGroup-2-1] INFO netty.server.MyServerChannelDuplexHandler - NioServerSocketChannel:[id: 0xa12ccdd6]成功添加ChannelHandler:netty.server.MyServerChannelDuplexHandler@1a89455d
21:59:34.993 [nioEventLoopGroup-2-1] INFO netty.server.MyServerChannelDuplexHandler - NioServerSocketChannel:[id: 0xa12ccdd6]成功注册到执行器:io.netty.channel.nio.NioEventLoop@43301423
21:59:34.993 [nioEventLoopGroup-2-1] INFO netty.server.MyServerChannelDuplexHandler - NioServerSocketChannel:[id: 0xa12ccdd6]执行绑定到地址:0.0.0.0/0.0.0.0:8888
21:59:35.009 [nioEventLoopGroup-2-1] INFO netty.server.MyNettyServer - 服务启动成功:true
21:59:35.009 [nioEventLoopGroup-2-1] INFO netty.server.MyServerChannelDuplexHandler - NioServerSocketChannel:[id: 0xa12ccdd6, L:/0:0:0:0:0:0:0:0:8888]已激活,准备监听端口事件
启动客户端链接到客户端
客户端启动后将会输出如下日志,日志执行顺序如下
- 通知connect异步结果上的监听器
- 向服务端发送消息
- 接受服务端响应结果并输出
22:01:09.043 [nioEventLoopGroup-2-1] INFO netty.server.MyNettyServer - 客户端启动是否成功:true
22:01:09.090 [nioEventLoopGroup-2-1] INFO netty.server.MyChannelInitializer - 收到服务端【0.0.0.0/0.0.0.0:8888】消息【已成功处理消息】
22:01:09.090 [nioEventLoopGroup-2-1] INFO netty.server.MyChannelInitializer - 收到服务端【0.0.0.0/0.0.0.0:8888】消息【已成功处理消息】
22:01:09.090 [nioEventLoopGroup-2-1] INFO netty.server.MyChannelInitializer - 收到服务端【0.0.0.0/0.0.0.0:8888】消息【已成功处理消息】
22:01:09.090 [nioEventLoopGroup-2-1] INFO netty.server.MyChannelInitializer - 收到服务端【0.0.0.0/0.0.0.0:8888】消息【已成功处理消息】
服务端处理客户端消息
22:01:09.075 [nioEventLoopGroup-3-1] INFO netty.server.MyChannelInitializer - 收到服务端【/192.168.56.1:61353】消息【第一条消息】
22:01:09.075 [nioEventLoopGroup-3-1] INFO netty.server.MyChannelInitializer - 收到服务端【/192.168.56.1:61353】消息【这是第二条消息】
22:01:09.075 [nioEventLoopGroup-3-1] INFO netty.server.MyChannelInitializer - 收到服务端【/192.168.56.1:61353】消息【第三条】
22:01:09.075 [nioEventLoopGroup-3-1] INFO netty.server.MyChannelInitializer - 收到服务端【/192.168.56.1:61353】消息【最后一条消息,非常长的长消息】