【Socket学习】2. BIO(阻塞IO)
2.1 BIO的一系列操作
2.2 一些重要概念
- 阻塞IO 和 非阻塞IO
这两个词是用于描述应用程序的。前者阻塞IO也就是2.1里边说的应用程序发出请求就进入阻塞状态,得到响应后结束等待;而后者发出请求后继续执行(并且使用线程一直轮询,直到有IO资源准备好了) - 同步IO 和 非同步IO
这两个词是用于描述操作系统的。主要描述的是操作系统在收到应用程序请求的IO操作后,如果IO资源没有准备好,该如何处理。前者不响应应用程序的请求,直到IO资源准备好后;后者返回一个标记(好让程序和自己知道以后的数据往哪里通知),当IO资源准备好以后,再用事件机制返回给程序(这个事件机制是啥我还不知道)
2.2 BIO深入分析
在上一篇博客我们实现的TCP和UDP通信也是使用的BIO的通信方式,现在来分析一下BIO的通信方式.
BIO的问题关键**不在于是否使用了多线程去处理请求,而是在于accept()、read()的这些操作都是被阻塞的**。
模拟二十个客户端并发请求,服务器单线程
客户端守护线程(SocketClientDaemon.java)
package testBSocket;
import java.util.concurrent.CountDownLatch;
/**
* @author Mo
* @createTime 2022/1/16 23:33
* @descripiton
*/
public class SocketClientDaemon {
public static void main(String[] args) throws InterruptedException {
Integer clientNumber = 20;
CountDownLatch countDownLatch = new CountDownLatch(clientNumber);
// 分别开启这20个客户端
for (int index = 0; index < clientNumber; index ++, countDownLatch.countDown()) {
SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);
new Thread(client).start();
}
// 保证守护线程启动所有的线程后,进入等待状态
synchronized (SocketClientDaemon.class) {
SocketClientDaemon.class.wait();
}
}
}
客户端模拟请求线程(SocketClientRequestThread.java)
package testBSocket;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
/**
* @author Mo
* @createTime 2022/1/16 23:35
* @descripiton 模拟客户端发出请求, 一个线程代表一个客户端的请求
*/
public class SocketClientRequestThread implements Runnable {
/**
* CountDownLatch是concurrent包中提供的同步计数器
* 当计数器数值减为0时,所有受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性
*/
private CountDownLatch countDownLatch;
/**
* 线程编号
*/
private Integer clientIndex;
public SocketClientRequestThread() {
}
public SocketClientRequestThread(CountDownLatch countDownLatch, Integer clientIndex) {
this.countDownLatch = countDownLatch;
this.clientIndex = clientIndex;
}
@Override
public void run() {
Socket socket = null;
OutputStream clientRequest = null;
InputStream clientResponse = null;
try {
socket = new Socket("localhost", 88);
clientRequest = socket.getOutputStream();
clientResponse = socket.getInputStream();
// 等待,直到SocketClientDaemon将所有线程启动完毕后,然后再让所有线程一起发送请求,模拟并发
this.countDownLatch.await();
clientRequest.write(("这是 " + this.clientIndex + " 号客户端发送的请求").getBytes());
clientRequest.flush();
System.out.println(this.clientIndex + " 号客户端请求发送完成,等待服务器返回信息");
int maxLen = 1024;
byte[] contextBytes = new byte[maxLen];
int realLen;
StringBuilder message = new StringBuilder("");
while ((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {
message.append(new String(contextBytes, 0, realLen));
}
System.out.println("接收到来自服务器的消息: " + message);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (clientRequest != null) {
clientRequest.close();
}
if (clientResponse != null) {
clientResponse.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
单线程服务器(SocketServer1.java)
package testBSocket;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author Mo
* @createTime 2022/1/17 0:02
* @descripiton
*/
public class SocketServer1 {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(88);
try {
while (true) {
Socket socket = serverSocket.accept();
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
Integer sourcePort = socket.getPort();
int maxLen = 2048;
byte[] contextBytes = new byte[maxLen];
int realLen = in.read(contextBytes, 0, maxLen);
String message = new String(contextBytes, 0, realLen);
System.out.println("服务器收到来自于端口" + sourcePort + "的消息: " + message);
// 发送信息
out.write("回发响应消息".getBytes());
out.close();
in.close();
socket.close();
}
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
}
对服务器端DEBUG走一边就会发现这实际上服务器端还是一个一个接受信息,在一调用accept()方法或read()方法便会阻塞。
多线程优化服务器端
客户端代码不变
多线程服务器端(SocketServer2.java)
package testBSocket;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author Mo
* @createTime 2022/1/17 1:45
* @descripiton
*/
public class SocketServer2 {
public static void main(String[] args) throws Exception{
ServerSocket serverSocket = new ServerSocket(88);
try {
while(true) {
Socket socket = serverSocket.accept();
//当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
//最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况
SocketServerThread socketServerThread = new SocketServerThread(socket);
new Thread(socketServerThread).start();
}
} catch(Exception e) {
e.printStackTrace();
} finally {
if(serverSocket != null) {
serverSocket.close();
}
}
}
}
/**
* 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。
* 但还是改变不了socket被一个一个的做accept()的情况。
* @author Mo
*/
class SocketServerThread implements Runnable {
private Socket socket;
public SocketServerThread (Socket socket) {
this.socket = socket;
}
@Override
public void run() {
InputStream in = null;
OutputStream out = null;
try {
//下面我们收取信息
in = socket.getInputStream();
out = socket.getOutputStream();
Integer sourcePort = socket.getPort();
int maxLen = 1024;
byte[] contextBytes = new byte[maxLen];
//使用线程,同样无法解决read方法的阻塞问题,
//也就是说read方法处同样会被阻塞,直到操作系统有数据准备好
int realLen = in.read(contextBytes, 0, maxLen);
//读取信息
String message = new String(contextBytes , 0 , realLen);
//下面打印信息
System.out.println("服务器收到来自于端口: " + sourcePort + "的信息: " + message);
//下面开始发送信息
out.write("回发响应信息!".getBytes());
} catch(Exception e) {
e.printStackTrace();
} finally {
//试图关闭
try {
if(in != null) {
in.close();
}
if(out != null) {
out.close();
}
if(this.socket != null) {
this.socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
尽管服务器使用多线程实现了去处理这些请求(伪异步),但还是会在accept()、read()方法上被阻塞。
这就是异步IO模式,就是为了解决这样的并发性存在的
这是accept方法的源码
/**
* Listens for a connection to be made to this socket and accepts
* it. The method blocks until a connection is made.
*
* <p>A new Socket {@code s} is created and, if there
* is a security manager,
* the security manager's {@code checkAccept} method is called
* with {@code s.getInetAddress().getHostAddress()} and
* {@code s.getPort()}
* as its arguments to ensure the operation is allowed.
* This could result in a SecurityException.
*
* @exception IOException if an I/O error occurs when waiting for a
* connection.
* @exception SecurityException if a security manager exists and its
* {@code checkAccept} method doesn't allow the operation.
* @exception SocketTimeoutException if a timeout was previously set with setSoTimeout and
* the timeout has been reached.
* @exception java.nio.channels.IllegalBlockingModeException
* if this socket has an associated channel, the channel is in
* non-blocking mode, and there is no connection ready to be
* accepted
*
* @return the new Socket
* @see SecurityManager#checkAccept
* @revised 1.4
* @spec JSR-51
*/
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
Socket s = new Socket((SocketImpl) null);
implAccept(s);
return s;
}
这个方法的描述里写着
监听向这个socket建立的连接并且接受socket连接。这个方法会阻塞直到连接建立完成
这涉及到了阻塞式同步IO的工作原理
服务器线程发起一个accept动作,询问操作系统是否有新的socket套件则信息从端口XXX发过来。
注意是操作系统。
这说明socket套接字的IO模式支持是基于操作系统的,所以同步IP/异步IO的支持就是需要操作系统级别的了
如果操作系统发现没有套接字从指定的端口XX来,那么操作系统就会等待。这样serverSocket.accept()方法就会一直等待。这就是为甚么accept()方法会阻塞,他的内部实现是使用的操作系统的同步IO
参考文章:
- https://www.pdai.tech/md/java/io/java-io-bio.html