目录
一、高级IO
1.1 概念
了解了网络通信相关的知识后,我们也许能直到,通信的本质就是IO,通信的核心是在两个或多个设备之间传输数据,这与计算机系统中的输入输出操作类似。
 当通信时使用接收端口例如 recv 时,系统等待网络数据包从远程服务器通过网络传输到本地机器,数据包从网卡的硬件缓冲区复制到系统内存中的应用程序缓冲区;当文件读取时,系统等待磁盘将所请求的数据读取到磁盘缓冲区中,数据从磁盘缓冲区复制到系统内存中的用户空间。
 所以换种说法,IO = 等待 + 拷贝
那么如何提高IO的效率呢?
当缩小了等待的时间后,IO的效率就会提高。 
1.2 五种IO模型
然后,从一个钓鱼的实例引入今天的主题:
 将上面的例子抽象成通信IO:
上面的五个人物分别对应了五种IO模型:
其中,前四个人都属于同步IO,即只要参与了IO的过程,那就是同步IO。田七将钓鱼的工作交给了另一个人,并没有参与IO,所以是异步IO。
阻塞 IO:在内核将数据准备好之前,系统调用会一直等待。所有的套接字,默认都是阻塞方式。阻塞 IO 是最常见的 IO 模型。
非阻塞 IO:如果内核还未将数据准备好,系统调用仍然会直接返回,并且返回 EWOULDBLOCK 错误码。非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符,这个过程称为轮询。这对 CPU 来说是较大的浪费,一般只有特定场景下才使用。
信号驱动 IO:内核将数据准备好的时候,使用 SIGIO 信号通知应用程序进行 IO 操作。
IO 多路转接: 虽然从流程图上看起来和阻塞 IO 类似。实际上最核心在于 IO 多路转接能够同时等待多个文件描述符的就绪状态。
异步 IO: 由内核在数据拷贝完成时,通知应用程序(而信号驱动是告诉应用程序何时可以开始拷贝数据)。
1.3 小结
任何 IO 过程中,都包含两个步骤。第一是等待,第二是拷贝。而且在实际的应用场景中,等待消耗的时间往往都远远高于拷贝的时间。让 IO 更高效,最核心的办法就是让等待的时间尽量少。
二、多路转接的老派
上面介绍的五人中,只有赵六真正实现了减少等待的时间,所以在IO中可以使用多路转接以达到高校IO,这里我们要介绍的就是多路转接中的老派—— select ,即使它有很多缺点,但是以为其出现的时间比较早,所以基本很多程序都会兼容它,一些比较古早的程序中也仍使用它。
2.1 select 的作用
select的主要作用是监视一组文件描述符,以查看其中哪些文件描述符处于可读、可写或有错误状态。当有时间就绪,就进行任务的派发。
2.2 select 的接口
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
 

 其中,除了第一个参数,后三个均为输出型参数。
用户通过传入 fd_set 来告诉系统需要帮用户关注哪些文件描述符,系统则通过修改比特位告诉用户哪些文件描述符已经满足用户条件。在位图中,位数表示fd的值,0或1表示是否就绪。
用户通过传入 timeout 来表示等待的规则,struct timeval 是一个结构体,由一个表示秒的变量与一个表示微妙的变量组合,为 select 设置等待规则,如果用户设置等待 5s ,实际花费了 3s,则返回的是剩余时间 2s
- 如果 
timeout为NULL,select将会无限等待,直到至少一个文件描述符变得可用。 - 如果 
timeout为一个有效的timeval结构,select会等待指定的时间长度。如果超时,select将返回0。 - 如果 
timeout为一个零时间的timeval结构(即tv_sec和tv_usec都为0),select会立即返回,不会等待。 
       void FD_CLR(int fd, fd_set *set);
       int  FD_ISSET(int fd, fd_set *set);
       void FD_SET(int fd, fd_set *set);
       void FD_ZERO(fd_set *set);
 
-  
FD_CLR(int fd, fd_set *set):- 作用:从文件描述符集合 
set中移除文件描述符fd。将fd在集合中的对应位清零。 - 使用场景:当不再需要监视某个文件描述符时,使用该函数将其从集合中移除。
 
 - 作用:从文件描述符集合 
 -  
FD_ISSET(int fd, fd_set *set):- 作用:检查文件描述符 
fd是否在集合set中。如果fd在集合中,则返回非零值;否则返回零。 - 使用场景:在 
select返回后,用于检测哪个文件描述符有事件发生。 
 - 作用:检查文件描述符 
 -  
FD_SET(int fd, fd_set *set):- 作用:将文件描述符 
fd添加到集合set中。将fd在集合中的对应位置为1。 - 使用场景:在调用 
select之前,将需要监视的文件描述符添加到集合中。 
 - 作用:将文件描述符 
 -  
FD_ZERO(fd_set *set):- 作用:初始化文件描述符集合 
set,将集合中所有的位清零。 - 使用场景:在使用 
fd_set之前,首先需要使用该函数初始化集合,以确保集合中的所有位都是零。 
 - 作用:初始化文件描述符集合 
 
三、select 的编写
这里以 select_echo_server 入手,来认识熟悉 select 。
因为本篇博客是关于网络的信息,所以以下所说的文件描述符与套接字都是一个意思,即 sockfd
3.1 类的预先准备
select 需要有端口号与套接字,在套接字这里,我们选择使用TCP套接字,同时将之前编写过的TCP服务端进行进一步封装,这里使用自己封装过的类,可以省去了在程序中直接对套接字的创建、初始化与监听等工作。
下面先来看一下我们封装的Socket类,
Socket.hpp:
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <sys/types.h> 
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>
#include <pthread.h>
#include <sys/types.h>
#include <memory>
#include "InetAddr.hpp"
#include "Log.hpp"
namespace socket_ns
{
    class Socket;
    const static int gbacklog = 8;
    using socket_sptr = std::shared_ptr<Socket>;
    enum
    {
        SOCKET_ERROR = 1,
        BIND_ERROR,
        LISTEN_ERROR,
        USAGE_ERROR
    };
    class Socket
    {
    public:
        virtual void CreateSocketOrDie() = 0;
        virtual void BindSocketOrDie(InetAddr &addr) = 0;
        virtual void ListenSocketOrDie() = 0;
        virtual int Accepter(InetAddr *addr) = 0;
        virtual bool Connetcor(InetAddr &addr) = 0;
        virtual void SetSocketAddrReuse() = 0;
        virtual int SockFd() = 0;
        virtual int Recv(std::string *out) = 0;
        virtual int Send(const std::string &in) = 0;
        virtual void Close() = 0;
    public:
        void BuildListenSocket(InetAddr &addr)
        {
            CreateSocketOrDie();
            SetSocketAddrReuse();
            BindSocketOrDie(addr);
            ListenSocketOrDie();
        }
        bool BuildClientSocket(InetAddr &addr)
        {
            CreateSocketOrDie();
            return Connetcor(addr);
        }
    };
    class TcpSocket : public Socket
    {
    public:
        TcpSocket(int fd = -1) : _sockfd(fd)
        {
        }
        void CreateSocketOrDie() override
        {
            // 1. 创建流式套接字
            _sockfd = ::socket(AF_INET, SOCK_STREAM, 0);
            if (_sockfd < 0)
            {
                LOG(FATAL, "socket error");
                exit(SOCKET_ERROR);
            }
            LOG(DEBUG, "socket create success, sockfd is : %d\n", _sockfd);
        }
        void BindSocketOrDie(InetAddr &addr) override
        {
            // 2. bind
            struct sockaddr_in local;
            memset(&local, 0, sizeof(local));
            local.sin_family = AF_INET;
            local.sin_port = htons(addr.Port());
            local.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
            int n = ::bind(_sockfd, (struct sockaddr *)&local, sizeof(local));
            if (n < 0)
            {
                LOG(FATAL, "bind error\n");
                exit(BIND_ERROR);
            }
            LOG(DEBUG, "bind success, sockfd is : %d\n", _sockfd);
        }
        void ListenSocketOrDie() override
        {
            int n = ::listen(_sockfd, gbacklog);
            if (n < 0)
            {
                LOG(FATAL, "listen error\n");
                exit(LISTEN_ERROR);
            }
            LOG(DEBUG, "listen success, sockfd is : %d\n", _sockfd);
        }
        int Accepter(InetAddr *addr) override
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            int sockfd = ::accept(_sockfd, (struct sockaddr *)&peer, &len);
            if (sockfd < 0)
            {
                LOG(WARNING, "accept error\n");
                return -1;
            }
            *addr = peer;
            return sockfd;
        }
        virtual bool Connetcor(InetAddr &addr)
        {
            struct sockaddr_in server;
            // 构建目标主机的socket信息
            memset(&server, 0, sizeof(server));
            server.sin_family = AF_INET;
            server.sin_port = htons(addr.Port());
            server.sin_addr.s_addr = inet_addr(addr.Ip().c_str());
            int n = connect(_sockfd, (struct sockaddr *)&server, sizeof(server));
            if (n < 0)
            {
                std::cerr << "connect error" << std::endl;
                return false;
            }
            return true;
        }
        void SetSocketAddrReuse() override
        {
            int opt = 1;
            ::setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));
        }
        int Recv(std::string *out) override
        {
            char inbuffer[4096];
            ssize_t n = ::recv(_sockfd, inbuffer, sizeof(inbuffer) - 1, 0);
            if (n > 0)
            {
                inbuffer[n] = 0;
                *out = inbuffer; // ??? +=
            }
            return n;
        }
        int Send(const std::string &in) override
        {
            int n = ::send(_sockfd, in.c_str(), in.size(), 0);
            return n;
        }
        int SockFd() override
        {
            return _sockfd;
        }
        void Close() override
        {
            if (_sockfd > -1)
                ::close(_sockfd);
        }
    private:
        int _sockfd;
    };
} 
这里还用到了之前封装的 InetAddr 类与日志宏,详情可以看下面的博客:
Linux网络——套接字与UdpServer-CSDN博客
Linux网络——TcpServer-CSDN博客
日志宏的编写与线程池的结合-CSDN博客
Log.hpp:
#pragma once
#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "LockGuard.hpp"
bool gIsSave = false;
const std::string logname = "log.txt";
// 1. 日志是由等级的
enum Level
{
    DEBUG = 0,
    INFO,
    WARNING,
    ERROR,
    FATAL
};
void SaveFile(const std::string &filename, const std::string &message)
{
    std::ofstream out(filename, std::ios::app);
    if (!out.is_open())
    {
        return;
    }
    out << message;
    out.close();
}
std::string LevelToString(int level)
{
    switch (level)
    {
    case DEBUG:
        return "Debug";
    case INFO:
        return "Info";
    case WARNING:
        return "Warning";
    case ERROR:
        return "Error";
    case FATAL:
        return "Fatal";
    default:
        return "Unknown";
    }
}
std::string GetTimeString()
{
    time_t curr_time = time(nullptr);
    struct tm *format_time = localtime(&curr_time);
    if (format_time == nullptr)
        return "None";
    char time_buffer[1024];
    snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
             format_time->tm_year + 1900,
             format_time->tm_mon + 1,
             format_time->tm_mday,
             format_time->tm_hour,
             format_time->tm_min,
             format_time->tm_sec);
    return time_buffer;
}
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{
    std::string levelstr = LevelToString(level);
    std::string timestr = GetTimeString();
    pid_t selfid = getpid();
    char buffer[1024];
    va_list arg;
    va_start(arg, format);
    vsnprintf(buffer, sizeof(buffer), format, arg);
    va_end(arg);
    std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
                          "[" + std::to_string(selfid) + "]" +
                          "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer;
    LockGuard lockguard(&lock);
    if (!issave)
    {
        std::cout << message;
    }
    else
    {
        SaveFile(logname, message);
    }
}
#define LOG(level, format, ...)                                                \
    do                                                                         \
    {                                                                          \
        LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
    } while (0)
#define EnableFile()    \
    do                  \
    {                   \
        gIsSave = true; \
    } while (0)
#define EnableScreen()   \
    do                   \
    {                    \
        gIsSave = false; \
    } while (0)
 
InetAddr:
#pragma once
#include <iostream>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
class InetAddr
{
private:
    void GetAddress(std::string *ip, uint16_t *port)
    {
        *port = ntohs(_addr.sin_port);
        *ip = inet_ntoa(_addr.sin_addr);
    }
public:
    InetAddr(const struct sockaddr_in &addr) : _addr(addr)
    {
        GetAddress(&_ip, &_port);
    }
    InetAddr(const std::string &ip, uint16_t port) : _ip(ip), _port(port)
    {
        _addr.sin_family = AF_INET;
        _addr.sin_port = htons(_port);
        _addr.sin_addr.s_addr = inet_addr(_ip.c_str());
    }
    InetAddr()
    {}
    std::string Ip()
    {
        return _ip;
    }
    bool operator == (const InetAddr &addr)
    {
        if(_ip == addr._ip && _port == addr._port) // 方便测试
        {
            return true;
        }
        return false;
    }
    struct sockaddr_in Addr()
    {
        return _addr;
    }
    uint16_t Port()
    {
        return _port;
    }
    ~InetAddr()
    {
    }
private:
    struct sockaddr_in _addr;
    std::string _ip;
    uint16_t _port;
}; 
为了保护日志宏的线程安全,我们又使用到了之前封装的锁:
LockGuard.hpp:
#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__
#include <iostream>
#include <pthread.h>
class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex); // 构造加锁
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t *_mutex;
};
#endif 
3.2 类的整体框架
我们知道 select 可以监视一组套接字,所以类的内部就需要一个数组来辅助,同时包括上面说的端口号与Tcp套接字。
#pragma once
#include <iostream>
#include <string>
#include <memory>
#include "Socket.hpp"
using namespace socket_ns;
class SelectServer
{
    const static int N = sizeof(fd_set) * 8;
    const static int defaultfd = -1;
public:
    SelectServer(uint16_t port)
        : _port(port),
          _listensock(std::make_unique<TcpSocket>())
    {
        InetAddr addr("0", _port);
        _listensock->BuildListenSocket(addr);
        for (int i = 0; i < N; i++)
        {
            _fd_array[i] = defaultfd;
        }
        _fd_array[0] = _listensock->SockFd();
    }
    ~SelectServer()
    {
    }
private:
    uint16_t _port;
    std::unique_ptr<Socket> _listensock;
    int _fd_array[N]; // 辅助数组
}; 
在初始化时,初始化端口号是必须的,紧接着根据端口号初始化InetAddr,再根据端口号创建套接字并执行监听:
 随后,要对辅助数组中的元素进行初始化,因为文件标识符不小于0,所以使用 -1 进行初始化,以后的代码只要判断数组中的该位置是否 <0 ,即可判断是否为有效的 fd 。
3.3 类的执行 Loop
在使用main函数时,只需要调用该函数就可以完成相关的操作。
这里就根据 select 的返回值进行相应的操作,比如成功、出错或超时。
int n = select(max_fd + 1, &rfds, nullptr, nullptr, nullptr);
switch (n)
{
case 0:
    LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
    break;
case -1:
    LOG(ERROR, "select error...\n");
    break;
default:
    LOG(DEBUG, "Event Happen. n : %d\n", n); 
    HandlerEvent(rfds);
    break;
} 
HandlerEvent是我们后续要写的一个回调函数,select 的参数中,max_fd 与 rfds 就是我们提前要进行的工作,其中,每次 select 每次都会将已就绪的套接字添加到 rfds 中。
接下来,根据 select 的传参,我们要进行两个变量的定义:
fd_set rfds;
FD_ZERO(&rfds);
int max_fd = defaultfd;
for (int i = 0; i < N; i++)
{
    if (_fd_array[i] == defaultfd)
        continue;
    FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
    if (max_fd < _fd_array[i])
    {
        max_fd = _fd_array[i]; // 更新出最大的fd的值
    }
} 
首先是定义了一个 fd_set 的集合 rfds ,在select 中传入表示我们只在意读就绪的套接字。同时,当 select 不断更新已就绪的套接字,我们每次也要重新进行更新,要知道在 rfds 中保存的可能不是连续的数字,而是会自动分配当前最小的文件描述符,比如文件描述符 10 已经分配了但是用户未退,而 5 已经退了,此时再进来一个新的连接,会分配 5 而不是 11。这一步基本是使用 select 时必做的一个操作。
同时,上述的操作我们需要一直进行,每次一有新连接,价于对方给我发送数据!我们作为读事件同一处理,也就是说新连接到来等价于读事件就绪!所以我们要一直重复,把它放在 while 中:
    void Loop()
    {
        while (true)
        {
            fd_set rfds;
            FD_ZERO(&rfds);
            int max_fd = defaultfd;
            for (int i = 0; i < N; i++)
            {
                if (_fd_array[i] == defaultfd)
                    continue;
                FD_SET(_fd_array[i], &rfds); // 将所有合法的fd添加到rfds中
                if (max_fd < _fd_array[i])
                {
                    max_fd = _fd_array[i]; // 更新出最大的fd的值
                }
            }
            struct timeval timeout = {0, 0};
            int n = select(max_fd + 1, &rfds, nullptr, nullptr, nullptr);
            switch (n)
            {
            case 0:
                LOG(INFO, "timeout, %d.%d\n", timeout.tv_sec, timeout.tv_usec);
                break;
            case -1:
                LOG(ERROR, "select error...\n");
                break;
            default:
                LOG(DEBUG, "Event Happen. n : %d\n", n); 
                HandlerEvent(rfds);
                break;
            }
        }
    } 
四、Loop 中的回调函数
4.1 HandlerEvent
下面根据回调函数的逻辑画了一张简略的流程图:
首先,遍历整个类成员——存放 sockfd 的数组;其次,使用 FD_ISSET 函数来确保该sockfd已就绪;随后,判断该文件描述符是否是用户的套接字,即判断是否是TCP中的 sockfd ,若不是,才会去执行最后的回调函数。
根据 FD_ISSET 的参数,很显然我们设计的该回调函数应该有 fd_set 的集合,故需要传入 Loop() 中的 fd_set 。
    void HandlerEvent(fd_set &rfds)
    {
        for (int i = 0; i < N; i++)
        {
            if (_fd_array[i] == defaultfd)
                continue;
            if (FD_ISSET(_fd_array[i], &rfds))
            {
                if (_fd_array[i] == _listensock->SockFd())
                {
                    AcceptClient();
                }
                else
                {
                    ServiceIO(i);
                }
            }
        }
    } 
4.2 AcceptClient
如果该 sockfd 是 listen 监听到的套接字,那么服务端就需要对其进行 accept 的处理,表示服务端已经收到了来自客户端的第三次握手请求,此时的返回值就是以后要使用 select 处理的返回值。
 关于 accept 的介绍可以参考一下博客:Linux网络——TcpServer-CSDN博客
也就是说,使用 accept 后,返回的套接字信息,才是以后真正要进行处理的。所以这时候,又需要一次遍历,来为其返回值找到一个合适的位置。最后,还要判断该位置是否为合法位置,若合法才能进入数组,否则,添加失败。

    void AcceptClient()
    {
        InetAddr clientaddr;
        int sockfd = _listensock->Accepter(&clientaddr); 
        if (sockfd < 0)
            return;
        LOG(DEBUG, "Get new Link, sockfd: %d, client info %s:%d\n", sockfd, clientaddr.Ip().c_str(), clientaddr.Port());
        int pos = 1;
        for (; pos < N; pos++)
        {
            if (_fd_array[pos] == defaultfd)
                break;
        }
        if (pos == N)
        {
            ::close(sockfd); 
            LOG(WARNING, "server is full!\n");
            return;
        }
        else
        {
            _fd_array[pos] = sockfd;
            LOG(DEBUG, "%d add to select array!\n", sockfd);
        }
        LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
    }
 

4.3 ServiceIO
当程序执行到这里的时候,基本可以判断该文件描述符是 select 要进行处理的 sockfd 了,这时定义的回调函数就可以根据要求任意定义了:
    void ServiceIO(int pos)
    {
        char buffer[1024];
        ssize_t n = ::recv(_fd_array[pos], buffer, sizeof(buffer) - 1, 0); 
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "client say# " << buffer << std::endl;
            std::string echo_string = "[server echo]# ";
            echo_string += buffer;
            ::send(_fd_array[pos], echo_string.c_str(), echo_string.size(), 0);
        }
        else if (n == 0)
        {
            LOG(DEBUG, "%d is closed\n", _fd_array[pos]);
            ::close(_fd_array[pos]);
            _fd_array[pos] = defaultfd;
            LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
        }
        else
        {
            LOG(DEBUG, "%d recv error\n", _fd_array[pos]);
            ::close(_fd_array[pos]);
            _fd_array[pos] = defaultfd;
            LOG(DEBUG, "curr fd_array[] fd list : %s\n", RfdsToString().c_str());
        }
    } 
其中,为了方便阅读,特意写了一个 ToString 的函数:
    std::string RfdsToString()
    {
        std::string fdstr;
        for (int i = 0; i < N; i++)
        {
            if (_fd_array[i] == defaultfd)
                continue;
            fdstr += std::to_string(_fd_array[i]);
            fdstr += " ";
        }
        return fdstr;
    }









