0
点赞
收藏
分享

微信扫一扫

SpringBoot-RabbitMQ发送消息的监控

概述

刚才我们发送消息,不管成功还是失败,都不报错,结果看效果时,发现有的没有发进去,那么如何知道消息是否发送成功呢,RabbitMQ提供了一个消费监视的功能。注意:RabbitMQ发送消息分为2个阶段,消息发送到交互机里面,可以监视,消息由交互机到队列里面,也可以监视。

创建项目

SpringBoot-RabbitMQ发送消息的监控_RabbitMQ

需要的依赖如下:

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

配置 application.yml,将原来的 properties 改一下后缀名就行,内容如下:

server:
port: 8080
spring:
application:
name: Springboot-RabbitMQ
rabbitmq:
username: user
password: 123456
host: 139.196.183.130
port: 5672
virtual-host: v-it6666
# 开启消息到达交换机的确认机制
publisher-confirm-type: correlated
# 消息由交换机到达队列时失败触发
publisher-returns: true

监控消息到达交换机

创建 ConfirmImpl 给 RabbitTemplate 设置消息到达交换机的回调对象,内容如下所示:

/**
* @author BNTang
*/
@Component
public class ConfirmImpl implements RabbitTemplate.ConfirmCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private ConfirmImpl confirmImpl;

@PostConstruct
private void initRabbitTemplate() {
this.rabbitTemplate.setConfirmCallback(confirmImpl);
}

/**
* 消息到达交换机后,该方法会回调
*
* @param correlationData 相关的数据
* @param ack 交换机是否接收成功
* @param cause 如果没有接收成功,返回拒绝的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("交换机接收消息成功");
} else {
System.out.println("交换机接收消息失败,失败原因为:" + cause);
}
}
}

发送消息

@SpringBootTest
class SpringbootRabbitmqApplicationTests {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
void contextLoads() {
this.rabbitTemplate.convertAndSend("directs", "error", "error 的日志信息");
System.out.println("消息发送成功");
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

SpringBoot-RabbitMQ发送消息的监控_spring_02

监控消息由交换机到队列里面

创建 ReturnsImpl 给 RabbitTemplate 设置消息到达队列失败后回调对象,内容如下所示:

/**
* @author BNTang
*/
public class ReturnsImpl implements RabbitTemplate.ReturnCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private ReturnsImpl returnsImpl;

@PostConstruct
private void initRabbitMQ() {
this.rabbitTemplate.setReturnCallback(returnsImpl);
}

/**
* 当消息到达队列失败时,回调的方法,消息被退回了,我们可以把消息记录下来,分析错误的原因,以后重新发送,这样的话,消息就不会再丢失了
*
* @param message 消息
* @param replyCode 回退的响应码
* @param replyText 响应文本
* @param exchange 该消息来自哪个交换机
* @param routingKey 该消息的路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(message);
System.out.println(replyCode + " " + replyText + " " + exchange);
}
}

测试方式,不用启动消费者,然后再去 RabbitMQ 的管理界面中删除对应的交换机,然后在发送消息即可验证。

优化消息监控和到达队列的监控

创建 WatchMessageImpl,内容如下所示:

/**
* @author BNTang
*/
@Component
public class WatchMessageImpl implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
private void initRabbitTemplate() {
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnCallback(this);
}

/**
* 消息到达交换机的回调
*
* @param correlationData
* @param ack 是否到达交换机
* @param cause 如果没有接收成功,返回拒绝的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息正常到达交换机");
} else {
System.out.println("消息没有到达交换机,原因为:" + cause);
}
}

/**
* 当消息到达队列失败时,回调的方法,消息被退回了,我们可以把消息记录下来,分析错误的原因,以后重新发送,这样的话,消息就不会再丢失了
*
* @param message 消息体
* @param replyCode 回退的响应码
* @param replyText 响应文本
* @param exchange 该消息来自哪个交换机
* @param routingKey 该消息的路由key
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(new String(message.getBody()) + "到达队列失败:" + replyCode + " " + replyText + " 交换机为:" + exchange + " 路由key:" + routingKey);
// 处理重新发送的问题
}
}

消息转换参数的问题

可以传自定义对象,但是自定义的对象必须序列化,在实际开发中一般使用 JSON 串去传自定义对象。

SpringBoot-RabbitMQ发送消息的监控_SpringBoot_03




举报

相关推荐

0 条评论