文章目录
前言(IO、BIO)
IO
一、IO简介
- io即输入/输出,就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能。
二、IO通道:
三、IO控制方式
- 程序io方式:忙-等待方式,不断循环等待测试状态位busy,直到空闲才进行后续IO任务。
- 中断驱动IO控制方式:当进程要启动某个IO设备工作时,便有cpu向相应的设备控制器发出一条IO指令,然后立即返回继续执行原来的任务
- 直接存储器访问(DMA)
- IO通道方式:CPU只需要发出一条IO指令,就可以完成多种IO操作,之后被中断。
四、IO模式
- BIO
- NIO
- AIO
BIO
一、简介
- BIO模型:同步并阻塞,即一对一模式:一个线程对一个客户端连接,如果这个连接不做任何事情会造成不必要的线程开销。
二、使用流程
- 服务器端启动一个 Server。
- 客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯。
- 客户端发出请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝。
- 如果有响应,客户端线程会等待请求结束后,再继续执行。
提示:以下是本篇文章正文内容,下面案例可供参考
NIO
一、定义
- 同步非阻塞模式:即一对多模型(一个服务器处理多个客户端请求),服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理。如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。
二、NIO的核心组件
1.简介
- channel通道:Buffer和Channel之间的数据流向是双向的
- buffer缓冲区:面向缓冲区,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。(而BIO是基于字节流和字符流)
- selector选择器:即多路复用 (HTTP 2.0 使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比 HTTP 1.1 大了好几个数量级。)
2.Buffer
目的:提高CPU和IO设备之间的并行性;缓和CPU和IO设备之间速度不匹配矛盾
buffer类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息
public abstract class Buffer {
static final Unsafe UNSAFE = Unsafe.getUnsafe();
static final int SPLITERATOR_CHARACTERISTICS = 16464;
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
long address;
}
针对读写切换的flip():Buffer有两种模式,写模式和读模式。在写模式下调用flip()之后,Buffer从写模式变成读模式。那么limit就设置成了position当前的值(即当前写了多少数据),postion会被置为0,以表示读操作从缓存的头开始读,mark置为-1。
3.channel
目的:类似于传统IO中的流,是直接对接操作系统(java中的unSafe类)的,当我们处理好缓冲区后,就可以通过缓冲区对通道进行数据的输入输出,完成整个NIO的过程
4.selector
目的:主要作用就是使用一个线程来对多个通道中的已就绪通道进行选择,然后就可以对选择的通道进行数据处理,属于一对多的关系。这种机制在NIO技术中心称为“IO多路复用”。其优势是可以节省CPU资源。
public abstract class Selector implements Closeable {
//得到一个选择器对象
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
//判断selector是否处于工作状态
public abstract boolean isOpen();
//从内部稽核中得到所有的selectorkey
public abstract Set<SelectionKey> selectedKeys();
//监控所有注册的通道,当其中有IO操作可以进行时,将对应的SelectorKey加入内部集合中并返回,参数用来设置超时时间
public abstract int select(long var1) throws IOException;
相关说明:
1.为什么netty支持高并发呢? 大致就是利用了多路复用,netty的IO线程NioEventLoop聚合了Selector(多路复用器),可以同时并发处理成百上千个客户端连接。即比如假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来生成对应selelctor来处理即可。不像传统的阻塞 IO 那样,非得分配 10000 个。
package com.atguigu.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
public static void main(String[] args) throws Exception{
//创建ServerSocketChannel -> ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一个Selecor对象
Selector selector = Selector.open();
//绑定一个端口6666, 在服务器端监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT pos_1
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//循环等待客户端连接
while (true){
//如果返回的>0, 就获取到相关的 selectionKey集合
//1.如果返回的>0, 表示已经获取到关注的事件
//2. selector.selectedKeys() 返回关注事件的集合
// 通过 selectionKeys 反向获取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("selectionKeys 数量 = " + selectionKeys.size());
//遍历 Set<SelectionKey>, 使用迭代器遍历
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
//获取到SelectionKey
SelectionKey key = keyIterator.next();
//根据key 对应的通道发生的事件做相应处理
if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接
//该该客户端生成一个 SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());
//将 SocketChannel 设置为非阻塞
socketChannel.configureBlocking(false);
//将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel
//关联一个Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if(key.isReadable()) { //发生 OP_READ
//通过key 反向获取到对应channel
SocketChannel channel = (SocketChannel)key.channel();
//获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer)key.attachment();
channel.read(buffer);
System.out.println("form 客户端 " + new String(buffer.array()));
}
//手动从集合中移动当前的selectionKey, 防止重复操作
keyIterator.remove();
}
}
}
}
补充
1.Epoll 导致Selecotor空轮询问题
- 问题:若Selector的轮询结果为空,也没有wakeup或新消息处理,使用selector的selelct方法来轮询当前是否有IO时间,而select方法会一直阻塞,知道IO时间达到或者超时。则发生空轮询,CPU使用率100%,
- 解决
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
//
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++; 关键在这里 每次select则进行计数+1
.
.
.
后续这里会判断
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
如果是超时则跳出阻塞
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break; //如果不是超时则重新构造selelctor
}
currentTimeNanos = time;
}
}
}
1.每次调用后,selectCnt + 1
2.若超时(正常 跳出阻塞),重置selectCnt的值
3.若 未超时(非正常 跳出阻塞),重新构造一个selector,并 重置selectCnt的值
重建Selector相关源码
private void rebuildSelector0() {
final Selector oldSelector = selector; 记录原来的Selector
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector(); 新建一个Selector
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
关键在这里,将旧的Selector上主的的key全部注册到新的Selector上
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
然后将当前Selector指向新的Selector
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close(); 关闭旧的Selector
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
1.新建一个 Selector
2.将旧的Selector 上注册的key,全部注册 到 新的Selector 上
3.将当前Selector 指向新的Selector
4.关闭旧的Selector