0
点赞
收藏
分享

微信扫一扫

12-17章RabbitMQ消息Confirm模式——动力节点RabbitMQ笔记

12.RabbitMQ消息Confirm模式

12.1 Confirm模式简介

消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;

12.2 具体代码设置

1 配置文件application.yml 开启确认模式:spring.rabbitmq.publisher-confirm-type=_correlated_2 写一个类实现implements RabbitTemplate.ConfirmCallback,判断成功和失败的ack结果,可以根据具体的结果,如果ack为false,对消息进行重新发送或记录日志等处理;设置rabbitTemplate的确认回调方法3 rabbitTemplate.setConfirmCallback(messageConfirmCallBack);

  参考代码:

| @Component public class MessageConfirmCallBack implements RabbitTemplate.ConfirmCallback {       /**      * 交换机收到消息后,会回调该方法      *      * @param correlationData  相关联的数据      * @param ack  有两个取值,true和false,true表示成功:消息正确地到达交换机,反之false就是消息没有正确地到达交换机      * @param cause 消息没有正确地到达交换机的原因是什么      */     @Override     public void confirm(CorrelationData correlationData, boolean ack, String cause) {         System.out.println("correlationData = " + correlationData);         System.out.println("ack = " + ack);         System.out.println("cause = " + cause);           if (ack) {             //正常         } else {             //不正常的,可能需要记日志或重新发送         }     }

}

  发消息参考代码

| @Service public class MessageService {       @Resource     private RabbitTemplate rabbitTemplate;       @Resource     private MessageConfirmCallBack messageConfirmCallBack;       @PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用     public void init() {         rabbitTemplate.setConfirmCallback(messageConfirmCallBack);     }       /**      * 发送消息      */     public void sendMessage() {         //关联数据对象         CorrelationData correlationData = new CorrelationData();         correlationData.setId("O159899323"); //比如设置一个订单ID,到时候在confirm回调里面,你就可以知道是哪个订单没有发送到交换机上去         rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE + 123, "info", "hello", correlationData);         System.out.println("消息发送完毕......");     } }

 

两个示例:可以在发消息时直接实现接口

13. RabbitMQ消息Return模式

rabbitmq 整个消息投递的路径为: producer —> exchange —> queue —> consumer  

spring.rabbitmq.publisher-returns: true

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到 queue失败后,则会将消息退回给producer,并执行回调函数returnedMessage;

|   @Component public class MessageReturnCallBack implements RabbitTemplate.ReturnsCallback {       /**      * 当消息从交换机 没有正确地 到达队列,则会触发该方法      * 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法      *      * @param returned      */     @Override     public void returnedMessage(ReturnedMessage returned) {         System.out.println("消息return模式:" + returned);     }

}

参考发送代码

| @Service public class MessageService {       @Resource     private RabbitTemplate rabbitTemplate;       @Resource     private MessageReturnCallBack messageReturnCallBack;       @PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用     public void init() {         rabbitTemplate.setReturnsCallback(messageReturnCallBack);     }       /**      * 发送消息      */     public void sendMessage() {         rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info123", "hello");         System.out.println("消息发送完毕......");     } }

 

代码二,发送消息时直接实现RabbitTemplate.ReturnsCallback

| @Service public class MessageService implements RabbitTemplate.ReturnsCallback {       @Resource     private RabbitTemplate rabbitTemplate;       @PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用     public void init() {         rabbitTemplate.setReturnsCallback(this);     }       /**      * 当消息从交换机 没有正确地 到达队列,则会触发该方法      * 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法      *      * @param returned      /     @Override     public void returnedMessage(ReturnedMessage returned) {         System.out.println("消息return模式:" + returned);     }       /*      * 发送消息      */     public void sendMessage() {         rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info123", "hello");         System.out.println("消息发送完毕......");     } }

 

  代码参考3 同时实现RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback

|   @Service public class MessageService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {       @Resource     private RabbitTemplate rabbitTemplate;       @PostConstruct //bean在初始化的时候,会调用一次该方法,只调用一次,起到初始化的作用     public void init() {         rabbitTemplate.setConfirmCallback(this);         rabbitTemplate.setReturnsCallback(this);     }       /**      * 交换机收到消息后,会回调该方法      *      * @param correlationData  相关联的数据      * @param ack  有两个取值,true和false,true表示成功:消息正确地到达交换机,反之false就是消息没有正确地到达交换机      * @param cause 消息没有正确地到达交换机的原因是什么      /     @Override     public void confirm(CorrelationData correlationData, boolean ack, String cause) {         System.out.println("correlationData = " + correlationData);         System.out.println("ack = " + ack);         System.out.println("cause = " + cause);           if (ack) {             //正常         } else {             //不正常的         }     }       /*      * 当消息从交换机 没有正确地 到达队列,则会触发该方法      * 如果消息从交换机 正确地 到达队列了,那么就不会触发该方法      *      * @param returned      /     @Override     public void returnedMessage(ReturnedMessage returned) {         System.out.println("消息return模式:" + returned);     }       /*      * 发送消息      */     public void sendMessage() {         //关联数据对象         CorrelationData correlationData = new CorrelationData();         correlationData.setId("O159899323"); //比如设置一个订单ID,到时候在confirm回调里面,你就可以知道是哪个订单没有发送到交换机上去         rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info123", "hello", correlationData);         System.out.println("消息发送完毕......");     } }

 

14. RabbitMQ交换机详细属性

14.1具体参数

1、Name:交换机名称;就是一个字符串 2、Type:交换机类型,direct, topic, fanout, headers四种 3、Durability:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在; 4、Auto delete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机; 5、Internal:内部使用的,如果是yes,客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定。 6、Arguments:只有一个取值alternate-exchange,表示备用交换机;

14.2代码演示

结论1:没发消息之前不会创建交换机和对列 结论2:发消息后,如果交换机不存在,才开始创建交换机,如果队列不存在,则创建新的对列 结论3:创建交换机或者队列完成后再重新创建,如果修改交换机或队列参数则会报错 406错误(inequivalent arg 'durable' for exchange 'exchange.durability' in vhost 'powernode': received 'false' but current is 'true', class-id=40, method-id=10)) 结论4:设置持久化为false ,重启rabbitmq-server,则交换机丢失,实验durable参数,先看下控制台,然后重启rabbitmq-server 结论5:实验自动删除为 true ,从控制台上手动解绑,会发现自动删除

14.3 备用交换机

14.3.1 备用交换机使用场景

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息。

14.3.2 主要代码和注意事项

备用交换机示例如下: 注意:备用交换机一般使用fanout交换机 测试时:指定一个错误路由 重点:普通交换机设置参数绑定到备用交换机

| Map<String, Object> arguments = new HashMap<>(); //指定当前正常的交换机的备用交换机是谁 arguments.put("alternate-exchange", EXCHANGE_ALTERNATE); //DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) return new DirectExchange(EXCHANGE, true, false, arguments);

//return ExchangeBuilder.directExchange(EXCHANGE).withArguments(args).build();

14.3.3 参考配置代码

| @Configuration public class RabbitConfig {       //交换机的名字,就是一个字符串     public static final String EXCHANGE = "exchange";     //队列的名字,就是一个字符串     public static final String QUEUE = "queue";     //定义的一个路由键     public static final String INFO = "info";       //-------------------------------------------     //交换机的名字,就是一个字符串     public static final String EXCHANGE_ALTERNATE = "exchange.alternate";     //队列的名字,就是一个字符串     public static final String QUEUE_ALTERNATE = "queue.alternate";     //定义的一个路由键     public static final String ALTERNATE = "alternate";       @Bean     public DirectExchange directExchange() {         Map<String, Object> arguments = new HashMap<>();         arguments.put("alternate-exchange", EXCHANGE_ALTERNATE); //指定当前正常的交换机的备用交换机是谁         //DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)         return new DirectExchange(EXCHANGE, true, false, arguments);     }       /**      * 声明一个队列      *      * @return      /     @Bean     public Queue queue() {         return QueueBuilder.durable(QUEUE).build();     }       /*      * @Qualifier 限定bean的名字是 directExchange 的Bean      *      * @param directExchange      * @return      /     @Bean     public Binding binding(DirectExchange directExchange, Queue queue) {         return BindingBuilder.bind(queue).to(directExchange).with(INFO);     }       //-----------------------------------------       /*      * 备用交换机需要用Fanout交换机;      *      * @return      */     @Bean     public FanoutExchange alternateExchange() {         //DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)         return new FanoutExchange(EXCHANGE_ALTERNATE, true, false);     }     @Bean     public Queue alternateQueue() {         return QueueBuilder.durable(QUEUE_ALTERNATE).build();     }     @Bean     public Binding alternateBnding(FanoutExchange alternateExchange, Queue alternateQueue) {         return BindingBuilder.bind(alternateQueue).to(alternateExchange);     } }

 

14.3.4 参考发送消息代码

| @Service public class MessageService {       @Resource     private RabbitTemplate rabbitTemplate;       /**      * 发送消息      */ public void sendMessage() { //我们故意写错路由key,由于我们正常交换机设置了备用交换机,所以该消息就会进入备用交换机 //从而进入备用对列,我们可以写一个程序接收备用对列的消息,接收到后通知相关人员进行处理 //如果正常交换机没有设置备用交换机,则该消息会被抛弃。           rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "info1223", "hello");         System.out.println("消息发送完毕......");     } }

 

15. RabbitMQ队列详细属性

15.1 具体参数

Type:队列类型 Name:队列名称,就是一个字符串,随便一个字符串就可以; Durability:声明队列是否持久化,代表队列在服务器重启后是否还存在; Auto delete: 是否自动删除,如果为true,当没有消费者连接到这个队列的时候,队列会自动删除; Exclusive:exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除; 基本上不设置它,设置成false Arguments:队列的其他属性,例如指定DLX(死信交换机等); 1、x-expires:Number 当Queue(队列)在指定的时间未被访问,则队列将被自动删除; 2、x-message-ttl:Number 发布的消息在队列中存在多长时间后被取消(单位毫秒); 3、x-overflow:String 设置队列溢出行为,当达到队列的最大长度时,消息会发生什么,有效值为Drop Head或Reject Publish; 4、x-max-length:Number 队列所能容下消息的最大长度,当超出长度后,新消息将会覆盖最前面的消息,类似于Redis的LRU算法;   5、 x-single-active-consumer:默认为false 激活单一的消费者,也就是该队列只能有一个消息者消费消息; 6、x-max-length-bytes:Number 限定队列的最大占用空间,当超出后也使用类似于Redis的LRU算法; 7、x-dead-letter-exchange:String 指定队列关联的死信交换机,有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来; 8.x-dead-letter-routing-key:String 指定死信交换机的路由键,一般和6一起定义; 9.x-max-priority:Number 如果将一个队列加上优先级参数,那么该队列为优先级队列; (1)、给队列加上优先级参数使其成为优先级队列 x-max-priority=10【0-255取值范围】 (2)、给消息加上优先级属性 通过优先级特性,将一个队列实现插队消费;

| MessageProperties messageProperties=new MessageProperties();

messageProperties.setPriority(8);

  10、x-queue-mode:String(理解下即可) 队列类型x-queue-mode=lazy懒队列,在磁盘上尽可能多地保留消息以减少RAM使用,如果未设置,则队列将保留内存缓存以尽可能快地传递消息; 11、x-queue-master-locator:String(用的较少,不讲) 在集群模式下设置队列分配到的主节点位置信息; 每个queue都有一个master节点,所有对于queue的操作都是事先在master上完成,之后再slave上进行相同的操作; 每个不同的queue可以坐落在不同的集群节点上,这些queue如果配置了镜像队列,那么会有1个master和多个slave。 基本上所有的操作都落在master上,那么如果这些queues的master都落在个别的服务节点上,而其他的节点又很空闲,这样就无法做到负载均衡,那么势必会影响性能; 关于master queue host 的分配有几种策略,可以在queue声明的时候使用x-queue-master-locator参数,或者在policy上设置queue-master-locator,或者直接在rabbitmq的配置文件中定义queue_master_locator,有三种可供选择的策略: (1)min-masters:选择master queue数最少的那个服务节点host; (2)client-local:选择与client相连接的那个服务节点host; (3)random:随机分配;

15.2 参考代码

|   @Configuration public class RabbitConfig {       public static final String EXCHANGE = "exchange";     public static final String QUEUE = "queue";     public static final String KEY = "info";       QueueBuilder builder;       @Bean     public DirectExchange directExchange() {         return ExchangeBuilder.directExchange(EXCHANGE).build();     }       @Bean     public Queue queue() {         Map<String, Object> arguments = new HashMap<>();         //arguments.put("x-expires", 5000);           //arguments.put("x-max-length", 5);         //arguments.put("x-overflow", "reject-publish");           arguments.put("x-single-active-consumer", false); //TODO ???           //arguments.put("x-max-length-bytes", 20); // 单位是字节           //arguments.put("x-max-priority", 10); // 0-255 //表示把当前声明的这个队列设置成了优先级队列,那么该队列它允许消息插队             //将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM内存的使用,如果未设置,队列将保留内存缓存以尽可能快地传递消息;         //有时候我们把这种队列叫:惰性队列         //arguments.put("x-queue-mode", "lazy");           //设置队列版本。默认为版本1。         //版本1有一个基于日志的索引,它嵌入了小消息。         //版本2有一个不同的索引,可以在许多场景中提高内存使用率和性能,并为以前嵌入的消息提供了按队列存储。         //arguments.put("x-queue-version", 2);           // x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。         //arguments.put("x-queue-master-locator", QueueBuilder.LeaderLocator.clientLocal.getValue());             //-------------------------         //arguments.put("x-expires", 10000); //自动过期,10秒         //arguments.put("x-message-ttl", 10000); //自动过期,10秒,不会删除队列         //QueueBuilder 类里面有定义,设置队列溢出行为,当达到队列的最大长度时消息会发生什么,有效值是drop-head、reject-publish         //arguments.put("x-max-length", 5);         //arguments.put("x-overflow", QueueBuilder.Overflow.dropHead.getValue());           //表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)         //arguments.put("x-single-active-consumer", true);           // x-max-length-bytes,队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;         //arguments.put("x-max-length-bytes", 10);           //参数是1到255之间的正整数,表示队列应该支持的最大优先级,数字越大代表优先级越高,没有设置priority优先级字段,那么priority字段值默认为0;如果优先级队列priority属性被设置为比x-max-priority大,那么priority的值被设置为x-max-priority的值。         //arguments.put("x-max-priority", 10);           //将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;         //arguments.put("x-queue-mode", "lazy");           arguments.put("x-queue-version", 2);           // x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。         arguments.put("x-queue-master-locator", QueueBuilder.LeaderLocator.clientLocal.getValue());         //---------------------------------------------             // Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)         return new Queue(QUEUE, true, false, false, arguments);     }       @Bean     public Binding binding(DirectExchange directExchange, Queue queue) {         return BindingBuilder.bind(queue).to(directExchange).with(KEY);     } }

 

实验durable 参数 重启rabbitmq-server,队列丢失 实验autodelete参数:加入接收者,发现停掉服务,那么久没有消费者了,对列就会自动删除

16. 消息可靠性投递

消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的; 如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。 ① 代表消息从生产者发送到Exchange; ② 代表消息从Exchange路由到Queue; ③ 代表消息在Queue中存储; ④ 代表消费者监听Queue并消费消息; 1、确保消息发送到RabbitMQ服务器的交换机上 可能因为网络或者Broker的问题导致①失败,而此时应该让生产者知道消息是否正确发送到了Broker的exchange中; 有两种解决方案: 第一种是开启Confirm(确认)模式;(异步) 第二种是开启Transaction(事务)模式;(性能低,实际项目中很少用)   2、确保消息路由到正确的队列 可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。 使用return模式,可以实现消息无法路由的时候返回给生产者; 当然在实际生产环境下,我们不会出现这种问题,我们都会进行严格测试才会上线(很少有这种问题); 另一种方式就是使用备份交换机(alternate-exchange),无法路由的消息会发送到这个备用交换机上; 3、确保消息在队列正确地存储 可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题; 解决方案: (1)、队列持久化 代码:

QueueBuilder.durable(QUEUE).build();

(2)、交换机持久化 代码:

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

(3)、消息持久化 代码: 默认持久化

|  MessageProperties messageProperties = new MessageProperties(); //设置消息持久化,当然它默认就是持久化,所以可以不用设置,可以查看源码

 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

(4)、集群,镜像队列,高可用 (5)确保消息从队列正确地投递到消费者 采用消息消费时的手动ack确认机制来保证; 如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。 为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement); #开启手动ack消息消费确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual 消费者在订阅队列时,通过上面的配置,不自动确认,采用手动确认,RabbitMQ会等待消费者显式地回复确认信号后才从队列中删除消息; 如果消息消费失败,也可以调用basicReject()或者basicNack()来拒绝当前消息而不是确认。如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志);

17. 消息的幂等性

消息消费时的幂等性(消息不被重复消费) 同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了; 幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响; 以接口幂等性举例: 接口幂等性是指:一个接口用同样的参数反复调用,不会造成业务错误,那么这个接口就是具有幂等性的; 注册接口; 发送短信验证码接口; 比如同一个订单我支付两次,但是只会扣款一次,第二次支付不会扣款,这说明这个支付接口是具有幂等性的; 如何避免消息的重复消费问题?(消息消费时的幂等性) 全局唯一ID + Redis 生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,使用setnx命令,将messageId作为key放到redis中:setnx(messageId, 1),若返回1,说明之前没有消费过,正常消费;若返回0,说明这条消息之前已消费过,抛弃; 具体代码参考以下代码; 参考代码:

|   //1、把消息的唯一ID写入redis         boolean flag = stringRedisTemplate.opsForValue().setIfAbsent("idempotent:" + orders.getId(), String.valueOf(orders.getId())); //如果redis中该key不存在,那么就设置,存在就不设置           if (flag) { //key不存在返回true             //相当于是第一次消费该消息             //TODO 处理业务             System.out.println("正常处理业务....." + orders.getId());         }

 

   

举报

相关推荐

Go实现RabbitMQ消息模式

0 条评论