0
点赞
收藏
分享

微信扫一扫

libevent(十五)ThreadPool线程池(一)


文章目录

  • ​​一个简单的示例:​​
  • ​​主函数​​
  • ​​线程池​​
  • ​​线程​​
  • ​​接口类(纯虚函数)​​
  • ​​任务类​​
  • ​​移植到linux下​​


libevent(十五)ThreadPool线程池(一)_网络

一个简单的示例:

在头文件中,尽量少以用其它类的头文件,尽量使用​​class 类名​

  • ​test_thread_pool.cpp​​ 为主函数,初始化了线程池,并绑定ip监听端口:127.0.0.1:5001
  • ​XThreadPool.cpp​​​ 线程池(Init,Dispatch)
    ​XThreadPool.h​
  • ​XTask.h​​ 生成任务(接口类)
  • ​XFtpServerCMD.cpp​​​ 继承XTask类(为具体任务事件)
    ​XFtpServerCMD.h​
  • ​XThread.cpp​​​ 线程函数(Setup, Start, Main, Activate, AddTask, Notify)
    ​XThread.h​
主函数

test_thread_pool.cpp

#include <iostream>
#include <event2/event.h>
#include <string.h>
#include <event2/listener.h>
#ifndef _WIN32
#include <signal.h> //使用 man signal 查找linux平台的头文件
#endif
#include "XThreadPool.h"
#include "XFtpServerCMD.h"
#define SPORT 5001


void listen_cb(struct evconnlistener* e, evutil_socket_t s, struct sockaddr* a, int socklen, void* arg) {
std::cout << "listen_cb" << std::endl;
XTask* task = new XFtpServerCMD();
task->sock = s;
XThreadPool::Get()->Dispatch(task);
}

int main(int argc, char** argv) {
#if _WIN32
//windowns 初始化socket库
WSADATA wsa;
WSAStartup(MAKEWORD(2, 2), &wsa);
#else
//linux 忽略管道信号,发送数据给已关闭的socket
if (signal(SIGPIPE, SIG_IGN) == SIG_ERR)
return 1;
#endif

//初始化线程池
XThreadPool::Get()->Init(10);
event_base* base = event_base_new();
if (base) {
std::cout << "event_base_new init successfuly!" << std::endl;
}

sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(SPORT);

//监听端口(socket,bind,listen,绑定事件)
evconnlistener* ev = evconnlistener_new_bind(
base, // base上下文
listen_cb, // 接受连接的回调函数
base, // 回调函数获取的参数
LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, // 地址重用,evconnlistener关闭同时关闭socket
10, // 连接队列大小,对应listen函数
(sockaddr*)&sin, // b绑定地址与端口
sizeof(sin)
);

// 事件分发处理
if (base) {
event_base_dispatch(base);
}
if (ev) {
evconnlistener_free(ev);
}
if (base) {
event_base_free(base);
}
#ifdef _WIN32
WSACleanup();
#endif // _WIN32
return 0;
}

线程池

XThreadPool.h

#pragma once
#include <vector>


class XTask;
class XThread;
class XThreadPool
{
public:
static XThreadPool* Get()
{
static XThreadPool p;
return &p;
}

void Dispatch(XTask*); // 分发线程
void Init(int threadCount); // 初始化线程并启动线程

private:
int threadCount = 0; // 线程数量
int lastThread = -1; // 上一次线程
std::vector<XThread*> threads; // 线程池

XThreadPool() {}; // 构造函数设置为私有的,确保只有自己能创建(单件模式)
};

XThreadPool.cpp

#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
using namespace std;


//分发线程
void XThreadPool::Dispatch(XTask* task)
{
//轮询
if (!task)return;
int tid = (lastThread + 1) % threadCount;
lastThread = tid;
XThread* t = threads[tid];

//添加任务
t->AddTask(task);

//激活线程
t->Activate();
}


//初始化线程,并启动所有线程
void XThreadPool::Init(int threadCount)
{
this->threadCount = threadCount;
this->lastThread = -1;
for (int i = 0; i < threadCount; i++)
{
XThread* t = new XThread();
t->id = i + 1;
cout << "Create thread " << i << " successfully" << endl;
t->Start(); //启动线程
threads.push_back(t);
this_thread::sleep_for(10ms);
}
}

线程

XThread.h

#pragma once
#include <event2/event.h>
#include <list>
#include <mutex>


class XTask;
class XThread
{
public:
void Start(); // 启动线程
void Main(); // 线程入口函数
bool Setup(); // 安装线程,初始化event_base和管道监听事件用于激活线程

int id = 0; // 线程编号
void Notify(evutil_socket_t fd, short which); //收到主线程发出的激活信息(线程池的分发)

void Activate(); // 线程激活
void AddTask(XTask* t); // 添加处理的任务(一个线程可以处理多个任务,共用一个event)

private:
int notify_send_fd = 0;
struct event_base* base = 0;
std::list<XTask*> tasks;
std::mutex tasks_mutex;
};

XThread.cpp

#include "XThread.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#include "XTask.h"
#ifndef _WIN32
#include <signal.h>
#include <unistd.h>
#endif // !_WIN32

using namespace std;


//激活线程
void XThread::Activate()
{
#ifdef _WIN32
int re = send(this->notify_send_fd, "c", 1, 0);
#else
int re = write(this->notify_send_fd, "c", 1);
#endif // _WIN32
if (re <= 0)
{
cerr << "XThread::Activate failed." << endl;
}
}

//添加处理的任务(一个线程可以处理多个任务,共用一个event)
void XThread::AddTask(XTask* t)
{
if (!t)return;
t->base = this->base;
tasks_mutex.lock();
tasks.push_back(t);
tasks_mutex.unlock();

}


void XThread::Notify(evutil_socket_t fd, short which)
{
//水平触发(只要没有接受完成,会再次出发)
char buf[2] = { 0 };
#ifdef _WIN32
int re = recv(fd, buf, 1, 0);
#else
int re = read(fd, buf, 1);
#endif // _WIN32
if (re <= 0)return;
cout << id << " thread " << buf << endl;
XTask* task = NULL;

//获取并初始化任务
tasks_mutex.lock();
if (tasks.empty())
{
tasks_mutex.unlock();
return;
}
task = tasks.front(); //先进先出
tasks.pop_front();
tasks_mutex.unlock();
task->Init();
}



//激活线程任务的事件回调函数
static void NotifyCB(evutil_socket_t fd, short which, void*arg)
{
XThread* t = (XThread*)arg;
t->Notify(fd, which);
}



// 启动线程
void XThread::Start()
{
Setup();
thread th(&XThread::Main, this);

//断开与主线程联系
th.detach();
}



// 线程入口函数
void XThread::Main()
{
cout << id <<" XThread::Main() begin" << endl;
event_base_dispatch(base);
event_base_free(base);
cout << id <<" XThread::Main() end" << endl;
}



// 安装线程,初始化event_base和管道监听事件用于激活线程
bool XThread::Setup()
{
//windows用socketpair,linux用管道进行通信(0读,1写)
#if _WIN32
evutil_socket_t fds[2];
if (evutil_socketpair(AF_INET, SOCK_STREAM, 0, fds) < 0)
{
cout << "evutil_socketpair failed!" << endl;
return false;
}
//设置成非阻塞
evutil_make_socket_nonblocking(fds[0]);
evutil_make_socket_nonblocking(fds[1]);
#else
//管道使用
int fds[2];
if (pipe(fds))
{
cout << "pipe failed" << endl;
return false;
}
#endif
//读取绑定到event事件中,写入要保存。
notify_send_fd = fds[1];

//创建libevent上下文(无锁)
event_config * ev_config = event_config_new();
event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
this->base = event_base_new_with_config(ev_config);
if (!base)
{
cerr << "event_base_new_with_config failed in thread" << endl;
return false;
}

//添加管道监听事件,用于激活线程执行任务
event* ev = event_new(base, fds[0], EV_READ | EV_PERSIST, NotifyCB, this);
event_add(ev, 0);
event_config_free(ev_config);
return true;

}

接口类(纯虚函数)

XTask.h

#pragma once


class XTask
{
public:
struct event_base* base = 0;
int sock = 0;
int thread_id = 0;

//初始化任务
virtual bool Init() = 0;
};

任务类

XFtpServerCMD.h

#pragma once
#include "XTask.h"


class XFtpServerCMD :public XTask
{
public:
//初始化任务
virtual bool Init();

XFtpServerCMD() {}
~XFtpServerCMD() {}
};

#include "XFtpServerCMD.h"
#include <iostream>
#include <event2/bufferevent.h>
#include <event2/event.h>
#include <stdlib.h>
#include <string.h>
using namespace std;


static void EventCB(struct bufferevent* bev, short what, void* arg)
{
XFtpServerCMD* cmd = (XFtpServerCMD*)arg;

//如果对方网络中断或死机,收不到BEV_EVENT_EOF数据
if (what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT))
{
cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT" << endl;
bufferevent_free(bev);
delete cmd;
}
}


//子线程XThread事件分发中被调用
static void ReadCB(bufferevent* bev, void*arg)
{
XFtpServerCMD* cmd = (XFtpServerCMD*)arg;
char data[1024] = { 0 };
for (;;)
{
int len = bufferevent_read(bev, data, sizeof(data) - 1);
if (len <= 0)break;
data[len] = '\0';
cout << data << flush;

//测试代码
if (strstr(data,"quit"))
{
bufferevent_free(bev);
delete cmd;
break;
}
}
}


//初始化任务,运行在子线程中
bool XFtpServerCMD::Init()
{
//监听socket bufferevent;
cout << "XFtpServerCMD::Init()" << endl;
bufferevent* bev= bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(bev, ReadCB, 0, EventCB, this);
bufferevent_enable(bev, EV_READ | EV_WRITE);

//添加超时
timeval rt = { 10,0 };
bufferevent_set_timeouts(bev, &rt, 0);
return true;
}

移植到linux下

test_thread_pool:test_thread_pool.cpp XFtpServerCMD.cpp XTask.cpp XThread.cpp XThreadPool.cpp
g++ $^ -o $@ -levent -lpthread
./$@

clean:
rm -rf test_thread_pool
rm

服务端

libevent(十五)ThreadPool线程池(一)_c++_02


客户端

libevent(十五)ThreadPool线程池(一)_开发语言_03


举报

相关推荐

0 条评论