0
点赞
收藏
分享

微信扫一扫

RocketMQ 保证消息不丢失

酷子腿长一米八 2021-09-30 阅读 100
RocketMQ

消息从生产到消费,一共经历三个阶段:

  • 生产:Producer创建消息,发送至Broker

  • 存储:Broker将受到的消息存储到磁盘中

  • 消费:Consumer从Broker拉取消息

要保证消息不丢失就需要解决这三个阶段的消息丢失

示意图如下

生产阶段

生产者只要接收到返回的ack,就代表这个阶段的消息未丢失。

生产者通过网络将消息发送到Broker,然后等待Broker响应ack,此时的网络是不可靠的,极有可能导致消息发不出去,或者Broker在ack时网络故障导致生产者收不到ack

这个阶段有三种发送消息方式:

同步:同步发送消息的时候就会阻塞并等待Broker返回ack

异步:异步发送消息,然后在回调函数中得知Broker是否ack

单向:单向发送消息,只管发送,不管结果,因此无法保证消息不丢失

Broker返回的ack状态如下:

  • SendStatus.SEND_OK:发送成功

  • SendStatus.FLUSH_DISK_TIMEOUT:消息发送成功,但刷盘超时

  • SendStatus.FLUSH_SLAVE_TIMEOUT:消息发送成功,但同步到Slave超时

  • SendStatus.SLAVE_NOT_AVAILABLE:消息发送成功,但此时Slave不可用

发送消息如果失败或者超时,Producer的send方法支持自动重试,默认重试2次,可以通过api修改

// 设置同步重试次数
producer.setRetryTimesWhenSendFailed(3);

// 设置异步重试次数
producer.setRetryTimesWhenSendAsyncFailed(3);

另一种情况,Broker宕机了,一般生产的Broker是集群部署,有多个master和多个slave节点,当消息发送到某个节点的Broker上,然后宕机,producer收到响应失败,会自动重试。

存储阶段

Broker收到消息后是先存储在内存中的,然后再持久化到磁盘,Broker刚收到Producer消息存储在内存中,然后发生宕机,就会导致消息丢失

RocketMQ的持久化消息有两种方式:

同步刷盘:Broker收到消息后会在持久化到磁盘完成后才发送ack

异步刷盘:Broker收到消息存到内存后返回ack,然后Broker定期将一组消息持久化到磁盘

默认是异步刷盘,要保证存储阶段不丢失消息,可以修改为同步刷盘,即确保消息持久化后再ack

# 默认是:ASYNC_FLUSH,异步刷盘
flushDiskType = SYNC_FLUSH 

即使使用了同步刷盘,但是Broker刷盘后,磁盘坏了,也会导致消息丢失,不过这种几率应该比较小。

解决方法就是:不仅同步刷盘,并且保证主从同步后,再ack

master端

# 设置同步刷盘才返回ack给producer
flushDiskType = SYNC_FLUSH
# 设置同步消息给salve
brokerRole = SYNC_MASTER

slave端

# 角色为salve
brokerRole = slave
# 设置同步刷盘才返回ack给master
flushDiskType = SYNC_FLUSH

消费阶段

在消费时失败了也会导致消息丢失,这个阶段采用重试也可以解决消息不丢失。

所以必须在业务逻辑完成再返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS

否则,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,稍后重试即可

举报

相关推荐

0 条评论