0
点赞
收藏
分享

微信扫一扫

rabbitmq-sharding插件使用


RabbitMQ是非常流行的消息中间件,大家都知道通过集群能够增大它的吞吐量,那么针对单个队列,集群能增大他的吞吐量吗?如果不能,我们要怎么做呢?

rabbitmq-sharding插件使用_吞吐量

南山远眺

问题

RabbitMQ是非常流行的消息中间件,大家都知道通过集群能够增大它的吞吐量,那么针对单个队列,集群能增大他的吞吐量吗?如果不能,我们要怎么做呢?

答案是集群并不能的增加单个队列的吞吐量,这是因为RabbitMQ的普通集群只是共享元数据信息,达到将整个集群规模的队列扩大以增加吞吐量的目的。普通集群甚至不能保证消息数据的高可用,任意一个broker宕机,都会导致这个broker上的队列不可用。

而镜像队列也仅仅只是保证了实现镜像复制的队列的高可用。消费者并不能并发消费复制出来的队列。

那么RabbitMQ是否也能提高类似Kafka的topic分区的机制,来加大单个主题队列的吞吐量呢?

通过使用 RabbitMQ Sharding 插件、Consistent-hash Sharding Exchange 来更加灵活地动态均衡队列压力,可以更从容地达到百万并发的性能。

这里我重点介绍下,RabbitMQ Sharding 插件,有兴趣的伙伴可以自己研究下Consistent-hash Sharding Exchange,两者的基本思路一致,都是根据Routeing Key的hash值将消息分发到分片队列中。

原理介绍

​​官网:https://github.com/rabbitmq/rabbitmq-sharding​​

rabbitmq sharding插件为您自动对队列进行分区,也就是说,一旦您将一个exchange 定义为sharded,那么在每个集群节点上自动创建支持队列,并在它们之间共享消息。rabbitmq sharding向使用者显示了一个队列,但它可能是后台运行在它后面的多个队列。rabbitmq sharding插件为您提供了一个集中的位置,通过向集群中的其他节点添加队列,您可以将消息以及跨多个节点的负载平衡发送到该位置。

rabbitmq-sharding插件使用_sharding_02

 

插件安装

查看当前插件
find / -name rabbitmq-plugins
cd /usr/sbin/
./rabbitmq-plugins list

rabbitmq-sharding插件使用_吞吐量_03

 

如果有没有对应的插件,自己下载后复制插件到指定的目录
手动下载安装,​​​https://www.rabbitmq.com/community-plugins/​​

rabbitmq-sharding插件使用_吞吐量_04

 

RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:

rabbitmq-sharding插件使用_插件安装_05

 

插件安装完成后可以通过命令sudo rabbitmq-plugins list查看已有插件列表,eg:

rabbitmq-sharding插件使用_rabbitmq_06

 

指定安装插件命令:
./rabbitmq-plugins enable rabbitmq_sharding

 

说明安装成功,重启生效:
service rabbitmq-server restart

队列分片

1.配置策略

find / -name rabbitmqctl

cd /usr/sbin/

./rabbitmqctl set_policy history-shard "^history" \
'{"shards-per-node": 2, "routing-key": "1234"}' \
--apply-to exchanges

说明:
通过rabbitmqctl set_policy设置新增策略,策略名称为history-shard,
匹配规则为^history,shards-per-node表示每个节点上分片出2个分片队列,
routing-key为1234,应用到所有交换器exchanges上去匹配执行。

2、在管理界面,手动创建名称为history的交换器,交换器类型选择x-consistent-hash

rabbitmq-sharding插件使用_rabbitmq_07

 

rabbitmq-sharding插件使用_吞吐量_08

 

如上图现实,说明我们的消息分片已经成功。

但是,我们怎么去消费history交换器下的消息呢?

代码实战

1、引入rabbitmq的依赖包

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、生产者

public class ProductTest {
private static final String EXCHANGE_NAME = "history";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("wanli");
factory.setPort(5672);

Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
for (int i = 0; i < 10000; i++) {
//第一个参数是交换器名称,第二个参数是routeing key ,注意这里的routeing key一定要是随机的,不然消息都会发送到同一个队列中
channel.basicPublish(EXCHANGE_NAME, String.valueOf(i), bldr.build(), "hello".getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(10000);

TimeUnit.SECONDS.sleep(5);

channel.close();
connection.close();
}
}

消费者向名称为history的交换器,发送10000条routeing key为0~10000,内容为hello的消息

rabbitmq-sharding插件使用_rabbitmq_09

 

3、消费者

public class ConsumerTest {
private static final String QUEUE_NAME = "history";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("wanli");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//每次抓取的消息数量
channel.basicQos(32);
for (int i = 0; i < 10; i++) {
Consumer consumer = new MyConsumer(channel);
channel.basicConsume(QUEUE_NAME,consumer);
}
TimeUnit.SECONDS.sleep(120);

channel.close();
connection.close();

}

private static class MyConsumer extends DefaultConsumer{
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息为:" + new String(body));
super.getChannel().basicAck(envelope.getDeliveryTag(),false);
}
}
}

10个消费者去消费名称为history的队列,消费者均匀分布在每个队列上(每个队列上绑定了5个),每次抓取32个消息去消费。

rabbitmq-sharding插件使用_插件安装_10

 

总结

通过rabbitmq-sharding插件,将原本单个队列history的分成了2个队列,但是对消费者来说,还是消费的原来的history队列,而不用管底层实际对应的物理队列。
极大的提高了单个队列在大并发下的吞吐量。

感谢每一次关注和点赞

rabbitmq-sharding插件使用_插件安装_11

 

举报

相关推荐

0 条评论