8. 发布高级确认
于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况下,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢?
文章目录
8.1 发布确认SpringBoot版本
8.1.1 发布确认方案
-
当交换机不存在时,生产者发送的消息会直接丢失,当队列不存在时,存放在队列中的消息缓存也会被清除。
-
所以,交换机和队列只要有一个不在了,那么消息势必就会丢失
具体方案:在生产者发送消息时应当使用缓存将消息存放,并使用定时任务将未成功发送的消息进行重新投递
8.1.2 代码架构图
消息生产者将消息发送给一个直接类型的交换机,一旦交换机接收不到消息,会将消息放进缓存中
8.1.3 配置文件
在配置文件application.properties中需要添加:
#开启发布确认 发布成功后触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated
-
none
禁用发布确认模式,是默认值
-
correlated
发布消息成功到交换机后会触发回调方法
-
simple (类似单个确认发布)
两种效果
其一效果和correlated模式一样
其二,在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法,等待broker节点返回发送结果,根据返回结果判断下一步的逻辑
要注意的点是:waitForConfirmsOrDie方法如果返回false则会关闭channel,接下来无法发送消息到broker
8.1.4 添加配置类
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 且听风吟
* @version 1.0
* @description: 发布确认(高级) 配置类
* @date 2022/4/27 0027 16:06
*/
@Configuration
public class ConfirmConfig {
/**
* 交换机
*/
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
/**
* 队列
*/
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
/**
* RoutingKey
*/
public static final String CONFIRM_ROUTING_KEY = "key1";
/**
* 声名交换机
*/
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
/**
* 声名队列
*/
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/**
* 绑定 将交换机和队列通过routingKey绑定
* @param confirmQueue 队列
* @param confirmExchange 交换机
*/
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
8.1.5 消息生产者
package com.example.rabbitmq.controller;
import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 且听风吟
* @version 1.0
* @description: 发布确认高级 发送消息
* @date 2022/4/27 0027 16:50
*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发正常消息
* @param message 正常消息
*/
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
//回调接口内使用
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);
log.info("发送消息内容:{}",message);
}
}
8.1.6 消息消费者
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 且听风吟
* @version 1.0
* @description: 接收消息 确认高级消息
* @date 2022/4/27 0027 16:57
*/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message){
String msg = new String(message.getBody());
log.info("接收到的队列confirm.queue消息:{}",msg);
}
}
8.1.7 回调接口
package com.example.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author 且听风吟
* @version 1.0
* @description: 发布确认高级 消息回调接口
* @date 2022/4/27 0027 17:22
*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 注入 @PostConstruct: Java注解,服务器启动时执行
* 初始化执行顺序:Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
/**
* 交换及确认回调方法
* 该方法调用时机:发了消息 并且 交换机接收到了回调
* @param correlationData 保存回调消息的id及相关信息 (发消息的时候自己填写的)
* @param b 交换机是否收到消息 true & false
* @param s 回调失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : " ";
if (b){
log.info("交换机已经接收到id为:{}的消息",id);
}else {
log.info("交换机还未接收到id为 {} 的消息,由于原因:{}",id,s);
}
}
}
8.1.8 结果分析
浏览器发送请求: http://localhost:8080/confirm/sendMsg/hello
可以查看到交换机接收消息的情况,方便后续进行处理
8.2 回退消息
8.2.1 Mandatory 参数
- 在仅开启了生产者确认消息的情况下,交换机接收到消息后,会直接给消息生产者确认消息,如果发现该消息不可 路由(交换机将消息发给队列),那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的
那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,可以自己处理。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
在application.properties中添加:
#开启消息回退 路由不出去的消息会回退给生产者
spring.rabbitmq.publisher-returns=true
开启消息回退,路由不到队列中的消息会回退给生产者
8.2.2 消息生产者代码
添加一个错误的 routingKey 的发送消息代码,模拟消息路由到队列失败的情况
package com.example.rabbitmq.controller;
import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 且听风吟
* @version 1.0
* @description: 发布确认高级 发送消息
* @date 2022/4/27 0027 16:50
*/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发正常消息
* @param message 正常消息
*/
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
//回调接口内使用
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);
log.info("发送消息内容:{}",message);
CorrelationData correlationData2 = new CorrelationData("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY+"2", message,correlationData2);
log.info("发送消息内容:{}",message);
}
}
8.2.3 配置类代码
package com.example.rabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author 且听风吟
* @version 1.0
* @description: 发布确认高级 消息回调接口
* @date 2022/4/27 0027 17:22
*/
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 注入 @PostConstruct: Java注解,服务器启动时执行
* 初始化执行顺序:Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换及确认回调方法
* 该方法调用时机:发了消息 并且 交换机接收到了回调
* @param correlationData 保存回调消息的id及相关信息(发消息的时候自己填写的)
* @param b 交换机是否收到消息 true & false
* @param s 回调失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() : " ";
if (b){
log.info("交换机已经接收到id为:{}的消息",id);
}else {
log.info("交换机还未接收到id为 {} 的消息,由于原因:{}",id,s);
}
}
/**
* 消息 无法路由(到不了队列)时的回退方法 路由:消息由交换机发送到队列
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("消息 {},被交换机 {} 退回,退回原因{},路由key:{}",
returnedMessage.getMessage().getBody(),returnedMessage.getExchange(),
returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
}
}
消息消费者代码没有改变
8.2.4 结果分析
浏览器发送请求: http://localhost:8080/confirm/sendMsg/hello
控制台显示结果:
可以看到无法路由到队列中的消息会被回退,不会造成消息的丢失。
8.3 备份交换机
8.3.1 代码架构图
对于消息无法路由到队列的情况,还有另一种解决方法,就是添加备份交换机,在备份交换机后可以添加 备份队列 和 报警队列
需要在代码中添加一个备份交换机、一个备份队列、一个报警队列、一个消费者,并将上面的确认交换机指向备份交换机,将备份交换机和两个队列绑定
8.3.2 修改配置类
package com.example.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author 且听风吟
* @version 1.0
* @description: 发布确认(高级) 配置类
* @date 2022/4/27 0027 16:06
*/
@Configuration
public class ConfirmConfig {
/**
* 交换机
*/
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
/**
* 队列
*/
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
/**
* RoutingKey
*/
public static final String CONFIRM_ROUTING_KEY = "key1";
/**
* 备份交换机
*/
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
/**
* 备份队列
*/
public static final String BACKUP_QUEUE_NAME = "backup.queue";
/**
* 报警队列
*/
public static final String WARNING_QUEUE_NAME = "warning.queue";
/**
* 声名交换机
*/
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange",BACKUP_EXCHANGE_NAME);
//确认交换机指向备份交换机 durable:是否持久化
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
.withArguments(arguments).build();
}
/**
* 声名队列
*/
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
/**
* 绑定 将交换机和队列通过routingKey绑定
* @param confirmQueue 队列
* @param confirmExchange 交换机
*/
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
/**
* 声名备份交换机
* @return
*/
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
/**
* 声名备份队列
*/
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
/**
* 声名报警队列
*/
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
/**
* 备份队列绑定备份交换机
* @param backupQueue 备份队列
* @param backupExchange 备份交换机
*/
@Bean
public Binding backupQueueBinding(@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
/**
* 报警队列绑定备份交换机
* @param warningQueue 报警队列
* @param backupExchange 备份交换机
*/
@Bean
public Binding waringQueueBinding(@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
8.3.3 报警消费者代码
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 且听风吟
* @version 1.0
* @description: 报警消费者
* @date 2022/4/27 0027 23:20
*/
@Slf4j
@Component
public class WarningConsumer {
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message){
String msg = new String(message.getBody());
log.error("报警发现不可路由消息:{}",msg);
}
}
8.3.4 测试注意事项
由于改变了之前的交换机代码,所以需要先在RabbitMQ后台管理界面删除之前的 确认交换机(confirm.exchange)
8.3.5 结果分析
上面的结果显示:
- 回退消息 和 备份交换机 可以一起使用,如果两者同时开启,备份交换机的优先级更高