为了改进一线程一连接模型, 后来又演进出了一种通过线程池或者消息队列实现一个或者多个线程处理N个客户端的模型, 通过线程池的方式可以灵活地调配线程资源, 设置线程的最大值,防止由于海量并发接入导致线程耗尽的问题,但它的底层通信机制依然使用同步阻塞I/O,所以被称为 “ 伪异步''。
伪异步I/O模型图
伪异步采用线程池和任务队列实现,模型图如下:

当有新的客户端接入时,将客户端的 Socket封装成一个 Task(该任务实现 java.lang.Runnable接口)投递到后端的线程池中进行处理, JDK的线程池维护一个消息队列和N个活跃线程, 对消息队列中的任务进行处理。由于线程池可以设置消息队列的大小和最大线程数, 因此, 它的资源占用是可控的, 无论多少个客户端并发访问, 都不会导致资源的耗尽和宕机。
下面通过一个程序来说明一下
我们首先创建一个服务器处理类的线程池, 当接收到新的客户端连接时,将请求 Socket 封装成一个 Task,然后调用线程池的 execute方法执行, 从而避免了每个请求接入都创建 一个新的线程
public class Bio2ServerHandlerExcuterPool {
private int maxPoolSize;
private int queueSize;
private ExecutorService executorService;
public Bio2ServerHandlerExcuterPool(int maxPoolSize, int queueSize) {
this.maxPoolSize = maxPoolSize;
this.queueSize = queueSize;
executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize));
}
public void execute(Runnable task) {
executorService.execute(task);
}
}由于线程池和消息队列都是有界的, 因此, 无论客户端并发连接数多大. 它都不会导致线程个数过于膨服或者内存溢出, 相比于传统的一连接一线程模型,是一种改良 。
接下来我们在服务端使用我们的线程池
public class Bio2Server {
public static void main(String[] args) throws IOException {
int port = 8080;
if(args != null && args.length >0 ){
try{
port = Integer.valueOf(args[0]);
}catch (NumberFormatException e){
}
}
ServerSocket server = null;
try{
server = new ServerSocket(port);
System.out.println("the bio2 server is start in port :"+port);
Socket socket = null;
Bio2ServerHandlerExcuterPool pool = new Bio2ServerHandlerExcuterPool(10,10);
while (true){
socket = server.accept();
pool.execute(new Bio2ServerHandler(socket));
}
}finally {
if(server!=null){
System.out.println("the bio2 server close");
server.close();
server = null;
}
}
}
}客户端和BIO没有区别,我们就不说了。
伪异步 I/O通信框架采用了线程池实现, 因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。但是由于它底层的通信依然采用同步阻塞模型,因此无法从根本上解决问题。下面我们对伪异步I/O进行深入分析,看一下它的弊端
伪异步I/O弊端分析
首先我们对输入流InputStream进行分析,先看下read()方法:
/**
* Reads some number of bytes from the input stream and stores them into
* the buffer array <code>b</code>. The number of bytes actually read is
* returned as an integer. This method blocks until input data is
* available, end of file is detected, or an exception is thrown.
*
* <p> If the length of <code>b</code> is zero, then no bytes are read and
* <code>0</code> is returned; otherwise, there is an attempt to read at
* least one byte. If no byte is available because the stream is at the
* end of the file, the value <code>-1</code> is returned; otherwise, at
* least one byte is read and stored into <code>b</code>.
*
* <p> The first byte read is stored into element <code>b[0]</code>, the
* next one into <code>b[1]</code>, and so on. The number of bytes read is,
* at most, equal to the length of <code>b</code>. Let <i>k</i> be the
* number of bytes actually read; these bytes will be stored in elements
* <code>b[0]</code> through <code>b[</code><i>k</i><code>-1]</code>,
* leaving elements <code>b[</code><i>k</i><code>]</code> through
* <code>b[b.length-1]</code> unaffected.
*
* <p> The <code>read(b)</code> method for class <code>InputStream</code>
* has the same effect as: <pre><code> read(b, 0, b.length) </code></pre>
*
* @param b the buffer into which the data is read.
* @return the total number of bytes read into the buffer, or
* <code>-1</code> if there is no more data because the end of
* the stream has been reached.
* @exception IOException If the first byte cannot be read for any reason
* other than the end of the file, if the input stream has been closed, or
* if some other I/O error occurs.
* @exception NullPointerException if <code>b</code> is <code>null</code>.
* @see java.io.InputStream#read(byte[], int, int)
*/
public int read(byte b[]) throws IOException {
return read(b, 0, b.length);
}注意这句话:
This method blocks until input data is available, end of file is detected, or an exception is thrown.
也就是当对 Socket输入流进行读取操作的时候,它会一直阻塞下去,直到发生如下三种事件。
- 有数据可读
- 可用数据已经读取完毕
- 发生空指针或者 I/O异常
这意味着当对方发送请求或者应答消息比较缓慢, 或者网络传输较慢时, 读取输入流一方的通信线程将被长时间阻塞, 如果对方要 60s 才能够将数据发送完成, 读取一方的 I/O 线程也将会被同步阻塞60s, 在此期间, 其他接入消息只能在消息队列中排队 。
下面我们接着对输出流OutputStream进行分析,看下write()方法
/**
* Writes <code>b.length</code> bytes from the specified byte array
* to this output stream. The general contract for <code>write(b)</code>
* is that it should have exactly the same effect as the call
* <code>write(b, 0, b.length)</code>.
*
* @param b the data.
* @exception IOException if an I/O error occurs.
* @see java.io.OutputStream#write(byte[], int, int)
*/
public void write(byte b[]) throws IOException {
write(b, 0, b.length);
}当调用 OutputStream的 write方法写输出流的时候, 它将会被阻塞, 直到所有要发送的字节全部写入完毕,或者发生异常。学习过 TCP/lP相关知识的人都知道,当消息的接收方处理缓慢的时候, 将不能及时地从 TCP 缓冲区读取数据, 这将会导致发送方的 TCP window size不断减小直到为 0,双方处于 Keep-Alive状态,消息发送方将不能再向 TCP 缓冲区写入消息,这时如果采用的是同步阻塞l/O, write操作将会被无限期阻塞,直到 TCP window size大于 0或者发生 I/O异常。
通过对输入和输出流的 API 文档进行分析, 我们了解到读和写操作都是同步阻塞的,阻塞的时间取决于对方l/O线程的处理速度和网络 I/O的传输速度。本质上来讲,我们无法保证生产环境的网络状况和对端的应用程序能足够快,如果我们的应用程序依赖对方的处理速度,它的可靠性就非常差。也许在实验室进行的性能测试结果令人满意,但是一旦上线运行, 问题就会大量出现。
伪异步 l/O 实际上仅仅是对 I/O线程模型的一个简单优化, 它无法从根本上解決同步 I/O导致的通信线程阻塞问题。下面我们就简单分析下通信对方返回应答时间过长会引起的级联故障 。
1. 服务端处理缓慢,返回应答消息耗费60s,平时只需要10ms。
2. 采用伪异步I/O的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,因此,它将会被同步阻塞60s。
3. 假如所有的可用线程都被故障服务器阻塞,那后续所有的I/O消息都将在队列中排队。
4. 由于线程池采用阻塞队列实现,当队列积满之后,后续入队列的操作将被阻塞。
5. 由于前端只有一个Accptor线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时。
6. 由于几乎所有的连接都超时,调用者会认为系统已经崩溃,无法接收新的请求消息。
