0
点赞
收藏
分享

微信扫一扫

Netty :Netty介绍 & 实现简易多人聊天室


Netty :Netty介绍 & 实现简易多人聊天室

博主在去年二月份就介绍了​​Java​​​原生网络编程​​API​​​,并用它们实现了多种​​I/O​​网络编程模型的简易多人聊天室:

  • ​​Java网络编程-Socket编程初涉一(简易客户端-服务器)​​
  • ​​Java网络编程-Socket编程初涉二(基于BIO模型的简易多人聊天室)​​
  • ​​Java网络编程-Socket编程初涉三(伪异步I/O模型的简易多人聊天室)​​
  • ​​Java网络编程-Socket编程初涉四(NIO模型的简易多人聊天室)​​
  • ​​Java网络编程-Socket编程初涉五(AIO模型的简易客户端-服务器)​​
  • ​​Java网络编程-Socket编程初涉六(AIO模型的简易多人聊天室)​​

使用​​Java​​​原生网络编程​​API​​​还是比较繁琐的,如果需要更换​​I/O​​​网络编程模型来适应不同的业务场景,重构工作量还是很大的,而​​Netty​​​对​​Java​​​原生网络编程​​API​​​进行了封装和扩展,使得不同​​I/O​​​网络编程模型之间的转换需要的代码改动非常少,并且性能非常高,​​Netty​​​的实现原理留到以后再介绍,将​​Netty​​​提供的组件使用熟练后,它的实现原理才能理解的更透彻,​​Netty​​​的实现原理涉及到了​​Linux​​​内核部分,比如零拷贝、​​I/O​​​多路复用等,以及​​DMA​​等硬件。

Netty介绍

EventLoopGroup、EventLoop

​Netty​​​的调度模块称为​​EventLoopGroup​​​,​​Netty​​​提供了​​NioEventLoopGroup​​​、​​OioEventLoopGroup​​​、​​EpollEventLoopGroup​​​(在​​Linux​​下可用)等多种实现。

Netty :Netty介绍 & 实现简易多人聊天室_spring cloud


​EventLoopGroup​​​是一组​​EventLoop​​​的抽象,一个​​EventLoopGroup​​​当中会包含一个或多个​​EventLoop​​​,如下图所示(图来自​​《Netty In Action》​​):

Netty :Netty介绍 & 实现简易多人聊天室_cloud native_02


Netty :Netty介绍 & 实现简易多人聊天室_spring cloud_03

Channel

​EventLoop​​​在它的整个生命周期中只会与一个​​Thread​​​(真正的​​I/O​​​线程)绑定。所有由​​EventLoop​​​处理的​​I/O​​​事件都将在它所关联的​​Thread​​​上进行处理。一个​​Channel​​​在它的整个生命周期中只会注册在一个​​EventLoop​​​上。也就是说一个​​Channel​​​上绑定的所有方法只会由同一个线程执行。一个​​EventLoop​​​在运行过程当中会被分配给一个或多个​​Channel​​。

​Channel​​​可以有一个父​​Channel​​​,这取决于它是如何创建的。 例如,被​​ServerSocketChannel​​​接受的​​SocketChannel​​​(当客户端与服务端建立连接时,在服务端创建与客户端通信的​​Channel​​​)将在​​parent()​​​上返回​​ServerSocketChannel​​​作为其父​​Channel​​​。​​Channel​​​用于连接字节缓冲区和另一端的实体,这个实体可以是​​Socket​​​,也可以是​​File​​​,在​​NIO​​​网络编程模型中,服务端和客户端进行​​IO​​​通信的媒介就是​​Channel​​​。​​Netty​​​对​​Java​​​原生的​​ServerSocketChannel​​​进行了封装和增强,相对于原生的​​Channel​​​, ​​Netty​​​的​​Channel​​增加了如下组件(但不限于这些组件):

  • ​ChannelId​​:标识唯一身份信息。
  • ​ChannelPipeline​​​:处理或拦截​​Channel​​​入站事件和出站操作的​​ChannelHandler​​​的列表。 ​​ChannelPipeline​​​实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及 ​​ChannelPipeline​​​中的​​ChannelHandler​​​如何互相交互。每个​​Channel​​​都有自己的​​ChannelPipeline​​​,并在创建新​​Channel​​时自动创建。
  • ​EventLoop​​​:用来处理​​Channel I/O​​​事件的​​EventLoop​​。
  • ​ChannelConfig​​​:​​Channel​​的配置参数(例如接收缓冲区大小)。

ChannelHandler

​ChannelHandler​​​会处理​​I/O​​​事件或拦截​​I/O​​​操作,并将其转发到​​ChannelPipeline​​​中的下一个​​ChannelHandler​​​。​​ChannelHandler​​本身并没有提供很多方法,通常需要实现其子类型之一:

  • ​ChannelInboundHandler​​​:处理入站​​I/O​​事件的抽象。
  • ​ChannelOutboundHandler​​​:处理出站​​I/O​​操作的抽象。

为了方便,​​Netty​​提供了以下适配器类:

  • ​ChannelInboundHandlerAdapter​​​:处理入站​​I/O​​事件的一种简单实现。
  • ​ChannelOutboundHandlerAdapter​​​:处理出站​​I/O​​操作的一种简单实现。

Bootstrap、ServerBootStrap

​Netty​​​的启动类分为客户端启动类和服务端启动类,分别是​​BootStrap​​​和​​ServerBootStrap​​​。它们都是​​AbstractBootStrap​​​的子类,总的来说它们都是​​Netty​​​中的辅助类,提供了链式配置方法,方便了​​Channel​​的引导和启动。

简易多人聊天室

博主接下来用​​Netty​​​实现一个​​NIO​​​网络编程模型的简易多人聊天室,来介绍​​Netty​​的基本使用。

首先需要导入​​Netty​​​的依赖(博主使用​​4.1.70.Final​​版本):

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.70.Final</version>
</dependency>

服务端

package com.kaven.netty.nio;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.atomic.AtomicReference;

public class Server {

private static final int PORT = 8080;

public static void main(String[] args) throws InterruptedException {
final ServerHandler serverHandler = new ServerHandler();
// 主线程组,用于接受客户端的连接,但是不做任何处理,跟老板一样
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 从线程组,主线程组会把任务丢给它,让从线程组去做相应的处理
EventLoopGroup workerGroup = new NioEventLoopGroup();

try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup , workerGroup)
.channel(NioServerSocketChannel.class)
.localAddress(PORT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(serverHandler);
}
});

ChannelFuture channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

@ChannelHandler.Sharable
static class ServerHandler extends ChannelInboundHandlerAdapter {

private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
clientChannels.add(channel);

String sendMsg = "客户[" + channel.remoteAddress() + "]上线\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg);
}
else {
clientChannel.writeAndFlush("欢迎您上线\n");
}
});
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
if(clientChannels.contains(channel)) {
clientChannels.remove(channel);

String sendMsg = "客户[" + channel.remoteAddress() + "]异常下线\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();

AtomicReference<String> sendMsg = new AtomicReference<>("客户[" + channel.remoteAddress() + "]消息: " + msg + "\n");
if(msg instanceof String && msg.equals("quit")) {
clientChannels.remove(channel);
channel.close();
sendMsg.set("客户[" + channel.remoteAddress() + "]下线\n");
System.out.print(sendMsg.get());
}
clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg.get());
}
});
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
clientChannels.remove(channel);

String msg = cause.getMessage();
String sendMsg = "客户[" + channel.remoteAddress() + "]异常: " + msg + "\n";
System.out.print(sendMsg);
clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));
}
}
}

当有客户端连接时,服务端会使用​​ChannelInitializer​​​初始化与客户端通信的​​Channel​​​,并且在​​Channel​​​的​​ChannelPipeline​​​ 中添加​​ChannelHandler​​​,这里主要是添加​​StringDecoder​​​(将接收到的​​ByteBuf​​​解码为​​String​​​,是一种​​ChannelInboundHandlerAdapter​​​)、​​StringEncoder​​​(将请求的​​String​​​编码为​​ByteBuf​​​,是一种​​ChannelOutboundHandlerAdapter​​​)以及自实现的​​ChannelInboundHandlerAdapter​​(处理与客户端之间的通信)。

.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(serverHandler);
}
});

​@ChannelHandler.Sharable​​​注解表示可以将该​​ChannelHandler​​​的同一实例多次添加到一个或多个​​ChannelPipeline​​​中,而不会出现竞争条件。如果未指定此注解,则每次将​​ChannelHandler​​​实例添加到​​ChannelPipeline​​中时都必须创建一个新的实例,因为它具有成员变量等非共享状态。

​ChannelGroup​​​是一个线程安全的​​Set​​​ ,包含开放的​​Channel​​​并提供对它们的各种批量操作。 使用​​ChannelGroup​​​ ,可以将​​Channel​​​分类为一个有意义的组(例如,基于每个服务或每个状态来分组),以便实现广播的功能,关闭的​​Channel​​​会自动从集合中删除(博主是手动删除的,只是为了演示怎么删除​​Channel​​​),因此无需担心​​ChannelGroup​​​中​​Channel​​​的生命周期。 一个​​Channel​​​可以属于多个​​ChannelGroup​​ 。

Netty :Netty介绍 & 实现简易多人聊天室_云原生_04

private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

客户端

package com.kaven.netty.nio;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;
import java.util.Scanner;

public class Client {

private static final int PORT = 8080;

public static void main(String[] args) throws InterruptedException {

final ClientHandler clientHandler = new ClientHandler();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(PORT))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
});

ChannelFuture channelFuture = bootstrap.connect().sync();
Channel channel = channelFuture.channel();
//客户端发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.nextLine();
//通过客户端把输入内容发送到服务端
channel.writeAndFlush(msg).sync();
if(msg.equals("quit")) {
channel.close().sync();
break;
}
}
channelFuture.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}

@ChannelHandler.Sharable
static class ClientHandler extends SimpleChannelInboundHandler<String> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.print(msg);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
String msg = cause.getMessage();
System.out.print("群聊[" + channel.remoteAddress() + "]异常: " + msg);
}
}
}

​SimpleChannelInboundHandler​​​抽象类继承了​​ChannelInboundHandlerAdapter​​​类,并且默认会自动释放所有处理的消息,在这种情况下,如果需要将消息传递给​​ChannelPipeline​​​中的下一个​​ChannelHandler​​​,需要使用​​ReferenceCountUtil.retain(Object)​​​ (​​Netty​​资源管理部分,博主以后也会详细介绍)。

Netty :Netty介绍 & 实现简易多人聊天室_bootstrap_05

测试

Netty :Netty介绍 & 实现简易多人聊天室_云原生_06

Netty :Netty介绍 & 实现简易多人聊天室_spring cloud_07


Netty :Netty介绍 & 实现简易多人聊天室_cloud native_08


Netty :Netty介绍 & 实现简易多人聊天室_cloud native_09

更换I/O网络编程模型

在​​Netty​​​中更换​​I/O​​网络编程模型非常方便,只需要修改服务端和客户端相应的地方即可,如下图所示:

Netty :Netty介绍 & 实现简易多人聊天室_bootstrap_10

Netty :Netty介绍 & 实现简易多人聊天室_.net_11

OioEventLoopGroup        OioServerSocketChannel       OioSocketChannel
NioEventLoopGroup NioServerSocketChannel NioSocketChannel
EpollEventLoopGroup EpollServerSocketChannel EpollSocketChannel
KQueueEventLoopGroup KQueueServerSocketChannel KQueueSocketChannel

​Oio​​​(​​Old-Blocking-IO​​​)就是​​BIO​​网络编程模型。

​Epoll​​:

Netty :Netty介绍 & 实现简易多人聊天室_.net_12

​KQueue​​:

Netty :Netty介绍 & 实现简易多人聊天室_cloud native_13

到这里就结束了,源码分析和实现原理留到以后再进行介绍,之后博主会对​​Netty​​中比较重要的组件进行详细介绍和源码分析。如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。


举报

相关推荐

0 条评论