0
点赞
收藏
分享

微信扫一扫

Keil开发IDE

早安地球 2024-07-24 阅读 46

目录

项目初始与项目演示

HTTP服务器基础认识

#include <iostream>
#include <cstring>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>

int main()
{
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        perror("socket failed");
        return -1;
    }
    struct sockaddr_in server;
    server.sin_addr.s_addr = INADDR_ANY;
    server.sin_family = AF_INET;
    server.sin_port = htons(8085);
    int n = bind(sockfd, (const sockaddr*)&server, sizeof(server));
    if (n < 0)
    {
        perror("bind failed");
        return -1;
    }
    n = listen(sockfd, 5);
    if (n < 0)
    {
        perror("listen failed");
        return -1;
    }
    while (1)
    {
        int newsockfd = accept(sockfd, nullptr, nullptr);
        if (newsockfd < 0)
        {
            perror("accept error");
            continue;
        }
        char buffer[4096];
        n = recv(newsockfd, buffer, 4095, 0);
        if (n <= 0)
        {
            perror("recv failed");
            continue;
        }
        std::string body = "<html><body><h1>Hello World</h1></body></html>";
        std::string rsp = "HTTP/1.1 200 OK\r\n";
        rsp += "Content-Length: " + std::to_string(body.size()) + "\r\n";
        rsp += "Content-Type: text/html\r\n";
        rsp += "\r\n";
        rsp += body;

        n = send(newsockfd, rsp.c_str(), rsp.size(), 0);
        if (n < 0)
        {
            perror("send error");
            close(newsockfd);
        }
    }
}

Reactor模式基础认识

单Reactor单线程模式认识

单Reactor多线程模式认识

多Reactor多线程模式认识

目标定位

总体大模块划分

server模块的管理思想

Buffer子模块

Socket子模块

Channel子模块

Connection子模块

Acceptor子模块

TimerQueue子模块

Poller子模块

EventLoop子模块

TcpServer子模块

通信连接管理模块关系图

监听连接管理模块关系图

事件监控管理模块关系图

bind函数的认识与基本使用

举个例子

#include <iostream>
#include <string>
#include <functional>

int Sum(int a, int b)
{
    return a + b;
}

int main()
{
    auto func = std::bind(Sum, 1, 2);
    std::cout << func() << std::endl;
    return 0;
}
#include <iostream>
#include <string>
#include <functional>

int Sum(int a, int b)
{
    return a + b;
}

int Sum100(int b)
{
    return 100 + b;
}

int main()
{
    auto func = std::bind(Sum, 100, std::placeholders::_1);
    //此时std::bind(Sum, 100, std::placeholders::_1)就等价于Sum100
    std::cout << func(1) << std::endl;
    std::cout << func(2) << std::endl;
    std::cout << func(3) << std::endl;
    std::cout << Sum100(1) << std::endl;
    std::cout << Sum100(2) << std::endl;
    std::cout << Sum100(3) << std::endl;
    return 0;
}

#include <iostream>
#include <string>
#include <vector>
#include <functional>

int Sum(int a, int b)
{
    return a + b;
}

int Sum100(int b)
{
    return 100 + b;
}

int main()
{
    using Task = std::function<int()>;
    std::vector<Task> arry;
    arry.push_back(std::bind(Sum, 1, 2));
    arry.push_back(std::bind(Sum, 3, 4));
    arry.push_back(std::bind(Sum, 5, 6));
    arry.push_back(std::bind(Sum, 7, 8));

    for (auto& f : arry)
    {
        std::cout << f() << std::endl;
    }
    return 0;
}

timerfd的认识与基本使用

举例

#include <iostream>
#include <sys/timerfd.h>
#include <cstring>
#include <unistd.h>


int main()
{
    int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (timerfd < 0)
    {
        perror("create timerfd failed");
        return -1;
    }
    struct itimerspec itime;
    itime.it_value.tv_sec = 1;
    itime.it_value.tv_nsec = 0;
    itime.it_interval.tv_sec = 1;
    itime.it_interval.tv_nsec = 0;

    timerfd_settime(timerfd, 0, &itime, NULL);

    while (1)
    {
        uint64_t times;
        int ret = read(timerfd, &times, sizeof(times));	//每次写入固定位8字节
        if (ret < 0)
        {
            perror("read failed");
            break;
        }
        printf("距离上一次超时了%ld次\n", times); //注意是%ld,不是%d
    }
    close(timerfd);
    return 0;
}

时间轮定时器的基本思想理解

时间轮定时器的代码设计及实现

#include <unistd.h>
#include <cstdint>
#include <functional>
#include <vector>
#include <unordered_map>
#include <memory>
#include <iostream>

using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
    uint64_t _id;   //定时器任务对象ID  定时任务必须得找得着,一个程序里定时任务可能有很多
    uint32_t _timeout;  //定时任务的超时时间
    TaskFunc _task_cb;  //定时器要执行的任务
    //用于删除TimerWheel中保存的定时器任务对象信息,定时任务释放的时候也要清理TimerWheel中保存的定时器对象信息
    //为什么将这个_release设置到TimerTask里面呢,不在TimerWheel层管理?
    //因为这个TimerWheel不知道是否某个定时任务真的释放了,而TimerTask是最清楚的,自己真的释放了就会调用析构函数
    ReleaseFunc _release;
    bool _canceled;     //false - 代表没有被取消,true - 代表取消了
public:
    TimerTask(uint64_t id, uint32_t timeout, const TaskFunc& cb)
    :_id(id), _timeout(timeout), _task_cb(cb), _canceled(false)
    {}
    ~TimerTask()
    {
        if (_canceled == false) //如果定时任务没有被取消
            _task_cb();
        _release();
    }
    void SetRelease(const ReleaseFunc& cb)
    {
        _release = cb;
    }
    uint32_t DelayTime()
    {
        return _timeout;
    }
    void Cancel()
    {
        _canceled = true;
    }
};

class TimerWheel
{
private:
    using WeakTask = std::weak_ptr<TimerTask>;
    using PtrTask = std::shared_ptr<TimerTask>;
    std::vector<std::vector<PtrTask>> _wheel;
    int _tick;      //当前的秒针,走到哪里释放哪里,就相当于执行哪里的任务
    int _capacity;  //表盘最大数量 -- 也是最大能设置的延时时间
    //为什么不用普通指针,要用weak_ptr,因为刷新定时任务的时候,需要通过该weak_ptr找到曾经shared_ptr,而普通指针则不行
    std::unordered_map<uint64_t, WeakTask> _timers;     //对所有的定时任务进行管理 
private:
    void RemoveTimer(uint64_t id)
    {
        auto pos = _timers.find(id);
        if (pos != _timers.end())
        {
            _timers.erase(pos);
        }
    }
public:
    TimerWheel():_capacity(60), _tick(0), _wheel(_capacity)
    {}
    //时间轮提供了一个功能:释放定时任务的功能
    //至于释放的任务是什么,这个组件也不知道,需要上层对内 设置回调函数
    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb) //添加定时任务
    {
        //1.构建定时任务
        PtrTask pt(new TimerTask(id, delay, cb));
        pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
        //2.将定时任务加入到_wheel中
        int i = (_tick + delay) % _capacity;
        _wheel[i].push_back(pt);
        //3.加入到时间轮的_timers里面
        // std::unordered_map<uint64_t, WeakTask>::iterator pos = _timers.find(id);
        auto pos = _timers.find(id);
        if (pos == _timers.end())
        {
            _timers.insert({id, pt});
        }
    }
    void TimerRefresh(uint64_t id)  //刷新/延迟定时任务
    {
        //通过id找到对应的定时任务
        auto pos = _timers.find(id);
        if (pos == _timers.end())
        {
            //如果没找到定时任务,则没办法更新
            return;
        }
        //获取到对应定时任务的shared_ptr,并构建一个新的智能指针,对应的计数加1
        PtrTask pt = pos->second.lock();
        //将对应的pt加入到_wheel中
        int delay = pt->DelayTime();
        int i = (_tick + delay) % _capacity;
        _wheel[i].push_back(pt);
    }
    //这个函数会每秒钟执行一次,相当于秒针向后走了一步
    void RunTimerTask()
    {
        _tick = (_tick + 1) % _capacity;
        _wheel[_tick].clear();  //情况指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉
    }
    void TimerCancel(uint64_t id)
    {
        auto pos = _timers.find(id);
        if (pos == _timers.end())
        {
            //没找到定时任务,没法刷新,没法延时
            return;
        }
        PtrTask pt = pos->second.lock();
        pt->Cancel();
    }
};

//该Test类用于测试,能清楚的观察到过程
struct Test
{
    Test()
    {
        std::cout << "Test()" << std::endl;
    }
    ~Test()
    {
        std::cout << "~Test()" << std::endl;
    }
};

//要交给时间轮的定时任务
void DelTest(Test* t)
{
    delete t;
}

int main()
{
    TimerWheel tw;
    Test* t = new Test();
    tw.TimerAdd(888, 5, std::bind(DelTest, t));
    for (int i = 0; i < 8; ++i)
    {
        sleep(1);
        tw.RunTimerTask();  //硬编码模拟时间轮走动
        std::cout << "走了:" << i + 1 << "秒" << std::endl;
    }
    while (1)
    {
        sleep(1);
        std::cout << "----------------------" << std::endl;
        tw.RunTimerTask();
    }
    return 0;
}

测试TimerRefresh功能,更新main函数

int main()
{
    TimerWheel tw;
    Test* t = new Test();
    tw.TimerAdd(888, 5, std::bind(DelTest, t));
    for (int i = 0; i < 8; ++i)
    {
        sleep(1);
        tw.RunTimerTask();  //硬编码模拟时间轮走动
        tw.TimerRefresh(888);  //每一秒都刷新定时任务
        std::cout << "走了:" << i + 1 << "秒" << std::endl;
    }
    while (1)
    {
        sleep(1);
        std::cout << "----------------------" << std::endl;
        tw.RunTimerTask();
    }
    return 0;
}

测试TimerCancel功能,更新main函数

int main()
{
    TimerWheel tw;
    Test* t = new Test();
    tw.TimerAdd(888, 5, std::bind(DelTest, t));
    for (int i = 0; i < 8; ++i)
    {
        sleep(1);
        tw.RunTimerTask();  //硬编码模拟时间轮走动
        tw.TimerRefresh(888);  //每一秒都刷新定时任务
        std::cout << "走了:" << i + 1 << "秒" << std::endl;
    }
    tw.TimerCancel(888);
    while (1)
    {
        sleep(1);
        std::cout << "----------------------" << std::endl;
        tw.RunTimerTask();
    }
    return 0;
}

正则表达式基本认识

举例:

#include <iostream>
#include <string>
#include <regex>


int main()
{
    std::string str = "/numbers/1234";  //现在我们想提取出里面的数字字符串
    std::regex e("/numbers/(\\d+)");    //括号表示提取数字
    /*  \d在正则表达式里表示数字
        +表示匹配前面子表达式一次或多次
        转的时候:第一步:字符串转义。第二步:正则表达式转义
        \d在字符串里表示对d字符进行转义 \\在字符串里表示\,所以在字符串里\\d才表示正则表达式的\d
    */
    std::smatch matches;
    bool ret = std::regex_match(str, matches, e);
    if (ret == false)
    {
        return -1;  //字符串整体匹配失败就返回false
    }
    for (auto& s : matches)
    {
        //因为我们在匹配的时候,首先是一个整体的规则匹配,看整体的是否匹配成功,首先匹配到的肯定是原始字符串,
        //所以首先存储了原始字符串,然后再去存储我们想要提取出来的字符串。所以首先打印的是原始字符串
        std::cout << s << std::endl;
    }
    return 0;
}

正则表达式提取HTTP请求行

#include <iostream>
#include <string>
#include <regex>

int main()
{
    std::string str = "GET /helloworld/login?user=xiaoming&passwd=123123 HTTP/1.1";
    std::smatch matches;
    std::regex e("(GET|HEAD|PUT|POST|DELETE) (/.*) HTTP/(1.[01])");
    /*
        (POST|GET):|表示匹配任一字符串并提取出来,使用|要加()
        .点:表示匹配除\n和\r之外的任何单个字符
        *:表示匹配前面子表达式任意次,可0次
        [01]:表示匹配0或1的任一字符
    */
    bool ret = regex_match(str, matches, e);
    if (ret == false)
    {
        std::cerr << "匹配失败" << std::endl;
        return -1;
    }
    for (auto match : matches)
    {
        std::cout << match << std::endl;
    }
    return 0;
}
#include <iostream>
#include <string>
#include <regex>

int main()
{
    std::string str = "GET /helloworld/login?user=xiaoming&passwd=123123 HTTP/1.1";
    std::smatch matches;
    // std::regex e("(GET|HEAD|PUT|POST|DELETE) (/.*) HTTP/(1.[01])");
    /*
        (POST|GET):|表示匹配任一字符串并提取出来,使用|要加()
        .点:表示匹配除\n和\r之外的任何单个字符
        *:表示匹配前面子表达式任意次,可0次
        [01]:表示匹配0或1的任一字符
    */
    std::regex e("(GET|HEAD|PUT|POST|DELETE) (/[^?]*(?://?(.*))?) HTTP/(1.[01])");
    /*
        [^?]:表示匹配非问号的单个字符
        (?:……):表示匹配某个字符串,但是不提取,字符串内有()则表示提取
        ?:表示匹配前面的子表达式1次或0次
        因为?在正则表达式里面有特殊含义,所以需要转义为/?,但/?在字符串里有特殊含义,所以需要将/先转义,//会被转义为/,所以最后写为//?表示匹配一个?字符
    */
    bool ret = regex_match(str, matches, e);
    if (ret == false)
    {
        std::cerr << "匹配失败" << std::endl;
        return -1;
    }
    for (auto match : matches)
    {
        std::cout << match << " size: " << match.str().size() << std::endl;
    }
    return 0;
}
#include <iostream>
#include <string>
#include <regex>

int main()
{
    // std::string str = "GET /helloworld/login?user=xiaoming&passwd=123123 HTTP/1.1";
    std::string str = "GET /helloworld/login HTTP/1.1";
    std::smatch matches;
    // std::regex e("(GET|HEAD|PUT|POST|DELETE) (/.*) HTTP/(1.[01])");
    /*
        (POST|GET):|表示匹配任一字符串并提取出来,使用|要加()
        .点:表示匹配除\n和\r之外的任何单个字符
        *:表示匹配前面子表达式任意次,可0次
        [01]:表示匹配0或1的任一字符
    */
    std::regex e("(GET|HEAD|PUT|POST|DELETE) (/[^?]*(?://?(.*))?) HTTP/(1.[01])");
    /*
        [^?]:表示匹配非问号的单个字符
        (?:……):表示匹配某个字符串,但是不提取,字符串内有()则表示提取
        ?:表示匹配前面的子表达式1次或0次
        因为?在正则表达式里面有特殊含义,所以需要转义为/?,但/?在字符串里有特殊含义,所以需要将/先转义,//会被转义为/,所以最后写为//?表示匹配一个?字符
    */
    bool ret = regex_match(str, matches, e);
    if (ret == false)
    {
        std::cerr << "匹配失败" << std::endl;
        return -1;
    }
    for (auto match : matches)
    {
        std::cout << match << " size: " << match.str().size() << std::endl;
    }
    return 0;
}

通用类型容器Any类设计思想

#include <iostream>
#include <string>

template<class T>
class Any
{
private:
    T _content;
};

int main()
{
    Any a;
    a = 100;
    a = std::string("string");
    return 0;
}
#include <iostream>
#include <string>
#include <typeinfo>
#include <utility>


class Any
{
public:
    class holder
    {
    public:
        virtual ~holder()
        {}
        virtual const std::type_info& type() = 0;  //获取当前子类的数据类型 -- 返回类型是const type_info&
        virtual holder* clone() = 0;        //针对当前的对象自身,克隆出一个新的子类对象
    };

    template<class T>
    class placeholder : public holder
    {
    public:
        placeholder(const T& val = T()):_val(val)
        {}
        virtual const std::type_info& type()
        {
            return typeid(T);
        }
        virtual holder* clone()
        {
            return new placeholder(_val);
        }
        T _val;
    };

    Any():_content(NULL)
    {}
    template <class T>
    Any(const T& content):_content(new placeholder<T>(content))
    {}
    ~Any()
    {
        delete _content;
    }
    Any& swap(Any& other)
    {
        std::swap(_content, other._content);
        return *this;
    }
    template<class T>
    Any& operator=(const T& val)
    {
        Any(val).swap(*this);   //这样写的好处:Any(val)为临时对象,交换完生命周期就到了,就会调用自己的析构函数
        return *this;
    }
    Any& operator=(const Any& other)
    {
        Any(other).swap(*this);
        return *this;
    }

    template<class T>
    T* get()    //返回子类对象保存数据的指针
    {
        if (typeid(T) != _content->type())  //如果你要的类型和我保存的类型不匹配
            return NULL;
        return &(((placeholder<T>*)_content)->_val);
    }
public:
    holder* _content;
};

class Test
{
public:
    Test()
    {
        std::cout << "Test()" << std::endl;
    }
    Test(const Test& t)
    {
        std::cout << "Test()" << std::endl;
    }
    ~Test()
    {
        std::cout << "~Test()" << std::endl;
    }
};

int main()
{
    Any a;
    {
        Test t;
        a = t;
    }
    a = 10;
    int *pa = a.get<int>();
    std::cout << *pa << std::endl;
    a = std::string("nihao");
    std::string *ps = a.get<std::string>();
    std::cout << *ps << std::endl;
    return 0;
}
#include <iostream>
#include <string>
#include <typeinfo>
#include <utility>
#include <any>

int main()
{
    //官方any的使用方法,主要还是要查文档
    std::any a;
    a = 10;
    int* pi = std::any_cast<int>(&a);
    std::cout << *pi << std::endl;

    a = std::string("hello");
    std::string* ps = std::any_cast<std::string>(&a);
    std::cout << *ps << std::endl;
    return 0;
}

Buffer缓冲区设计思想

#include <iostream>
#include <vector>
#include <cstdint>
#include <cassert>
#include <string>
#include <cstring>
#include <memory>


#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private:
    std::vector<char> _buffer;
    uint64_t _read_idx;     //相对读偏移量
    uint64_t _write_idx;    //相对写偏移量
public:
    Buffer()
    :_read_idx(0), _write_idx(0), _buffer(1024)
    {}
    //获取起始地址
    char* Begin() const
    {
        return (char*)&_buffer[0];
    }
    //获取当前写入起始地址
    char* WritePosition() const
    {
        return Begin() + _write_idx;
    }
    //获取当前读取起始地址
    char* ReadPosition() const
    {
        return Begin() + _read_idx;
    }
    //获取缓冲区末尾空闲空间大小 -- 写偏移之后的空闲空间
    uint64_t TailIdleSize() const
    {
        return _buffer.size() - _write_idx;
    }
    //获取缓冲区起始空闲空间大小 -- 读偏移之前的空闲空间
    uint64_t HeadIdleSize() const
    {
        return _read_idx;
    }
    //获取可读数据大小
    uint64_t ReadAbleSize() const
    {
        return _write_idx - _read_idx;
    }
    //将读偏移向后移动,必须小于可读数据大小
    void MoveReadOffset(uint64_t len)
    {
        assert(len <= ReadAbleSize());
        _read_idx += len;
    }
    //将写偏移向后移动,向后移动的大小必须小于前沿和后沿的空闲空间的大小
    void MoveWriteOffset(uint64_t len)
    {
        assert(len <= HeadIdleSize() + TailIdleSize());
        _write_idx += len;
    }
    //确保可写空间足够(整体空闲空间够了就移动数据,否则就扩容)
    void EnsureWriteSpace(uint64_t len)
    {
        if (TailIdleSize() > len)   //如果末尾空闲空间大小足够,直接返回
            return;
        //末尾空间不够,则判断加上起始空闲位置的空闲空间大小是否足够
        if (len <= TailIdleSize() + HeadIdleSize())
        {
            uint64_t rsz = ReadAbleSize();  //把当前数据大小先保存起来
            //copy:第一个参数,要拷贝的起始地址。第二个参数,要拷贝的末尾地址。第三个参数:拷贝到某个地址上面
            std::copy(ReadPosition(), ReadPosition() + rsz, Begin());
            _read_idx = 0;      //将读偏移归0
            _write_idx = rsz;   //将写偏移置为可读数据大小
        }
        else    //总体空间不够,扩容
        {
            _buffer.resize(_write_idx + len);
        }
    }
    //写入数据
    void Write(const void* data, uint64_t len)
    {
        //1。保证有足够空间 2。拷贝数据进去
        if (len == 0)   //防御性编程,不要嫌弃多次判断
            return;
        EnsureWriteSpace(len);
        std::copy((char*)data, (char*)data + len, WritePosition());
    }
    void WriteAndPush(const void* data, uint64_t len)
    {
        Write(data, len);
        MoveWriteOffset(len);
    }
    void WriteString(const std::string& data)
    {
        Write((const void*)data.c_str(), data.size());
    }
    void WriteStringAndPush(const std::string& data)
    {
        WriteString(data);
        MoveWriteOffset(data.size());
    }
    void WriteBuffer(Buffer& data)
    {
        Write((const void*)data.ReadPosition(), data.ReadAbleSize());
    } 
    void WriteBufferAndPush(Buffer& data)
    {
        WriteBuffer(data);
        MoveWriteOffset(data.ReadAbleSize());
    }
    //读取数据
    void Read(void* buf, uint64_t len)
    {
        //要求读取的数据大小必须小于可读数据的大小
        assert(len <= ReadAbleSize());
        std::copy(ReadPosition(), ReadPosition() + len, (char*)buf);
    }
    void ReadAndPop(void* buf, uint64_t len)
    {
        Read(buf, len);
        MoveReadOffset(len);
    }
    std::string ReadAsString(uint64_t len)
    {
        assert(len <= ReadAbleSize());
        std::string str;
        str.resize(len);
        Read(&str[0], len);
        return str;
    }
    std::string ReadAsStringAndPop(uint64_t len)
    {
        std::string str = ReadAsString(len);
        MoveReadOffset(len);
        return str;
    }
    char* FindCRLF()    //查找回车字符的地址
    {
        //查找某一个字节/字符
        char* res = (char*)memchr(ReadPosition(), '\n', ReadAbleSize());
        return res;
    }
    //获取一行数据
    std::string GetLine()
    {
        const char* pos = FindCRLF();
        if (pos == NULL)
            return "";
        return ReadAsString(pos - ReadPosition() + 1);  //包括将'\n'也读取进去 -- 方便后面对HTTP协议的操作
    }
    std::string GetLineAndPop()
    {
        std::string str = GetLine();
        MoveReadOffset(str.size());
        return str;
    }
    //清空缓冲区
    void Clear()
    {
        //只需要将偏移量归0即可
        _read_idx = 0;
        _write_idx = 0;
    }
};

测试1:

#include "../source/server.hpp"

int main()
{
    Buffer buf;
    std::string str = "hello!!";
    buf.WriteStringAndPush(str);

    std::string tmp;
    tmp = buf.ReadAsStringAndPop(buf.ReadAbleSize());
    std::cout << tmp << std::endl;
    std::cout << buf.ReadAbleSize() << std::endl;
    return 0;
}

测试2:

#include "../source/server.hpp"


int main()
{
    Buffer buf;
    std::string str = "hello!!";
    buf.WriteStringAndPush(str);

    Buffer buf1;
    buf1.WriteBufferAndPush(buf);

    std::string tmp;
    tmp = buf1.ReadAsStringAndPop(buf1.ReadAbleSize());

    std::cout << tmp << std::endl;
    std::cout << buf.ReadAbleSize() << std::endl;
    std::cout << buf1.ReadAbleSize() << std::endl;
    return 0;
}

测试3:

#include "../source/server.hpp"


int main()
{
    Buffer buf;
    for (int i = 0; i < 300; ++i)
    {
        std::string str = "hello!!" + std::to_string(i) + '\n';
        buf.WriteStringAndPush(str);
    }
    while (buf.ReadAbleSize() > 0)
    {
        std::string line = buf.GetLineAndPop();
        std::cout << line;
    }
    return 0;
}

日志打印宏的编写

#include <stdio.h>
#include <iostream>
#include <string>

#define LOG(msg) fprintf(stdout, "[%s:%d] %s\n", __FILE__, __LINE__, msg);

int main()
{
    for (int i = 0; i < 8; ++i)
    {
        LOG("hello world");
    }
    return 0;
}
#include <stdio.h>
#include <iostream>
#include <string>

#define LOG(format, ...) fprintf(stdout, "[%s:%d] " format "\n", __FILE__, __LINE__, __VA_ARGS__);

int main()
{
    for (int i = 0; i < 8; ++i)
    {
        LOG("hello world, %d", i + 1);
    }
    return 0;
}
#include <stdio.h>
#include <iostream>
#include <string>

#define LOG(format, ...) fprintf(stdout, "[%s:%d] " format "\n", __FILE__, __LINE__, ##__VA_ARGS__);

int main()
{
    for (int i = 0; i < 8; ++i)
    {
        LOG("hello world");
    }
    return 0;
}
#include <stdio.h>
#include <iostream>
#include <string>
#include <ctime>
#include <unistd.h>

//技巧:加上do while循环能应用于代码的各种情况
//宏里面不能有换行,所以加上\转义换行,代表后面没有换行
#define LOG(format, ...)                                                                        \
    do                                                                                          \
    {                                                                                           \
    time_t t = time(NULL);                                                                      \
    struct tm* lltime = localtime(&t);                                                          \
    char time[32];                                                                              \
    strftime(time, 31, "%H:%M:%S", lltime);                                                     \
    fprintf(stdout, "[%s %s:%d] " format "\n", time, __FILE__, __LINE__, ##__VA_ARGS__);        \
    } while (0)

int main()
{
    for (int i = 0; i < 8; ++i)
    {
        sleep(1);
        LOG("hello world");
    }
    return 0;
}
#include <stdio.h>
#include <iostream>
#include <string>
#include <ctime>
#include <unistd.h>

//技巧:加上do while循环能应用于代码的各种情况
//宏里面不能有换行,所以加上\转义换行,代表后面没有换行
#define INF 0
#define DBG 1
#define ERR 2
#define LOG_LEVEL DBG
#define LOG(level, format, ...)                                                                 \
    do                                                                                          \
    {                                                                                           \
    if (LOG_LEVEL > level)                                                                      \
        break;                                                                                  \
    time_t t = time(NULL);                                                                      \
    struct tm* lltime = localtime(&t);                                                          \
    char time[32];                                                                              \
    strftime(time, 31, "%H:%M:%S", lltime);                                                     \
    fprintf(stdout, "[%p %s %s:%d] " format "\n",(void*)pthread_self(), time, __FILE__, __LINE__, ##__VA_ARGS__);        \
    } while (0)
#define INF_LOG(format, ...) LOG(INF, format, ##__VA_ARGS__)// 使用不定参的...只能声明定义的时候用,使用的时候用__VA_ARGS__
#define DBG_LOG(format, ...) LOG(DBG, format, ##__VA_ARGS__)
#define ERR_LOG(format, ...) LOG(ERR, format, ##__VA_ARGS__)
int main()
{
    for (int i = 0; i < 2; ++i)
    {
        sleep(1);
        INF_LOG("hello world1");
        DBG_LOG("hello world2");
        ERR_LOG("hello world3");
    }
    return 0;
}

Socket套接字类设计思想

#define MAX_LISTEN 1024
class Socket
{
private:
    int _sockfd;
public:
    Socket()
    :_sockfd(-1)
    {}
    Socket(int fd)
    :_sockfd(fd)
    {}
    ~Socket()
    {
        Close();
    }
    int Fd()
    {
        return _sockfd;
    }
    //创建套接字
    bool Create()
    {
        int ret = socket(AF_INET, SOCK_STREAM, 0);
        if (ret < 0)
        {
            ERR_LOG("CREATE SOCKET FAILED:%s", strerror(errno));
            return false;
        }
        _sockfd = ret;
        return true;
    }
    //绑定地址信息
    bool Bind(const std::string& ip, uint16_t port)
    {
        struct sockaddr_in addr;
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        int ret = bind(_sockfd, (const sockaddr*)&addr, sizeof(addr));
        if (ret < 0)
        {
            ERR_LOG("BIND SOCKET FAILED:%s", strerror(errno));
            return false;
        }
        return true;
    }
    //开始监听
    bool Listen(int backlog = MAX_LISTEN)
    {
        int ret = listen(_sockfd, backlog);
        if (ret < 0)
        {
            ERR_LOG("LISTEN SOCKET FAILED:%s", strerror(errno));
            return false;
        }
        return true;
    }
    //向服务器发起连接
    bool Connect(const std::string& ip, uint16_t port)
    {
        struct sockaddr_in addr;
        addr.sin_addr.s_addr = inet_addr(ip.c_str());
        addr.sin_port = htons(port);
        addr.sin_family = AF_INET;
        int ret = connect(_sockfd, (const sockaddr*)&addr, sizeof(addr));
        if (ret < 0)
        {
            ERR_LOG("CONNECT SERVER FAILED:%s", strerror(errno));
            return false;
        }
        return true;

    }
    //获取新连接
    int Accept()
    {
        int newfd = accept(_sockfd, NULL, NULL);
        if (newfd < 0)
        {
            ERR_LOG("ACCEPT SOCKET FAILED:%s", strerror(errno));
            return -1;
        }
        return newfd;
    }
    //接收数据
    ssize_t Recv(void* Buffer, size_t len, int flag = 0)
    {
        int n = recv(_sockfd, Buffer, len, flag);
        if (n == 0)
        {
            //等于0的时候表示连接断开
            DBG_LOG("CONNECTION CLOSED");
            return -1;
        }
        if (n < 0)//小于0的时候表示读出错了
        {
            if (errno == EAGAIN || errno == EINTR)
            {
                //EAGAIN    表示当前socket的接收缓冲区中没有数据,在非阻塞情况下才会出现这种错误
                //EINTR     表示当前socket的阻塞等待被信号打断了
                return 0;   //表示这次没有接收到数据
            }
            ERR_LOG("SOCKET RECV FAILED");
            return -1;
        }
        return n;
    }
    //非阻塞读取
    ssize_t NonBlockRecv(void* buffer, size_t len)
    {
        return Recv(buffer, len, MSG_DONTWAIT);
    }
    //发送数据 -- 外部可以根据实际发送的数据长度来决定下一步的处理
    ssize_t Send(const void* buf, size_t len, int flag = 0)
    {
        int n = send(_sockfd, buf, len, flag);
        if (n < 0)
        {
            if (errno == EAGAIN || errno == EINTR)
            {
                return 0;
            }
            ERR_LOG("SOCKET SEND FAILED");
            return -1;
        }
        return n;
    }
    //非阻塞发送数据
    ssize_t NonBlockSend(void* buf, size_t len)
    {
        return send(_sockfd, buf, len, MSG_DONTWAIT);
    }
    //关闭套接字
    void Close()
    {
        if (_sockfd != -1)
            close(_sockfd);
        _sockfd = -1;
    }
    //创建一个服务器连接
    bool CreateServer(uint16_t port, const std::string& ip = "0.0.0.0", bool block_flag = false)
    {
        //创建套接字
        if (Create() == false)
            return false;
        if (block_flag == true)
            NonBlock();
        //设置端口复用
        ReuseAddress();
        //绑定地址
        if (Bind(ip, port) == false)
            return false;
        //开始监听
        if (Listen() == false)
            return false;
        return true;
    }
    //创建一个客户端连接
    bool CreateClient(uint16_t port, const std::string ip)
    {
        //创建套接字
        if (Create() == false)
            return false;
        //连接服务器
        if (Connect(ip, port) == false)
            return false;
        return true;
    }
    /*
        一个连接绑定了地址和端口之后一旦主动关闭连接的一方最终会进入time_wait状态,这时候套接字
        并不会立即被释放,因此IP地址和端口依然被占用,导致我们无法立即去使用它,在服务器使用的时候,
        崩溃了,退出了,会无法立即重启,所以我们要开启地址重用。
    */
    //设置套接字选项 -- 开启端口复用
    void ReuseAddress()
    {
        int opt = 1;
        setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT | SO_REUSEADDR, &opt, sizeof(opt));
    }
    /*
        我们在使用套接字去接收数据的时候,一次性可能取不完数据,就需要循环去接收缓冲区里的数据,什么时候取完呢?
        就是取到没有数据为止,但是套接字默认是阻塞的,没有数据的时候再去取就会被阻塞住,程序就
        无法继续往下走了,所以我们需要将套接字设置为非阻塞
    */
    //设置套接字阻塞属性 -- 设置为非阻塞
    void NonBlock()
    {
        int flag = fcntl(_sockfd, F_GETFL, 0);
        fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);
    }
};

测试:

#include "../source/server.hpp"

int main()
{
    Socket lst_sock;
    lst_sock.CreateServer(8085);
    while (1)
    {
        int newfd = lst_sock.Accept();
        if (newfd < 0)
        {
            continue;
        }
        DBG_LOG("ACCEPT NEW SOCKET SUCCESS");
        Socket cli_sock(newfd);
        char buf[4096] = {0};
        int n = cli_sock.Recv(buf, 4095);
        if (n < 0)
        {
            //说明读取出错
            cli_sock.Close();
            continue;
        }
        cli_sock.Send(buf, n);
        DBG_LOG("CLOSE CLIENT");
        cli_sock.Close();
    }
    lst_sock.Close();
    return 0;
}
#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    std::string str = "hello world!!!";
    cli_socket.Send(str.c_str(), str.size());
    char buf[4096] = {0};
    cli_socket.Recv(buf, 4095);
    DBG_LOG("%s", buf);
    return 0;
}

Channel事件管理类设计思想

事件说明
EPOLLIN可读
EPOLLOUT可写
EPOLLEDHUP连接断开
EPOLLPRI优先数据
EPOLLERR错误
EPOLLHUP挂断
class Channel
{
private:
    uint32_t _events;   //当前需要监控的事件
    uint32_t _revents;  //当前连接触发的事件
    using EventCallback = std::function<void()>;
    /*
        只有我们的连接才知道,一旦事件触发了该去怎么处理,所以需要设置回调函数。
        当启动读事件监控,就需要将channel挂到EventLoop上面进行事件监控,
        当可读事件触发,就会调用channel里设置的回调函数。
    */
    EventCallback _read_callback;   //可读事件被触发的回调函数
    EventCallback _write_callback;  //可写事件被触发的回调函数
    EventCallback _error_callback;  //错误事件被触发的回调函数
    EventCallback _close_callback;  //连接断开事件被触发的回调函数
    EventCallback _event_callback;  //任意事件被触发的回调函数
    int _fd;
public:
    Channel(int fd)
    :_fd(fd), _events(0), _revents(0)
    {}
    int Fd()
    {
        return _fd;
    }
    void SetREvents(uint32_t events)
    {
        _revents = events;
    }
    void SetReadCallback(const EventCallback& cb)
    {
        _read_callback = cb;
    }
    void SetWriteCallback(const EventCallback& cb)
    {
        _write_callback = cb;
    }
    void SetErrorCallback(const EventCallback& cb)
    {
        _error_callback = cb;
    }
    void SetCloseCallback(const EventCallback& cb)
    {
        _close_callback = cb;
    }
    void SetEventCallback(const EventCallback& cb)
    {
        _event_callback = cb;
    }
    //当前是否监控了可读
    bool ReadAble()
    {
        return (_events & EPOLLIN);
    }
    //当前是否监控了可写
    bool WriteAble()   
    {
        return (_events & EPOLLOUT);
    }
    //启动读事件监控
    void EnableRead()
    {
        _events |= EPOLLIN;
        //后边会添加到EventLoop的事件监控中,暂时不写,因为EventLoop模块还没有写
    }
    //启动可写事件监控
    void EnableWrite()
    {
        _events |= EPOLLOUT;
        //后边会添加到EventLoop的事件监控中,暂时不写,因为EventLoop模块还没有写
    }
    //关闭读事件监控
    void DisableRead()
    {
        _events &= ~EPOLLIN;
        //后边会修改在EventLoop的事件监控中,暂时不写,因为EventLoop模块还没有写
    }
    //关闭写事件监控
    void DisableWrite()
    {
        _events &= ~EPOLLOUT;
        //后边会修改在EventLoop的事件监控中,暂时不写,因为EventLoop模块还没有写
    }
    //关闭所有事件监控
    void DisableAll()
    {
        _events &= 0;
    }
    /*
        关闭事件监控,只是不去关心这个事件了,但还是在EventLoop中。
        移除事件监控,才是真正的将它从EventLoop中移除
    */
   //移除监控
    void Remove()
    {
        //后边会调用EventLoop接口来移除监控,暂时不写,因为EventLoop模块还没有写
    }
    /*
        EventLoop不用 你触发了什么事件我就去调用对应的回调函数,EventLoop不用管。
        EventLoop只管你触发了事件,我就调用你的HandleEvent,你自己来决定什么样的事件该如何处理
        这是最能体现Channel模块作用的功能之一
    */
    void HandleEvent()
    {
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            if (_read_callback)
                _read_callback();
        }
        if (_revents & EPOLLOUT)
        {
            if (_write_callback)
                _write_callback();
        }
        if (_revents & EPOLLERR)
        {
            if (_error_callback)
                _error_callback();
        }
        if (_revents & EPOLLHUP)
        {
            if (_close_callback)
                _close_callback();
        }
        if (_event_callback)
            _event_callback();
    }
};

Poller描述符监控类设计思想

#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
    int _epfd;
    struct epoll_event _evs[MAX_EPOLLEVENTS];
    std::unordered_map<int, Channel*> _channels;    //这个和epoll里面的所有文件描述符是强绑定的
private:
    //对epoll的直接操作
    void Update(Channel* channel, int op)
    {
        int fd = channel->Fd();
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = channel->Events();
        int ret = epoll_ctl(_epfd, op, fd, &ev);
        if (ret < 0)
        {
            ERR_LOG("EPOLLCTL FAILED");
        }
        return;
    }
    //判断一个Channel是否已经添加了事件监控
    bool HasChannel(Channel* channel)
    {
        auto pos = _channels.find(channel->Fd());
        if (pos == _channels.end())
        {
            return false;
        }
        return true;
    }
public:
    Poller()
    {
        _epfd = epoll_create(MAX_EPOLLEVENTS);
        if (_epfd < 0)
        {
            ERR_LOG("EPOLL CREATE TAILED!");
            abort();    //退出程序
        }
    }
    //添加或修改监控事件
    void UpdateEvent(Channel* channel)
    {
        bool ret = HasChannel(channel);
        if (ret == false)
        {
            _channels.insert(std::make_pair(channel->Fd(), channel));
            return Update(channel, EPOLL_CTL_ADD);
        }
        return Update(channel, EPOLL_CTL_MOD);
    }
    //移除监控
    void RemoveEvent(Channel* channel)
    {
        bool ret = HasChannel(channel);
        if (ret == false)
        {
            return;
        }
        _channels.erase(channel->Fd());
        return Update(channel, EPOLL_CTL_DEL);
    }
    //开始监控,返回活跃连接
    void Poll(std::vector<Channel*>* active)
    {
        int n = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
        if (n < 0)
        {
            if (errno == EINTR) //如果被信号打断了
            {
                return;
            }
            ERR_LOG("EPOLL WAIT FAILED:%s", strerror(errno));
            abort();
        }
        for (int i = 0; i < n; ++i)
        {
            auto it = _channels.find(_evs[i].data.fd);
            assert(it != _channels.end());
            it->second->SetREvents(_evs[i].events);
            active->push_back(it->second);
        }
    }
};

Poller模块与Channel模块整合与测试

更新Poller模块和Channel模块

class Poller;

class Channel
{
private:
    Poller* _poller;
    uint32_t _events;   //当前需要监控的事件
    uint32_t _revents;  //当前连接触发的事件
    using EventCallback = std::function<void()>;
    /*
        只有我们的连接才知道,一旦事件触发了该去怎么处理,所以需要设置回调函数。
        当启动读事件监控,就需要将channel挂到EventLoop上面进行事件监控,
        当可读事件触发,就会调用channel里设置的回调函数。
    */
    EventCallback _read_callback;   //可读事件被触发的回调函数
    EventCallback _write_callback;  //可写事件被触发的回调函数
    EventCallback _error_callback;  //错误事件被触发的回调函数
    EventCallback _close_callback;  //连接断开事件被触发的回调函数
    EventCallback _event_callback;  //任意事件被触发的回调函数
    int _fd;
public:
    Channel(Poller* poller, int fd)
    :_poller(poller), _fd(fd), _events(0), _revents(0)
    {}
    int Fd()
    {
        return _fd;
    }
    uint32_t Events()
    {
        return _events;
    }
    void SetREvents(uint32_t events)
    {
        _revents = events;
    }
    void SetReadCallback(const EventCallback& cb)
    {
        _read_callback = cb;
    }
    void SetWriteCallback(const EventCallback& cb)
    {
        _write_callback = cb;
    }
    void SetErrorCallback(const EventCallback& cb)
    {
        _error_callback = cb;
    }
    void SetCloseCallback(const EventCallback& cb)
    {
        _close_callback = cb;
    }
    void SetEventCallback(const EventCallback& cb)
    {
        _event_callback = cb;
    }
    //当前是否监控了可读
    bool ReadAble()
    {
        return (_events & EPOLLIN);
    }
    //当前是否监控了可写
    bool WriteAble()   
    {
        return (_events & EPOLLOUT);
    }
    //启动读事件监控
    void EnableRead()
    {
        _events |= EPOLLIN;
        Update();//因为EventLoop还没有实现,为了测试Channel和Poller,先暂时直接用Poller里的接口进行操作
    }
    //启动可写事件监控
    void EnableWrite()
    {
        _events |= EPOLLOUT;
        Update();//因为EventLoop还没有实现,为了测试Channel和Poller,先暂时直接用Poller里的接口进行操作
    }
    //关闭读事件监控
    void DisableRead()
    {
        _events &= ~EPOLLIN;
        Update();//因为EventLoop还没有实现,为了测试Channel和Poller,先暂时直接用Poller里的接口进行操作
    }
    //关闭写事件监控
    void DisableWrite()
    {
        _events &= ~EPOLLOUT;
        Update();//因为EventLoop还没有实现,为了测试Channel和Poller,先暂时直接用Poller里的接口进行操作
    }
    //关闭所有事件监控
    void DisableAll()
    {
        _events &= 0;
    }
    /*
        关闭事件监控,只是不去关心这个事件了,但还是在EventLoop中。
        移除事件监控,才是真正的将它从EventLoop中移除
    */
    //移除监控
    void Remove();//因为里面用到了Poller的成员,所以需要在Poller代码的下面去实现该函数
    void Update();

    /*
        EventLoop不用 你触发了什么事件我就去调用对应的回调函数,EventLoop不用管。
        EventLoop只管你触发了事件,我就调用你的HandleEvent,你自己来决定什么样的事件该如何处理
        这是最能体现Channel模块作用的功能之一
    */
    void HandleEvent()
    {
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            if (_read_callback)
                _read_callback();
        }
        if (_revents & EPOLLOUT)
        {
            if (_write_callback)
                _write_callback();
        }
        if (_revents & EPOLLERR)
        {
            if (_error_callback)
                _error_callback();
        }
        if (_revents & EPOLLHUP)
        {
            if (_close_callback)
                _close_callback();
        }
        if (_event_callback)
            _event_callback();
    }
    
};


#define MAX_EPOLLEVENTS 1024
class Poller
{
private:
    int _epfd;
    struct epoll_event _evs[MAX_EPOLLEVENTS];
    std::unordered_map<int, Channel*> _channels;    //这个和epoll里面的所有文件描述符是强绑定的
private:
    //对epoll的直接操作
    void Update(Channel* channel, int op)
    {
        int fd = channel->Fd();
        struct epoll_event ev;
        ev.data.fd = fd;
        ev.events = channel->Events();
        int ret = epoll_ctl(_epfd, op, fd, &ev);
        if (ret < 0)
        {
            ERR_LOG("EPOLLCTL FAILED");
        }
        return;
    }
    //判断一个Channel是否已经添加了事件监控
    bool HasChannel(Channel* channel)
    {
        auto pos = _channels.find(channel->Fd());
        if (pos == _channels.end())
        {
            return false;
        }
        return true;
    }
public:
    Poller()
    {
        _epfd = epoll_create(MAX_EPOLLEVENTS);
        if (_epfd < 0)
        {
            ERR_LOG("EPOLL CREATE TAILED!");
            abort();    //退出程序
        }
    }
    //添加或修改监控事件
    void UpdateEvent(Channel* channel)
    {
        bool ret = HasChannel(channel);
        if (ret == false)
        {
            _channels.insert(std::make_pair(channel->Fd(), channel));
            return Update(channel, EPOLL_CTL_ADD);
        }
        return Update(channel, EPOLL_CTL_MOD);
    }
    //移除监控
    void RemoveEvent(Channel* channel)
    {
        bool ret = HasChannel(channel);
        if (ret == false)
        {
            return;
        }
        _channels.erase(channel->Fd());
        return Update(channel, EPOLL_CTL_DEL);
    }
    //开始监控,返回活跃连接
    void Poll(std::vector<Channel*>* active)
    {
        int n = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);
        if (n < 0)
        {
            if (errno == EINTR) //如果被信号打断了
            {
                return;
            }
            ERR_LOG("EPOLL WAIT FAILED:%s", strerror(errno));
            abort();
        }
        for (int i = 0; i < n; ++i)
        {
            auto it = _channels.find(_evs[i].data.fd);
            assert(it != _channels.end());
            it->second->SetREvents(_evs[i].events);
            active->push_back(it->second);
        }
    }
};

void Channel::Remove()//因为EventLoop还没有实现,为了测试Channel和Poller,先暂时直接用Poller里的接口进行操作
{
    return _poller->RemoveEvent(this);
}
void Channel::Update()//因为EventLoop还没有实现,为了测试Channel和Poller,先暂时直接用Poller里的接口进行操作
{
    return _poller->UpdateEvent(this);
}

测试:

#include "../source/server.hpp"


void HandleClose(Channel* channel)
{
    std::cout << "close: " << channel->Fd() << std::endl;
    channel->Remove();  //移除监控
    delete channel;	//这里直接进行delete是不合理的,因为接下来其他地方可能还要使用这个channel
}
void HandleRead(Channel* channel)
{
    int fd = channel->Fd();
    char buf[1024] = {0};
    int ret = recv(fd, buf, 1023, 0);
    if (ret <= 0)   //如果对端关闭,或者读取出错
    {
        return HandleClose(channel);    //关闭释放
    }
    std::cout << buf << std::endl;
    channel->EnableWrite(); //启动可写事件
}
void HandleWrite(Channel* channel)
{
    int fd = channel->Fd();
    const char* data = "天气真不错!!";
    int ret = send(fd, data, strlen(data), 0);
    if (ret < 0)
    {
        return HandleClose(channel);    //关闭释放
    }
    channel->DisableWrite();    //关闭写监控
}
void HandleError(Channel* channel)
{
    return HandleClose(channel);    //关闭释放
}
void HandleEvent(Channel* channel)
{
    std::cout << "有了一个事件!!" << std::endl;
}

void Acceptor(Poller* poller, Channel* lst_channel) 
{
    int fd = lst_channel->Fd();
    int newfd = accept(fd, NULL, NULL);
    if (newfd < 0)  return;
    Channel* channel = new Channel(poller, newfd);
    //对新连接的channel进行设置,设置的是事件到来了该如何处理
    channel->SetReadCallback(std::bind(HandleRead, channel));       //为通信套接字设置可读事件的回调函数
    channel->SetWriteCallback(std::bind(HandleWrite, channel));     //可写事件的回调函数
    channel->SetCloseCallback(std::bind(HandleClose, channel));     //关闭事件的回调函数
    channel->SetErrorCallback(std::bind(HandleError, channel));     //错误事件的回调函数
    channel->SetEventCallback(std::bind(HandleEvent, channel));   //任意事件的回调函数
    channel->EnableRead();  //启动可读事件
}

int main()
{
    Poller poller;
    Socket lst_sock;
    lst_sock.CreateServer(8085);
    //为监听套接字,创建一个Channel进行事件的管理,以及事件的处理
    Channel channel(&poller, lst_sock.Fd());
    channel.SetReadCallback(std::bind(Acceptor, &poller, &channel));
    channel.EnableRead();   //启动可读事件监控
    while (1)
    {
        std::vector<Channel*> actives;
        poller.Poll(&actives);
        for (auto& a : actives)
        {
            a->HandleEvent();
        }
    }
    lst_sock.Close();
    return 0;
}
#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    while (1)
    {
        std::string str = "hello world!!!";
        cli_socket.Send(str.c_str(), str.size());
        char buf[4096] = {0};
        cli_socket.Recv(buf, 4095);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    return 0;
}

EventLoop模块中eventfd的认识

示例

#include <iostream>
#include <sys/eventfd.h>
#include <stdint.h>
#include <cstring>
#include <unistd.h>

int main()
{
    int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
    if (efd < 0)
    {
        perror("eventfd failed");
        return -1;
    }
    uint64_t val = 1;
    write(efd, &val, sizeof(val)); 
    write(efd, &val, sizeof(val));
    write(efd, &val, sizeof(val));
    uint64_t res1 = 0;
    read(efd, &res1, sizeof(res1));
    std::cout << res1 << std::endl;
    uint64_t res2 = 0;
    read(efd, &res2, sizeof(res2));
    std::cout << res2 << std::endl;
    close(efd);
    return 0;
}

EventLoop模块设计思想

更新Channel模块,实现EventLoop模块

class EventLoop;
class Channel
{
private:
    EventLoop* _loop;
    uint32_t _events;   //当前需要监控的事件
    uint32_t _revents;  //当前连接触发的事件
    using EventCallback = std::function<void()>;
    /*
        只有我们的连接才知道,一旦事件触发了该去怎么处理,所以需要设置回调函数。
        当启动读事件监控,就需要将channel挂到EventLoop上面进行事件监控,
        当可读事件触发,就会调用channel里设置的回调函数。
    */
    EventCallback _read_callback;   //可读事件被触发的回调函数
    EventCallback _write_callback;  //可写事件被触发的回调函数
    EventCallback _error_callback;  //错误事件被触发的回调函数
    EventCallback _close_callback;  //连接断开事件被触发的回调函数
    EventCallback _event_callback;  //任意事件被触发的回调函数
    int _fd;
public:
    Channel(EventLoop* loop, int fd)
    :_loop(loop), _fd(fd), _events(0), _revents(0)
    {}
    int Fd()
    {
        return _fd;
    }
    uint32_t Events()
    {
        return _events;
    }
    void SetREvents(uint32_t events)
    {
        _revents = events;
    }
    void SetReadCallback(const EventCallback& cb)
    {
        _read_callback = cb;
    }
    void SetWriteCallback(const EventCallback& cb)
    {
        _write_callback = cb;
    }
    void SetErrorCallback(const EventCallback& cb)
    {
        _error_callback = cb;
    }
    void SetCloseCallback(const EventCallback& cb)
    {
        _close_callback = cb;
    }
    void SetEventCallback(const EventCallback& cb)
    {
        _event_callback = cb;
    }
    //当前是否监控了可读
    bool ReadAble()
    {
        return (_events & EPOLLIN);
    }
    //当前是否监控了可写
    bool WriteAble()   
    {
        return (_events & EPOLLOUT);
    }
    //启动读事件监控
    void EnableRead()
    {
        _events |= EPOLLIN;
        Update();
    }
    //启动可写事件监控
    void EnableWrite()
    {
        _events |= EPOLLOUT;
        Update();
    }
    //关闭读事件监控
    void DisableRead()
    {
        _events &= ~EPOLLIN;
        Update();
    }
    //关闭写事件监控
    void DisableWrite()
    {
        _events &= ~EPOLLOUT;
        Update();
    }
    //关闭所有事件监控
    void DisableAll()
    {
        _events &= 0;
    }
    /*
        关闭事件监控,只是不去关心这个事件了,但还是在EventLoop中。
        移除事件监控,才是真正的将它从EventLoop中移除
    */
    //移除监控
    void Remove();
    void Update();

    /*
        EventLoop不用 你触发了什么事件我就去调用对应的回调函数,EventLoop不用管。
        EventLoop只管你触发了事件,我就调用你的HandleEvent,你自己来决定什么样的事件该如何处理
        这是最能体现Channel模块作用的功能之一
    */
    void HandleEvent()
    {
        if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))
        {
            if (_event_callback)
                _event_callback();
            if (_read_callback)
                _read_callback();
        }
        //有可能会释放连接的操作事件,一次只处理一个
        if (_revents & EPOLLOUT)
        {
            if (_event_callback)
                _event_callback();
            if (_write_callback)
                _write_callback();
        }
        else if (_revents & EPOLLERR)
        {
            if (_event_callback)
                _event_callback();
            if (_error_callback)
                _error_callback();
        }
        else if (_revents & EPOLLHUP)
        {
            if (_event_callback)
                _event_callback();
            if (_close_callback)
                _close_callback();
        }
    }
};

class EventLoop
{
private:
    std::thread::id _thread_id; //线程ID
    int _event_fd;  //唤醒阻塞的IO事件监控
    std::unique_ptr<Channel> _event_channel;    //为了能更好的管理_event_fd,为其创建一个channel
    Poller _poller; //进行所有的描述符的事件监控

    using Functor = std::function<void()>;
    std::vector<Functor> _tasks;    //任务池
    std::mutex _mutex;     //保证多线程对任务池进行操作的线程安全
public:
    //执行任务池中的所有任务
    void RunAllTask()
    {
        std::vector<Functor> functor;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            functor.swap(_tasks);
        }
        for (auto f : functor)
        {
            f();
        }
    }
    static int CreateEventFd()
    {
        int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (efd < 0)
        {
            perror("eventfd failed");
            abort();
        }
        return efd;
    }
    void ReadEvent()
    {
        uint64_t res = 0;
        int ret = read(_event_fd, &res, sizeof(res));
        if (ret < 0)
        {
            //EINTR -- 被信号打断    EAGAIN -- 暂时无数据可读(非阻塞时才会触发)
            if (errno == EINTR || errno == EAGAIN)
            {
                return;
            }
            ERR_LOG("READ EVENTED FAILED");
            abort();
        }
    }
    void WeakUpEventFd()
    {
        uint64_t val = 1;
        int ret = write(_event_fd, &val, sizeof(val));
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            ERR_LOG("WRITE EVENTFD FAILED");
            abort();
        }
    }
public:
    EventLoop()
    :_thread_id(std::this_thread::get_id()) //获取的是当前实例化该EventLoop线程的id,进行线程绑定
    ,_event_fd(CreateEventFd())
    ,_event_channel(new Channel(this, _event_fd))
    {
        //给eventfd添加可读事件回调,读取eventfd事件通知次数
        _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEvent, this));
        //启动eventfd的读事件监控
        _event_channel->EnableRead();
    }
    //判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列
    void RunInLoop(const Functor& cb)
    {
        if (IsInLoop())
        {
            return cb();
        }
        return QueueInLoop(cb);
    }
    //将操作压入任务池
    void QueueInLoop(const Functor& cb)
    {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _tasks.push_back(cb);
        }
        //其他线程把任务投入到你的EventLoop的任务池里面了,需要执行任务池里的这个任务
        //所以需要唤醒这个线程(因为这个线程可能在等待事件就绪的阻塞状态)
        WeakUpEventFd();
    }
    //用于判断当前线程是否是EventLoop对应的线程
    bool IsInLoop()
    {
        return _thread_id == std::this_thread::get_id();
    }
    //添加/修改描述符的事件监控
    void UpdateEvent(Channel* channel)
    {
        _poller.UpdateEvent(channel);
    }
    //移除描述符的监控
    void RemoveEvent(Channel* channel)
    {
        _poller.RemoveEvent(channel);
    }
    void Start()
    {
        while (1)
        {
            //事件监控
            std::vector<Channel*> actives;
            _poller.Poll(&actives);
            //就绪事件处理
            for (auto& channel : actives)
            {
                channel->HandleEvent();
            }
            //执行任务池中的任务
            RunAllTask();
        }
    }
};

void Channel::Remove()
{
    return _loop->RemoveEvent(this);
}
void Channel::Update()
{
    return _loop->UpdateEvent(this);
}

测试:

#include "../source/server.hpp"


void HandleClose(Channel* channel)
{
    std::cout << "close: " << channel->Fd() << std::endl;
    channel->Remove();  //移除监控
    delete channel;
}

void HandleRead(Channel* channel)
{
    int fd = channel->Fd();
    char buf[1024] = {0};
    int ret = recv(fd, buf, 1023, 0);
    if (ret <= 0)   //如果对端关闭,或者读取出错
    {
        HandleClose(channel);    //关闭释放
        return;
    }
    std::cout << buf << std::endl;
    channel->EnableWrite(); //启动可写事件
}

void HandleWrite(Channel* channel)
{
    int fd = channel->Fd();
    const char* data = "天气真不错!!";
    int ret = send(fd, data, strlen(data), 0);
    if (ret < 0)
    {
        return HandleClose(channel);    //关闭释放
    }
    channel->DisableWrite();    //关闭写监控
}
void HandleError(Channel* channel)
{
    return HandleClose(channel);    //关闭释放
}
void HandleEvent(Channel* channel)
{
    std::cout << "有了一个事件!!" << std::endl;
}

void Acceptor(EventLoop* loop, Channel* lst_channel) 
{
    int fd = lst_channel->Fd();
    int newfd = accept(fd, NULL, NULL);
    if (newfd < 0)  return;
    Channel* channel = new Channel(loop, newfd);
    //对新连接的channel进行设置,设置的是事件到来了该如何处理
    channel->SetReadCallback(std::bind(HandleRead, channel));       //为通信套接字设置可读事件的回调函数
    channel->SetWriteCallback(std::bind(HandleWrite, channel));     //可写事件的回调函数
    channel->SetCloseCallback(std::bind(HandleClose, channel));     //关闭事件的回调函数
    channel->SetErrorCallback(std::bind(HandleError, channel));     //错误事件的回调函数
    channel->SetEventCallback(std::bind(HandleEvent, channel));   //任意事件的回调函数
    channel->EnableRead();  //启动可读事件
}

int main()
{
    EventLoop loop;
    Socket lst_sock;
    lst_sock.CreateServer(8085);
    //为监听套接字,创建一个Channel进行事件的管理,以及事件的处理
    Channel channel(&loop, lst_sock.Fd());
    channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));
    channel.EnableRead();   //启动可读事件监控
    while (1)
    {
        std::vector<Channel*> actives;
        loop.Start();
    }
    lst_sock.Close();
    return 0;
}
#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    while (1)
    {
        std::string str = "hello world!!!";
        cli_socket.Send(str.c_str(), str.size());
        char buf[4096] = {0};
        cli_socket.Recv(buf, 4095);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    return 0;
}

EventLoop与TimerWheel定时器模块整合

实现TimerWheel模块和更新EventLoop模块

using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;
class TimerTask
{
private:
    uint64_t _id;   //定时器任务对象ID  定时任务必须得找得着,一个程序里定时任务可能有很多
    uint32_t _timeout;  //定时任务的超时时间
    TaskFunc _task_cb;  //定时器要执行的任务
    //用于删除TimerWheel中保存的定时器任务对象信息,定时任务释放的时候也要清理TimerWheel中保存的定时器对象信息
    //为什么将这个_release设置到TimerTask里面呢,不在TimerWheel层管理?
    //因为这个TimerWheel不知道是否某个定时任务真的释放了,而TimerTask是最清楚的,自己真的释放了就会调用析构函数
    ReleaseFunc _release;
    bool _canceled;     //false - 代表没有被取消,true - 代表取消了
public:
    TimerTask(uint64_t id, uint32_t timeout, const TaskFunc& cb)
    :_id(id), _timeout(timeout), _task_cb(cb), _canceled(false)
    {}
    ~TimerTask()
    {
        if (_canceled == false) //如果定时任务没有被取消
            _task_cb();
        _release();
    }
    void SetRelease(const ReleaseFunc& cb)
    {
        _release = cb;
    }
    uint32_t DelayTime()
    {
        return _timeout;
    }
    void Cancel()
    {
        _canceled = true;
    }
};

class TimerWheel
{
private:
    using WeakTask = std::weak_ptr<TimerTask>;
    using PtrTask = std::shared_ptr<TimerTask>;
    int _tick;      //当前的秒针,走到哪里释放哪里,就相当于执行哪里的任务
    int _capacity;  //表盘最大数量 -- 也是最大能设置的延时时间
    std::vector<std::vector<PtrTask>> _wheel;
    std::unordered_map<uint64_t, WeakTask> _timers;     //对所有的定时任务进行管理

    EventLoop* _loop;   //timerwheel所关联的EventLoop
    int _timerfd;       //定时器描述符
    std::unique_ptr<Channel> _timer_channel;
public:
    void RemoveTimer(uint64_t id)
    {
        auto pos = _timers.find(id);
        if (pos != _timers.end())
        {
            _timers.erase(pos);
        }
    }
    static int CreateTimerFd()
    {
        int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);
        if (timerfd < 0)
        {
            ERR_LOG("TIMERFD CREATE FAILED");
            abort();
        }
        struct itimerspec itime;
        itime.it_value.tv_sec = 1;
        itime.it_value.tv_nsec = 0;
        itime.it_interval.tv_sec = 1;
        itime.it_interval.tv_nsec = 0;

        timerfd_settime(timerfd, 0, &itime, NULL);
        return timerfd;
    }
    void ReadTimefd()
    {
        uint64_t times;
        int ret = read(_timerfd, &times, 8);
        if (ret < 0)
        {
            ERR_LOG("READ TIMEFD FAILED");
            abort();
        }
    }
    void RunTimerTask()
    {
        _tick = (_tick + 1) % _capacity;
        _wheel[_tick].clear();  //情况指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉
    }
    //这个函数每秒钟会被执行一次,相当于秒针向后走了一步
    void OnTime()
    {
        ReadTimefd();
        RunTimerTask();
    }
    void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc& cb)
    {
        //1.构建定时任务
        PtrTask pt(new TimerTask(id, delay, cb));
        pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));
        //2.将定时任务加入到_wheel中
        int i = (_tick + delay) % _capacity;
        _wheel[i].push_back(pt);
        //3.加入到时间轮的_timers里面
        // std::unordered_map<uint64_t, WeakTask>::iterator pos = _timers.find(id);
        auto pos = _timers.find(id);
        if (pos == _timers.end())
        {
            _timers.insert({id, pt});
        }
    }
    void TimerRefreshInLoop(uint64_t id)  //刷新/延迟定时任务
    {
        //通过id找到对应的定时任务
        auto pos = _timers.find(id);
        if (pos == _timers.end())
        {
            //如果没找到定时任务,则没办法更新
            return;
        }
        //获取到对应定时任务的shared_ptr,并构建一个新的智能指针,对应的计数加1
        PtrTask pt = pos->second.lock();
        //将对应的pt加入到_wheel中
        int delay = pt->DelayTime();
        int i = (_tick + delay) % _capacity;
        _wheel[i].push_back(pt);
    }
    void TimerCancelInLoop(uint64_t id)
    {
        auto pos = _timers.find(id);
        if (pos == _timers.end())
        {
            //没找到定时任务,没法刷新,没法延时
            return;
        }
        PtrTask pt = pos->second.lock();
        if (pt) //如果自己已经销毁,则可能为空
        	pt->Cancel();
    }
public:
    TimerWheel(EventLoop* loop)
    :_capacity(60)
    , _tick(0)
    , _wheel(_capacity)
    , _loop(loop)
    , _timerfd(CreateTimerFd())
    , _timer_channel(new Channel(loop, _timerfd))
    {
        _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));
        _timer_channel->EnableRead();
    }
    //定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行(如主线程想添加给所有的连接添加一个定时任务),因此需要考虑线程安全问题
    //如果不想加锁,那就把对应定期的所有操作,都放到一个线程中进行
    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb); //因为用到了EventLoop的_loop成员,所以要在EventLoop代码后面去实现
    void TimerRefresh(uint64_t id);
    void TimerCancel(uint64_t id);

    //这个接口存在线程安全问题--这个接口不能被外界使用者调用,只能在模块内,在对应的EventLoop线程内执行
    bool HasTimer(uint64_t id)
    {
        auto it = _timers.find(id);
        if (it == _timers.end())
        {
            return false;
        }
        return true;
    }
};

class EventLoop
{
private:
    std::thread::id _thread_id; //线程ID
    int _event_fd;  //唤醒阻塞的IO事件监控
    std::unique_ptr<Channel> _event_channel;    //为了能更好的管理_event_fd,为其创建一个channel
    Poller _poller; //进行所有的描述符的事件监控

    using Functor = std::function<void()>;
    std::vector<Functor> _tasks;    //任务池
    std::mutex _mutex;     //保证多线程对任务池进行操作的线程安全
    TimerWheel _timer_wheel;
public:
    //执行任务池中的所有任务
    void RunAllTask()
    {
        std::vector<Functor> functor;
        {
            std::unique_lock<std::mutex> lock(_mutex);
            functor.swap(_tasks);
        }
        for (auto f : functor)
        {
            f();
        }
    }
    static int CreateEventFd()
    {
        int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
        if (efd < 0)
        {
            perror("eventfd failed");
            abort();
        }
        return efd;
    }
    void ReadEvent()
    {
        uint64_t res = 0;
        int ret = read(_event_fd, &res, sizeof(res));
        if (ret < 0)
        {
            //EINTR -- 被信号打断    EAGAIN -- 暂时无数据可读(非阻塞时才会触发)
            if (errno == EINTR || errno == EAGAIN)
            {
                return;
            }
            ERR_LOG("READ EVENTED FAILED");
            abort();
        }
    }
    void WeakUpEventFd()
    {
        uint64_t val = 1;
        int ret = write(_event_fd, &val, sizeof(val));
        if (ret < 0)
        {
            if (errno == EINTR)
            {
                return;
            }
            ERR_LOG("WRITE EVENTFD FAILED");
            abort();
        }
    }
public:
    EventLoop()
    :_thread_id(std::this_thread::get_id()) //获取的是当前实例化该EventLoop线程的id,进行线程绑定
    ,_event_fd(CreateEventFd())
    ,_event_channel(new Channel(this, _event_fd))
    ,_timer_wheel(this)
    {
        //给eventfd添加可读事件回调,读取eventfd事件通知次数
        _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEvent, this));
        //启动eventfd的读事件监控
        _event_channel->EnableRead();
    }
    //判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列
    void RunInLoop(const Functor& cb)
    {
        if (IsInLoop())
        {
            return cb();
        }
        return QueueInLoop(cb);
    }
    //将操作压入任务池
    void QueueInLoop(const Functor& cb)
    {
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _tasks.push_back(cb);
        }
        //其他线程把任务投入到你的EventLoop的任务池里面了,需要执行任务池里的这个任务
        //所以需要唤醒这个线程(因为这个线程可能在等待事件就绪的阻塞状态)
        WeakUpEventFd();
    }
    //用于判断当前线程是否是EventLoop对应的线程
    bool IsInLoop()
    {
        return _thread_id == std::this_thread::get_id();
    }
    //添加/修改描述符的事件监控
    void UpdateEvent(Channel* channel)
    {
        _poller.UpdateEvent(channel);
    }
    //移除描述符的监控
    void RemoveEvent(Channel* channel)
    {
        _poller.RemoveEvent(channel);
    }
    void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb)
    {
        return _timer_wheel.TimerAdd(id, delay, cb);
    }
    void TimerRefresh(uint64_t id)
    {
        return _timer_wheel.TimerRefresh(id);
    }
    void TiemrCancel(uint64_t id)
    {
        return _timer_wheel.TimerCancel(id);
    }
    bool HasTimer(uint64_t id)
    {
        return _timer_wheel.HasTimer(id);
    }
    void Start()
    {
        while (1)
        {
            //事件监控
            std::vector<Channel*> actives;
            _poller.Poll(&actives);
            //就绪事件处理
            for (auto& channel : actives)
            {
                channel->HandleEvent();
            }
            //执行任务池中的任务
            RunAllTask();
        }
    }
};

void TimerWheel::TimerAdd(uint64_t id, uint32_t delay, const TaskFunc& cb)
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, cb));
}
void TimerWheel::TimerRefresh(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));
}
void TimerWheel::TimerCancel(uint64_t id)
{
    _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));
}

测试:

#include "../source/server.hpp"


void HandleClose(Channel* channel)
{
    DBG_LOG("close: %d", channel->Fd());
    channel->Remove();  //移除监控
    delete channel;
}

void HandleRead(Channel* channel)
{
    int fd = channel->Fd();
    char buf[1024] = {0};
    int ret = recv(fd, buf, 1023, 0);
    if (ret <= 0)   //如果对端关闭,或者读取出错
    {
        HandleClose(channel);    //关闭释放
        return;
    }
    DBG_LOG("%s", buf);
    channel->EnableWrite(); //启动可写事件
}

void HandleWrite(Channel* channel)
{
    int fd = channel->Fd();
    const char* data = "天气真不错!!";
    int ret = send(fd, data, strlen(data), 0);
    if (ret < 0)
    {
        return HandleClose(channel);    //关闭释放
    }
    channel->DisableWrite();    //关闭写监控
}
void HandleError(Channel* channel)
{
    return HandleClose(channel);    //关闭释放
}
void HandleEvent(EventLoop* loop, Channel* channel, uint64_t timerid)
{
    loop->TimerRefresh(timerid);
}

void Acceptor(EventLoop* loop, Channel* lst_channel) 
{
    int fd = lst_channel->Fd();
    int newfd = accept(fd, NULL, NULL);
    if (newfd < 0)  return;
    int timerid = rand() % 10000;
    Channel* channel = new Channel(loop, newfd);
    //对新连接的channel进行设置,设置的是事件到来了该如何处理
    channel->SetReadCallback(std::bind(HandleRead, channel));       //为通信套接字设置可读事件的回调函数
    channel->SetWriteCallback(std::bind(HandleWrite, channel));     //可写事件的回调函数
    channel->SetCloseCallback(std::bind(HandleClose, channel));     //关闭事件的回调函数
    channel->SetErrorCallback(std::bind(HandleError, channel));     //错误事件的回调函数
    channel->SetEventCallback(std::bind(&HandleEvent, loop, channel, timerid));   //任意事件的回调函数

    //非活跃连接的超时释放操作,10s后关闭连接
    //注意:定时销毁任务,必须在启动读事件之前,因为有可能启动了事件监控后,立即就有了事件,就去处理事件了,
    //但是这时候还没有定时任务,之后就会去执行TimerRefresh,虽然我们做了防御性编程,但这总归来说是不好的。
    loop->TimerAdd(timerid, 10, std::bind(&HandleClose, channel));
    channel->EnableRead();  //启动可读事件
}

int main()
{
    srand(time(NULL));
    EventLoop loop;
    Socket lst_sock;
    lst_sock.CreateServer(8085);
    //为监听套接字,创建一个Channel进行事件的管理,以及事件的处理
    Channel channel(&loop, lst_sock.Fd());
    channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));
    channel.EnableRead();   //启动可读事件监控
    while (1)
    {
        loop.Start();
    }
    lst_sock.Close();
    return 0;
}
#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    for (int i = 0; i < 5; ++i)
    {
        std::string str = "hello world!!!";
        cli_socket.Send(str.c_str(), str.size());
        char buf[4096] = {0};
        cli_socket.Recv(buf, 4095);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    while (1)
    {}
    return 0;
}

EventLoop模块联调中的模块流程关系图

Connection模块设计思想

实现Connection模块

class Any
{
public:
    class holder
    {
    public:
        virtual ~holder()
        {}
        virtual const std::type_info& type() = 0;  //获取当前子类的数据类型 -- 返回类型是const type_info&
        virtual holder* clone() = 0;        //针对当前的对象自身,克隆出一个新的子类对象
    };

    template<class T>
    class placeholder : public holder
    {
    public:
        placeholder(const T& val = T()):_val(val)
        {}
        virtual const std::type_info& type()
        {
            return typeid(T);
        }
        virtual holder* clone()
        {
            return new placeholder(_val);
        }
        T _val;     //主要是这里,如果不通过virtual,则无法释放这里的对象
    };

    Any():_content(NULL)
    {}
    template <class T>
    Any(const T& content):_content(new placeholder<T>(content))
    {}
    Any(const Any& other)
    :_content(other._content ? other._content->clone() : NULL)
    {}
    ~Any()
    {
        delete _content;
    }
    Any& swap(Any& other)
    {
        std::swap(_content, other._content);
        return *this;
    }
    template<class T>
    Any& operator=(const T& val)
    {
        Any(val).swap(*this);   //这样写的好处:Any(val)为临时对象,交换完生命周期就到了,就会调用自己的析构函数
        return *this;
    }
    Any& operator=(const Any& other)
    {
        Any(other).swap(*this);
        return *this;
    }

    template<class T>
    T* get()    //返回子类对象保存数据的指针
    {
        if (typeid(T) != _content->type())  //如果你要的类型和我保存的类型不匹配
            return NULL;
        return &(((placeholder<T>*)_content)->_val);
    }
public:
    holder* _content;
};

typedef enum
{
    DISCONNECTED,   //连接关闭状态
    CONNECTING,     //连接建立成功 - 待处理状态
    CONNECTED,      //连接建立完成,各种设置已完成,可以通信的状态
    DISCONNECTING   //待关闭状态
}ConnStatu;

class Connection : public std::enable_shared_from_this<Connection>
{
private:
    uint64_t _conn_id;  //连接的唯一ID,也是定时任务唯一ID,便于连接的管理和查找
    int _sockfd;        //连接关联的文件描述符
    bool _enable_inactive_release;  //连接是否启动非活跃销毁的判断标志,默认为false
    EventLoop* _loop;   //连接所关联的一个EventLoop
    ConnStatu _statu;   //连接状态
    Socket _socket;     //套接字操作管理
    Channel _channel;   //连接的事件管理
    Buffer _in_buffer;  //输入缓冲区 -- 存放从socket中读取到的数据
    Buffer _out_buffer; //输出缓冲区 -- 存放要发送给对端的数据
    Any _context;       //请求的接收处理上下文

    //以下这四个回调函数吗,是让Server模块来设置的(服务器模块的处理回调是组件使用者设置的)
    using ConnectedCallback = std::function<void(const PtrConnection&)>;
    using MessageCallback = std::function<void(const PtrConnection&, Buffer*)>;
    using ClosedCallback = std::function<void(const PtrConnection&)>;
    using AnyEventCallback = std::function<void(const PtrConnection&)>;
    ConnectedCallback _connected_callback;
    MessageCallback _message_callback;
    ClosedCallback _closed_callback;
    AnyEventCallback _event_callback;
    //组件内的连接关闭回调 -- 组件内设置的,因为服务器组件内会把所有的连接管理起来。
    //一旦某个连接要关闭,就应该从管理的地方移除掉自己的信息。
    ClosedCallback _server_closed_callback;
private:
    //五个channel的事件回调函数
    //描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callback
    void HandleRead()
    {
        char buf[65536];
        ssize_t ret = _socket.NonBlockRecv(buf, 65535);
        if (ret < 0)
        {
            //读出错了,可能是客户端关闭,不能直接关闭连接,因为可能有数据没发送或者有数据还没处理
            return ShutdownInLoop();
        }
        else if (ret == 0)
        {
            //表示没有读取到数据,并不是连接断开,因为我们调用的是自己封装的NonBlockRecv
            return;
        }
        _in_buffer.WriteAndPush(buf, ret);
        //2.调用message_callback进行业务处理
        if (_in_buffer.ReadAbleSize() > 0)
        {
            //shared_from_this -- 从当前对象自身获取自身的shared_ptr管理对象
            _message_callback(shared_from_this(), &_in_buffer);
        }
    }
    //描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送
    void HandleWrite()
    {
        ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
        if (ret < 0)
        {
            if (_in_buffer.ReadAbleSize() > 0)
            {
                _message_callback(shared_from_this(), &_in_buffer);
            }
            return ReleaseInLoop(); //实际的关闭释放操作
        }
        _out_buffer.MoveReadOffset(ret);    //千万不要忘了,将读偏移向后移动
        if (_out_buffer.ReadAbleSize() == 0)
        {
            _channel.DisableWrite();    //如果数据发送完了,就关闭写事件监控
            //如果当前是连接待关闭状态,并且数据发送完毕,则可以将连接直接释放
            if (_statu == DISCONNECTING)
            {
                return ReleaseInLoop();
            }
        }
        //发送数据可能发不完,不关闭写事件监控
        return;
    }
    //描述符触发挂断事件
    void HandleClose()
    {
        if (_in_buffer.ReadAbleSize() > 0)
        {
            _message_callback(shared_from_this(), &_in_buffer);
        }
        return ReleaseInLoop();
    }
    //描述符触发出错事件
    void HandleError()
    {
        return HandleClose();
    }
    //描述符触发任意事件
    void HandleEvent()
    {
        //刷新连接活跃度
        if (_enable_inactive_release == true)
        {
            _loop->TimerRefresh(_conn_id);
        }
        //调用组件使用者的任意事件回调
        if (_event_callback)
            _event_callback(shared_from_this());
    }
    //连接获取之后,所处的状态下要进行各种设置
    void EstablishedInLoop()
    {
        //修改连接状态
        assert(_statu == CONNECTING);
        _statu = CONNECTED;
        _channel.EnableRead();
        if (_connected_callback)
            _connected_callback(shared_from_this());
    }
    //这个接口才是实际的释放接口
    void ReleaseInLoop()
    {
        //修改连接状态,将其置为DISCONNECTED
        _statu = DISCONNECTED;
        //移除连接的事件监控
        _channel.Remove();
        //关闭描述符
        _socket.Close();
        //如果当前定时器队列中还有定时任务,则取消任务
        if (_loop->HasTimer(_conn_id))
            CancelInactiveRelease();
        //调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,因此先调用户的回调函数
        if (_closed_callback)
            _closed_callback(shared_from_this());
        if (_server_closed_callback)
            _server_closed_callback(shared_from_this());
    }
    //这个接口并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控
    //为什么要这么做?因为可写条件可能不就绪,即内核缓冲区的数据满了,写不进去了
    void SendInLoop(Buffer buf)
    {
        if (_statu == DISCONNECTED) //如果状态已经关闭,则直接return,已经关闭则代表发送缓冲区数据为0
            return;
        _out_buffer.WriteBufferAndPush(buf);
        if (_channel.WriteAble() == false)
            _channel.EnableWrite();
    }
    //这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送
    void ShutdownInLoop()
    {
        _statu = DISCONNECTING; //设置为半关闭状态
        if (_in_buffer.ReadAbleSize() > 0)
            _message_callback(shared_from_this(), &_in_buffer);
        if (_out_buffer.ReadAbleSize() > 0)
            _channel.EnableWrite();
        //因为可能发送缓冲区将数据发送不完,所以写关心就不用关闭了,也不用真正释放了
        if (_out_buffer.ReadAbleSize() == 0)
        {
            _channel.DisableWrite();
            ReleaseInLoop();
        }
    }
    //启动非活跃连接超时释放规则
    void EnableInactiveReleaseInLoop(int sec)
    {
        //将判断标志 _enable_inactive_erlease置为true
        _enable_inactive_release = true;
        //如果当前定时销毁任务已经存在,那就刷新延迟一下即可
        if (_loop->HasTimer(_conn_id))
            return _loop->TimerRefresh(_conn_id);
        //如果不存在定时销毁任务,则新增
        _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::ReleaseInLoop, this));
    }
    //取消非活跃连接超时释放规则
    void CancelInactiveReleaseInLoop()
    {
        _enable_inactive_release = false;
        if (_loop->HasTimer(_conn_id))
            _loop->TimerCancel(_conn_id);
    }
    //切换/升级协议
    void UpgradeInLoop(const Any& context, const ConnectedCallback& conn, const MessageCallback& msg, const ClosedCallback& closed, const AnyEventCallback& event)
    {
        _context = context; //改变上下文
        _connected_callback = conn;
        _message_callback = msg;
        _closed_callback = closed;
        _event_callback = event;
    }
public:
    Connection(EventLoop* loop, uint64_t conn_id, int sockfd)
    :_conn_id(conn_id)
    ,_sockfd(sockfd)
    ,_enable_inactive_release(false)
    ,_loop(loop)
    ,_statu(CONNECTING)
    ,_socket(sockfd)
    ,_channel(loop, _sockfd)
    {
        _channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
        _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
        _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
        _channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
        _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
    }
    ~Connection()
    {
        DBG_LOG("RELEASE CONNEDCTION: %p", this);
    }
    //获取管理的文件描述符
    int Fd()
    {
        return _sockfd;
    }
    //获取连接ID
    int Id()
    {
        return _conn_id;
    }
    //是否处于CONNECTED状态
    bool Connected()
    {
        return (_statu == CONNECTED);
    }
    //设置上下文 -- 连接建立完成时进行调用
    void SetContext(const Any& context)
    {
        _context = context;
    }
    //获取上下文,返回的是指针
    Any* GetContext()
    {
        return &_context;
    }
    void SetConnectedCallback(const ConnectedCallback& cb)
    {
        _connected_callback = cb;
    }
    void SetMessageCallback(const MessageCallback& cb)
    {
        _message_callback = cb;
    }
    void SetClosedCallback(const ClosedCallback& cb)
    {
        _closed_callback = cb;
    }
    void SetSvrClosedCallback(const ClosedCallback& cb)
    {
        _server_closed_callback = cb;
    }
    void SetAnyEventCallback(const AnyEventCallback& cb)
    {
        _event_callback = cb;
    }
    //连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callback
    void Establised()
    {
        _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
    }
    //发送数据,将数据放到发送缓冲区,启动写事件监控
    void Send(const char* data, size_t len)
    {
        Buffer buf; //为什么要重新创建一个临时变量?因为data可能是一个可能被释放的空间,我们将其压入任务队列等待被执行的过程中,空间可能被释放了
        buf.WriteAndPush(data, len);
        _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
    }
    //提供给组件使用者的关闭接口 -- 并不实际关闭,需要判断有没有数据待处理
    void Shutdown()
    {
        _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));
    }
    //启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务
    void EnableInactiveRelease(int sec)
    {
        _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
    }
    //取消非活跃销毁
    void CancelInactiveRelease()
    {
        _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));
    }
    //切换协议 -- 重置上下文以及阶段性处理函数
    void Upgrade(const Any& context, const ConnectedCallback& conn, const MessageCallback& msg, const ClosedCallback& closed, const AnyEventCallback& event)
    {
        _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
    }
};

测试:

#include "../source/server.hpp"

//管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;

void ConnectionDestroy(const PtrConnection& conn)   //这些函数未来是TcpServer模块提供的
{
    _conns.erase(conn->Id());
}
void OnConnected(const PtrConnection& conn)
{
    DBG_LOG("NEW CONNECTION: %p", conn.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buf)
{
    buf->MoveReadOffset(buf->ReadAbleSize());
    std::string str = "Hello World";
    conn->Send(str.c_str(), str.size());
}
void Acceptor(EventLoop* loop, Channel* lst_channel) 
{
    int fd = lst_channel->Fd();
    int newfd = accept(fd, NULL, NULL);
    if (newfd < 0)  return;
    conn_id++;
    PtrConnection conn(new Connection(loop, conn_id, newfd));
    conn->SetMessageCallback(OnMessage);
    conn->SetSvrClosedCallback(ConnectionDestroy);
    conn->SetConnectedCallback(OnConnected);
    conn->EnableInactiveRelease(10);    //启动非活跃超时销毁
    conn->Establised(); //就绪初始化
    _conns.insert(std::make_pair(conn_id, conn));
}

int main()
{
    EventLoop loop;
    Socket lst_sock;
    lst_sock.CreateServer(8085);
    //为监听套接字,创建一个Channel进行事件的管理,以及事件的处理
    Channel channel(&loop, lst_sock.Fd());
    channel.SetReadCallback(std::bind(Acceptor, &loop, &channel));
    channel.EnableRead();   //启动可读事件监控
    while (1)
    {
        loop.Start();
    }
    lst_sock.Close();
    return 0;
}
#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    for (int i = 0; i < 5; ++i)
    {
        std::string str = "hello world!!!";
        cli_socket.Send(str.c_str(), str.size());
        char buf[4096] = {0};
        cli_socket.Recv(buf, 4095);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    while (1)
    {}
    return 0;
}

Acceptor模块设计思想

实现Acceptor模块

class Acceptor
{
private:
    Socket _socket;     //用于创建监听套接字
    EventLoop* _loop;   //用于对监听套接字进行事件监控
    Channel _channel;   //对于对监听套接字进行事件管理

    using AcceptCallback = std::function<void(int)>;
    AcceptCallback _accept_callback;    //由Server模块提供的回调函数
private:
    //监听套接字的读事件回调处理函数 -- 获取新连接,调用_accept_callback函数进行新连接处理
    void HandleRead()
    {
        int newfd = _socket.Accept();
        if (newfd < 0)
            return;
        if (_accept_callback)
            _accept_callback(newfd);
    }
    int CreateServer(uint16_t port)
    {
        bool ret = _socket.CreateServer(port);
        if (ret < 0)
            abort();
        return _socket.Fd();
    }
public:
    Acceptor(EventLoop* loop, int port)
    :_loop(loop)
    ,_socket(CreateServer(port))
    ,_channel(loop, _socket.Fd())
    {
        _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));
    }
    void SetAcceptCallback(const AcceptCallback& cb)
    {
        _accept_callback = cb;
    }
    void Listen()
    {
        _channel.EnableRead();//启动监听的可读事件监控 -- 会自动将自己挂到poller中
    }
};

测试:

#include "../source/server.hpp"


//管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
EventLoop loop;

void ConnectionDestroy(const PtrConnection& conn)   //这些函数未来是TcpServer模块提供的
{
    _conns.erase(conn->Id());
}
void OnConnected(const PtrConnection& conn)
{
    DBG_LOG("NEW CONNECTION: %p", conn.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buf)
{
    buf->MoveReadOffset(buf->ReadAbleSize());
    std::string str = "Hello World";
    conn->Send(str.c_str(), str.size());
}
void NewConnection(int fd) 
{
    conn_id++;
    PtrConnection conn(new Connection(&loop, conn_id, fd));
    conn->SetMessageCallback(OnMessage);
    conn->SetSvrClosedCallback(ConnectionDestroy);
    conn->SetConnectedCallback(OnConnected);
    conn->EnableInactiveRelease(10);    //启动非活跃超时销毁
    conn->Establised(); //就绪初始化
    _conns.insert(std::make_pair(conn_id, conn));
}

int main()
{
    Acceptor acceptor(&loop, 8085);
    acceptor.SetAcceptCallback(NewConnection);
    acceptor.Listen();
    while (1)
    {
        loop.Start();
    }
    return 0;
}

#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    for (int i = 0; i < 5; ++i)
    {
        std::string str = "hello world!!!";
        cli_socket.Send(str.c_str(), str.size());
        char buf[4096] = {0};
        cli_socket.Recv(buf, 4095);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    while (1)
    {}
    return 0;
}

LoopThread模块设计思想

实现LoopThread模块

class LoopThread
{
private:
    std::mutex _mutex;          //互斥锁
    std::condition_variable _cond;  //条件变量
    EventLoop* _loop;
    std::thread _thread;        //EventLoop对应的线程
private:
    //实例化EventLoop对象,并且开始运行EventLoop模块的功能
    void ThreadEntry()
    {
        EventLoop loop; //因为下面Start会一直循环运行,所以EventLoop的生命周期不会结束
        {
            std::unique_lock<std::mutex> lock(_mutex);
            _loop = &loop;
            _cond.notify_all();
        }
        loop.Start();
    }
public:
    LoopThread()
    :_loop(NULL)
    ,_thread(std::thread(&LoopThread::ThreadEntry))//创建线程,设定线程入口函数
    {}
    //返回当前线程关联的EventLoop对象指针
    EventLoop* GetLoop()
    {
        EventLoop* loop = NULL;
        {
            std::unique_lock<std::mutex> lock(_mutex);  //加锁
            //第二个参数时一个bool的函数,如果为false就一直阻塞住,被唤醒才能继续往下走
            _cond.wait(lock, [&](){
                return _loop != nullptr;
            });
            loop = _loop;
        }
        return loop;
    }
};

测试:

#include "../source/server.hpp"


//管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
EventLoop base_loop;
std::vector<LoopThread> threads(2);
int next_loop = 0;

void ConnectionDestroy(const PtrConnection& conn)   //这些函数未来是TcpServer模块提供的
{
    _conns.erase(conn->Id());
}
void OnConnected(const PtrConnection& conn)
{
    DBG_LOG("NEW CONNECTION: %p", conn.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buf)
{
    buf->MoveReadOffset(buf->ReadAbleSize());
    std::string str = "Hello World";
    conn->Send(str.c_str(), str.size());
}
void NewConnection(int fd) 
{
    conn_id++;
    next_loop = (next_loop + 1) % 2;
    PtrConnection conn(new Connection(threads[next_loop].GetLoop(), conn_id, fd));
    conn->SetMessageCallback(OnMessage);
    conn->SetSvrClosedCallback(ConnectionDestroy);
    conn->SetConnectedCallback(OnConnected);
    conn->EnableInactiveRelease(10);    //启动非活跃超时销毁
    conn->Establised(); //就绪初始化
    _conns.insert(std::make_pair(conn_id, conn));
}

int main()
{
    Acceptor acceptor(&base_loop, 8085);
    acceptor.SetAcceptCallback(NewConnection);
    acceptor.Listen();
    while (1)
    {
        base_loop.Start();
    }
    return 0;
}
#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    for (int i = 0; i < 5; ++i)
    {
        std::string str = "hello world!!!";
        cli_socket.Send(str.c_str(), str.size());
        char buf[4096] = {0};
        cli_socket.Recv(buf, 4095);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    while (1)
    {}
    return 0;
}

LoopThreadPool模块设计思想

实现LoopThread模块

class LoopThreadPool
{
private:
    int _thread_count;  //从属线程的数量
    int _next_idx;
    EventLoop* _baseLoop;   //主EventLoop,运行在主线程,从属线程数量为0,则所有操作都在baseloop中进行
    std::vector<LoopThread*> _threads;  //保存所有的LoopThread对象
    std::vector<EventLoop*> _loops;     //从属线程数量大于0则从_loops中进行线程EventLoop分配
public:
    LoopThreadPool(EventLoop* baseLoop)
    :_thread_count(0)
    ,_next_idx(0)
    ,_baseLoop(baseLoop)
    {}
    //设置线程数量
    void SetThreadCount(int count)
    {
        _thread_count = count;
    }
    //创建所有的从属线程
    void Create()
    {
        if (_thread_count > 0)
        {
            _threads.resize(_thread_count);
            _loops.resize(_thread_count);
            for (int i = 0; i < _thread_count; ++i)
            {
                _threads[i] = new LoopThread;
                _loops[i] = _threads[i]->GetLoop();
            }
        }
    }
    //为了实现RR轮转,返回下一个从属线程的EventLoop
    EventLoop* NextLoop()
    {
        if (_thread_count == 0)
        {
            return _baseLoop;
        }
        _next_idx = (_next_idx + 1) % _thread_count;
        return _loops[_next_idx];
    }
};

测试:

#include "../source/server.hpp"


//管理所有的连接
std::unordered_map<uint64_t, PtrConnection> _conns;
uint64_t conn_id = 0;
EventLoop base_loop;
LoopThreadPool* loopPool;

void ConnectionDestroy(const PtrConnection& conn)   //这些函数未来是TcpServer模块提供的
{
    _conns.erase(conn->Id());
}
void OnConnected(const PtrConnection& conn)
{
    DBG_LOG("NEW CONNECTION: %p", conn.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buf)
{
    buf->MoveReadOffset(buf->ReadAbleSize());
    std::string str = "Hello World";
    conn->Send(str.c_str(), str.size());
}
void NewConnection(int fd) 
{
    conn_id++;
    PtrConnection conn(new Connection(loopPool->NextLoop(), conn_id, fd));
    conn->SetMessageCallback(OnMessage);
    conn->SetSvrClosedCallback(ConnectionDestroy);
    conn->SetConnectedCallback(OnConnected);
    conn->EnableInactiveRelease(10);    //启动非活跃超时销毁
    conn->Establised(); //就绪初始化
    _conns.insert(std::make_pair(conn_id, conn));
    DBG_LOG("获取到了一个新的连接");
}

int main()
{
    loopPool = new LoopThreadPool(&base_loop);
    // loopPool->SetThreadCount(2);
    loopPool->Create();
    Acceptor acceptor(&base_loop, 8085);
    acceptor.SetAcceptCallback(NewConnection);
    acceptor.Listen();
    while (1)
    {
        base_loop.Start();
    }
    return 0;
}
#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    for (int i = 0; i < 5; ++i)
    {
        std::string str = "hello world!!!";
        cli_socket.Send(str.c_str(), str.size());
        char buf[4096] = {0};
        cli_socket.Recv(buf, 4095);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    while (1)
    {}
    return 0;
}

TcpServer模块设计思想

实现TcpServer模块:

class TcpServer
{
private:
    uint16_t _port;
    uint64_t _next_id;  //这是Connection和定时任务公用的id
    int _timeout;       //非活跃连接的超时时间
    bool _enable_inactive_release;  //是否启动了非活跃连接超时销毁的判断标志
    EventLoop _baseloop;    //主线程EventLoop对象,负责 监听事件的处理
    Acceptor _acceptor;     //监听套接字的管理对象
    LoopThreadPool _pool;   //这是从属EventLoop线程池
    std::unordered_map<uint64_t, PtrConnection> _conns; //保存管理所有连接对应的shared_ptr对象 -- 这里面的东西被删除,就意味着这个连接在某个不久的将来会被释放

    //用户设置的回调函数 -- 未来要设置给Connection
    using ConnectedCallback = std::function<void(const PtrConnection&)>;
    using MessageCallback = std::function<void(const PtrConnection&, Buffer*)>;
    using ClosedCallback = std::function<void(const PtrConnection&)>;
    using AnyEventCallback = std::function<void(const PtrConnection&)>;
    using Functor = std::function<void()>;
    ConnectedCallback _connected_callback;
    MessageCallback _message_callback;
    ClosedCallback _closed_callback;
    AnyEventCallback _event_callback;
private:
    //为新连接构造一个Connection
    void NewConnection(int fd)
    {
        _next_id++;
        PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));
        conn->SetMessageCallback(_message_callback);
        conn->SetClosedCallback(_closed_callback);
        conn->SetSvrClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));
        conn->SetConnectedCallback(_connected_callback);
        conn->SetAnyEventCallback(_event_callback);
        if (_enable_inactive_release == true)
            conn->EnableInactiveRelease(_timeout);    //启动非活跃超时销毁
        conn->Establised(); //就绪初始化
        _conns.insert(std::make_pair(_next_id, conn));
        DBG_LOG("获取到了一个新的连接");
    }
    void RemoveConnectionInLoop(const PtrConnection& conn)
    {
        int id = conn->Id();
        auto it = _conns.find(id);
        if (it != _conns.end())
        {
            _conns.erase(it);
        }
    }
    //从管理Connection的_conns中移除连接信息 -- 因为对STL容器进行操作,所以需要考虑线程安全问题
    void RemoveConnection(const PtrConnection& conn)
    {
        _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));
    }
    void RunAfterInLoop(const Functor& task, int delay)
    {
        _next_id++;
        _baseloop.TimerAdd(_next_id, delay, task);
    }
public:
    TcpServer(int port)
    :_port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port)
    ,_pool(&_baseloop)
    {
        _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));
        _acceptor.Listen();
    }
    //设置从属线程数量
    void SetThreadCount(int count)
    {
        return _pool.SetThreadCount(count);
    }
    void SetConnectedCallback(const ConnectedCallback& cb)
    {
        _connected_callback = cb;
    }
    void SetMessageCallback(const MessageCallback& cb)
    {
        _message_callback = cb;
    }
    void SetClosedCallback(const ClosedCallback& cb)
    {
        _closed_callback = cb;
    }
    void SetAnyEventCallback(const AnyEventCallback& cb)
    {
        _event_callback = cb;
    }
    void EnalbeInactiveRelease(int timeout)
    {
        _timeout = timeout;
        _enable_inactive_release = true;
    }
    //用于添加一个定时任务 -- 提供给我们用户的
    void RunAfter(const Functor& task, int delay)
    {
        _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));
    }
    void Start()
    {
        _pool.Create();
        _baseloop.Start();
    }
};
//可以把网络通信看成一个管道通信,这个类的功能是:防止客户端退出即读端关闭,服务器写端还在写,服务器极小几率可能会收到管道信号,导致服务器退出。
class NetWork
{
public:
    NetWork()
    {
        DBG_LOG("SIGPIPE INIT");
        signal(SIGPIPE, SIG_IGN);
    }
};
static NetWork nw;  //主要是调用该类的构造函数

测试:

#include "../source/server.hpp"

void OnConnected(const PtrConnection& conn)
{
    DBG_LOG("NEW CONNECTION: %p", conn.get());
}
void OnClosed(const PtrConnection& conn)
{
    DBG_LOG("CLOSE CONNECTION:%p", conn.get());
}
void OnMessage(const PtrConnection& conn, Buffer* buf)
{
    DBG_LOG("%s", buf->ReadPosition());
    buf->MoveReadOffset(buf->ReadAbleSize());
    std::string str = "Hello World";
    conn->Send(str.c_str(), str.size());
}

int main()
{
    TcpServer server(8085);
    server.SetThreadCount(2);
    server.EnalbeInactiveRelease(10);
    server.SetConnectedCallback(OnConnected);
    server.SetMessageCallback(OnMessage);
    server.Start();
    return 0;
}
#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    for (int i = 0; i < 5; ++i)
    {
        std::string str = "hello world!!!";
        cli_socket.Send(str.c_str(), str.size());
        char buf[4096] = {0};
        cli_socket.Recv(buf, 4095);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    while (1)
    {}
    return 0;
}

基于TcpServer实现回显服务器

实现Echo.hpp服务器

#include "../server.hpp"

class EchoServer
{
private:
    TcpServer _server;
private:
    void OnConnected(const PtrConnection& conn)
    {
        DBG_LOG("NEW CONNECTGION:%p", conn.get());
    }
    void OnClosed(const PtrConnection& conn)
    {
        DBG_LOG("CLOSE CONNECTION:%p", conn.get());
    }
    //我们将echo服务器设置为短连接即处理一次业务逻辑就关闭连接,因为长连接太占用服务器资源了
    void OnMessage(const PtrConnection& conn, Buffer* buf)
    {
        conn->Send(buf->ReadPosition(), buf->ReadAbleSize());
        buf->MoveReadOffset(buf->ReadAbleSize());
        conn->Shutdown();
    }
public:
    EchoServer(int port)
    :_server(port)
    {
        _server.SetThreadCount(2);
        _server.EnalbeInactiveRelease(10);
        _server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));
        _server.SetConnectedCallback(std::bind(&EchoServer::OnConnected, this, std::placeholders::_1));
        _server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
    }
    void Start()
    {
        _server.Start();
    }
};

测试
main.cc

#include "echo.hpp"

int main()
{
    EchoServer server(8085);
    server.Start();
    return 0;
}

tcp_cli.cpp

#include "../source/server.hpp"

int main()
{
    Socket cli_socket;
    cli_socket.CreateClient(8085, "127.0.0.1");
    for (int i = 0; i < 5; ++i)
    {
        std::string str = "hello world!!!";
        cli_socket.Send(str.c_str(), str.size());
        char buf[4096] = {0};
        cli_socket.Recv(buf, 4095);
        DBG_LOG("%s", buf);
        sleep(1);
    }
    while (1)
    {}
    return 0;
}

EchoServer回显服务器性能测试

EchoServer回显服务器模块关系图

HTTP协议模块的子模块划分

Util工具类设计思想

Util工具类字符串分割函数实现

#include "../server.hpp"

class Util
{
public:
    //字符串分割函数,将src字符串按照sep字符进行分割,得到的各个子串放到arry中,最终返回子串的数量
    static size_t Split(const std::string& src, const std::string& sep, std::vector<std::string>* arry)
    {
        size_t offset = 0;
        while (offset < src.size())
        {
            size_t pos = src.find(sep, offset);
            if (pos == src.npos)
            {
                arry->push_back(src.substr(offset));
                return arry->size();
            }
            if (pos != offset)//如果不是一个空串,就加入结果
                arry->push_back(src.substr(offset, pos - offset));
            offset = pos + sep.size();
        }
    }
    //读取文件内容
    static bool ReadFile();
    //向文件写入数据
    static bool WriteFile();
    //URL编码
    static std::stringUrlEncode();
    //URL解码
    static std::string UrlDecode();
    //响应状态码的描述信息获取
    static std::string StatuDesc();
    //根据文件后缀名获取文件mime
    static std::string ExtMime();
    //判断一个文件是否是一个目录
    static bool IsDirectory();
    //判断一个文件是否是一个普通文件
    static bool IsRegular();
    //http请求的资源路径有效性判断
    static bool ValidPath();
};

测试:

#include "http.hpp"

int main()
{
    std::string str = ",,abc,bcd,efg";
    std::vector<std::string> arry;
    Util::Split(str, ",", &arry);
    for (auto& s : arry)
    {
        std::cout << "[" << s << "]\n"; 
    }
    return 0;
}

Util工具类文件数据读取函数实现

//读取文件内容,将读取的内容放到一个String中,如果放到Buffer中会用到临时空间,比如读1G的文件,就会占用2G的空间
static bool ReadFile(const std::string& filename, std::string* buf)
{
    std::ifstream ifs(filename, std::ios::binary);
    if (ifs.is_open())
    {
        printf("open %s file failed!!", filename.c_str());
        return false;
    }
    size_t fsize = 0;
    ifs.seekg(0, ifs.end);  //将文件偏移移动到文件的末尾
    fsize = ifs.tellg();    //此时偏移的位置就是文件的大小
    ifs.seekg(0, ifs.beg);  //将文件偏移移动到文件起始位置
    buf->resize(fsize);
    ifs.read(&(*buf)[0], fsize);
    if (ifs.good() == false)    //判断上次操作是否出问题
    {
        printf("read %s file failed!!", filename.c_str());
        ifs.close();
        return false;
    }
    ifs.close();
    return true;
}

测试

#include "http.hpp"

int main()
{
    std::string str;
    Util::ReadFile("./http.hpp", &str);
    std::cout << str << std::endl;
    return 0;
}

Util工具类文件数据写入函数实现

//向文件写入数据
static bool WriteFile(const std::string& filename, const std::string& buf)
{
    std::ofstream ofs(filename, std::ios::binary | std::ios::trunc);    //trunc: 截断,不要文件的原有内容,即覆盖写
    if (!ofs.is_open())
    {
        printf("open %s file failed!!", filename.c_str());
        return false;
    }
    ofs.write(buf.c_str(), buf.size());
    if (ofs.good() == false)
    {
        ERR_LOG("write %s file failed!", filename.c_str());
        ofs.close();
        return false;
    }
    ofs.close();
    return true;
}

测试:

#include "http.hpp"

int main()
{
    std::string str;
    Util::ReadFile("./http.hpp", &str);
    Util::WriteFile("./ttttttttt.c", str);
    return 0;
}

Util工具类UrlEncode函数实现

//URL编码
static std::string UrlEncode(const std::string url, bool convert_space_to_plus)
{
    std::string res;
    for (auto& c : url)
    {
        if (c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c))
        {
            res += c;
            continue;
        }
        else if (c == ' ' && convert_space_to_plus == true)
        {
            res += '+';
            continue;
        }
        //剩下的字符都是需要编码成为%HH格式
        char tmp[4];
        snprintf(tmp, 4, "%%%02X", c);//前两个%表示一个%,后面的是%02X表示用十六进制表示
        res += tmp;
    }
    return res;
}

测试

#include "http.hpp"

int main()
{
    std::string str = "/login?user=bite&passwd=C++";
    std::string res = Util::UrlEncode(str, false);
    std::cout << res << std::endl;
    return 0;
}

Util工具类UrlDecode函数实现

static char HEXTOI(char c)
{
     if (c >= '0' && c <= '9')
         return c - '0';
     else if (c >= 'a' && c <= 'z')
         return c - 'a' + 10;
     else if (c >= 'A' && c <= 'Z')
         return c - 'A' + 10;
     return -1;
 }
 //URL解码
 static std::string UrlDecode(const std::string url, bool convert_plus_to_space)
 {
     std::string res;
     for (int i = 0; i < url.size(); ++i)
     {
         if (url[i] == '%' && i + 2 < url.size())
         {
             char v1 = HEXTOI(url[i + 1]);
             char v2 = HEXTOI(url[i + 2]);
             char v = (v1 * 16) + v2;
             res += v;
             i += 2;
             continue;
         }
         if (convert_plus_to_space == true && url[i] == '+')
         {
             res += ' ';
             continue;
         }
         res += url[i];
     }
     return res;
 }

测试:

#include "http.hpp"

int main()
{
    std::string str = "C  ";
    std::string res = Util::UrlEncode(str, true);
    std::string tmp = Util::UrlDecode(res, true);
    std::cout << "[" << res << "]\n";
    std::cout << "[" << tmp << "]\n";
    return 0;
}

Util工具类Mime与Statu

//这些信息在文档里查看
std::unordered_map<int, std::string> _statu_msg = {
    {100,  "Continue"},
    {101,  "Switching Protocol"},
    {102,  "Processing"},
    {103,  "Early Hints"},
    {200,  "OK"},
    {201,  "Created"},
    {202,  "Accepted"},
    {203,  "Non-Authoritative Information"},
    {204,  "No Content"},
    {205,  "Reset Content"},
    {206,  "Partial Content"},
    {207,  "Multi-Status"},
    {208,  "Already Reported"},
    {226,  "IM Used"},
    {300,  "Multiple Choice"},
    {301,  "Moved Permanently"},
    {302,  "Found"},
    {303,  "See Other"},
    {304,  "Not Modified"},
    {305,  "Use Proxy"},
    {306,  "unused"},
    {307,  "Temporary Redirect"},
    {308,  "Permanent Redirect"},
    {400,  "Bad Request"},
    {401,  "Unauthorized"},
    {402,  "Payment Required"},
    {403,  "Forbidden"},
    {404,  "Not Found"},
    {405,  "Method Not Allowed"},
    {406,  "Not Acceptable"},
    {407,  "Proxy Authentication Required"},
    {408,  "Request Timeout"},
    {409,  "Conflict"},
    {410,  "Gone"},
    {411,  "Length Required"},
    {412,  "Precondition Failed"},
    {413,  "Payload Too Large"},
    {414,  "URI Too Long"},
    {415,  "Unsupported Media Type"},
    {416,  "Range Not Satisfiable"},
    {417,  "Expectation Failed"},
    {418,  "I'm a teapot"},
    {421,  "Misdirected Request"},
    {422,  "Unprocessable Entity"},
    {423,  "Locked"},
    {424,  "Failed Dependency"},
    {425,  "Too Early"},
    {426,  "Upgrade Required"},
    {428,  "Precondition Required"},
    {429,  "Too Many Requests"},
    {431,  "Request Header Fields Too Large"},
    {451,  "Unavailable For Legal Reasons"},
    {501,  "Not Implemented"},
    {502,  "Bad Gateway"},
    {503,  "Service Unavailable"},
    {504,  "Gateway Timeout"},
    {505,  "HTTP Version Not Supported"},
    {506,  "Variant Also Negotiates"},
    {507,  "Insufficient Storage"},
    {508,  "Loop Detected"},
    {510,  "Not Extended"},
    {511,  "Network Authentication Required"}
};
//响应状态码的描述信息获取
static std::string StatuDesc(int statu)
{
    auto it = _statu_msg.find(statu);
    if (it != _statu_msg.end())
    {
        return it->second;
    }
    return "Unkown";
}

测试:

#include "http.hpp"

int main()
{
    std::cout << Util::StatuDesc(200) << std::endl;
    std::cout << Util::StatuDesc(302) << std::endl;
    return 0;
}
std::unordered_map<std::string, std::string> _mime_msg = {
    {".aac",        "audio/aac"},
    {".abw",        "application/x-abiword"},
    {".arc",        "application/x-freearc"},
    {".avi",        "video/x-msvideo"},
    {".azw",        "application/vnd.amazon.ebook"},
    {".bin",        "application/octet-stream"},
    {".bmp",        "image/bmp"},
    {".bz",         "application/x-bzip"},
    {".bz2",        "application/x-bzip2"},
    {".csh",        "application/x-csh"},
    {".css",        "text/css"},
    {".csv",        "text/csv"},
    {".doc",        "application/msword"},
    {".docx",       "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},
    {".eot",        "application/vnd.ms-fontobject"},
    {".epub",       "application/epub+zip"},
    {".gif",        "image/gif"},
    {".htm",        "text/html"},
    {".html",       "text/html"},
    {".ico",        "image/vnd.microsoft.icon"},
    {".ics",        "text/calendar"},
    {".jar",        "application/java-archive"},
    {".jpeg",       "image/jpeg"},
    {".jpg",        "image/jpeg"},
    {".js",         "text/javascript"},
    {".json",       "application/json"},
    {".jsonld",     "application/ld+json"},
    {".mid",        "audio/midi"},
    {".midi",       "audio/x-midi"},
    {".mjs",        "text/javascript"},
    {".mp3",        "audio/mpeg"},
    {".mpeg",       "video/mpeg"},
    {".mpkg",       "application/vnd.apple.installer+xml"},
    {".odp",        "application/vnd.oasis.opendocument.presentation"},
    {".ods",        "application/vnd.oasis.opendocument.spreadsheet"},
    {".odt",        "application/vnd.oasis.opendocument.text"},
    {".oga",        "audio/ogg"},
    {".ogv",        "video/ogg"},
    {".ogx",        "application/ogg"},
    {".otf",        "font/otf"},
    {".png",        "image/png"},
    {".pdf",        "application/pdf"},
    {".ppt",        "application/vnd.ms-powerpoint"},
    {".pptx",       "application/vnd.openxmlformats-officedocument.presentationml.presentation"},
    {".rar",        "application/x-rar-compressed"},
    {".rtf",        "application/rtf"},
    {".sh",         "application/x-sh"},
    {".svg",        "image/svg+xml"},
    {".swf",        "application/x-shockwave-flash"},
    {".tar",        "application/x-tar"},
    {".tif",        "image/tiff"},
    {".tiff",       "image/tiff"},
    {".ttf",        "font/ttf"},
    {".txt",        "text/plain"},
    {".vsd",        "application/vnd.visio"},
    {".wav",        "audio/wav"},
    {".weba",       "audio/webm"},
    {".webm",       "video/webm"},
    {".webp",       "image/webp"},
    {".woff",       "font/woff"},
    {".woff2",      "font/woff2"},
    {".xhtml",      "application/xhtml+xml"},
    {".xls",        "application/vnd.ms-excel"},
    {".xlsx",       "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},
    {".xml",        "application/xml"},
    {".xul",        "application/vnd.mozilla.xul+xml"},
    {".zip",        "application/zip"},
    {".3gp",        "video/3gpp"},
    {".3g2",        "video/3gpp2"},
    {".7z",         "application/x-7z-compressed"}
};
//根据文件后缀名获取文件mime
static std::string ExMime(const std::string& filename)
{
    size_t pos = filename.find_last_of('.');
    if (pos == std::string::npos)
    {
        return "application/occtet-stream";
    }
    //根据扩展名,获取mime
    std::string ext = filename.substr(pos);
    auto it = _mime_msg.find(ext);
    if (it == _mime_msg.end())
    {
        return "application/occtet-stream";
    }
    return it->second;
}

测试

#include "http.hpp"

int main()
{
    std::cout << Util::ExtMime("a.txt") << std::endl;
    std::cout << Util::ExtMime("a.png") << std::endl;
    return 0;
}

Util工具类文件类型判断接口实现

//判断一个文件是否是一个目录
static bool IsDirectory(const std::string& filename)
{
    struct stat st;
    int ret = stat(filename.c_str(), &st);  //获取到文件属性
    if (ret < 0)
    {
        return false;
    }
    return S_ISDIR(st.st_mode);
}
//判断一个文件是否是一个普通文件
static bool IsRegular(const std::string& filename)
{
    struct stat st;
    int ret = stat(filename.c_str(), &st);  //获取到文件属性
    if (ret < 0)
    {
        return false;
    }
    return S_ISREG(st.st_mode);
}

测试

#include "http.hpp"

int main()
{
    std::cout << Util::IsRegular("main.cc") << std::endl;
    std::cout << Util::IsRegular("../Http") << std::endl;
    std::cout << Util::IsDirectory("main.cc") << std::endl;
    std::cout << Util::IsDirectory("../Http") << std::endl;
    return 0;
}

Util工具类路径有效性判断接口实现

//http请求的资源路径有效性判断
static bool ValidPath(const std::string& path)
{
    std::vector<std::string> subdir;
    Util::Split(path, "/", &subdir);
    int level = 0;
    for (auto& dir : subdir)
    {
        if (dir == "..")
        {
            level--;
            if (level < 0)//任意一层走出相对根目录,就认为有问题
            {
                return false;
            }
        }
        level++;
    }
    return true;
}

测试:

#include "http.hpp"

int main()
{
    std::cout << Util::ValidPath("/abc/eee/../ff") << std::endl;
    std::cout << Util::ValidPath("/../Http") << std::endl;
    return 0;
}

HttpRequest模块设计思想

class HttpRequest
{
public:
    std::string _method;    //请求方法
    std::string _path;      //资源路径
    std::string _version;   //协议版本
    std::string _body;      //请求正文
    std::smatch _matches;   //资源路径的正则提取数据
    std::unordered_map<std::string, std::string> _headers;  //头部字段
    std::unordered_map<std::string, std::string> _params;   //查询字符串
public:
    HttpRequest()
    :_version("HTTP/1.1")
    {}
    //重置 -- 每一次上下文里面的内容处理完了我们就要重置一下,因为不重置,就代表这个请求还在处理,这次的信息就会对下次的请求信息造成影响
    void ReSet()
    {
        _method.clear();
        _path.clear();
        _version = "HTTP/1.1";
        _body.clear();
        std::smatch match;  //因为smatch没有clear函数,通过swap也可以达到clear的效果
        _matches.swap(match);
        _headers.clear();
        _params.clear();
    }
    //插入头部字段
    void SetHeader(const std::string& key, const std::string& val)
    {
        _headers.insert({key, val});
    }
    //判断是否存在指定头部字段
    bool HasHeader(const std::string& key)
    {
        auto it = _headers.find(key);
        if (it == _headers.end())
            return false;
        return true;
    }
    //获取指定头部字段
    std::string GetHeader(const std::string& key) const 
    {
        auto it = _headers.find(key);
        if (it == _headers.end())
        {
            return "";
        }
        return it->second;
    }
    //插入查询字符串
    void SetParam(std::string& key, std::string& val)
    {
        _params.insert({key, val});
    }
    //判断是否有某个指定的查询字符串
    bool HasParam(std::string& key)
    {
        auto it = _params.find(key);
        if (it == _params.end())
            return false;
        return true;
    }
    //获取指定的查询字符串
    std::string GetParam(std::string& key)
    {
        auto it = _params.find(key);
        if (it == _params.end())
        {
            return "";
        }
        return it->second;
    }
    //获取正文长度
    size_t ContentLength()
    {
        bool ret = HasHeader("Content-Length");
        if (ret == false)
            return 0;
        return std::stoi(GetHeader("Content-Length"));
    }
    //判断是否是短连接
    bool Close() const
    {
        if (GetHeader("Connection") == "keep-alive")
            return false;
        return true;
    }
};

HttpResponse模块设计思想

class HttpResponse
{
public:
    int _statu;                 //状态码
    bool _redirect_flag;        //是否重定向
    std::string _body;          //响应正文
    std::string _redirect_url;  //重定向路径
    std::unordered_map<std::string, std::string> _headers;  //响应头部字段
public:
    HttpResponse(int statu = 200)
    :_redirect_flag(false), _statu(statu)
    {}
    void ReSet()
    {
        _statu = 200;
        _redirect_flag = false;
        _body.clear();
        _redirect_url.clear();
        _headers.clear();
    }
    //插入头部字段
    void SetHeader(const std::string& key, const std::string& val)
    {
        _headers.insert({key, val});
    }
    //判断是否存在指定头部字段
    bool HasHeader(const std::string& key)
    {
        auto it = _headers.find(key);
        if (it == _headers.end())
            return false;
        return true;
    }
    //获取指定头部字段
    std::string GetHeader(const std::string& key)
    {
        auto it = _headers.find(key);
        if (it == _headers.end())
        {
            return "";
        }
        return it->second;
    }
    void SetContent(const std::string& body, const std::string& type = "text/html")
    {
        _body = body;
        SetHeader("Content-Type", type);
    }
    void SetRedirect(const std::string& url, int statu = 302)
    {
        _statu = statu;
        _redirect_flag =  true;
        _redirect_url = url;
    }
    //判断是否是短连接
    bool Close()
    {
        if (GetHeader("Connection") == "keep-alive")
            return false;
        return true;
    }
};

HttpContext模块设计思想

typedef enum
{
    RECV_HTTP_ERROR,    //出错
    RECV_HTTP_LINE,     //请求行
    RECV_HTTP_HEAD,     //请求头部
    RECV_HTTP_BODY,     //请求正文
    RECV_HTTP_OVER      //结束阶段
}HttpRecvStatu;

#define MAX_LINE 8192
class HttpContext
{
public:
    int _resp_statu;            //响应状态码
    HttpRecvStatu _recv_statu;  //当前接收机解析的阶段状态
    HttpRequest _request;       //已经解析得到的请求信息
private:
    bool ParseHttpLine(const std::string& line)
    {
        std::smatch matches;
        std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase);//icase忽略大小写
        bool ret = std::regex_match(line, matches, e);
        if (ret == false)
        {
            _recv_statu = RECV_HTTP_ERROR;
            _resp_statu = 400;  //BAD REQUEST
            return false;
        }
        // 0: GET /biejiuyeke/login?user=xiaoming&passwd=123123 HTTP/1.1 size:60
        // 1: GET size:3
        // 2: /biejiuyeke/login size:17
        // 3: user=xiaoming&passwd=123123 size:27
        // 4: HTTP/1.1 size:8
        //第0个是url本身,第一个是请求方法,第二个是资源路径,第三个是查询字符串,第四个是协议版本

        //请求方法的获取
        _request._method = matches[1];
        std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);//字母都转为大写
        //查询路径可能也是经过url编码的,资源路径的获取,需要进行URL解码操作,但是不需要+转空格
        _request._path = Util::UrlDecode(matches[2], false);
        //协议版本获取
        _request._version = matches[4];
        //查询字符串的获取与处理
        std::vector<std::string> query_string_arry;
        std::string query_string = matches[3];
        //查询字符串的格式key=val&key=val&key=val...,以&符号进行分割,得到各个子串
        Util::Split(query_string, "&", &query_string_arry);
        //针对各个子串,以=符号进行分割,得到key和val,得到之后也需要进行URL解码
        for (auto& str : query_string_arry)
        {
            size_t pos = str.find("="); //查询字符串都是一种key=value的字符串
            if (pos == std::string::npos)
            {
                _recv_statu = RECV_HTTP_ERROR;
                _resp_statu = 400;  //BAD REQUEST
                return false;
            }
            std::string key = Util::UrlDecode(str.substr(0, pos), true);
            std::string val = Util::UrlDecode(str.substr(pos + 1), true);
            _request.SetParam(key, val);
        }
        return true;
    }
    //接收请求行
    bool RecvHttpLine(Buffer* buf)
    {
        if (_recv_statu != RECV_HTTP_LINE)
            return false;
        //1。获取一行数据,带有末尾的换行
        std::string line = buf->GetLineAndPop();    //这就是为什么之前设计的时候读取带有末尾的换行
        //2。需要考虑的一些要素:缓冲区中的数据不足一行或者获取一行的数据超大
        if (line.size() == 0)
        {
            //说明缓冲区中数据不足一行
            if (buf->ReadAbleSize() > MAX_LINE) //判断缓冲区中的可读数据长度,如果很长了都不足一行,说明有问题
            {
                _recv_statu = RECV_HTTP_ERROR;
                _resp_statu = 414;  //URI TOO LONG
                return false;
            }
            //缓冲区中的数据不足一行,长度也合理,就等下一次的新数据到来再处理
            return true;
        }
        if (line.size() > MAX_LINE)
        {
            _recv_statu = RECV_HTTP_ERROR;
            _resp_statu = 414;  //URI TOO LONG
            return false;
        }
        //走到这里说明得到了完整的请求行,就可以进行解析了
        bool ret = ParseHttpLine(line);
        if (ret == false)
            return false;
        //首行处理完毕,进入头部获取阶段
        _recv_statu = RECV_HTTP_HEAD;
        return true;
    }
    bool ParseHttpHead(std::string& line)
    {
        if (line.back() == '\n')
            line.pop_back();
        if (line.back() == '\r')
            line.pop_back();
        size_t pos = line.find(": ");
        if (pos == line.npos)
        {
            _recv_statu = RECV_HTTP_ERROR;
            _resp_statu = 400;
            return false;
        }
        std::string key = line.substr(0, pos);
        std::string val = line.substr(pos + 2);
        _request.SetHeader(key, val);
        return true;
    }
    //接收请求头部字段
    bool RecvHttpHead(Buffer* buf)
    {
        if (_recv_statu != RECV_HTTP_HEAD)
            return false;
        while (1)
        {
            //一行一行取出数据,知道遇到空行为止,头部格式 key: val\r\nkey: val\r\n...
            //1。获取一行数据
            std::string line = buf->GetLineAndPop();
            //2。需要考虑的一些要素:缓冲区中的数据不足一行,获取的一行数据超大
            if (line.size() == 0)
            {
                //缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长都不足一行,这是有问题的
                if (buf->ReadAbleSize() > MAX_LINE)
                {
                    _recv_statu = RECV_HTTP_ERROR;
                    _resp_statu = 414;
                    return false;
                }
                //缓冲区中数据不足一行,符合长度预期,就等待新数据的到来
                return true;
            }
            if (line.size() > MAX_LINE)
            {
                _recv_statu = RECV_HTTP_ERROR;
                _resp_statu = 414;  //URI TOO LONG
                return false;
            }
            if (line == "\n" || line == "\r\n") //说明读到了空行
                break;
            bool ret = ParseHttpHead(line);
            if (ret == false)
                return false;
        }
        //头部处理完毕,进入正文获取阶段
        _recv_statu = RECV_HTTP_BODY;
        return true;
    }
    //接收请求正文
    bool RecvHttpBody(Buffer* buf)
    {
        if (_recv_statu != RECV_HTTP_BODY)
            return false;
        //1。获取正文长度
        size_t content_length = _request.ContentLength();
        if (content_length == 0)
        {
            //没有正文,则请求接收解析完毕
            _recv_statu = RECV_HTTP_OVER;
            return true;
        }
        //2。当前已经接收了多少正文,其实就是往_request._body中放了多少数据
        size_t real_len = content_length - _request._body.size();   //实际还要接收的正文长度
        //3。接收正文放到body中,但是也要考虑当前缓冲区中的数据,是否是全部的正文
        //3.1 缓冲区中数据,包含了当前请求的所有正文,则取出所需数据。缓冲区中也有可能包含下一个协议的内容
        if (buf->ReadAbleSize() >= real_len)
        {
            _request._body.append(buf->ReadPosition(), real_len);
            buf->MoveReadOffset(real_len);
            _recv_statu = RECV_HTTP_OVER;
            return true;
        }
        //3.2 缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来
        _request._body.append(buf->ReadPosition(), buf->ReadAbleSize());
        buf->MoveReadOffset(buf->ReadAbleSize());
        return true;
    }
public:
    HttpContext()
    :_resp_statu(200), _recv_statu(RECV_HTTP_LINE)
    {}
    void ReSet()
    {
        _resp_statu = 200;
        _recv_statu = RECV_HTTP_LINE;
        _request.ReSet();
    }
    //获取相应状态码
    int RespStatu()
    {
        return _resp_statu;
    }
    //获取接收状态
    HttpRecvStatu RecvStatu()
    {
        return _recv_statu;
    }
    //获取已经得到的请求信息
    HttpRequest& Request()
    {
        return _request;
    }
    //接受并解析HTTP请求
    void RecvHttpRequest(Buffer* buf)
    {
        //不同的状态做不同的事情,但是这里不要break,因为处理完请求后,应该立即处理头部,而不是退出等新数据
        switch(_recv_statu)
        {
        case RECV_HTTP_LINE:
            RecvHttpLine(buf);
        case RECV_HTTP_HEAD:
            RecvHttpHead(buf);
        case RECV_HTTP_BODY:
            RecvHttpBody(buf);
        }
    }
};

HttpServer模块设计思想

#define DEFAULT_TIMEOUT 10

class HttpServer
{
private:
    using Handler = std::function<void(const HttpRequest&, HttpResponse*)>;
    using Handlers = std::vector<std::pair<std::regex, Handler>>;
    //有四个功能性请求的路由表
    Handlers _get_route;
    Handlers _post_route;
    Handlers _put_route;
    Handlers _delete_route;
    std::string _basedir;   //静态资源根目录
    TcpServer _server;
private:
    void ErrorHandler(const HttpRequest& req, HttpResponse* rsp)
    {
        //1。组织一个错误展示页面
        std::string body;
        body += "<html>";
        body += "<head>";
        body += "<meta http-equiv='Content-Type' content='text/html;charset=utf-8'>";
        body += "</head>";
        body += "<body>";
        body += "<h1>";
        body += std::to_string(rsp->_statu);
        body += " ";
        body += Util::StatuDesc(rsp->_statu);
        body += "</h1>";
        body += "</body>";
        body += "</html>";
        //2.将页面数据,当做响应正文,放入rsp中
        rsp->SetContent(body, "text/html");
    }
    //将HttpResponse中的要素按照http协议格式进行组织,发送
    void WriteResponse(const PtrConnection& conn, const HttpRequest& req, HttpResponse& rsp)
    {
        //1.先完善头部字段 -- 这几个是几乎必要的头部字段
        if(req.Close() == true)
            rsp.SetHeader("Connection", "close");
        else 
            rsp.SetHeader("Connection", "keep-alive");
        if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false)//如果没有设置长度,则要设置长度
            rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));
        if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false)//如果没有设置Content-Type,则要设置
            rsp.SetHeader("Content-Type", "application/octet-stream");
        if (rsp._redirect_flag == true)
            rsp.SetHeader("Location", rsp._redirect_url);
        //2。将rsp中的要素,按照http协议格式进行组织
        std::stringstream rsp_str;  //技巧:快速的拼接字符串
        rsp_str << req._version + " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\r\n";
        for (auto& head : rsp._headers)
        {
            rsp_str << head.first << ":" << head.second << "\r\n";
        }
        rsp_str << "\r\n";
        rsp_str << rsp._body;
        //3。发送数据
        conn->Send(rsp_str.str().c_str(), rsp_str.str().size());
    }
    //是否是一个静态资源请求
    bool IsFileHandler(const HttpRequest& req)
    {
        //1.必须设置了静态资源根目录
        if (_basedir.empty())
            return false;
        //2.请求方法必须是GET/HEAD方法 -- 只有这两个才是获取实体资源的请求 POST/PUT/DELETE通常更多是功能性的请求
        if (req._method != "GET" && req._method != "HEAD")
            return false;
        //3.请求的资源路径必须是一个合法的路径
        if (Util::ValidPath(req._path) == false)
            return false;
        //4.请求的资源必须存在,且是一个普通文件
        std::string req_path = _basedir + req._path;
        if (req._path.back() == '/')
            req_path += "index.html";
        if (Util::IsRegular(req_path) == false)
            return false;
        return true;
    }
    //静态资源的请求处理
    void FileHandler(const HttpRequest& req, HttpResponse* rsp)
    {
        std::string req_path = _basedir + req._path;
        if (req._path.back() == '/')
        {
            req_path += "index.html";
        }
        bool ret = Util::ReadFile(req_path, &(rsp->_body));
        if (ret == false)
            return;
        std::string mime = Util::ExMime(req_path);  //通过扩展名来获取它的mime
        rsp->SetHeader("Content-Type", mime);
        return;
    }
    //功能性请求的分类处理
    void Dispatcher(HttpRequest& req, HttpResponse* rsp, Handlers& handlers)
    {
        //在对应请求方法的路由表中,查找对应是否含有对应资源请求的处理函数,有则调用,没有则返回404
        //思想:路由表存储的是键值对<正则表达式, 处理函数>
        //使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理
        //为什么用正则表达式,而不用字符串?因为查询路径,可能有很多种情况,我们不能全写进路由表
        for (auto& handler : handlers)//将每一个正则表达式拿出来进行与查询路径进行匹配
        {
            const std::regex& re = handler.first;
            const Handler& functor = handler.second;
            bool ret = std::regex_match(req._path, req._matches, re);
            if (ret == false)
                continue;
            return functor(req, rsp);   //传入请求信息和空的rsp,执行处理函数
        }
        rsp->_statu = 404;
    }
    void Route(HttpRequest& req, HttpResponse* rsp)
    {
        //对请求进行分析,是一个静态资源请求还是一个功能性请求
        //  静态资源请求,则进行静态资源的处理
        //  功能性请求,则需要通过几个请求路由表来确定是否有处理函数
        //  既不是静态资源请求,也没有设置对应的功能请求处理函数,就返回405
        if (IsFileHandler(req) == true)
        {
            //是一个静态资源请求,则进行静态资源请求的处理
            return FileHandler(req, rsp);
        }
        //走到这里,则说明是功能性请求
        if (req._method == "GET" || req._method == "HEAD")
            return Dispatcher(req, rsp, _get_route);
        else if (req._method == "POST")
            return Dispatcher(req, rsp, _post_route);
        else if (req._method == "PUT")
            return Dispatcher(req, rsp, _put_route);
        else if (req._method == "DELETE")
            return Dispatcher(req, rsp, _delete_route);
        rsp->_statu = 405;  //Method Not Allowed
    }
    //设置上下文
    void OnConnected(const PtrConnection& conn)
    {
        conn->SetContext(HttpContext());
        DBG_LOG("NEW CONNECTION %p", conn.get());
    }
    //缓冲区数据解析 + 处理
    void OnMessage(const PtrConnection& conn, Buffer* buffer)
    {
        while (buffer->ReadAbleSize() > 0)
        {
            //1。获取上下文
            HttpContext* context = conn->GetContext()->get<HttpContext>();
            //2。通过上下文对缓冲区数据进行解析,得到HttpRequest对象
            //a。如果缓冲区的数据解析出错,就直接回复出错响应
            //b。如果解析正常,且请求已经获取完毕,才开始进行处理
            context->RecvHttpRequest(buffer);
            HttpRequest& req = context->Request();
            HttpResponse rsp(context->RespStatu());
            if (context->RespStatu() >= 400)    //代表数据解析出错
            {
                //进行错误响应,关闭连接
                ErrorHandler(req, &rsp);        //填充一个错误显示页面数据到rsp中
                WriteResponse(conn, req, rsp);   //组织响应发送给客户端
                context->ReSet();
                buffer->MoveReadOffset(buffer->ReadAbleSize()); //出错了就把缓冲区数据清空
                conn->Shutdown();   //关闭连接
                return;
            }
            if (context->_recv_statu != RECV_HTTP_OVER)//代表这不是一个完整的请求
                return;
            //3。路由请求 + 业务处理
            Route(req, &rsp);
            //4。对HttpResponse进行组织发送
            WriteResponse(conn, req, rsp);
            //5。重置上下文
            context->ReSet();
            //6。根据长短连接判断是否关闭连接或者继续处理 -- 短连接:接收处理一次服务器就把连接关闭     长连接:一直与你通信
            if (rsp.Close() == true)
                conn->Shutdown();
        }
    }
public:
    HttpServer(int port, int timeout = DEFAULT_TIMEOUT)
    :_server(port)
    {
        _server.EnableInactiveRelease(timeout);
        _server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));
        _server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
    }
    void SetBaseDir(const std::string& path)
    {
        assert(Util::IsDirectory(path) == true);
        _basedir = path;
    }
    //设置/添加,请求与处理函数的映射关系
    void Get(const std::string& pattern, const Handler& handler)
    {
        _get_route.push_back(std::make_pair(std::regex(pattern), handler));
    }
    void Post(const std::string& pattern, const Handler& handler)
    {
        _post_route.push_back(std::make_pair(std::regex(pattern), handler));
    }
    void Put(const std::string& pattern, const Handler& handler)
    {
        _put_route.push_back(std::make_pair(std::regex(pattern), handler));
    }
    void Delete(const std::string& pattern, const Handler& handler)
    {
        _delete_route.push_back(std::make_pair(std::regex(pattern), handler));
    }
    void SetThreadCount(int count)
    {
        _server.SetThreadCount(count);
    }
    void Listen()
    {
        _server.Start();
    }
};

基于HttpServer搭建HTTP服务器

index.html文件

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Document</title>
</head>
<body>
    <form action = "/login" method="post">
        <input type="text" name="username"><br>
        <input type="password" name="password" placeholder="请输入密码"><br>
        <input type="submit" value = "提交" name="submit">
</body>
</html>

main.cc文件

#include "http.hpp"

#define WWWROOT "./wwwroot/"

//业务:进行回显
std::string RequestStr(const HttpRequest& req)
{
    std::stringstream ss;
    ss << req._method << " " << req._path << " " << req._version << "\r\n";
    for (auto& it : req._params)
    {
        ss << it.first << ": " << it.second << "\r\n";
    }
    for (auto& it : req._headers)
    {
        ss << it.first << ": " << it.second << "\r\n";
    }
    ss << "\r\n";
    ss << req._body;
    return ss.str();
}
void Hello(const HttpRequest& req, HttpResponse* rsp)
{
    rsp->SetContent(RequestStr(req), "text/plain");
}
void Login(const HttpRequest& req, HttpResponse* rsp)
{
    rsp->SetContent(RequestStr(req), "text/plain");
}
void PutFile(const HttpRequest& req, HttpResponse* rsp)
{
    std::string pathname = WWWROOT + req._path;
    Util::WriteFile(pathname, req._body);
}
void DelFile(const HttpRequest& req, HttpResponse* rsp)
{
    rsp->SetContent(RequestStr(req), "text/plain");
}

int main()
{
    HttpServer server(8085);
    server.SetThreadCount(5);
    server.SetBaseDir(WWWROOT);
    server.Get("/hello", Hello);
    server.Post("/login", Login);
    server.Put("/1234.txt", PutFile);
    server.Delete("/1234.txt", DelFile);
    server.Listen();
    return 0;
}

HTTP服务器长连接测试

#include "../source/server.hpp"

int main()
{
    Socket cli_sock;
    cli_sock.CreateClient(8085, "127.0.0.1");
    std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
    while (1)
    {
        assert(cli_sock.Send(req.c_str(), req.size()) != -1);
        char buf[1024] = {0};
        assert(cli_sock.Recv(buf, 1023));
        DBG_LOG("[%s]", buf);
        sleep(3);
    }
    cli_sock.Close();
    return 0;
}

HTTP服务器超时连接测试

#include "../source/server.hpp"


int main()
{
    Socket cli_sock;
    cli_sock.CreateClient(8085, "127.0.0.1");
    std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
    while (1)
    {
        assert(cli_sock.Send(req.c_str(), req.size()) != -1);
        char buf[1024] = {0};
        assert(cli_sock.Recv(buf, 1023));
        DBG_LOG("[%s]", buf);
        sleep(15);
    }
    cli_sock.Close();
    return 0;
}

HTTP服务器错误请求测试

#include "../source/server.hpp"


int main()
{
    Socket cli_sock;
    cli_sock.CreateClient(8085, "127.0.0.1");
    std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nbitejiuyeke";
    while (1)
    {
        assert(cli_sock.Send(req.c_str(), req.size()) != -1);
        assert(cli_sock.Send(req.c_str(), req.size()) != -1);
        assert(cli_sock.Send(req.c_str(), req.size()) != -1);
        assert(cli_sock.Send(req.c_str(), req.size()) != -1);
        assert(cli_sock.Send(req.c_str(), req.size()) != -1);
        char buf[1024] = {0};
        assert(cli_sock.Recv(buf, 1023));
        DBG_LOG("[%s]", buf);
        sleep(3);
    }
    cli_sock.Close();
    return 0;
}

HTTP服务器业务处理超时测试

更新我们Connection模块中的释放接口

class Connection : public std::enable_shared_from_this<Connection>
{
private:
    uint64_t _conn_id;  //连接的唯一ID,也是定时任务唯一ID,便于连接的管理和查找
    int _sockfd;        //连接关联的文件描述符
    bool _enable_inactive_release;  //连接是否启动非活跃销毁的判断标志,默认为false
    EventLoop* _loop;   //连接所关联的一个EventLoop
    ConnStatu _statu;   //连接状态
    Socket _socket;     //套接字操作管理
    Channel _channel;   //连接的事件管理
    Buffer _in_buffer;  //输入缓冲区 -- 存放从socket中读取到的数据
    Buffer _out_buffer; //输出缓冲区 -- 存放要发送给对端的数据
    Any _context;       //请求的接收处理上下文

    //以下这四个回调函数吗,是让Server模块来设置的(服务器模块的处理回调是组件使用者设置的)
    using ConnectedCallback = std::function<void(const PtrConnection&)>;
    using MessageCallback = std::function<void(const PtrConnection&, Buffer*)>;
    using ClosedCallback = std::function<void(const PtrConnection&)>;
    using AnyEventCallback = std::function<void(const PtrConnection&)>;
    ConnectedCallback _connected_callback;
    MessageCallback _message_callback;
    ClosedCallback _closed_callback;
    AnyEventCallback _event_callback;
    //组件内的连接关闭回调 -- 组件内设置的,因为服务器组件内会把所有的连接管理起来。
    //一旦某个连接要关闭,就应该从管理的地方移除掉自己的信息。
    ClosedCallback _server_closed_callback;
private:
    //五个channel的事件回调函数
    //描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callback
    void HandleRead()
    {
        char buf[65536];
        ssize_t ret = _socket.NonBlockRecv(buf, 65535);
        if (ret < 0)
        {
            //读出错了,可能是客户端关闭,不能直接关闭连接,因为可能有数据没发送或者有数据还没处理
            return ShutdownInLoop();
        }
        else if (ret == 0)
        {
            //表示没有读取到数据,并不是连接断开,因为我们调用的是自己封装的NonBlockRecv
            return;
        }
        _in_buffer.WriteAndPush(buf, ret);
        //2.调用message_callback进行业务处理
        if (_in_buffer.ReadAbleSize() > 0)
        {
            //shared_from_this -- 从当前对象自身获取自身的shared_ptr管理对象
            _message_callback(shared_from_this(), &_in_buffer);
        }
    }
    //描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送
    void HandleWrite()
    {
        ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());
        if (ret < 0)
        {
            if (_in_buffer.ReadAbleSize() > 0)
            {
                _message_callback(shared_from_this(), &_in_buffer);
            }
            return Release(); //实际的关闭释放操作
        }
        _out_buffer.MoveReadOffset(ret);    //千万不要忘了,将读偏移向后移动
        if (_out_buffer.ReadAbleSize() == 0)
        {
            _channel.DisableWrite();    //如果数据发送完了,就关闭写事件监控
            //如果当前是连接待关闭状态,并且数据发送完毕,则可以将连接直接释放
            if (_statu == DISCONNECTING)
            {
                return Release();
            }
        }
        //发送数据可能发不完,不关闭写事件监控
        return;
    }
    //描述符触发挂断事件
    void HandleClose()
    {
        if (_in_buffer.ReadAbleSize() > 0)
        {
            _message_callback(shared_from_this(), &_in_buffer);
        }
        return Release();
    }
    //描述符触发出错事件
    void HandleError()
    {
        return HandleClose();
    }
    //描述符触发任意事件
    void HandleEvent()
    {
        //刷新连接活跃度
        if (_enable_inactive_release == true)
        {
            _loop->TimerRefresh(_conn_id);
        }
        //调用组件使用者的任意事件回调
        if (_event_callback)
            _event_callback(shared_from_this());
    }
    //连接获取之后,所处的状态下要进行各种设置
    void EstablishedInLoop()
    {
        //修改连接状态
        assert(_statu == CONNECTING);
        _statu = CONNECTED;
        _channel.EnableRead();
        if (_connected_callback)
            _connected_callback(shared_from_this());
    }
    //这个接口才是实际的释放接口
    void ReleaseInLoop()
    {
        //修改连接状态,将其置为DISCONNECTED
        _statu = DISCONNECTED;
        //移除连接的事件监控
        _channel.Remove();
        //关闭描述符
        _socket.Close();
        //如果当前定时器队列中还有定时任务,则取消任务
        if (_loop->HasTimer(_conn_id))
            CancelInactiveRelease();
        //调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,因此先调用户的回调函数
        if (_closed_callback)
            _closed_callback(shared_from_this());
        if (_server_closed_callback)
            _server_closed_callback(shared_from_this());
    }
    //这个接口并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控
    //为什么要这么做?因为可写条件可能不就绪,即内核缓冲区的数据满了,写不进去了
    void SendInLoop(Buffer buf)
    {
        if (_statu == DISCONNECTED) //如果状态已经关闭,则直接return,已经关闭则代表发送缓冲区数据为0
            return;
        _out_buffer.WriteBufferAndPush(buf);
        if (_channel.WriteAble() == false)
            _channel.EnableWrite();
    }
    //这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送
    void ShutdownInLoop()
    {
        _statu = DISCONNECTING; //设置为半关闭状态
        if (_in_buffer.ReadAbleSize() > 0)
            _message_callback(shared_from_this(), &_in_buffer);
        if (_out_buffer.ReadAbleSize() > 0)
            _channel.EnableWrite();
        //因为可能发送缓冲区将数据发送不完,所以写关心就不用关闭了,也不用真正释放了
        if (_out_buffer.ReadAbleSize() == 0)
        {
            _channel.DisableWrite();
            Release();
        }
    }
    //启动非活跃连接超时释放规则
    void EnableInactiveReleaseInLoop(int sec)
    {
        //将判断标志 _enable_inactive_erlease置为true
        _enable_inactive_release = true;
        //如果当前定时销毁任务已经存在,那就刷新延迟一下即可
        if (_loop->HasTimer(_conn_id))
            return _loop->TimerRefresh(_conn_id);
        //如果不存在定时销毁任务,则新增
        _loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));
    }
    //取消非活跃连接超时释放规则
    void CancelInactiveReleaseInLoop()
    {
        _enable_inactive_release = false;
        if (_loop->HasTimer(_conn_id))
            _loop->TimerCancel(_conn_id);
    }
    //切换/升级协议
    void UpgradeInLoop(const Any& context, const ConnectedCallback& conn, const MessageCallback& msg, const ClosedCallback& closed, const AnyEventCallback& event)
    {
        _context = context; //改变上下文
        _connected_callback = conn;
        _message_callback = msg;
        _closed_callback = closed;
        _event_callback = event;
    }
public:
    Connection(EventLoop* loop, uint64_t conn_id, int sockfd)
    :_conn_id(conn_id)
    ,_sockfd(sockfd)
    ,_enable_inactive_release(false)
    ,_loop(loop)
    ,_statu(CONNECTING)
    ,_socket(sockfd)
    ,_channel(loop, _sockfd)
    {
        _channel.SetReadCallback(std::bind(&Connection::HandleRead, this));
        _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));
        _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));
        _channel.SetErrorCallback(std::bind(&Connection::HandleError, this));
        _channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));
    }
    ~Connection()
    {
        DBG_LOG("RELEASE CONNEDCTION: %p", this);
    }
    //获取管理的文件描述符
    int Fd()
    {
        return _sockfd;
    }
    //获取连接ID
    int Id()
    {
        return _conn_id;
    }
    //是否处于CONNECTED状态
    bool Connected()
    {
        return (_statu == CONNECTED);
    }
    //设置上下文 -- 连接建立完成时进行调用
    void SetContext(const Any& context)
    {
        _context = context;
    }
    //获取上下文,返回的是指针
    Any* GetContext()
    {
        return &_context;
    }
    void SetConnectedCallback(const ConnectedCallback& cb)
    {
        _connected_callback = cb;
    }
    void SetMessageCallback(const MessageCallback& cb)
    {
        _message_callback = cb;
    }
    void SetClosedCallback(const ClosedCallback& cb)
    {
        _closed_callback = cb;
    }
    void SetSvrClosedCallback(const ClosedCallback& cb)
    {
        _server_closed_callback = cb;
    }
    void SetAnyEventCallback(const AnyEventCallback& cb)
    {
        _event_callback = cb;
    }
    //连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callback
    void Establised()
    {
        _loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));
    }
    //发送数据,将数据放到发送缓冲区,启动写事件监控
    void Send(const char* data, size_t len)
    {
        Buffer buf; //为什么要重新创建一个临时变量?因为data可能是一个可能被释放的空间,我们将其压入任务队列等待被执行的过程中,空间可能被释放了
        buf.WriteAndPush(data, len);
        _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));
    }
    //提供给组件使用者的关闭接口 -- 并不实际关闭,需要判断有没有数据待处理
    void Shutdown()
    {
        _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));
    }
    void Release()
    {
        _loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));
    }
    //启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务
    void EnableInactiveRelease(int sec)
    {
        _loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));
    }
    //取消非活跃销毁
    void CancelInactiveRelease()
    {
        _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));
    }
    //切换协议 -- 重置上下文以及阶段性处理函数
    void Upgrade(const Any& context, const ConnectedCallback& conn, const MessageCallback& msg, const ClosedCallback& closed, const AnyEventCallback& event)
    {
        _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));
    }
};

更新TImerWheel模块中的ReadTimefd函数和OnTime函数

int ReadTimefd()
{
    uint64_t times;
    int ret = read(_timerfd, &times, 8);
    if (ret < 0)
    {
        ERR_LOG("READ TIMEFD FAILED");
        abort();
    }
    return times;
}
void OnTime()
{
    int times = ReadTimefd();
    for (int i = 0; i < times; ++i)
        RunTimerTask();
}

将业务处理时间延长至15s

void Hello(const HttpRequest& req, HttpResponse* rsp)
{
    rsp->SetContent(RequestStr(req), "text/plain");
    sleep(15);
}

HTTP服务器同时多条请求测试

测试:

#include "../source/server.hpp"


int main()
{
    signal(SIGCHLD, SIG_IGN);
    for (int i = 0; i < 10; i++)
    {
        pid_t pid = fork();
        if (pid < 0)
        {
            DBG_LOG("FORK ERROR");
            return -1;
        }
        else if (pid == 0)
        {
            Socket cli_sock;
            cli_sock.CreateClient(8085, "127.0.0.1");
            std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";
            
            assert(cli_sock.Send(req.c_str(), req.size()) != -1);
            char buf[1024] = {0};
            assert(cli_sock.Recv(buf, 1023));
            DBG_LOG("[%s]", buf);
            
            while (1) sleep(1);
        }
    }
    
    return 0;
}

HTTP服务器大文件传输测试

#include "../source/server.hpp"
#include "../source/Http/http.hpp"


int main()
{
    Socket cli_sock;
    cli_sock.CreateClient(8085, "60.205.245.92");
    std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n";
    std::string body;
    Util::ReadFile("./hello.txt", &body);
    req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n";
    assert(cli_sock.Send(req.c_str(), req.size()) != -1);
    assert(cli_sock.Send(body.c_str(), body.size()) != -1);
    char buf[1024] = {0};
    assert(cli_sock.Recv(buf, 1023));
    DBG_LOG("[%s]", buf);
    sleep(3);
    cli_sock.Close();
    return 0;
}

HTTP服务器性能压力测试说明

举报

相关推荐

0 条评论