0
点赞
收藏
分享

微信扫一扫

MQRabbitMQ死信队列

大柚子top 2022-04-29 阅读 72
rabbitmq

这里写自定义目录标题

什么是死信队列

当生产者通过交换机将消息传到消费者的队列中,消费者的队列由于种种原因,不能正常接收消息,这个时候就需要死信队列来处理

死信队列产生的三个条件

消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

消息TTL过期

说明:设置TTL过期时间可以在生产者处声明也可以在消费者处声明
在生产者中声明比较常用,可以根据信息的种类,设置不同的TTL时间

        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
            channel.basicPublish(normalExchangeName, "zhangsan", properties, message.getBytes());

在消费者处声明

 Map<String, Object> params = new HashMap<String, Object>();
//        正常队列设置
        params.put("x-dead-letter-exchange", deadExchangeName);
// 消息传送到死信交换机还得告诉他应该发送给谁,就是rottingkey
        params.put("x-dead-letter-routing-key", "lisi");
        params.put("x-expires", 10000);
//主要作用是当正常队列不能接收消息后,将消息发送到死信交换机
        String normalQueue = "normalQueue1";
        channel.queueDeclare(normalQueue, false, false, false, params);

队列达到最大长度

 Map<String, Object> params = new HashMap<String, Object>();
        params.put("x-key-letter-exchange", deadlExchange);
        params.put("x-key-letter-exchange", "lisi");
//        还能设置交换时间
        params.put("x-max-length", 5);

        channel.queueDeclare(normalQueue, false, false, false, params);

队列拒绝接收消息

      DeliverCallback deliverCallback = new DeliverCallback() {
            public void handle(String s, Delivery delivery) throws IOException {
                System.out.println(new String(delivery.getBody(), "utf-8"));
//  第二个参数为mulitply  true表示向生产者发送所有(可能会丢失数据,应答了一个,其他没接收到会发生数据丢失,false表示接收一个应答一个              channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
//                false表示拒绝入队
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            public void handle(String s) throws IOException {
            }
        };
//取消自动应答
        boolean autoAck = false;
        channel.basicConsume(normalQueue, autoAck, deliverCallback, cancelCallback);

实例代码

package xiang.test10死信队列;

import com.rabbitmq.client.*;

import xiang.utils.ConnectionUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class consumer01 {
    public static final String normalExchangeName = "normalExchangeName";
    public static final String deadExchangeName = "dead";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(normalExchangeName, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);

        //声明死信队列
        final String deadQueue = "deadQueue1";
        channel.queueDeclare(deadQueue, false, false, false, null);
        //死信队列绑定交换机
        channel.queueBind(deadQueue, deadExchangeName, "lisi");

//        正常队列绑定死信队列
        Map<String, Object> params = new HashMap<String, Object>();
//        正常队列设置
        params.put("x-dead-letter-exchange", deadExchangeName);
        params.put("x-dead-letter-routing-key", "lisi");
        params.put("x-expires", 10000);
//
        String normalQueue = "normalQueue1";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, normalExchangeName, "zhangsan");

        DeliverCallback deliverCallback = new DeliverCallback() {
            public void handle(String s, Delivery delivery) throws IOException {
                System.out.println(new String(delivery.getBody(), "utf-8"));
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            public void handle(java.lang.String s) throws IOException {
            }
        };
        channel.basicConsume(normalQueue, true, deliverCallback, cancelCallback);
    }
}

package xiang.test10死信队列;

import com.rabbitmq.client.*;
import xiang.utils.ConnectionUtils;

import java.io.IOException;

public class consumer02 {
    public static final String deadExchangeName = "dead";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
        final String deadQueue = "deadQueue1";

        channel.queueBind(deadQueue, deadExchangeName, "lisi");

        DeliverCallback deliverCallback = new DeliverCallback() {
            public void handle(String s, Delivery delivery) throws IOException {
                System.out.println(new String(delivery.getBody(), "utf-8"));
            }
        };
        CancelCallback cancelCallback = new CancelCallback() {
            public void handle(java.lang.String s) throws IOException {
            }
        };
        channel.basicConsume(deadQueue, true, deliverCallback, cancelCallback);
    }
}

package xiang.test10死信队列;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import sun.reflect.generics.tree.BaseType;
import xiang.utils.ConnectionUtils;

import java.io.IOException;

public class producer {
    public static final String normalExchangeName = "normalExchangeName";

    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(normalExchangeName, BuiltinExchangeType.DIRECT);
//        设置TTL时间
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i < 10; i++) {
            String message = "info" + i;
            channel.basicPublish(normalExchangeName, "zhangsan", properties, message.getBytes());
        }
    }
}

举报

相关推荐

0 条评论