0
点赞
收藏
分享

微信扫一扫

基于微信小程序爱心领养小程序设计与实现(源码+参考文档+定制开发)

Java架构领域 2024-10-05 阅读 15

前言

生产者消费者模型(CP模型)是一种十分经典的设计,常常用于多执行流的并发问题中!很多书上都说他很高效,但高效体现在哪里并没有说明!本博客将详解!

目录

前言

一、生产者消费者模型

1.1 什么是生产者消费者模型?

1.2 生产者消费者模型的特点

1.3 生产者消费者模型的优点

二、基于阻塞队列实现生产者消费者模型

2.1 阻塞队列

2.2 单生产单消费模型

2.3 多生产多消费模型

三、POSIX 信号量

3.1 信号量的基本概念

3.2 信号量的相关操作

四、基于环形队列实现生产者消费者模型

4.1 环形队列

4.2 单生产单消费模型

4.3 多生产多消费模型

• 如何理解生产者消费者模型的效率高?


一、生产者消费者模型

1.1 什么是生产者消费者模型?

什么生产者、消费者、容器?感觉很复杂~!虽然听起来很难,但实际上一点也不简单!哈哈~开个玩笑!其实生产者和消费者模型还是比较简单的,OK,我们下面举个例子理解一下:


我们就以现实中的超市举例子:

超市的工作模式:

超市工作模式的好处是,超市可以提前 缓存 大量的商品!正是超市的缓存机制,可以解决:

这就做到了,允许生产消费的步调不一致,即协调生产者和消费者的 忙闲不均

在这个例子中,顾客就是消费者,供应商就是生产者,超市就是容器(提供交易的场所)!

当然,超市不可能只面向同一个顾客/供应商,而是被多个顾客和供应商同时看见的!也就是说,容器(交易场所)注定是被多执行流所看见的,即他是共享资源,在多线程环境中 ,共享资源被多执行流并发访问时是必须要保证安全的,如何保证? 同步和互斥


在现实中,市场(超市中的货架位置)是有限的,多个同一货品的供应商,为了抢占市场,都会加大促销来排挤对手,例如泡面:某师傅与统某大战

在竞争之下,势必有一家供应商失去市场,所以可以得出:生产者之间是典型的互斥关系


消费者之间是互斥关系

消费者之间,给人的感觉好像是同步关系!但是实际上他们也是互斥的,比如,你和情敌到超市都想买一个红箭的口香糖,但是只有一个了!此时你们只能拼手速抢喽~!


生产者和消费者是同步和互斥关系

比如说,有一天你想吃最喜欢的 脑残酸菜牛肉面 了, 去超市购买,结果没货了,你就失望的走了,回到宿舍刚躺下完了三分钟的手机,看到视屏里面的人在吃,你忍不了了,又跑到超市结果还没有!老板看见了你,说小伙子你加我微信吧,有货了我告诉你!于是你就回去等了,当超市供货商提供了在录入价格的期间,老板说小伙子过来买吧,现在有了!但是由于货物过多,你去的时候价格还没有录入完,老板让你等2分钟把价格录入后,再让你到货架拿 脑残酸菜牛肉面!此时体现的是互斥和同步,互斥是当没录入完价格时不允许你进入拿面,同步是没货时先让供货商供货,然后再让你购买,具有一定的顺序!


1.2 生产者消费者模型的特点

生产者消费者的模型特点,可以总结为321原则

注意 :这里的321原则并不是课本上提出的,而是我的恩师蛋哥总结的!


1.3 生产者消费者模型的优点

生产者在生产时,无需关注消费者的状态,只需关注交易场所中是否有空闲位置

消费者在消费时,无需关注生产者的状态,只需关注交易场所中是否有就绪数据

• 当生产者速度慢,消费者速度快时,可以先让生产者先生产,再让消费者消费;

• 当生产者速度快,消费者速度慢时,可以先让消费者先消费,再让生产者生产;

这个后面实现了生产者消费者模型了解释!


二、基于阻塞队列实现生产者消费者模型

上面刚介绍了,生产者消费者模型中,要有一个交易场所,一般这个场所是 阻塞队列 或者 环形队列!两者的区别是,阻塞队列 是对这个场所整体性使用,而 环形队列 是对这个交易场所划分成多个小场所使用!我们先来介绍整体性使用的即阻塞队列!

2.1 阻塞队列

阻塞队列Blocking Queue)是一种常用于生产者消费者模型的数据结构;是一种特殊的队列,具备 先进先出 FIFO 的特性,与普通的队列不同的是 阻塞队列 大小固定的,也就是存在 容量的!阻塞队列 可以为空,也可以为满

是不是和 管道十分的相似!

这也和我们当初介绍管道的特点之一:“管道内部自己维护了同步和互斥的机制”一致!

2.2 单生产单消费模型

我们先来实现一个最简单的,但生产单消费模型,首先搭建一个阻塞队列类框架

首先,我们需要一个队列,可以和C语言一样手搓,但是今天STL中有现成的,所以队列就使用 std::queue 了,因为阻塞队列是有容量大小的,所以得使用一个整数记录容量!保障,队列满/空时的阻塞,所以得使用互斥锁和条件变量实现!

所以一个基本的框架如下:

#pragma once

#include <pthread.h>
#include <queue>

template <class T>
class BlockingQueue
{
private:
    // 判断阻塞队列是否为空
    bool IsEmpty()
    {
        // ...
    }

    // 判断阻塞队列是否为满
    bool IsFull()
    {
        // ...
    }
public:
    // 构造
    BlockingQueue(int cap = default_cap)
        : _max_cap(cap)
    {
        pthread_mutex_init(&_mutex);
        pthread_cond_init(&_cond);
    }

    // 析构
    ~BlockingQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    // 生产者 生产(入队)
    void Push(const T& in)
    {
        // ...
    }

    // 消费者 消费(出队)
    void Pop(T* out)
    {
        // ...
    }

private:
    std::queue<T> _block_queue;
    int _max_cap;           // 阻塞队列的容量
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _cond;   // 条件变量(存疑)
};

 OK,大框架有了之后,我们现在的问题就是,把上述的接口实现好即可!我们一个个来:

// 判断阻塞队列是否为空
bool IsEmpty()
{
    return _block_queue.empty();
}

// 判断阻塞队列是否为满
bool IsFull()
{
    return _max_cap == _block_queue.size();
}

但是现在的条件变量只有一个,而消费者和生产者都要等待,如果有一个条件变量的话,会使得编码很复杂,所以我们再加一个条件变量,让他们等到时在各自的条件下等待

// 生产者 生产(入队)
void Push(const T &in)
{
    // 加锁
    pthread_mutex_lock(&_mutex);
    // 判断是否为满
    if(IsFull())// if ?
    {
        // 在生产者的条件下等待
        pthread_cond_wait(&_p_cond, &_mutex);
    }
    // 1、不为满 || 2、重新竞争到锁了
    _block_queue.push(in);
    // 解锁
    pthread_mutex_unlock(&_mutex);
}
// 消费者 消费(出队)
void Pop(T *out)
{
    // 加锁
    pthread_mutex_lock(&_mutex);
    // 判断是否为空
    if(IsEmpty()) // if ?
    {
        // 在消费者的条件下等待
        pthread_cond_wait(&_c_cond, &_mutex);
    }
    // 1、不为空 || 2、重新竞争到锁了
    *out = _block_queue.front();
    _block_queue.pop();
    // 解锁
    pthread_mutex_unlock(&_mutex);
}

现在有个尴尬的问题是:

知道是否可以消费的是生产者,因为当他执行完入队操作时,注定了队列中一定至少有一个元素,所以,让他唤醒消费者最合适,因为只有他可以确保队列中有数据

同理,知道是否可以生产的一定是消费者,以为当他拿走一个数据后,至少队列中有一个位置可以生产(入队),所以让他唤醒生产者~!

所以,当生产者或消费者,执行完生产/消费时,应该唤醒阻塞的对方继续操作

#pragma once

#include <pthread.h>
#include <queue>

template <class T>
class BlockingQueue
{
private:
    // 判断阻塞队列是否为空
    bool IsEmpty()
    {
        return _block_queue.empty();
    }

    // 判断阻塞队列是否为满
    bool IsFull()
    {
        return _max_cap == _block_queue.size();
    }

public:
    // 构造
    BlockingQueue(int cap = default_cap)
        : _max_cap(cap)
    {
        pthread_mutex_init(&_mutex);
        pthread_cond_init(&_c_cond);
        pthread_cond_init(&_p_cond);
    }

    // 析构
    ~BlockingQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_init(&_c_cond);
        pthread_cond_init(&_p_cond);
    }

    // 生产者 生产(入队)
    void Push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为满
        if (IsFull()) // if ?
        {
            // 在生产者的条件下等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        // 1、不为满 || 2、重新竞争到锁了
        _block_queue.push(in);
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的消费者
        pthread_cond_signal(&_c_cond);
    }

    // 消费者 消费(出队)
    void Pop(T *out)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为空
        if (IsEmpty()) // if ?
        {
            // 在消费者的条件下等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        // 1、不为空 || 2、重新竞争到锁了
        *out = _block_queue.front();
        _block_queue.pop();
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的生产者
        pthread_cond_signal(&_p_cond);
    }

private:
    std::queue<T> _block_queue;
    int _max_cap;           // 阻塞队列的容量
    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _c_cond; // 消费者条件变量
    pthread_cond_t _p_cond; // 生产者条件变量
};

单生产但单费阻塞队列,这样就封装好了!我们下面来实现一下上层的调用操作:

#include "BlockingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <ctime>

void* Consumer(void*args)
{
    BlockingQueue<int> *bq = static_cast<BlockingQueue<int> *>(args);
    while (true)
    {
        // 从阻塞队列获取数据
        int data = 0;
        bq->Pop(&data);
        // 处理数据    
        std::cout << "Consumer -> " << data << std::endl;
    }

    return nullptr;
}

void* Producer(void*args)
{
    BlockingQueue<int> *bq = static_cast<BlockingQueue<int> *>(args);
    srand(time(nullptr) ^ getpid());
    while (true)
    {
        // 生产数据
        int data = rand() % 10 + 1;
        bq->Push(data);
        // 处理数据  
        std::cout << "Producer -> " << data << std::endl;   
    }

    return nullptr;
}

int main()
{
    // 创建一个阻塞队列
    BlockingQueue<int> *bq = new BlockingQueue<int>();
    // 创建两个线程
    pthread_t c, p;
    pthread_create(&c, nullptr, Consumer, bq);// 消费者
    pthread_create(&p, nullptr, Producer, bq);// 生产者

    // 等待线程
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete bq;
    return 0;
}

我们先来直接运行:

此时生产者疯狂生产,消费者疯狂消费!也就是在两者疯狂的打印,不容易看到阻塞队列的特点,为了验证阻塞队列的特点,我们采用休眠的方式,验证:

预期现象生产者一次性把队列生产满,然后每个一秒消费者打印一次,生产者生产一次

符合预期!

预期现象由于一开始生产者休眠,所以消费者阻塞,后面生产者隔一秒生产一个,同时唤醒消费者消费一个,所以就是隔一秒打印一个

OK,符合预期~!


虽然上面的单生产但消费的代码,已经可以跑起来了,但是里面还存在一些细节问题,下面我们来进行优化一下:

问题一:在阻塞队列中的Push/Pop中直接使用 if 判断条件是否满足,可能会出现问题!

理由如下:

如何解决?只需if 换成 while 即可!这样即使你造成了伪唤醒,在往下执行前会先检查!直到条件在往下继续执行:

    // 生产者 生产(入队)
    void Push(const T &in)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为满
        while (IsFull()) // if ?
        {
            // 在生产者的条件下等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        // 1、不为满 || 2、重新竞争到锁了
        _block_queue.push(in);
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的消费者
        pthread_cond_signal(&_c_cond);
    }

    // 消费者 消费(出队)
    void Pop(T *out)
    {
        // 加锁
        pthread_mutex_lock(&_mutex);
        // 判断是否为空
        while (IsEmpty()) // if ?
        {
            // 在消费者的条件下等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        // 1、不为空 || 2、重新竞争到锁了
        *out = _block_queue.front();
        _block_queue.pop();
        // 解锁
        pthread_mutex_unlock(&_mutex);
        // 唤醒阻塞的生产者
        pthread_cond_signal(&_p_cond);
    }

问题二:在Push/Pop后,需要唤醒对方来执行!唤醒在解锁前后有影响吗?

答案是:没有影响!唤醒对方,在解锁前后都可以!原因是:

所以在解锁前后唤醒对方是没有影响的!

问题三:阻塞队列的任务中只能放 int 这样的整数吗?

当然不是!我们写的是模板呀!数据类型是T,T可以是int这样的整数,当然也可以是自定义类的对象喽!

#pragma once

#include <iostream>

class Task
{
public:
    Task(int x, int y)
        :_x(x),_y(y)
    {}

    Task(){}

    std::string debug()
    {
        return std::to_string(_x) + "+"+std::to_string(_y) +"=" +"?";
    }

    void Excute()
    {
        _result = _x + _y;
    }

    void operator()()
    {
        Excute();
    }

    std::string result()
    {
        std::string msg = std::to_string(_x) + "+"+std::to_string(_y) +"=" + std::to_string(_result);
        return msg;
    }
    
private:
    int _x;
    int _y;
    int _result;
};

此时我们可以:让生产者给两个数,让你消费者计算:

看效果:

格局打开,这里只是放了一个简单计算的任务,我们实际还可以放入更复杂的任务!

比如 网络请求、SQL查询、并行 IO尤其是 IO,使用 「生产者消费者模型」 可以大大提高效率,包括后面的 多路转接,也可以接入 「生产者消费者模型」 来提高效率!


2.3 多生产多消费模型

基于上面的介绍,我们可以实现多生产多消费模型了!其实,经过上面的修改,我们不需要修改上面的代码直接可以适应多生产多消费的场景

OK,我们先来实验一下:

int main()
{
    // 创建一个阻塞队列
    BlockingQueue<Task> *bq = new BlockingQueue<Task>();
    // 创建两个线程
    pthread_t c1, c2, p1, p2, p3;
    pthread_create(&c1, nullptr, Consumer, bq); // 消费者
    pthread_create(&c2, nullptr, Consumer, bq); // 消费者
    pthread_create(&p1, nullptr, Producer, bq); // 生产者
    pthread_create(&p2, nullptr, Producer, bq); // 生产者
    pthread_create(&p3, nullptr, Producer, bq); // 生产者

    // 等待线程
    pthread_join(c1, nullptr);
    pthread_join(c2, nullptr);
    pthread_join(p1, nullptr);
    pthread_join(p2, nullptr);
    pthread_join(p3, nullptr);

    delete bq;
    return 0;
}

当然这可能会造成屏幕的打印错乱问题,这是因为显示器本质也是文件不同线程向同一个文件写入显示器不就是临界资源吗?所以理论上也要对显示器作保护的!


原因很简单:

生产者消费者、都是对同一个阻塞队列做操作,而阻塞队列是整体使用,即每次只允许一个持有锁的线程访问,所以即使多线程过来,也是得先竞争锁资源的!而互斥锁保证了他们是串行的~!

OK,以上就是基于阻塞队列实现的生产者消费者模型了!下面我们来介绍信号量和用信号量基于循环队列的生产者消费者模型!


三、POSIX 信号量

3.1 信号量的基本概念

互斥和同步 不是只能由 互斥锁和条件变量 实现,还能通过 信号量 sem互斥锁实现(出自POSIX标准)

信号量 的本质是一个 计数器描述临界资源中资源数目的计数器

信号量是描述临界资源数目的,但他也是被所有的线程所共享,即信号量是临界资源,所以对信号量的 PV 操作必须是原子的

如果我们把「生产者消费者模型」中某一临界资源 整体性使用,那他的信号量的值就是 1

此时的信号量只有两态,即 1 / 0 ,可以实现互斥锁的效果,即实现线程互斥!像这种只有两态的信号量被称为 二元信号量/二进制信号量

如果我们把「生产者消费者模型」中某一临界资源 分成N份使,那信号量的值就是N:

像这种信号量的值被初始化为N,的信号量被称为多元信号量/计数信号量

当一个线程想要访问临界资源时,就必须要申请信号量,当申请成功,继续执行;否则,就阻塞等待,直到有信号量资源可用!如此一来就可以和 条件变量 一样实现 同步 了!

其实 「信号量」 的工作机制类似于 买电影票,是一种 预订机制,只要你买到票了,即使你晚点到达电影院,你的位置也始终可用,买到票的本质是将对应的座位进行了预订!详见:Linux IPC-System V


3.2 信号量的相关操作

有了之前 互斥锁、条件变量 的使用基础,信号量 的接口学习是非常简单的,依旧是只有四个接口:初始化、销毁、申请、释放

初始化信号量

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数

返回值

销毁信号量

#include <semaphore.h>

int sem_destroy(sem_t *sem);

参数

申请信号量(P操作--)

#include <semaphore.h>

int sem_wait(sem_t *sem);

int sem_trywait(sem_t *sem);

int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

主要使用 :int sem_wait(sem_t *sem);

参数

释放信号量(V操作++)

#include <semaphore.h>

int sem_post(sem_t *sem);

参数

这批接口属于是看一眼就会用,再多看一眼就会爆炸!所以我们直接来基于上述的接口实现生产者消费者模型!


四、基于环形队列实现生产者消费者模型

4.1 环形队列

生产者消费者模型 中的交易场所是可更换的,不仅可以使用 阻塞队列,还可以使用 环形队列 !

关于环形队列,这里不在多哔哔,我在数据结构的时候手撕过:DS线性表之栈和队列

数组实现的环形队列,麻烦的就是如何判断空和满?判断的方式有两种,以前我在博客中提到了第一种:

1、多开一个空间,当tail + 1 == head 时,表示满;当tail == head 的时候,表示空;

2、搞一个计数器,当计数器的值为 0 时,表示当前为空,当计数器的值为容量时,表示队列为满

这两种策略都可以确保 环形队列 正确判空和判满,至于这里肯定是选择策略二,因为 「信号量」 本身就是一个天然的计数器

环形队列 中,生产者消费者 关心的资源不一样:生产者只关心是否有空间放数据,消费者只关心是否能从空间中取到数据;所以可以搞两个信号量分别标识生产者和消费者的资源数!

OK,有了上述的理解,就可以去实现了,我们还是先单生产但消费,然后多生产多消费!

4.2 单生产单消费模型

我们定义一个数组作为循环队列的底层缓冲区,定义两个信号量分别是空间和数据,定义生产者和消费者的下标!为了操作和理解,我们将sem_wait和sem_post封装成P和V:

#pragma once

#include <pthread.h>
#include <semaphore.h>
#include <vector>

const static int default_cap = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t& s)
    {
        sem_wait(&s);
    }

    void V(sem_t& s)
    {
        sem_post(&s);
    }

public:
    RingQueue(int cap = default_cap)
        : _ring_queue(cap),_max_cap(cap),_c_step(0),_p_step(0)
    {
        sem_init(&_space, 0, _max_cap);
        sem_init(&_data, 0, 0);
    }

    ~RingQueue()   
    {
        sem_destroy(&_space);
        sem_destroy(&_data);
    }

    void Push(const T& in)
    {
        // 申请信号量
        P(_space);
        // 生产
        _ring_queue[_p_step] = in;
        _p_step++;
        _p_step %= _max_cap;
        // 释放信号量
        V(_data);
    }

    void Pop(T* out)
    {
        // 申请信号量
        P(_data);
        // 消费
        *out = _ring_queue[_c_step];
        _c_step++;
        _c_step %= _max_cap;
        // 释放信号量
        V(_space);
    }

private:
    std::vector<T> _ring_queue;
    int _max_cap;//容量

    sem_t _space;// 空间信号量
    sem_t _data;// 数据信号量

    int _c_step;// 消费者下标
    int _p_step;// 生产者下标
};

我们还是先用数字测试:

#include "RingQueue.hpp"
#include <iostream>
#include <unistd.h>
#include <ctime>

void *Consumer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        // 获取数据
        int data = 0;
        rq->Pop(&data);
        // 处理数据
        std::cout << "Consumer -> " << data << std::endl;
        sleep(1);
    }

    return 0;
}

void *Producer(void *args)
{
    RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);
    while (true)
    {
        // 生产数据
        int data = rand() % 10 + 1;
        rq->Push(data);
        // 处理数据
        std::cout << "Producer -> " << data << std::endl;
        sleep(1);
    }

    return 0;
}

int main()
{
    srand(time(nullptr));
    RingQueue<int> *rq = new RingQueue<int>();
    
    pthread_t c,p;
    pthread_create(&c, nullptr, Producer,rq);
    pthread_create(&p, nullptr, Consumer,rq);

    // 等待线程
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
    return 0;
}

为了避免刷屏的效果,我们先各一秒,然后生产和消费:

让消费者一秒读取一次;预期现象:生产者先生成满,然后一秒过后,消费一个,生产一个:

当然,让生产者一秒生产一个,消费者不休眠,现象就是生成一个,消费一个:

细节问题:在没有 互斥锁 的情况下,是如何 确保生产者与消费者间的互斥关系的?

当然我们可不止只会生产和消费整数,因为是模板所以,可以是任意的类型,我们呢可以把上面的阻塞队列的Task.hpp的任务拿过来直接测试:

#pragma once

#include <iostream>

class Task
{
public:
    Task(int x, int y)
        :_x(x),_y(y)
    {}

    Task(){}

    std::string debug()
    {
        return std::to_string(_x) + "+"+std::to_string(_y) +"=" +"?";
    }

    void Excute()
    {
        _result = _x + _y;
    }

    void operator()()
    {
        Excute();
    }

    std::string result()
    {
        std::string msg = std::to_string(_x) + "+"+std::to_string(_y) +"=" + std::to_string(_result);
        return msg;
    }
    
private:
    int _x;
    int _y;
    int _result;
};

这里的运行结果与 阻塞队列 那边的一模一样,证明当前的 「生产者消费者模型」 没有问题(单生产单消费场景中)

注:如果想要提高并发度,可以增大环形队列的容量


4.3 多生产多消费模型

信号量的 PV 操作,保证的是 生产者和消费者 在任意时刻访问 循环队列 时只有一个线程可操作!也就是他保证的是 生产者和消费者之间 的互斥

但是现在是,多执行流即多生产多消费,他们申请到信号量进行 生产/消费 时可能会出现问题,原因是:生产/消费的下标各是一个,也就是会造成对于同一资源的破坏如何解决 生产和生产/消费和消费 之间的 互斥关系 呢?互斥锁

现在的问题是加几把锁?

答案是:两把!因为生产者和消费者关注的资源是不一样!

#pragma once

#include <pthread.h>
#include <semaphore.h>
#include <vector>

const static int default_cap = 5;

template <class T>
class RingQueue
{
private:
    void P(sem_t &s)
    {
        sem_wait(&s);
    }

    void V(sem_t &s)
    {
        sem_post(&s);
    }

    void Lock(pthread_mutex_t& mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    
    void UnLock(pthread_mutex_t& mutex)
    {
        pthread_mutex_unlock(&mutex);
    }

public:
    RingQueue(int cap = default_cap)
        : _ring_queue(cap), _max_cap(cap), _c_step(0), _p_step(0)
    {
        sem_init(&_space, 0, _max_cap);
        sem_init(&_data, 0, 0);

        pthread_mutex_init(&_c_mutex, nullptr);
        pthread_mutex_init(&_p_mutex, nullptr);
    }

    ~RingQueue()
    {
        sem_destroy(&_space);
        sem_destroy(&_data);
        pthread_mutex_destroy(&_c_mutex);
        pthread_mutex_destroy(&_p_mutex);
    }

    void Push(const T &in)
    {
        // 申请信号量
        P(_space);
        // 加锁
        Lock(_p_mutex);
        // 生产
        _ring_queue[_p_step] = in;
        _p_step++;
        _p_step %= _max_cap;
        // 解锁
        UnLock(_p_mutex);
        // 释放信号量
        V(_data);
    }

    void Pop(T *out)
    {
        // 申请信号量
        P(_data);
        // 加锁
        Lock(_c_mutex);
        // 消费
        *out = _ring_queue[_c_step];
        _c_step++;
        _c_step %= _max_cap;
        // 解锁
        UnLock(_c_mutex);
        // 释放信号量
        V(_space);
    }

private:
    std::vector<T> _ring_queue;
    int _max_cap; // 容量

    sem_t _space; // 空间信号量
    sem_t _data;  // 数据信号量

    int _c_step; // 消费者下标
    int _p_step; // 生产者下标

    pthread_mutex_t _c_mutex; // 消费者互斥锁
    pthread_mutex_t _p_mutex; // 生产者互斥锁
};

让消费者先休眠一秒,然后现象应该是 :先生成满,然后一秒消费一个,生产一个:

细节1: 在信号量申请成功之后 加锁,可以提高并发度

上述的,为了防止 生产和生产/消费和消费 对同一资源破坏,需要加互斥锁让他们串行!加锁的位置有两种:1、在申请信号量前加锁 2、在申请信号量以后加锁 这两种都是可以的!但是后者更优!原因如下:

这就好比,你去看电影,你是到时候进放映厅的时候再买票,还是先买票到时候直接进去?

当然是后者喽!原因是信号量的PV本身就是原子的,所以不会出错!所以可以提前申请好信号到时候竞争互斥锁串行访问,即可

细节2:为什么在申请信号量的时候,不需要判断一下条件是否满足?

信号量本质就是一个资源数目的计数器!是一种资源的预定机制!

预定就体现在:可以不判断是否满足,就可以知道内部资源的情况!申请信号量本身就是在条件判断


• 如何理解生产者消费者模型的效率高?

阻塞队列 中,每一个线程执行操作都必须先得加锁,也就是串行操作!循环队列 中,多线程即使并发申请到了信号量,最后也是得申请锁,串行执行操作的!这好像也没多高效吧!这样看确实!但是上面刚说了,我们生产者和消费者不只是处理的是 int 这样的整数,而是大多可能执行的是 网络请求、SQL查询、并行 IO 等,而请求和处理这些操作 本身是很费时间的!

当一个线程在 请求完正在处理这个任务 的同时(花费时间),其他线程去请求,这样不就大大的提高了 并发度 吗?不就是提高了,效率吗!而在这种比较费时间的操作下,加锁和解锁的时间也是可以忽略的!即书上所说的,生产者消费者模型的高效是体现在这里


OK,好兄弟这就是CP模型的所有内容了,我是 cp 我们下期见~!

举报

相关推荐

0 条评论