文章目录
- 1.RabbitAutoConfiguration
- 2.RabbitConnectionFactoryCreator
- 3.RabbitTemplateConfiguration
- 4.MessagingTemplateConfiguration
- 5.RabbitAnnotationDrivenConfiguration
- 6.EnableRabbitConfiguration
- 7.RabbitBootstrapConfiguration
1.RabbitAutoConfiguration
RabbitAutoConfiguration 类是 Spring Boot 为 RabbitMQ 提供的自动配置类。
/**
 * Spring Boot auto-configuration entry point for RabbitMQ.
 *
 * <p>As declared by the annotations below, it is conditional on both
 * {@code RabbitTemplate} and {@code Channel} being present on the classpath, it
 * enables binding of {@code RabbitProperties}, and it imports
 * {@code RabbitAnnotationDrivenConfiguration}.
 */
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
// Implementation omitted for brevity.
}
- RabbitAutoConfiguration 通过 @EnableConfigurationProperties 注解注入了 RabbitProperties 类,以识别 rabbit 的相关配置。
- 并且通过 @Import 注解注入了另一个 rabbit 的配置类 RabbitAnnotationDrivenConfiguration。
- 另外,RabbitAutoConfiguration 包含的3个内部类也都是配置类:
  - RabbitConnectionFactoryCreator
  - RabbitTemplateConfiguration
  - MessagingTemplateConfiguration
2.RabbitConnectionFactoryCreator
在用户没有自定义 ConnectionFactory 的情况下,基于 rabbit 的配置文件内容(RabbitProperties)初始化 CachingConnectionFactory 类,作为 rabbit 默认的 ConnectionFactory。
/**
 * Contributes the default {@link CachingConnectionFactory}, populated from
 * {@link RabbitProperties}, when no {@link ConnectionFactory} bean has been
 * defined elsewhere.
 */
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {

    /**
     * Builds the caching connection factory on top of the client-level factory
     * produced by {@link #getRabbitConnectionFactoryBean(RabbitProperties)} and
     * then applies addresses, publisher confirm/return flags and cache settings.
     *
     * @param properties the bound RabbitMQ configuration properties
     * @return the configured caching connection factory
     * @throws Exception if the underlying factory bean cannot be initialised
     */
    @Bean
    public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties)
            throws Exception {
        PropertyMapper mapper = PropertyMapper.get();
        RabbitConnectionFactoryBean clientFactoryBean = getRabbitConnectionFactoryBean(properties);
        CachingConnectionFactory cachingFactory = new CachingConnectionFactory(
                clientFactoryBean.getObject());

        mapper.from(properties::determineAddresses).to(cachingFactory::setAddresses);
        mapper.from(properties::isPublisherConfirms).to(cachingFactory::setPublisherConfirms);
        mapper.from(properties::isPublisherReturns).to(cachingFactory::setPublisherReturns);

        // Channel cache settings are only applied when explicitly configured.
        RabbitProperties.Cache.Channel channelCache = properties.getCache().getChannel();
        mapper.from(channelCache::getSize)
                .whenNonNull()
                .to(cachingFactory::setChannelCacheSize);
        mapper.from(channelCache::getCheckoutTimeout)
                .whenNonNull()
                .as(Duration::toMillis)
                .to(cachingFactory::setChannelCheckoutTimeout);

        // Connection cache settings are only applied when explicitly configured.
        RabbitProperties.Cache.Connection connectionCache = properties.getCache().getConnection();
        mapper.from(connectionCache::getMode)
                .whenNonNull()
                .to(cachingFactory::setCacheMode);
        mapper.from(connectionCache::getSize)
                .whenNonNull()
                .to(cachingFactory::setConnectionCacheSize);

        return cachingFactory;
    }

    /**
     * Creates and initialises the {@link RabbitConnectionFactoryBean} carrying the
     * host, port, credentials, virtual host, heartbeat, SSL and connection
     * timeout settings taken from the given properties.
     *
     * @param properties the bound RabbitMQ configuration properties
     * @return the factory bean after {@code afterPropertiesSet()} has been called
     * @throws Exception if {@code afterPropertiesSet()} fails
     */
    private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties)
            throws Exception {
        PropertyMapper mapper = PropertyMapper.get();
        RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();

        mapper.from(properties::determineHost).whenNonNull().to(factoryBean::setHost);
        mapper.from(properties::determinePort).to(factoryBean::setPort);
        mapper.from(properties::determineUsername).whenNonNull().to(factoryBean::setUsername);
        mapper.from(properties::determinePassword).whenNonNull().to(factoryBean::setPassword);
        mapper.from(properties::determineVirtualHost).whenNonNull().to(factoryBean::setVirtualHost);
        mapper.from(properties::getRequestedHeartbeat)
                .whenNonNull()
                .asInt(Duration::getSeconds)
                .to(factoryBean::setRequestedHeartbeat);

        applySslSettings(mapper, properties.getSsl(), factoryBean);

        mapper.from(properties::getConnectionTimeout)
                .whenNonNull()
                .asInt(Duration::toMillis)
                .to(factoryBean::setConnectionTimeout);

        factoryBean.afterPropertiesSet();
        return factoryBean;
    }

    /**
     * Copies the SSL settings onto the factory bean; does nothing when SSL is
     * not enabled.
     */
    private void applySslSettings(PropertyMapper mapper, RabbitProperties.Ssl ssl,
            RabbitConnectionFactoryBean factoryBean) {
        if (!ssl.isEnabled()) {
            return;
        }
        factoryBean.setUseSSL(true);
        mapper.from(ssl::getAlgorithm).whenNonNull().to(factoryBean::setSslAlgorithm);
        mapper.from(ssl::getKeyStoreType).to(factoryBean::setKeyStoreType);
        mapper.from(ssl::getKeyStore).to(factoryBean::setKeyStore);
        mapper.from(ssl::getKeyStorePassword).to(factoryBean::setKeyStorePassphrase);
        mapper.from(ssl::getTrustStoreType).to(factoryBean::setTrustStoreType);
        mapper.from(ssl::getTrustStore).to(factoryBean::setTrustStore);
        mapper.from(ssl::getTrustStorePassword).to(factoryBean::setTrustStorePassphrase);
    }

}
3.RabbitTemplateConfiguration
基于 ConnectionFactory,在用户没有自定义的场景下,生成默认的 RabbitTemplate 和 AmqpAdmin。
因此,需要通过 @Import 引入 RabbitConnectionFactoryCreator 配置类。
/**
 * Contributes the default {@link RabbitTemplate} and {@link AmqpAdmin} beans.
 * Imports {@link RabbitConnectionFactoryCreator} because both beans are built
 * from a {@link ConnectionFactory}.
 */
@Configuration
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {

    private final ObjectProvider<MessageConverter> messageConverter;

    private final RabbitProperties properties;

    /**
     * @param messageConverter provider used to look up a unique message converter
     * @param properties the bound RabbitMQ configuration properties
     */
    public RabbitTemplateConfiguration(ObjectProvider<MessageConverter> messageConverter,
            RabbitProperties properties) {
        this.messageConverter = messageConverter;
        this.properties = properties;
    }

    /**
     * Creates the default {@link RabbitTemplate} when a single
     * {@link ConnectionFactory} candidate exists and no template bean is defined.
     *
     * @param connectionFactory the connection factory the template operates on
     * @return the configured template
     */
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnMissingBean(RabbitTemplate.class)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        PropertyMapper mapper = PropertyMapper.get();
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // Only override the template's converter when exactly one is available.
        MessageConverter uniqueConverter = this.messageConverter.getIfUnique();
        if (uniqueConverter != null) {
            rabbitTemplate.setMessageConverter(uniqueConverter);
        }
        rabbitTemplate.setMandatory(determineMandatoryFlag());

        RabbitProperties.Template templateProperties = this.properties.getTemplate();
        RabbitProperties.Retry retryProperties = templateProperties.getRetry();
        if (retryProperties.isEnabled()) {
            rabbitTemplate.setRetryTemplate(createRetryTemplate(retryProperties));
        }

        mapper.from(templateProperties::getReceiveTimeout)
                .whenNonNull()
                .as(Duration::toMillis)
                .to(rabbitTemplate::setReceiveTimeout);
        mapper.from(templateProperties::getReplyTimeout)
                .whenNonNull()
                .as(Duration::toMillis)
                .to(rabbitTemplate::setReplyTimeout);
        mapper.from(templateProperties::getExchange).to(rabbitTemplate::setExchange);
        mapper.from(templateProperties::getRoutingKey).to(rabbitTemplate::setRoutingKey);
        return rabbitTemplate;
    }

    /**
     * Resolves the template's mandatory flag: the explicit template setting when
     * present, otherwise the publisher-returns setting.
     */
    private boolean determineMandatoryFlag() {
        Boolean explicitMandatory = this.properties.getTemplate().getMandatory();
        if (explicitMandatory != null) {
            return explicitMandatory;
        }
        return this.properties.isPublisherReturns();
    }

    /**
     * Builds a {@link RetryTemplate} with a {@link SimpleRetryPolicy} and an
     * {@link ExponentialBackOffPolicy} populated from the retry properties.
     *
     * @param properties the retry settings
     * @return the configured retry template
     */
    private RetryTemplate createRetryTemplate(RabbitProperties.Retry properties) {
        PropertyMapper mapper = PropertyMapper.get();
        RetryTemplate retryTemplate = new RetryTemplate();

        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        mapper.from(properties::getMaxAttempts).to(retryPolicy::setMaxAttempts);
        retryTemplate.setRetryPolicy(retryPolicy);

        ExponentialBackOffPolicy exponentialBackOff = new ExponentialBackOffPolicy();
        mapper.from(properties::getInitialInterval)
                .whenNonNull()
                .as(Duration::toMillis)
                .to(exponentialBackOff::setInitialInterval);
        mapper.from(properties::getMultiplier).to(exponentialBackOff::setMultiplier);
        mapper.from(properties::getMaxInterval)
                .whenNonNull()
                .as(Duration::toMillis)
                .to(exponentialBackOff::setMaxInterval);
        retryTemplate.setBackOffPolicy(exponentialBackOff);

        return retryTemplate;
    }

    /**
     * Creates the default {@link AmqpAdmin} when a single
     * {@link ConnectionFactory} candidate exists and no admin bean is defined.
     * Also conditional on the {@code spring.rabbitmq.dynamic} property, which
     * matches when the property is missing.
     *
     * @param connectionFactory the connection factory the admin operates on
     * @return a {@link RabbitAdmin} bound to the connection factory
     */
    @Bean
    @ConditionalOnSingleCandidate(ConnectionFactory.class)
    @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
    @ConditionalOnMissingBean(AmqpAdmin.class)
    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

}
4.MessagingTemplateConfiguration
基于 IOC 容器中的 RabbitTemplate 生成 RabbitMessagingTemplate,因此需要通过 @Import 引入 RabbitTemplateConfiguration 配置类。
/**
 * Contributes a {@link RabbitMessagingTemplate} when that class is on the
 * classpath and no such bean has been defined. Imports
 * {@link RabbitTemplateConfiguration} because the messaging template wraps a
 * {@link RabbitTemplate}.
 */
@Configuration
@ConditionalOnClass(RabbitMessagingTemplate.class)
@ConditionalOnMissingBean(RabbitMessagingTemplate.class)
@Import(RabbitTemplateConfiguration.class)
protected static class MessagingTemplateConfiguration {

    /**
     * Wraps the single {@link RabbitTemplate} candidate in a messaging template.
     *
     * @param rabbitTemplate the template to delegate to
     * @return the messaging template
     */
    @Bean
    @ConditionalOnSingleCandidate(RabbitTemplate.class)
    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        RabbitMessagingTemplate messagingTemplate = new RabbitMessagingTemplate(rabbitTemplate);
        return messagingTemplate;
    }

}
5.RabbitAnnotationDrivenConfiguration
该配置的主要目的是注入工厂 RabbitListenerContainerFactory。
- 在用户没有自定义的情况下注入默认的配置类,然后基于配置类注入 RabbitListenerContainerFactory 消费者容器。
- 分成两类,分别是 simple 类型和 direct 类型。

simple 类型:
/**
 * Creates the default configurer for simple listener container factories,
 * handing it the unique message converter and message recoverer (if any) along
 * with the RabbitMQ properties. Skipped when a bean of this type already exists.
 *
 * @return the populated configurer
 */
@Bean
@ConditionalOnMissingBean
public SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() {
    SimpleRabbitListenerContainerFactoryConfigurer simpleConfigurer =
            new SimpleRabbitListenerContainerFactoryConfigurer();
    simpleConfigurer.setMessageConverter(this.messageConverter.getIfUnique());
    simpleConfigurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
    simpleConfigurer.setRabbitProperties(this.properties);
    return simpleConfigurer;
}
/**
 * Creates the {@code rabbitListenerContainerFactory} bean of the simple type.
 * Applies when no bean with that name exists and
 * {@code spring.rabbitmq.listener.type} is {@code simple} or missing.
 *
 * @param configurer the configurer that applies settings to the new factory
 * @param connectionFactory the connection factory passed to the configurer
 * @return the configured listener container factory
 */
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer,
        ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory containerFactory =
            new SimpleRabbitListenerContainerFactory();
    configurer.configure(containerFactory, connectionFactory);
    return containerFactory;
}
6.EnableRabbitConfiguration
RabbitAnnotationDrivenConfiguration 还有一个内部配置类 EnableRabbitConfiguration。
EnableRabbitConfiguration 配置类中没有任何的属性和方法,该配置类的主要目的就是为了添加 @EnableRabbit 注解,以启用 @RabbitListener 注解的解析和使用。
/**
 * Marker configuration with no members of its own: it exists only to carry
 * {@code @EnableRabbit}. It is skipped when a bean named
 * {@code RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME}
 * is already defined.
 */
@Configuration
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {
}
/**
 * Type-level, runtime-retained annotation whose only effect, as declared here,
 * is to import {@code RabbitBootstrapConfiguration} into the annotated
 * configuration.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}
7.RabbitBootstrapConfiguration
RabbitBootstrapConfiguration 配置类主要是注入后置处理器 RabbitListenerAnnotationBeanPostProcessor,用于解析 @RabbitListener 注解,同时注入一个默认的 RabbitListenerEndpointRegistry。
/**
 * Registers the infrastructure beans behind {@code @RabbitListener} support:
 * the annotation bean post-processor and a default endpoint registry.
 */
@Configuration
public class RabbitBootstrapConfiguration {

    /**
     * Registers the {@link RabbitListenerAnnotationBeanPostProcessor} under its
     * well-known bean name, with the infrastructure role.
     *
     * @return a new post-processor instance
     */
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        RabbitListenerAnnotationBeanPostProcessor postProcessor =
                new RabbitListenerAnnotationBeanPostProcessor();
        return postProcessor;
    }

    /**
     * Registers the default {@link RabbitListenerEndpointRegistry} under its
     * well-known bean name.
     *
     * @return a new endpoint registry instance
     */
    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
    public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
        RabbitListenerEndpointRegistry endpointRegistry = new RabbitListenerEndpointRegistry();
        return endpointRegistry;
    }

}