Netty :Netty介绍 & 实现简易多人聊天室
博主在去年二月份就介绍了Java原生网络编程API,并用它们实现了多种I/O网络编程模型的简易多人聊天室:
- Java网络编程-Socket编程初涉一(简易客户端-服务器)
- Java网络编程-Socket编程初涉二(基于BIO模型的简易多人聊天室)
- Java网络编程-Socket编程初涉三(伪异步I/O模型的简易多人聊天室)
- Java网络编程-Socket编程初涉四(NIO模型的简易多人聊天室)
- Java网络编程-Socket编程初涉五(AIO模型的简易客户端-服务器)
- Java网络编程-Socket编程初涉六(AIO模型的简易多人聊天室)
使用Java原生网络编程API还是比较繁琐的,如果需要更换I/O网络编程模型来适应不同的业务场景,重构工作量还是很大的,而Netty对Java原生网络编程API进行了封装和扩展,使得不同I/O网络编程模型之间的转换需要的代码改动非常少,并且性能非常高,Netty的实现原理留到以后再介绍,将Netty提供的组件使用熟练后,它的实现原理才能理解的更透彻,Netty的实现原理涉及到了Linux内核部分,比如零拷贝、I/O多路复用等,以及DMA等硬件。
Netty介绍
EventLoopGroup、EventLoop
Netty的调度模块称为EventLoopGroup,Netty提供了NioEventLoopGroup、OioEventLoopGroup、EpollEventLoopGroup(在Linux下可用)等多种实现。

EventLoopGroup是一组EventLoop的抽象,一个EventLoopGroup当中会包含一个或多个EventLoop,如下图所示(图来自《Netty In Action》):


Channel
EventLoop在它的整个生命周期中只会与一个Thread(真正的I/O线程)绑定。所有由EventLoop处理的I/O事件都将在它所关联的Thread上进行处理。一个Channel在它的整个生命周期中只会注册在一个EventLoop上。也就是说一个Channel上绑定的所有方法只会由同一个线程执行。一个EventLoop在运行过程当中会被分配给一个或多个Channel。
Channel可以有一个父Channel,这取决于它是如何创建的。 例如,被ServerSocketChannel接受的SocketChannel(当客户端与服务端建立连接时,在服务端创建与客户端通信的Channel)将在parent()上返回ServerSocketChannel作为其父Channel。Channel用于连接字节缓冲区和另一端的实体,这个实体可以是Socket,也可以是File,在NIO网络编程模型中,服务端和客户端进行IO通信的媒介就是Channel。Netty对Java原生的ServerSocketChannel进行了封装和增强,相对于原生的Channel, Netty的Channel增加了如下组件(但不限于这些组件):
-
ChannelId:标识唯一身份信息。 -
ChannelPipeline:处理或拦截Channel入站事件和出站操作的ChannelHandler的列表。 ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及 ChannelPipeline中的ChannelHandler如何互相交互。每个Channel都有自己的ChannelPipeline,并在创建新Channel时自动创建。 -
EventLoop:用来处理Channel I/O事件的EventLoop。 -
ChannelConfig:Channel的配置参数(例如接收缓冲区大小)。
ChannelHandler
ChannelHandler会处理I/O事件或拦截I/O操作,并将其转发到ChannelPipeline中的下一个ChannelHandler。ChannelHandler本身并没有提供很多方法,通常需要实现其子类型之一:
-
ChannelInboundHandler:处理入站I/O事件的抽象。 -
ChannelOutboundHandler:处理出站I/O操作的抽象。
为了方便,Netty提供了以下适配器类:
-
ChannelInboundHandlerAdapter:处理入站I/O事件的一种简单实现。 -
ChannelOutboundHandlerAdapter:处理出站I/O操作的一种简单实现。
Bootstrap、ServerBootStrap
Netty的启动类分为客户端启动类和服务端启动类,分别是BootStrap和ServerBootStrap。它们都是AbstractBootStrap的子类,总的来说它们都是Netty中的辅助类,提供了链式配置方法,方便了Channel的引导和启动。
简易多人聊天室
博主接下来用Netty实现一个NIO网络编程模型的简易多人聊天室,来介绍Netty的基本使用。
首先需要导入Netty的依赖(博主使用4.1.70.Final版本):
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.70.Final</version>
</dependency>
服务端
package com.kaven.netty.nio;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.atomic.AtomicReference;
public class Server {
private static final int PORT = 8080;
public static void main(String[] args) throws InterruptedException {
final ServerHandler serverHandler = new ServerHandler();
// 主线程组,用于接受客户端的连接,但是不做任何处理,跟老板一样
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 从线程组,主线程组会把任务丢给它,让从线程组去做相应的处理
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup , workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(PORT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(serverHandler);
}
});
ChannelFuture channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@ChannelHandler.Sharable
static class ServerHandler extends ChannelInboundHandlerAdapter {
private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
clientChannels.add(channel);
String sendMsg = "客户[" + channel.remoteAddress() + "]上线\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg);
}
else {
clientChannel.writeAndFlush("欢迎您上线\n");
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
if(clientChannels.contains(channel)) {
clientChannels.remove(channel);
String sendMsg = "客户[" + channel.remoteAddress() + "]异常下线\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();
AtomicReference<String> sendMsg = new AtomicReference<>("客户[" + channel.remoteAddress() + "]消息: " + msg + "\n");
if(msg instanceof String && msg.equals("quit")) {
clientChannels.remove(channel);
channel.close();
sendMsg.set("客户[" + channel.remoteAddress() + "]下线\n");
System.out.print(sendMsg.get());
}
clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg.get());
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
clientChannels.remove(channel);
String msg = cause.getMessage();
String sendMsg = "客户[" + channel.remoteAddress() + "]异常: " + msg + "\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
}
}
}
当有客户端连接时,服务端会使用ChannelInitializer初始化与客户端通信的Channel,并且在Channel的ChannelPipeline 中添加ChannelHandler,这里主要是添加StringDecoder(将接收到的ByteBuf解码为String,是一种ChannelInboundHandlerAdapter)、StringEncoder(将请求的String编码为ByteBuf,是一种ChannelOutboundHandlerAdapter)以及自实现的ChannelInboundHandlerAdapter(处理与客户端之间的通信)。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(serverHandler);
}
});@ChannelHandler.Sharable注解表示可以将该ChannelHandler的同一实例多次添加到一个或多个ChannelPipeline中,而不会出现竞争条件。如果未指定此注解,则每次将ChannelHandler实例添加到ChannelPipeline中时都必须创建一个新的实例,因为它具有成员变量等非共享状态。
ChannelGroup是一个线程安全的Set ,包含开放的Channel并提供对它们的各种批量操作。 使用ChannelGroup ,可以将Channel分类为一个有意义的组(例如,基于每个服务或每个状态来分组),以便实现广播的功能,关闭的Channel会自动从集合中删除(博主是手动删除的,只是为了演示怎么删除Channel),因此无需担心ChannelGroup中Channel的生命周期。 一个Channel可以属于多个ChannelGroup 。

private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
客户端
package com.kaven.netty.nio;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
import java.util.Scanner;
public class Client {
private static final int PORT = 8080;
public static void main(String[] args) throws InterruptedException {
final ClientHandler clientHandler = new ClientHandler();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(PORT))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});
ChannelFuture channelFuture = bootstrap.connect().sync();
Channel channel = channelFuture.channel();
//客户端发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.nextLine();
//通过客户端把输入内容发送到服务端
channel.writeAndFlush(msg).sync();
if(msg.equals("quit")) {
channel.close().sync();
break;
}
}
channelFuture.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
@ChannelHandler.Sharable
static class ClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.print(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
String msg = cause.getMessage();
System.out.print("群聊[" + channel.remoteAddress() + "]异常: " + msg);
}
}
}
SimpleChannelInboundHandler抽象类继承了ChannelInboundHandlerAdapter类,并且默认会自动释放所有处理的消息,在这种情况下,如果需要将消息传递给ChannelPipeline中的下一个ChannelHandler,需要使用ReferenceCountUtil.retain(Object) (Netty资源管理部分,博主以后也会详细介绍)。

测试




更换I/O网络编程模型
在Netty中更换I/O网络编程模型非常方便,只需要修改服务端和客户端相应的地方即可,如下图所示:


OioEventLoopGroup OioServerSocketChannel OioSocketChannel
NioEventLoopGroup NioServerSocketChannel NioSocketChannel
EpollEventLoopGroup EpollServerSocketChannel EpollSocketChannel
KQueueEventLoopGroup KQueueServerSocketChannel KQueueSocketChannel
Oio(Old-Blocking-IO)就是BIO网络编程模型。
Epoll:

KQueue:

到这里就结束了,源码分析和实现原理留到以后再进行介绍,之后博主会对Netty中比较重要的组件进行详细介绍和源码分析。如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。










