这里写自定义目录标题
什么是死信队列
当生产者通过交换机将消息传到消费者的队列中,消费者的队列由于种种原因,不能正常接收消息,这个时候就需要死信队列来处理
死信队列产生的三个条件
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
消息TTL过期
说明:设置TTL过期时间可以在生产者处声明也可以在消费者处声明
在生产者中声明比较常用,可以根据信息的种类,设置不同的TTL时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
channel.basicPublish(normalExchangeName, "zhangsan", properties, message.getBytes());
在消费者处声明
Map<String, Object> params = new HashMap<String, Object>();
// 正常队列设置
params.put("x-dead-letter-exchange", deadExchangeName);
// 消息传送到死信交换机还得告诉他应该发送给谁,就是rottingkey
params.put("x-dead-letter-routing-key", "lisi");
params.put("x-expires", 10000);
//主要作用是当正常队列不能接收消息后,将消息发送到死信交换机
String normalQueue = "normalQueue1";
channel.queueDeclare(normalQueue, false, false, false, params);
队列达到最大长度
Map<String, Object> params = new HashMap<String, Object>();
params.put("x-key-letter-exchange", deadlExchange);
params.put("x-key-letter-exchange", "lisi");
// 还能设置交换时间
params.put("x-max-length", 5);
channel.queueDeclare(normalQueue, false, false, false, params);
队列拒绝接收消息
DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(new String(delivery.getBody(), "utf-8"));
// 第二个参数为mulitply true表示向生产者发送所有(可能会丢失数据,应答了一个,其他没接收到会发生数据丢失,false表示接收一个应答一个 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// false表示拒绝入队
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}
};
CancelCallback cancelCallback = new CancelCallback() {
public void handle(String s) throws IOException {
}
};
//取消自动应答
boolean autoAck = false;
channel.basicConsume(normalQueue, autoAck, deliverCallback, cancelCallback);
实例代码
package xiang.test10死信队列;
import com.rabbitmq.client.*;
import xiang.utils.ConnectionUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class consumer01 {
public static final String normalExchangeName = "normalExchangeName";
public static final String deadExchangeName = "dead";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(normalExchangeName, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
//声明死信队列
final String deadQueue = "deadQueue1";
channel.queueDeclare(deadQueue, false, false, false, null);
//死信队列绑定交换机
channel.queueBind(deadQueue, deadExchangeName, "lisi");
// 正常队列绑定死信队列
Map<String, Object> params = new HashMap<String, Object>();
// 正常队列设置
params.put("x-dead-letter-exchange", deadExchangeName);
params.put("x-dead-letter-routing-key", "lisi");
params.put("x-expires", 10000);
//
String normalQueue = "normalQueue1";
channel.queueDeclare(normalQueue, false, false, false, params);
channel.queueBind(normalQueue, normalExchangeName, "zhangsan");
DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(new String(delivery.getBody(), "utf-8"));
}
};
CancelCallback cancelCallback = new CancelCallback() {
public void handle(java.lang.String s) throws IOException {
}
};
channel.basicConsume(normalQueue, true, deliverCallback, cancelCallback);
}
}
package xiang.test10死信队列;
import com.rabbitmq.client.*;
import xiang.utils.ConnectionUtils;
import java.io.IOException;
public class consumer02 {
public static final String deadExchangeName = "dead";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.exchangeDeclare(deadExchangeName, BuiltinExchangeType.DIRECT);
final String deadQueue = "deadQueue1";
channel.queueBind(deadQueue, deadExchangeName, "lisi");
DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(new String(delivery.getBody(), "utf-8"));
}
};
CancelCallback cancelCallback = new CancelCallback() {
public void handle(java.lang.String s) throws IOException {
}
};
channel.basicConsume(deadQueue, true, deliverCallback, cancelCallback);
}
}
package xiang.test10死信队列;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import sun.reflect.generics.tree.BaseType;
import xiang.utils.ConnectionUtils;
import java.io.IOException;
public class producer {
public static final String normalExchangeName = "normalExchangeName";
public static void main(String[] args) throws IOException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(normalExchangeName, BuiltinExchangeType.DIRECT);
// 设置TTL时间
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String message = "info" + i;
channel.basicPublish(normalExchangeName, "zhangsan", properties, message.getBytes());
}
}
}