条件变量
layout: post title: 条件变量——std::condition_variable categories: cpp_concurrency description: C++并发编程简介 keywords: c++, 并发编程,std::condition_variable,条件变量
- 条件变量
- keywords: c++, 并发编程,std::condition\_variable,条件变量
- 假唤醒
- 关于 wait\_for 是否使用第三个参数的讨论
- condition\_variable源码
条件变量是另外一种用来同步的方法。而所谓条件变量,就是说在某一条件达成的时候线程可以继续往下执行,否则将一直等待下去。C++提供了2种条件变量,这里只说condition_variable(因为condition_variable_any 我也没研究过~)。虽然条件变量可以用来进行同步,但是其同步功能还是需要mutex来完成,它自己只提供条件唤醒、等待等操作,因此条件变量一般用于条件等待、唤醒等场合。
假唤醒
- segmentfault参考
所谓假唤醒,就是被唤醒的时候其条件不符合被唤醒的预期。在使用条件变量的时候造成假唤醒的典型代码如下:
std::unique_lock ulock(mutex);
if(!state)
{
cv.wait(ulock);
}
上面的代码当第一次加锁成功后,由于state为false,此时条件变量将进入等待状态,直到下一次被唤醒。但是,由于系统底层的信号问题,导致他被唤醒的时候,其state并不一定是true(可能被唤醒之前是true但是又被其他线程修改为false了),导致线程被唤醒,代码继续往下执行,但是条件与我们的预期不符,这种情况好一点的可能啥错都不会发生,但是如果是具有任务队列的线程,那么可能由于队列为空就会造成未定义的行为直接导致程序崩溃。
要解决上面的问题,应该使用while循环,也就是如下的代码:
std::unique_lock ulock(mutex);
while(!state)
{
cv.wait(ulock);
}
因为当线程被唤醒的时候,cv是处于加锁的状态,此时如果state为false,那么它将会再次进入休眠状态并释放锁,等待下次的被唤醒。如果为true则与我们的预期相符,执行后面的代码。其实标准库有一个上面代码的简化版本——接收2个参数的wait成员函数,实例如下:
std::unique_lock ulock(mutex);
cv.wait(ulock,[&state](){return !state;});
实际上,接收2个参数的wait,也是在其内部一直进行while循环等待而已。
条件变量的其他用法可以参考《条件变量和互斥锁实现信号量》。
关于 wait_for 是否使用第三个参数的讨论
当我们要结束线程时可以选择在析构函数中使用条件变量通知线程。一般情况下,线程总是要执行某些任务的,有时候在线程空闲的时候我们选择sleep,有时候则选择使用条件变量进行等待。其实,C++11提供了 wait_for 满足前面两种情况。C++11的wait_for有两个版本,一个是接受2个参数的,其中第二个参数是表示超时时间的;另一个版本的接收3个参数,第二个参数表示超时时间,第三个参数是一个条件,当线程被唤醒或者进入wait的时候,就会循环检查条件以及是否超时,其底层代码如下:
template<typename _Lock, typename _Clock,typename _Duration, typename _Predicate>
bool wait_until(_Lock& __lock,const chrono::time_point<_Clock, _Duration>& __atime,_Predicate __p)
{
while (!__p())
{
if (wait_until(__lock, __atime) == cv_status::timeout)
return __p();
}
return true;
}
wait_for
被唤醒的条件有2个:
- 通过
notify_one
或者notify_all
唤醒 - 等待超时
如果我们的子线程有任务需要执行,又需要通知它结束的时候尽早退出,该怎么做的?答案是使用第三个参数的wait_for,示例代码如下:
class task
{
public:
task(const int64_t interval = 60*10):m_interval(interval),m_stop(false)
{
m_thread = std::thread([this](){
while (!this->m_stop)
{
timer_scope t;
std::cout << "will sleep for " << this->m_interval << ", and wait for condition" << std::endl;
std::unique_lock<std::mutex> ulock(this->m_mutex);
this->m_cv.wait_for(ulock,std::chrono::seconds(this->m_interval),[this](){
return this->m_stop.load();
});
}
});
}
~task()
{
stop();
if (m_thread.joinable())
{
m_thread.join();
}
}
void stop()
{
m_stop = true;
std::unique_lock<std::mutex> ulock(this->m_mutex);
m_cv.notify_one();
}
private:
int64_t m_interval;
std::thread m_thread;
std::condition_variable m_cv;
std::mutex m_mutex;
std::atomic<bool> m_stop;
};
condition_variable源码
class condition_variable { // class for waiting for conditions
public:
using native_handle_type = _Cnd_t;
condition_variable() {
_Cnd_init_in_situ(_Mycnd());
}
~condition_variable() noexcept {
_Cnd_destroy_in_situ(_Mycnd());
}
condition_variable(const condition_variable&) = delete;
condition_variable& operator=(const condition_variable&) = delete;
void notify_one() noexcept { // wake up one waiter
_Cnd_signal(_Mycnd());
}
void notify_all() noexcept { // wake up all waiters
_Cnd_broadcast(_Mycnd());
}
void wait(unique_lock<mutex>& _Lck) { // wait for signal
// Nothing to do to comply with LWG-2135 because std::mutex lock/unlock are nothrow
_Cnd_wait(_Mycnd(), _Lck.mutex()->_Mymtx());
}
template <class _Predicate>
void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) { // wait for signal and test predicate
while (!_Pred()) {
wait(_Lck);
}
}
template <class _Rep, class _Period>
cv_status wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time) {
// wait for duration
if (_Rel_time <= chrono::duration<_Rep, _Period>::zero()) {
return cv_status::timeout;
}
// TRANSITION, ABI: The standard says that we should use a steady clock,
// but unfortunately our ABI speaks struct xtime, which is relative to the system clock.
_CSTD xtime _Tgt;
const bool _Clamped = _To_xtime_10_day_clamped(_Tgt, _Rel_time);
const cv_status _Result = wait_until(_Lck, &_Tgt);
if (_Clamped) {
return cv_status::no_timeout;
}
return _Result;
}
template <class _Rep, class _Period, class _Predicate>
bool wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred) {
// wait for signal with timeout and check predicate
return _Wait_until1(_Lck, chrono::steady_clock::now() + _Rel_time, _Pred);
}
template <class _Clock, class _Duration>
cv_status wait_until(unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time) {
// wait until time point
for (;;) {
const auto _Now = _Clock::now();
if (_Abs_time <= _Now) {
return cv_status::timeout;
}
_CSTD xtime _Tgt;
(void) _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now);
const cv_status _Result = wait_until(_Lck, &_Tgt);
if (_Result == cv_status::no_timeout) {
return cv_status::no_timeout;
}
}
}
template <class _Clock, class _Duration, class _Predicate>
bool wait_until(
unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate _Pred) {
// wait for signal with timeout and check predicate
return _Wait_until1(_Lck, _Abs_time, _Pred);
}
cv_status wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time) {
// wait for signal with timeout
if (!_Mtx_current_owns(_Lck.mutex()->_Mymtx())) {
_Throw_Cpp_error(_OPERATION_NOT_PERMITTED);
}
// Nothing to do to comply with LWG-2135 because std::mutex lock/unlock are nothrow
const int _Res = _Cnd_timedwait(_Mycnd(), _Lck.mutex()->_Mymtx(), _Abs_time);
switch (_Res) {
case _Thrd_success:
return cv_status::no_timeout;
case _Thrd_timedout:
return cv_status::timeout;
default:
_Throw_C_error(_Res);
}
}
template <class _Predicate>
bool wait_until(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate _Pred) {
// wait for signal with timeout and check predicate
return _Wait_until1(_Lck, _Abs_time, _Pred);
}
_NODISCARD native_handle_type native_handle() {
return _Mycnd();
}
void _Register(unique_lock<mutex>& _Lck, int* _Ready) { // register this object for release at thread exit
_Cnd_register_at_thread_exit(_Mycnd(), _Lck.release()->_Mymtx(), _Ready);
}
void _Unregister(mutex& _Mtx) { // unregister this object for release at thread exit
_Cnd_unregister_at_thread_exit(_Mtx._Mymtx());
}
private:
aligned_storage_t<_Cnd_internal_imp_size, _Cnd_internal_imp_alignment> _Cnd_storage;
_Cnd_t _Mycnd() noexcept { // get pointer to _Cnd_internal_imp_t inside _Cnd_storage
return reinterpret_cast<_Cnd_t>(&_Cnd_storage);
}
template <class _Predicate>
bool _Wait_until1(unique_lock<mutex>& _Lck, const xtime* _Abs_time, _Predicate& _Pred) {
// wait for signal with timeout and check predicate
while (!_Pred()) {
if (wait_until(_Lck, _Abs_time) == cv_status::timeout) {
return _Pred();
}
}
return true;
}
template <class _Clock, class _Duration, class _Predicate>
bool _Wait_until1(
unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate& _Pred) {
while (!_Pred()) {
const auto _Now = _Clock::now();
if (_Abs_time <= _Now) {
return false;
}
_CSTD xtime _Tgt;
const bool _Clamped = _To_xtime_10_day_clamped(_Tgt, _Abs_time - _Now);
if (wait_until(_Lck, &_Tgt) == cv_status::timeout && !_Clamped) {
return _Pred();
}
}
return true;
}
};