@Service
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "${rocketmq.consumer.topic}")
public class MsgListener implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
private org.slf4j.Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void onMessage(MessageExt msg) {
logger.debug("RECEIVE_MSG_BEGIN: " + msg.toString());
logger.debug(String.format("消费消息,消息ID:%s,消息KEY:%s,消息体:%s ", msg.getMsgId(), msg.getKeys(), new String(msg.getBody())));
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setInstanceName("testTopic-tag1");
}
}
原因
多消费组实例的场景下,只配置了一个通用的name server配置,导致有些消费组
consumer.setInstanceName("testTopic-tag1");正常连接到b-name server,但是使用默认配置连到了a-name server上面,导致关系错乱,无法正常消费消息
consumer.setInstanceName("testTopic-tag2");也是被错误的连接到了a-name server
在初始化的时候
问题排查思路
RocketMQ中,如果不同消费组消费同一个Topic,理论上每个消费组应该只消费该Topic的消息一次。然而,确实有可能出现某个消费组偶尔消费不到消息的情况,这可能是由以下原因导致的:
- 消费分组不正确:
确保你在创建消费者时指定了正确的消费组名称,并且这个消费组已经订阅了要消费的Topic。 - 消息过滤或Tag匹配问题:
如果消费者使用了消息过滤或者Tag匹配,那么只有满足过滤条件或者Tag匹配的消息才会被消费。检查你的消费代码,确保过滤条件和Tag匹配设置正确。 - 消费线程数不足或阻塞:
如果消费者的消费线程数不足或者消费过程中出现了阻塞,可能会导致部分消息未能及时消费。检查消费者的配置和日志,确保消费线程数足够并且没有异常情况。 - 消息堆积:
如果Broker中的消息积压严重,新产生的消息可能会被延迟投递,从而导致某个消费组暂时消费不到消息。 - 网络问题或 Broker 故障:
网络波动、Broker重启或者其他故障可能会影响消息的正常投递和消费。 - 消费位点问题:
消费者的消费位点(即消费进度)可能存在异常,导致某些消息未被正确消费。检查消费者的消费位点信息,确保其与实际消费进度一致。 - 系统负载过高:
如果系统负载过高,包括CPU、内存或者磁盘I/O等资源紧张,可能会影响RocketMQ的正常运行和消息投递。
为了解决这个问题,你可以按照以下步骤进行排查和处理:
- 确认消费组的配置和订阅关系是否正确。
- 检查消费者的消费代码,特别是消息过滤和Tag匹配的部分。
- 调整消费者的消费线程数和消费参数,以适应实际的负载和性能需求。
- 监控Broker的状态和网络连接,确保其正常运行。
本篇文章如有帮助到您,请给「翎野君」点个赞,感谢您的支持。