目录
RabbitMQ
MQ的相关概念
什么是MQ
MQ(message queue),从字面意思上看,本质是个队列, FIFO先入先出,只不过队列中存放的内容是message而已,还是一-种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
MQ的特点
- 流量消峰(使用队列的性质)
- 应用解耦(使用中间件的性质)
- 异步处理(使用消息的性质)
RabbitMQ
:RabbitMQ是- -个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裏时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑RabbitMQ一个快递站, 一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
四大核心概念:
- 生产者:产生数据发送消息的程序是生产者
- 交换机是RabbitMQ非常重要的一个部件, -方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
- 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。 请注意生产者,消费
者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。 - 队列:队列是RabbitMQ内部使用的一-种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。
内部构造:
消息队列协议
消息中间件使用什么协议:Openwire,AMQP,MQTT,Kafka,OpenMessage协议。
消息中间件协议和http协议的区别
- 消息中间件协议较为简洁和高效
- http一般是短连接,消息中间件协议是一个长期获取消息的行为,出现问题就要对数据进行持久化,用来保证高可用。
消息持久化
将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。
消息的分发策略
发布订阅,轮询分发,公平分发,重发,消息拉取
RabbitMQ
核心
Hello World
:简单事件。单生产者,单消费者,单队列(简单模式)Work Queue
:单队列分发任务。单生产者,多消费者,单队列(工作模式)Publish Subscribe
:多队列分发任务。单生产者,单交换机,多队列,多消费者(发布/订阅模式)Routing
:路由模式。Topics
:主题模式。
docker安装RabbitMQ
# 下载镜像
docker pull rabbitmq
rabbitmq
三个端口
4369/tcp, 5671-5672/tcp, 15691-15692/tcp, 25672-15672/tcp
-
5672:client端通信口
-
15672:管理界面ui端口
-
25672:server间内部通信口
添加用户
# 创建用户
rabbitmqctl add_user username password
# 设置用户角色
rabbitmqctl set_user_tags username administrator
# 设置用户权限
rabbitmqctl set_permissions -p "/" username ".*" ".*" ".*"
# 列出用户列表
rabbitmqctl list_users
AMQP协议
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。
AMQP主要做了三件事
- 创造任意类型交换机和消息队列的能力
- 连接交换机和消息队列创造一个理想的消息处理系统的能力
- 完全通过协议控制的能力
AMQP协议:
特点:生产者-消息队列-消费者
发布/订阅fanout模式
特点:广播机制,没有路由key的模式
路由direct模式
特点:有routing-key
匹配模式
主题topic模式
特点:模糊的routing-key
匹配模式
工作work模式
特点:分发机制
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
- 轮询模式的分发:一个消费者-条,按均分配;
- 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少,按劳分配;
参数header模式
特点:参数匹配模式
MQ的使用场景
-
解耦、削峰、异步
- 同步、异步的问题(串行):串行执行时间较长
- 并行方式,异步线程池:需要自己维护线程池、持久性、高可用都需要自己实现,最重要的是耦合在应用程序中
- 异步消息队列:
-
高内聚、低耦合
-
流量的削峰
-
分布式事务的可靠消息和可靠生产
-
索引、缓存、静态化处理的数据同步
-
流量监控
-
日志监控(ELK)
-
下单、订单分发、抢票
SpringBoot整合RabbitMQ
以fanout订阅/发布模式为例
加入依赖包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
生产者类
// server.serverOrderService
package com.liuhao.springproducer;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class serviceOrderService {
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void makeOrder(String userId, String productId, Integer num) {
String s = UUID.randomUUID().toString();
String exchangeName = "fanout_order";
String routingKey = "";
rabbitTemplate.convertAndSend(exchangeName, routingKey, s);
}
}
配置类
package com.liuhao.springproducer;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// 创建交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order", true, false);
}
// 创建消息队列
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue emailQueue() {
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue wxQueue() {
return new Queue("wx.fanout.queue", true);
}
// 绑定交换机和消息队列
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding wxBinding() {
return BindingBuilder.bind(wxQueue()).to(fanoutExchange());
}
}
消费者
package com.liuhao.demo1.service;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@RabbitListener(queues = {"email.fanout.queue"})
@Service
public class EmailService {
@RabbitHandler
public void receive(String msg) {
System.out.println(msg);
}
}
TTL过期时间
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除
RabbitMQ可以对消息队列设置TTL。目前有两种方法可以设置。
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
- 第二种方法是对消息进行单独设置,每条消息TTL可以不同。
对队列进行设置
@Bean
public Queue wxQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000); // 这个队列的消息存在时间为5000秒
return new Queue("wx.fanout.queue", true, false, false, args);
}
对消息进行设置
public void makeOrder(String userId, String productId, Integer num) {
String s = UUID.randomUUID().toString();
String exchangeName = "fanout_order";
String routingKey = "";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000"); // 设置过期时间
message.getMessageProperties().setContentEncoding("utf-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName, routingKey, s, messagePostProcessor);
}
死信队列
DLX,全称为Dead-Letter- Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列
消息变成死信,可能是由于以下的原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属
性。当这个队列中存在死信时,Rabbitmq就 会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另-个队
列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可。
@Bean
public Queue wxQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000); // 这个队列的消息存在时间为5000秒
args.put("x-dead-letter-exchange", "DLX");
args.put("x-dead-letter-routing-key", "DLX_routing_key");
return new Queue("wx.fanout.queue", true, false, false, args);
}
内存磁盘的监控
内存警告
当内存使用超过配置的阈值或者磁盘空间剩余空间对于配置的阈值时,RabbitMQ会暂时阻塞 客户端的连接,并且停止
接收从客户端发来的消息,以此避免服务器的崩溃,客户端与服务端的心态检测机制也会失效。
设置方式:命令
rabbitmqctl set_vm_memory_high_watermark <fraction>
rabbitmqctl set_vm_memory_high_watermark absolute 50MB
设置方式:配置文件
# 相对值
vm_memory_high_watermark.relative = 0.6
# 绝对值
vm_memory_high_watermark.absolute = 50MB
内存换页
在某个Broker节点及内存阻塞生产者之前,白会尝试将队到中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有-个副本,所以在转移的过程中持久化的消息会先从内存中清除掉。
# 一般小于1
vm_memory_high_watermark_paging_ratio = 0.7
磁盘预警
当磁盘的剩余空间低于确定的阈值时,RabbitMQ同样会阻塞生产者,这样可以避免因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃。
分布式事务
分布式事务概念:分布式事务指事务的操作位于不同的节点上,需要保证事务的AICD特性。例如在下单场景下,库存和订单如果不在同一个节点上,就涉及分布式事务。
分布式事务的方式
两阶段提交(2PC),通过引入协调者(coordinator)来协调参与者的行为,并决定这些参与者是否真正要执行事务。
- 缺点:同步阻塞、单点问题、数据不一致、没有容错机制。
补偿事务(TCC),TCC其实就是采用的补偿机制,其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销) 操作。它分为三个阶段:
- Try阶段主要是对业务系统做检测及资源预留
- Confirm阶段主要是对业务系统做确认提交,下ry阶段执行成功并开始执行Confirm阶段时,默认一Confirm阶段是不会出错的,郎只要Try成功, Confirm一定成功。
- Cancel阶段主要是在业务执行错误,需要 回滚的状态下执行的业务取消1预留资源释放。
- 缺点:2、3步可能失败
本地消息表(异步确保),本地消息表与业务数据表处于同一个数据库中,这样就能利用本地事务来保证在对这两个表的操作满足事务特性,并且使用了消息队列来保证最终一致性。
- 缺点:消息表耦合到业务系统中,杂活较多。
MQ事务消息,异步场景,通用性较强,拓展性较高。
- 第一阶段Prepared消息,会拿到消息的地址。第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。
- 也就是说在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RabbitMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会 根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
生产可靠性
- 为了保证数据一定发送到MQ中
- 在同一事务中,增加一个冗余表(字段一般为消息内容和消息状态)的记录订单数据每条数据是否发送成功的状态
- 然后利用RabbitMQ提供的publish/confirm机制开启确认机制后,如果消息正常发送到MQ就会获取到回执消息,然后把发送状态改成已发送状态
- 如果
使用消息确认机制,给予生产者的消息回执,来确保生产者的可靠性。
若消息确认机制收到的是一个未传输到MQ中的回执消息,则使用一个定时器,每隔一定时间便往MQ中进行投递,这样保证了消息一定可以投递到消息队列,然后进行数据修改。
// 被@PostConstruct修饰的方法会在服务器加载servlet的时候运行,并且只会被服务器执行一次,
// 在构造函数之后执行,在init()之前执行
@PostConstruct
public void regCallback() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String orderId = correlationData.getId();
if (!ack) {
System.out.println("消息传输失败");
return ;
}
try {
// 更新数据库(sql, orderId)
}
catch (Exception e) {
e.printStackTrace();
}
}
});
}
消费可靠性
如果消息队列已经到达了消息队列中,那么消费者可以直接从消息队列中取得数据,但是当一直没有想要的数据的时候,会产生死循环。
解决方案
- 控制死循环的次数 + 死信队列(
retry
) try+catch
+手动ack(acknowledge-mode:manual
)try+catch
+手动ack+死信队列处理
手动ack
/* 手动ack告诉mq消息正常消费 */
channel.basicAck(tag, false);
/**
* 第二个false表示当发生了nack时不重发
*/
channel.basicNAck(tag, false, false);
@RabbitListener(queues = {"", ""})
public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)
throws IOException {
System.out.println("consumerDoAck: " + data);
if (data.contains("success")) {
// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
channel.basicAck(deliveryTag, false);
} else {
// 第三个参数true,表示这个消息会重新进入队列
channel.basicNack(deliveryTag, false, true);
}
}