0
点赞
收藏
分享

微信扫一扫

RocketMQ如何做到消息不丢失?


前面写过关于 Kafka 消息不丢失的的文章,现在写一下 RocketMQ 是如何做到消息不丢失。在 RocketMQ 中同样也经历消息三次传递的过程:

  1. Producer 端发送消息给 Broker 端
  2. Broker 将消息进行并持久化数据
  3. Consumer 端从 Broker 将消息拉取并进行消费

在以上这三步中每一步都可能会出现丢失数据的情况, 那么 RocketMQ 到底在什么情况下才能保证消息不丢失呢?接下来我们来细说一下如何保证这三个阶段不出现问题。

生产阶段

生产阶段就是将消息发送到队列之中。生产者(Producer)通过网络请求将消息发送给消息队列,消息队列接受到之后返回响应给生产者。

导致 Producer 端没有发送消息成功的有以下原因:

  • 网络原因:由于网络抖动导致数据没发到 Broker 端
  • 数据原因:消息体太大超出 Broker 承受范围导致 Broker 拒收消息

RocketMQ有两种常用的消息发送方式:同步发送、异步发送。

同步发送

同步发送时只要 send() 方法没有抛出异常,就可以认为消息发送成功,即消息队列 Broker 成功接受到了消息。

既然是同步发送肯定就比较耗费一些时间,如果你的业务比较注重 RT 那么这种方式就不太适合要求。

异步发送

异步发送消息的方式虽然可以降低消息发送的 RT,但在使用异步发送方式时记得重写 SendCallback 类的两个方法,在 onSuccess() 方法中更新消息的发送状态为发送成功,只要不发生异常且回调了 onSuccess( )方法也可以认为成功发送到了 Broker。

Broker存储阶段

默认的情况下,消息队列为了快速响应,在接受到生产者的请求,将消息保存在内存成功之后,就会立刻返回 ACK 响应给生产者。当服务突然宕机或出现意外就可能存在消息丢失的问题。

RocketMQ 有两种常用的消息刷盘方式:同步刷盘、异步刷盘。

同步刷盘

在返回写成功状态时,消息已经被写入磁盘。

具体流程是,消息写入内存的 PageCache 后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

异步刷盘

在返回写成功状态时,消息可能只是被写入了内存的 PageCache 中,由于写操作的返回快,且在内存里的消息量积累到一定程度时,才统一触发写磁盘动作。

在遇到消息队列宕机、机器异常断电或者内存硬盘损坏的情况,消息就无法成功持久化到硬盘中,那么消息就会存在丢失的问题。对于这种情况,我们就需要改变 RocketMQ 的刷盘机制,将默认的异步刷盘,修改成同步刷盘。即消息成功保存到硬盘上时才返回给生产者 ACK 响应。

虽然同步刷盘的缺点很明显,但是为了不丢失宝贵的消息这一点损耗是值得的。

消费阶段

消费者拉取消息进行本地业务处理,业务处理完成后再提交 ACK 状态。

还有一点要注意的是,消息队列 RocketMQ 默认允许每条消息最多重试 16 次。如果消息重试 16 次后仍然失败,消息将不再投递。此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

在消息队列 RocketMQ 中,这种无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列被称为死信队列(Dead-Letter Queue)。

死信队列里的消息有效期与正常消息相同,均为3天。3天后会被自动删除。针对这种情况,为了不丢失消息我们需要处理死信队列里的消息。

有消息进入死信队列,意味着某些问题导致消费者无法正常消费消息,因此,通常需要人工介入对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息让消费者重新消费一次,或者直接让专门的消费者订阅死信队列进行消费。

死信队列名称一般是  %DLQ% + ConsumerGroupName 组成,还有个重试队列名称一般是 %RETRY% + ConsumerGroupName 组成,这些都是 RocketMQ 自动创建的。

举报

相关推荐

0 条评论