7.7 消费端限流
在类似如秒杀活动中,一开始会有大量并发写请求到达服务端,城机对消息进行削峰处理,如何做?
当消息投递的速度远快于消费的速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应可能会引起系统大范围的宕机,这就很悲剧了
7.7.1 资源限制限流
在RabbitMQ中可对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接的客户端的套接字读取数据。连接心中监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以或者被阻止,这意味着它们已发布,现在已暂停。兼容的客户端被阻止将收到通知。
在/etc/rabbitmq/rabbitmq.conf
中配置磁盘可用空间大小:
# 磁盘限制阈值设置
# 设置磁盘的可用空间大小,单位字节。当磁盘可用空间低于这个值的时候,发生磁盘告警,触发限流。
# 如果设置了相对大小,则忽略此绝对大小。
disk_free_limit.absolute = 2000
# 使用计量单位,从RabbitMQ 3.6.0开始有效,对vm_memory_high_watemark同样有效。
# disk_free_limit.absolute = 500KB
# disk_free_limit.absolute = 50mb
# disk_free_limit.absolute = 5GB
# Alternatively, we can set a limit relative to total avaliable RAM.
# Values lower than 1.0 can be dangerous and should be used carefully.
# 还可以使用相对于总可用内存来设置。注意,此值不要低于1.0!
# 当磁盘可用空间低于总可用内存的2.0倍的时候,触发限流
# disk_free_limit.relative = 2.0
# 内存限流阈值设置
# 0.4表示阈值总可用内存的比值。 总可用内存表示操作系统给每个进程分配的大小,或者实际内存大小。
# 如32位的Windows,系统给每个进程最大2GB的内存,则此比值表示阈值为820MB
vm_memory_high_watermark.relative = 0.4
# 还可以直接通过绝对值限制可用内存大小,单位字节
vm_memory_high_watermark.absolute = 1073741824
# 从RabbitMQ 3.6.0开始,绝对值支持计量单位。如果设置了相对值,则忽略此设置值
vm_memory_high_watermark.absolute = 1024MiB
k, kiB : kibibytes(2^10 - 1024 bytes)
M, MiB : mibibytes(2^20 - 1024576 bytes)
G, GiB : gibibytes(2^30 - 1073741824 bytes)
KB: kilobytes (10^3 - 1000 bytes)
MB: megabytes (10^6 - 1000000 bytes)
GB: gigabytes (10^9 - 1000000000 bytes)
可以通过两种来设置生效
-
临时生效
此配制仅当前生效在重启后将失效。
# 硬盘资源限制
rabbitmqctl set_disk_free_limit 68996808704
# 内存资源限制
rabbitmqctl set_vm_memory_high_watermark 0.4
样例:
[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...
- 长期生效
在rabbitmq.conf的配制文件中加入
# 硬盘限制
disk_free_limit.absolute=68455178240
# 内存限制
vm_memory_high_watermark.relative = 0.4
样例:
[root@nullnull-os rabbitmq]# vi /etc/rabbitmq/rabbitmq.conf
# 加入以下内容,注意单位到字节
disk_free_limit.absolute=68455178240
[root@nullnull-os rabbitmq]# cat /etc/rabbitmq/rabbitmq.conf
disk_free_limit.absolute=68455178240
[root@nullnull-os rabbitmq]# systemctl restart rabbitmq-server
[root@nullnull-os rabbitmq]#
注意,此需要重启rabbitMQ才能生效。
磁盘限制配制参考
来自:https://www.rabbitmq.com/disk-alarms.html
内存配制限制参考
https://www.rabbitmq.com/memory.html
中文配制可参考:https://www.cnblogs.com/kaishirenshi/p/12132703.html
更多配制可参见:https://www.rabbitmq.com/configure.html#config-file
样例程序:
maven导入
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
生产程序:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
public class ResourceLimitProduct {
public static void main(String[] args) throws Exception {
// 资源限制
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel(); ) {
// 定义交换器、队列和绑定
channel.exchangeDeclare("res.limit.ex", BuiltinExchangeType.DIRECT, true, false, null);
channel.queueDeclare("res.limit.qu", true, false, false, null);
channel.queueBind("res.limit.qu", "res.limit.ex", "res.limit.rk");
// 开启发送方确认机制
AMQP.Confirm.SelectOk selectOk = channel.confirmSelect();
ConfirmCallback confirm =
new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("【批量确认】:小于" + deliveryTag + "已经确认");
} else {
System.out.println("【单条确认】:等于" + deliveryTag + "已经确认");
}
}
};
ConfirmCallback nackConfirm =
new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("【批量不确认】:小于" + deliveryTag + "已经确认");
} else {
System.out.println("【单条不确认】:等于" + deliveryTag + "已经确认");
}
}
};
channel.addConfirmListener(confirm, nackConfirm);
for (int i = 0; i < 100000000; i++) {
String msg = getKbMessage(i);
long sequence = channel.getNextPublishSeqNo();
System.out.println("【发送】成功了序列消息:" + sequence);
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentType("text/plain");
// 发送的消息持久化
builder.deliveryMode(2);
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(
"res.limit.ex", "res.limit.rk", properties, msg.getBytes(StandardCharsets.UTF_8));
Thread.sleep(ThreadLocalRandom.current().nextInt(5, 100));
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static String getKbMessage(int i) {
StringBuilder msg = new StringBuilder("发送确认消息:" + i + "--");
for (int j = 0; j < 102400; j++) {
msg.append(j);
}
return msg.toString();
}
}
设置硬盘资源限制
[root@nullnull-os rabbitmq]# rabbitmqctl set_disk_free_limit 68996808704
Setting disk free limit on rabbit@nullnull-os to 68996808704 bytes ...
运行生产者的应用程序,查看控制台的输出
【发送】成功了序列消息:1
【单条确认】:等于1已经确认
【发送】成功了序列消息:2
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【发送】成功了序列消息:4
【单条确认】:等于3已经确认
【发送】成功了序列消息:5
......
【单条确认】:等于702已经确认
【单条确认】:等于703已经确认
【发送】成功了序列消息:704
【发送】成功了序列消息:705
【发送】成功了序列消息:706
【发送】成功了序列消息:707
【发送】成功了序列消息:708
【发送】成功了序列消息:709
【发送】成功了序列消息:710
【发送】成功了序列消息:711
到此使用硬盘空间限制的测试完成。
内存资源限制
编辑配制文件rabbitmq.conf
vi /etc/rabbitmqrabbitmq.conf
# 添加配制
vm_memory_high_watermark.absolute=120M
重启让其生效
systemctl restart rabbitmq-server
检查配制生效情况
[root@nullnull-os rabbitmq]# rabbitmqctl environment
......
{trace_vhosts,[]},
{vhost_restart_strategy,continue},
{vm_memory_calculation_strategy,rss},
{vm_memory_high_watermark,{absolute,"120MB"}},
{vm_memory_high_watermark_paging_ratio,0.5},
{writer_gc_threshold,1000000000}]},
{rabbit_common,[]},
......
查看到如下配制说明生效。
运行生产者
观察客户端输出
【发送】成功了序列消息:1
【发送】成功了序列消息:2
【单条确认】:等于1已经确认
【发送】成功了序列消息:3
【单条确认】:等于2已经确认
【单条确认】:等于3已经确认
【发送】成功了序列消息:4
【发送】成功了序列消息:5
【发送】成功了序列消息:6
【单条确认】:等于4已经确认
【单条确认】:等于5已经确认
【单条确认】:等于6已经确认
【发送】成功了序列消息:7
【单条确认】:等于7已经确认
......
【发送】成功了序列消息:174
【单条确认】:等于174已经确认
【发送】成功了序列消息:175
【单条确认】:等于175已经确认
【发送】成功了序列消息:176
【单条确认】:等于176已经确认
【发送】成功了序列消息:177
【发送】成功了序列消息:178
【发送】成功了序列消息:179
【发送】成功了序列消息:180
【发送】成功了序列消息:181
【发送】成功了序列消息:182
【发送】成功了序列消息:183
【发送】成功了序列消息:184
【发送】成功了序列消息:185
【发送】成功了序列消息:186
【发送】成功了序列消息:187
观察网页端的情况
到此内存资源限制而导致的限流测试完成。
7.7.2 默认的credit flow流控
RabbitMQ Credit Flow Mechanism (信用流控制机制) 是 RabbitMQ 使用的一种流量控制机制,旨在确保生产者(publishers)不会发送太多的消息给消费者(consumers),从而导致系统超载或资源耗尽。这个机制主要是为了保护消费者免受生产者发送太多消息的影响。
以下是 RabbitMQ Credit Flow 机制的基本工作原理:
- 信用计数器(Credit Counter):对于每个消费者,RabbitMQ 维护一个称为信用计数器的值。这个计数器表示消费者当前可以接收多少条消息。
- 初始信用额度(Initial Credit):当一个消费者连接到队列并开始消费消息时,RabbitMQ 为该消费者分配一个初始信用额度。这个额度通常与队列中的未确认消息数量有关。
- 消费者确认(Consumer Acknowledgments):当消费者成功处理一条消息并确认它时,它将会恢复一定数量的信用,这允许 RabbitMQ 将更多的消息发送给消费者。
- 信用降低(Decreasing Credit):当消费者未确认消息超出其信用额度时,其信用额度将降低。这会导致生产者无法继续发送消息给该消费者,直到其信用额度恢复。
- 自动降低的消费者(Auto-decrease Consumers):RabbitMQ 还可以配置为自动降低某些消费者的信用,以避免某个消费者占用太多资源。这通常用于处理慢速或长时间处理的消费者。
这个机制有助于平衡生产者和消费者之间的消息流量,防止生产者发送大量消息导致队列爆满,从而提高系统的稳定性和可靠性。
要注意的是,RabbitMQ 的信用流控制机制是可配置的,您可以根据您的需求来调整信用额度和其他参数,以满足特定的应用场景。此外,RabbitMQ 还提供了一些工具和插件,用于监控和管理流量控制,以确保系统的正常运行。
可以通过查看队列的状态信息来了解 Credit Flow 机制的当前状态。以下是一些常见的方式来查看 Credit Flow 状态:
-
RabbitMQ Management UI:RabbitMQ 提供了一个基于 Web 的管理界面,您可以通过该界面查看队列的状态和统计信息,包括队列的消息数量、未确认消息数量以及消费者的状态。要访问管理界面,请确保已启用 RabbitMQ Management 插件。默认情况下,它通常在 http://localhost:15672/ 上运行。
在管理界面中,您可以选择特定的队列,然后查看其状态和相关的统计信息,包括未确认消息数量。这可以帮助您了解 Credit Flow 是否生效,是否有消费者的信用已用尽。
-
命令行工具:您还可以使用 RabbitMQ 的命令行工具来查看队列的状态。以下是一个示例命令,用于查看队列的状态:
rabbitmqctl list_queues name messages consumers messages_unacknowledged
这将显示队列的名称、消息数量、消费者数量以及未确认消息数量。未确认消息数量表示消费者尚未确认的消息数量,这可以用于判断 Credit Flow 是否生效。
-
监控工具:您可以使用监控工具(如Prometheus和Grafana)来设置自定义监控和警报,以便实时跟踪队列的状态和信用流控制情况。通过这些工具,您可以创建仪表板来显示队列的各种指标,包括未确认消息数量和消费者的信用。
通过以上方法,您可以监视 RabbitMQ 中队列的状态和 Credit Flow 机制的工作情况,以确保系统的稳定性和可靠性。
7.7.3 Qos机制
生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快县城超过了下游的消费速度时就容易出现消息积压、堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等手段,避免超过broker端的极限承载能力或者压垮下游消费者。
再讲消费者,我们期望消费者能够尽快的消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端能够处理速度是最快、最稳定而且还相对均匀(比较理想化)
提供应用吞吐量和缩短消费过程的耗时,主要以下几种方式:
测试
maven导入:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
public class QosProduct {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义交换机
channel.exchangeDeclare(
"qos.ex",
BuiltinExchangeType.DIRECT,
// 持久化标识
false,
// 是否自动删除
false,
// 属性信息
null);
for (int i = 0; i < 100; i++) {
String msg = "这是发送的消息:" + i;
channel.basicPublish("qos.ex", "qos.rk", null, msg.getBytes(StandardCharsets.UTF_8));
}
}
}
消费者 :
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.ThreadLocalRandom;
public class QosConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义交换器、队列和绑定
channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);
channel.queueDeclare("qos.qu", false, false, false, null);
channel.queueBind("qos.qu", "qos.ex", "qos.rk");
// 设置Qos为5,未被确认ACK的为5,还有一个参数,即是否为全局,true为全局
channel.basicQos(5);
channel.basicConsume(
"qos.qu",
false,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
LocalDateTime time = LocalDateTime.now();
System.out.println(
"[消费]" + time + "+收到的消息:" + new String(body, StandardCharsets.UTF_8));
int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);
try {
Thread.sleep(randomSleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (envelope.getDeliveryTag() % 3 == 0) {
// 进行消息确认
channel.basicAck(envelope.getDeliveryTag(), true);
}
}
});
}
}
测试
先启动消费都,再启动生产者,查看控制台输出
[消费]2023-08-25T12:08:13.143+收到的消息:这是发送的消息:0
[消费]2023-08-25T12:08:13.765+收到的消息:这是发送的消息:1
[消费]2023-08-25T12:08:14.127+收到的消息:这是发送的消息:2
[消费]2023-08-25T12:08:14.892+收到的消息:这是发送的消息:3
......
[消费]2023-08-25T12:08:57.437+收到的消息:这是发送的消息:96
[消费]2023-08-25T12:08:57.530+收到的消息:这是发送的消息:97
[消费]2023-08-25T12:08:57.566+收到的消息:这是发送的消息:98
[消费]2023-08-25T12:08:57.649+收到的消息:这是发送的消息:99
查看队列的情况:
[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:59116 -> 10.0.4.16:5672 (1) │ 5 │ 0 │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]#
网页端查看
并行消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
public class QosThreadConsumer {
public static void main(String[] args) throws Exception {
// 资源限制
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://root:123456@node1:5672/%2f");
// 设置channel并发请求最大数
factory.setRequestedChannelMax(5);
// 自定义线程池工厂
ThreadFactory thsFactory = Executors.privilegedThreadFactory();
factory.setThreadFactory(thsFactory);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 定义交换器、队列和绑定
channel.exchangeDeclare("qos.ex", BuiltinExchangeType.DIRECT, false, false, null);
channel.queueDeclare("qos.qu", false, false, false, null);
channel.queueBind("qos.qu", "qos.ex", "qos.rk");
// 设置每秒处理2个
channel.basicQos(5, true);
channel.basicConsume(
"qos.qu",
false,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
LocalDateTime time = LocalDateTime.now();
long threadId = Thread.currentThread().getId();
System.out.println(
"[消费]"
+ time
+ ",线程:"
+ threadId
+ ",收到的消息:"
+ new String(body, StandardCharsets.UTF_8));
int randomSleep = ThreadLocalRandom.current().nextInt(20, 1000);
try {
Thread.sleep(randomSleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (envelope.getDeliveryTag() % 3 == 0) {
// 进行消息确认
channel.basicAck(envelope.getDeliveryTag(), true);
}
}
});
}
}
控制台输出:
[消费]2023-08-26T09:37:21.430,线程:24,收到的消息:这是发送的消息:0
[消费]2023-08-26T09:37:21.866,线程:25,收到的消息:这是发送的消息:1
[消费]2023-08-26T09:37:22.434,线程:25,收到的消息:这是发送的消息:2
[消费]2023-08-26T09:37:22.847,线程:25,收到的消息:这是发送的消息:3
[消费]2023-08-26T09:37:23.685,线程:25,收到的消息:这是发送的消息:4
[消费]2023-08-26T09:37:23.847,线程:26,收到的消息:这是发送的消息:5
......
[消费]2023-08-26T09:39:10.684,线程:28,收到的消息:这是发送的消息:526
[消费]2023-08-26T09:39:10.695,线程:32,收到的消息:这是发送的消息:527
[消费]2023-08-26T09:39:10.767,线程:32,收到的消息:这是发送的消息:528
......
[消费]2023-08-26T09:39:58.270,线程:27,收到的消息:这是发送的消息:996
[消费]2023-08-26T09:39:58.405,线程:27,收到的消息:这是发送的消息:997
[消费]2023-08-26T09:39:58.575,线程:27,收到的消息:这是发送的消息:998
[消费]2023-08-26T09:39:58.671,线程:27,收到的消息:这是发送的消息:999
如果Qos设置为全局,则可以看到到
[root@nullnull-os ~]# rabbitmqctl list_channels name,prefetch_count,global_prefetch_count --formatter pretty_table
Listing channels ...
┌───────────────────────────────────────────┬────────────────┬───────────────────────┐
│ name │ prefetch_count │ global_prefetch_count │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60591 -> 10.0.4.16:5672 (1) │ 0 │ 5 │
├───────────────────────────────────────────┼────────────────┼───────────────────────┤
│ 61.170.208.88:60610 -> 10.0.4.16:5672 (1) │ 0 │ 0 │
└───────────────────────────────────────────┴────────────────┴───────────────────────┘
[root@nullnull-os ~]#