目录
1.4 关于 RabbitMQ 的持久化、不公平分发以及预取值
什么是 RabbitMQ ?
1、初识 RabbitMQ 消息队列
1.1 MQ 四大核心概念
以下是 RabbitMQ 的原理图:
1.2 消息的发送(无交换机态)
这里使用MQ中间件进行简单的消息发送,大致流程图如下所示:
这里需要注意的是,当一次性有多条消息发送到队列时,这时需要多个消费者(工作线程),消费者进行消费信息是根据轮询的方式进行消费
创建一个Utils工具类,与 MQ 进行交互连接:
/**
* 这里是与 MQ 交互的工具类
*/
public class RabbitMQUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); //创建连接工厂
factory.setHost("192.168.101.65");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection(); //创建连接
return connection.createChannel(); //获取连接信道
}
}
【消息生产者】代码如下所示:
/**
* 生产者
*/
public class Produce {
public static final String QUEUE_NAME = "hello"; //队列名称
public static void main(String[] args) throws IOException, TimeoutException {
//这里创建一个工厂,与 RabbitMQ 进行交互
Channel channel01 = RabbitMQUtils.getChannel();
//1.队列名称 2.队列是否持久化 3.消息是否供多个消费者消费 4.消息是否自动删除 5.其他参数
channel01.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "hello mq"; //发消息
//1.对应的交换机 2.路由的KEY值(本次是队列名) 3.其他参数 4.发送消息的消息体
channel01.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕!");
}
}
消息栏:
RabbitMQ 中(以上创建的 hello 队列):
【消息消费者】代码如下所示:
/**
* 消费者
*/
public class Consumer {
public static final String QUEUE_NAME = "hello"; //要进行消费消息的队列
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,与MQ进行交互
Channel channel = RabbitMQUtils.getChannel();
//接收消息的回调
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("成功接收消息:"+new String(message.getBody())); //接收其消息的消息体才能显示对应的消息
};
//取消消息时的回调
CancelCallback cancelCallback = (consumerTag) ->{
System.out.println(consumerTag + "消费者的消息被中断!");
};
/**
* 1.要被消费信息的队列
* 2.消费成功之后是否需要自动应答
* 3.消费成功时的回调
* 4.取消消息发送时的回调
*/
//消费者消费信息
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
消息栏:
MQ 中的消息已经被消费:
1.3 关于消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队;如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者;这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息
1.3.1 消息的常见应答方法(R)
丢失的消息重新入队,传递给正常工作的消费者进行消费的大致图:
由于生产者的代码没有改变,这里就不写了,以下是消费者(两个消费者只有 sleep 的时间不一样)关于 ACK 手动应答消息的代码:
/**
* 这里是消费者手动接受消息 ACK,使发送失败的消息重新排队
*/
public class Consumer01 {
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
SleepUtils.sleep(8); //模拟消息多的情况
//1、接收到消息的回调
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
/**
* 手动应答
* 1. 消息的标记
* 2. 是否批量应答信道中的消息
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
//2.消息中断的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消了消息的接收!");
};
//3.使用手动应答
boolean autoACK = false;
channel.basicConsume(QUEUE_NAME,autoACK,deliverCallback,cancelCallback);
}
}
首先创建两个消费者,分别为C1和C2,这里生产者连续发送四条消息:
消费者一 处于正常状态,消费者二 接收了一条消息后就宕机了,这时,消费者一 将发送失败的消息从信道中取出并进行消费,结果图如下所示:
消费者一: 消费者二:
1.4 关于 RabbitMQ 的持久化、不公平分发以及预取值
队列的持久化:
平时消息队列都是保存在内存中,若 RabbitMQ服务 突然停止,则之前的队列都会消失;所以,为了减少损失的可能性,通常将消息队列保存到磁盘上,即持久化
boolean durable =true; //将队列进行持久化
//1.队列名称 2.队列是否持久化 3.消息是否供多个消费者消费 4.消息是否自动删除 5.其他参数
channel01.queueDeclare(QUEUE_NAME,durable,false,false,null);
消息的持久化:
将 MessageProperties.PERSISTENT_TEXT_PLAIN 标识放入 basicPublish消息发送方法的第三个参数中,以开启消息持久化
将消息标记为持久化并不能完全保证不会丢失消息;尽管它告诉 RabbitMQ 将消息保存到磁盘,但是,这里依然存在当消息刚准备存储在磁盘的时候,还没有存储完,消息还在缓存的一个间隔点;此时并没有真正写入磁盘,持久性保证并不强
while (sc.hasNext()){
String message = sc.next(); //发消息
//1.对应的交换机 2.路由的KEY值(本次是队列名) 3.其他参数 4.发送消息的消息体
channel01.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("消息发送完毕!");
}
不公平分发(在消费者处开启):
相对于轮询分发,不公平分发采用能者多劳的策略,谁干的快消息就先给谁发送,避免慢进程拖慢整个服务的进度
预取值:
不公平分发的值设置为1,若设置的数值大于1,则表示为预取值;所谓的预取值,是设置消费者缓冲信道中最大存储的数量;
比如:消费者C1设置预取值为2,消费者C2设置预取值为5,假设有8条消息进来时,C1有可能消费了3条,因为已经消费的消息不算入“预取值”内;而C2信道中存入5条消息,若这五条消息即使还未被C2消费,C1也不能将其消费,因为这5条消息已经放入C2的信道中进行等待排队了
MQ 中:
可见,这里明确标明了对应消费者的预取值
2、RabbitMQ 消息的发布确认
在设置发布确认时,一般有三个步骤:
设置队列的持久化 --->> 设置队列中的消息进行持久化 --->> 通过MQ将消息保存在磁盘上,然后MQ跟生产者说明一声 “已经保存在磁盘上”(这里就是消息确认)
2.1 MQ的单个确认发布
定义:
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布;如果没有确认发布的消息,就会阻塞所有后续消息的发布
这里是模拟消息单个确认的代码:
/**
* 单个消息确认发布
*/
public static void SingleConfirmMessage () throws Exception{
Channel channel = RabbitMQUtils.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,durable,false,false,null); //开启队列的持久化
//开启消息的发布确认
channel.confirmSelect();
long begin = System.currentTimeMillis(); //开始时间
for (int i =0;i<MESSAGE_COUNT;i++){
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
//消息单个确认,发送成功一次就确认一次
boolean singleRes = channel.waitForConfirms();
if(singleRes) {
System.out.println("消息发布成功!");
}
}
long end = System.currentTimeMillis(); //结束时间
System.out.println("总耗时为:"+(end-begin)+"ms");
}
运行结果:
可见,总耗时时间为 1269 ms,虽然保证了消息的可靠性,但是性能下来了,需要一条条确认
2.2 MQ的批量确认发布
定义:
这是也一种同步确认发布消息的方式;先发布一批消息,然后一起确认可以极大地提高吞吐量;当然这种方式的缺点就是:由于是批量确认发布,当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息,而后重新发布消息
这里是模拟批量确认发布的代码:
/**
* 批量消息确认发布
*/
public static void BatchConfirmMessage() throws Exception{
Channel channel = RabbitMQUtils.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,durable,false,false,null); //开启队列的持久化
//开启消息的发布确认
channel.confirmSelect();
long begin = System.currentTimeMillis(); //开始时间
int batchSize = 100; //批量确认的长度
for (int i=0;i<MESSAGE_COUNT;i++){
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
//每发送一百条数据,就进行批量发布确认
if(i % batchSize == 0){
boolean batchRes = channel.waitForConfirms();
if(batchRes) {
System.out.println("批量发送消息成功!");
}
}
}
long end = System.currentTimeMillis(); //结束时间
System.out.println("总耗时为:"+(end-begin)+"ms");
}
运行结果:
可见,总耗时为 199 ms, 相比于单个确认发布,在性能方面有了很大的提升,但是容错率相对来说就升高了,因为由于批量,很难确定是哪一条消息出现了错误
2.3 MQ的异步确认发布(重点)
定义:
很显然,这是一种异步确认发布消息的方式,异步虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说;它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功
大致流程图:
异步发送消息时,不需要等待当前消息经过确认后才能将之后的消息发送出去;我们要做的只是发布消息,其余的交给 broker 中间人处理;而最终的消息是否发布成功,取决于之后的回调确认消息的函数,由于每一个发出去的消息都有 KEY 和 VALUE ,因此,我们能很快的找到对应发送失败的消息
存在问题:
解决方案:
这里是模拟异步确认发布的代码:
/**
* 异步确认发布消息
*/
public static void AsynchronousConfirmMessage() throws Exception{
Channel channel = RabbitMQUtils.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,durable,false,false,null); //开启队列的持久化
//1.开启消息的发布确认
channel.confirmSelect();
/**
* 线程安全有序的一个哈希表,适用于高并发的情况
* 【优点】:
* 1.将消息与对应的序号相关联
* 2.批量的根据序号删除条目
* 3.支持高并发
*/
ConcurrentSkipListMap<Long,String> concurrentSkipListMap
= new ConcurrentSkipListMap<>();
//2.这里准备消息监听器,以便于监听消息的成功与否 (delivery:消息的编号,multiple:用来判断是否为批量)
ConfirmCallback ACK_callback = (deliveryTag,multiple) ->{ //消息确认成功 回调函数
//2.2【第二步】若是批量发消息,则进行批量的删除
//【注意这里只有已经确认的消息,不会干扰到未确认的消息】
if(multiple) {
ConcurrentNavigableMap<Long, String> concurrentNavigableMap
= concurrentSkipListMap.headMap(deliveryTag);
concurrentNavigableMap.clear();
}else {
//2.3 若是单个发消息,则单个删除
concurrentSkipListMap.remove(deliveryTag);
}
log.info("确认的消息编号:"+deliveryTag);
};
ConfirmCallback NACK_callback = (deliveryTag,multiple)->{ //消息确认失败 回调函数
log.error("未确认的消息编号:"+deliveryTag);
};
channel.addConfirmListener(ACK_callback,NACK_callback); //将以上回调确认函数添加到监听器中
long begin = System.currentTimeMillis(); //开始时间
//【第一步】这里为模拟消息的发送
for (int i=0;i<MESSAGE_COUNT;i++){
String message = i +"";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
//3.记录消息的总和,往里面存入信道的序号以及对应序号的信息
//channel.getNextPublishSeqNo() 表示获取当前消息的下一个消息编号
concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis(); //结束时间
System.out.println("总耗时为:"+(end-begin)+"ms");
}
关于其中使用 concurrentSkipListMap.headMap(deliveryTag) 进行批量删除的解释说明:
运行结果:
可见,异步确认发布消息效率比以上两种方式都要高,由于是异步发送的消息,所以顺序会很不一致
3、关于 Exchanges 交换机
四种 MQ 交换机(未按先后排序):
4、死信队列(重点)
定义:死信,顾名思义,死掉的信息,即无法被消费的消息;由于存在有该类型的消息,所以对应保存该类型的队列随即产生,即死信队列
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中;例:用户在商城下单成功并点击去支付后,在指定时间未支付,则自动失效
死信队列大致流程图:
- 正常情况下,生产者(producer)将消息通过普通交换机(normal_exchange)所绑定的普通队列(normal_queue)发送到消费者 C1中
- 而异常情况下,将异常的消息保存到死信队列(dead_queue),并发送到消费者 C2 中
【消费者C1】
C1需要不仅需要处理正常消息的发送,还需要处理失效消息往死信队列中的传递,以及普通队列与死信队列之间的绑定关系
/**
* 模拟死信队列
*
* 消费者 C1
*/
public class Receive01 {
public static final String Normal_Exchange = "normal_exchange"; //普通交换机
public static final String Dead_Exchange = "dead_exchange"; //死信交换机
public static final String Normal_Queue = "normal_queue"; //普通队列
public static final String Dead_Queue = "dead_queue"; //死信队列
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//1.声明死信以及普通队列的交换机
channel.exchangeDeclare(Normal_Exchange,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(Dead_Exchange,BuiltinExchangeType.DIRECT);
//2.声明死信队列以及普通队列
HashMap<String, Object> deadLetters = new HashMap<>();
//2.1 普通队列设置死信交换机(注意:这里的 KEY 是固定的)
deadLetters.put("x-dead-letter-exchange",Dead_Exchange);
//2.2 设置死信的 Routing Key
deadLetters.put("x-dead-letter-routing-key","list");
//2.3 设置过期时间(这里不进行设置)
// deadLetters.put("x-message-ttl",10000);
channel.queueDeclare(Normal_Queue,false,false,false,deadLetters);
channel.queueDeclare(Dead_Queue,false,false,false,null);
//3.将队列与交换机进行绑定
channel.exchangeBind(Normal_Queue,Normal_Exchange,"zhangsan");
channel.exchangeBind(Dead_Queue,Dead_Exchange,"lisi");
System.out.println("等待接收消息中.....");
//4.接收到消息的回调
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("消费者一接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
//5.消息中断的回调
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消了消息的接收!");
};
channel.basicConsume(Normal_Queue,true,deliverCallback,cancelCallback);
}
}
【生产者Producer】
这里模拟的是消息被拒
生产者不需要管理队列的消息是否发送成功,只需要将消息发送到普通队列中
public static final String Normal_Exchange = "normal_exchange"; //普通交换机
public static final int MESSAGE_COUNT = 10; //消息的总数
public static final String TTL_TIME = "10000"; //过期时间
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//1.声明一个交换机
channel.exchangeDeclare(Normal_Exchange, BuiltinExchangeType.DIRECT);
//2.死信消息,设置 TTL 消息过期时间,过期则传送到死信队列中
AMQP.BasicProperties properties =
new AMQP.BasicProperties()
.builder().expiration(TTL_TIME).build(); //10s
//3.这里进行依次发送消息,同时设置了消息过期时间
for (int i=0;i<=MESSAGE_COUNT;i++){
String message = "info"+i;
channel.basicPublish(Normal_Exchange,"zhangsan",properties,message.getBytes());
System.out.println("消息生产者发送消息:"+message);
}
}
【消费者C2】
消费者C2的任务是只需要消费死信队列中的消息
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//1.声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
System.out.println("消费者二等待接收消息...");
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("消费者二接收到消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消费者中断了消息..");
};
//2.消费死信队列中的消息
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
运行结果:
MQ 中:
这里,进行中断普通队列的接收,模拟死信队列场景
当死信队列中存在未被消费的消息时,C2感应到存在的消息,并将之前发送失败的消息进行消费
5、延迟队列(整合SpringBoot)
定义:
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理;简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列
延迟队列一般使用的场景:
代码架构图:
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信 交换机 Y,它们的类型都是 Direct,创建一个 死信队列 QD,它们的绑定关系如下:
当然,这里进行整合 SpringBoot 进行使用 🔜
【这里是 application.yaml 文件中的配置】
spring:
#这里是 RabbitMQ 的配置
rabbitmq:
port: 5672 #指定的 rabbitMQ 服务器端口号
username: admin
password: 123
host: 192.168.101.65
【这里是延迟队列队列以及交换机的配置类】
这里是图中的中间那一段,也就是队列以及交换机的声明以及绑定
/**
* 延迟队列中【队列】与【交换机】的 "声明" 以及 "绑定" 的配置类
*/
@Configuration
public class TTLQueueConfig {
//普通交换机
public static final String Normal_Exchange = "X";
//死信交换机
public static final String Dead_Exchange = "Y";
//普通队列
public static final String Normal_QueueA = "QA";
public static final String Normal_QueueB = "QB";
//死信队列
public static final String Dead_Queue = "QD";
//1.声明交换机
//1.1声明普通交换机
@Bean("Normal_Exchange")
public DirectExchange Normal_Exchange(){
return new DirectExchange(Normal_Exchange);
}
//1.2 声明死信交换机
@Bean("Dead_Exchange")
public DirectExchange Dead_Exchange(){
return new DirectExchange(Dead_Exchange);
}
//2.声明普通队列,与死信交换机进行绑定,并声明过期时间
@Bean("Normal_QueueA") //【队列A】
public Queue QA(){
HashMap<String, Object> arguments = new HashMap<>(2); //这里初始化 map 的长度,加快编译速度
//2.1 设置死信交换机
arguments.put("x-dead-letter-exchange",Dead_Exchange);
//2.2 设置死信 Routing Key
arguments.put("x-dead-letter-routing-key","YD");
//2.3 设置 TTL 过期时间
// arguments.put("x-message-ttl",10000);
return QueueBuilder
.durable(Normal_QueueA) //开启队列持久化
.withArguments(arguments).ttl(10000).build();
}
@Bean("Normal_QueueB") //【队列B】
public Queue QB(){
HashMap<String, Object> arguments = new HashMap<>(2); //这里初始化 map 的长度,加快编译速度
arguments.put("x-dead-letter-exchange",Dead_Exchange);
arguments.put("x-dead-letter-routing-key","YD");
// arguments.put("x-message-ttl",30000);
return QueueBuilder
.durable(Normal_QueueB)
.withArguments(arguments).ttl(30000).build();
}
//3.声明死信队列
@Bean("Dead_Queue")
public Queue QD(){
return QueueBuilder
.durable(Dead_Queue).build();
}
//4.将普通交换机与队列A进行绑定
@Bean
public Binding QA_Binding_NormalQueue(@Qualifier("Normal_QueueA") Queue queueA,
@Qualifier("Normal_Exchange") DirectExchange normalExchange){
return BindingBuilder.bind(queueA)
.to(normalExchange).with("XA");
}
//4.1将普通交换机与队列B进行绑定
@Bean
public Binding QB_Binding_NormalQueue(@Qualifier("Normal_QueueB") Queue queueB,
@Qualifier("Normal_Exchange") DirectExchange normalExchange){
return BindingBuilder.bind(queueB)
.to(normalExchange).with("XB");
}
//4.2将死信交换机与死信队列进行绑定
@Bean
public Binding QD_Binding_DeadExchange(@Qualifier("Dead_Queue")Queue deadQueue,
@Qualifier("Dead_Exchange")DirectExchange deadExchange){
return BindingBuilder.bind(deadQueue)
.to(deadExchange).with("YD");
}
}
【消息生产者】
这里打算发送一个消息请求,分别给不同的 TTL 队列
/**
* 消息生产者
*
* 这里进行发送 http://localhost:8080/ttl/sendMsg/小白
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Resource
RabbitTemplate rabbitTemplate;
//发送消息
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable("message")String message){
log.info("当前时间:{},发送了一条信息:{}给两个 TTL 队列",new Date(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自10s的队列:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自30s的队列:"+message);
}
}
【消息消费者】
使用 Listener 监听器进行死信队列的监听
/**
* 这里是消息的消费者
*/
@Slf4j
@Component
public class DeadLetterConsumer {
//死信队列中进行接收 TTL 延迟消息
@RabbitListener(queues="QD") //调用监听器监听死信队列
public void DeadQueue_consumer(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{}"+"消费者死信队列接收到消费的消息:{}",new Date().toString(),msg);
}
}
运行结果:
发现问题🤔:
若以后需要多个不同的 TTL 消息,那么就需要建立多个消息队列,以达到传递不同 TTL 的消息;这样导致耦合度升高,不符合开闭原则,所以接下来进行延迟队列的优化
解决问题:
新增一个 QC 队列,这个队列不设置延迟时间,而是让 Producer消息生产者 发送消息的时候进行设置消息的 TTL 时间,这样,就不用频繁改动队列的 TTL 时间
做绑定的代码跟上面一样,不做展示,这里是生产者的代码,进行设置发送消息的 TTL时间:
@GetMapping("/sendMsg/ttl/{message}/{ttlTime}")
public void QC_sendMsgByTTL(@PathVariable("message") String message,@PathVariable("ttlTime") String ttlTime){
log.info("当前时间:{},发送了一条消息给QC:{}",new Date().toString(),message);
rabbitTemplate.convertAndSend("X","QC",message,msg->{
//这里进行设置消息的 TTL 时间
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
运行结果(这里使用的是低版本的 RabbitMQ):
由结果可知,由于队列的先进先出特性,先发的 TTL消息若时间设置大于后发的 TTL消息,那么,后发的消息就会被堵塞,直到先发的 TTL消息发送完毕,后发的 TTL 消息才能继续发送,这是一个弊端
6、备份交换机(重点)
定义:
大致流程图如下:
【application.yaml配置类】
这里需要手动的开启消息回调与失败消息的回退
spring:
#这里是 RabbitMQ 的配置
rabbitmq:
port: 5672 #指定的 rabbitMQ 服务器端口号
username: admin
password: 123
host: 192.168.101.65
#这里进行开启发布确认以及消息的回调
publisher-confirm-type: correlated
#将发送失败的消息回退给生产者
publisher-returns: true
【消息回调接口】
需要注意的是:RabbitTemplate 注入必须在 init 前,不然会报未注入异常
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Resource
RabbitTemplate rabbitTemplate;
/**
* 由于这里重写继承接口中的方法,所以需要进行注入操作
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
/**
* 交换机回调信息的接口
* @param correlationData 保存回调消息的 id 以及相关的信息
* @param Ack_Message 消息确认
* @param reason 消息发送失败,回调的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean Ack_Message, String reason) {
String messageId = "";
if (ObjectUtil.isNotNull(correlationData)) { //判断是否为空,防止空指针异常
messageId = correlationData.getId();
}
if (Ack_Message) {
log.info("成功接收到消息,消息ID为:{}", messageId);
} else {
log.info("接收消息失败,消息ID为:{},失败的原因为:{}", messageId, reason);
}
}
/**
* 将发送失败的消息进行回退
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息“:{} 被交换机:{} 退回,退回的原因:{},消息的 RoutingKey:{}",
new String(returnedMessage.getMessage().getBody()),
returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
}
}