0
点赞
收藏
分享

微信扫一扫

1 ,rabbitMq 消息确认,异步确认,回调函数,全部打印

凯约 2022-04-16 阅读 61
java

1 ,代码清单 :

package com.keen.confirm;

import com.rabbitmq.client.*;

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

//  发布确认 :每条消息,都确认一次
public class ProducerSyncConfirm {

    //  1 ,rabbitMq 的 broker 的地址信息
    public static final String QUEUE_NAME = "chuzuche";
    public static final String QUEUE_HOST = "172.16.13.145";
    public static final Integer QUEUE_PORT = 5672;
    public static final String USER_NAME = "root";
    public static final String USER_PASSWD = "123456";

    public static void main(String[] args) throws Exception {

        //  连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //  配置 :ip,用户名,密码
        factory.setHost(QUEUE_HOST);
        factory.setPort(QUEUE_PORT);
        factory.setUsername(USER_NAME);
        factory.setPassword(USER_PASSWD);

        //  创建连接 :
        Connection conn = factory.newConnection();
        //  获取信道 :
        Channel channel = conn.createChannel();
        //  生成队列 :
        //      参数一 :队列名字
        //      参数二 :队列存储方式 (内存-false,磁盘-true) ,默认-false
        //      参数三 :数据使用范围 ( false-一个人消费,false-多人消费 )
        //      参数四 :是否自动删除,最后一个消费者端连接后,该队列是否自动删除( true-自动删除,false-不自动删除 )
        //      参数五 :其他参数 ( 延迟消息,死信消息 )
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //  开启发布确认
        channel.confirmSelect();
        //  开始时间 :
        long tm1 = System.currentTimeMillis();

        //  消息存储器 :
        //      生产者 :将消息存储到这里
        //      确认 :确认的时候,将消息从里边删除
        ConcurrentNavigableMap<Long, String> map = new ConcurrentSkipListMap<>();

        //  成功回调 :消息确认,发布成功
        //  参数 :消息编号,是否批量确认
        ConfirmCallback successBack = (sequenceNum, mutiple) -> {
            //  如果 mutiple==true,说明有多条消息
            if(mutiple){
                //  理论 :如果是批量确认,那么 map 中的数据都被发送成功了,可以清空缓存了
                //  将这个消息编号之前的消息,全部清空
                ConcurrentNavigableMap<Long, String> mapHead = map.headMap(sequenceNum, true);
                System.out.println("===消息存储成功:=============================");
                for (Long num : mapHead.keySet()) {
                    System.out.println("==="+mapHead.get(num));
                }
                System.out.println("===消息存储成功:=============================");
                mapHead.clear();
            }else{
                //  理论 :如果是单跳确认,那么 map 中的数据只能确定成功了一条,可以把这一条数据删除
                String v = map.remove(sequenceNum);
                System.out.println("消息存储成功:"+v);
            }
        };
        //  失败回调 :消息确认,发布失败
        ConfirmCallback errorBack = (sequenceNum, mutiple) -> {
            System.out.println("消息发送失败 :" + map.get(sequenceNum));
        };
        //  监听器 :异步监听消息是否成功
        //  参数 :成功回调,失败回调
        channel.addConfirmListener(successBack, errorBack);
        for (int i = 0; i < 1000; i++) {
            //  发消息 :
            //      参数一 :交换机名字
            //      参数二 :路由 key ,本次写队列名称
            //      参数三 :数据存储方式 - 持久化存储
            //      参数四 :消息,字节数组
            String messsage = "消息:" + i + " 号";
            //  将消息,存储到这个缓存中,为了确保消息是否成功存储
            map.put(channel.getNextPublishSeqNo(), messsage);
            //  channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, messsage.getBytes("utf-8"));
            channel.basicPublish("", QUEUE_NAME, null, messsage.getBytes("utf-8"));
        }
        //  结束时间 :
        long tm2 = System.currentTimeMillis();
        System.out.println("耗时:" + (tm2 - tm1) + "ms");
        //  睡这么多秒,是为了接受回调
        Thread.sleep(2000);
        //  关闭流
        channel.close();
        conn.close();
    }
}

2 ,效果 :

在这里插入图片描述
在这里插入图片描述

举报

相关推荐

0 条评论