0
点赞
收藏
分享

微信扫一扫

RocketMQ原理学习--死信消息实现原理


        上一篇博客​​《RocketMQ原理学习--失败消息实现原理》​​中我们了解到RocketMQ对于失败消息的处理原理,当消息一直失败的情况下RocketMQ是如何处理的,这篇博客我们通过分析源码简单了解一下。

        RocketMQ对于失败次数超过16次的消息设置为死信消息,消息最终被放到DLQ死信队列中,需要人工进行干预处理。处理代码还是在SendMessageProcessor的consumerSendMsgBack方法中,简单来说就是判断重试次数超过16或者延时级别小于0,则将消息设置为新的死信topic为:%DLQ%+consumerGroup

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

//省略部分代码

//最大重试次数为16
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
//死信队列 %DLQ%+consumerGroup
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}

msgExt.setDelayTimeLevel(delayLevel);
}

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

//消息被持久化到死信队列中
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
//省略部分代码
return response;
}

总结:死信消息需要人为进行处理干预,可以通过RocketMQ控制台等重新发送

举报

相关推荐

0 条评论