linux之线程同步(生产消费模型)
为什么有同步
面对共享资源被多个线程访问,有个解决办法就是使用互斥锁!
那么是不是使用互斥锁就可以了呢?就可以解决所有的问题?例如数据不一致问题,线程安全问题——不是!
加锁也有自己的应用场景,不是所有的场景都是时候加锁的!例如下面的代码
#include<iostream>
#include<pthread.h>
//这是一个模拟抢票的逻辑
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
int tickets = 1000;//充当有1000张票
void* getTicket(void* args)
{
std::string username = static_cast<const char*>(args);
while(true)
{
pthread_mutex_lock(&lock);
if (tickets > 0)
{
usleep(1245);
std::cout << username << " 正在抢票" << tickets << std::endl;
tickets--;
pthread_mutex_unlock(&lock);
}
else
{
break;
pthread_mutex_unlock(&lock);
}
}
return nullptr;
}
int main()
{
pthread_t t1,t2,t3,t4;
pthread_create(&t1,nullptr,getTicket,(void*)"thread 1");
pthread_create(&t2,nullptr,getTicket,(void*)"thread 2");
pthread_create(&t3,nullptr,getTicket,(void*)"thread 3");
pthread_create(&t4,nullptr,getTicket,(void*)"thread 4");
pthread_join(t1,nullptr);
pthread_join(t2,nullptr);
pthread_join(t3,nullptr);
pthread_join(t4,nullptr);
}
==虽然这个互斥锁解决了数据不一致问题!但是它很不合理,因为有一个线程对于锁的竞争力比另外几个线程更强!所以导致了经常只有一个线程在抢票!==
那么如何安全且合理的抢票呢?
我们举个例子:假设现在在学校里面有一个自习室,叫做学霸VIP自习室!自习室的环境特别好但是只有一张座椅板凳,只允许一个同学进去自习!——拥有这个自习室的前提条件是,谁先拿到钥匙谁就能先进去,先到先得,自习室的钥匙放在门口!
有一个同学是个卷王是最早来的!他拿下钥匙,将门反锁!后面人就进不来了!——那么这为同学就互斥式的访问了这个自习室!(这种互斥式的访问就看谁的竞争能力强(谁先早到)! 谁就先拿到钥匙,那么就能将门反锁)
在这个同学自习了两个小时,想去上厕所,但是打开门一看,门口站满了人,都准备抢这个钥匙,但是这个同学只是想上个厕所,**于是将钥匙放进口袋里面!**然后出来的时候将门反锁!
这样子在同学上厕所,就没有能进这个自习室了!——这叫==当线程执行流被切换走的时候,将钥匙也一起带走了!==这样子等回来后,又开门,反锁,继续自学。
等过了2小时,这位同学有点不好意思了,所以决定让出自习室,于是开锁,出门,反锁,挂上钥匙,等准备走的时候,又觉得自己今天一大早就出来,就自习这么一会有点亏,所以想在自习一会,因为这位同学离这个钥匙最近最近,所以他又反手拿起钥匙,开门,反锁,继续自习!
==这叫什么!——这叫因为离资源最近!所以竞争能力最强!所以又申请到了这个钥匙==
然后这位同学终于肚子饿了,出门,反锁,挂上钥匙,但是一看门口人那么多,想到下次又不知道什么时候能轮到自己,于是凭借着离钥匙最近于是又拿起钥匙,开门,反锁,自习,但是只呆了几分钟就撑不住了,又出门了,又不甘心。
因为其他人离钥匙都没有这个同学近,所以其他同学,只能看着这位同学疯狂开门,放钥匙,拿钥匙,每一次待又不到几分钟
==这样子周而复始下去,这个自习室有没有创造价值?——没有!因为大部分时间都在申请锁,释放锁这个动作了!,有没有让别人申请锁呢?——没有!因为这位同学的竞争能力太强,而导致别人无法申请到锁!==
==我们将其他在门口的人,长时间得不到锁资源,而无法访问公共资源(自习室)的这些同学处于饥饿状态!==
一个线程频繁的申请锁资源!而导致其他线程长期得不到资源的问题就是==饥饿问题!==
但是这个线程(同学)有错么?——没有就是因为就是怎么规定的!它的竞争能力就是比其他线程(同学)更强!==但是这样子不合理!==
在其他同学的投诉下!学校出了新的规定!——==所有在自习室等待的同学都要排队,然后从自习室出来的同学,不能立马申请锁!要先去当前队列的尾部!然后重新等到这位同学的时候才能申请!==——这样子就在保证一个人访问自习室的情况下(数据安全的情况下),让同学按照一定的顺序进行访问自习室(公共资源)
==我们将同学看做线程,自习室看做公共资源!——翻译过来就是,当线程访问公共资源完毕之后,不能立马申请锁!而是要到等待队列的最末端重新开始等待才能申请!的这种形式就是线程同步==
==线程同步的本质就是在临界资源访问安全的前提下,让多个线程按照一定的顺序进行资源访问!==——从而解决因为一个线程竞争能力特别强,导致其他资源饥饿的问题!
生产消费模型
什么是生产消费模型——专业的说法是一种多执行流协同的方式
那么为什么要协同呢?——因为在多线程访问的时候,总会访问到公共资源!这些公共资源在被线程无序访问的时候一定会导致数据不一致问题!虽然可以通过加锁来解决这个问题
但是又时候导致一些不合理的问题,例如:饥饿问题
所以我们==既要保证安全,又要保证顺序(这种顺序可以不是一种绝对的顺序,但是一定要有顺序),尽可能的保证一个线程能合适合理的访问某种资源!所以我们要有一种多线程在这种工作场景下的模式!——这种模式最常见的就是生产者消费者模型!==
接下来我们用生活中的一些例子来解释一下什么是生产者消费者模型!
==那么到底什么是生成消费模型呢?==
超市,可以被生产者和消费者访问,生产者将东西放在超市,消费者从超市拿东西!——==这是生产者和消费者都能看到的东西!即超市就是一个共享资源!==
那么有没有可能出现这样的情况,消费者来超市!但是发现展架上没有东西了!都被卖完了!然后恰好来了一个超市人员来补货,==正在==往展架上补货的时候,消费者就去拿货物拿能不能拿成功呢?——==不确定!==因为只要这个工作人员要么放,要么不放,==如果有中间状态,当消费者正在访问的时候,那么当前究竟有没有放是不确定的!==——==所以当生成和消费在并发访问的时候,因为超市的某一些资源都是共享的!如果访问到了同一块资源,那么就有可能出现同时访问的问题!——从而造成数据不一致问题!==(现实中,消费者还能与补货员沟通!==但是线程之间是做不到这一点的!==)
==那么所谓的消费者在代码当中——其实就是一个线程或者多个线程!==
==那么所谓的生产在代码当中——也是一个线程或者多个线程!==
==因为涉及到了多执行流访问!所以这份共享资源首先就要被保护起来!==
那么该如何被保护呢?——这我们就要先明白生产者和消费者之间的关系
首先是生产者和生产者之间是什么关系?
概念总结
-
生产消费模型的三种关系:生产者和生产者(互斥),消费者和消费者(互斥),生产者和消费者(互斥与同步)——==都是为了保证共享资源的安全性!==
-
生产消费模型的两种关系:生产者线程与消费者线程
-
生产消费模型的一个场所:一段特定结构的缓冲区!(存放产品——即数据)
上面可以总结为321原则——只要我们想写生成消费模型!本质就是维护这个321原则
生成消费模型的特点
- 将生产者线程和消费者线程进行解耦!
条件变量
我们上面说过,一个线程频繁的申请锁资源!而导致其他线程长期得不到资源的问题就是==饥饿问题!==,要实现线程同步!首先就是要先解决一个线程频繁的申请锁资源(竞争能力过强),像是上面的生产消费模型,就有可能出现生产者线程疯狂的进行生产,导致不断申请锁资源,从而导致消费者无法进行消费!
所以就有了条件变量这个解决策略!——条件变量的存在就是为了满足多线程协同的需求所诞生了满足多线程协同的技术!
==什么是条件变量?==
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
条件变量接口介绍
条件变量初始化函数——pthread_cond_init
实现基于BlockingQueue的生成消费模型
**在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。**我们会给队列设置一个上限!
其与普通的队列区别在于,当队列为空时,从队列获取元素的操作(进行消费)将会被阻塞,直到队列中被放入了元素(进行生产);当队列满时,往队列里存放元素的操作也会被阻塞(不能再再生产),直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
==肯定至少要有一个生产者线程(或者多个)向BlockingQueue放数据,一个消费者线程(或者多个)向BlockingQueue里面取数据!==
==这个BlockingQueue就是我们上面消费者模型说的“交易场所”——即一段特定结构的缓冲区!也是我们将要实现的!==
阻塞队列的应用
==阻塞队列里面不仅仅可以放整形!还可以放其他的东西!——例如:任务!==
//Block_queues
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
const int gmaxcap = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(const int &maxcap = gmaxcap)
: maxcap_(maxcap)
{
//首先进行内部成员初始化!
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&pcond_,nullptr);
pthread_cond_init(&ccond_,nullptr);
}
//因为生产者和消费只关心拿数据和放数据!
//所以阻塞队列只有两个对外接口最重要!
void push(const T& in)//输入型参数,一般设计成const &
{
pthread_mutex_lock(&mutex_);
//1.判断!
while(is_full()) //判断是不是满的!
{
pthread_cond_wait(&pcond_,&mutex_);//生产条件不满足,无法进行生产!生产者进行等待!
}
//2.走到这里肯定是没有慢的!
q_.push(in);//生产数据!
//3.走到这里一定能保证阻塞队列里面有数据!
//让消费者进行消费!
pthread_cond_signal(&ccond_);//唤醒消费者!进行消费!
pthread_mutex_unlock(&mutex_);
}
void pop(T* out)//输出型参数!一般设计成指针(*),如果是输入输出型那么就是&
//通过这样的风格来区分参数的作用是什么!
{
pthread_mutex_lock(&mutex_);
//1.判断是不是空的!
while(is_empty())//循环判断!
{
pthread_cond_wait(&ccond_,&mutex_);//是空的!放在条件变量下面进行等待!
}
// 2 .走到这里可以保证一定有数据!一定不为空!
*out = q_.front();//获取数据!
q_.pop();//弹出数据!
//3. 可以保证阻塞队列里面至少有一个空的位置!
pthread_cond_signal(&pcond_);//唤醒生产者
pthread_mutex_unlock(&mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&pcond_);
pthread_cond_destroy(&ccond_);
}
private:
bool is_empty()
{
return q_.empty();
}
bool is_full()
{
return q_.size() == maxcap_;
}
private:
std::queue<T> q_;
int maxcap_;//表示队列元素的上限!
pthread_mutex_t mutex_;//因为stl本事不是线程安全的!所以要有锁来进行保护!
pthread_cond_t pcond_;//万一队列满了让生产者去对应的条件变量下面休眠!
pthread_cond_t ccond_;//万一队列空了让消费者去对应的条件变量下面休眠!
};
#pragma once
#include<iostream>
#include<functional>
#include<cstdio>
class Task
{
using func_t = std::function<int(int,int,const std::string&)>;
public:
Task()
{}
Task(int x, int y, const std::string& op, func_t func)
: x_(x), y_(y), op_(op), callback_(func)
{}
std::string operator()()
{
int result = callback_(x_, y_, op_);
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %s %d = %d", x_, op_.c_str(), y_, result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %s %d = ?", x_, op_.c_str(), y_);
return buffer;
}
private:
int x_;
int y_;
std::string op_;
func_t callback_;
};
#include"BlockingQueue.hpp"
#include"Task.hpp"
#include<unistd.h>
#include<sys/types.h>
#include<ctime>
#include<string>
#include<map>
const std:: string oper = "+-*/%";
int mymath(int x,int y,const std::string& op)
{
using func_t = std::function<int(int,int)>;
std::map<std::string,func_t> opfuncmap =
{
{"/",[](int x,int y)
{
if(y == 0)
{
std::cout << "div zero error!" << std::endl;
return -1;
}
else return x/y;
}},
{"%",[](int x,int y)
{
if(y == 0)
{
std::cout << "mod zero error!" << std::endl;
return -1;
}
else return x%y;
}},
{"*",[](int x,int y){return x*y;}},
{"+",[](int x,int y){return x+y;}},
{"-",[](int x,int y){return x-y;}}
};
return opfuncmap[op](x,y);
}
//调用逻辑
void* consumer(void* bq_)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(bq_);
while(true)
{
//从事消费活动!
Task t;
bq->pop(&t);
std::cout << "消费任务: " << t() << std::endl;
// sleep(1);
}
return nullptr;
}
void* productor(void* bq_)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(bq_);
while(true)
{
//从事生产活动
int x = rand()% 10 +1;//在这里先用随机数构建一个数据!
int y = rand()% 5;//在这里先用随机数构建一个数据!
int operCode = rand()%oper.size();
std::string op(1,oper[operCode]);
Task t(x,y,op,mymath);
bq->push(t);//这样子就完成生产了!
std::cout << "生产任务! " <<t.toTaskString()<<std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((unsigned long)time(nullptr)^getpid());
//要看到同一份资源!
BlockQueue<Task>* bq = new BlockQueue<Task>;
pthread_t c,p;//consum and product
pthread_create(&c,nullptr,consumer,bq);
pthread_create(&p,nullptr,productor,bq);
//通过第四个参数来将同一份资源传给不同的线程!
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
==但是无论是什么,我们最底层的阻塞队列都是不变的!!==