0
点赞
收藏
分享

微信扫一扫

RabbitMQ之死信交换机

黄昏孤酒 2024-01-26 阅读 13

前言

消息队列是分布式系统中常用的组件,用于异步通信、解耦和提高系统可靠性。然而,在实际应用中,难免会遇到一些异常情况,例如消息处理失败、超时等。为了更好地处理这些异常情况,死信交换机(Dead Letter Exchange)应运而生

一.什么是死信?

在了解死信交换机之前我们先了解什么是死信

消息变成死信一般是由于以下几种情况:

二.什么是死信交换机?

死信交换机是消息队列系统中的一种特殊交换机,用于处理那些无法被正常消费的消息。当消息满足一定的条件,例如重试次数达到上限或者处理失败,就会被标记为死信(Dead Letter)并被发送到死信交换机

1.死信交换机工作原理

1).消息被标记为死信:

当消息无法被正常消费时,可以通过设定一些条件将其标记为死信。这些条件可能包括消息的重试次数、过期时间等。

(2).发送到死信交换机:

一旦消息被标记为死信,它将被发送到预先指定的死信交换机。

(3).重新处理或记录:

死信交换机可以将死信消息重新发送到其他队列进行处理,也可以将其记录到日志中供后续分析。

三.优势与应用场景 

1.优势

2.应用场景

四.实战与应用

1.时间过期进入到死信队列

创建绑定死信延迟队列

//死信、延迟队列
    @Bean
    public Queue queueA() {
        Map<String, Object> config = new HashMap<>();
        //message在该队列queue的存活时间最大为10秒
        config.put("x-message-ttl", 10000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        config.put("x-dead-letter-exchange", "ExchangeB");
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        config.put("x-dead-letter-routing-key", "bb");
        return new Queue("queueA", true, true, false, config);
    }

    @Bean
    public DirectExchange ExchangeA(){
        return new DirectExchange("ExchangeA");
    }

    @Bean
    public Binding bindingA(){
        return BindingBuilder.bind(queueA()).to(ExchangeA()).with("aa");
    }

    @Bean
    public Queue queueB() {
        return new Queue("queueB");
    }
    @Bean
    public DirectExchange ExchangeB(){
        return new DirectExchange("ExchangeB");
    }
    @Bean
    public Binding bindingB(){
        return BindingBuilder.bind(queueB()).to(ExchangeB()).with("bb");
    }

我们通过测试方法send6向队列queueA中发送一条消息,接收成功后在时间过期时进入死信队列queueB中 

 

2.手动确认是否接收

配置确认模式为手动

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

编写消费者接收方式为手动,若为true,则为接收,false则为不接收,不接收的同时会进入到死信队列

 package com.yu.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

 @Component
 @SuppressWarnings("all")
 @Slf4j
 @RabbitListener(queues = "queueA")
 public class ReceiverQA {
  @RabbitHandler
  public void process(String id, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception{
       log.error("QA接收到:" + id);
//       channel.basicAck(tag,true);
      channel.basicReject(tag,true);
      Thread.sleep(1000);
     }
 }

 

举报

相关推荐

0 条评论