0
点赞
收藏
分享

微信扫一扫

【RabbitMQ】延迟队列之死信交换机

陆公子521 2024-01-26 阅读 18

请添加图片描述

✨前言

了解延迟队列之前我们先了解两个概念TTL和 DXL两个概念:

TTL概念

TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

 DLX概念

DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当

消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个

交换器就是 DLX,绑定 DLX 的队列就称之为死信队列或者可以称之为延迟队列。

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面

所介绍的 DLX TTL 模拟出延迟队列的功能。

🎉死信交换机的使用

1.案例分析

下面我会讲解两种使用死信的方式:

第一种是我们不为正常的交换机设置消费者,为该队列中的消息设置TTL如果消息过期了就会变为死信就会被发送到死信交换机中处理对应的事务

第二种则是为正常队列创建一个消费者但是开启手动确认,什么意思呢,我们的RabbitMQ中的消费者都是自动消费的,所以我们可以设置为手动确认消费,我接收到你这个消息了,但我还未处理,而是由消费者主动发送确认信号(ACK)给 RabbitMQ,告知消息已经成功处理,这条消息才算是被消费了。

2.案例1实践

创建交换机、队列以及它们的绑定关系
package org.example.produce.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@SuppressWarnings("all")
public class RabbitConfig {
   

    /**
     * 定义正常队列
     * @return
     */
    @Bean
    public Queue QueueA(){
        Map<String, Object> config = new HashMap<>();
        config.put("x-message-ttl", 10000);//message在该队列queue的存活时间最大为10秒
        config.put("x-dead-letter-exchange", "deadExchange"); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-routing-key", "bb");//x-dead-letter-routing-key参数是给这个DLX指定路由键
        return new Queue("QueueA", true, true, false, config);
    }


    /**
     * 定义死信队列
     * @return
     */
    @Bean
    public Queue QueueB(){
        return new Queue("QueueB");
    }

    /**
     * 自定义直连交换机
     * @return
     */
    @Bean
    public DirectExchange directExchangeA(){
        return new DirectExchange("direct-exchangeA",true,false);
    }

    /**
     * 自定义死信交换机
     * @return
     */
    @Bean
    public DirectExchange directExchangeB(){
        return new DirectExchange("direct-exchangeB",true,false);
    }

    /**
     * 将正常队列与直连交换机进行绑定,并设置路由键与死信交换机以及队列
     * @return
     */
    @Bean
    public Binding bindingExchangeA(){
        return BindingBuilder.bind(QueueA())
                .to(directExchangeA())
                .with("aa");
    }

    /**
     * 将死信队列与死信交换机进行绑定,并设置路由键
     * @return
     */
    @Bean
    public Binding bindingExchangeB(){
        return BindingBuilder.bind(QueueB())
                .to(directExchangeB())
                .with("bb");
    }

}
 创建消息的生产者
package org.example.produce.controller;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

@RestController
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("/send")
    public String send() {
        Map<String,Object> data=new HashMap<>();
        data.put("msg","订单ID:121452623345");
        rabbitTemplate.convertAndSend("direct-exchangeA","aa", data);
        return "😎";
    }
}
创建死信队列的消费者 
package org.example.produce.controller;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = {"QueueB"})
public class BReceiver {
    @RabbitHandler
    public void handler(Map<String,Object> json){
        System.out.println(json);
    }
}

效果展示: 

 访问一下http://localhost:8081/send

访问一下http://118.178.124.148:15672

可以看到已经有我们的队列了,现在我们开启消费者服务查看一下

也是可以拿到原先队列中的消息的,说明我们的死信交换机和死信队列生效了

RabbitMQ死信队列优化

如果我们想要第一条消息在6s后变成了死信消息,然后被消费者消费掉,第二条消息在60s之后变成了死信消息,然后被消费掉,这样,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有6s和60s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求??

其实我们可以增加一个延时队列,用于接收设置为任意延时时长的消息,增加一个相应的死信队列和routingkey

创建交换机、队列以及它们的绑定关系

package org.example.produce.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@SuppressWarnings("all")
public class RabbitConfig {


    /**
     * 定义正常队列
     * @return
     */
    @Bean
    public Queue QueueA(){
        Map<String, Object> config = new HashMap<>();
        config.put("x-message-ttl", 10000);//message在该队列queue的存活时间最大为10秒
        config.put("x-dead-letter-exchange", "direct-exchangeB"); //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-routing-key", "bb");//x-dead-letter-routing-key参数是给这个DLX指定路由键
        return new Queue("QueueA", true, true, false, config);
    }


    /**
     * 定义死信队列
     * @return
     */
    @Bean
    public Queue QueueB(){
        return new Queue("QueueB");
    }


    // 声明延时队列C 不设置TTL
    @Bean
    public Queue QueueC(){
        Map<String, Object> config = new HashMap<>();
        // x-dead-letter-exchange    这里声明当前队列绑定的正常交换机
        config.put("x-dead-letter-exchange","direct-exchangeA");
        // x-dead-letter-routing-key  这里声明当前队列的死信路由key
        config.put("x-dead-letter-routing-key", "aa");
        return new Queue("QueueC", true, true, false, config);
    }

    /**
     * 自定义直连交换机
     * @return
     */
    @Bean
    public DirectExchange directExchangeA(){
        return new DirectExchange("direct-exchangeA",true,false);
    }

    /**
     * 自定义死信交换机
     * @return
     */
    @Bean
    public DirectExchange directExchangeB(){
        return new DirectExchange("direct-exchangeB",true,false);
    }

    /**
     * 自定义延迟交换机
     * @return
     */
    @Bean
    public DirectExchange directExchangeC(){
        return new DirectExchange("direct-exchangeC",true,false);
    }
    /**
     * 将正常队列与直连交换机进行绑定,并设置路由键与死信交换机以及队列
     * @return
     */
    @Bean
    public Binding bindingExchangeA(){
        return BindingBuilder.bind(QueueA())
                .to(directExchangeA())
                .with("aa");
    }

    /**
     * 将死信队列与死信交换机进行绑定,并设置路由键
     * @return
     */
    @Bean
    public Binding bindingExchangeB(){
        return BindingBuilder.bind(QueueB())
                .to(directExchangeB())
                .with("bb");
    }
    /**
     * 将正常队列与直连交换机进行绑定,并设置路由键与死信交换机以及队列
     * @return
     */
    @Bean
    public Binding bindingExchangeC(){
        return BindingBuilder.bind(QueueC())
                .to(directExchangeC())
                .with("cc");
    }


/



}

 创建消息的生产者

package org.example.produce.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@RestController
@Slf4j
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;


    @RequestMapping("send02")
    public void sendMsg( Integer delay) {
        Map<String,Object> data=new HashMap<>();
        data.put("msg","延迟队列");
        rabbitTemplate.convertAndSend("direct-exchangeC", "cc",data , message -> {
            // 设置延迟毫秒值
            message.getMessageProperties().setExpiration(String.valueOf(delay * 1000));
            return message;
        });
    }
}


 

3.案例2实践

  • 消息通过 ACK 确认是否被正确接收,每个 Message 都要被确认(acknowledged),可以手动去 ACK 或自动 ACK

  • 自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息

  • 如果消息已经被处理,但后续代码抛出异常,使用 Spring 进行管理的话消费端业务逻辑会进行回滚,这也同样造成了实际意义的消息丢失

  • 如果手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者

  • 如果某个服务忘记 ACK 了,则 RabbitMQ 不会再发送数据给它,因为 RabbitMQ 认为该服务的处理能力有限

  • ACK 机制还可以起到限流作用,比如在接收到某条消息时休眠几秒钟

  • 消息确认模式有:

    • AcknowledgeMode.NONE:自动确认

    • AcknowledgeMode.AUTO:根据情况确认

    • AcknowledgeMode.MANUAL:手动确认

配置yml文件关闭自动确认
server:
    port: 9999
spring:
    application:
        name: consume
    rabbitmq:
        host: localhost
        username: weiwei
        password: 123456
        port: 5672
        virtual-host: my_vhost
        listener:
            simple:
                acknowledge-mode: manual
为QueueA创建一个消费者并且手动确认

在刚刚上一个案例中我们不是没有为正常队列创建消费者吗,现在我们创建一个

package org.example.produce.controller;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
@RabbitListener(queues = {"QueueA"})
public class AReceiver {
    @RabbitHandler
    public void handler(Map<String,Object> json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println("QA接到消息"+json); // 打印接收到的消息
        channel.basicAck(tag,true); // 确认消息已被消费
    }
}

 现在我们看一下我们的消息是否会是怎么样的

2024-01-25 19:55:27.744  INFO 13668 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-01-25 19:55:27.744  INFO 13668 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2024-01-25 19:55:27.745  INFO 13668 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
QA接到消息{msg=订单ID:121452623345}
QA接到消息{msg=订单ID:121452623345}

直接被QA接收消费,那么如果我拒绝呢?

消费者拒绝消息
package org.example.produce.controller;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = {"QueueA"})
public class AReceiver {
    @RabbitHandler
    public void handler(Map<String,Object> json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println("拒绝"+json);
        channel.basicReject(tag,false);  // 拒绝消息
        Thread.sleep(1000);
    }
}

被拒绝就会变成死信消息转到我们的死信交换机然后发送给死信队列

但是我们的死信也没有进行消费 ,只是消息保存在了队列中,那是因为我们开启了全局的手动消息确认,也就是上面所编写的配置,我们只需要像刚刚那样手动确认即可
 

package org.example.produce.controller;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@RabbitListener(queues = {"QueueB"})
public class BReceiver {
    @RabbitHandler
    public void handler(Map<String,Object> json, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        System.out.println("QB接到消息"+json); // 打印接收到的消息
        channel.basicAck(tag,true); // 确认消息已被消费

    }
}

 可以看到在消息被拒后消息就会跑到死信队列中做处理

2024-01-25 20:00:53.759  INFO 13444 --- [  restartedMain] org.example.produce.ProduceApplication   : Started ProduceApplication in 3.916 seconds (JVM running for 4.403)
2024-01-25 20:01:16.094  INFO 13444 --- [nio-8081-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-01-25 20:01:16.095  INFO 13444 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2024-01-25 20:01:16.095  INFO 13444 --- [nio-8081-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
拒绝{msg=订单ID:121452623345}
QB接到消息{msg=订单ID:121452623345}

请添加图片描述

到这里我的分享就结束了,欢迎到评论区探讨交流!!

💖如果觉得有用的话还请点个赞吧 💖

举报

相关推荐

0 条评论