文章目录
一.概念辨析
1.什么是消费者?
2.服务端为什么要管理消费者?
3.怎么管理消费者?
4.需要管理生产者吗?
二.编写思路
1.定义消费者
2.定义队列消费者管理管理类
3.定义消费者管理类
三.代码实践
#pragma once
#include "../common/Log.hpp"
#include "../common/message.pb.h"
#include <functional>
#include <memory>
#include <atomic>
#include <mutex>
#include <vector>
#include <unordered_map>
namespace ns_consumer
{
using namespace ns_log;
class Consumer;
class QueueConsumerManager;
using ConsumerPtr = std::shared_ptr<Consumer>;
using QueueConsumerManagerPtr = std::shared_ptr<QueueConsumerManager>;
using MessagePtr = std::shared_ptr<ns_data::Message>;
using ConsumerCallback_t = std::function<void(const std::string& qname, const std::string& consumerId, MessagePtr msgPtr)>;
struct Consumer
{
std::string _id;
std::string _qname;
ConsumerCallback_t _callback;
bool _autoAck;
Consumer(const std::string id, const std::string &qname, ConsumerCallback_t callback, bool autoAck)
: _id(id),
_qname(qname),
_callback(callback),
_autoAck(autoAck)
{
LOG(DEBUG) << "创建消费者: " << _id << endl;
}
~Consumer()
{
LOG(DEBUG) << "析构消费者: " << _id << endl;
}
};
class QueueConsumerManager
{
private:
const std::string _qname;
std::vector<ConsumerPtr> _consumers;
size_t _rotateOrder;
std::mutex _mtx;
public:
QueueConsumerManager(const std::string &qname)
: _qname(qname),
_consumers(),
_rotateOrder(0),
_mtx()
{
}
/***********
* 新增消费者
* ****************/
ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck)
{
std::unique_lock<std::mutex> lck(_mtx);
// 判断消费者是否重复
for (auto &consumerPtr : _consumers)
{
if (consumerPtr->_id == id)
{
return consumerPtr;
}
}
ConsumerPtr ret = std::make_shared<Consumer>(id, qname, callback, autoAck);
_consumers.push_back(ret);
return ret;
}
/**************
* 移除消费者
* ***************/
void removeConsumer(const std::string &cid)
{
std::unique_lock<std::mutex> lck(_mtx);
for (auto it = _consumers.begin(); it != _consumers.end(); ++it)
{
if ((*it)->_id == cid)
{
_consumers.erase(it);
break;
}
}
}
/***************
* 负载均衡地获取一个消费者
* *************/
ConsumerPtr chooseConsumer()
{
std::unique_lock<std::mutex> lck(_mtx);
if (_consumers.size() == 0)
{
return nullptr;
}
_rotateOrder %= _consumers.size();
return _consumers[_rotateOrder++];
}
};
class ConsumerManager
{
private:
std::unordered_map<std::string, QueueConsumerManagerPtr> _qConsumerManagers;
std::mutex _mtx;
public:
ConsumerManager(const std::vector<std::string> &qnames)
{
for (const auto &qname : qnames)
{
_qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);
}
}
/**************
* 初始化队列消费者管理句柄--新增队列时调用
* **************/
void initQueueConsumerManager(const std::string &qname)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qConsumerManagers.count(qname))
{
return;
}
_qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);
}
/*******************
* 销毁指定的队列消费者管理句柄--销毁队列时调用
* ****************/
void removeQueueConsumerManager(const std::string &qname)
{
std::unique_lock<std::mutex> lck(_mtx);
_qConsumerManagers.erase(qname);
}
/*************
* 给指定队列新增消费者
* ************/
ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qConsumerManagers.count(qname) == 0)
{
LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;
return nullptr;
}
return _qConsumerManagers[qname]->addConsumer(id, qname, callback, autoAck);
}
/***********
* 删除指定队列的消费者
* ****************/
void removeConsumer(const std::string &qname, const std::string &cid)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qConsumerManagers.count(qname) == 0)
{
LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;
return;
}
_qConsumerManagers[qname]->removeConsumer(cid);
}
/*****************
* 获取指定队列的一个消费者
* **************/
ConsumerPtr chooseConsumer(const std::string& qname)
{
std::unique_lock<std::mutex> lck(_mtx);
if (_qConsumerManagers.count(qname) == 0)
{
LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;
return nullptr;
}
return _qConsumerManagers[qname]->chooseConsumer();
}
};
}