0
点赞
收藏
分享

微信扫一扫

【基础】Kafka -- 日志存储

古得曼_63b6 2023-04-25 阅读 55

 

目录

 什么是信号量

如何理解信号量的使用

基于环形队列的生产消费者模型

如上问题我们如何用编码保证 ?(信号量)

编码:


POSIX信号量和SystemV信号量作用相同,都是用于同步操作。POSIX可以用于线程同步。

信号量本质上就是一个计数器。

 什么是信号量

只要保证共享资源任何时刻都只有一个执行流在进行访问,就有了临界资源和临界区的概念。互斥(加锁)时候,共享资源是被当做整体使用的。如果不同线程访问的是共享资源中的不同的位置,这个共享资源就可以分开来使用,让不同的执行流访问不同的区域,就可以继续并发了。

那么我们如何知道这个一共有多少个资源,还剩多少个?自己初始化的

你怎么保证这个资源就是给你的?程序员编码

我怎么知道我一定可以具有一个共享资源呢?信号量

电影院的例子:

买票的本质:叫做资源(座位)的预定机制。

信号量本质:是一个计数器,访问临界资源的时候,必须先申请信号量资源(sem--, 预定资源, P),使用完毕信号量资源(sem++, 释放资源,V)。

如何理解信号量的使用

我们申请了一个信号量,代表我当前执行流一定具有一个资源,可以被它使用了,那么是哪一个资源呢?需要程序员根据场景自己编码完成!

基于环形队列的生产消费者模型

项目:就是 物理结构->逻辑结构,在加一些条件循环判断。看项目时,看核心的物理结构代码,联想到逻辑结构。

环形队列:

判空:start == end

判满:1.计数器 2.浪费一个格子

空的时候,消费者线程等待;满的时候,生产者线程等待。

 如果生产和消费指向了环形的同一个位置(表示队列为空or未满):生产和消费要有互斥或者同步问题。但是大概率生产线程和消费线程指向的是不同的位置,所以只需要让生产和消费指向同一个位置,具有互斥同步关系就可以了,而让生产和消费不指向同一个位置时,并发指向!

我们的期望:生产者和消费者套圈(数据覆盖问题)、消费者不能超过生产者,指向同一个位置的情况:1.为空时,让生产者先运行;2.为满,让消费者先运行;其他情况可以并发访问。

如上问题我们如何用编码保证 ?(信号量)

先看看两者关注什么

生产者:最关注的是空间资源 -> spaceSem信号量 -> N

消费者:最关注的是数据资源 -> dataSem信号量 -> 0

进行生产:先申请信号量 P(spaceSem) -> spaceSem--,然后再特定位置生产。V(dataSem)

进行消费:先申请信号量P(dataSem) -> dataSem--, 然后消费特定的数据。V(spaceSem)

信号量申请失败,线程被挂起。

问题:如何保证在特定的位置访问自己的资源? 两者线程都有自己的下标即可。

编码:

#include "ringQueue.hpp"
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>

int myAdd(int x, int y)
{
    return  x + y;
}

void* consume(void* args)
{
    RingQueue<Task>* rq = (RingQueue<Task>*)args;

    while (true)
    {
        Task t;
        rq->pop(&t);
        std::cout << "线程 " << pthread_self() << " 消费成功:" << "=" << t() << std::endl;
    }
}

void* product(void* args)
{
    RingQueue<Task>* rq = (RingQueue<Task>*)args;
    while (true)
    {
        int x = rand() % 100;
        int y = rand() % 100;
        Task t(x, y, myAdd);
        rq->push(t);
        std::cout << "线程 " << pthread_self() << " 生产任务成功:" << x << "+" << y << "=?" << std::endl;
    }
}

int main()
{
    srand((unsigned int)time(nullptr));

    pthread_t c[3], p[5];
    RingQueue<int> *rq = new RingQueue<int>();

    for (int i = 0; i < 3; ++i) pthread_create(c + i, nullptr, consume, rq);
    for (int i = 0; i < 5; ++i) pthread_create(p, nullptr, product, rq);

    for (int i = 0; i < 3; ++i) pthread_join(c[i], nullptr);
    for (int i = 0; i < 5; ++i) pthread_join(p[i], nullptr);

    return 0;
}
#pragma once

#include "sem.hpp"
#include "Task.hpp"
#include <iostream>
#include <vector>
#include <pthread.h>

const int g_data_num = 5;

// 定义环形队列
template<class T>
class RingQueue
{
public:

    RingQueue(int num = g_data_num)
    :_num(num)
    ,_ring_queue(num)
    ,_c_step(0)
    ,_p_step(0)
    ,_space_sem(num)
    ,_data_sem(0)
    {
        pthread_mutex_init(&_c_mtx, nullptr);
        pthread_mutex_init(&_p_mtx, nullptr);
    }

    void push(T& in)
    {
        // 先预空间申请信号量
        _space_sem.p();
        // 在进行实际处理
        // std::cout << "空间申请成功" << std::endl;
        pthread_mutex_lock(&_p_mtx);
        _ring_queue[_p_step++] = in;
        _p_step %= _num;

        // std::cout << "线程 " << pthread_self() << " 生产任务成功:" << in << std::endl;
        pthread_mutex_unlock(&_p_mtx);
        _data_sem.v();

    }

    void pop(T* out)
    {
        // 先预数据申请信号量
        _data_sem.p();
        // 在进行实际处理
        // std::cout << "数据申请成功" << std::endl;
        pthread_mutex_lock(&_c_mtx);
        *out = _ring_queue[_c_step++];
        _c_step %= _num;

        // std::cout << "线程 " << pthread_self() << " 任务消费成功:" << *out << std::endl;
        pthread_mutex_unlock(&_c_mtx);
        _space_sem.v();
    }

    ~RingQueue()
    {
        pthread_mutex_destroy(&_c_mtx);
        pthread_mutex_destroy(&_p_mtx);
    }
private:
    std::vector<T> _ring_queue; // 队列
    int _num; // 数据最大个数
    int _c_step; // 消费者下标
    int _p_step; // 生产者下标
    Sem _space_sem; // 空间信号量
    Sem _data_sem; // 数据信号量
    pthread_mutex_t _c_mtx; // 消费者锁
    pthread_mutex_t _p_mtx; // 生产者锁
};
#ifndef __TASK_HPP
#define __TASK_HPP

#include <iostream>
#include <functional>

// typedef std::function<int (int, int)> func_t;
typedef int(*func_t)(int, int);

class Task
{
public:
    Task()
    {}

    Task(int x, int y, func_t func)
    :_x(x)
    ,_y(y)
    ,_func(func)
    {

    }

    int operator()()
    {
        return  _func(_x, _y);
    }

private:
    int _x;
    int _y;
    func_t _func;
};

#endif
#ifndef __TASK_HPP
#define __TASK_HPP

#include <iostream>
#include <functional>

// typedef std::function<int (int, int)> func_t;
typedef int(*func_t)(int, int);

class Task
{
public:
    Task()
    {}

    Task(int x, int y, func_t func)
    :_x(x)
    ,_y(y)
    ,_func(func)
    {

    }

    int operator()()
    {
        return  _func(_x, _y);
    }

private:
    int _x;
    int _y;
    func_t _func;
};

#endif
ring_queue:testMain.cc
	g++ -o $@ $^ -std=c++11 -lpthread 
.PHONY:clean
clean:
	rm -f ring_queue

多生产多消费的意义? 不要狭隘的认为,把认为或者数据放在交易场所,就是生产和消费,将数据或者任务生产前和那道之后的处理,才是最耗费时间的。

生产的本质:私有的任务->公共空间

消费的本质:公共空间中的认为->私有的

信号量的本质是一个计数器,计数器的意义是什么?可以不用进入临界区,就可以得知资源情况,甚至可以减少临界区内部的判断。

以前申请锁 -> 判断与访问 -> 释放锁 --- 这么做的本质是我们并不清楚临界资源的情况!!信号量要提前预设资源的情况,而且在pv变化过程中,我们可以在外部就能知晓临界资源的情况!

举报

相关推荐

0 条评论