线程池
存放任务的线程安全队列
// 对比4.5 使用条件变量的线程安全队列的完整类定义
// 包含shared_ptr实例
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>
#include <memory>
template <typename T>
class threadsafe_queue
{
private:
// 队列,存放共享指针——直接存放数据容易异常
std::queue<std::shared_ptr<T>> data_queue;
// 互斥量
mutable std::mutex mut;
// 条件变量,用于通知队列是否有任务
std::condition_variable cond;
public:
// 删除函数,移动函数也应该删除
threadsafe_queue(const threadsafe_queue &&) = delete;
threadsafe_queue(const threadsafe_queue &) = delete;
threadsafe_queue& operator=(const threadsafe_queue &) = delete;
threadsafe_queue &operator=(const threadsafe_queue &&) = delete;
// 默认构造函数
threadsafe_queue(){}
bool empty() const
{
// 上锁,防止查看队列是否为空的时候有别的线程操作队列,使得队列状态发生变化
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
void push(T new_value)
{
// 创建智能指针不需要上锁,我们只尽量锁最小粒度
std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));
// 上锁
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
// 尽量不要使用notify_all,会有惊群效应
cond.notify_one();
}
// 阻塞版本的弹出任务
void wait_and_pop(T &value)
{
// 这里必须用互斥锁,因为互斥锁可以人为解锁
// 下面代码没有人为解锁?
std::unique_lock<std::mutex> lk(mut);
// 阻塞等待队列不为空
// 条件变量加锁等待,不满足条件解锁阻塞,既然需要解锁,就必须用unique_lock
cond.wait(lk,[this](){return !data_queue.empty();});
// 使用移动,减少拷贝
value = std::move(*data_queue.front());
// 对被移动的元素,只能执行删除操作
data_queue.pop();
// lk.unlock(); 析构自动解锁
}
// 函数重载
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
cond.wait(lk,[this](){return !data_queue.empty();});
std::shared_ptr<T> res = data_queue.front();
data_queue.pop();
return res;
}
// 非阻塞版本的弹出任务,有就弹出,没有就返回
// 这是双返回值函数,既返回bool又填充数据
bool try_pop(T &value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
{
return false;
}
value = std::move(*data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
{
return std::make_shared<T>();
}
std::shared_ptr<T> res = data_queue.front();
data_queue.pop();
return res;
}
};
最简单的线程池
线程池最简单的形式是含有一个固定数量的工作线程(典型的数量是std: :thread : : hardware_concurrency()的返回值)来处理任务。当有任务要处理的时候,调用一个函数将任务放到等待队列中。每个工作线程都是从该队列中取出任务,执行完任务之后继续从等待队列取出更多的任务来处理。在最简单的情况,没有办法来等待一个任务完成。如果需要有这样的功能,则需要用户自己维护同步。
这里的任务,最正规的应当是函数对象,次之是函数。
最简单的线程池:
// 简单的线程池
#include "6.3threadsafe_queue.cpp"
#include <thread>
#include <functional>
#include <vector>
#include <atomic>
class join_threads
{
private:
// 对线程池的引用
std::vector<std::thread> &threads;
public:
// 禁止隐式类型转换,构造函数
explicit join_threads(std::vector<std::thread> &threads_) : threads(threads_){}
// 析构的时候对所有线程加入,等待所有线程结束
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); ++i)
{
if (threads.at(i).joinable())
{
threads.at(i).join();
}
}
}
};
class thread_pool
{
private:
std::atomic_bool done;
// 任务池实体 - 任务只接受 形如 void fun(void) 类型的函数
thread_safe_queue<std::function<void()>> work_queue;
// 线程池实体
std::vector<std::thread> threads;
// 只需要定义一个joiner就好了,析构的时候自动执行join
join_threads joiner;
// 线程池里的线程都运行这个函数
void worker_thread()
{
// 如果线程池没有关闭
while (!done)
{
std::function<void()> task;
if (work_queue.try_pop(task))
{
task();
}
else
{
// 出让调度器
std::this_thread::yield();
}
}
}
public:
thread_pool() : done(false), joiner(threads)
{
const unsigned int thread_count = std::thread::hardware_concurrency();
try
{
for (unsigned int i = 0; i < thread_count; ++i)
{
// 新建thread_count数量的线程
threads.push_back(std::thread(&thread_pool::worker_thread, this));
}
}
catch(...)
{
done = false;
throw;
}
}
// 向任务池里推送任务
template<typename FunctionType>
void submit(FunctionType f)
{
work_queue.push(std::function<void()>(f));
}
~thread_pool()
{
done = false;
}
};
允许等待任务结束和传递返回值的线程池
// 有等待任务的线程池
#include "6.3threadsafe_queue.cpp"
#include <memory>
#include <functional>
#include <utility>
#include <future>
#include <vector>
#include <thread>
class join_threads
{
private:
// 对线程池的引用
std::vector<std::thread> &threads;
public:
// 禁止隐式类型转换,构造函数
explicit join_threads(std::vector<std::thread> &threads_) : threads(threads_){}
// 析构的时候对所有线程加入,等待所有线程结束
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); ++i)
{
if (threads.at(i).joinable())
{
threads.at(i).join();
}
}
}
};
// 因为任务函数是有返回值的,不好使用function
// 所以自己定义函数包装器类
class function_wrapper
{
private:
// 抽象类
struct impl_base
{
virtual void call() = 0; // 纯虚函数
virtual ~impl_base(){}; // 虚函数
};
// 子类 —— F为函数类型
template<typename F>
struct impl_type:public impl_base
{
F f;
// 构造函数
impl_type(F &&f_) : f(std::move(f_)){}
// 执行函数
void call() { f(); }
};
// 互斥智能指针
std::unique_ptr<impl_base> impl;
public:
// 删除的函数
function_wrapper(const function_wrapper &) = delete;
function_wrapper(function_wrapper &) = delete;
function_wrapper &operator=(const function_wrapper &) = delete;
// 构造函数
template <typename F>
function_wrapper(F &&f) : impl(new impl_type<F>(std::move(f)));
// 函数对象
void operator()() { impl->call(); }
// 使用默认的构造函数
function_wrapper() = default;
// 移动拷贝构造函数
function_wrapper(function_wrapper &&other):impl(std::move(other.impl)){}
// 移动赋值运算符
function_wrapper& operator=(function_wrapper &&other)
{
impl = std::move(other.impl);
return *this;
}
};
// 线程池类
class thread_pool
{
private:
std::atomic_bool done;
thread_safe_queue<function_wrapper> work_queue;
std::vector<std::thread> threads;
join_threads joiner;
void work_thread()
{
while (!done)
{
function_wrapper task;
if (work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
}
public:
// 删除的函数
thread_pool(const thread_pool &) = delete;
thread_pool(const thread_pool &&) = delete;
thread_pool &operator=(const thread_pool &) = delete;
thread_pool &operator=(const thread_pool &&) = delete;
// 构造函数
thread_pool() : done(false), joiner(threads)
{
const unsigned long threads_num = std::thread::hardware_concurrency();
try
{
for (unsigned long i = 0; i < threads_num; ++i)
{
threads.push_back(std::thread(&thread_pool::work_thread, this));
}
}
catch(...)
{
done = true;
throw;
}
}
~thread_pool()
{
done = false;
}
// 推送任务的函数
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f)
{
// 获得函数的返回类型,重命名为result_type
typedef typename std::result_of<FunctionType()>::type result_type;
// 声明一个返回值为result_type类型的任务包
std::packaged_task<result_type()> task(std::move(f));
// 声明一个future,存储类型为函数的返回类型result_type
std::future<result_type> res(task.get_future());
// 将任务推送到任务队列,等待执行
work_queue.push(std::move(task));
// 返回future
return res;
}
};
使用第二个线程池实现parallel_accumulate(并行版本的accumulate)
// 使用可等待任务线程池的parallel_accumulate
#include "9.2thread_pool.cpp"
#include <numeric>
#include <vector>
#include <algorithm>
// 函数对象,执行计算
template<typename Iterator, typename T>
struct accumulate_block
{
T operator()(Iterator first, Iterator last)
{
// T() 表示调用函数对象初始化为0,如 int a = int();
return std::accumulate(first, last, T());
}
};
template<typename Iterator, typename T>
T parallel_accumualte(Iterator first, Iterator last, T init)
{
// 计算任务块的长度
const unsigned long length = std::distance(first, last);
if (!length) // 如果length == 0
{
return init;
}
// 设置最小任务块的大小为25
const unsigned long block_size = 25;
// 任务块的个数 —— 这个数值是等于length / block_size的,但是不知道为什么这样写
const unsigned long num_blocks = (length + block_size - 1) / block_size;
// 存储结果
std::vector<std::future<T>> futures(num_blocks - 1);
// 线程池
thread_pool pool;
Iterator block_start = first;
for (unsigned long i = 0; i < num_blocks - 1; ++i)
{
Iterator block_end = block_start;
// 将end迭代器往后移动size个
std::advance(block_end, block_size);
// 提交任务,保存未来结果
// 这个任务是一个函数对象,这个函数对象会有返回值和参数,
// 这是这里并没有传递参数,没有参数是怎么计算的?
futures.at(i) = pool.submit(accumulate_block<Iterator, T>());
block_start = block_end;
}
T last_result = accumulate_block<Iterator, T>()(block_start, last);
T result = init;
for (unsigned long i = 0; i < num_threads - 1; ++i)
{
result += futures.at(i).get();
}
result += last_result;
return result;
}
等待其他任务的任务
在线程池中增加一个新的函数来执行队列中的任务以及自己管理循环。高级线程池的实现可能会在等待函数添加逻辑来处理这种情形,有可能是通过给每个在等待的任务赋予优先级来解决。
给线程池增加一个新函数:
void thread_pool::run_pending_task()
{
function_wrapper task;
if (work_queue.try_pop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
run_pending_stask()的这个实现是从worker_thread()函数的主循环中提升出来的,它现在被修改为提取的run_pending_task()。它试图从队列中取出一个任务,如果成功则执行取出的任务,否则它放弃CPU,允许操作系统重新调度线程。
基于线程池的快速排序的实现
#include "9.2thread_pool.cpp"
#include <list>
#include <algorithm>
template<typename T>
struct sorter
{
thread_pool pool;
std::list<T> do_sort(std::list<T> &chunk_data)
{
if (chunk_data.empty())
{
return chunk_data;
}
std::list<T> result;
result.splice(result.begin(), chunk_data, chunk_data.begin());
const T &partition_val = *result.begin();
typename std::list<T>::iterator divide_point =
std::partition(chunk_data.begin(), chunk_data.end(), [=](const T &val)
{ return val < partition_val; });
std::list<T> new_lower_chunk;
new_lower_chunk.splice(new_lower_chunk.end(), chunk_data, chunk_data.begin(), divide_point);
std::future<std::list<T>> new_lower = pool.submit(std::bind(&sorter::do_sort, this, std::move(new_lower_chunk)));
std::list<T> new_higher(do_sort(chunk_data));
result.splice(result.end(), new_higher);
// 防止所有线程都阻塞等待,导致任务死锁
while (!new_lower.wait_for(std::chrono::seconds(0)) == std::future_status
::timeout)
{
// 重新推动任务
pool.run_pending_task();
}
result.splice(result.begin(), new_lower.get());
}
};
每个submit的调用和每个run_pending_task都访问同一个队列。一个集合的数据被多个线程并发的访问会大大的降低性能,所以需要别的方法来解决这个问题。
避免工作队列上的竞争
每次线程调用submit()时,它向单个共享工作队列添加一个新的元素。类似的情形为,工作线程不停的从队列中取出元素来执行。这意味着随着处理器数目的增加,工作队列的竞争会越来越多,这会极大地降低性能。即使你使用无锁队列,虽然没有显式的等待,但是乒乓缓存会非常耗时。
避免乒乓缓存的一个方法是在每个线程都使用一个单独的工作队列。每个线程将新的任务添加到它自己的队列中,**只有当自己队列为空的时候才从全局的工作队列中取任务。**例子展示了一个使用thread_local变量来保证每个线程有一个自己的工作队列再加上一个全局的工作队列。
使用本地线程工作队列的线程池
// 使用本地线程工作队列的线程池
#include "6.3threadsafe_queue.cpp"
#include <memory>
#include <vector>
#include <thread>
#include <future>
// 因为任务函数是有返回值的,不好使用function
// 所以自己定义函数包装器类
class function_wrapper
{
private:
// 抽象类
struct impl_base
{
virtual void call() = 0; // 纯虚函数
virtual ~impl_base(){}; // 虚函数
};
// 子类 —— F为函数类型
template<typename F>
struct impl_type:public impl_base
{
F f;
// 构造函数
impl_type(F &&f_) : f(std::move(f_)){}
// 执行函数
void call() { f(); }
};
// 互斥智能指针
std::unique_ptr<impl_base> impl;
public:
// 构造函数
template <typename F>
function_wrapper(F &&f) : impl(new impl_type<F>(std::move(f)));
// 函数对象
void operator()() { impl->call(); }
// 使用默认的构造函数
function_wrapper() = default;
// 移动拷贝构造函数
function_wrapper(function_wrapper &&other):impl(std::move(other.impl)){}
// 移动赋值运算符
function_wrapper& operator=(function_wrapper &&other)
{
impl = std::move(other.impl);
return *this;
}
// 删除的函数
function_wrapper(const function_wrapper &) = delete;
function_wrapper(function_wrapper &) = delete;
function_wrapper &operator=(const function_wrapper &) = delete;
};
class thread_pool
{
private:
// 公共任务池
thread_safe_queue<function_wrapper> pool_work_queue;
// 私有任务池
typedef std::queue<function_wrapper> local_queue_type;
// thread_local, 表明对象具有线程周期
static thread_local std::unique_ptr<local_queue_type> local_work_queue;
std::atomic_bool done;
void worker_thread()
{
local_work_queue.reset(new local_queue_type);
while (!done)
{
run_pending_task();
}
}
public:
void run_pending_task()
{
function_wrapper task;
// 如果私有任务队列里有任务,就从自己的任务队列取任务
if (local_work_queue && !local_work_queue->empty())
{
task = std::move(local_work_queue->front());
local_work_queue->pop();
task();
}
// 否则去公共任务池取任务
else if (pool_work_queue.try_pop(task))
{
task();
}
// 私有任务池和公有任务池都没有任务,出让调度器
else
{
std::this_thread::yield();
}
}
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(f);
std::future<result_type> res(task.get_future());
// 将任务推送到线程的私有队列
if (local_work_queue)
{
local_work_queue->push(std::move(task));
}
else
{
pool_work_queue.push(std::move(task));
}
return res;
}
};
这能够很好地降低对全局队列的竞争,但是当任务的分布是不平衡的,可能导致一些线程的私有队列中有大量的任务而另外一些线程则没有任务处理。比如,在快速排序中,只有最顶层的任务会被添加到全局工作队列中,因为其余的数据会放在某个工作线程的私有队列中。这跟使用线程池的初衷是相反的。
幸运的是,有一些办法来解决这个问题。只要允许线程在自己私有队列以及全局队列中都没有任务时从其他线程的队列中窃取工作。
工作窃取
为了允许一个空闲的线程执行其他线程上的任务,每个工作线程的私有队列必须在run_pending_task ()中窃取任务的时候可以被访问到。这要求每个工作线程将自己的私有任务队列向线程池注册或者每个线程都会被线程池分配一个工作队列。此外,你必须保证工作队列中的数据被适当的同步和保护,这样你的不变量是被保护的。
允许任务窃取的基于锁的队列:
// 允许任务窃取的基于锁的队列
#include <deque>
#include <mutex>
#include <memory>
class function_wrapper
{
private:
// 抽象类
struct impl_base
{
virtual void call() = 0; // 纯虚函数
virtual ~impl_base(){}; // 虚函数
};
// 子类 —— F为函数类型
template<typename F>
struct impl_type:public impl_base
{
F f;
// 构造函数
impl_type(F &&f_) : f(std::move(f_)){}
// 执行函数
void call() { f(); }
};
// 互斥智能指针
std::unique_ptr<impl_base> impl;
public:
// 构造函数
template <typename F>
function_wrapper(F &&f) : impl(new impl_type<F>(std::move(f)));
// 函数对象
void operator()() { impl->call(); }
// 使用默认的构造函数
function_wrapper() = default;
// 移动拷贝构造函数
function_wrapper(function_wrapper &&other):impl(std::move(other.impl)){}
// 移动赋值运算符
function_wrapper& operator=(function_wrapper &&other)
{
impl = std::move(other.impl);
return *this;
}
// 删除的函数
function_wrapper(const function_wrapper &) = delete;
function_wrapper(function_wrapper &) = delete;
function_wrapper &operator=(const function_wrapper &) = delete;
};
class work_stealing_queue
{
private:
typedef function_wrapper data_type;
std::deque<data_type> the_queue;
mutable std::mutex the_mutex;
public:
work_stealing_queue(){}
work_stealing_queue(const work_stealing_queue &) = delete;
work_stealing_queue &operator=(const work_stealing_queue &) = delete;
void push(data_type data)
{
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push_front(std::move(data));
}
bool empty() const
{
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.empty();
}
bool try_pop(data_type &res)
{
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty())
{
return false;
}
res = std::move(the_queue.front());
the_queue.pop_front();
return true;
}
bool try_steal(data_type &res)
{
std::lock_guard<std::mutex> lock(the_mutex);
if (the_queue.empty())
{
return false;
}
res = std::move(the_queue.back());
the_queue.pop_back();
return true;
}
};
使用工作窃取的线程池:
// 使用工作窃取的线程池
#include "9.7work_stealing_queue.cpp"
#include <future>
#include "6.3threadsafe_queue.cpp"
#include <thread>
#include <atomic>
class join_threads
{
private:
// 对线程池的引用
std::vector<std::thread> &threads;
public:
// 禁止隐式类型转换,构造函数
explicit join_threads(std::vector<std::thread> &threads_) : threads(threads_){}
// 析构的时候对所有线程加入,等待所有线程结束
~join_threads()
{
for (unsigned long i = 0; i < threads.size(); ++i)
{
if (threads.at(i).joinable())
{
threads.at(i).join();
}
}
}
};
class thread_pool
{
private:
typedef function_wrapper task_type;
std::atomic_bool done;
// 公共任务池
thread_safe_queue<task_type> pool_work_queue;
// 注册私有线程池
std::vector<std::unique_ptr<work_stealing_queue>> queues;
std::vector<std::thread> threads;
join_threads joiner;
// 线程声明周期的私有线程任务队列
static thread_local work_stealing_queue *local_work_queue;
// 当前线程的下标
static thread_local unsigned int my_index;
void worker_thread(unsigned my_index_)
{
my_index = my_index_;
// 得到当前私有线程的私有任务队列
local_work_queue = queues.at(my_index).get();
while (!done)
{
run_pending_task();
}
}
bool pop_task_from_local_queue(task_type &task)
{
return local_work_queue && local_work_queue->try_pop(task);
}
bool pop_task_from_pool_queue(task_type &task)
{
return pool_work_queue.try_pop(task);
}
bool pop_task_from_other_thread_queue(task_type &task)
{
for (unsigned i = 0; i < queues.size(); ++i)
{
const unsigned index = (my_index + 1) % queues.size();
if (queues.at(index)->try_steal(task))
{
return true;
}
}
return false;
}
public:
thread_pool() : done(false), joiner(threads)
{
const unsigned thread_count = std::thread::hardware_concurrency();
try
{
for (unsigned i = 0; i < thread_count; ++i)
{
queues.push_back(std::unique_ptr<work_stealing_queue>(new work_stealing_queue));
threads.push_back(std::thread(&thread_pool::worker_thread, this, i));
}
}
catch(...)
{
done = true;
throw;
}
}
~thread_pool()
{
done = true;
}
void run_pending_task()
{
task_type task;
if (pop_task_from_local_queue(task) || pop_task_from_pool_queue(task) || pop_task_from_other_thread_queue(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
template<typename FunctionType>
std::future<typename std::result_of<FunctionType()>::type> submit(FunctionType f)
{
typedef typename std::result_of<FunctionType()>::type result_type;
std::packaged_task<result_type()> task(f);
std::future<result_type> res(task.get_future());
if (local_work_queue)
{
local_work_queue->push(std::move(task));
}
else
{
pool_work_queue.push(std::move(task));
}
return res;
}
};