Consumer Offset Persistence Mechanism
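As the article "Consumer in Depth: Startup Flow" shows, a consumer also starts its offset store when it starts. RocketMQ provides two offset stores: the remote one, RemoteBrokerOffsetStore, and the local one, LocalFileOffsetStore. In cluster consumption mode the consumer uses RemoteBrokerOffsetStore and commits its consume offsets to the Broker for storage; in broadcast consumption mode the offsets are saved on the consumer's local disk by LocalFileOffsetStore.
Saving consume progress on the client is also called consume offset persistence. Open-source RocketMQ 4.2.0 supports two approaches: scheduled persistence and unscheduled persistence.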
Scheduled persistence
Scheduled offset persistence is implemented in org.apache.rocketmq.client.impl.factory.MQClientInstance.startScheduledTask(). The task runs for the first time 10000 ms after startup and then repeats every persistConsumerOffsetInterval (5000 ms by default):
// Periodically persist all consume offsets
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
MQClientInstance.this.persistAllConsumerOffset() iterates over all consumers managed by the current MQClientInstance and calls each consumer's *persistConsumerOffset()* implementation to persist its consume offsets.
private void persistAllConsumerOffset() {
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        MQConsumerInner impl = entry.getValue();
        impl.persistConsumerOffset();
    }
}
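There are two consumer implementations, DefaultMQPullConsumerImpl and DefaultMQPushConsumerImpl, and both persist offsets the same way: each first checks that the consumer is in the RUNNING state, then takes the MessageQueues currently in use from processQueueTable, and finally calls persistAll on its offset store (offsetStore) to persist the offsets of those queues.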
### Pull
public void persistConsumerOffset() {
    try {
        this.isRunning();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
        mqs.addAll(allocateMq);
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}
### Push
public void persistConsumerOffset() {
    try {
        this.makeSureStateOK();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
        mqs.addAll(allocateMq);
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}
- Cluster mode uses the remote offset store, RemoteBrokerOffsetStore. It iterates over offsetTable, the offsets the consumer keeps in local memory (a mapping from MessageQueue to consume offset). For every queue whose offset is non-null and that the consumer is still using, it sends a oneway request with RequestCode UPDATE_CONSUMER_OFFSET to update the Broker's consume offset for that MessageQueue. Queues the consumer no longer uses are removed from offsetTable.

public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    // MessageQueues that are no longer in use
    final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();

    // Iterate over offsetTable
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        AtomicLong offset = entry.getValue();
        // The offset is non-null and the mq is still in use
        if (offset != null) {
            if (mqs.contains(mq)) {
                try {
                    // Update the consume offset on the Broker
                    this.updateConsumeOffsetToBroker(mq, offset.get());
                    log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                        this.groupName,
                        this.mQClientFactory.getClientId(),
                        mq,
                        offset.get());
                } catch (Exception e) {
                    log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                }
            } else {
                // Not in use: add it to unusedMQ
                unusedMQ.add(mq);
            }
        }
    }

    // Remove the message queues that are no longer in use
    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
            log.info("remove unused mq, {}, {}", mq, this.groupName);
        }
    }
}

/**
 * Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.
 */
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    updateConsumeOffsetToBroker(mq, offset, true);
}

/**
 * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
 */
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    // Look up the broker for this message queue in brokerAddrTable
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        // Broker not found: refresh the route info and look it up again
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}
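The filtering that persistAll performs can be looked at in isolation. The class below is a minimal, hypothetical model and not RocketMQ code: queues are plain strings, `PersistAllSketch` is an invented name, and the returned map merely stands in for the oneway requests that would be sent to the Broker.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for RemoteBrokerOffsetStore; queues are modeled as strings.
class PersistAllSketch {
    final ConcurrentHashMap<String, AtomicLong> offsetTable = new ConcurrentHashMap<>();

    // Returns the offsets that would be sent to the broker and
    // removes the entries of queues that are no longer in use.
    Map<String, Long> persistAll(Set<String> inUse) {
        Map<String, Long> committed = new HashMap<>();
        if (inUse == null || inUse.isEmpty()) {
            return committed;
        }
        Set<String> unused = new HashSet<>();
        for (Map.Entry<String, AtomicLong> entry : offsetTable.entrySet()) {
            if (inUse.contains(entry.getKey())) {
                committed.put(entry.getKey(), entry.getValue().get());
            } else {
                unused.add(entry.getKey());
            }
        }
        for (String mq : unused) {
            offsetTable.remove(mq);
        }
        return committed;
    }

    public static void main(String[] args) {
        PersistAllSketch store = new PersistAllSketch();
        store.offsetTable.put("TopicA-q0", new AtomicLong(10));
        store.offsetTable.put("TopicA-q1", new AtomicLong(20));
        Set<String> inUse = new HashSet<>();
        inUse.add("TopicA-q0");
        System.out.println("committed: " + store.persistAll(inUse));
        System.out.println("remaining: " + store.offsetTable.keySet());
    }
}
```

As in the original method, an empty or null set of in-use queues returns early, so nothing is committed and nothing is removed in that case.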
On the Broker side, the Broker registers ConsumerManageProcessor as the consume progress processor at startup:

this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);

It handles requests whose RequestCode is GET_CONSUMER_LIST_BY_GROUP, UPDATE_CONSUMER_OFFSET, or QUERY_CONSUMER_OFFSET:

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        // Get the list of consumers in a consumer group
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        // Update the consume offset
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        // Query the consume offset
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}
ConsumerManageProcessor#updateConsumerOffset works by updating offsetTable, the per-topic consume progress held by the Broker's offset manager, ConsumerOffsetManager:

private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer/* queueId */, Long/* offset */>> offsetTable =
    new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

The key is topic + @ + consumerGroup, and the value stores the consume offset of every queue (queueId) of that topic under that consumer group.

private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
    final UpdateConsumerOffsetRequestHeader requestHeader =
        (UpdateConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
    this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
        requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
    final long offset) {
    // topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    this.commitOffset(clientHost, key, queueId, offset);
}

private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null == map) {
        map = new ConcurrentHashMap<Integer, Long>(32);
        map.put(queueId, offset);
        this.offsetTable.put(key, map);
    } else {
        Long storeOffset = map.put(queueId, offset);
        if (storeOffset != null && offset < storeOffset) {
            log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
        }
    }
}
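To make the two-level layout of offsetTable concrete, here is a small sketch under stated assumptions: `BrokerOffsetTableSketch` and `queryOffset` are hypothetical names, and returning -1 for an unknown queue is an illustrative choice. It mirrors the topic@group keying and the overwrite-on-commit behavior of the code above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the broker-side table: topic@group -> (queueId -> offset).
class BrokerOffsetTableSketch {
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>(512);

    // Stores the offset unconditionally and returns the previous value, or null if there was none.
    Long commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        return offsetTable
            .computeIfAbsent(key, k -> new ConcurrentHashMap<>(32))
            .put(queueId, offset);
    }

    // Returns the stored offset, or -1 when nothing is recorded (an assumption of this sketch).
    long queryOffset(String group, String topic, int queueId) {
        ConcurrentMap<Integer, Long> map = offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + group);
        if (map == null) {
            return -1L;
        }
        Long offset = map.get(queueId);
        return offset == null ? -1L : offset;
    }

    public static void main(String[] args) {
        BrokerOffsetTableSketch table = new BrokerOffsetTableSketch();
        table.commitOffset("groupA", "TopicA", 0, 100L);
        table.commitOffset("groupA", "TopicA", 0, 50L); // a smaller offset still overwrites
        System.out.println(table.queryOffset("groupA", "TopicA", 0));
    }
}
```

Note that, as in the source above, committing a smaller offset still replaces the stored one; the Broker only logs a warning in that case.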
Of course, the Broker also has to persist offsetTable so that consume progress is lost as rarely as possible. The Broker persists consume offsets in three ways:
  - Scheduled task: at startup the Broker schedules periodic ConsumerOffsetManager persistence, which saves the offsets to a local file on the Broker. The task first runs after a 10000 ms delay and then every flushConsumerOffsetInterval (5000 ms by default). The source is in the org.apache.rocketmq.broker.BrokerController.initialize method:

// Scheduled persistence task
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  - Persistence on shutdown: when the Broker shuts down (shutdown), it calls the persist method of ConsumerOffsetManager.
  - Master-slave data synchronization: a Slave starts the slaveSynchronize service, which every 10000 ms synchronizes the Master Broker's TopicConfig, ConsumerOffset, DelayOffset, and subscriptionGroupConfig data. The Slave requests all consume offsets from the Master Broker, overwrites its own entries with them, and then persists the result:

private void handleSlaveSynchronize(BrokerRole role) {
    if (role == BrokerRole.SLAVE) {
        if (null != slaveSyncFuture) {
            // Cancel the running slave sync task without interrupting it
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
        slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.slaveSynchronize.syncAll();
                } catch (Throwable e) {
                    log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                }
            }
        }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
    } else {
        //handle the slave synchronise
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
    }
}

public void syncAll() {
    // Synchronize topic config
    this.syncTopicConfig();
    // Synchronize consume offsets
    this.syncConsumerOffset();
    // Synchronize delay offsets
    this.syncDelayOffset();
    // Synchronize subscription group config
    this.syncSubscriptionGroupConfig();
}

private void syncConsumerOffset() {
    String masterAddrBak = this.masterAddr;
    // This broker is not the master
    if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
        try {
            // Request the Master Broker's consume offsets
            ConsumerOffsetSerializeWrapper offsetWrapper =
                this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
            // Overwrite directly
            this.brokerController.getConsumerOffsetManager().getOffsetTable()
                .putAll(offsetWrapper.getOffsetTable());
            // Persist
            this.brokerController.getConsumerOffsetManager().persist();
            log.info("Update slave consumer offset from master, {}", masterAddrBak);
        } catch (Exception e) {
            log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
        }
    }
}
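One consequence of the putAll call in syncConsumerOffset is easy to miss: entries are replaced per topic@group key, so the inner queue map of a shared key is swapped wholesale, while a key that exists only on the Slave is left in place. The sketch below illustrates this with plain maps; `SlaveSyncSketch` and `syncFromMaster` are hypothetical names, and no network call or persistence is modeled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration of the overwrite step; only the map semantics are modeled.
class SlaveSyncSketch {
    // Overwrites the slave's entries with the master's, key by key.
    static void syncFromMaster(
            ConcurrentMap<String, ConcurrentMap<Integer, Long>> slave,
            ConcurrentMap<String, ConcurrentMap<Integer, Long>> master) {
        slave.putAll(master);
    }

    public static void main(String[] args) {
        ConcurrentMap<String, ConcurrentMap<Integer, Long>> master = new ConcurrentHashMap<>();
        ConcurrentMap<String, ConcurrentMap<Integer, Long>> slave = new ConcurrentHashMap<>();

        ConcurrentMap<Integer, Long> masterQueues = new ConcurrentHashMap<>();
        masterQueues.put(0, 120L);
        master.put("TopicA@groupA", masterQueues);

        ConcurrentMap<Integer, Long> staleQueues = new ConcurrentHashMap<>();
        staleQueues.put(0, 80L);
        staleQueues.put(1, 40L);
        slave.put("TopicA@groupA", staleQueues);

        ConcurrentMap<Integer, Long> slaveOnly = new ConcurrentHashMap<>();
        slaveOnly.put(0, 5L);
        slave.put("TopicB@groupB", slaveOnly);

        syncFromMaster(slave, master);
        System.out.println(slave);
    }
}
```

After the call, the Slave's map for TopicA@groupA is the Master's map (queue 1 is gone), while TopicB@groupB, which only the Slave had, is still present.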
- Broadcast mode uses the local offset store, LocalFileOffsetStore. It iterates over offsetTable, collects the offsets of the queues in use into an OffsetSerializeWrapper, converts the wrapper to JSON, and saves it to a file.

public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    // Collect the offsets into an OffsetSerializeWrapper, convert it to JSON, and save it to a file
    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }

    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) {
        try {
            MixAll.string2File(jsonString, this.storePath);
        } catch (IOException e) {
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        }
    }
}
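The local-file approach can be sketched without any RocketMQ classes. The example below is illustrative only: `LocalOffsetFileSketch`, its hand-rolled JSON, and the write-to-temporary-file-then-move step are assumptions of this sketch and are not claimed to match what OffsetSerializeWrapper or MixAll.string2File do internally.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local offset file writer; queue names are assumed to need no JSON escaping.
class LocalOffsetFileSketch {
    // Builds a flat JSON object with keys in sorted order.
    static String toJson(Map<String, Long> offsets) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Long> e : new TreeMap<>(offsets).entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(e.getKey()).append("\":").append(e.getValue());
            first = false;
        }
        return sb.append('}').toString();
    }

    // Writes to a temporary sibling file first, then moves it over the target,
    // so a crash in the middle of the write does not truncate the previous file.
    static void persist(Map<String, Long> offsets, Path target) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, toJson(offsets).getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> offsets = new TreeMap<>();
        offsets.put("TopicA-q0", 5L);
        offsets.put("TopicA-q1", 7L);
        Path file = Files.createTempDirectory("offsets").resolve("offsets.json");
        persist(offsets, file);
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }
}
```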
Unscheduled persistence
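Unscheduled persistence is also called Pull-And-Commit: while executing a pull, the consumer also sends the queue's latest consume offset to the Broker. It is implemented in *org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage()*.
The method persists offsets in two places: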
- When a pull returns and the pull offset turns out to be illegal, the client updates its local consume offset from the offset returned, commits the latest consume offset to the Broker, and removes the MessageQueue's offset, ProcessQueue, and related state:

case OFFSET_ILLEGAL:
    log.warn("the pull request offset illegal, {} {}",
        pullRequest.toString(), pullResult.toString());
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());

    pullRequest.getProcessQueue().setDropped(true);
    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

        @Override
        public void run() {
            try {
                // Update the offset of the messageQueue in offsetTable
                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                    pullRequest.getNextOffset(), false);

                // Persist the offset (local file or Broker)
                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                // Remove the messageQueue's processQueue from processQueueTable, set dropped=true,
                // persist the offset once more, and remove the mq's offset from offsetTable
                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                log.warn("fix the pull request offset, {}", pullRequest);
            } catch (Throwable e) {
                log.error("executeTaskLater Exception", e);
            }
        }
    }, 10000);
    break;
- Before the consumer pulls messages, in cluster consumption mode (where offsetStore is a RemoteBrokerOffsetStore) it tries to read this queue's consume offset, commitOffsetValue, from the consumer's memory. If one exists, that is, commitOffsetValue > 0, it sets commitOffsetEnable = true and passes the consumer's latest local offset, commitOffsetValue, to the Broker together with the pull request.

boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// In cluster mode, read the messageQueue's consume offset from the consumer's in-memory offsetTable;
// a commitOffsetValue greater than 0 means an offset is recorded, so commitOffsetEnable = true
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue > 0) {
        commitOffsetEnable = true;
    }
}

String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
    if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
        subExpression = sd.getSubString();
    }

    classFilter = sd.isClassFilterMode();
}

int sysFlag = PullSysFlag.buildSysFlag(
    commitOffsetEnable, // commitOffset
    true, // suspend
    subExpression != null, // subscription
    classFilter // class filter
);
try {
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(),
        subExpression,
        subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(),
        pullRequest.getNextOffset(),
        this.defaultMQPushConsumer.getPullBatchSize(),
        sysFlag,
        commitOffsetValue,
        BROKER_SUSPEND_MAX_TIME_MILLIS,
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC,
        pullCallback
    );
} catch (Exception e) {
    log.error("pullKernelImpl exception", e);
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
The Broker checks whether the incoming sysFlag contains the FLAG_COMMIT_OFFSET bit to decide whether it should update the queue's consume offset from commitOffsetValue:

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    …………
    // Whether the commitOffset flag is set, that is, whether the Broker may update the consume offset
    final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
    …………
    // brokerAllowSuspend indicates whether the Broker may suspend this request;
    // it is false when the request has already been suspended
    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    if (storeOffsetEnable) {
        // Update the consume offset of this queueId for the topic subscribed by this consumer group
        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    }
    return response;
}

public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
    final long offset) {
    // topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    this.commitOffset(clientHost, key, queueId, offset);
}

private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null == map) {
        map = new ConcurrentHashMap<Integer, Long>(32);
        map.put(queueId, offset);
        this.offsetTable.put(key, map);
    } else {
        Long storeOffset = map.put(queueId, offset);
        if (storeOffset != null && offset < storeOffset) {
            log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
        }
    }
}
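The Broker updates its recorded consume offset for the queue only when all three conditions hold: the request has not already been suspended (brokerAllowSuspend = true), the FLAG_COMMIT_OFFSET bit is set, and the current Broker is not a Slave.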
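The three conditions can be expressed as a small truth function. The following is a sketch, not the RocketMQ implementation: `PullSysFlagSketch` and `storeOffsetEnable` are hypothetical names, only two of the flags are modeled, and the bit positions are assumptions chosen for illustration.

```java
// Hypothetical, reduced model of the pull sysFlag and the broker-side check.
class PullSysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1;  // assumed bit position
    static final int FLAG_SUSPEND = 0x1 << 1;   // assumed bit position

    // Packs the boolean options into one int, one bit per option.
    static int buildSysFlag(boolean commitOffset, boolean suspend) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        return flag;
    }

    static boolean hasCommitOffsetFlag(int sysFlag) {
        return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
    }

    // The offset is stored only if all three conditions hold.
    static boolean storeOffsetEnable(boolean brokerAllowSuspend, int sysFlag, boolean isSlave) {
        return brokerAllowSuspend && hasCommitOffsetFlag(sysFlag) && !isSlave;
    }

    public static void main(String[] args) {
        int sysFlag = buildSysFlag(true, true);
        System.out.println(storeOffsetEnable(true, sysFlag, false));
        System.out.println(storeOffsetEnable(true, sysFlag, true));
    }
}
```

Packing the options into a single integer lets the consumer carry the commit decision inside the pull request itself, so no separate offset request is needed on this path.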
Consumer shutdown
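Before a consumer shuts down, it has to save various state, including its consume progress, so that the data can be restored after a restart. The shutdown flows of Push mode and Pull mode are largely the same (Push mode additionally shuts down the ConsumeMessageService listener, which only Push mode has). Both call this.persistConsumerOffset(), the same method used for the consumer's scheduled offset persistence, which was analyzed above and is not repeated here: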
public void shutdown() {
    shutdown(0);
}

public synchronized void shutdown(long awaitTerminateMillis) {
    switch (this.serviceState) {
        case CREATE_JUST:
            break;
        case RUNNING:
            // Shut down the consumeMessageService listener
            this.consumeMessageService.shutdown(awaitTerminateMillis);
            // Persist the consume offsets
            this.persistConsumerOffset();
            // Unregister the consumer
            this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
            this.mQClientFactory.shutdown();
            log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.destroy();
            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case SHUTDOWN_ALREADY:
            break;
        default:
            break;
    }
}
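A consequence of the switch above is that offsets are flushed only when the consumer is RUNNING, and a second shutdown call does nothing. The sketch below models just that state transition; `ShutdownSketch`, `start`, and the `persistCalls` counter are hypothetical and stand in for the real consumer and its persistConsumerOffset() call.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the shutdown state machine; no real persistence happens.
class ShutdownSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY }

    ServiceState serviceState = ServiceState.CREATE_JUST;
    final AtomicInteger persistCalls = new AtomicInteger();

    synchronized void start() {
        serviceState = ServiceState.RUNNING;
    }

    synchronized void shutdown() {
        switch (serviceState) {
            case RUNNING:
                persistCalls.incrementAndGet(); // stands in for persistConsumerOffset()
                serviceState = ServiceState.SHUTDOWN_ALREADY;
                break;
            case CREATE_JUST:
            case SHUTDOWN_ALREADY:
            default:
                break;
        }
    }

    public static void main(String[] args) {
        ShutdownSketch consumer = new ShutdownSketch();
        consumer.start();
        consumer.shutdown();
        consumer.shutdown(); // no effect: already shut down
        System.out.println("persist calls: " + consumer.persistCalls.get());
    }
}
```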