0
点赞
收藏
分享

微信扫一扫

Springboot RabbitMq源码解析之配置类 (一)


文章目录

  • ​​1.RabbitAutoConfiguration​​
  • ​​2.RabbitConnectionFactoryCreator​​
  • ​​3.RabbitTemplateConfiguration​​
  • ​​4.MessagingTemplateConfiguration​​
  • ​​5.RabbitAnnotationDrivenConfiguration​​
  • ​​6.EnableRabbitConfiguration​​
  • ​​7.RabbitBootstrapConfiguration​​

1.RabbitAutoConfiguration

RabbitAutoConfiguration类是springboot的自动配置类。

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
//省略具体代码
}

  1. ​RabbitAutoConfiguration​​​通过​​EnableConfigurationProperties​​注解注入了​​RabbitProperties​​类以识别rabbit的相关配置
  2. 并且通过import注解注入了另一个rabbit的配置类​​RabbitAnnotationDrivenConfiguration​​。
  3. 另外,RabbitAutoConfiguration包含的3个内部类也都是配置类。
    RabbitConnectionFactoryCreator
    RabbitTemplateConfiguration
    MessagingTemplateConfiguration

2.RabbitConnectionFactoryCreator

基于rabbit的配置文件内容在用户没有自定义的情况下初始化了CachingConnectionFactory类作为rabbit默认的ConnectionFactory

@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {

@Bean
public CachingConnectionFactory rabbitConnectionFactory(
RabbitProperties properties) throws Exception {
PropertyMapper map = PropertyMapper.get();
CachingConnectionFactory factory = new CachingConnectionFactory(
getRabbitConnectionFactoryBean(properties).getObject());
map.from(properties::determineAddresses).to(factory::setAddresses);
map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);
map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis)
.to(factory::setChannelCheckoutTimeout);
RabbitProperties.Cache.Connection connection = properties.getCache()
.getConnection();
map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
map.from(connection::getSize).whenNonNull()
.to(factory::setConnectionCacheSize);
return factory;
}

private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(
RabbitProperties properties) throws Exception {
PropertyMapper map = PropertyMapper.get();
RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
map.from(properties::determineHost).whenNonNull().to(factory::setHost);
map.from(properties::determinePort).to(factory::setPort);
map.from(properties::determineUsername).whenNonNull()
.to(factory::setUsername);
map.from(properties::determinePassword).whenNonNull()
.to(factory::setPassword);
map.from(properties::determineVirtualHost).whenNonNull()
.to(factory::setVirtualHost);
map.from(properties::getRequestedHeartbeat).whenNonNull()
.asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
RabbitProperties.Ssl ssl = properties.getSsl();
if (ssl.isEnabled()) {
factory.setUseSSL(true);
map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
map.from(ssl::getKeyStore).to(factory::setKeyStore);
map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
map.from(ssl::getTrustStore).to(factory::setTrustStore);
map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
}
map.from(properties::getConnectionTimeout).whenNonNull()
.asInt(Duration::toMillis).to(factory::setConnectionTimeout);
factory.afterPropertiesSet();
return factory;
}
}

3.RabbitTemplateConfiguration

基于​​ConnectionFactory​​​, 在用户没有自定义的场景下,生成默认的​​RabbitTemplate​​​和​​amqpAdmin​​​ 因此,需要通过import引入​​RabbitConnectionFactoryCreator​​配置类

@Configuration
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {

private final ObjectProvider<MessageConverter> messageConverter;

private final RabbitProperties properties;

public RabbitTemplateConfiguration(
ObjectProvider<MessageConverter> messageConverter,
RabbitProperties properties) {
this.messageConverter = messageConverter;
this.properties = properties;
}

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitTemplate.class)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = this.messageConverter.getIfUnique();
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}
template.setMandatory(determineMandatoryFlag());
RabbitProperties.Template properties = this.properties.getTemplate();
if (properties.getRetry().isEnabled()) {
template.setRetryTemplate(createRetryTemplate(properties.getRetry()));
}
map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReceiveTimeout);
map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReplyTimeout);
map.from(properties::getExchange).to(template::setExchange);
map.from(properties::getRoutingKey).to(template::setRoutingKey);
return template;
}

private boolean determineMandatoryFlag() {
Boolean mandatory = this.properties.getTemplate().getMandatory();
return (mandatory != null ? mandatory : this.properties.isPublisherReturns());
}

private RetryTemplate createRetryTemplate(RabbitProperties.Retry properties) {
PropertyMapper map = PropertyMapper.get();
RetryTemplate template = new RetryTemplate();
SimpleRetryPolicy policy = new SimpleRetryPolicy();
map.from(properties::getMaxAttempts).to(policy::setMaxAttempts);
template.setRetryPolicy(policy);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
map.from(properties::getInitialInterval).whenNonNull().as(Duration::toMillis)
.to(backOffPolicy::setInitialInterval);
map.from(properties::getMultiplier).to(backOffPolicy::setMultiplier);
map.from(properties::getMaxInterval).whenNonNull().as(Duration::toMillis)
.to(backOffPolicy::setMaxInterval);
template.setBackOffPolicy(backOffPolicy);
return template;
}

@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean(AmqpAdmin.class)
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}

4.MessagingTemplateConfiguration

基于IOC容器中​​RabbitTemplate​​​ 生成​​RabbitMessagingTemplate​​​,因此需要通过import引入​​RabbitTemplateConfiguration​​配置类

@Configuration
@ConditionalOnClass(RabbitMessagingTemplate.class)
@ConditionalOnMissingBean(RabbitMessagingTemplate.class)
@Import(RabbitTemplateConfiguration.class)
protected static class MessagingTemplateConfiguration {

@Bean
@ConditionalOnSingleCandidate(RabbitTemplate.class)
public RabbitMessagingTemplate rabbitMessagingTemplate(
RabbitTemplate rabbitTemplate) {
return new RabbitMessagingTemplate(rabbitTemplate);
}

}

5.RabbitAnnotationDrivenConfiguration

该配置的主要目的是注入工厂​​RabbitListenerContainerFactory​

  1. 在用户没有自定义的情况下注入默认的配置类,然后基于配置类注入​​RabbitListenerContainerFactory​​消费者容器
  2. 分成两类,分别是simple类型和direct类型

simple类型

@Bean
@ConditionalOnMissingBean
public SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurer() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = new SimpleRabbitListenerContainerFactoryConfigurer();
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setMessageRecoverer(this.messageRecoverer.getIfUnique());
configurer.setRabbitProperties(this.properties);
return configurer;
}

@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}

6.EnableRabbitConfiguration

​RabbitAnnotationDrivenConfiguration​​​还有一个内部配置类​​EnableRabbitConfiguration​

​EnableRabbitConfiguration​​​配置类中没有任何的属性和方法,该配置类的主要目的就是为了添加​​EnableRabbit​​​注解,以启用​​RabbitListener​​注解的解析和使用。

@Configuration
@EnableRabbit
@ConditionalOnMissingBean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableRabbitConfiguration {

}

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(RabbitBootstrapConfiguration.class)
public @interface EnableRabbit {
}

7.RabbitBootstrapConfiguration

​RabbitBootstrapConfiguration​​​配置类主要是注入后置处理器​​RabbitListenerAnnotationBeanPostProcessor ​​​用于解析​​RabbitListener​​​注解,同时注入一个默认的​​RabbitListenerEndpointRegistry​​。

@Configuration
public class RabbitBootstrapConfiguration {

@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
return new RabbitListenerAnnotationBeanPostProcessor();
}

@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
}


举报

相关推荐

0 条评论