一、线程池模式介绍
线程池模式(Thread Pool Pattern)是一种并发设计模式,用于管理和循环使用线程资源以处理大量任务。它旨在提高系统性能和资源利用率,特别是在需要频繁创建和销毁线程的环境中。
1、线程池模式结构图
2、创建线程池的4种常见方法
二、线程池的设计方法
示范说明:为了便于演示,所有模拟的任务时间都是固定时间(焊四)。
1、固定大小线程池示例
完整代码
fix_threadpool.cpp
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <chrono>
// 添加互斥锁以保护 std::cout
std::mutex coutMutex;
// 线程池
class ThreadPool {
public:
ThreadPool(size_t numThreads) : stop(false) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] () {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queueMutex);
this->condition.wait(lock, [this] () {
return this->stop || !this->tasks.empty();
});
if (this->stop && this->tasks.empty()) {
return;
}
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
} );
}
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join(); // 阻塞线程,等待主线程回收子线程
}
}
template<class F, class... Args>
void enqueue(F&& f, Args&&... args) {
{
std::unique_lock<std::mutex> lock(queueMutex);
if (stop) {
throw std::runtime_error("Error: 在已停止的线程池上排队! ");
}
tasks.emplace([f, args...]() {
f(args...);
});
}
condition.notify_one();
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condition;
bool stop;
};
void simulateTasks(int id) {
std::lock_guard<std::mutex> guard(coutMutex);
std::cout << "线程 " << std::this_thread::get_id() << " 正在执行 " << id << "任务\n";
// 模拟任务时间相同,每个任务耗时1ms
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
int main() {
// 创建4个线程池
ThreadPool pool(4);
// 创建1081个模拟的任务,加入队列中
for (int i = 0; i < 1081; ++i) {
pool.enqueue(simulateTasks, i);
}
return 0;
}
运行效果
代码使用固定线程池创建4个线程,并创建1081个模拟的任务被这4个线程共同并发执行,实际线程号只有4种,他们之间通过互斥锁防止恶性竞争,并通过一把全局互斥锁保护 std::cout 的访问。从运行效果看运行速度非常快。
2、可缓存线程池示例
完整代码
cache_threadpool.cpp
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <condition_variable>
#include <mutex>
#include <functional>
#include <chrono>
#include <atomic>
#include <future>
#include <map>
#include <iomanip>
std::mutex coutMutex; // 添加全局互斥锁以保护 std::cout
class CachedThreadPool {
public:
CachedThreadPool(size_t initialThreads = std::thread::hardware_concurrency(), int idleTimeout = 20)
: maxThreads(initialThreads), shutdown(false), idleTimeout(idleTimeout) {
startThreads(initialThreads);
}
~CachedThreadPool() {
{
std::unique_lock<std::mutex> lock(queueMutex);
shutdown = true;
}
condVar.notify_all();
for (std::thread& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
std::future<void> enqueueTask(std::function<void()> task) {
std::packaged_task<void()> packagedTask(std::move(task));
std::future<void> future = packagedTask.get_future();
{
std::unique_lock<std::mutex> lock(queueMutex);
tasks.push(std::move(packagedTask));
lastActive = std::chrono::steady_clock::now();
}
condVar.notify_one();
return future;
}
private:
void startThreads(size_t count) {
for (size_t i = 0; i < count; ++i) {
workers.emplace_back([this]() {
auto lastActiveTime = std::chrono::steady_clock::now();
auto threadStartTime = lastActiveTime; // 记录线程开始活动时间
static int TaskCode = 0;
while (true) {
std::packaged_task<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
// 等待任务到来或者超时(释放互斥锁)
condVar.wait_for(lock, std::chrono::seconds(idleTimeout), [this]() {
return shutdown || !tasks.empty();
});
if (!tasks.empty()) {
std::lock_guard<std::mutex> guard(coutMutex);
task = std::move(tasks.front());
tasks.pop();
TaskCode++; // 任务代号
lastActive = std::chrono::steady_clock::now();
std::cout << "线程 " << std::this_thread::get_id() << " 执行任务:" << TaskCode << " 开始时间: " << toTimeString(threadStartTime) << std::endl;
std::cout << "线程 " << std::this_thread::get_id() << " 执行任务:" << TaskCode << " 最后一次活动时间: " << toTimeString(lastActive) << std::endl;
lastActiveTime = lastActive;
} else {
// 当任务为空,从线程池的最后一次活动状态开始记录超时时间
auto now = std::chrono::steady_clock::now();
while (std::chrono::duration_cast<std::chrono::seconds>(now - lastActiveTime).count() < idleTimeout) {
now = std::chrono::steady_clock::now();
}
if (shutdown && tasks.empty()) {
break;
}
}
}
try {
task();
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
}
// 线程销毁时的处理
{
std::lock_guard<std::mutex> guard(coutMutex);
std::cout << "线程 " << std::this_thread::get_id() << " 已销毁" << std::endl;
}
});
}
}
std::string toTimeString(const std::chrono::steady_clock::time_point& tp) const {
using namespace std::chrono;
auto timeT = system_clock::to_time_t(system_clock::now() + (tp - steady_clock::now()));
std::tm tm = *std::localtime(&timeT);
std::ostringstream oss;
oss << std::put_time(&tm, "%Y-%m-%d %H:%M:%S");
return oss.str();
}
size_t maxThreads;
std::vector<std::thread> workers;
std::queue<std::packaged_task<void()>> tasks;
std::mutex queueMutex;
std::condition_variable condVar;
std::atomic<bool> shutdown;
std::chrono::steady_clock::time_point lastActive;
int idleTimeout;
};
int main() {
CachedThreadPool pool;
std::vector<std::future<void>> tasks;
// 模拟20个任务
for (int i = 1; i <= 20; ++i) {
tasks.push_back(pool.enqueueTask([i, &pool]() {
{
std::lock_guard<std::mutex> guard(coutMutex);
std::cout << i << " 任务被 " << std::this_thread::get_id() << " 线程执行中." << std::endl;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}));
}
// 等待所有任务完成
for (auto& task : tasks) {
task.get();
}
// 程序退出前稍作等待,确保所有销毁信息被打印出来
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "\n主线程等待20秒后回收所有线程资源,程序退出 " << std::endl;
return 0;
}
运行效果
代码简单实现可缓存的线程池,首先调用std::thread::hardware_concurrency()函数获取4个线程共同完成20个模拟的任务,每个模拟的任务需耗时1s,从输出结果看4个线程4s完成20个模拟任务,完成任务后等待20s触发超时,join()函数阻塞,然后所有子线程被主线程回收,程序退出。
3、定时任务线程池示例
完整代码
timing_threadpool.cpp
#include <iostream>
#include <vector>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
#include <atomic>
#include <iomanip>
// 全局一把锁
std::mutex taskMutex;
class ThreadPool {
public:
static ThreadPool& getInstance(size_t numThreads) {
static ThreadPool instance(numThreads);
return instance;
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
void enqueueTask(const std::function<void()>& task);
void start();
void stop();
private:
ThreadPool(size_t numThreads);
~ThreadPool();
void workerThread();
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::condition_variable condVar;
std::atomic<bool> stopFlag;
std::atomic<bool> runFlag;
std::chrono::seconds interval{10};
};
ThreadPool::ThreadPool(size_t numThreads) : stopFlag(false), runFlag(false) {
workers.reserve(numThreads);
}
ThreadPool::~ThreadPool() { stop(); }
void ThreadPool::enqueueTask(const std::function<void()>& task) {
std::lock_guard<std::mutex> lock(taskMutex);
tasks.push(task);
condVar.notify_one();
}
void ThreadPool::start() {
runFlag = true;
for (size_t i = 0; i < workers.capacity(); ++i) {
workers.emplace_back(&ThreadPool::workerThread, this);
}
}
void ThreadPool::stop() {
stopFlag = true;
condVar.notify_all();
for (std::thread& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
runFlag = false;
}
void ThreadPool::workerThread() {
while (!stopFlag) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(taskMutex);
condVar.wait_for(lock, interval, [this]() {
return !tasks.empty() || stopFlag;
});
if (stopFlag && tasks.empty()) {
return;
}
if (!tasks.empty()) {
task = tasks.front();
tasks.pop();
}
}
if (task) { task(); }
}
}
// 任务模拟
void simulateTask(const std::string& type) {
std::cout << "线程 " << std::this_thread::get_id() << " 执行任务:" << type <<std::endl;
// 模拟任务调度耗时1s
std::this_thread::sleep_for(std::chrono::seconds(1));
}
// 时间格式转换
std::string toTimeString(const std::chrono::system_clock::time_point& time) {
using namespace std::chrono;
auto timeT = system_clock::to_time_t(time);
std::tm tm = *std::localtime(&timeT);
std::ostringstream oss;
oss << std::put_time(&tm, "%Y-%m-%d %H:%M:%S");
return oss.str();
}
int main() {
// 单例模式创建2个线程
ThreadPool& pool = ThreadPool::getInstance(2);
// 启动线程池
pool.start();
// 添加任务到线程池
std::string type = "冒烟测试";
std::thread([&pool, type]() {
while(true) {
pool.enqueueTask([type]() {
std::cout << "任务:" << type << ",在 " << toTimeString(std::chrono::system_clock::now()) << " 触发执行" << std::endl;
simulateTask(type);
});
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}).detach();
// 主线程休眠1分钟,不让主线程死循环并留出一分钟时间让子线程周期性执行任务
std::this_thread::sleep_for(std::chrono::minutes(1));
std::cout << "\n最长演示时间1分钟,线程已回收,程序结束。" << std::endl;
pool.stop();
return 0;
}
运行效果
定义全局的唯把锁,设计单例模式的线程池,主函数中调用单例创建2个线程加入线程池,这2个线程每隔10s周期性执行simulate函数(函数代表被模拟的任务)。
4、自适应线程池
完整代码
adaptive_threadpool.cpp
#include <iostream>
#include <thread>
#include <functional>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <algorithm>
#include <chrono>
#include <string>
#include <atomic>
#include <iomanip>
#include <sys/select.h>
#include <unistd.h>
// 任务锁
std::mutex taskMutex;
class ThreadPool {
public:
// 获取单例实例
static ThreadPool& getInstance(size_t min_threads = 1) {
static ThreadPool instance(min_threads);
return instance;
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
// 执行任务
void execute(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
if (stop) {
throw std::runtime_error("线程池已停止运行.");
}
tasks.emplace(std::move(task));
++task_count; // 增加任务计数
}
condition.notify_one();
}
void start(size_t min_threads) {
std::lock_guard<std::mutex> lock(queue_mutex);
max_threads = std::max(max_threads, min_threads);
workers.reserve(max_threads);
for (size_t i = workers.size(); i < max_threads; ++i) {
workers.emplace_back(&ThreadPool::worker_thread, this);
}
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (auto& worker : workers) {
if (worker.joinable()) {
worker.join();
}
}
}
// 等待所有任务完成
void wait() {
std::unique_lock<std::mutex> lock(queue_mutex);
finished_condition.wait(lock, [this]() { return task_count == 0; });
}
private:
ThreadPool(size_t min_threads = 1)
: max_threads(min_threads), stop(false), task_count(0) {}
void worker_thread() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
// 自适应调整线程池大小
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (tasks.size() > max_threads * 3 && workers.size() < max_threads) {
size_t additional_threads = std::min(max_threads - workers.size(), tasks.size() / 10);
for (size_t i = 0; i < additional_threads; ++i) {
workers.emplace_back(&ThreadPool::worker_thread, this);
}
}
}
// 任务完成,减少计数器并通知主线程
{
std::lock_guard<std::mutex> lock(queue_mutex);
--task_count;
if (task_count == 0) {
finished_condition.notify_one();
}
}
}
}
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
std::condition_variable finished_condition;
size_t max_threads;
std::atomic<bool> stop;
std::atomic<size_t> task_count; // 任务计数器
};
// 时间格式转换
std::string toTimeString(const std::chrono::system_clock::time_point& time) {
using namespace std::chrono;
auto timeT = system_clock::to_time_t(time);
std::tm tm = *std::localtime(&timeT);
std::ostringstream oss;
oss << std::put_time(&tm, "%Y-%m-%d %H:%M:%S");
return oss.str();
}
// 超时处理
bool getInputWithTimeout(std::string& input, int timeoutSeconds) {
fd_set set;
struct timeval timeout;
int rv;
FD_ZERO(&set);
FD_SET(STDIN_FILENO, &set);
timeout.tv_sec = timeoutSeconds;
timeout.tv_usec = 0;
rv = select(STDIN_FILENO + 1, &set, NULL, NULL, &timeout);
if (rv == -1) {
std::cerr << "select() 错误" << std::endl;
return false;
} else if (rv == 0) {
return false;
} else {
std::getline(std::cin, input);
return true;
}
}
// 任务模拟
void simulateTask(const std::string& type, const int index) {
std::lock_guard<std::mutex> lock(taskMutex);
std::cout << "序号:" << index << " 线程 " << std::this_thread::get_id() << " 在运行任务 " << type << " " << toTimeString(std::chrono::system_clock::now()) << std::endl;
}
int main() {
/*模拟客户端永不关机,线程池每隔10s周期性轮询任务,\
!!! 如果任务不存在,或超时,则执行上一次模拟的任务.
*/
ThreadPool& pool = ThreadPool::getInstance();
// 启动线程池(初始创建2个线程)
pool.start(2);
static std::string type = "";
while (true) {
std::cout << "输入任务称号(按 Enter 键结束输入): " << std::flush;
if (getInputWithTimeout(type, 20) && !type.empty()) {
std::cout << "\n正在执行新任务 " << type << "\n" << std::endl;
} else if (type.empty() && !getInputWithTimeout(type, 20)) {
std::cout << "\n输入为空 或 超时20s未输入,执行默认任务.\n" << std::endl;
type = "黑神话·悟空";
} else {
std::cout << "\n输入为空 或 超时20s未输入,执行上一次任务.\n" << std::endl;
}
// 传递任务参数
std::string localType = type;
for (int i = 0; i < 20; ++i) {
pool.execute([i, localType]() {
simulateTask(localType, i);
// 模拟任务调度耗时
std::this_thread::sleep_for(std::chrono::seconds(i*1));
});
}
// 等待所有任务完成
pool.wait();
// 所有任务完成后再进行等待
std::cout << "\n所有任务已完成,等待 10 秒...\n" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(10));
}
return 0;
}
代码实现模拟的客户端永不关机,单例线程池每隔10s周期性轮询任务。如果任务不存在,或超时20s终端未输入任何信息,则执行默认的模拟任务,如果任务上次执行过,这次轮询时间不存在新的任务,则执行上一次的任务。
运行效果
三、线程池模式的应用场景
四、总结
实际上线程池模式属于GoF一书23种设计模式的一种或多种设计模式组合。线程池的主要优点包括减少创建和销毁线程的开销、提高资源的利用率、简化线程管理。它适用于需要处理大量短时间任务的场景(如高并发)。在实际应用中,线程池模式有助于提高系统的响应速度和吞吐量,减少资源浪费,并使任务处理更为高效和可控。