教程说明
- 本系列教程目录大纲:《RabbitMQ系列教程-目录大纲》
- 本系列教程配套代码:https://gitee.com/Horizon1024/rabbitmt.git(码云地址)
RabbitMQ之队列限流
我们在之前讲MQ好处的时候就说过MQ可以做流量削峰,当网站承受的压力已经到达极限时,我们可以采用MQ来进行限流操作
11.1 案例测试
11.1.1配置spring.xml:
一般情况下,队列限流设置为手动签收
<!--
定义监听器容器
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="none" prefetch="1" >
acknowledge:消息签收方式
none: 自动签收(消息被消费就自动签收)
manual: 手动签收(需要调用basicNack或basicReject方法手动签收)
auto: 由rabbitmq来决定是否签收(根据异常进行处理)
prefetch: 当多条消费者接受到多条消息一次性处理几条
-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>
11.1.2 监听器
package com.lscl.rabbitmq.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* Consumer 限流机制
* listener-container配置属性
* perfetch = 1,表示消费端每次从mq拉去一条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
*/
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
//1.获取消息
System.out.println(new String(message.getBody()));
//2. 处理业务逻辑
System.out.println("处理业务逻辑...");
//3. 签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
11.1.3 消息生产者发送消息
@Test
public void testSend() {
for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "这是第" + i + "条消息");
}
}