1 ,代码清单 :
package com.keen.confirm;
import com.rabbitmq.client.*;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
// 发布确认 :每条消息,都确认一次
public class ProducerSyncConfirm {
// 1 ,rabbitMq 的 broker 的地址信息
public static final String QUEUE_NAME = "chuzuche";
public static final String QUEUE_HOST = "172.16.13.145";
public static final Integer QUEUE_PORT = 5672;
public static final String USER_NAME = "root";
public static final String USER_PASSWD = "123456";
public static void main(String[] args) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 配置 :ip,用户名,密码
factory.setHost(QUEUE_HOST);
factory.setPort(QUEUE_PORT);
factory.setUsername(USER_NAME);
factory.setPassword(USER_PASSWD);
// 创建连接 :
Connection conn = factory.newConnection();
// 获取信道 :
Channel channel = conn.createChannel();
// 生成队列 :
// 参数一 :队列名字
// 参数二 :队列存储方式 (内存-false,磁盘-true) ,默认-false
// 参数三 :数据使用范围 ( false-一个人消费,false-多人消费 )
// 参数四 :是否自动删除,最后一个消费者端连接后,该队列是否自动删除( true-自动删除,false-不自动删除 )
// 参数五 :其他参数 ( 延迟消息,死信消息 )
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间 :
long tm1 = System.currentTimeMillis();
// 消息存储器 :
// 生产者 :将消息存储到这里
// 确认 :确认的时候,将消息从里边删除
ConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();
// 成功回调 :消息确认,发布成功
// 参数 :消息编号,是否批量确认
ConfirmCallback successBack = (sequenceNum, mutiple) -> {
// 如果 mutiple==true,说明有多条消息
if(mutiple){
// 理论 :如果是批量确认,那么 map 中的数据都被发送成功了,可以清空缓存了
// 将这个消息编号之前的消息,全部清空
ConcurrentNavigableMap<Long, String> mapHead = map.headMap(sequenceNum, true);
System.out.println("===消息存储成功:=============================");
for (Long num : mapHead.keySet()) {
System.out.println("==="+mapHead.get(num));
}
System.out.println("===消息存储成功:=============================");
mapHead.clear();
}else{
// 理论 :如果是单跳确认,那么 map 中的数据只能确定成功了一条,可以把这一条数据删除
String v = map.remove(sequenceNum);
System.out.println("消息存储成功:"+v);
}
};
// 失败回调 :消息确认,发布失败
ConfirmCallback errorBack = (sequenceNum, mutiple) -> {
System.out.println("消息发送失败 :" + map.get(sequenceNum));
};
// 监听器 :异步监听消息是否成功
// 参数 :成功回调,失败回调
channel.addConfirmListener(successBack, errorBack);
for (int i = 0; i < 1000; i++) {
// 发消息 :
// 参数一 :交换机名字
// 参数二 :路由 key ,本次写队列名称
// 参数三 :数据存储方式 - 持久化存储
// 参数四 :消息,字节数组
String messsage = "消息:" + i + " 号";
// 将消息,存储到这个缓存中,为了确保消息是否成功存储
map.put(channel.getNextPublishSeqNo(), messsage);
// channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, messsage.getBytes("utf-8"));
channel.basicPublish("", QUEUE_NAME, null, messsage.getBytes("utf-8"));
}
// 结束时间 :
long tm2 = System.currentTimeMillis();
System.out.println("耗时:" + (tm2 - tm1) + "ms");
// 睡这么多秒,是为了接受回调
Thread.sleep(2000);
// 关闭流
channel.close();
conn.close();
}
}
2 ,效果 :