文章目录
消费者的Rebalance机制
消费者客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic扩容等各种突发情况时,消费者组中的消费者实例通过Rebalance服务支持全部队列的正常消费。
Rebalance机制主要是在集群消费模式下,当发生Broker掉线、消费者实例掉线、Topic扩容等各种突发情况时,导致相同消费者组下消费者消费的消息队列不均衡,利用Rebalance机制再次根据设定的策略(默认是平均分配策略)重新分配消息队列给相同消费者组的消费者,达到负载均衡的目的。
而在广播消费模式下,由于广播消费模式就是同一个消费者组下所有消费者消费所有消息,不需要负载均衡。
所以后续的Rebalance流程主要讲解的是集群消费模式的情况下
Rebalance服务类RebalanceImpl有RebalancePullImpl和RebalancePushImpl两个重平衡实现类,分别被DefaultMQPullConsumer和DefaultMQPushConsumer使用。
RebalanceImpl核心属性
- processQueueTable:记录MessageQueue和ProcessQueue的关系,MessageQueue可以简单理解为ConsumeQueue的客户端实现,ProcessQueue是保存Pull消息的本地容器
- topicSubscribeInfoTable:Topic路由信息,保存Topic和MessageQueue的关系
- subscriptionInner:记录topic对应的订阅关系,保存当前消费者组订阅了哪些Topic的哪些tag
- allocateMessageQueueStrategy:MessageQueue消费分配策略的实现
RebalanceImpl、RebalancePushImpl、RebalancePullImpl是Rebalance的核心实现,主要逻辑都在RebalanceImpl中,RebalancePushImpl、RebalancePullImpl是Pull消费者和Push消费者对Rebalance的需求不同,在各自的实现里重写了部分方法,以满足自身需求。
DefaultMQPullConsumer Rebalance流程
首先我们知道Pull模式下,需要用户主动获取、维护topic对应的消息队列的消费进度offset,主动消息队列拉取消息并消息更新/上报消费进度,所以Pull模式下不会使用到ProcessQueue
——Pull消息本地容器processQueueTable
,这个是Push模式下存储主动拉取消息的作用。
所以Pull模式下的集群消费模式的重平衡主要起的作用是重新分配消费者组下的消费者的消息队列。
DefaultMQPullConsumer服务通过启动MQClientInstance,在MQClientInstance#start()启动方法中会启动RebalanceService重平衡服务线程,RebalanceService重平衡服务会每隔20000ms执行*MQClientInstance#doRebalance()*方法,且第一次会等待20000ms:
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
// 等待间隔20000ms,如果hasNotified为true(被唤醒),则重置为false返回
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
在MQClientInstance#doRebalance()方法中,通过遍历consumerTable
中所有注册的消费者客户端,根据消费者的实际类型调用doRebalance方法,DefaultMQPullConsumer调用的是*DefaultMQPullConsumerImpl#doRebalance()*方法:
public void doRebalance() {
// 遍历所有的消费者实例,执行doRebalance方法
for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
}
public void doRebalance() {
// 重平衡服务不为空,调用重平衡服务的doRebalance方法
if (this.rebalanceImpl != null) {
// isOrder默认传false
this.rebalanceImpl.doRebalance(false);
}
}
RebalanceImpl#doRebalance方法通过遍历消费者客户端内存中的Topic订阅信息,对每个订阅信息通过Topic进行重平衡。待全部的Topic都重平衡后,调用truncateMessageQueueNotMyTopic方法将不属于当前消费者订阅的topic的本地消息容器ProcessQueue移除(由于在Pull模式下,消费者的订阅的topic的内容不会发生变化,所以并没什么用)。
public void doRebalance(final boolean isOrder) {
// 遍历当前消费者的全部topic对应的订阅关系
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 对每个Topic进行Rebalance
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
// push、pull可以调用接口去掉订阅指定的topic
// 将不属于当前消费者订阅的topic的队列快照ProcessQueue移除
this.truncateMessageQueueNotMyTopic();
}
private void truncateMessageQueueNotMyTopic() {
// 获取当前消费者的Topic订阅信息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
// 移除processQueueTable中不是当前消费者订阅的topic的消息队列MessageQueue关联的本地消息容器
for (MessageQueue mq : this.processQueueTable.keySet()) {
if (!subTable.containsKey(mq.getTopic())) {
ProcessQueue pq = this.processQueueTable.remove(mq);
if (pq != null) {
// ProcessQueue的dropped=true,不会再自动拉取消息
pq.setDropped(true);
log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
}
}
}
}
rebalanceByTopic方法根据messageModel消息消费模式分:
-
CLUSTERING(集群消费模式):
-
获取每个topic的消息队列信息-
Set<MessageQueue> mqSet
,然后根据topic获取topic对应的Broker Addr,请求Broker获取consumerGroup的下所有注册的消费者clientId列表-List<String> cidAll
,当topic存在消息队列信息且从Broker中获取指定消费者组注册的消费者信息成功则需要尝试重新分配消息队列。 -
而对
mqAll
、cidAll
(topic消息队列,消费者组的clientId列表)进行排序,目的在于保证所有消费者客户端在执行Rebalance时,看到的MessageQueue列表和消费者id列表都是一样的才能保证Rebalance不会出错。 -
接下来就是根据消息队列分配策略
AllocateMessageQueueStrategy
(默认是平均分配策略——AllocateMessageQueueAveragely
)选择分配给当前消费者哪些消息队列。 -
重新分配之后可能会造成当前消费者订阅的topic的消息队列改变,需要调用接口updateProcessQueueTableInRebalance方法更新
processQueueTable
信息,如果根据重平衡分配给当前消费者的MessageQueue发生变化,则调用messageQueueChanged方法更新本地订阅关系版本,修改本地消费者限流的一些参数,然后发送心跳,通知所有Broker当前订阅关系发生了变化
-
-
BROADCASTING(广播消费模式):
从源码中我们可以看到,对比集群集群消费模式,广播消费模式下不需要去重新分配消费者的消息队列,因为每个消费者都是拿到全量的消息队列去消费。
而调用接口updateProcessQueueTableInRebalance方法和messageQueueChanged方法对于Pull模式并没有用,就不再赘述。
private void rebalanceByTopic(final String topic, final boolean isOrder) {
// 根据消费模式进行Rebalance
switch (messageModel) {
// 广播模式
case BROADCASTING: {
// 获取topic路由信息
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
// 根据分配的MessageQueue mqSet,push模式下查找有没有ProcessQueue拉取时间过期,过期则创建一个新ProcessQueue,并拉取消息
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
// 如果changed=true表示重平衡分配给当前消费者的MessageQueue发生变化
// 则更新本地订阅关系版本,修改本地消费者限流的一些参数,然后发送心跳,通知所有Broker当前订阅关系发生了变化
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
consumerGroup,
topic,
mqSet,
mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
break;
}
// 集群模式
case CLUSTERING: {
// 1、获取当前消费者订阅的topic对应的所有MessageQueue
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 根据topic获取topic对应的Broker Addr,请求Broker获取consumerGroup的下所有注册的消费者clientId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
// topic对应的消息队列为空,但是topic也不是重试消息,topic不存在
// 重试topic不做重平衡
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
}
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}
// 只有两者都不为空时
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
mqAll.addAll(mqSet);
// 2、排序的目的在于保证所有消费者客户端在执行Rebalance时,看到的MessageQueue列表和消费者id列表都是一样的才能保证Rebalance不会出错
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
// 3、按照设置的队列分配策略对当前消费者进行消息队列分配
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
// 根据分配策略重平衡后的MessageQueue列表更新processQueueTable
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
"rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
allocateResultSet.size(), allocateResultSet);
// 如果changed=true表示根据重平衡分配给当前消费者的MessageQueue发生变化
// 则更新本地订阅关系版本,修改本地消费者限流的一些参数,然后发送心跳,通知所有Broker当前订阅关系发生了变化
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
break;
}
default:
break;
}
}
DefaultMQPushConsumer Rebalance流程
Push模式下,PullMessageService拉取消息服务会根据pullRequestQueue
-pull请求列表中的PullRequest
-pull请求对象拉取对应的消息存储到processQueue的msgTreeMap
中,并自动管理消费位点,成功拉取到消息会会通过Callback,回调用户注册的监听器MessageListener#consumeMessage消费消息。
所以Push模式下,重平衡不仅仅需要重新分配消费者的消息队列,还得处理消息队列的processQueue-Pull本地消息容器。
DefaultMQPushConsumer启动的时候除了会启动和DefaultMQPullConsumer一样的重平衡服务rebalanceService,启动成功后还会调用*this.mQClientFactory.rebalanceImmediately()*方法立即唤醒rebalanceService服务进行重平衡操作。
public void rebalanceImmediately() {
this.rebalanceService.wakeup();
}
接下来和DefaultMQPullConsumer一样,rebalanceService服务通过调用*MQClientInstance#doRebalance()方法,通过遍历所有的消费者,调用消费者实现的doRebalance()方法实现重平衡,DefaultMQPushConsumer实现的doRebalance()*方法源码如下:
@Override
public void doRebalance() {
// pause默认false
if (!this.pause) {
// consumeOrderly默认false,是否顺序消费
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
调用rebalanceImpl.doRebalance方法,遍历当前消费者(假定是DefaultMQPullConsumer)的全部topic订阅关系,对每个topic进行重平衡操作。完成重平衡后,还需要对将不属于当前消费者订阅的topic的队列快照ProcessQueue移除。
因为DefaultMQPullConsumer可以通过unsubscribe方法移除对应topic的订阅,所以在这里通过判断是否有不是当前消费者的topic订阅信息关联的processQueueTable
的Pull消息容器来移除对应的ProcessQueue,并设置ProcessQueue的dropped=true
public void doRebalance(final boolean isOrder) {
// 遍历当前消费者的全部topic对应的订阅关系
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
// 对每个Topic进行Rebalance
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
// 将不属于当前消费者订阅的topic的队列快照ProcessQueue移除
this.truncateMessageQueueNotMyTopic();
}
private void truncateMessageQueueNotMyTopic() {
// 获取当前消费者的Topic订阅信息
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
// 移除processQueueTable中不是当前消费者订阅的topic的消息队列MessageQueue关联的本地消息容器
for (MessageQueue mq : this.processQueueTable.keySet()) {
if (!subTable.containsKey(mq.getTopic())) {
ProcessQueue pq = this.processQueueTable.remove(mq);
if (pq != null) {
pq.setDropped(true);
log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", consumerGroup, mq);
}
}
}
}
rebalanceByTopic方法根据messageModel消息消费模式分:
-
CLUSTERING(集群消费模式):
1-3步的流程与上述的Pull模式的集群消费模式的1-3步一致,不一样是从第4步开始:
-
调用updateProcessQueueTableInRebalance方法根据重平衡后分配的消息队列更新
processQueueTable
:-
遍历
processQueueTable
-pull消息本地容器,如果之前有拉取过消息,则需要判断该topic重平衡后分配的消息队列mqSet
是否包含有processQueueTable
关联的消息队列mq
,如果过不包含,证明不再分配给当前的消费者,设置dropped=true
,及时阻止继续向该消息处理队列进行消息拉取,调用removeUnnecessaryMessageQueue方法固化移除不再需要的消息队列;如果
ProcessQueue
记录的最近拉取消息的时间距离当前时间超过PULL_MAX_IDLE_TIME(默认120000ms)
,表示拉取消息时间超时,表示没有消息或者Broker服务不可用,Push模式下则设置dropped=true
,及时阻止继续向该消息处理队列进行消息拉取,调用removeUnnecessaryMessageQueue方法固化移除不再需要的消息队列 -
遍历重平衡后分配的消息队列
mqSet
,如果存在新增的消息队列,且processQueueTable
中没有对应的pull消息容器对象,则需要创建对应的ProcessQueue对象。首先判断topic消息是否是顺序消费,且请求Broker锁住对应的消息队列是否成功,如果是顺序消费且没锁住消息队列,则返回继续下个循环。
调用this.removeDirtyOffset方法移除RemoteBrokerOffsetSore的
offsetTable
内存中mq对应的offset记录(LocalFileOffsetStore不做任何处理),集群模式下offset默认为RemoteBrokerOffsetStore,广播模式则默认是LocalFileOffsetStore然后调用computePullFromWhereWithException方法根据消费者配置的
consumeFromWhere
决定怎么获取offset:- CONSUME_FROM_LAST_OFFSET:从Broker中获取消费位点offset,Broker从内存中offsetTable获取,如果获取不到,如果是新的消费者、或者订阅了很久之前的topic,且消息还在Broker内存中,则返回0,否则报错找不到;并更新Broker的offset到当前消费者的消费进度offsetTable内存中。如果查询返回的offset>=0表示正常的offset值;等于-1则表示消费者组第一次启动还没消费过,则判断如果是重试消息topic则返回0,不是则向Broker请求获取mq对应的ConsumeQueue最新的消息索引所在位点;其他不正常情况则返回-1
- CONSUME_FROM_FIRST_OFFSET:从Broker中获取消费位点offset,如果是正常返回>=0,则直接返回,如果等于-1,则返回0最小位点,其他异常情况返回-1
- CONSUME_FROM_TIMESTAMP:从Broker中获取消费位点offset,如果正常返回>=0,则直接返回;如果等于-1,则判断如果是重试消息,则向Broker请求获取mq对应的ConsumeQueue最新的消息索引所在位点,否则根据配置的
consumeTimestamp(默认半小时前)
查询mq对应的ConsumeQueue中保存的消息索引的消息存储时间最接近指定的consumeTimestamp
的位点;其他异常情况返回-1。
如果上面返回的offset>=0表示获取到mq的offset成功,通过PullRequest保存消费者组、offset、mq、pq,并添加到
pullRequestList
中,调用this.dispatchPullRequest(pullRequestList);
方法通过PullMessageService拉取消息。不管MessageQueue是新增还是减少,都会设置
changed=true
,表示当前消费者消费的MessgaeQueue有变化
-
-
如果当前消费者消费的MessgaeQueue有变化,则调用
this.messageQueueChanged(topic, mqSet, allocateResultSet);
更新本地订阅关系版本,修改本地消费者限流的一些参数,然后发送心跳,通知所有Broker当前订阅关系发生了变化
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; // 1、遍历processQueueTable pull消息本地容器 Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); // 找到topic对应的消息队列 if (mq.getTopic().equals(topic)) { // 如果重平衡后的MessageQueue队列不包含旧的MessageQueue,表示不再分配给当前的消费者消费,设置dropped为true if (!mqSet.contains(mq)) { pq.setDropped(true); // 将当前消费者不再使用的消息队列的移除 if (this.removeUnnecessaryMessageQueue(mq, pq)) { // 移除消息队列对应的ProcessQueue it.remove(); changed = true; log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); } // 如果继续用到该mq // push模式下,如果消息队列MessageQueue拉取超时,需要清除对应的消息队列 } else if (pq.isPullExpired()) { switch (this.consumeType()) { // pull case CONSUME_ACTIVELY: break; // push case CONSUME_PASSIVELY: pq.setDropped(true); // 移除不再需要的消息队列 if (this.removeUnnecessaryMessageQueue(mq, pq)) { // 移除消息队列对应的ProcessQueue it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq); } break; default: break; } } } } // 2、拉取新增加的消息队列的消息 List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) { // processQueueTable中没有对应新增mq的消息 if (!this.processQueueTable.containsKey(mq)) { // 如果是顺序消费 且 请求Broker锁住mq成功 if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } // RemoteBrokerOffsetStore移除offsetTable中mq的offset记录,LocalFileOffsetStore不做处理 this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = -1L; try { // PullConsumer返回0 // PushConsumer根据ConsumeFromWhere策略获取offset nextOffset = this.computePullFromWhereWithException(mq); } catch (Exception e) { log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); continue; } // nextOffset>=0表示获取消费位点成功,保存ProcessQueue和MessageQueue关系 if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { // 新增的MessageQueue,将ProcessQueue放进pullRequestList log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } // PullConsumer不做动作 // PushConsumer将pullRequestList逐个放到PullMessageService的pullRequestQueue中,然后执行拉取消息 this.dispatchPullRequest(pullRequestList); // 返回ProcessQueue是否有变更 return changed; } public boolean isPullExpired() { return (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME; } public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { // 固化消费消息队列的offset this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); // 移除offsetTable中mq对应的offset记录 this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); // 当消费者是有序消费且消费模式是集群模式时 if (this.defaultMQPushConsumerImpl.isConsumeOrderly() && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) { try { if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) { try { return this.unlockDelay(mq, pq); } finally { pq.getConsumeLock().unlock(); } } else { log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", mq, pq.getTryUnlockTimes()); pq.incTryUnlockTimes(); } } catch (Exception e) { log.error("removeUnnecessaryMessageQueue Exception", e); } return false; } return true; } public long computePullFromWhereWithException(MessageQueue mq) throws MQClientException { long result = -1; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: // 从最新的消息的位点开始消费 case CONSUME_FROM_LAST_OFFSET: { /* Local: 读取offset分ReadOffsetType,READ_FROM_MEMORY表示只从内存offsetTable读,READ_FROM_STORE表示只从硬盘读取文件记录的offset信息 MEMORY_FIRST_THEN_STORE则表示先从内存读,读不到再从文件中读取。获取不到mq的offset则返回-1 Remote: 从Broker中获取消费位点offset,Broker从内存中offsetTable获取, 如果获取不到,如果是新的消费者、或者订阅了很久之前的topic,且消息还在Broker内存中,则返回0,否则报错找不到 并更新Broker的offset到当前消费者的消费进度offsetTable内存中 */ long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } // First start,no offset // lastOffset=-1表示消费者组第一次启动没有消费过 else if (-1 == lastOffset) { // 如果消费队列的topic是重试消息的topic,返回0 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { // 向Broker请求获取mq对应的ConsumeQueue最新的消息索引所在位点 result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); throw e; } } } else { result = -1; } break; } // 从ConsumeQueue最小位点开始消费 case CONSUME_FROM_FIRST_OFFSET: { // 从硬盘文件、Broker中获取mq对应的已固化的最近offset long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; // lastOffset=-1表示消费者第一次启动没有消费过 } else if (-1 == lastOffset) { // 返回最小位点 0 result = 0L; } else { result = -1; } break; } // 从指定时间consumeTimestamp开始消费 case CONSUME_FROM_TIMESTAMP: { // 从硬盘文件、Broker中获取mq对应的已固化的最近offset long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; // lastOffset=-1表示消费者第一次启动没有消费过 } else if (-1 == lastOffset) { // mq对应的消息是重试消息 if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { // 向Broker请求获取mq对应的ConsumeQueue最新的消息索引所在位点 result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); throw e; } } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); // 请求Broker获取mq对应的距离timestamp最近的消息所在位点 result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e); throw e; } } } else { result = -1; } break; } default: break; } return result; } @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { /** * When rebalance result changed, should update subscription's version to notify broker. * Fix: inconsistency subscription may lead to consumer miss messages. */ // 设置新的本地订阅关系版本 SubscriptionData subscriptionData = this.subscriptionInner.get(topic); long newVersion = System.currentTimeMillis(); log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); subscriptionData.setSubVersion(newVersion); // 根据ProcessQueue重新设置流控策略的参数 int currentQueueCount = this.processQueueTable.size(); if (currentQueueCount != 0) { // 一个Topic最大能缓存的消息数。超过该值则采取拉取流控措施。默认是-1,表示不做限制 // 该值根据pullThresholdForQueue的配置决定是否生效,pullThresholdForTopic优先级低于pullThresholdForQueue int pullThresholdForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic(); if (pullThresholdForTopic != -1) { // 设置pullThresholdForTopic为newVal,最小为1,pullThresholdForTopic/currentQueueCount表示当前消费者每个ProcessQueue能缓存的消息数 int newVal = Math.max(1, pullThresholdForTopic / currentQueueCount); log.info("The pullThresholdForQueue is changed from {} to {}", this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(), newVal); this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal); } // 一个Topic最大能缓存的消息数 int pullThresholdSizeForTopic = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic(); // 同上 if (pullThresholdSizeForTopic != -1) { int newVal = Math.max(1, pullThresholdSizeForTopic / currentQueueCount); log.info("The pullThresholdSizeForQueue is changed from {} to {}", this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(), newVal); this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal); } } // notify broker // 发送心跳通知所有Broker订阅关系发送变化 this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock(); }
-
重平衡分配策略-AllocateMessageQueueStrategy
RocketMQ中重平衡分配策略有以下几种:
AllocateMachineRoomNearby-基于机房就近原则分配策略
用户需要实现MachineRoomResolver接口,实现brokerDeployIn
和consumerDeployIn
两个方法,决定判断消息队列属于哪个机房、消费者属于哪个机房。需要注意的是用户使用这个策略还需要传入一个真正分配消息队列的分配策略(可以是用户自定义实现也可以是其他剩余分配策略)。
-
分别通过用户实现的MachineRoomResolver接口的
brokerDeployIn
和consumerDeployIn
方法计算出传入的消息队列mqAll
所在的机房映射和传入的消费者cidAll
所在机房的映射 -
查询当前消费者所在机房
currentMachineRoom
,根据currentMachineRoom
查找消费者所在机房的mq列表mqInThisMachineRoom
和消费者列表consumerInThisMachineRoom
如果当前消费者所在的机房的mq列表
mqInThisMachineRoom
不为空,则通过传入的分配策略进行实际分配,即当前消费者所在机房的Broker消息队列只分配给相同机房的消费者;如果当前消费者所在的机房没有mq队列
mqInThisMachineRoom
,则遍历所有mq的机房,将那些没有消费者的机房的消息队列根据分配策略分配给插入消费者组下的所有消费者cidAll
。
假设有三个机房,实际负载策略使用算法-AllocateMessageQueueAveragely,机房1和机房3中存在消费者,机房2没有消费者。机房1、机房3中的队列会分配给各自机房中的消费者,机房2的队列会被消费者平均分配。
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
//group mq by machine room
Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
for (MessageQueue mq : mqAll) {
// 计算broker所在机房
String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
if (mr2Mq.get(brokerMachineRoom) == null) {
mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
}
mr2Mq.get(brokerMachineRoom).add(mq);
} else {
throw new IllegalArgumentException("Machine room is null for mq " + mq);
}
}
//group consumer by machine room
Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
for (String cid : cidAll) {
// 计算消费者所在机房
String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
if (mr2c.get(consumerMachineRoom) == null) {
mr2c.put(consumerMachineRoom, new ArrayList<String>());
}
mr2c.get(consumerMachineRoom).add(cid);
} else {
throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
}
}
List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
//1.allocate the mq that deploy in the same machine room with the current consumer
// 计算当前消费者所在机房
String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
// 获取当前消费者所在机房中的mq列表和消费者列表
List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
// 如果当前消费者所在的机房的mq列表不为空,则通过传入的分配策略再次进行实际分配,即当前消费者所在机房的Broker消息队列只分配给相同机房的消费者
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
}
//2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
// 如果当前消费者所在的机房没有mq队列,则遍历所有mq的机房,将那些没有消费者的机房的消息队列根据分配策略分配给消费者组下的所有消费者
for (String machineRoom : mr2Mq.keySet()) {
if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
}
}
return allocateResults;
}
AllocateMessageQueueAveragely-平均散列队列算法
该分配策略是尽量平均的分配消息队列给消费者:
- 如果消息队列
mqAll
的长度小于消费者cidAll
个数,则按顺序(在分配策略前已经调用了排序mqAll
、cidAll
)将消息队列逐个分配给消费者,后面不够分配的则没有 - 如果消息队列
mqAll
的长度等于消费者cidAll
个数,那么就能完全均分 - 如果消息队列
mqAll
的长度大于消费者cidAll
个数,那么多出来的消息队列分配给排在前面的消费者(即下标小于mod
的消费者多分配一个,大于mod
的消费者则均分)
- 获取当前消费者的下标,即第几个消费者,获取消息队列
mqAll
大小与消费者队列cidAll
大小的mod,如果为零,则表明可以均分,反则不能均分 - 计算平均分配数量,如果消息队列
mqAll
的长度小于等于消费者cidAll
个数,则可分配的平均数量为1;如果大于则判断mod
是否大于0且当前消费者的下标index
是否小于mod
,如果是则表示当前消费者需要多分配1个,平均分配数量为mqAll.size() / cidAll.size()+1
,否则就是平均分配mqAll.size() / cidAll.size()
- 计算分配给当前消费者的消费队列的开始位置(因为消息队列分配是从0分配给消费者,需要分配多少个则从0开始计算多少个,下一个消费者分配则从上一个消费者的分配结束的下一个下标开始分配),所以判断
mod
是否大于0且当前消费者的下标index
是否小于mod
,如果是则startIndex = index * averageSize
,否则startIndex = index * averageSize + mod
,加上mod
值是为了算上前面消费者多分配的个数 - 计算当前消费者分配几个队列
range
,比较averageSize
和mqAll.size() - startIndex
的最小值是为了队列的长度小于等于消费者的个数时,后续的分配的队列为0个 - 根据
range
分配数量,分配消息队列mqAll
的从startIndex % mqAll.size()
开始到(startIndex + range -1) % mqAll.size()
的下标的消息队列给当前消费者
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 获取当前consumer是第几个consumer
int index = cidAll.indexOf(currentCID);
// 消息队列数与消费者数的模,如果mod不为零,说明不能完全平均
int mod = mqAll.size() % cidAll.size();
// 如果队列的长度小于等于消费者的个数,那么就一个一个队列,屁股后面的就没有队列消费。
// 如果mod为0的话 那么大家完全平均分
// 如果mod不为0,那么当前消费者所处的位置小于mod,那就要多负载一个队列。 大于mod就消费是平均数
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
// 消费队列的开始位置
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// 决定到底消费几个队列,去averageSize和mqAll.size() - startIndex的最小值是为了队列的长度小于等于消费者的个数时,后续的分配的队列为0个
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
AllocateMessageQueueAveragelyByCircle-循环平均散列队列算法
该算法就是基于消费者队列cidAll
,根据当前消费者的index和消费队列大小,分配那些消息队列的下标与消费者大小的模等于当前消费者的index的消费队列给当前消费者。就像消费者队列cidAll
是一个分片的圆,消费者队列从0开始循环分配给消费者。
当消息队列个数小于可消费客户端时,消息队列与客户端对应情况如左侧图;当消息队列个数大于可消费客户端时,消息队列与客户端对应情况如右侧图:
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// 计算当前消费者是第几个消费者
int index = cidAll.indexOf(currentCID);
// 根据当前消费者的index和消费队列大小,分配那些消息队列的下标与消费者大小的模等于当前消费者的index的消费队列给当前消费者
for (int i = index; i < mqAll.size(); i++) {
if (i % cidAll.size() == index) {
result.add(mqAll.get(i));
}
}
return result;
}
AllocateMessageQueueByConfig-根据配置分配
直接根据用户传入的消息队列列表分配
public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
private List<MessageQueue> messageQueueList;
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
return this.messageQueueList;
}
@Override
public String getName() {
return "CONFIG";
}
public List<MessageQueue> getMessageQueueList() {
return messageQueueList;
}
public void setMessageQueueList(List<MessageQueue> messageQueueList) {
this.messageQueueList = messageQueueList;
}
}
AllocateMessageQueueByMachineRoom-机房哈希队列算法
只消费特定broker中的消息,通过用户调用setConsumeridcs()
方法设置consumeridcs
。如下图所示,第一张图是消费者小于队列数情况,第二张图是消费者多余队列数情况。假设有三个机房,配置机房三不在消费者的服务范围内,则实际消费对应关系如下两图所示。
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
// 计算当前消费者所在位置
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
return result;
}
// 获取指定机房consumeridcs的消息队列
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
String[] temp = mq.getBrokerName().split("@");
if (temp.length == 2 && consumeridcs.contains(temp[0])) {
premqAll.add(mq);
}
}
// 计算指定机房的消息队列与消费者平均分配数
int mod = premqAll.size() / cidAll.size();
// 获取指定机房的消息队列与消费者的模
int rem = premqAll.size() % cidAll.size();
// 获取消息队列分配开始位置
int startIndex = mod * currentIndex;
// 获取消息队列分配结束位置
int endIndex = startIndex + mod;
// 分配
for (int i = startIndex; i < endIndex; i++) {
result.add(premqAll.get(i));
}
// 如果当前消费者小于模的值rem,则多分配一个后面多余的
if (rem > currentIndex) {
result.add(premqAll.get(currentIndex + mod * cidAll.size()));
}
return result;
}
AllocateMessageQueueConsistentHash-一致性哈希队列算法
可以先看一下一致性hash算法,看完就明白了:一致性hash算法 - consistent hashing
使用一致性哈希算法进行负载,每次负载都会重新创建一致性hash路由表,获取本地客户端负责的所有队列信息。RocketMQ默认的hash算法为MD5。假设有4个客户端的clientId和两个消息队列mq1,mq2,,通过hash后分布在hash环的不同位置,按照一致性hash的顺时针查找原则,mq1被client2消费,mq2被client3消费。
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");
}
if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return result;
}
// ClientNode集合
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
}
// 构建hash环
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
List<MessageQueue> results = new ArrayList<MessageQueue>();
// 遍历mqAll的消息队列,
for (MessageQueue mq : mqAll) {
// 获取mq的hash对应分配的虚拟节点的实际节点
ClientNode clientNode = router.routeNode(mq.toString());
// 如果clientNode不为空且分配的实际节点就是本身当前的消费者,表示该mq分配给自己
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(mq);
}
}
return results;
}
public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
if (hashFunction == null) {
throw new NullPointerException("Hash Function is null");
}
this.hashFunction = hashFunction;
// 创建节点key hash与虚拟节点的关系以及虚拟节点和物理节点的关系映射
if (pNodes != null) {
for (T pNode : pNodes) {
addNode(pNode, vNodeCount);
}
}
}
public void addNode(T pNode, int vNodeCount) {
if (vNodeCount < 0)
throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
// 判断pNode是否已经存在在ring中有虚拟节点,有则返回重复个数
int existingReplicas = getExistingReplicas(pNode);
// 根据配置的创建虚拟节点个数vNodeCount,创建虚拟节点,hash映射该虚拟节点放进ring中
for (int i = 0; i < vNodeCount; i++) {
VirtualNode<T> vNode = new VirtualNode<T>(pNode, i + existingReplicas);
ring.put(hashFunction.hash(vNode.getKey()), vNode);
}
}
public T routeNode(String objectKey) {
if (ring.isEmpty()) {
return null;
}
// 计算objectKey hash值
Long hashVal = hashFunction.hash(objectKey);
// 获取key大于等于hashVal的ring的自己
SortedMap<Long, VirtualNode<T>> tailMap = ring.tailMap(hashVal);
// 如果tailMap不为空,则获取tailMap的第一个对象的key(hash)
// 否则获取ring的第一个key(此时objectKey的hash在ring中是最大的,所以objectKey应该分配给ring第一个虚拟节点)
Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
// 返回object分配的虚拟节点的物理节点
return ring.get(nodeHashVal).getPhysicalNode();
}
参考文档
RocketMQ之八:水平扩展及负载均衡详解
RocketMQ分布式消息中间件核心原理与最佳实践
一致性hash算法 - consistent hashing