0
点赞
收藏
分享

微信扫一扫

消费者详解-消费进度保存机制

時小白 2022-04-08 阅读 87
java

文章目录

消费进度保存机制

在 消费者详解-启动流程中我们可以知道,在消费者启动是会同时启动位点管理器,RocketMQ设计了远程位点管理-RemoteBrokerOffsetStore和本地位点管理-LocalFileOffsetStore。集群消费时,消费者使用远程位点管理-RemoteBrokerOffsetStore,将消费位点提交给Broker保存;广播消费时,消费位点保存在消费者本地磁盘上-LocalFileOffsetStore

客户端消费进度保存也叫消费进度持久化,开源RocketMQ4.2.0支持定时持久化不定时持久化两种方式。

定时持久化

定时持久化位点实现方法是:org.apache.rocketmq.client.impl.factory.MQClientInstance.startScheduledTask(),定时任务在启动后延迟10000ms后执行第一次,后续每隔persistConsumerOffsetInterval(默认5000ms)时间执行一次:

    // 定时持久化所有消费位点
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

      @Override
      public void run() {
        try {
          MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
          log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
      }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

MQClientInstance.this.persistAllConsumerOffset()方法通过遍历当前MQClientInstance管理的所有消费者集合,调用每个消费者实现的*persistConsumerOffset()*方法持久化消费位点。

    private void persistAllConsumerOffset() {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            impl.persistConsumerOffset();
        }
    }

消费者实现又分为DefaultMQPullConsumerImplDefaultMQPushConsumerImpl,但两者的实现一致:都是先确定当前消费者的状态是否处于RUNNING状态,然后获取processQueueTable中记录的消费者正在使用中的MessageQueue队列,然后调用位点管理器offsetStorepersistAll方法持久化获取到的MessageQueue队列。

###Pull
    public void persistConsumerOffset() {
        try {
            this.isRunning();
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            mqs.addAll(allocateMq);
            this.offsetStore.persistAll(mqs);
        } catch (Exception e) {
            log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }

###Push
    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            Set<MessageQueue> mqs = new HashSet<MessageQueue>();
            Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
            mqs.addAll(allocateMq);

            this.offsetStore.persistAll(mqs);
        } catch (Exception e) {
            log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
        }
    }
  • 集群模式下使用的是远程位点管理器-RemoteBrokerOffsetStore,通过遍历位点管理器中消费端本地内存保存的位点信息offsetTableMessageQueueConsumeOffset的映射),找到那些消费位点不为空且消费者在使用的消费队列和消费位点,通过oneWay形式发送RequestCodeUPDATE_CONSUMER_OFFSET的请求更新Broker该MessageQueue的消费位点,消费者不使用的消息队列则将其在offsetTable中移除。

        public void persistAll(Set<MessageQueue> mqs) {
            if (null == mqs || mqs.isEmpty())
                return;
            // 不是在使用的MessageQueue
            final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
            // 遍历offsetTable
            for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                AtomicLong offset = entry.getValue();
                // 如果offset不为空,且mq是在使用中的
                if (offset != null) {
                    if (mqs.contains(mq)) {
                        try {
                            // 更新Broker的消费位点
                            this.updateConsumeOffsetToBroker(mq, offset.get());
                            log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                                this.groupName,
                                this.mQClientFactory.getClientId(),
                                mq,
                                offset.get());
                        } catch (Exception e) {
                            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                        }
                    } else {
                        // 不包含则添加进unusedMQ
                        unusedMQ.add(mq);
                    }
                }
            }
            // 移除那些不使用的消息队列
            if (!unusedMQ.isEmpty()) {
                for (MessageQueue mq : unusedMQ) {
                    this.offsetTable.remove(mq);
                    log.info("remove unused mq, {}, {}", mq, this.groupName);
                }
            }
        }
    
        /**
         * Update the Consumer Offset in one way, once the Master is off, updated to Slave, here need to be optimized.
         */
        private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
            MQBrokerException, InterruptedException, MQClientException {
            updateConsumeOffsetToBroker(mq, offset, true);
        }
    
        /**
         * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
         */
        @Override
        public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
            MQBrokerException, InterruptedException, MQClientException {
            // 从brokerAddrTable查找消息队列对应的broker信息
            FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
            if (null == findBrokerResult) {
                // 找不到broker信息则尝试更新后再次获取
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
                findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
            }
    
            if (findBrokerResult != null) {
                UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
                requestHeader.setTopic(mq.getTopic());
                requestHeader.setConsumerGroup(this.groupName);
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setCommitOffset(offset);
    
                if (isOneway) {
                    this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                        findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
                } else {
                    this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                        findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
                }
            } else {
                throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
            }
        }
    

    而在Broker服务端,Broker启动时会注册ConsumerManageProcessor作为消费进度处理器:this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);,处理请求编码RequestCodeGET_CONSUMER_LIST_BY_GROUPUPDATE_CONSUMER_OFFSETQUERY_CONSUMER_OFFSET的请求:

        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
            switch (request.getCode()) {
                // 根据消费者组获取消费者组下的消费者列表
                case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                    return this.getConsumerListByGroup(ctx, request);
                    // 更新消费位点
                case RequestCode.UPDATE_CONSUMER_OFFSET:
                    return this.updateConsumerOffset(ctx, request);
                    // 查询消费位点
                case RequestCode.QUERY_CONSUMER_OFFSET:
                    return this.queryConsumerOffset(ctx, request);
                default:
                    break;
            }
            return null;
        }
    

    ConsumerManagerProcessor#updateConsumerOffset方法实际上是通过更新Broker的消费位点管理器ConsumerOffsetManager中保存管理的topic消费进度offsetTableprivate ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer/* queueId */, Long/* offset */>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);,以topic + @ + consumerGroup作为key保存topic在该消费者组下的每个消费队列queueId的消费进度offset

        private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
            final RemotingCommand response =
                RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
            final UpdateConsumerOffsetRequestHeader requestHeader =
                (UpdateConsumerOffsetRequestHeader) request
                    .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
                requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    
        public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
            final long offset) {
            // topic@group
            String key = topic + TOPIC_GROUP_SEPARATOR + group;
            this.commitOffset(clientHost, key, queueId, offset);
        }
    
        private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
            ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
            if (null == map) {
                map = new ConcurrentHashMap<Integer, Long>(32);
                map.put(queueId, offset);
                this.offsetTable.put(key, map);
            } else {
                Long storeOffset = map.put(queueId, offset);
                if (storeOffset != null && offset < storeOffset) {
                    log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
                }
            }
        }
    

    当然Broker这边也需要持久化offsetTable才能尽可能的保证消费进度不丢失,Broker有以下三种方式持久化消费位点:

    1. 定时任务:Broker启动时会启动定时ConsumerOffsetManager持久化,将消费位点信息保存在Broker本地文件中,延迟10000ms执行,每隔flushConsumerOffsetInterval默认5000ms执行该任务,源码在org.apache.rocketmq.broker.BrokerController.initialize方法中:

          // 定时持久化任务
          this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
              try {
                BrokerController.this.consumerOffsetManager.persist();
              } catch (Throwable e) {
                log.error("schedule persist consumerOffset error.", e);
              }
            }
          }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
      
    2. 关闭持久化:Broker在关闭shutdown时会调用ConsumerOffsetManagerpersist方法持久化

    3. 主从配置数据同步:Slave会启动slaveSynchronize同步服务,每隔10000ms同步Master Broker的TopicConfig、ConsumerOffset、DelayOffset、subscriptionGroupConfig数据。Slave会向Master Broker请求获取Master Broker中的所有消费位点信息,并将Master Broker的所有消费位点信息覆盖到Slave中,然后持久化

          private void handleSlaveSynchronize(BrokerRole role) {
              if (role == BrokerRole.SLAVE) {
                  if (null != slaveSyncFuture) {
                      // 取消正在运行的slave同步任务,不强制中断
                      slaveSyncFuture.cancel(false);
                  }
                  this.slaveSynchronize.setMasterAddr(null);
                  slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                      @Override
                      public void run() {
                          try {
                              BrokerController.this.slaveSynchronize.syncAll();
                          }
                          catch (Throwable e) {
                              log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                          }
                      }
                  }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
              } else {
                  //handle the slave synchronise
                  if (null != slaveSyncFuture) {
                      slaveSyncFuture.cancel(false);
                  }
                  this.slaveSynchronize.setMasterAddr(null);
              }
          }
      
          public void syncAll() {
              // 同步Topic配置
              this.syncTopicConfig();
              // 同步消费者位点
              this.syncConsumerOffset();
              // 同步延迟位点
              this.syncDelayOffset();
              // 同步订阅关系配置
              this.syncSubscriptionGroupConfig();
          }
      
          private void syncConsumerOffset() {
              String masterAddrBak = this.masterAddr;
              // 自己不是master
              if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
                  try {
                      // 向Master Broker请求获取Master Broker的消费位点信息
                      ConsumerOffsetSerializeWrapper offsetWrapper =
                          this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                      // 直接覆盖
                      this.brokerController.getConsumerOffsetManager().getOffsetTable()
                          .putAll(offsetWrapper.getOffsetTable());
                      // 持久化
                      this.brokerController.getConsumerOffsetManager().persist();
                      log.info("Update slave consumer offset from master, {}", masterAddrBak);
                  } catch (Exception e) {
                      log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
                  }
              }
          }
      
  • 广播模式下则是使用的是本地位点管理器-LocalFileOffsetStore,通过遍历offsetTable,将所有位点信息保存在OffsetSerializeWrapper中,并转换为json保存在文件中。

        public void persistAll(Set<MessageQueue> mqs) {
            if (null == mqs || mqs.isEmpty())
                return;
            // 将所有位点信息保存在OffsetSerializeWrapper中,并转换为json保存在文件中
            OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
            for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
                if (mqs.contains(entry.getKey())) {
                    AtomicLong offset = entry.getValue();
                    offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
                }
            }
    
            String jsonString = offsetSerializeWrapper.toJson(true);
            if (jsonString != null) {
                try {
                    MixAll.string2File(jsonString, this.storePath);
                } catch (IOException e) {
                    log.error("persistAll consumer offset Exception, " + this.storePath, e);
                }
            }
        }
    

不定时持久化

不定时持久化又叫Pull-And-Commit,也就是在执行pull方法的同时,把队列最新消费位点信息发给Broker。具体实现在*org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage()*方法中。

该方法有两处持久化位点信息:

  1. 在消息拉去完成返回,如果拉取位点非法,则此时客户端会主动根据返回的offset更新本地消费,并且提交最新的消费位点信息给Broker,移除该MessageQueue的位点、ProcessQueue等信息:

        case OFFSET_ILLEGAL:
        log.warn("the pull request offset illegal, {} {}",
                 pullRequest.toString(), pullResult.toString());
        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
    
        pullRequest.getProcessQueue().setDropped(true);
        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
    
          @Override
          public void run() {
            try {
              // 更新offsetTable messageQueue的位点新
              DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                                                      pullRequest.getNextOffset(), false);
              // 持久化位点信息 LocalFile/Broker
              DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
              // 移除processQueueTable中messageQueue的processQueue,设置dropped=true,并且再次持久化位点信息、移除offsetTable的mq的位点信息
              DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
    
              log.warn("fix the pull request offset, {}", pullRequest);
            } catch (Throwable e) {
              log.error("executeTaskLater Exception", e);
            }
          }
        }, 10000);
        break;
    
  2. 在消费者执行拉取消息动作之前,如果是集群消费模式(对应的offsetStoreRemoteBrokerOffsetStore),则会尝试获取消费者内存中是否有该消息队列的消费位点commitOffsetValue,如果有即commitOffsetValue > 0,则commitOffsetEnable = true,并且吧消费者本地最新的offset-commitOffsetValue传递给Broker。

    boolean commitOffsetEnable = false;
            long commitOffsetValue = 0L;
            // 集群模式下,从消费者客户端内存offsetTable中读取messageQueue的消费位点,如果找到的commitOffsetValue大于0表示有消费位点记录
            // commitOffsetEnable = true
            if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
                commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
                if (commitOffsetValue > 0) {
                    commitOffsetEnable = true;
                }
            }
    
            String subExpression = null;
            boolean classFilter = false;
            SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
            if (sd != null) {
                if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                    subExpression = sd.getSubString();
                }
    
                classFilter = sd.isClassFilterMode();
            }
    
            int sysFlag = PullSysFlag.buildSysFlag(
                    commitOffsetEnable, // commitOffset
                    true, // suspend
                    subExpression != null, // subscription
                    classFilter // class filter
            );
            try {
                this.pullAPIWrapper.pullKernelImpl(
                        pullRequest.getMessageQueue(),
                        subExpression,
                        subscriptionData.getExpressionType(),
                        subscriptionData.getSubVersion(),
                        pullRequest.getNextOffset(),
                        this.defaultMQPushConsumer.getPullBatchSize(),
                        sysFlag,
                        commitOffsetValue,
                        BROKER_SUSPEND_MAX_TIME_MILLIS,
                        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                        CommunicationMode.ASYNC,
                        pullCallback
                );
            } catch (Exception e) {
                log.error("pullKernelImpl exception", e);
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            }
    

    Broker通过传入的sysFlag判断是否包含FLAG_COMMIT_OFFSET标志位,判断Broker是否根据commitOffsetValue更新当前消费队列的消费位点:

        private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
          throws RemotingCommandException {
    			…………        
          // 是否有commitOffset标志,Broker是否允许更新对应的消费位点
          final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
          …………
                  // brokerAllowSuspend表示Broker是否能挂起,如果Broker是挂起状态,brokerAllowSuspend=false;
            boolean storeOffsetEnable = brokerAllowSuspend;
            storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
            storeOffsetEnable = storeOffsetEnable
                && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
            if (storeOffsetEnable) {
                // 更新该消费者组订阅的topic的queueId消费队列的消费位点
                this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
            }
            return response;
        }
    
        public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
            final long offset) {
            // topic@group
            String key = topic + TOPIC_GROUP_SEPARATOR + group;
            this.commitOffset(clientHost, key, queueId, offset);
        }
    
        private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
            ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
            if (null == map) {
                map = new ConcurrentHashMap<Integer, Long>(32);
                map.put(queueId, offset);
                this.offsetTable.put(key, map);
            } else {
                Long storeOffset = map.put(queueId, offset);
                if (storeOffset != null && offset < storeOffset) {
                    log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
                }
            }
        }
    

    只有当Broker不是挂起状态(brokerAllowSuspend = true)、包含FLAG_COMMIT_OFFSET标志位和当前Broker不是Slave,才允许更新当前Broker中记录的消费队列的消费位点。

消费者关闭

在消费者关闭前,需要保存各种状态,以便在启动后恢复数据,其中包含消费者的消费进度信息,Push模式和Pull模式的shutdown流程大体一致(只是Push模式多个了ConsumeMessageService监听器的shutdown,这是Push模式独有的),都是通过调用this.persistConsumerOffset()方法,该方法也被用于消费者的定时持久化消费位点,上面有解析不再赘述:

    public void shutdown() {
      shutdown(0);
    }

    public synchronized void shutdown(long awaitTerminateMillis) {
      switch (this.serviceState) {
        case CREATE_JUST:
          break;
        case RUNNING:
          // shutdown consumeMessageService监听器
          this.consumeMessageService.shutdown(awaitTerminateMillis);
          // 持久化消费位点
          this.persistConsumerOffset();
          // 取消注册消费者
          this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
          this.mQClientFactory.shutdown();
          log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
          this.rebalanceImpl.destroy();
          this.serviceState = ServiceState.SHUTDOWN_ALREADY;
          break;
        case SHUTDOWN_ALREADY:
          break;
        default:
          break;
      }
    }
举报

相关推荐

0 条评论