0
点赞
收藏
分享

微信扫一扫

C++11新特性之并发

安七月读书 03-31 22:30 阅读 2

std::thread

c++11引入了std::thread来创建线程,支持对线程join或者detach。直接看代码:

#include <iostream>
#include <thread>

void threadFunction() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Inside thread function!" << std::endl;
}

int main() {
    // 创建一个线程并立即执行
    std::thread t(threadFunction);

    // 等待线程执行完毕
    t.join();
    std::cout << "Thread has finished execution and joined." << std::endl;

    // 创建另一个线程并分离
    std::thread t2(threadFunction);
    t2.detach(); // 分离线程

    // 主线程继续执行其他工作
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Main thread continues to work." << std::endl;

    return 0;
}

我们创建了一个线程t,然后调用join函数等待线程执行完毕。接着我们创建了另一个线程t2,然后调用detach函数来分离该线程。当线程被分离后,它会在后台继续执行,与主线程独立运行,主线程也可以继续执行其他工作。

如果没有调用join或者detach函数,假如线程函数执行时间较长,此时线程对象的生命周期结束调用析构函数清理资源,这时可能会发生错误,这里有两种解决办法,一个是调用join(),保证线程函数的生命周期和线程对象的生命周期相同,另一个是调用detach(),将线程和线程对象分离,这里需要注意,如果线程已经和对象分离,那我们就再也无法控制线程什么时候结束了,不能再通过join来等待线程执行完。

std::mutex

std::mutex是一种线程同步的手段,用于保存多线程同时操作的共享数据。

mutex分为四种:

  • std::mutex:独占的互斥量,不能递归使用,不带超时功能
  • std::recursive_mutex:递归互斥量,可重入,不带超时功能
  • std::timed_mutex:带超时的互斥量,不能递归
  • std::recursive_timed_mutex:带超时的互斥量,可以递归使用
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx; // 定义一个互斥锁
int sharedResource = 0;

void incrementSharedResource() {
    for (int i = 0; i < 1000000; ++i) {
        mtx.lock(); // 线程尝试获取互斥锁
        sharedResource++; // 访问共享资源
        mtx.unlock(); // 释放互斥锁
    }
}
int main() {
    std::thread t1(incrementSharedResource);
    std::thread t2(incrementSharedResource);

    t1.join();
    t2.join();

    std::cout << "Final value of shared resource: " << sharedResource << std::endl;
    return 0;
}

在该示例中,两个线程 t1 和 t2 同时调用 incrementSharedResource 函数来增加 sharedResource 的值。由于我们使用了 std::mutex,因此在每次访问 sharedResource 之前,线程都会尝试获取互斥锁,然后再释放互斥锁。这样就保证了在同一时刻只有一个线程能够修改 sharedResource 的值,从而避免了数据竞争和不确定的结果。

std::lock

主要介绍两种RAII方式的锁封装,可以动态的释放锁资源,防止线程由于编码失误导致一直持有锁。

c++11主要有std::lock_guard和std::unique_lock两种方式,使用方式都类似。

std::lock_guard:

std::lock_guard是一个轻量级的RAII封装,用于自动管理std::mutex的锁定和解锁。当创建std::lock_guard对象时,它会在构造函数中锁定std::mutex,在析构函数中自动释放锁。(RAII)

std::lock_guard适用于需要在作用域内锁定std::mutex的简单场景,不支持手动解锁。

#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int sharedData = 0;

void incrementData() {
    std::lock_guard<std::mutex> lock(mtx);
    sharedData++;
}

int main() {
    std::thread t1(incrementData);
    std::thread t2(incrementData);

    t1.join();
    t2.join();

    std::cout << "Final value of shared data: " << sharedData << std::endl;

    return 0;
}

std::unique_lock

std::unique_lock是比std::lock_guard更灵活的互斥锁管理类,提供了更多的功能。可以手动锁定和解锁std::mutex,也可以延迟锁定,支持条件变量等功能。相比std::lock_guard,std::unique_lock的开销略大,但提供了更多的灵活性。需要配合以下的条件变量使用。

std::atomic

原子类型对象的主要特点就是从不同线程访问不会导致数据竞争(data race)。因此从不同线程访问某个原子对象是良性 (well-defined) 行为,而通常对于非原子类型而言,并发访问某个对象(如果不做任何同步操作)会导致未定义 (undifined) 行为发生。

atomic<long> total = 0;

std::condition_variable

条件变量是c++11引入的一种同步机制,它可以阻塞一个线程或者个线程,直到有线程通知或者超时才会唤醒正在阻塞的线程,条件变量需要和锁配合使用,这里的锁就是上面的std::unique_lock。

#include <iostream>                // std::cout
#include <thread>                // std::thread
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable

std::mutex mtx; // 全局互斥锁.
std::condition_variable cv; // 全局条件变量.
bool ready = false; // 全局标志位.

void do_print_id(int id)
{
    std::unique_lock <std::mutex> lck(mtx);
    while (!ready) // 如果标志位不为 true, 则等待...
        cv.wait(lck); // 当前线程被阻塞, 当全局标志位变为 true 之后,
    // 线程被唤醒, 继续往下执行打印线程编号id.
    std::cout << "thread " << id << '\n';
}

void go()
{
    std::unique_lock <std::mutex> lck(mtx);
    ready = true; // 设置全局标志位为 true.
    cv.notify_all(); // 唤醒所有线程.
}

int main()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i = 0; i < 10; ++i)
        threads[i] = std::thread(do_print_id, i);

    std::cout << "10 threads ready to race...\n";
    go(); // go!

  for (auto & th:threads)
        th.join();

    return 0;
}

在线程被阻塞时,该函数会自动调用 lck.unlock() 释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行。另外,一旦当前线程获得通知(notified,通常是另外某个线程调用 notify_* 唤醒了当前线程),wait() 函数也是自动调用 lck.lock(),使得 lck 的状态和 wait 函数被调用时相同。

若设置了Predicate,只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞。

while (!ready) wait(lck);
//和以下等价
cv.wait(lck, [] {return ready; });

std::condition_variable::notify_one() :唤醒某个等待(wait)线程。如果当前没有等待线程,则该函数什么也不做,如果同时存在多个等待线程,则唤醒某个线程是不确定的(unspecified)。

std::future

c++11关于异步操作提供了future相关的类,主要有std::future、std::promise和std::packaged_task,std::future比std::thread高级些,std::future作为异步结果的传输通道,通过get()可以很方便的获取线程函数的返回值,std::promise用来包装一个值,将数据和future绑定起来,而std::packaged_task则用来包装一个调用对象,将函数和future绑定起来,方便异步调用。而std::future是不可以复制的,如果需要复制放到容器中可以使用std::shared_future。

std::future 可以用来获取异步任务的结果,因此可以把它当成一种简单的线程间同步的手段。std::future 通常由某个 Provider 创建,你可以把 Provider 想象成一个异步任务的提供者,Provider 在某个线程中设置共享状态的值,与该共享状态相关联的 std::future 对象调用 get(通常在另外一个线程中) 获取该值,如果共享状态的标志不为 ready,则调用 std::future::get 会阻塞当前的调用者,直到 Provider 设置了共享状态的值(此时共享状态的标志变为 ready),std::future::get 返回异步任务的值或异常(如果发生了异常)。

一个有效(valid)的 std::future 对象通常由以下三种 Provider 创建,并和某个共享状态相关联。Provider 可以是函数或者类,其实我们前面都已经提到了,他们分别是:

  • std::async 函数
  • std::promise::get_future,get_future 为 promise 类的成员函数
  • std::packaged_task::get_future,此时 get_future为 packaged_task 的成员函数

promise 对象可以保存某一类型 T 的值,该值可被 future 对象读取(可能在另外一个线程中),因此 promise 也提供了一种线程同步的手段。在 promise 对象构造时可以和一个共享状态(通常是std::future)相关联,并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值。

#include <iostream>       // std::cout
#include <functional>     // std::ref
#include <thread>         // std::thread
#include <future>         // std::promise, std::future

void print_int(std::future<int>& fut) {
    int x = fut.get(); // 获取共享状态的值.
    std::cout << "value: " << x << '\n'; // 打印 value: 10.
}

int main()
{
    std::promise<int> prom; // 生成一个 std::promise<int> 对象.
    std::future<int> fut = prom.get_future(); // 和 future 关联.
    std::thread t(print_int, std::ref(fut)); // 将 future 交给另外一个线程t.std::ref模板传参传递引用
    prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步.
    t.join();
    return 0;
}

std::packaged_task 包装一个可调用的对象,并且允许异步获取该可调用对象产生的结果,从包装可调用对象意义上来讲,std::packaged_task 与 std::function 类似,只不过 std::packaged_task 将其包装的可调用对象的执行结果传递给一个 std::future 对象(该对象通常在另外一个线程中获取 std::packaged_task 任务的执行结果)。

std::packaged_task 对象内部包含了两个最基本元素,一、被包装的任务(stored task),任务(task)是一个可调用的对象,如函数指针、成员函数指针或者函数对象,二、共享状态(shared state),用于保存任务的返回值,可以通过 std::future 对象来达到异步访问共享状态的效果。

std::packaged_task 的共享状态的生命周期一直持续到最后一个与之相关联的对象被释放或者销毁为止。

#include <iostream>     // std::cout
#include <future>       // std::packaged_task, std::future
#include <chrono>       // std::chrono::seconds
#include <thread>       // std::thread, std::this_thread::sleep_for

int countdown(int from, int to) {
    for (int i = from; i != to; --i) {
        std::cout << i << '\n';
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout << "Finished!\n";
    return from - to;
}

int main()
{
    std::packaged_task<int(int, int)> task(countdown); // 设置 packaged_task
    std::future<int> ret = task.get_future(); // 获得与 packaged_task 共享状态相关联的 future 对象.
    std::thread th(std::move(task), 10, 0);   //创建一个新线程完成计数任务.
    int value = ret.get();                    // 等待任务完成并获取结果.
    std::cout << "The countdown lasted for " << value << " seconds.\n";
    th.join();
    return 0;
}

promise包装的是一个值,packaged_task包装的是一个函数,当需要获取线程中的某个值,可以使用std::promise,当需要获取线程函数返回值,可以使用std::packaged_task。async是比future,packaged_task,promise更高级的东西,它是基于任务的异步操作,通过async可以直接创建异步的任务,返回的结果会保存在future中,不需要像packaged_task和promise那么麻烦,关于线程操作应该优先使用async。

#include <functional>
#include <future>
#include <iostream>
#include <thread>

using namespace std;

int func(int in) { return in + 1; }

int main() {
    auto res = std::async(func, 5);
    // res.wait();
    cout << res.get() << endl; // 阻塞直到函数返回
    return 0;
}

async具体语法如下:

async(std::launch::async | std::launch::deferred, func, args...);

第一个参数是创建策略:

  • std::launch::async表示任务执行在另一线程
  • std::launch::deferred表示延迟执行任务,调用get或者wait时才会执行,不会创建线程,惰性执行在当前线程。

如果不明确指定创建策略,以上两个都不是async的默认策略,而是未定义,它是一个基于任务的程序设计,内部有一个调度器(线程池),会根据实际情况决定采用哪种策略。

若从 std::async 获得的 std::future 未被移动或绑定到引用,则在完整表达式结尾, std::future的析构函数将阻塞直至异步计算完成,实际上相当于同步操作。在需要获取异步任务结果的时候,调用 result.get() 方法来获取结果,这里会阻塞直到异步任务完成。合理地延长 std::future 对象的生命周期。

有时候我们如果想真正执行异步操作可以对async进行封装,强制使用std::launch::async策略来调用async。

template <typename F, typename... Args>
inline auto ReallyAsync(F&& f, Args&&... params) {
    return std::async(std::launch::async, std::forward<F>(f), std::forward<Args>(params)...);
}
举报

相关推荐

0 条评论