0
点赞
收藏
分享

微信扫一扫

RabbitMq——发布确认高级和消息回退

早安地球 2022-03-19 阅读 78
rabbitmqjava

发布确认高级:消息在传递过程中,我们需要确定消息状态信息,开启发布确认高级模式,消息传递结束后会返回传递结果信息,若发送失败的消息,该消息会被存入缓存中,定时任务发送失败消息,交换机收到消息后,缓存会删除该信息。

如果只开启发布确认模式的话,当交换机收到生产者发送的消息后,会发布确认消息给生产者,如果发现路由不通,则会直接丢弃消息,此时生产者处于不知情状态,就会造成消息丢失。

消息回退:通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者,好让生产者做后续 。

案例:生产者发送两条消息,一条通过路由confirm发送,一条通过路由unknow发送,如下图

1、配置properties文件,开启发布确认高级和消息回退


spring.rabbitmq.host=192.168.22.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=user
spring.rabbitmq.password=123456
#发布消息成功到交换机后会触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated
#路由不通,回退消息给生产者
spring.rabbitmq.publisher-returns=true

2、创建配置类声明交换机,队列,以及绑定交换机和队列

        (1)声明交换机,队列;

        (2)调用BindingBuilder.bind(队列名).to(交换机名).with(绑定路由)绑定交换机与队列

@Configuration
public class ConfirmConfigs {

    //交换机,队列,路由名
    public final static String CONFIRM_EXCHANGE="confirm_exchange";
    public final static String CONFIRM_QUEUE="confirm_queue";
    public final static String CONFIRM_ROUTING_KEY="confirm";

    //声明交换机,队列
    @Bean
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).build();
    }
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    //绑定交换机和队列
    @Bean
    public Binding queueBindExchange(@Qualifier("confirmExchange") DirectExchange exchange,
                                     @Qualifier("confirmQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    }
}

3、实现RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback接口,实现两个回调函数:

(1)confirm方法:交换机不管有无收到消息都会回调

        第一个参数:保存消息id以及相关信息;

        第二个参数:交换机是否收到消息,收到未true

        第三个参数:消息接收不到原因,成功接收消息则为null

 (2)returnMessage()方法:消息无法路由时会被回调

        第一个参数:被退回到消息体

        第二个参数:错误编码

        第三个参数:消息接收失败原因

        第四个参数:发送消息到交换机

        第五个参数:路由

@Component
@Slf4j
public class MyCallBacks implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
   @Autowired
   private RabbitTemplate rabbitTemplate;

    //注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id=correlationData!=null?correlationData.getId():"";
        if (b){
            //消息成功发送
            log.info("交换机已经收到id为:{}的消息",id);
        }else {
            //消息发送失败
            log.info("交换机未收到id为:{}的消息,收不打原因为:{}",id,s);
        }
    }

    @Override
    public void returnedMessage(Message message, int replayCode, String replayText, String exchange, String routingKey) {
        log.info("消息:{},被{}交换机退回,路由为:{},被退回原因为:{}",new String(message.getBody()),exchange,routingKey,replayText);
    }
}

4、生产者发送消息

        (1)创建CorrelationDate(消息id)对象,用于存放消息id以及相关信息

        (2)调用convertAndSend(交换机,路由,消息,CorrelationDate对象)方法发送消息

@Slf4j
@RestController
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送消息
    @RequestMapping("/producer/{message}/{routing}/{id}")
    public void send(@PathVariable String message,
                       @PathVariable String routing,
                       @PathVariable String id){
        //存放消息id和相关信息
        CorrelationData correlationData = new CorrelationData(id);

        rabbitTemplate.convertAndSend(ConfirmConfigs.CONFIRM_EXCHANGE,
                routing, message,correlationData);

        log.info("生产者发送消息内容为:'{}',消息id为:{},发送路由为:{}",message,id,routing);
    }

}

5、注册监听器监听队列confirm_queue,接收消息

@Component
@Slf4j
public class Consumers {

    //消费者
    @RabbitListener(queues = ConfirmConfigs.CONFIRM_QUEUE)
    public void consumer(Message message){
        log.info("consumer收到消息:'{}',路由为:{}",
                new String(message.getBody()), message.getMessageProperties().getReceivedRoutingKey());
    }

}

6、测试,发送两条消息:

发送消息:吃饭了吗,路由为:confirm

发送消息:跑步了吗,路由为:confirms

 

 

 可以发现,消息:跑步了吗被退回,因为路由confirms不通,消息:吃饭了吗成功接收

举报

相关推荐

0 条评论