目录
1.1消息从生产者到交换机有可能会丢失。这里可以通过confirm机制来解决
1.2交换机到队列也有可能会丢失。这里可以通过return机制来解决
1.33、从队列到消费者也有可能会丢失。这里可以通过手动ACK解决。
一、为什么要用到RabbitMq?
二、RabbitMq有什么作用?
1.解耦
2.异步
三、RabbitMq的模型
1.helloword模型
2.Work模型
3.发布订阅模型
4.路由键模型
5.主题模型
四、RabbitMq跟SpringBoot的整合
1.导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.yml配置
spring:
rabbitmq:
host: 192.168.107.123 #虚拟主机的ip地址
port: 5672 #RabbitMq的端口号
username: guest #匿名用户
password: guest
virtual-host: / # 虚拟机主机,队列就是保存在虚拟主机中
3.创建队列、创建交换机、将队列与交换机绑定并设置路由键
// 交换机的类型是路由键
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange");
}
@Bean
public Queue directQueue1() {
return new Queue("direct-queue1");
}
@Bean
public Queue directQueue2() {
return new Queue("direct-queue2");
}
// 绑定
@Bean
public Binding directQueue1Bind() {
// 给direct-queue1绑定了两个队列
BindingBuilder.DirectExchangeRoutingKeyConfigurer to = BindingBuilder.bind(directQueue1()).to(directExchange());
// 绑定了两个路由键
to.with("error");
Binding warn = to.with("info");
return warn;
}
@Bean
public Binding directQueue2Bind() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("info");
}
4.生产者发送消息
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("direct-exchange", "error", "toString");
System.out.println("生产者消息发送完成");
}
5.消费者消费消息
@Component
public class HelloQueueListener {
@RabbitListener(queues = "direct-queue1")
public void cosnuermMsg(String msg) {
System.out.println("消费者拿到的数是:" + msg);
}
}
五、ACK机制
1.什么是消息确认机制?
2.手动开启ACK
1.ylm配置文件
spring:
rabbitmq:
host: 192.168.127.102
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 手动ACK
2.消费者应答
@RabbitListener(queues = "hello-queue")
public void cosnuermMsg(String msg, Channel channel, Message message) {
System.out.println("消费者拿到的数是:" + msg);
// 每个消息的标识
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 开始消费消息
Boolean flag = customerData(msg);
if (flag) {
// 确定消息消费成功,应答ACK
// 第一个参数:消息的唯一标识
// 第二个参数:是否批量应答,一般都是false
channel.basicAck(deliveryTag, false);
System.out.println("消息成功,应答ACK");
return;
}
System.out.println("消息过程中没有出现异常,但是消息没有消费成功,应答NACK");
} catch (Exception e) {
e.printStackTrace();
System.out.println("消费过程中出现了异常,应答NACK");
}
// 消息消费失败,应答NACK
// 第三个参数是:是否压入队列,如果设置为false该消息就丢弃了
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException e) {
e.printStackTrace();
}
}
// 完成消息的消费
private Boolean customerData(String msg) {
Integer i = Integer.parseInt(msg);
if (i == 0) { // 没有插入成功,但是也没有出现异常
return false;
}
return true;
}
六、消息的可靠性
1.消息可靠性讲的不能丢失,MQ是如何保证消息可靠性的?
1.1消息从生产者到交换机有可能会丢失。这里可以通过confirm机制来解决
1.2交换机到队列也有可能会丢失。这里可以通过return机制来解决
1.33、从队列到消费者也有可能会丢失。这里可以通过手动ACK解决。
2.confirm和return机制的实现
2.1yml配置
spring:
rabbitmq:
host: 192.168.127.102
port: 5672
username: guest
password: guest
virtual-host: /
publisher-returns: true #开启return
publisher-confirm-type: simple # 开启confirm
/**
* confirm机制和return机制
*/
@Component
public class MsgConfirm implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//这里要对mq的return和confirm进行覆盖
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
// confirm机制确认消息是否到了交换机
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息已经到了交换机");
} else {
System.out.println("消息没到了交换机");
}
}
// 确认消息是否到了队列
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息没有到队列," + replyText + "," + exchange + "," + routingKey);
}
3.MQ是如何实现消息确认机制的?
4.消息补偿机制
5.服务端实现远程调用(RPC)
5.1通过java网络编程包
5.2RestTemplate
@Test
void contextLoads2() throws Exception {
// 这个类可以发送一个请求过去
RestTemplate restTemplate = new RestTemplate();
String info = restTemplate.getForObject("http://localhost:8080/send?msg=HTTP", String.class);
System.out.println("远程调用的返回的结果:"+info);
}
5.3ApacheHttpClient:通过工具类即可
七、消息的重复消费
1.消息被消费多次的后果
2.怎么解决消息被重复消费
3.为什么消息会被重复消费