0
点赞
收藏
分享

微信扫一扫

SpringBoot整合多个RabbitMQ

一、背景

​ 最近项目中需要用到了​​RabbitMQ​​​来监听消息队列,监听的消息队列的 虚拟主机(​​virtualHost​​​)和队列名(​​queueName​​​)是不一致的,但是接收到的消息格式相同的。而且可能还存在程序不停机的情况下,动态的增加新的队列(​​queue​​​)的监听,因此就需要我们自己在程序中实现一种方法实现动态配置​​RabbitMQ​​。

二、需求

我们有​​2​​​个​​RabbitMQ​​​的配置,在程序启动的时候,动态的配置好这2个​​RabbitMQ​​,实现消息的监听。

​RabbitMQ​​的配置信息

host

port

username

password

virtualHost

queueName

47.101.130.164

5672

rabbit-multi-01

rabbit-multi-01

/rabbit-multi-01

queue-rabbit-multi-01

47.101.130.164

5672

rabbit-multi-02

rabbit-multi-02

/rabbit-multi-02

queue-rabbit-multi-02

三、实现思路

1、动态配置RabbitMQ

包括 ​​ConnectionFactory​​​,​​RabbitAdmin​​​,​​RabbitTemplate​​​,​​SimpleMessageListenerContainer​​等

2、将上方配置好的Bean注入到Spring容器中,之后可能需要用到

向​​Spring​​​容器中注入​​Bean​​的方法

DefaultListableBeanFactory#registerSingleton
DefaultListableBeanFactory#registerBeanDefinition

四、实现步骤

1、引入maven依赖

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

2、创建RabbitProperties用来表示RabbitMQ的配置信息

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class RabbitProperties {
private String host;
private Integer port;
private String username;
private String password;
private String virtualHost;
private String queueName;
}

3、配置RabbitMQ

配置 ​​ConnectionFactory​​​,​​RabbitAdmin​​​,​​RabbitTemplate​​​,​​SimpleMessageListenerContainer​​​等,并动态注入到​​Spring​​容器中

@Configuration
@RequiredArgsConstructor
@Slf4j
public class MultiRabbitMqConfig {

private final DefaultListableBeanFactory defaultListableBeanFactory;

private static Map<String, RabbitProperties> multiMqPropertiesMap = new HashMap<String, RabbitProperties>() {
{
put("first", RabbitProperties.builder()
.host("47.101.130.164")
.port(5672)
.username("rabbit-multi-01")
.password("rabbit-multi-01")
.virtualHost("/rabbit-multi-01")
.queueName("queue-rabbit-multi-01").build());
put("second", RabbitProperties.builder()
.host("47.101.130.164")
.port(5672)
.username("rabbit-multi-02")
.password("rabbit-multi-02")
.virtualHost("/rabbit-multi-02")
.queueName("queue-rabbit-multi-02").build());
}
};

@PostConstruct
public void initRabbitmq() {
multiMqPropertiesMap.forEach((key, rabbitProperties) -> {

AbstractBeanDefinition beanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class)
.addPropertyValue("cacheMode", CachingConnectionFactory.CacheMode.CHANNEL)
.addPropertyValue("host", rabbitProperties.getHost())
.addPropertyValue("port", rabbitProperties.getPort())
.addPropertyValue("username", rabbitProperties.getUsername())
.addPropertyValue("password", rabbitProperties.getPassword())
.addPropertyValue("virtualHost", rabbitProperties.getVirtualHost())
.getBeanDefinition();
String connectionFactoryName = String.format("%s%s", key, "ConnectionFactory");
defaultListableBeanFactory.registerBeanDefinition(connectionFactoryName, beanDefinition);
CachingConnectionFactory connectionFactory = defaultListableBeanFactory.getBean(connectionFactoryName, CachingConnectionFactory.class);

String rabbitAdminName = String.format("%s%s", key, "RabbitAdmin");
AbstractBeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(RabbitAdmin.class)
.addConstructorArgValue(connectionFactory)
.addPropertyValue("autoStartup", true)
.getBeanDefinition();
defaultListableBeanFactory.registerBeanDefinition(rabbitAdminName, rabbitAdminBeanDefinition);
RabbitAdmin rabbitAdmin = defaultListableBeanFactory.getBean(rabbitAdminName, RabbitAdmin.class);
log.info("rabbitAdmin:[{}]", rabbitAdmin);

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "RabbitTemplate"), rabbitTemplate);

SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
// 设置监听的队列
simpleMessageListenerContainer.setQueueNames(rabbitProperties.getQueueName());
// 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加
simpleMessageListenerContainer.setConcurrentConsumers(3);
// 最大的并发消费者
simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
// 设置是否重回队列
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
// 设置签收模式
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置非独占模式
simpleMessageListenerContainer.setExclusive(false);
// 设置consumer未被 ack 的消息个数
simpleMessageListenerContainer.setPrefetchCount(1);
// 设置消息监听器
simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
try {
log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody()));
log.info("====>connection:[{}]", channel.getConnection());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error(e.getMessage(), e);
// 发生异常此处需要捕获到
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
});
/**
* 1、simpleMessageListenerContainer.start();
* 2、simpleMessageListenerContainer.stop();
* 3、如果后期rabbitmq 的配置是从数据库中读取,即用户在页面上添加一个配置,就动态创建这个
* 此时就需要调用 SimpleMessageListenerContainer#start 方法
*/
defaultListableBeanFactory.registerSingleton(String.format("%s%s", key, "SimpleMessageListenerContainer"), simpleMessageListenerContainer);
});
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
RabbitTemplate firstRabbitTemplate = (RabbitTemplate) defaultListableBeanFactory.getBean("firstRabbitTemplate");
firstRabbitTemplate.convertAndSend("exchange-rabbit-multi-01", "", "first queue message");
log.info("over...");
}).start();
}
}

五、实现效果

SpringBoot整合多个RabbitMQ_SpringBoot

六、代码

​​https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-springboot-multi​​


举报

相关推荐

0 条评论