0
点赞
收藏
分享

微信扫一扫

Netty相关理念及案例引导

醉倾城1 2022-04-06 阅读 92

Netty

概述

Netty的官网:https://Netty.io/

也就是说Netty是一个基于NIO的客户,服务端的编程框架,使用Netty可以确保快速和简单的开发出一个网络应用,例如:基于TCP和UDP的socket服务开发

为什么用Netty

优势

用于各种传输类型的统一API-阻塞和非阻塞Socket

基于一个灵活和可扩展的事件模型,它允许清晰地分离关注点

高度可定制的线程模型-单线程,一个或多个线程池

真正的无连接数据报Socket支持

文档丰富的Javadoc、用户指南和示例

没有其他依赖项,JDK5(Netty3.x)或JDK6(Netty4.x)就足够了

更好的吞吐量,更低的延迟

减少资源消耗

最小化不必要的内存拷贝(零拷贝–操作系统层面)

完整的SSL/TLS和StartTLS支持

版本迭代周期短,发现的Bug 可以被及时修复,同时,更多的新功能会被加入。

Netty的使用场景

  1. 互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里
    分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信
  2. 游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信
  3. 大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现

Netty高性能分析

Netty 作为异步事件驱动的网络应用程序框架,高性能之处主要来自于其 I/O 模型和线程处理模型,I/O 模型决定如何收发数据,线程模型决定如何处理数据

I/O模型

阻塞IO

传统阻塞型I/O(BIO)

在这里插入图片描述

1. 每个请求都需要独立的线程完成数据的读写和业务处理
2. 当客户端请求并发数较大时,需要创建大量线程来处理连接,系统资源占用较大
3. 建立连接后,如果当前线程暂时没有数据可读,则线程就会阻塞在读取数据的操作上,造成线程资源浪费

IO复用模型

在 I/O 复用模型中,会用到 Select,这个函数也会使进程阻塞,但是和阻塞 I/O 所不同的是这两个函数可以同时阻塞多个 I/O 操作,而且可以同时对多个读操作,多个写操作的 I/O 函数进行检测,直到有数据可读或可写时,才真正调用 I/O 操作函数

Netty 的非阻塞 I/O 的实现关键是基于 I/O 复用模型,这里用 Selector 对象表示:

在这里插入图片描述

1. 当线程从一个客户端SocketChannel进行读写数据时,若没有数据可用时,该线程可以进行其他任务
2. 线程通常将非阻塞IO的空闲时间用于在其他通道上执行IO操作,所以单独的线程可以管理多个输入和输出通道
3. 由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁I/O阻塞导致的线程挂起
4. 一个 I/O 线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升
传统的IO是面向字节流或字符流的,以流式的方式顺序地从一个Stream中读取一个或多个字节, 因此也就不能随意改变读取指针的位置。

在NIO中,抛弃了传统的IO流,而是引入了Channel和Buffer的概念。在NIO中,只能从Channel中读取数据到Buffer中或将数据从Buffer中写入到 Channel

基于Buffer操作不像传统IO的顺序操作,NIO中可以随意地读取任意位置的数据

线程模型

线程模型决定服务器如何处理数据

传统阻塞I/O服务模型

在这里插入图片描述

特点:

  • 采用阻塞式I/O模型获取输入数据
  • 每个连接都需要独立的线程完成数据输入,业务处理,数据返回的完整操作

存在问题:

  • 并发数较大时,需要创建大量线程来处理连接,系统资源占用较大
  • 连接建立后,如果当前线程暂时没有数据可读,线程就阻塞在读取数据操作上,造成线程资源浪费

Reactor模式

针对传统阻塞I/O服务模型的缺点,有如下常见解决方案:

  • 基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象上等待,无需阻塞等待所有连接。当某条连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理
  • 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务

Reactor模式基本设计思想就是I/O复用结合线程池:

在这里插入图片描述

Reactor 模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。

服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫Dispatcher 模式

根据Reactor 的数量和Handler的数量不同,有 3 种典型的实现:

  • 单Reactor单线程
  • 多线程Reactor
  • 主从Reactor多线程
单线程Reactor

在这里插入图片描述

说明:

  • Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发
  • 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler对象处理连接完成后的后续业务处理
  • 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应
  • Handler 会完成 Read→业务处理→Send 的完整业务流程

优点: 模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成

缺点: 性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少

使用场景: 客户端的数量有限,业务处理非常快速,比如 Redis

多线程Reactor

在这里插入图片描述

说明:

  • Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发
  • 如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler对象处理连接完成后续的各种事件
  • 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应
  • Handler 只负责响应事件,不做具体业务处理,通过 Read 读取数据后,会分发给后面的Worker 线程池进行业务处理
  • Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理
  • Handler 收到响应结果后通过 Send 将响应结果返回给 Client

优点: 可以充分利用多核 CPU 的处理能力

缺点: 多线程数据共享和访问比较复杂;Reactor 承担所有事件的监听和响应,在单线程中运行,高并发场景下容易成为性能瓶颈

主从多线程Reactor

在这里插入图片描述

说明:

  • Reactor 主线程 MainReactor 对象通过 Select 监控建立连接事件,收到事件后通过 Acceptor接收,处理建立连接事件
  • Acceptor 处理建立连接事件后,MainReactor 将连接分配 Reactor 子线程给 SubReactor 进行处理
  • SubReactor 将连接加入连接队列进行监听,并创建一个 Handler 用于处理各种连接事件
  • 当有新的事件发生时,SubReactor 会调用连接对应的 Handler 进行响应
  • Handler 通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理
  • Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理
  • Handler 收到响应结果后通过 Send 将响应结果返回给 Client

优点: 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理

父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持

总结

打个比方:把餐厅的接待员理解为Reactor模型中只处理连接事件的handler,把餐厅的侍应生比作为处理后续读写等事件的handler

  • 单Reactor单线程,接待员和侍应生是同一个人,全程为顾客服务
  • 多线程Reactor,1 个接待员小姐姐,多个侍应生小哥哥,接待员只负责接待
  • 主从 Reactor 多线程,多个接待员小姐姐,多个侍应生小哥哥

Reactor具有如下优点:

  • 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的
  • 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
  • 可扩展性,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源
  • 可复用性,Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性

Netty工作架构

  1. Netty抽象出两组线程池: BossGroup负责接收客户端的连接, WorkerGroup负责网络的读写,他们的类型都是NioEventLoopGroup

  2. NioEventLoopGroup 相当于一个事件循环组,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程

  3. NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个NioEventLoop 都有一个selector, 用于监听绑定在其上的socket的网络通讯;NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务

    I/O任务,即selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发
    
    非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks方法触发
    
  4. 每个Boss NioEventLoop 循环执行的步骤

    1. 轮询监听IO 事件(accept)
    2. 处理监听到的连接就绪IO事件(accept) ,与client建立连接 , 生成NioScocketChannel ,并将其注册到某个worker NIOEventLoop 上的 selector
    3. 执行任务队列(taskQueue/delayTaskQueue)中的非IO任务
    
  5. 每个 Worker NIOEventLoop 循环执行的步骤

    1. 轮询监听IO事件(read, write )
    2. 处理监听到的IO事件,在对应NioScocketChannel 处理
    3. 执行任务队列(taskQueue/delayTaskQueue)中的非IO任务
    
  6. 每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器

Netty案例

引用依赖

<dependencies>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId> <!-- Use 'netty-all' for 4.0 or above -->
        <version>4.1.50.Final</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

服务器端

package com.wx;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.sctp.nio.NioSctpServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * @Description:
 * TCP服务--服务端
 *
 * @Copyright:Powered By wx
 * @Author: wuxiang
 * @Version V1.0
 */

public class NettyServer {
    public static void main(String[] args) throws Exception {
        // 1.创建俩个线程组 bossGroup  workGroup
        // bossGroup负责客户端连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // workGroup线程组负责网络读写操作
        EventLoopGroup workGroup = new NioEventLoopGroup();
       // 2.创建服务器启动助手配置参数--创建辅助的工具类,用于服务器通道的一系列配置
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workGroup) //绑定俩个线程组
                .channel(NioServerSocketChannel.class) //指定NIO模式
                .option(ChannelOption.SO_BACKLOG, 512) //设置TCP缓冲区
                .childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接额
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    /**
                     * 数据接收方法德尔处理
                     */
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyServerHandler()); //具体业务的处理
                    }
                });
        System.out.println("Server准备就绪");
        // 3.绑定端口,设置非堵塞
        ChannelFuture cf = serverBootstrap.bind(8895).sync();
        ChannelFuture cf2 = serverBootstrap.bind(8896).sync();

        System.out.println("Server启动");

        // 4.关闭通道
        cf.channel().closeFuture().sync();
        cf2.channel().closeFuture().sync();

        // 关闭线程组
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
        System.out.println("Server已关闭线程组");
    }
}

服务器端业务处理类

package com.wx;

import com.sun.beans.editors.ByteEditor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.nio.ByteBuffer;

/**
 * @Description:
 * 服务器端的业务处理类
 *
 * @Copyright:Powered By wx
 * @Author: wuxiang
 * @Version V1.0
 */

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    // 数据的读取事件
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("ServerHandler--ctx" + ctx);
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("来自客户端的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }


    // 数据读取完毕事件
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("hi,client,我收到你的消息了",CharsetUtil.UTF_8));
        // 添加监听事件:数据发送完毕后,断开与客户端连接
        channelFuture.addListener(ChannelFutureListener.CLOSE);
    }

    // 异常捕捉
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       ctx.close();
    }
}

客户端

package com.wx;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @Description:
 * TCP服务--客户端
 * @Copyright:Powered By 吴翔
 * @Author: wuxiang
 * @Version V1.0
 */

public class NettyClient {
    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();
        // 创建客户端的启动助手
        Bootstrap bootstrap = new Bootstrap();
        // 开始配置
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {

                    // 具体业务处理
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyClientHandler());
                    }
                });


        System.out.println("Client准备就绪");
        // 启动客户端连接服务器
        ChannelFuture cf = bootstrap.connect("127.0.0.1", 8895).sync();
        ChannelFuture cf2 = bootstrap.connect("127.0.0.1", 8896).sync();

        cf.channel().closeFuture().sync();
        cf2.channel().closeFuture().sync();

        System.out.println("Client关闭通道");
        group.shutdownGracefully();
        System.out.println("Client关闭线程组");
    }

    public static void main(String[] args) {
        try {
            new NettyClient().run();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客户端业务处理类

package com.wx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.nio.ByteBuffer;

/**
 * @Description:
 * 客户端业务处理类
 * @Copyright:Powered By wx
 * @Author: wuxiang
 * @Version V1.0
 */

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    // 通道就绪
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client : ctx = " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hi,server,这是来自客户端的招呼", CharsetUtil.UTF_8));
    }

    // 数据读写
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:"  + byteBuf.toString(CharsetUtil.UTF_8));
    }

}

举报

相关推荐

0 条评论