0
点赞
收藏
分享

微信扫一扫

消息中间件 RabbitMQ 之 发布高级确认 详解

爱薇Ivy趣闻 2022-04-28 阅读 128

8. 发布高级确认

于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况下,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢?

文章目录

8.1 发布确认SpringBoot版本


8.1.1 发布确认方案


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xDO2fxaX-1651076846401)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220427131623300.png)]

  • 当交换机不存在时,生产者发送的消息会直接丢失,当队列不存在时,存放在队列中的消息缓存也会被清除。

  • 所以,交换机和队列只要有一个不在了,那么消息势必就会丢失

具体方案:在生产者发送消息时应当使用缓存将消息存放,并使用定时任务将未成功发送的消息进行重新投递

8.1.2 代码架构图


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BFjM6dQW-1651076846402)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220427132804891.png)]

消息生产者将消息发送给一个直接类型的交换机,一旦交换机接收不到消息,会将消息放进缓存中

8.1.3 配置文件


在配置文件application.properties中需要添加:

#开启发布确认 发布成功后触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated
  • none

    禁用发布确认模式,是默认值

  • correlated

    发布消息成功到交换机后会触发回调方法

  • simple (类似单个确认发布)

    两种效果

    其一效果和correlated模式一样

    其二,在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法,等待broker节点返回发送结果,根据返回结果判断下一步的逻辑

    要注意的点是:waitForConfirmsOrDie方法如果返回false则会关闭channel,接下来无法发送消息到broker

8.1.4 添加配置类


package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author 且听风吟
 * @version 1.0
 * @description: 发布确认(高级) 配置类
 * @date 2022/4/27 0027 16:06
 */
@Configuration
public class ConfirmConfig {

    /**
     * 交换机
     */
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    /**
     * 队列
     */
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    /**
     * RoutingKey
     */
    public static final String CONFIRM_ROUTING_KEY = "key1";

    /**
     * 声名交换机
     */
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    /**
     * 声名队列
     */
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    /**
     * 绑定 将交换机和队列通过routingKey绑定
     * @param confirmQueue 队列
     * @param confirmExchange 交换机
     */
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

}

8.1.5 消息生产者


package com.example.rabbitmq.controller;

import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 且听风吟
 * @version 1.0
 * @description: 发布确认高级 发送消息
 * @date 2022/4/27 0027 16:50
 */
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发正常消息
     * @param message 正常消息
     */
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        //回调接口内使用
        CorrelationData correlationData = new CorrelationData("1");

        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);
        log.info("发送消息内容:{}",message);
    }

}

8.1.6 消息消费者


package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 且听风吟
 * @version 1.0
 * @description: 接收消息 确认高级消息
 * @date 2022/4/27 0027 16:57
 */
@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message){
        String msg = new String(message.getBody());
        log.info("接收到的队列confirm.queue消息:{}",msg);
    }

}

8.1.7 回调接口


package com.example.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author 且听风吟
 * @version 1.0
 * @description: 发布确认高级 消息回调接口
 * @date 2022/4/27 0027 17:22
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 注入  @PostConstruct: Java注解,服务器启动时执行
     * 初始化执行顺序:Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换及确认回调方法
     * 该方法调用时机:发了消息 并且 交换机接收到了回调 
     * @param correlationData 保存回调消息的id及相关信息 (发消息的时候自己填写的)
     * @param b 交换机是否收到消息 true & false
     * @param s 回调失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : " ";
        if (b){
            log.info("交换机已经接收到id为:{}的消息",id);
        }else {
            log.info("交换机还未接收到id为 {} 的消息,由于原因:{}",id,s);
        }
    }

}

8.1.8 结果分析


浏览器发送请求: http://localhost:8080/confirm/sendMsg/hello

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-r52Is6Zt-1651076846402)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220427203603527.png)]

可以查看到交换机接收消息的情况,方便后续进行处理

8.2 回退消息


8.2.1 Mandatory 参数


  • 在仅开启了生产者确认消息的情况下,交换机接收到消息后,会直接给消息生产者确认消息,如果发现该消息不可 路由(交换机将消息发给队列),那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的

那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,可以自己处理。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

在application.properties中添加:

#开启消息回退 路由不出去的消息会回退给生产者
spring.rabbitmq.publisher-returns=true

开启消息回退,路由不到队列中的消息会回退给生产者

8.2.2 消息生产者代码


添加一个错误的 routingKey 的发送消息代码,模拟消息路由到队列失败的情况

package com.example.rabbitmq.controller;

import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author 且听风吟
 * @version 1.0
 * @description: 发布确认高级 发送消息
 * @date 2022/4/27 0027 16:50
 */
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发正常消息
     * @param message 正常消息
     */
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        //回调接口内使用
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);
        log.info("发送消息内容:{}",message);

        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY+"2", message,correlationData2);
        log.info("发送消息内容:{}",message);
    }

}

8.2.3 配置类代码


package com.example.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @author 且听风吟
 * @version 1.0
 * @description: 发布确认高级 消息回调接口
 * @date 2022/4/27 0027 17:22
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 注入  @PostConstruct: Java注解,服务器启动时执行
     * 初始化执行顺序:Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 交换及确认回调方法
     * 该方法调用时机:发了消息 并且 交换机接收到了回调
     * @param correlationData 保存回调消息的id及相关信息(发消息的时候自己填写的)
     * @param b 交换机是否收到消息 true & false
     * @param s 回调失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : " ";
        if (b){
            log.info("交换机已经接收到id为:{}的消息",id);
        }else {
            log.info("交换机还未接收到id为 {} 的消息,由于原因:{}",id,s);
        }
    }

    /**
     * 消息 无法路由(到不了队列)时的回退方法  路由:消息由交换机发送到队列
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息 {},被交换机 {} 退回,退回原因{},路由key:{}",
                returnedMessage.getMessage().getBody(),returnedMessage.getExchange(),
                returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
    }
}

消息消费者代码没有改变

8.2.4 结果分析


浏览器发送请求: http://localhost:8080/confirm/sendMsg/hello

控制台显示结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mAXwmoQ0-1651076846403)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220427220104245.png)]

可以看到无法路由到队列中的消息会被回退,不会造成消息的丢失。

8.3 备份交换机


8.3.1 代码架构图


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1eS34HmM-1651076846405)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220427221018786.png)]

对于消息无法路由到队列的情况,还有另一种解决方法,就是添加备份交换机,在备份交换机后可以添加 备份队列 和 报警队列

需要在代码中添加一个备份交换机、一个备份队列、一个报警队列、一个消费者,并将上面的确认交换机指向备份交换机,将备份交换机和两个队列绑定

8.3.2 修改配置类


package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 且听风吟
 * @version 1.0
 * @description: 发布确认(高级) 配置类
 * @date 2022/4/27 0027 16:06
 */
@Configuration
public class ConfirmConfig {

    /**
     * 交换机
     */
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    /**
     * 队列
     */
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    /**
     * RoutingKey
     */
    public static final String CONFIRM_ROUTING_KEY = "key1";
    /**
     * 备份交换机
     */
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    /**
     * 备份队列
     */
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    /**
     * 报警队列
     */
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    /**
     * 声名交换机
     */
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange",BACKUP_EXCHANGE_NAME);
        //确认交换机指向备份交换机  durable:是否持久化
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArguments(arguments).build();
    }

    /**
     * 声名队列
     */
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    /**
     * 绑定 将交换机和队列通过routingKey绑定
     * @param confirmQueue 队列
     * @param confirmExchange 交换机
     */
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    /**
     * 声名备份交换机
     * @return
     */
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    /**
     * 声名备份队列
     */
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    /**
     * 声名报警队列
     */
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    /**
     * 备份队列绑定备份交换机
     * @param backupQueue 备份队列
     * @param backupExchange 备份交换机
     */
    @Bean
    public Binding backupQueueBinding(@Qualifier("backupQueue") Queue backupQueue,
                                      @Qualifier("backupExchange") FanoutExchange backupExchange){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    /**
     * 报警队列绑定备份交换机
     * @param warningQueue 报警队列
     * @param backupExchange 备份交换机
     */
    @Bean
    public Binding waringQueueBinding(@Qualifier("warningQueue") Queue warningQueue,
                                      @Qualifier("backupExchange") FanoutExchange backupExchange){
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }

}

8.3.3 报警消费者代码


package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author 且听风吟
 * @version 1.0
 * @description: 报警消费者
 * @date 2022/4/27 0027 23:20
 */
@Slf4j
@Component
public class WarningConsumer {

    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message){
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:{}",msg);
    }

}

8.3.4 测试注意事项


由于改变了之前的交换机代码,所以需要先在RabbitMQ后台管理界面删除之前的 确认交换机(confirm.exchange)

8.3.5 结果分析


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gnHFrf1X-1651076846406)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20220428001843374.png)]

上面的结果显示:

  • 回退消息 和 备份交换机 可以一起使用,如果两者同时开启,备份交换机的优先级更高
举报

相关推荐

0 条评论