0
点赞
收藏
分享

微信扫一扫

RabbitMQ如何保证消息的可靠性传输

zhaoxj0217 2022-02-06 阅读 72

1.使用事务消息
2.使用消息确认机制

发送方确认:
channel设置为confirm模式,每条消息会被分配一个唯一id。消息投递成功,信道会发送ack给生产者,包含了id,回调ConfirmCallback接口。如果发生错误导致消息丢失,发送nack给生产者,回调ReturnCallback接口。
ack和nack只有一个被触发,且只有一次,属于异步触发,可以继续发送消息。

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 注入
     * 直接通过MyCallBack调用confirm方法是调不到的,因为confirm是ConfirmCallback里面的方法,
     * ConfirmCallback是RabbitTemplate内部的接口,所以需要把MyCallBack注入到RabbitTemplate
     */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到id为:{}的消息", id);
        } else {
            log.info("交换机还未收到id为:{}的消息,由于原因:{}", id, cause);
        }
    }

	/**
     * 在消息传递的过程中,不可达目的地时,将消息返回给生产者
     * 该方法只有消息传递失败时才会调用
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息:{},被交换机:{}退回,退回原因:{},路由key:{}",
        new String(message.getBody()), exchange, replyText, routingKey);
    }

接收方确认:
声明队列时,指定autoAck=false,broker会等待消费者手动返回ack,才会删除消息,否则自动删除!broker的ack没有超时机制,只会判断链接是否断开,如果断开,消息会重新发送。

//消费成功后,是否要自动应答
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
举报

相关推荐

0 条评论