0
点赞
收藏
分享

微信扫一扫

RocketMQ(十一)——消费的幂等性

李雨喵 2022-01-10 阅读 100

消费幂等

定义

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的

  • 幂等:若某操作执行多次与执行一次对系统产生的影响是相同的,则称该操作是幂等的。

在互联网应用中,尤其在网络不稳定的情况下,消息很有可能会出现重复发送或重复消费。如果重复的消息可能会影响业务处理,那么就应该对消息做幂等处理。

常见消息重复的几种情况

发送消息时重复

当一条消息已被成功发送到Broker并完成持久化,此时出现了网络波动,从而导致Broker对Producer应答失败。 如果此时Producer意识到消息发送失败并尝试再次发送消息,此时Broker中就可能会出现两条内容相同并且Message ID也相同的消息,那么后续Consumer就一定会消费两次该消息。

消费时消息重复

消息已投递到Consumer并完成业务处理,当Consumer给Broker反馈应答时网络波动,Broker没有接收到消费成功响应。为了保证消息至少被消费一次的原则,Broker将在网络恢复后再次尝试投递之前已被处理过的消息。此时消费者就会收到与之前处理过的内容相同、Message ID也相同的消息。

Rebalance时消息重复

当Consumer Group中的Consumer数量发生变化时,或其订阅的Topic的Queue数量发生变化时,会触发Rebalance,此时Consumer可能会收到曾经被消费过的消息。

解决方案

  • 幂等令牌:生产者和消费者两者中的既定协议,通常指具备唯一业务标识的字符串。
  • 唯一性处理:服务端通过采用一定的算法策略,保证同一个业务逻辑不会被重复执行成功多次

方案

  1. 通过缓存去重,在缓存中如果已经存在某幂等令牌,则说明本次操作是重复性操作,若缓存没有命中,则进入下一步
  2. 唯一性处理前在数据库中查询幂等令牌作为索引的数据是否存在。若存在,则说明本次操作为重复性操作,若不存在,执行下一步骤
  3. 唯一性处理后,将幂等令牌写入到缓存,并将幂等令牌作为唯一索引的数据写入到数据库中。

消费幂等的实现

为消息指定不会重复的唯一标识。因为Message ID有可能出现重复的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息Key设置。

以支付场景为例,可以将消息的Key设置为订单号,作为幂等处理的依据。具体代码示例如下:

Message message = new Message();
message.setKey("testdemo_98034");
SendResult sendResult = producer.send(message);

消费者收到消息时可以根据消息的Key即订单号来实现消费幂等:

consumer.registerMessageListener(new MessageListenerConcurrently() { 
    @Override 
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for(MessageExt msg:msgs){
            String key = msg.getKeys(); 
            // 根据业务唯一标识Key做幂等处理 // …… 
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
    } 
});
举报

相关推荐

0 条评论