NIO演变过程
本文会使用strace命令一步一步跟踪程序底层所产生的系统调用来分析NIO的演变过程。
NIO的阻塞模式
package com.morris.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class NioBlockingServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8899));
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("connect success: " + socketChannel.socket().getPort());
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
if(read > 0) {
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
System.out.println("receive from client: " + new String(bytes));
socketChannel.close();
}
}
}
}
- 启动程序
strace -ff -o out java NioBlockingServer
,查看当前目录下第二个.out文件(后续的内容都摘自这个文件),关键内容如下:
socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 6
bind(6, {sa_family=AF_INET6, sin6_port=htons(8899), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
listen(6, 50)
accept(6,
accept方法会一直阻塞,等待连接。
- 使用telnet客户端建立连接
telnet localhost 8899
:
accept(6, {sa_family=AF_INET6, sin6_port=htons(53610), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 7
write(1, "connect success: 53610", 22) = 22
write(1, "\n", 1) = 1
read(7,
说明:
- accept:获取到连接,创建文件描述符7,也就是建立了一个连接。
- read:读取数据阻塞。
- 在telnet客户端中输入hello:
read(7, "hello\r\n", 1024) = 7
write(1, "receive from client: hello\r\n", 28) = 28
write(1, "\n", 1) = 1
dup2(8, 7) = 7
close(7) = 0
NIO的阻塞模式其实就是BIO,只不过是不同的API而已。
单线程NIO服务器端
单线程实现的NIO服务器端代码如下:
package com.morris.nio;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.Objects;
@Slf4j
public class SingleThreadServer {
public static final int PORT = 8899;
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // socket
serverSocketChannel.bind(new InetSocketAddress(PORT)); // bind+listen
serverSocketChannel.configureBlocking(false); // 设置accept不阻塞
LinkedList<SocketChannel> socketChannels = new LinkedList<>(); // 存放所有的客户端连接
while (true) {
Thread.sleep(1000);
// 获取连接
SocketChannel socketChannel = serverSocketChannel.accept(); // 不阻塞 accept
if (Objects.isNull(socketChannel)) { // 没有连接会返回null
System.out.println("null");
} else {
log.info("connect success: {}", socketChannel.socket().getPort());
socketChannel.configureBlocking(false); // 设置read不阻塞
socketChannels.add(socketChannel);
}
// 读取数据
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
for (SocketChannel s : socketChannels) {
if (s.isOpen()) {
int readLength = s.read(byteBuffer); // 不阻塞,没数据返回-1 read
if (readLength > 0) {
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
log.info("receive from client: {}", new String(bytes));
s.close();
byteBuffer.clear();
}
}
}
}
}
}
- 启动程序
strace -ff -o out java SingleThreadServer
,查看当前目录下第二个.out文件(后续的内容都摘自这个文件),关键内容如下:
socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 6
... ...
bind(6, {sa_family=AF_INET6, sin6_port=htons(8899), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
listen(6, 50) = 0
... ...
fcntl(6, F_SETFL, O_RDWR|O_NONBLOCK) = 0
... ...
accept(6, 0x7fdbd410c780, [28]) = -1 EAGAIN (Resource temporarily unavailable)
... ...
write(1, "null", 4) = 4
write(1, "\n", 1) = 1
说明:
- socket:对应java中的
ServerSocketChannel.open()
方法,创建一个socket。 - bind和listen:对应java中的bind()方法,bind()方法源码中会去调用listen方法。
- fcntl:设置文件描述符6也就是accept()方法为非阻塞(O_NONBLOCK)。
- accept:不阻塞,没有获取到连接就会返回-1,对应java中的accept方法返回null。
此时没有客户端连接,控制台就会一直打印null。
- 使用telnet客户端建立连接
telnet localhost 8899
... ....
accept(6, {sa_family=AF_INET6, sin6_port=htons(49757), inet_pton(AF_INET6, "::ffff:127.0.0.1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 7
... ...
write(1, "connect success: 49757", 22) = 22
write(1, "\n", 1) = 1
fcntl(7, F_GETFL) = 0x2 (flags O_RDWR)
fcntl(7, F_SETFL, O_RDWR|O_NONBLOCK) = 0
read(7, 0x7f6bd4108c50, 1024) = -1 EAGAIN (Resource temporarily unavailable)
说明:
- accept:获取到连接,创建文件描述符7,也就是建立了一个连接。
- fcntl:设置文件描述符7也就是read()方法为非阻塞(O_NONBLOCK)。
- read:读取数据不阻塞,无数据返回-1。
- 在telnet客户端中输入hello
read(7, "hello\r\n", 1024) = 7
write(1, "receive from client: hello\r\n", 28) = 28
write(1, "\n", 1) = 1
dup2(8, 7) = 7
close(7) = 0
优点:不管是接受连接,还是读数据都不会阻塞,这样一个线程就能实现一个服务器的功能。
单线程NIO服务器端的缺点:假设有1万个连接,只有一个连接有数据,那么在用户态需要遍历1万个连接,发生1万次系统调用才知道哪个连接有数据,产生9999次无效的系统调用。那么怎么解决这个问题呢,就需要下面的IO多路复用器。
单线程实现IO多路复用器
单线程实现的IO多路复用器代码如下:
package com.morris.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class SingleSelectorServer {
public static final int PORT = 8899;
private static Selector selector;
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 关心接受连接事件
System.out.println("server is start at " + PORT);
while (true) {
while (selector.select() > 0) { // selector.select()不带时间会一直阻塞,可以带一个超时时间
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if(key.isValid()) {
if(key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// select只会返回有数据的FD,真正获取连接和读取数据还需要调用accept和read
SocketChannel socketChannel = ssc.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
socketChannel.read(byteBuffer);
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
System.out.println("receive from client: " + new String(bytes));
socketChannel.close();
}
}
}
}
}
}
}
- 启动程序
strace -ff -o out java SingleSelectorServer
socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
bind(4, {sa_family=AF_INET6, sin6_port=htons(8899), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, 28) = 0
listen(4, 50) = 0
fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK) = 0
epoll_create(256) = 7
epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=11028390733726875652}}) = 0
epoll_wait(7,
程序会一直阻塞在epoll_wait,对应java代码的select()方法。
- 使用telnet客户端建立连接
telnet localhost 8899
epoll_wait(7, [{EPOLLIN, {u32=4, u64=11028390733726875652}}], 4096, -1) = 1
accept(4, {sa_family=AF_INET6, sin6_port=htons(48222), inet_pton(AF_INET6, "::1", &sin6_addr), sin6_flowinfo=htonl(0), sin6_scope_id=0}, [28]) = 8
fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK) = 0
epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=11115416937331425288}}) = 0
epoll_wait(7,
epoll_wait返回一个文件描述符4,accept在文件描述符4上建立连接,并将文件描述符4添加至epoll中。
- 在telnet客户端中输入hello
epoll_wait(7, [{EPOLLIN, {u32=8, u64=11115416937331425288}}], 4096, -1) = 1 = 0
read(8, "hello\n", 1024) = 6
write(1, "receive from client: hello\n", 27) = 27
write(1, "\n", 1) = 1 = 8
epoll_ctl(7, EPOLL_CTL_DEL, 8, 0x7f009a41e420) = -1 ENOENT (No such file or directory)
close(8) = 0
epoll_wait(7,
epoll_wait返回有数据到来的文件描述符8,使用read从文件描述符8读取数据。
单线程实现的IO多路复用器的缺点:获取连接和读取数据的业务逻辑耦合在一个线程中,如果读取数据的业务逻辑比较耗时,那么会影响其他客户端连接的建立和数据的读取,可以使用多个线程将获取连接和读取数据的业务逻辑分开。
多线程实现IO多路复用器
package com.morris.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class SingleBossMultiWorkerServer {
public static final int PORT = 8899;
private static Selector boss;
private static Selector work1;
private static Selector work2;
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
boss = Selector.open();
work1 = Selector.open();
work2 = Selector.open();
BlockingQueue<SocketChannel>[] blockingQueues = new LinkedBlockingQueue[2];
for (int i = 0; i < blockingQueues.length; i++) {
blockingQueues[i] = new LinkedBlockingQueue<>();
}
serverSocketChannel.register(boss, SelectionKey.OP_ACCEPT);
new BossThread("boss", boss, new Selector[]{work1, work2}, blockingQueues).start();
new WorkerThread("work1", work1, blockingQueues[0]).start();
new WorkerThread("work2", work2, blockingQueues[1]).start();
}
private static class BossThread extends Thread {
private Selector selector;
private AtomicInteger idx = new AtomicInteger();
private Selector[] selectors;
private BlockingQueue<SocketChannel>[] blockingQueues;
public BossThread(String name, Selector selector, Selector[] selectors, BlockingQueue<SocketChannel>[] blockingQueues) {
this.setName(name);
this.selector = selector;
this.selectors = selectors;
this.blockingQueues = blockingQueues;
}
@Override
public void run() {
try {
while (true) {
if (selector.select() > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
SocketChannel sc = ssc.accept();
System.out.println("connect success " + sc.socket().getPort());
int index = idx.getAndIncrement() % selectors.length;
blockingQueues[index].offer(sc);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static class WorkerThread extends Thread {
private Selector selector;
private BlockingQueue<SocketChannel> blockingQueue;
public WorkerThread(String name, Selector selector, BlockingQueue<SocketChannel> blockingQueue) {
this.setName(name);
this.selector = selector;
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
try {
while (true) {
while (!blockingQueue.isEmpty()) {
SocketChannel socketChannel = blockingQueue.take();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (selector.select(100) > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
socketChannel.read(byteBuffer);
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
System.out.println("receive from client: " + new String(bytes));
socketChannel.close();
}
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
}