0
点赞
收藏
分享

微信扫一扫

rabbitmq手动应答代码实现与测试

梯梯笔记 2022-03-19 阅读 85
rabbitmqjava

rabbitmq作为现在流行的消息队列,它拥有流量削峰、应用解耦、异步处理等优点,使用数量也是较多的。其中重要的特性也就是手动应答避免消息丢失的特点更是使其更上一层楼。消息队列基础的处理流程是:生产者—》队列—》消费者。
rabbitmq的自动应答会导致在消息的分发途中,如果一台消费者角色的服务器宕机了,其处理的消息在自动应答模式下就会丢失,而手动应答则不会,而且退回队列分发给下一个消费者进行处理,有效的规避了因服务器宕机而造成的消息丢失。接下来进行代码演示
首先我们进行创建一个生产者进行消息的输入:

public class Task02 {

    private final static String TASK_QUEUE_NAME="queue01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出的消息是:"+message);

        }


    }
}

紧接着编写消费者的代码:
消费者1号

public class Work01 {

    private final static String TASK_QUEUE_NAME="queue01";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();

        System.out.println("消费者1号等待接收消息处理时间较短!");

        DeliverCallback deliverCallback=(String consumerTag, Delivery message)->{
            //挂起一秒
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接收到的消息是:"+new String(message.getBody(),"UTF-8"));

            //手动应答操作
            /**
             * 第一个参数:tag消息的标记
             * 第二个参数:是否批量处理,手动应答就为false
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback=(String consumerTag)->{
            System.out.println(consumerTag+"消费者消费信息被终止!");
        };
        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

消费者2号:

public class Work02 {

    private final static String TASK_QUEUE_NAME="queue01";

    public static void main(String[] args) throws IOException, TimeoutException {

        Channel channel = RabbitMQUtils.getChannel();

        System.out.println("消费者2号等待接收消息处理时间较短!");

        DeliverCallback deliverCallback=(String consumerTag, Delivery message)->{
            //挂起一秒
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接收到的消息是:"+new String(message.getBody(),"UTF-8"));

            //手动应答操作
            /**
             * 第一个参数:tag消息的标记
             * 第二个参数:是否批量处理,手动应答就为false
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback=(String consumerTag)->{
            System.out.println(consumerTag+"消费者消费信息被终止!");
        };
        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

以上代码写完后我们依次点击运行,我们打开rabbitmq的web管理页面会发现刚刚创建得队列
在这里插入图片描述
接着我们发信息:
在这里插入图片描述
然后检查消费者消费情况
在这里插入图片描述
在这里插入图片描述
1号消费者立即就收到了消息,2号因为设置了挂起30秒所以30后受到了消息,接着我们做一个测试,就是在2号消费者接收到消息处理时,关掉它造成服务器宕机,检测消息是否被1号消费者消费或是丢失。
在这里插入图片描述
在这里插入图片描述
结果是在2号服务器宕机之后,消息并未丢失而且重新回到队列被1号消费者消费,所以消息队列中一般多推荐使用手动应答防止消息丢失从而造成损失。

举报

相关推荐

0 条评论