0
点赞
收藏
分享

微信扫一扫

网络IO之NIO的演变过程


NIO演变过程

本文会使用strace命令一步一步跟踪程序底层所产生的系统调用来分析NIO的演变过程。

NIO的阻塞模式

package com.morris.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * Single-threaded server that uses the NIO channel API in its default blocking mode.
 *
 * <p>The server listens on port 8899, accepts one connection at a time, performs a
 * single {@code read} of at most 1024 bytes, prints what it received and then closes
 * the connection before accepting the next one.
 */
public class NioBlockingServer {

    /**
     * Starts the accept loop. Never returns normally.
     *
     * @param args ignored
     * @throws IOException if opening, binding, accepting or reading fails
     */
    public static void main(String[] args) throws IOException {

        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.bind(new InetSocketAddress(8899));

            while (true) {
                // try-with-resources closes the connection on every path, including
                // end-of-stream (read == -1) and read failures; previously the channel
                // was only closed after a successful read, leaking the descriptor otherwise.
                try (SocketChannel socketChannel = serverSocketChannel.accept()) {
                    System.out.println("connect success: " + socketChannel.socket().getPort());

                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    int read = socketChannel.read(byteBuffer); // blocks until data or EOF
                    if (read > 0) {
                        byteBuffer.flip();
                        byte[] bytes = new byte[byteBuffer.remaining()];
                        byteBuffer.get(bytes);
                        // Decode with an explicit charset instead of the platform default.
                        System.out.println("receive from client: "
                                + new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
                    }
                }
            }
        }
    }
}

  1. 启动程序strace -ff -o out java NioBlockingServer,查看当前目录下第二个.out文件(后续的内容都摘自这个文件),关键内容如下:

socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 6
bind(6, {sa_family=AF_INET6, sin6_port=htons(8899), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
listen(6, 50) 
accept(6,

accept方法会一直阻塞,等待连接。

  2. 使用telnet客户端建立连接telnet localhost 8899

accept(6, {sa_family=AF_INET6, sin6_port=htons(53610), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 7
write(1, "connect success: 53610", 22)  = 22
write(1, "\n", 1)                       = 1
read(7,

说明:

  • accept:获取到连接,创建文件描述符7,也就是建立了一个连接。
  • read:读取数据阻塞。
  3. 在telnet客户端中输入hello:

read(7, "hello\r\n", 1024)              = 7
write(1, "receive from client: hello\r\n", 28) = 28
write(1, "\n", 1)                       = 1
dup2(8, 7)                              = 7
close(7)                                = 0

NIO的阻塞模式其实就是BIO,只不过是不同的API而已。

单线程NIO服务器端

单线程实现的NIO服务器端代码如下:

package com.morris.nio;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.Objects;

/**
 * Single-threaded server that polls non-blocking channels in a loop.
 *
 * <p>Each iteration sleeps for one second, tries a non-blocking {@code accept}, and then
 * tries a non-blocking {@code read} on every connection accepted so far. A connection is
 * closed and forgotten after its first message, on end-of-stream, or on a read error.
 */
@Slf4j
public class SingleThreadServer {

    public static final int PORT = 8899;

    /**
     * Starts the polling loop. Never returns normally.
     *
     * @param args ignored
     * @throws IOException          if opening, binding, accepting or closing fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public static void main(String[] args) throws IOException, InterruptedException {

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // socket
        serverSocketChannel.bind(new InetSocketAddress(PORT)); // bind + listen
        serverSocketChannel.configureBlocking(false); // make accept non-blocking
        LinkedList<SocketChannel> socketChannels = new LinkedList<>(); // all open client connections

        // One buffer is enough: it is cleared before every read and fully drained after it.
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);

        while (true) {
            Thread.sleep(1000);

            // Accept at most one pending connection per iteration.
            SocketChannel socketChannel = serverSocketChannel.accept(); // non-blocking accept
            if (Objects.isNull(socketChannel)) { // null means no connection is pending
                System.out.println("null");
            } else {
                log.info("connect success: {}", socketChannel.socket().getPort());
                socketChannel.configureBlocking(false); // make read non-blocking
                socketChannels.add(socketChannel);
            }

            // Poll every known connection; an explicit iterator lets finished ones be removed.
            java.util.Iterator<SocketChannel> iterator = socketChannels.iterator();
            while (iterator.hasNext()) {
                SocketChannel s = iterator.next();
                if (!s.isOpen()) {
                    iterator.remove();
                    continue;
                }

                byteBuffer.clear();
                int readLength;
                try {
                    // Non-blocking read: 0 = no data yet, -1 = peer closed the connection.
                    readLength = s.read(byteBuffer);
                } catch (IOException e) {
                    // A broken connection must not take the whole server down.
                    log.warn("read failed, dropping connection", e);
                    readLength = -1;
                }

                if (readLength == 0) {
                    continue; // nothing to read yet, keep the connection
                }
                if (readLength > 0) {
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    log.info("receive from client: {}",
                            new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
                }

                // Reached after one message, on EOF or on error: release the descriptor
                // and stop tracking the channel so the list does not grow forever.
                s.close();
                iterator.remove();
            }
        }
    }
}

  1. 启动程序strace -ff -o out java SingleThreadServer,查看当前目录下第二个.out文件(后续的内容都摘自这个文件),关键内容如下:

socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 6
... ...
bind(6, {sa_family=AF_INET6, sin6_port=htons(8899), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
listen(6, 50)                           = 0
... ...
fcntl(6, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
... ...
accept(6, 0x7fdbd410c780, [28])         = -1 EAGAIN (Resource temporarily unavailable)
... ...
write(1, "null", 4)                     = 4
write(1, "\n", 1)                       = 1

说明:

  • socket:对应java中的ServerSocketChannel.open()方法,创建一个socket。
  • bind和listen:对应java中的bind()方法,bind()方法源码中会去调用listen方法。
  • fcntl:设置文件描述符6也就是accept()方法为非阻塞(O_NONBLOCK)。
  • accept:不阻塞,没有获取到连接就会返回-1,对应java中的accept方法返回null。

此时没有客户端连接,控制台就会一直打印null。

  2. 使用telnet客户端建立连接telnet localhost 8899

... ....
accept(6, {sa_family=AF_INET6, sin6_port=htons(49757), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 7
... ...
write(1, "connect success: 49757", 22)  = 22
write(1, "\n", 1)                       = 1
fcntl(7, F_GETFL)                       = 0x2 (flags O_RDWR)
fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
read(7, 0x7f6bd4108c50, 1024)           = -1 EAGAIN (Resource temporarily unavailable)

说明:

  • accept:获取到连接,创建文件描述符7,也就是建立了一个连接。
  • fcntl:设置文件描述符7也就是read()方法为非阻塞(O_NONBLOCK)。
  • read:读取数据不阻塞,无数据时系统调用返回-1(EAGAIN),对应java中的read()方法返回0;java中的read()返回-1表示对端已关闭连接。
  3. 在telnet客户端中输入hello

read(7, "hello\r\n", 1024)              = 7
write(1, "receive from client: hello\r\n", 28) = 28
write(1, "\n", 1)                       = 1
dup2(8, 7)                              = 7
close(7)                                = 0

优点:不管是接受连接,还是读数据都不会阻塞,这样一个线程就能实现一个服务器的功能。

单线程NIO服务器端的缺点:假设有1万个连接,只有一个连接有数据,那么在用户态需要遍历1万个连接,发生1万次系统调用才知道哪个连接有数据,产生9999次无效的系统调用。那么怎么解决这个问题呢,就需要下面的IO多路复用器。

单线程实现IO多路复用器

单线程实现的IO多路复用器代码如下:

package com.morris.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Single-threaded server built on one {@link Selector}.
 *
 * <p>The listening channel is registered for {@code OP_ACCEPT}; every accepted connection
 * is registered for {@code OP_READ}. When a connection becomes readable the server reads
 * up to 1024 bytes once, prints them and closes the connection.
 */
public class SingleSelectorServer {

    public static final int PORT = 8899;

    private static Selector selector;

    /**
     * Starts the select loop. Never returns normally.
     *
     * @param args ignored
     * @throws IOException if opening, binding, selecting or accepting fails
     */
    public static void main(String[] args) throws IOException {

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(PORT));
        serverSocketChannel.configureBlocking(false);
        selector = Selector.open();

        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // interested in new connections
        System.out.println("server is start at " + PORT);

        while (true) {
            // select() without a timeout blocks until a channel is ready; 0 means it was
            // woken up without any ready key, so simply select again.
            if (selector.select() == 0) {
                continue;
            }

            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove(); // the selected-key set is never cleared automatically

                if (!key.isValid()) {
                    continue;
                }
                // The selector only reports readiness; accept/read still have to be called.
                if (key.isAcceptable()) {
                    acceptConnection(key);
                } else if (key.isReadable()) {
                    readOnceAndClose(key);
                }
            }
        }
    }

    /** Accepts the pending connection on the key's server channel and registers it for reads. */
    private static void acceptConnection(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = ssc.accept();
        if (socketChannel == null) {
            // Non-blocking accept returns null when the connection is no longer pending.
            return;
        }
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /** Reads one chunk from the key's channel, prints it if non-empty, and closes the channel. */
    private static void readOnceAndClose(SelectionKey key) {
        // try-with-resources closes the channel on every path; closing also cancels its key.
        try (SocketChannel socketChannel = (SocketChannel) key.channel()) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
            int read = socketChannel.read(byteBuffer);
            if (read > 0) { // -1 is end-of-stream, 0 is "nothing available": no message to print
                byteBuffer.flip();
                byte[] bytes = new byte[byteBuffer.remaining()];
                byteBuffer.get(bytes);
                System.out.println("receive from client: "
                        + new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            // A failure on one connection (e.g. reset by peer) must not stop the server.
            System.err.println("client connection failed: " + e);
        }
    }
}

  1. 启动程序strace -ff -o out java SingleSelectorServer

socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
bind(4, {sa_family=AF_INET6, sin6_port=htons(8899), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
listen(4, 50)                           = 0
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
epoll_create(256)                       = 7
epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=11028390733726875652}}) = 0
epoll_wait(7,

程序会一直阻塞在epoll_wait,对应java代码的select()方法。

  2. 使用telnet客户端建立连接telnet localhost 8899

epoll_wait(7, [{EPOLLIN, {u32=4, u64=11028390733726875652}}], 4096, -1) = 1
accept(4, {sa_family=AF_INET6, sin6_port=htons(48222), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 8
fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=11115416937331425288}}) = 0
epoll_wait(7,

epoll_wait返回就绪的文件描述符4(监听socket),accept从文件描述符4上接受连接,得到新的文件描述符8,随后将文件描述符8设置为非阻塞并添加至epoll中。

  3. 在telnet客户端中输入hello

epoll_wait(7, [{EPOLLIN, {u32=8, u64=11115416937331425288}}], 4096, -1) = 1
read(8, "hello\n", 1024)                = 6
write(1, "receive from client: hello\n", 27) = 27
write(1, "\n", 1)                       = 1
epoll_ctl(7, EPOLL_CTL_DEL, 8, 0x7f009a41e420) = -1 ENOENT (No such file or directory)
close(8)                                = 0
epoll_wait(7,

epoll_wait返回有数据到来的文件描述符8,使用read从文件描述符8读取数据。

单线程实现的IO多路复用器的缺点:获取连接和读取数据的业务逻辑耦合在一个线程中,如果读取数据的业务逻辑比较耗时,那么会影响其他客户端连接的建立和数据的读取,可以使用多个线程将获取连接和读取数据的业务逻辑分开。

多线程实现IO多路复用器

package com.morris.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Selector-based server with one accepting thread and two reading threads.
 *
 * <p>The boss thread owns the selector the listening channel is registered with. Each
 * accepted connection is handed to one of two workers through that worker's queue,
 * chosen round-robin. A worker registers the connection with its own selector, reads up
 * to 1024 bytes once when it becomes readable, prints them and closes the connection.
 */
public class SingleBossMultiWorkerServer {

    public static final int PORT = 8899;

    private static Selector boss;
    private static Selector work1;
    private static Selector work2;

    /**
     * Opens the listening channel and the three selectors, then starts the boss and
     * worker threads.
     *
     * @param args ignored
     * @throws IOException if opening, binding or registering fails
     */
    public static void main(String[] args) throws IOException {

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        serverSocketChannel.bind(new InetSocketAddress(PORT));
        serverSocketChannel.configureBlocking(false);

        boss = Selector.open();
        work1 = Selector.open();
        work2 = Selector.open();

        // Java cannot create a generic array directly; only
        // LinkedBlockingQueue<SocketChannel> instances are stored below.
        @SuppressWarnings("unchecked")
        BlockingQueue<SocketChannel>[] blockingQueues = new LinkedBlockingQueue[2];
        for (int i = 0; i < blockingQueues.length; i++) {
            blockingQueues[i] = new LinkedBlockingQueue<>();
        }

        serverSocketChannel.register(boss, SelectionKey.OP_ACCEPT);

        new BossThread("boss", boss, new Selector[]{work1, work2}, blockingQueues).start();
        new WorkerThread("work1", work1, blockingQueues[0]).start();
        new WorkerThread("work2", work2, blockingQueues[1]).start();

    }

    /** Closes a channel, reporting but not propagating a failure to close. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            System.err.println("close failed: " + e);
        }
    }

    /** Accepts connections and distributes them round-robin over the worker queues. */
    private static class BossThread extends Thread {
        private final Selector selector;
        private final AtomicInteger idx = new AtomicInteger();
        private final Selector[] selectors;
        private final BlockingQueue<SocketChannel>[] blockingQueues;

        /**
         * @param name           thread name
         * @param selector       selector the listening channel is registered with
         * @param selectors      worker selectors, index-aligned with {@code blockingQueues}
         * @param blockingQueues hand-off queues, one per worker
         */
        public BossThread(String name, Selector selector, Selector[] selectors, BlockingQueue<SocketChannel>[] blockingQueues) {
            this.setName(name);
            this.selector = selector;
            this.selectors = selectors;
            this.blockingQueues = blockingQueues;
        }

        @Override
        public void run() {

            try {
                while (true) {
                    if (selector.select() == 0) {
                        continue;
                    }
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel sc = ssc.accept();
                        if (sc == null) {
                            // Non-blocking accept returns null when nothing is pending any more.
                            continue;
                        }

                        System.out.println("connect success " + sc.socket().getPort());

                        // Mask the sign bit so the index stays non-negative after the
                        // counter overflows Integer.MAX_VALUE.
                        int index = (idx.getAndIncrement() & Integer.MAX_VALUE) % selectors.length;
                        blockingQueues[index].offer(sc);
                        // Wake the worker so it registers the channel right away instead of
                        // waiting for its select timeout to expire.
                        selectors[index].wakeup();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    /** Registers handed-over connections with its selector and serves one read per connection. */
    private static class WorkerThread extends Thread {

        private final Selector selector;
        private final BlockingQueue<SocketChannel> blockingQueue;

        /**
         * @param name          thread name
         * @param selector      selector owned by this worker
         * @param blockingQueue queue on which new connections arrive
         */
        public WorkerThread(String name, Selector selector, BlockingQueue<SocketChannel> blockingQueue) {
            this.setName(name);
            this.selector = selector;
            this.blockingQueue = blockingQueue;
        }

        @Override
        public void run() {

            try {
                while (true) {

                    // Drain the hand-off queue; poll() returns null when it is empty.
                    SocketChannel pending;
                    while ((pending = blockingQueue.poll()) != null) {
                        try {
                            pending.configureBlocking(false);
                            pending.register(selector, SelectionKey.OP_READ);
                        } catch (IOException e) {
                            // One unusable connection must not stop the worker.
                            System.err.println("register failed: " + e);
                            closeQuietly(pending);
                        }
                    }

                    // The timeout bounds how long a queued connection can wait for
                    // registration if no wakeup arrives.
                    if (selector.select(100) == 0) {
                        continue;
                    }
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        if (!selectionKey.isValid() || !selectionKey.isReadable()) {
                            continue;
                        }
                        readOnceAndClose((SocketChannel) selectionKey.channel());
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /** Reads one chunk from the channel, prints it if non-empty, and closes the channel. */
        private void readOnceAndClose(SocketChannel channel) {
            // try-with-resources closes the channel on every path, including read errors.
            try (SocketChannel socketChannel = channel) {
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
                int read = socketChannel.read(byteBuffer);
                if (read > 0) { // -1 is end-of-stream, 0 is "nothing available"
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    System.out.println("receive from client: "
                            + new String(bytes, java.nio.charset.StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                // A failure on one connection must not terminate the worker thread.
                System.err.println("client connection failed: " + e);
            }
        }
    }
}


举报

相关推荐

0 条评论