0
点赞
收藏
分享

微信扫一扫

Event Loop——事件循环

1kesou 2023-09-30 阅读 47

目录

1、初识 RabbitMQ 消息队列

1.1 MQ 四大核心概念

1.2 消息的发送(无交换机态)

1.3 关于消息自动重新入队

1.3.1 消息的常见应答方法(R)

1.4 关于 RabbitMQ 的持久化、不公平分发以及预取值

2、RabbitMQ 消息的发布确认

2.1 MQ的单个确认发布

2.2 MQ的批量确认发布

2.3 MQ的异步确认发布(重点)

3、关于 Exchanges 交换机

4、死信队列(重点)

5、延迟队列(整合SpringBoot)

6、备份交换机(重点)


 什么是 RabbitMQ ?

1、初识 RabbitMQ 消息队列

1.1 MQ 四大核心概念

以下是 RabbitMQ 的原理图:

1.2 消息的发送(无交换机态)

这里使用MQ中间件进行简单的消息发送,大致流程图如下所示:

这里需要注意的是,当一次性有多条消息发送到队列时,这时需要多个消费者(工作线程),消费者进行消费信息是根据轮询的方式进行消费

创建一个Utils工具类,与 MQ 进行交互连接:

/**
 * 这里是与 MQ 交互的工具类
 */
public class RabbitMQUtils {

    public static Channel getChannel() throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();    //创建连接工厂

        factory.setHost("192.168.101.65");
        factory.setUsername("admin");
        factory.setPassword("123");

        Connection connection = factory.newConnection();    //创建连接

        return connection.createChannel();  //获取连接信道
    }
}

【消息生产者】代码如下所示:

/**
 * 生产者
 */
public class Produce {

    public static final String QUEUE_NAME = "hello";    //队列名称

    public static void main(String[] args) throws IOException, TimeoutException {

        //这里创建一个工厂,与 RabbitMQ 进行交互
        Channel channel01 = RabbitMQUtils.getChannel();

        //1.队列名称  2.队列是否持久化  3.消息是否供多个消费者消费  4.消息是否自动删除  5.其他参数
        channel01.queueDeclare(QUEUE_NAME,false,false,false,null);


        String message = "hello mq";    //发消息

        //1.对应的交换机  2.路由的KEY值(本次是队列名)   3.其他参数  4.发送消息的消息体
        channel01.basicPublish("",QUEUE_NAME,null,message.getBytes());

        System.out.println("消息发送完毕!");

    }
}

消息栏:

RabbitMQ 中(以上创建的 hello 队列):

【消息消费者】代码如下所示:

/**
 * 消费者
 */
public class Consumer {

    public static final String QUEUE_NAME = "hello";    //要进行消费消息的队列

    public static void main(String[] args) throws IOException, TimeoutException {

        //创建连接工厂,与MQ进行交互
        Channel channel = RabbitMQUtils.getChannel();

        //接收消息的回调
        DeliverCallback deliverCallback = (consumerTag,message)->{

            System.out.println("成功接收消息:"+new String(message.getBody()));    //接收其消息的消息体才能显示对应的消息
        };
        
        //取消消息时的回调
        CancelCallback cancelCallback = (consumerTag) ->{

            System.out.println(consumerTag + "消费者的消息被中断!");
        };
        
        /**
         * 1.要被消费信息的队列
         * 2.消费成功之后是否需要自动应答
         * 3.消费成功时的回调
         * 4.取消消息发送时的回调
         */
        //消费者消费信息
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

消息栏:

MQ 中的消息已经被消费:


1.3 关于消息自动重新入队

        如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队;如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者;这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息

1.3.1 消息的常见应答方法(R)

丢失的消息重新入队,传递给正常工作的消费者进行消费的大致图:

由于生产者的代码没有改变,这里就不写了,以下是消费者(两个消费者只有 sleep 的时间不一样)关于 ACK 手动应答消息的代码:

/**
 * 这里是消费者手动接受消息 ACK,使发送失败的消息重新排队
 */
public class Consumer01 {

    public static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        SleepUtils.sleep(8);   //模拟消息多的情况

        //1、接收到消息的回调
        DeliverCallback deliverCallback = (consumerTag,message) ->{

            System.out.println("接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));

            /**
             * 手动应答
             * 1. 消息的标记
             * 2. 是否批量应答信道中的消息
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        //2.消息中断的回调
        CancelCallback  cancelCallback = (consumerTag) -> {

            System.out.println(consumerTag + "消费者取消了消息的接收!");
        };

        //3.使用手动应答
        boolean autoACK = false;
        channel.basicConsume(QUEUE_NAME,autoACK,deliverCallback,cancelCallback);
    }
}

首先创建两个消费者,分别为C1和C2,这里生产者连续发送四条消息: 

消费者一 处于正常状态消费者二 接收了一条消息后就宕机了,这时,消费者一 将发送失败的消息从信道中取出并进行消费,结果图如下所示:

消费者一:         消费者二:

1.4 关于 RabbitMQ 的持久化、不公平分发以及预取值

队列的持久化:

平时消息队列都是保存在内存中,若 RabbitMQ服务 突然停止,则之前的队列都会消失;所以,为了减少损失的可能性,通常将消息队列保存到磁盘上,即持久化

boolean durable =true;  //将队列进行持久化
        //1.队列名称  2.队列是否持久化  3.消息是否供多个消费者消费  4.消息是否自动删除  5.其他参数
        channel01.queueDeclare(QUEUE_NAME,durable,false,false,null);


 

消息的持久化:

将 MessageProperties.PERSISTENT_TEXT_PLAIN 标识放入 basicPublish消息发送方法的第三个参数中,以开启消息持久化

将消息标记为持久化并不能完全保证不会丢失消息;尽管它告诉 RabbitMQ 将消息保存到磁盘,但是,这里依然存在当消息刚准备存储在磁盘的时候,还没有存储完,消息还在缓存的一个间隔点;此时并没有真正写入磁盘,持久性保证并不强

while (sc.hasNext()){

            String message = sc.next();    //发消息
            //1.对应的交换机  2.路由的KEY值(本次是队列名)   3.其他参数  4.发送消息的消息体
            channel01.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

            System.out.println("消息发送完毕!");
        }

不公平分发(在消费者处开启):

相对于轮询分发,不公平分发采用能者多劳的策略,谁干的快消息就先给谁发送,避免慢进程拖慢整个服务的进度

预取值:

不公平分发的值设置为1,若设置的数值大于1,则表示为预取值;所谓的预取值,是设置消费者缓冲信道中最大存储的数量;

比如:消费者C1设置预取值为2,消费者C2设置预取值为5,假设有8条消息进来时,C1有可能消费了3条,因为已经消费的消息不算入“预取值”内;而C2信道中存入5条消息,若这五条消息即使还未被C2消费,C1也不能将其消费,因为这5条消息已经放入C2的信道中进行等待排队了

 MQ 中:

可见,这里明确标明了对应消费者的预取值


2、RabbitMQ 消息的发布确认

在设置发布确认时,一般有三个步骤:

设置队列的持久化  --->>  设置队列中的消息进行持久化  --->>   通过MQ将消息保存在磁盘上,然后MQ跟生产者说明一声 “已经保存在磁盘上”(这里就是消息确认)

2.1 MQ的单个确认发布

定义:

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布;如果没有确认发布的消息,就会阻塞所有后续消息的发布

这里是模拟消息单个确认的代码:

    /**
     * 单个消息确认发布
     */
    public static void SingleConfirmMessage () throws Exception{

        Channel channel = RabbitMQUtils.getChannel();

        String QUEUE_NAME = UUID.randomUUID().toString();

        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);  //开启队列的持久化

        //开启消息的发布确认
        channel.confirmSelect();

        long begin = System.currentTimeMillis();    //开始时间

        for (int i =0;i<MESSAGE_COUNT;i++){
            String message = i + "";

            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            //消息单个确认,发送成功一次就确认一次
            boolean singleRes = channel.waitForConfirms();
            if(singleRes) {
                System.out.println("消息发布成功!");
            }
        }

        long end = System.currentTimeMillis();    //结束时间

        System.out.println("总耗时为:"+(end-begin)+"ms");
    }

运行结果:

可见,总耗时时间为 1269 ms,虽然保证了消息的可靠性,但是性能下来了,需要一条条确认

2.2 MQ的批量确认发布

定义:

这是也一种同步确认发布消息的方式;先发布一批消息,然后一起确认可以极大地提高吞吐量;当然这种方式的缺点就是:由于是批量确认发布,当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息,而后重新发布消息

这里是模拟批量确认发布的代码:

    /**
     * 批量消息确认发布
     */
    public static void BatchConfirmMessage() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();

        String QUEUE_NAME = UUID.randomUUID().toString();

        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);  //开启队列的持久化

        //开启消息的发布确认
        channel.confirmSelect();

        long begin = System.currentTimeMillis();    //开始时间

        int batchSize = 100;    //批量确认的长度

        for (int i=0;i<MESSAGE_COUNT;i++){
            String message = i + "";

            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            //每发送一百条数据,就进行批量发布确认
            if(i % batchSize == 0){
                boolean batchRes = channel.waitForConfirms();
                if(batchRes) {
                    System.out.println("批量发送消息成功!");
                }
            }
        }

        long end = System.currentTimeMillis();    //结束时间
        System.out.println("总耗时为:"+(end-begin)+"ms");
    }

运行结果:

可见,总耗时为 199 ms, 相比于单个确认发布,在性能方面有了很大的提升,但是容错率相对来说就升高了,因为由于批量,很难确定是哪一条消息出现了错误

2.3 MQ的异步确认发布(重点)

定义:

很显然,这是一种异步确认发布消息的方式,异步虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说;它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功

大致流程图:

异步发送消息时,不需要等待当前消息经过确认后才能将之后的消息发送出去;我们要做的只是发布消息,其余的交给 broker 中间人处理;而最终的消息是否发布成功,取决于之后的回调确认消息的函数,由于每一个发出去的消息都有 KEY VALUE ,因此,我们能很快的找到对应发送失败的消息

存在问题:

解决方案:

这里是模拟异步确认发布的代码:

   /**
     * 异步确认发布消息
     */
    public static void AsynchronousConfirmMessage() throws Exception{

        Channel channel = RabbitMQUtils.getChannel();

        String QUEUE_NAME = UUID.randomUUID().toString();

        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);  //开启队列的持久化

        //1.开启消息的发布确认
        channel.confirmSelect();


        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况
         * 【优点】:
         * 1.将消息与对应的序号相关联
         * 2.批量的根据序号删除条目
         * 3.支持高并发
         */
        ConcurrentSkipListMap<Long,String> concurrentSkipListMap
                = new ConcurrentSkipListMap<>();

        //2.这里准备消息监听器,以便于监听消息的成功与否 (delivery:消息的编号,multiple:用来判断是否为批量)
        ConfirmCallback ACK_callback = (deliveryTag,multiple) ->{   //消息确认成功 回调函数

            //2.2【第二步】若是批量发消息,则进行批量的删除
            //【注意这里只有已经确认的消息,不会干扰到未确认的消息】
            if(multiple) {
                ConcurrentNavigableMap<Long, String> concurrentNavigableMap
                        = concurrentSkipListMap.headMap(deliveryTag);

                concurrentNavigableMap.clear();
            }else {
                //2.3 若是单个发消息,则单个删除
                concurrentSkipListMap.remove(deliveryTag);
            }

            log.info("确认的消息编号:"+deliveryTag);
        };

        ConfirmCallback NACK_callback = (deliveryTag,multiple)->{   //消息确认失败 回调函数

            log.error("未确认的消息编号:"+deliveryTag);
        };

        channel.addConfirmListener(ACK_callback,NACK_callback); //将以上回调确认函数添加到监听器中


        long begin = System.currentTimeMillis();    //开始时间

        //【第一步】这里为模拟消息的发送
        for (int i=0;i<MESSAGE_COUNT;i++){

            String message = i +"";
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

            //3.记录消息的总和,往里面存入信道的序号以及对应序号的信息
            //channel.getNextPublishSeqNo() 表示获取当前消息的下一个消息编号
            concurrentSkipListMap.put(channel.getNextPublishSeqNo(),message);
        }

        long end = System.currentTimeMillis();    //结束时间
        System.out.println("总耗时为:"+(end-begin)+"ms");
    }

关于其中使用 concurrentSkipListMap.headMap(deliveryTag) 进行批量删除的解释说明:

运行结果:

可见,异步确认发布消息效率比以上两种方式都要高,由于是异步发送的消息,所以顺序会很不一致


3、关于 Exchanges 交换机

四种 MQ 交换机(未按先后排序):


4、死信队列(重点)

定义:死信,顾名思义,死掉的信息,即无法被消费的消息;由于存在有该类型的消息,所以对应保存该类型的队列随即产生,即死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中例:用户在商城下单成功并点击去支付后,在指定时间未支付,则自动失效

死信队列大致流程图:

  • 正常情况下,生产者(producer)将消息通过普通交换机(normal_exchange)所绑定的普通队列(normal_queue)发送到消费者 C1
  • 而异常情况下,将异常的消息保存到死信队列(dead_queue),并发送到消费者 C2

【消费者C1】

C1需要不仅需要处理正常消息的发送,还需要处理失效消息往死信队列中的传递,以及普通队列与死信队列之间的绑定关系

/**
 * 模拟死信队列
 *
 * 消费者 C1
 */
public class Receive01 {

    public static final String Normal_Exchange = "normal_exchange"; //普通交换机

    public static final String Dead_Exchange = "dead_exchange"; //死信交换机

    public static final String Normal_Queue = "normal_queue"; //普通队列

    public static final String Dead_Queue = "dead_queue"; //死信队列

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        //1.声明死信以及普通队列的交换机
        channel.exchangeDeclare(Normal_Exchange,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(Dead_Exchange,BuiltinExchangeType.DIRECT);

        //2.声明死信队列以及普通队列
        HashMap<String, Object> deadLetters = new HashMap<>();
        //2.1 普通队列设置死信交换机(注意:这里的 KEY 是固定的)
        deadLetters.put("x-dead-letter-exchange",Dead_Exchange);
        //2.2 设置死信的 Routing Key
        deadLetters.put("x-dead-letter-routing-key","list");
        //2.3 设置过期时间(这里不进行设置)
//        deadLetters.put("x-message-ttl",10000);

        channel.queueDeclare(Normal_Queue,false,false,false,deadLetters);
        channel.queueDeclare(Dead_Queue,false,false,false,null);

        //3.将队列与交换机进行绑定
        channel.exchangeBind(Normal_Queue,Normal_Exchange,"zhangsan");
        channel.exchangeBind(Dead_Queue,Dead_Exchange,"lisi");

        System.out.println("等待接收消息中.....");

        //4.接收到消息的回调
        DeliverCallback deliverCallback = (consumerTag,message) ->{

            System.out.println("消费者一接收到消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };

        //5.消息中断的回调
        CancelCallback  cancelCallback = (consumerTag) -> {

            System.out.println(consumerTag + "消费者取消了消息的接收!");
        };

        channel.basicConsume(Normal_Queue,true,deliverCallback,cancelCallback);
    }
}

【生产者Producer】

这里模拟的是消息被拒

生产者不需要管理队列的消息是否发送成功,只需要将消息发送到普通队列中

public static final String Normal_Exchange = "normal_exchange"; //普通交换机

    public static final int MESSAGE_COUNT = 10; //消息的总数

    public static final String TTL_TIME = "10000"; //过期时间

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        //1.声明一个交换机
        channel.exchangeDeclare(Normal_Exchange, BuiltinExchangeType.DIRECT);

        //2.死信消息,设置 TTL 消息过期时间,过期则传送到死信队列中
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties()
                        .builder().expiration(TTL_TIME).build(); //10s

        //3.这里进行依次发送消息,同时设置了消息过期时间
        for (int i=0;i<=MESSAGE_COUNT;i++){
            String message = "info"+i;

            channel.basicPublish(Normal_Exchange,"zhangsan",properties,message.getBytes());

            System.out.println("消息生产者发送消息:"+message);
        }
    }

【消费者C2】

消费者C2的任务是只需要消费死信队列中的消息

public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        //1.声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        System.out.println("消费者二等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag,message)->{

            System.out.println("消费者二接收到消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
        };

        CancelCallback cancelCallback = (consumerTag)->{

            System.out.println(consumerTag+"消费者中断了消息..");
        };

        //2.消费死信队列中的消息
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
    }

运行结果:

MQ 中:

这里,进行中断普通队列的接收,模拟死信队列场景

当死信队列中存在未被消费的消息时,C2感应到存在的消息,并将之前发送失败的消息进行消费


5、延迟队列(整合SpringBoot)

定义:

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理;简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

延迟队列一般使用的场景:

代码架构图:

创建两个队列 QAQB,两者队列 TTL 分别设置为 10S40S,然后在创建一个交换机 X 和死信 交换机 Y,它们的类型都是 Direct,创建一个 死信队列 QD,它们的绑定关系如下:

当然,这里进行整合 SpringBoot 进行使用 🔜

【这里是 application.yaml 文件中的配置】

spring:
  #这里是 RabbitMQ 的配置
  rabbitmq:
    port: 5672  #指定的 rabbitMQ 服务器端口号
    username: admin
    password: 123
    host: 192.168.101.65

【这里是延迟队列队列以及交换机的配置类】

这里是图中的中间那一段,也就是队列以及交换机的声明以及绑定

/**
 * 延迟队列中【队列】与【交换机】的 "声明" 以及 "绑定" 的配置类
 */
@Configuration
public class TTLQueueConfig {

    //普通交换机
    public static final String Normal_Exchange = "X";
    //死信交换机
    public static final String Dead_Exchange = "Y";
    //普通队列
    public static final String Normal_QueueA = "QA";
    public static final String Normal_QueueB = "QB";
    //死信队列
    public static final String Dead_Queue = "QD";


    //1.声明交换机
    //1.1声明普通交换机
    @Bean("Normal_Exchange")
    public DirectExchange Normal_Exchange(){

        return new DirectExchange(Normal_Exchange);
    }
    //1.2 声明死信交换机
    @Bean("Dead_Exchange")
    public DirectExchange Dead_Exchange(){

        return new DirectExchange(Dead_Exchange);
    }

    //2.声明普通队列,与死信交换机进行绑定,并声明过期时间
    @Bean("Normal_QueueA")  //【队列A】
    public Queue QA(){

        HashMap<String, Object> arguments = new HashMap<>(2); //这里初始化 map 的长度,加快编译速度
        //2.1 设置死信交换机
        arguments.put("x-dead-letter-exchange",Dead_Exchange);
        //2.2 设置死信 Routing Key
        arguments.put("x-dead-letter-routing-key","YD");
        //2.3 设置 TTL 过期时间
//        arguments.put("x-message-ttl",10000);

        return QueueBuilder
                .durable(Normal_QueueA) //开启队列持久化
                .withArguments(arguments).ttl(10000).build();
    }

    @Bean("Normal_QueueB")  //【队列B】
    public Queue QB(){

        HashMap<String, Object> arguments = new HashMap<>(2); //这里初始化 map 的长度,加快编译速度
        arguments.put("x-dead-letter-exchange",Dead_Exchange);
        arguments.put("x-dead-letter-routing-key","YD");
//        arguments.put("x-message-ttl",30000);

        return QueueBuilder
                .durable(Normal_QueueB)
                .withArguments(arguments).ttl(30000).build();
    }

    //3.声明死信队列
    @Bean("Dead_Queue")
    public Queue QD(){

        return QueueBuilder
                .durable(Dead_Queue).build();
    }


    //4.将普通交换机与队列A进行绑定
    @Bean
    public Binding QA_Binding_NormalQueue(@Qualifier("Normal_QueueA") Queue queueA,
                                          @Qualifier("Normal_Exchange") DirectExchange normalExchange){

        return BindingBuilder.bind(queueA)
                .to(normalExchange).with("XA");
    }
    //4.1将普通交换机与队列B进行绑定
    @Bean
    public Binding QB_Binding_NormalQueue(@Qualifier("Normal_QueueB") Queue queueB,
                                          @Qualifier("Normal_Exchange") DirectExchange normalExchange){

        return BindingBuilder.bind(queueB)
                .to(normalExchange).with("XB");
    }
    //4.2将死信交换机与死信队列进行绑定
    @Bean
    public Binding QD_Binding_DeadExchange(@Qualifier("Dead_Queue")Queue deadQueue,
                                           @Qualifier("Dead_Exchange")DirectExchange deadExchange){

        return BindingBuilder.bind(deadQueue)
                .to(deadExchange).with("YD");
    }

}

【消息生产者】

这里打算发送一个消息请求,分别给不同的 TTL 队列

/**
 * 消息生产者
 *
 * 这里进行发送 http://localhost:8080/ttl/sendMsg/小白
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Resource
    RabbitTemplate rabbitTemplate;

    //发送消息
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable("message")String message){

        log.info("当前时间:{},发送了一条信息:{}给两个 TTL 队列",new Date(),message);

        rabbitTemplate.convertAndSend("X","XA","消息来自10s的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自30s的队列:"+message);
    }
}

【消息消费者】

使用 Listener 监听器进行死信队列的监听

/**
 * 这里是消息的消费者
 */
@Slf4j
@Component
public class DeadLetterConsumer {


    //死信队列中进行接收 TTL 延迟消息
    @RabbitListener(queues="QD")    //调用监听器监听死信队列
    public void DeadQueue_consumer(Message message, Channel channel){

        String msg = new String(message.getBody());
        log.info("当前时间:{}"+"消费者死信队列接收到消费的消息:{}",new Date().toString(),msg);
    }
}

运行结果:

发现问题🤔:

若以后需要多个不同的 TTL 消息,那么就需要建立多个消息队列,以达到传递不同 TTL 的消息;这样导致耦合度升高,不符合开闭原则,所以接下来进行延迟队列的优化

解决问题:

新增一个 QC 队列,这个队列不设置延迟时间,而是让 Producer消息生产者 发送消息的时候进行设置消息的 TTL 时间,这样,就不用频繁改动队列的 TTL 时间

做绑定的代码跟上面一样,不做展示,这里是生产者的代码,进行设置发送消息的 TTL时间:

 @GetMapping("/sendMsg/ttl/{message}/{ttlTime}")
    public void QC_sendMsgByTTL(@PathVariable("message") String message,@PathVariable("ttlTime") String ttlTime){

        log.info("当前时间:{},发送了一条消息给QC:{}",new Date().toString(),message);

        rabbitTemplate.convertAndSend("X","QC",message,msg->{

            //这里进行设置消息的 TTL 时间
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

运行结果(这里使用的是低版本的 RabbitMQ)

由结果可知,由于队列的先进先出特性,先发的 TTL消息若时间设置大于后发的 TTL消息,那么,后发的消息就会被堵塞,直到先发的 TTL消息发送完毕,后发的 TTL 消息才能继续发送,这是一个弊端


6、备份交换机(重点)

定义:

大致流程图如下:

【application.yaml配置类】

这里需要手动的开启消息回调与失败消息的回退

spring:
  #这里是 RabbitMQ 的配置
  rabbitmq:
    port: 5672  #指定的 rabbitMQ 服务器端口号
    username: admin
    password: 123
    host: 192.168.101.65

    #这里进行开启发布确认以及消息的回调
    publisher-confirm-type: correlated

    #将发送失败的消息回退给生产者
    publisher-returns: true

【消息回调接口】

需要注意的是:RabbitTemplate 注入必须在 init 前,不然会报未注入异常

@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Resource
    RabbitTemplate rabbitTemplate;


    /**
     * 由于这里重写继承接口中的方法,所以需要进行注入操作
     */
    @PostConstruct
    public void init(){

        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }


    /**
     * 交换机回调信息的接口
     * @param correlationData 保存回调消息的 id 以及相关的信息
     * @param Ack_Message 消息确认
     * @param reason 消息发送失败,回调的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean Ack_Message, String reason) {

        String messageId = "";

        if (ObjectUtil.isNotNull(correlationData)) { //判断是否为空,防止空指针异常

            messageId = correlationData.getId();
        }

        if (Ack_Message) {
            log.info("成功接收到消息,消息ID为:{}", messageId);
        } else {
            log.info("接收消息失败,消息ID为:{},失败的原因为:{}", messageId, reason);
        }
    }


    /**
     * 将发送失败的消息进行回退
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {

        log.error("消息“:{} 被交换机:{} 退回,退回的原因:{},消息的 RoutingKey:{}",
                new String(returnedMessage.getMessage().getBody()),
                returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
    }
}
举报

相关推荐

0 条评论