linux线程之信号量
POSIX信号量
阻塞队列的缺陷
==这是一个我们自己的实现阻塞队列!==
class BlockQueue
{
public:
BlockQueue(const int &maxcap = gmaxcap)
: maxcap_(maxcap)
{
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&pcond_,nullptr);
pthread_cond_init(&ccond_,nullptr);
}
void push(const T& in)//生产
{
pthread_mutex_lock(&mutex_);
while(is_full())//判断是不是满的
{
pthread_cond_wait(&pcond_,&mutex_);
}
q_.push(in);
pthread_cond_signal(&ccond_);
pthread_mutex_unlock(&mutex_);
}
void pop(T* out)//消费
{
pthread_mutex_lock(&mutex_);
while(is_empty())//判断是不是空的
{
pthread_cond_wait(&ccond_,&mutex_);
}
*out = q_.front();
q_.pop();
pthread_cond_signal(&pcond_);
pthread_mutex_unlock(&mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&pcond_);
pthread_cond_destroy(&ccond_);
}
private:
bool is_empty()
{
return q_.empty();
}
bool is_full()
{
return q_.size() == maxcap_;
}
private:
std::queue<T> q_;
int maxcap_;
pthread_mutex_t mutex_;
pthread_cond_t pcond_;
pthread_cond_t ccond_;
};
==我们自己实现的这个阻塞队列有什么“不足”的地方呢?==
什么是信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步,而system V版本是用于线程互斥
了解信号量之前我们要明白
那么什么是信号量呢?
==信号量的本质就是一个计数器!==——衡量临界资源中资源数量多少的计数器!可以用来保证!访问这个公共资源的线程,不会超过这个资源的数量!
==申请信号量的本质就是对临界资源中特定小块资源的预定机制!==——只要拥有信号量!就在未来一定能够拥有临界资源的一部分!
通过申请信号量去预定一部分的公共资源!只有持有信号量才能去访问这个公共资源!然后通过程序员编码的方式来保证不同线程可以并发的访问公共资源的不同区域!
==因为访问临界资源之前,要先申请信号量!而信号量的本质是一个计数器!——那么我们就可以在真正的访问临界资源之前,我们就可以知道临界资源的使用情况!(这就是为什么要有信号量)==
这样子就可以解决阻塞队列中要先访问公共资源,来知道公共资源状态的不足!任何线程都不用接触到临界资源!
只要申请信号量成功,就说明一定存在我们需要的资源!
只要申请信号量失败,就说明条件不就绪!只能等待!
==就不需要在临界资源里面进行判断了!直接去申请信号量!不管是申请成功还是不成功其实都没有访问对应的临界资源!==
==通过信号量提前将资源的数目暴露在外部!让我们通过申请信号量就能知道资源有没有就绪==
==但是其实上面我们的阻塞队列是不适合使用信号量的!因为阻塞队列里面虽然可以放很多节点,但是其实真正用的只有头和尾!我们只是说阻塞队列的不足而已!==
线程要访问临界资源中的某个区域——要申请信号量——所以所有的线程都必须先看到信号量!——所以信号量本身也必须是==公共资源==
公共资源的是一个计数器!那么就要进行递减和递增!
//伪代码
int sem = 10;//假设这就是一个计数器——信号量
sem--;//意味着可用资源变少了!说明有人拿走资源了!——这个操作就是申请资源!
sem++;//说明可用资源变多了!有线程释放资源了!——这个操作就归还资源!
==因为信号量本身是一个公共资源的!但是++/--这个操作本身并不是安全的!所以也必须保证操作的原子性==
PV操作
sem_t sem;//所以信号量不是简单的int类型!而是sem_t类型
sem--;//这种保证原子性的--的操作我们称之为P操作!
sem++;//这种保证原子性的++的操作我们称之为V操作!
==所以信号量的核心操作就是——PV原语==
信号量的基本使用接口
信号量头文件
include <semaphore.h>//信号量头文件!
初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
sem_t 是信号量类型,由pthread库替我们进行维护!
pshared:表示是否共享,0表示线程间不共享,非零表示进程间共享——一般是设为0
value:信号量计数器的初始值——这个值的多少取决于这个临界资源的值是多少!
//返回值是成功返回0 失败返回 -1 并设置错误码!
销毁信号量
int sem_destroy(sem_t *sem);
//返回值是成功返回0 失败返回 -1 并设置错误码!
等待信号量——P操作,相当于对信号量计数器--
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()——p操作
//这一旦等待成功就会立刻返回!让我们的代码继续运行!
//如果申请失败!就会一直阻塞,直到信号量大于0,线程才会被唤醒!
//返回值是成功返回0 失败返回 -1 并设置错误码!
发布信号量——v操作 ,相当于对信号量计数器++
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()——v操作
//返回值是成功返回0 失败返回 -1 并设置错误码!
基于环形队列的生产消费模型
==阻塞队列就是互斥锁的应用场景!==
==那么信号量有什么对应的应用场景呢?——环形队列==
那么什么是环形队列?
==那么为了完成环形队列的cp问题(create and product),我们的核心工作是什么呢?==
就是我们上面提到的三个规则!
- 消费者不能超过生产者
- 生产者不能把消费者套一个圈
- 解决在满和空的情况下,生产者和消费者的互斥与同步问题
==我们该如何保证这个三个性质呢?==
信号量是用来衡量临界资源中数量的多少——那么数量是指什么呢?
==对于生产者而言,看中的就是队列中的剩余空间!==
==对于消费者而言,看中的就是放入队列中的数据!==
所以为了更好的去衡量生产者和消费者的资源——我们该怎么做呢?
==给空间资源定义一个信号量!给数据资源也定义一个信号量!==
只有申请空间资源的计数器成功!那么就是说明还有队列里面有剩余空间!
只有申请数据资源的计数器成功!那么就说明队列里面还有数据!
==有了这两个信号量,我们皆可以维护这个三个规则!==
==对于生产和消费的位置我们要想清楚!==
位置其实本质就是队列中的下标!
一定是两个下标!——一个生产者一个是消费者
只有为空或者为满,下标才会相同!——此时一定会有一个线程拿不到资源!
==在编码中我们就可以通过下标这种方式,我们就能让生产和消费来访问不同的位置!也可以通过下标来让线程去指派要访问的资源!==
基于环形队列生产消费模型的实现
==这是单生产者,单消费者版本的环形队列!!==
//RingQueue.hpp
#pragma once
#include<iostream>
#include<vector>
#include<semaphore.h>//信号量的头文件!
#include<cassert>
static int gcap = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t& sem)
{
//等待信号量,等待成功,则就返回线程继续运行!
//等待失败则就直接将自己挂起
int n = sem_wait(&sem);
assert(n == 0);
(void)n;
}
void V(sem_t& sem)
{
//发布信号量!
int n = sem_post(&sem);
assert(n == 0);
(void)n;
}
public:
RingQueue(const int &cap = gcap)
: queue_(cap), cap_(cap)
{
int n = sem_init(&SpaceSem_,0,cap_);//0表示线程不共享!
//对于空间资源一开始的值就是数组空间的大小!
assert(n == 0);
n = sem_init(&DataSem_,0,0);//刚开始乜没有数据所以信号量的值就是0
assert(n == 0);
productorStep_ = consumerStep_ = 0; //开始的下标都是0
}
//生产者调用push函数!
void Push(const T &in)//向环形队列里面插入数据!
{
P(SpaceSem_);//生产者首先就是进行操作!申请空间资源的信号量!
//申请信号量成功意味着一定可以进行正常的生产!
queue_[productorStep_++] = in;
//向可以生产的位置进行生产数据!
productorStep_ %= cap_;
//等到最后的时候会自动回到最开始!
V(DataSem_);//生产出来数据并放入空间之后!说明数据资源多了一份!
//那么就要对数据资源的信号量进行V操作!
}
void Pop( T* out)//像环形队列里面拿数据!
{
P(DataSem_);//访问环形队列之前就要去申请信号量!
//数据信号量如果申请成功说明里面有数据!
*out = queue_[consumerStep_++];
//从队列可以消费的地方开始进行消费!
consumerStep_ %= cap_;
//到数组最后,越界了最返回最开始
V(SpaceSem_);//已经消费了一个数据!说明此时一个空间已经空出来了!
//那么我们就要对空间资源信号量进行V操作!
}
~RingQueue()
{
//销毁信号量!
sem_destroy(&SpaceSem_);
sem_destroy(&DataSem_);
}
private:
std::vector<T> queue_;
int cap_;//队列的容量
sem_t SpaceSem_;//生产者信号量!——也就是生产者想要的空间资源!
sem_t DataSem_;//消费者信号量——也就是消费者想要的数据资源!
//sem_t这个类型是由pthread库进行维护的!
int productorStep_;//生产者在哪里进行生产!
int consumerStep_;//消费者在哪里进行消费!
//生产消费位置的本质就是数组下标!
};
//main.cc主程序
#include"RingQueue.hpp"
#include<pthread.h>
#include<ctime>
#include<cstdlib>
#include<sys/types.h>
#include<unistd.h>
void* ProductorRoutine(void* rq_)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(rq_);
while(true)
{
int data = rand()%100 +1;
rq->Push(data);
std::cout << "生产完成!生产的数据是: "<< data << std::endl;
sleep(1);//让生产者慢一点
//就会出现生产一个消费一个!
//因为生产者如果不生产,消费者就无法消费!
//这就是典型的同步关系!
}
}
void *ConsumerRoutine(void *rq_)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(rq_);
while(true)
{
int data;
rq->Pop(&data);
std::cout << "消费完成!消费的数据是: "<< data << std::endl;
//sleep(1);
//让消费者慢一点!就会出现,消费者都是一直消费的是历史的数据
}
}
int main()
{
srand((unsigned int)time(nullptr)^getpid()^0x121312312);//生成一个随机数
RingQueue<int> *rq = new RingQueue<int>();
pthread_t c,p;
pthread_create(&c,nullptr,ProductorRoutine,rq);
pthread_create(&p,nullptr,ConsumerRoutine,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
生产者慢,消费者快!
消费生成任务
//Task.hpp
#pragma once
#include<iostream>
#include<functional>
#include<cstdio>
#include<ctime>
#include<string>
#include<map>
#include<fstream>
class CalTask
{
using func_t = std::function<int(int,int,const std::string&)>;
public:
CalTask()
{}
CalTask(int x, int y, const std::string& op, func_t func)
: x_(x), y_(y), op_(op), callback_(func)
{}
std::string operator()()
{
int result = callback_(x_, y_, op_);
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %s %d = %d", x_, op_.c_str(), y_, result);
return buffer;
}
std::string toTaskString()
{
char buffer[1024];
snprintf(buffer, sizeof buffer, "%d %s %d = ?", x_, op_.c_str(), y_);
return buffer;
}
private:
int x_;
int y_;
std::string op_;
func_t callback_;
};
const std:: string oper = "+-*/%";
int mymath(int x,int y,const std::string& op)
{
using func_t = std::function<int(int,int)>;
std::map<std::string,func_t> opfuncmap =
{
{"/",[](int x,int y)
{
if(y == 0)
{
std::cout << "div zero error!" << std::endl;
return -1;
}
else return x/y;
}},
{"%",[](int x,int y)
{
if(y == 0)
{
std::cout << "mod zero error!" << std::endl;
return -1;
}
else return x%y;
}},
{"*",[](int x,int y){return x*y;}},
{"+",[](int x,int y){return x+y;}},
{"-",[](int x,int y){return x-y;}}
};
return opfuncmap[op](x,y);
}
//main.cc
#include"RingQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<ctime>
#include<cstdlib>
#include<sys/types.h>
#include<unistd.h>
void* ProductorRoutine(void* rq_)
{
RingQueue<CalTask>* rq = static_cast<RingQueue<CalTask>*>(rq_);
while(true)
{
int x = rand() % 100 +1;
int y = rand() % 123 +1;
int operindex = rand()%oper.size();
std::string op(1,oper[operindex]);
CalTask t(x,y,op,mymath);
rq->Push(t);
std::cout << "生产完成!生产的任务是 :" << t.toTaskString() <<std::endl;
sleep(1);
}
}
void *ConsumerRoutine(void *rq_)
{
RingQueue<CalTask>* rq = static_cast<RingQueue<CalTask>*>(rq_);
while(true)
{
CalTask t;
rq->Pop(&t);
std::cout << "消费任务完成!消费的任务是: "<< t.toTaskString() << " 计算结果为: "<<t()<<std::endl;
}
}
//我们可以看到其实消费者不在乎任务到底是什么!
//它只要不断的运行就可以!
//这就是底层设计和业务做解耦!
//这个任务可以是任何任务!
//我们未来只要改变任务内容!这个里不进行任何的修改也是可以运行的!
int main()
{
srand((unsigned int)time(nullptr)^getpid()^0x121312312);//生成一个随机数
RingQueue<CalTask> *rq = new RingQueue<CalTask>();
pthread_t c,p;
pthread_create(&c,nullptr,ProductorRoutine,rq);
pthread_create(&p,nullptr,ConsumerRoutine,rq);
pthread_join(p,nullptr);
pthread_join(c,nullptr);
return 0;
}
多生产多消费的循环队列
多生产线程和多消费线程如果是在阻塞队列里面!任意一个时刻都是只有一个线程在阻塞队列里面访问临界资源,线程之间都是阻塞关系!资源都是被整体使用的
==在环形队列里面生产者和生产者之间都是互斥关系!消费者和消费者之间也都是互斥关系!——无论有多少个生产者或者多少个消费者,在任意时刻!都是只允许一个生产者和一个消费者先进到我们临界区里面!==
==所以我们要将那个代码改成多生产多消费其实是很简单的!只要保证最终进入临界区是一个生成一个消费即可!——怎么做呢?加上两个互斥锁皆可以了!==
#pragma once
#include<iostream>
#include<vector>
#include<cassert>
#include<semaphore.h>//信号量的头文件!
#include<pthread.h>
static int gcap = 5;
template<class T>
class RingQueue
{
private:
void P(sem_t& sem)
{
int n = sem_wait(&sem);
assert(n == 0);
(void)n;
}
void V(sem_t& sem)
{
int n = sem_post(&sem);
assert(n == 0);
(void)n;
}
public:
RingQueue(const int &cap = gcap)
: queue_(cap), cap_(cap)
{
int n = sem_init(&SpaceSem_,0,cap_);
assert(n == 0);
n = sem_init(&DataSem_,0,0);
assert(n == 0);
productorStep_ = consumerStep_ = 0;
//将互斥锁进行初始化!
pthread_mutex_init(&pmutex_,nullptr);
pthread_mutex_init(&cmutex_,nullptr);
}
void Push(const T &in)
{
pthread_mutex_lock(&pmutex_);//先对生产者的互斥锁进行加锁!
P(SpaceSem_);//生产者首先就是进行操作!申请空间资源的信号量!
//申请信号量成功意味着一定可以进行正常的生产!
//如果此时持有锁的时候,因为没有信号量而被挂起有问题么?
//没有!因为这个线程本身就是所有生产者线程中的竞争胜利者!
//这个生产者被挂起之后!等到有信号量的时候!再被唤醒
//然后继续向下执行!这个是没有问题的!
queue_[productorStep_++] = in;
productorStep_ %= cap_;
V(DataSem_);
pthread_mutex_unlock(&pmutex_);//对生产者互斥锁解锁!
}
void Pop( T* out)//像环形队列里面拿数据!
{
pthread_mutex_lock(&cmutex_);//先对消费者的互斥锁进行加锁!
P(DataSem_);
*out = queue_[consumerStep_++];
consumerStep_ %= cap_;
V(SpaceSem_);
pthread_mutex_unlock(&cmutex_);//对消费者互斥锁解锁!
}
~RingQueue()
{
sem_destroy(&SpaceSem_);
sem_destroy(&DataSem_);
//销毁互斥锁!
pthread_mutex_destroy(&pmutex_);
pthread_mutex_destroy(&cmutex_);
}
private:
std::vector<T> queue_;
int cap_;
sem_t SpaceSem_;
sem_t DataSem_;
int productorStep_;
int consumerStep_;
pthread_mutex_t pmutex_;//生产者的互斥锁!
pthread_mutex_t cmutex_;//消费者的互斥锁!
// 要有两个互斥锁!一个管理生产者,一个管理消费者!
//先让生产者之间进行竞争,然后再让消费者之间进行竞争!
//最后决胜出一个生产者和一个消费者!
//让最后的这个生产者和消费者进入临界区!
};
#include"RingQueue.hpp"
#include"Task.hpp"
#include<pthread.h>
#include<ctime>
#include<cstdlib>
#include<sys/types.h>
#include<unistd.h>
std::string SelfName()
{
char name[128];
snprintf(name,sizeof name,"thread[0x%x]",pthread_self());
return name;
}
void* ProductorRoutine(void* rq_)
{
RingQueue<CalTask>* rq = static_cast<RingQueue<CalTask>*>(rq_);
while(true)
{
int x = rand() % 100 +1;
int y = rand() % 123 +1;
int operindex = rand()%oper.size();
std::string op(1,oper[operindex]);
CalTask t(x,y,op,mymath);
rq->Push(t);
std::cout << SelfName()<< " 生产完成!生产的任务是 :" << t.toTaskString() <<std::endl;
sleep(1);
}
}
void *ConsumerRoutine(void *rq_)
{
RingQueue<CalTask>* rq = static_cast<RingQueue<CalTask>*>(rq_);
while(true)
{
CalTask t;
rq->Pop(&t);
std::cout << SelfName()<< " 消费任务完成!消费的任务是: "<< t.toTaskString() << " 计算结果为: "<<t()<<std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr)^getpid()^0x121312312);//生成一个随机数
RingQueue<CalTask> *rq = new RingQueue<CalTask>();
pthread_t p[4],c[8];
for(int i = 0;i<4;++i) pthread_create(p+i,nullptr,ConsumerRoutine,rq);
for(int i = 0;i<8;++i) pthread_create(c+i,nullptr,ProductorRoutine,rq);
for(int i =0;i< 4;++i) pthread_join(p[i],nullptr);
for(int i = 0;i<8;++i) pthread_join(c[i],nullptr);
delete rq;
return 0;
}
==此时这就是一个多线程的多生产多消费模型!==
==上面的代码有什么可以优化的地方呢?——是先申请信号量,再加锁!还是先加锁再申请信号量呢比较好呢?==
其实是先申请信号量再加锁!——我们维护互斥关系==不是为了保护信号量!==因为信号量的申请本身就是原子的!==我们是为了保护临界资源的!例如:循环队列和队列下标!==
什么情况下要用到多生产多消费?
==我们知道,无论多少个生成线程,消费线程进入共享资源的永远只有一个生成,一个消费==那么多生产多消费的意义是什么呢?
==因为获取和构建任务——是要花时间的!==
==我们今天使用的只是简单的随机数来构建任务!任务也比较简单!但是未来我们可能会从文件里面,数据库里面,网络里面去读取数据,这样子效率一定是比较低的!==
==消费任务也是如此!以后执行任务的可能会花费很长的时间!——从队列里面拿出任务反倒可能是最快的==
==创建任务和执行任务的时候多线程情况下是可以并行执行的!只是放任务和拿任务是串行的而已——而且放任务和拿任务在未来可能划花费的时间可能是最少的!==
==如果构建任务和处理任务本身是比较花费时间的而且我们可以通过创建多线程来解决!那么我们才采用多生产多消费!==