0
点赞
收藏
分享

微信扫一扫

召回05 矩阵补充、最近邻查找

倚然君 2024-09-30 阅读 25

文章目录

一.概念辨析

1.什么是消费者?

2.服务端为什么要管理消费者?

3.怎么管理消费者?

4.需要管理生产者吗?

二.编写思路

1.定义消费者

2.定义队列消费者管理管理类

3.定义消费者管理类

三.代码实践

#pragma once
#include "../common/Log.hpp"
#include "../common/message.pb.h"
#include <functional>
#include <memory>
#include <atomic>
#include <mutex>
#include <vector>
#include <unordered_map>
namespace ns_consumer
{
    using namespace ns_log;

    class Consumer;
    class QueueConsumerManager;

    using ConsumerPtr = std::shared_ptr<Consumer>;
    using QueueConsumerManagerPtr = std::shared_ptr<QueueConsumerManager>;
    using MessagePtr = std::shared_ptr<ns_data::Message>;
    using ConsumerCallback_t = std::function<void(const std::string& qname, const std::string& consumerId, MessagePtr msgPtr)>;
    struct Consumer
    {
        std::string _id;
        std::string _qname;
        ConsumerCallback_t _callback;
        bool _autoAck;

        Consumer(const std::string id, const std::string &qname, ConsumerCallback_t callback, bool autoAck)
            : _id(id),
              _qname(qname),
              _callback(callback),
              _autoAck(autoAck)
        {
            LOG(DEBUG) << "创建消费者: " << _id << endl;
        }

        ~Consumer()
        {
            LOG(DEBUG) << "析构消费者: " << _id << endl;
        }
    };

    class QueueConsumerManager
    {
    private:
        const std::string _qname;
        std::vector<ConsumerPtr> _consumers;
        size_t _rotateOrder;
        std::mutex _mtx;

    public:
        QueueConsumerManager(const std::string &qname)
            : _qname(qname),
              _consumers(),
              _rotateOrder(0),
              _mtx()
        {
        }

        /***********
         * 新增消费者
         * ****************/
        ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            // 判断消费者是否重复
            for (auto &consumerPtr : _consumers)
            {
                if (consumerPtr->_id == id)
                {
                    return consumerPtr;
                }
            }
           ConsumerPtr ret = std::make_shared<Consumer>(id, qname, callback, autoAck);
           _consumers.push_back(ret);
           return ret;
        }

        /**************
         * 移除消费者
         * ***************/
        void removeConsumer(const std::string &cid)
        {
            std::unique_lock<std::mutex> lck(_mtx);

            for (auto it = _consumers.begin(); it != _consumers.end(); ++it)
            {
                if ((*it)->_id == cid)
                {
                    _consumers.erase(it);
                    break;
                }
            }
        }

        /***************
         * 负载均衡地获取一个消费者
         * *************/
        ConsumerPtr chooseConsumer()
        {
            std::unique_lock<std::mutex> lck(_mtx);
            if (_consumers.size() == 0)
            {
                return nullptr;
            }
            _rotateOrder %= _consumers.size();
            return _consumers[_rotateOrder++];
        }
    };

    class ConsumerManager
    {
    private:
        std::unordered_map<std::string, QueueConsumerManagerPtr> _qConsumerManagers;
        std::mutex _mtx;

    public:
        ConsumerManager(const std::vector<std::string> &qnames)
        {
            for (const auto &qname : qnames)
            {
                _qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);
            }
        }

        /**************
         * 初始化队列消费者管理句柄--新增队列时调用
         * **************/
        void initQueueConsumerManager(const std::string &qname)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            if (_qConsumerManagers.count(qname))
            {
                return;
            }
            _qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);
        }

        /*******************
         * 销毁指定的队列消费者管理句柄--销毁队列时调用
         * ****************/
        void removeQueueConsumerManager(const std::string &qname)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            _qConsumerManagers.erase(qname);
        }

        /*************
         * 给指定队列新增消费者
         * ************/
        ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck)
        {
            std::unique_lock<std::mutex> lck(_mtx);

            if (_qConsumerManagers.count(qname) == 0)
            {
                LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;
                return nullptr;
            }
            return _qConsumerManagers[qname]->addConsumer(id, qname, callback, autoAck);
            
        }

        /***********
         * 删除指定队列的消费者
         * ****************/
        void removeConsumer(const std::string &qname, const std::string &cid)
        {
            std::unique_lock<std::mutex> lck(_mtx);

            if (_qConsumerManagers.count(qname) == 0)
            {
                LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;
                return;
            }
            _qConsumerManagers[qname]->removeConsumer(cid);
        }

        /*****************
         * 获取指定队列的一个消费者
         * **************/
        ConsumerPtr chooseConsumer(const std::string& qname)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            if (_qConsumerManagers.count(qname) == 0)
            {
                LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;
                return nullptr;
            }
            return _qConsumerManagers[qname]->chooseConsumer();
        }
    };
}
举报

相关推荐

0 条评论