前言
由于生产者和消费者不直接通信,生产者只负责把消息发送到队列,消费者负责从队列获取消息,消息被"消费"后,是需要从队列中删除的,那怎么确认消息被"成功消费"了呢?
消费者确认
在自动确认模式中,消息在发送到消费者后即被认为"成功消费",这种模式可以降低吞吐量(只要消费者可以跟上),以降低交付和消费者处理的安全性,这种模式通常被称为“即发即忘”,与手动确认模型不同,如果消费者的TCP连接或通道在真正的"成功消费"之前关闭,则服务器发送的消息将丢失。因此,自动消息确认应被视为不安全,并不适用于所有工作负载.
消费者过载
手动确认模式通常与有界信道预取(BasicQos方法)一起使用,该预取限制了信道上未完成(“进行中”)的消息数量。自动确认没有这种限制。因此消费者可能会被消息的发送速度所淹没,可能会导致消息积压并耗尽堆,或使操作系统终止其进程。
案例说明
分布式系统分布式事务解决方案——配送中心篇
dispatcher 配送中心系统相关配置增加
①. 配送中心系统pom.xml依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.2.3</version>
<classifier>jdk15</classifier><!-- jdk版本 -->
</dependency>
</dependencies>
②. application.yml 文件增加MQ的连接配置
spring:
rabbitmq:
host: 127.0.0.1
username: wpf2
password: 123
port: 5672
virtual-host: test_host
③ . 建立 Order 实体类,与订单服务系统中一致,主要用于接收消息的json转换
@Data
public class Order {
// 订单ID
private int orderId;
// 用户名
private String userName;
// 商品内容
private String context;
// 购买数量
private int num;
}
整体项目结构调整如下:
消费者:普通消费
1. 编写 OrderMQConsumer 消费者类,监听订单队列,进行消息的消费
/**
*
* @RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
* @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理
* 具体使用哪个方法处理,根据 MessageConverter转换后的参数类型 (message的类型匹配到哪个方法就交给哪个方法处理)
*/
// 监听order_confirm_fanoutQueue队列,该队列在订单服务系统声明创建的
@Component
@RabbitListener(queues = {"order_confirm_fanoutQueue"})
public class OrderMQConsumer {
@Autowired
private DispatchService dispatchService;
private int count=1;
// 接收消息
@RabbitHandler
public void receiveMess(String message, Channel channel, CorrelationData correlationData
, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
System.out.println("接收到订单消息:"+message+",count:"+count++);
// 2.获取订单信息:mq消息存的是json格式,需要转换回来
Order order = (Order) JSONObject.toBean(JSONObject.fromObject(message), Order.class);
String orderId = order.getOrderId()+"";
// 3.保存运单
dispatchService.dispatch(orderId);
}
}
2. 由于在上篇博文中,我们已经往消息队列中投递了2条消息,因此这里直接消费即可,不需要再启动订单系统的创建订单方法了
3. 查看数据库配送表结果
4. 查看图形化界面,队列消息的消费情况
➳ 结论:在消费者类中读取队列中的消息,根据订单ID,调用dispatchService.dispatch(orderId) 方法,往配送表插入了2条数据。
☁ 思考:MQ服务器宕机或其他因素,导致数据未入库怎么办?
由运行结果可知,如果接收消息时发生异常,会触发服务器的重试机制,陷入死循环!如果是集群模式下,会造成MQ服务器奔溃,引发磁盘和内存消耗殆尽,知道服务器宕机为止!
消费者:可靠消费
- 控制重试的次数 + 死信队列
- try/catch + 手动ack
- try/catch + 手动ack + 死信队列处理 + 人工干预
▎ 方式一:配置消息重试次数
1. 在application.yml文件中配置MQ重试次数,如下
spring:
rabbitmq:
host: 127.0.0.1
username: wpf2
password: 123
port: 5672
virtual-host: test_host
listener:
simple:
retry:
enabled: true # 开启重试,默认是false关闭状态
max-attempts: 3 # 最大重试次数,默认是3次
initial-interval: 2000ms # 每次重试间隔时间
2. 启动订单服务,让其创建一条订单,并投递到消息队列
3. 查看消息投递结果
4. 启动配送中心服务,进行消息消费
5. 查看运行结果
6. 图形化队列消息结果:达到最大重试次数后,队列中的消息被抛弃,无法再次捞回
➳ 结论:由于我们配置了重试次数,因此消费消息时,即使发生了异常,也不会陷入死循环,不断的重试,最终导致系统奔溃,cpu飙升等情况
▎ 方式二:try/catch + 手动确认消息
1. 配送中心系统的application.yml 配置文件中配置开启手动ack
# 参数说明:none 不确认 auto 自动确认 manual 手动确认
acknowledge-mode: manual
注意:之前的配置的重试策略的参数可以去除掉了,重试策略本质也是针对消息的确认
即使没把重试的参数配置删除,也不会生效的,如果开启了手动ack
2. 接收消息代码如下
// 接收消息
@RabbitHandler
public void receiveMess(String message, Channel channel, CorrelationData correlationData
, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
// 1.获取消息队列的消息
System.out.println("接收到订单消息:"+message+",count:"+count++);
// 2.获取订单信息:mq消息存的是json格式,需要转换回来
Order order = (Order) JSONObject.toBean(JSONObject.fromObject(message), Order.class);
String orderId = order.getOrderId()+"";
// 3.保存运单
dispatchService.dispatch(orderId);
System.out.println(1/0); // 出现异常
// 4.收到ack告诉mq消息已经正常消费
channel.basicAck(tag,false);
}catch (Exception e){
// 如果出现异常的情况下,根据实际的情况去进行重发
/** @param1 : 传递标签,消息的tag
* @param2 : 确认一条消息还是多条, false:只确认e.DeliverTag这条消息 true:确认小于等于e.DeliverTag的所有消息
* @param3 : 消息失败了是否进行重发
* false:消息直接丢弃,不重发,如果绑定了死信队列,则消息打入死信队列
* true:重发,设置为true,就不要加到try/catch代码中,否则会进入重发死循环
*/
channel.basicNack(tag,false,false);
}
}
了解三个代码关键处:
☛ 流程解析:
- 发生异常,流程进入到catch块
- channel.basicAck(tag,false); ,第一个参数是消息的标签,第二个参数是确认一条消息还是多条,我们设置的是false,表示只确认当前处理的这条消息,确认消费成功了
- catch块是针对消息处理的策略,准备如何处理这条消息?直接抛异常丢弃消息,还是触发消息的重新发送,具体需要根据业务进行处理
!! 注意:用了try/catch ,yml 配置了重试次数没有意义,try/catch会屏蔽掉重试策略!!
➳ 选择建议:用了try/catch ,就不要使用try/catch,二选其一即可
▎ 方式三:死信队列配置
由于我们的队列绑定和声明是在订单服务中完成的,因此需要修改订单系统的代码,配送中心的代码保持方式二中的消费配置,不需要修改。
1. 声明创建死信组件
@Configuration
public class Order_RabbitConfiguration {
// 1.配置死信交换机、队列
@Bean
public FanoutExchange getDeadExchange(){
return new FanoutExchange("dead_order_fanoutExchange",true,false);
}
@Bean
public Queue getDeadQueue(){
return new Queue("dead_order_fanoutQueue",true);
}
@Bean
public Binding getDeadBinding(){
return BindingBuilder.bind(getDeadQueue()).to(getDeadExchange());
}
// 2.配置普通队列
@Bean
public FanoutExchange getExchange(){
return new FanoutExchange("order_confirm_fanoutExchange",true,false);
}
@Bean
public Queue getQueue(){
// 设置消息接盘侠:队列已满、消息拒收、消息异常 等情况,该条消息就会被重新路由到死信队列
Map<String,Object> args = new HashMap<>();
args.put("x-dead-letter-exchange","dead_order_fanoutExchange");
return new Queue("order_confirm_fanoutQueue",true,false,false,args);
}
@Bean
public Binding getBinding(){
return BindingBuilder.bind(getQueue()).to(getExchange());
}
}
2. 去图形化界面中,删除原来的队列,由于我们增加了队列的接盘侠,因此重新设置属性的话,需要删除原来的队列,重新创建,否则启动会报异常
!! 注意:生产环境不建议这么做,最好是重新创建新的队列进行绑定,生产者路由到新队列中
3. 运行创建订单接口
4. 图形化界面查看, 消息投递情况和队列创建信息
5. 消费的代码不变,同样是try/catch + 手动ack模式,并且制造一个异常
6. 启动配送中心系统
7. 查看图形化结果
监听死信队列
在配送中心系统,新建死信队列的消费者
@Component
@RabbitListener(queues = {"dead_order_fanoutQueue"}) // 监听死信队列
public class DeadOrderMQConsumer {
@Autowired
private DispatchService dispatchService;
// 接收消息:代码与订单消费者一致
@RabbitHandler
public void receiveMess(String message, Channel channel, CorrelationData correlationData
, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
// 1.获取消息队列的消息
System.out.println("消息进入到了死信队列,开始处理异常消息:"+message);
// 2.获取订单信息:mq消息存的是json格式,需要转换回来
Order order = (Order) JSONObject.toBean(JSONObject.fromObject(message), Order.class);
String orderId = order.getOrderId()+"";
// 3.保存运单
dispatchService.dispatch(orderId);
// 4.收到ack告诉mq消息已经正常消费
channel.basicAck(tag,false);
}catch (Exception e){
// 由于消息进入了死信队列,说明消息有异常,需要采取新的措施来处理这条消息
// 比如人为进行处理,同时也需要从队列中移除这条消息,防止死信队列堆积
System.out.println("人工干预");
System.out.println("发送邮件、短信通知技术人员等");
System.out.println("将消息存入其他DB库,技术人员好根据消息排查");
// 同样也要Nack这条消息,保障死信队列不会产生消息堆积
channel.basicNack(tag,false,false);
}
}
}
2. 重新启动配送系统
3. 查看图形化界面
➳ 结论:由上图可知,死信队列的消息被正常消费成功了,从队列中移除。死信队列的消费代码与订单消费者一致,只是在catch块的处理消息策略,需要额外增加其他处理机制
其他问题
yml 配置手动ACK后,消费时没有进行消息确认,会导致重复消费
1. 配置了2种
2. 消费代码如下,虽然配置了手动ack参数,但是代码中并没有手动确认
@RabbitHandler
public void receiveMess(String message, Channel channel, CorrelationData correlationData
, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
System.out.println("接收到订单消息:"+message+",count:"+count++);
// 2.获取订单信息:mq消息存的是json格式,需要转换回来
Order order = (Order) JSONObject.toBean(JSONObject.fromObject(message), Order.class);
String orderId = order.getOrderId()+"";
// 3.保存运单
dispatchService.dispatch(orderId);
}
3. 控制台打印成功消费的信息,但是队列中消息并不会移除,而是从ready就绪状态,变更为未应答状态,重启项目,又会再次重复消费,直到有手动ack的消费者,将这条消息消费掉
!! 注意:由于执行了dispatchService.dispatch(orderId);,导致数据库创建了多条 99087的数据,因此需要注意,如果yml配置了手动确认ack,但代码消费时并没有确认消息就会造成重复消费!
写在最后:生产环境可靠消费注意事项
例如订单服务投递消息成功了,但由于MQ服务器宕机,订单服务未及时收到消息投递的回执结果,触发消息的重试机制,消息被二次投递,实际消息队列中存在多条同一个订单消息记录。
➳ 结论:消费者在消费消息时,要保证数据的幂等性,不能重复消费同一个订单。
基于MQ的分布式事务解决方案总结
优点:
-
通用性强
-
拓展方便
-
耦合度低,方案也比较成熟
缺点:
-
基于消息中间件,只适合异步场景
-
消息会延迟处理,需要业务上能够容忍