0
点赞
收藏
分享

微信扫一扫

#yyds干货盘点#RocketMQ顺序消息之消息消费

RocketMQ顺序消息之消息消费

RocketMQ消费模式:集群消费和广播消费

  • 广播消费:每条消息会被ConsumerGroup的每个Consumer消费
  • 集群消费:每条消息只会被ConsumerGroup的一个Consumer消费

顺序消费的原理是同一消息队列只允许Consumer中的一个消费线程拉取消费。Consumer中有个消费线程池,多个线程同时消费消息。在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁请求允许消费

ConsumeMessageOrderlyService的内部类ConsumeRequest:

class ConsumeRequest implements Runnable {
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    public ProcessQueue getProcessQueue() {
        return processQueue;
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }

    @Override
    public void run() {
        if (this.processQueue.isDropped()) {
            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
            return;
        }

        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        synchronized (objLock) {
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume; ) {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        break;
                    }

                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && !this.processQueue.isLocked()) {
                        log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && this.processQueue.isLockExpired()) {
                        log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    long interval = System.currentTimeMillis() - beginTime;
                    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                        break;
                    }

                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                    if (!msgs.isEmpty()) {
                        final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                        ConsumeOrderlyStatus status = null;

                        ConsumeMessageContext consumeMessageContext = null;
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext = new ConsumeMessageContext();
                            consumeMessageContext
                                .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                            consumeMessageContext.setMq(messageQueue);
                            consumeMessageContext.setMsgList(msgs);
                            consumeMessageContext.setSuccess(false);
                            // init the consume context type
                            consumeMessageContext.setProps(new HashMap<String, String>());
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                        }

                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                        boolean hasException = false;
                        try {
                            this.processQueue.getLockConsume().lock();
                            if (this.processQueue.isDropped()) {
                                log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }

                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                        } catch (Throwable e) {
                            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                            hasException = true;
                        } finally {
                            this.processQueue.getLockConsume().unlock();
                        }

                        if (null == status
                            || ConsumeOrderlyStatus.ROLLBACK == status
                            || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                        }

                        long consumeRT = System.currentTimeMillis() - beginTimestamp;
                        if (null == status) {
                            if (hasException) {
                                returnType = ConsumeReturnType.EXCEPTION;
                            } else {
                                returnType = ConsumeReturnType.RETURNNULL;
                            }
                        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                            returnType = ConsumeReturnType.TIME_OUT;
                        } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            returnType = ConsumeReturnType.FAILED;
                        } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                            returnType = ConsumeReturnType.SUCCESS;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                        }

                        if (null == status) {
                            status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.setStatus(status.toString());
                            consumeMessageContext
                                .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                        }

                        ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                            .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                    } else {
                        continueConsume = false;
                    }
                }
            } else {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }

                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
            }
        }
    }

}

消息消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息。

顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前消息,并不会继续消费该队列中后续的消息,从而保证顺序消费。

在顺序消费的场景下,如果重试失败,会一直阻塞当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息的堆积

并发消费原理是同一个消息队列提供给Consumer中的多个消费线程拉取消费。Consumer中会维护一个消息线程池,多个消费线程可以并发去同一个消息队列中拉取进行消费。如果某个消费线程在监听器中进行业务处理时抛出异常,当前消费线程拉取的消息会进行重试,不影响其他消费线程和消息队列的消费进度

并发消费没有资源争抢上锁的过程,消费消息的速度比顺序消息快

举报

相关推荐

RocketMQ 顺序消费消息

RocketMQ顺序消息

0 条评论