0
点赞
收藏
分享

微信扫一扫

【Socket学习】2. BIO(阻塞IO)

【Socket学习】2. BIO(阻塞IO)

2.1 BIO的一系列操作

2.2 一些重要概念

  • 阻塞IO 和 非阻塞IO
    这两个词是用于描述应用程序的。前者阻塞IO也就是2.1里边说的应用程序发出请求就进入阻塞状态,得到响应后结束等待;而后者发出请求后继续执行(并且使用线程一直轮询,直到有IO资源准备好了)
  • 同步IO 和 非同步IO
    这两个词是用于描述操作系统的。主要描述的是操作系统在收到应用程序请求的IO操作后,如果IO资源没有准备好,该如何处理。前者不响应应用程序的请求,直到IO资源准备好后;后者返回一个标记(好让程序和自己知道以后的数据往哪里通知),当IO资源准备好以后,再用事件机制返回给程序(这个事件机制是啥我还不知道)

2.2 BIO深入分析

在上一篇博客我们实现的TCP和UDP通信也是使用的BIO的通信方式,现在来分析一下BIO的通信方式.
BIO的问题关键**不在于是否使用了多线程去处理请求,而是在于accept()、read()的这些操作都是被阻塞的**。

模拟二十个客户端并发请求,服务器单线程

客户端守护线程(SocketClientDaemon.java)
package testBSocket;

import java.util.concurrent.CountDownLatch;

/**
 * @author Mo
 * @createTime 2022/1/16 23:33
 * @descripiton
 */
public class SocketClientDaemon {
    public static void main(String[] args) throws InterruptedException {
        Integer clientNumber = 20;
        CountDownLatch countDownLatch = new CountDownLatch(clientNumber);

        //  分别开启这20个客户端
        for (int index = 0; index < clientNumber; index ++, countDownLatch.countDown()) {
            SocketClientRequestThread client = new SocketClientRequestThread(countDownLatch, index);
            new Thread(client).start();

        }
      
        //  保证守护线程启动所有的线程后,进入等待状态
        synchronized (SocketClientDaemon.class) {
            SocketClientDaemon.class.wait();
        }
    }
}
客户端模拟请求线程(SocketClientRequestThread.java)
package testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;

/**
 * @author Mo
 * @createTime 2022/1/16 23:35
 * @descripiton 模拟客户端发出请求, 一个线程代表一个客户端的请求
 */
public class SocketClientRequestThread implements Runnable {
    /**
     * CountDownLatch是concurrent包中提供的同步计数器
     * 当计数器数值减为0时,所有受其影响而等待的线程将会被激活。这样保证模拟并发请求的真实性
     */
    private CountDownLatch countDownLatch;
    /**
     * 线程编号
     */
    private Integer clientIndex;

    public SocketClientRequestThread() {
    }

    public SocketClientRequestThread(CountDownLatch countDownLatch, Integer clientIndex) {
        this.countDownLatch = countDownLatch;
        this.clientIndex = clientIndex;
    }

    @Override
    public void run() {
        Socket socket = null;
        OutputStream clientRequest = null;
        InputStream clientResponse = null;

        try {
            socket = new Socket("localhost", 88);
            clientRequest = socket.getOutputStream();
            clientResponse = socket.getInputStream();
            //  等待,直到SocketClientDaemon将所有线程启动完毕后,然后再让所有线程一起发送请求,模拟并发
            this.countDownLatch.await();
            clientRequest.write(("这是 " + this.clientIndex + " 号客户端发送的请求").getBytes());
            clientRequest.flush();
            System.out.println(this.clientIndex + " 号客户端请求发送完成,等待服务器返回信息");

            int maxLen = 1024;
            byte[] contextBytes = new byte[maxLen];
            int realLen;
            StringBuilder message = new StringBuilder("");
            while ((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {
                message.append(new String(contextBytes, 0, realLen));
            }
            System.out.println("接收到来自服务器的消息: " + message);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (clientRequest != null) {
                    clientRequest.close();
                }
                if (clientResponse != null) {
                    clientResponse.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
单线程服务器(SocketServer1.java)
package testBSocket;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author Mo
 * @createTime 2022/1/17 0:02
 * @descripiton
 */
public class SocketServer1 {
    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(88);
        try {
            while (true) {
                Socket socket = serverSocket.accept();
                InputStream in = socket.getInputStream();
                OutputStream out = socket.getOutputStream();
                Integer sourcePort = socket.getPort();
                int maxLen = 2048;
                byte[] contextBytes = new byte[maxLen];
                int realLen = in.read(contextBytes, 0, maxLen);
                String message = new String(contextBytes, 0, realLen);
                System.out.println("服务器收到来自于端口" + sourcePort + "的消息: " + message);
                //  发送信息
                out.write("回发响应消息".getBytes());
                out.close();
                in.close();
                socket.close();
            }
        } finally {
            if (serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

对服务器端DEBUG走一边就会发现这实际上服务器端还是一个一个接受信息,在一调用accept()方法或read()方法便会阻塞。

多线程优化服务器端

客户端代码不变

多线程服务器端(SocketServer2.java)
package testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author Mo
 * @createTime 2022/1/17 1:45
 * @descripiton
 */
public class SocketServer2 {
    public static void main(String[] args) throws Exception{
        ServerSocket serverSocket = new ServerSocket(88);

        try {
            while(true) {
                Socket socket = serverSocket.accept();
                //当然业务处理过程可以交给一个线程(这里可以使用线程池),并且线程的创建是很耗资源的。
                //最终改变不了.accept()只能一个一个接受socket的情况,并且被阻塞的情况
                SocketServerThread socketServerThread = new SocketServerThread(socket);
                new Thread(socketServerThread).start();
            }
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            if(serverSocket != null) {
                serverSocket.close();
            }
        }
    }
}

/**
 * 当然,接收到客户端的socket后,业务的处理过程可以交给一个线程来做。
 * 但还是改变不了socket被一个一个的做accept()的情况。
 * @author Mo
 */
class SocketServerThread implements Runnable {
    private Socket socket;

    public SocketServerThread (Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        InputStream in = null;
        OutputStream out = null;
        try {
            //下面我们收取信息
            in = socket.getInputStream();
            out = socket.getOutputStream();
            Integer sourcePort = socket.getPort();
            int maxLen = 1024;
            byte[] contextBytes = new byte[maxLen];
            //使用线程,同样无法解决read方法的阻塞问题,
            //也就是说read方法处同样会被阻塞,直到操作系统有数据准备好
            int realLen = in.read(contextBytes, 0, maxLen);
            //读取信息
            String message = new String(contextBytes , 0 , realLen);

            //下面打印信息
            System.out.println("服务器收到来自于端口: " + sourcePort + "的信息: " + message);

            //下面开始发送信息
            out.write("回发响应信息!".getBytes());
        } catch(Exception e) {
            e.printStackTrace();
        } finally {
            //试图关闭
            try {
                if(in != null) {
                    in.close();
                }
                if(out != null) {
                    out.close();
                }
                if(this.socket != null) {
                    this.socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

尽管服务器使用多线程实现了去处理这些请求(伪异步),但还是会在accept()、read()方法上被阻塞。

这就是异步IO模式,就是为了解决这样的并发性存在的

这是accept方法的源码

    /**
     * Listens for a connection to be made to this socket and accepts
     * it. The method blocks until a connection is made.
     *
     * <p>A new Socket {@code s} is created and, if there
     * is a security manager,
     * the security manager's {@code checkAccept} method is called
     * with {@code s.getInetAddress().getHostAddress()} and
     * {@code s.getPort()}
     * as its arguments to ensure the operation is allowed.
     * This could result in a SecurityException.
     *
     * @exception  IOException  if an I/O error occurs when waiting for a
     *               connection.
     * @exception  SecurityException  if a security manager exists and its
     *             {@code checkAccept} method doesn't allow the operation.
     * @exception  SocketTimeoutException if a timeout was previously set with setSoTimeout and
     *             the timeout has been reached.
     * @exception  java.nio.channels.IllegalBlockingModeException
     *             if this socket has an associated channel, the channel is in
     *             non-blocking mode, and there is no connection ready to be
     *             accepted
     *
     * @return the new Socket
     * @see SecurityManager#checkAccept
     * @revised 1.4
     * @spec JSR-51
     */
    public Socket accept() throws IOException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        if (!isBound())
            throw new SocketException("Socket is not bound yet");
        Socket s = new Socket((SocketImpl) null);
        implAccept(s);
        return s;
    }

这个方法的描述里写着

监听向这个socket建立的连接并且接受socket连接。这个方法会阻塞直到连接建立完成

这涉及到了阻塞式同步IO的工作原理

服务器线程发起一个accept动作,询问操作系统是否有新的socket套件则信息从端口XXX发过来。

注意是操作系统

这说明socket套接字的IO模式支持是基于操作系统的,所以同步IP/异步IO的支持就是需要操作系统级别的了

如果操作系统发现没有套接字从指定的端口XX来,那么操作系统就会等待。这样serverSocket.accept()方法就会一直等待。这就是为甚么accept()方法会阻塞,他的内部实现是使用的操作系统的同步IO

参考文章:

  • https://www.pdai.tech/md/java/io/java-io-bio.html
举报

相关推荐

0 条评论