pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + RabbitMQ MessageListenerAdapter demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherit from the Spring Boot 2.6.2 parent; the dependencies and plugin below declare no versions of their own. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.2</version>
</parent>
<packaging>jar</packaging>
<groupId>com.kaven</groupId>
<artifactId>springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot</name>
<description>springboot</description>
<properties>
<!-- Java language level used for the build. -->
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring MVC: needed for the @RestController that exposes the /send endpoint. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok: provides the @Getter/@Setter annotations used on RabbitMQProperties. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Spring AMQP: RabbitTemplate, listener containers and MessageListenerAdapter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin for building and running the application. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.properties:
# RabbitMQ settings bound to RabbitMQProperties (prefix "spring.rabbitmq").
# Broker address and port.
spring.rabbitmq.host=192.168.1.9
spring.rabbitmq.port=5672
# Credentials used when opening the connection.
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# Virtual host to connect to.
spring.rabbitmq.virtualHost=/
# Name of the direct exchange declared by RabbitMQConfig and used by Producer.
# NOTE(review): quotes are not stripped in .properties files, so this value is presumably
# the two-character string "" rather than an empty exchange name - confirm the intent.
spring.rabbitmq.exchange=""
# Routing key used both for the binding and when publishing.
spring.rabbitmq.routingKey=kaven
# Queue that is declared, bound and consumed by the listener container.
spring.rabbitmq.queue=kaven
RabbitMQProperties类(RabbitMQ的参数类):
package com.kaven.springboot.rabbitmq;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Holder for the RabbitMQ settings of this demo.
 *
 * <p>Values are bound from configuration keys starting with {@code spring.rabbitmq};
 * getters and setters for every field are generated by Lombok.
 */
@Getter
@Setter
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMQProperties {

    // --- connection settings ---

    /** Broker host ({@code spring.rabbitmq.host}). */
    private String host;

    /** Broker port ({@code spring.rabbitmq.port}). */
    private int port;

    /** Virtual host to connect to ({@code spring.rabbitmq.virtualHost}). */
    private String virtualHost;

    /** User name for the connection ({@code spring.rabbitmq.username}). */
    private String username;

    /** Password for the connection ({@code spring.rabbitmq.password}). */
    private String password;

    // --- topology settings ---

    /** Name of the exchange messages are published to ({@code spring.rabbitmq.exchange}). */
    private String exchange;

    /** Routing key used for binding and publishing ({@code spring.rabbitmq.routingKey}). */
    private String routingKey;

    /** Name of the queue to declare and consume from ({@code spring.rabbitmq.queue}). */
    private String queue;
}
CustomizeMessageListener类(自定义消息监听器):
package com.kaven.springboot.rabbitmq;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
 * Plain delegate object whose {@link #customizeHandleMessage(byte[])} method is invoked
 * by the message listener adapter configured in {@code RabbitMQConfig}.
 */
@Component
public class CustomizeMessageListener {

    /**
     * Handles one message by printing its body to standard output.
     *
     * @param msgBody raw message body; decoded as UTF-8 text
     */
    public void customizeHandleMessage(byte[] msgBody) {
        String text = new String(msgBody, StandardCharsets.UTF_8);
        System.out.printf("自定义处理消息: %s\n", text);
    }
}
RabbitMQConfig类(定义RabbitMQ组件的配置类):
package com.kaven.springboot.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import javax.annotation.Resource;
/**
 * Spring configuration that declares the RabbitMQ beans of this demo: the connection
 * factory, a {@link RabbitTemplate}, the listener container with its
 * {@link MessageListenerAdapter}, and the exchange / queue / binding topology.
 */
@Configuration
public class RabbitMQConfig {

    /** Connection and topology settings bound from {@code spring.rabbitmq.*}. */
    @Resource
    private RabbitMQProperties properties;

    /**
     * Creates the caching connection factory for the configured broker.
     *
     * @return connection factory using the configured host, port, credentials and virtual host
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory =
                new CachingConnectionFactory(properties.getHost(), properties.getPort());
        factory.setVirtualHost(properties.getVirtualHost());
        factory.setUsername(properties.getUsername());
        factory.setPassword(properties.getPassword());
        return factory;
    }

    /**
     * Creates a template for publishing messages.
     *
     * <p>The original author notes that this bean must be prototype-scoped.
     *
     * @param connectionFactory factory the template obtains connections from
     * @return a new template instance
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        return template;
    }

    /**
     * Creates the listener container that consumes the configured queue and hands every
     * message to {@code delegate} through a {@link MessageListenerAdapter}.
     *
     * @param connectionFactory factory used by the container's consumers
     * @param delegate object whose {@code customizeHandleMessage} method processes messages
     * @return the configured container
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory,
            CustomizeMessageListener delegate) {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);

        // Number of concurrent consumers to create (the default is 1).
        listenerContainer.setConcurrentConsumers(3);
        // Upper limit for the number of consumers; defaults to concurrentConsumers, must not
        // be smaller than it, and extra consumers are added on demand.
        listenerContainer.setMaxConcurrentConsumers(5);

        // Queue name(s) to receive from; the parameter is a String... of queue names.
        listenerContainer.setQueueNames(properties.getQueue());

        // Acknowledgement behaviour of the container: automatic acknowledgement.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);

        // How many messages the broker sends to each consumer in a single request;
        // this can usually be set fairly high to improve throughput.
        listenerContainer.setPrefetchCount(3);

        // Install the adapter as the container's MessageListener.
        listenerContainer.setMessageListener(createListenerAdapter(delegate));
        return listenerContainer;
    }

    /** Builds the adapter that forwards messages to {@code delegate.customizeHandleMessage}. */
    private MessageListenerAdapter createListenerAdapter(CustomizeMessageListener delegate) {
        MessageListenerAdapter adapter = new MessageListenerAdapter();
        // Object the listening work is delegated to.
        adapter.setDelegate(delegate);
        // Name of the default listener method to call on the delegate.
        adapter.setDefaultListenerMethod("customizeHandleMessage");
        return adapter;
    }

    /**
     * Declares the direct exchange named by the configuration.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange exchange() {
        String exchangeName = properties.getExchange();
        return new DirectExchange(exchangeName);
    }

    /**
     * Declares the durable queue named by the configuration.
     *
     * @return the queue definition
     */
    @Bean
    public Queue queue() {
        final boolean durable = true;
        return new Queue(properties.getQueue(), durable);
    }

    /**
     * Binds the queue to the exchange using the configured routing key.
     *
     * @return the binding definition
     */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        DirectExchange targetExchange = exchange();
        return BindingBuilder.bind(boundQueue)
                .to(targetExchange)
                .with(properties.getRoutingKey());
    }
}
Producer类(用于发布消息):
package com.kaven.springboot.rabbitmq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
 * Publishes text messages to the configured exchange with the configured routing key.
 */
@Component
public class Producer {

    /** Template used to publish messages; supplied through the constructor. */
    private final RabbitTemplate rabbitTemplate;

    /** Exchange and routing-key settings bound from {@code spring.rabbitmq.*}. */
    @Resource
    private RabbitMQProperties properties;

    /**
     * @param rabbitTemplate template used for sending
     */
    @Autowired
    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Sends {@code msg} as a UTF-8 encoded message body.
     *
     * <p>Each message carries an expiration of {@code "10000"} (presumably milliseconds)
     * and is sent together with correlation data holding a random UUID.
     *
     * @param msg text to send; must not be {@code null}
     */
    public void sendMsg(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);

        MessageProperties headers = new MessageProperties();
        headers.setExpiration("10000");
        Message outbound = new Message(body, headers);

        String correlationKey = UUID.randomUUID().toString();
        rabbitTemplate.send(
                properties.getExchange(),
                properties.getRoutingKey(),
                outbound,
                new CorrelationData(correlationKey));
    }
}
ProducerController类(用于发布消息的接口):
package com.kaven.springboot.rabbitmq;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * HTTP entry point for publishing a message through {@link Producer}.
 */
@RestController
public class ProducerController {

    /** Component that performs the actual publish. */
    @Resource
    private Producer producer;

    /**
     * Publishes the given text to RabbitMQ.
     *
     * <p>The {@code msg} query parameter is declared as required: without the annotation a
     * request lacking it would pass {@code null} on to {@link Producer#sendMsg(String)},
     * which dereferences the value and would fail with a {@code NullPointerException}.
     * With the annotation Spring rejects such a request before this method is called.
     *
     * @param msg text to publish, taken from the {@code msg} request parameter
     * @return a fixed confirmation string
     */
    @GetMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        producer.sendMsg(msg);
        return "发送消息成功";
    }
}
启动类:
package com.kaven.springboot;
import com.kaven.springboot.rabbitmq.RabbitMQProperties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;
/**
 * Application entry point. Also enables scanning for {@code @ConfigurationProperties}
 * classes in the package of {@link RabbitMQProperties}.
 */
@SpringBootApplication
@ConfigurationPropertiesScan(basePackageClasses = {RabbitMQProperties.class})
public class SpringbootApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed on to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringbootApplication.class, args);
    }
}
启动应用,请求接口。
控制台的输出如下图所示,说明消息监听器适配器起作用了,CustomizeMessageListener类的customizeHandleMessage方法被调用了。
下面这两行代码设置了消息监听器适配器需要将消息监听业务委托给哪个对象,以及需要调用该对象的哪个方法。
// Set the delegate object that the adapter forwards messages to
adapter.setDelegate(delegate);
// Set the name of the default listener method to call on the delegate
adapter.setDefaultListenerMethod("customizeHandleMessage");
还有其他方式来指定需要调用该对象的方法名称。
// Alternative to a default method name: map the queue name to the handler method name
Map<String, String> methodNameMap = new HashMap<>(8);
methodNameMap.put(properties.getQueue(), "customizeHandleMessage");
// Set the mapping from queue name or consumer tag to method name
adapter.setQueueOrTagToMethodName(methodNameMap);
MessageListenerAdapter本身就是一种消息监听器,只不过它在消息监听方法中将消息监听业务委托给了指定对象的指定方法。
/**
 * Delegates the incoming message to the target listener method.
 *
 * <p>If the delegate is a different object that is itself a listener, the message is passed
 * straight through to it. Otherwise the message is converted, the handler method name is
 * resolved, the method is invoked, and a non-null return value is passed to
 * {@code handleResult}.
 *
 * @param message the received message
 * @param channel the channel the message was received on
 * @throws Exception if the delegate or the handler method fails
 */
@Override
public void onMessage(Message message, Channel channel) throws Exception { // NOSONAR
// Check whether the delegate is something other than this adapter and is itself a listener.
// In that case the adapter only acts as a pass-through.
Object delegateListener = getDelegate();
if (!delegateListener.equals(this)) {
if (delegateListener instanceof ChannelAwareMessageListener) {
((ChannelAwareMessageListener) delegateListener).onMessage(message, channel);
return;
}
else if (delegateListener instanceof MessageListener) {
((MessageListener) delegateListener).onMessage(message);
return;
}
}
// Regular case: convert the message and resolve the handler method name
Object convertedMessage = extractMessage(message);
String methodName = getListenerMethodName(message, convertedMessage);
if (methodName == null) {
throw new AmqpIllegalStateException("No default listener method specified: "
+ "Either specify a non-null value for the 'defaultListenerMethod' property or "
+ "override the 'getListenerMethodName' method.");
}
// Invoke the handler method with the appropriate arguments
Object[] listenerArguments = buildListenerArguments(convertedMessage, channel, message);
Object result = invokeListenerMethod(methodName, listenerArguments, message);
if (result != null) {
handleResult(new InvocationResult(result, null, null, null, null), message, channel);
}
else {
logger.trace("No result object given - no result to handle");
}
}
MessageListenerAdapter获取需要调用的指定对象的方法名称的源码:
/**
 * Determines the name of the listener method that will handle the given message.
 *
 * <p>When the {@code queueOrTagToMethodName} map is not empty, it is consulted first:
 * the consumer queue is looked up, then the consumer tag. If neither matches (or the map
 * is empty), the configured default listener method is returned. According to the original
 * note, that default is {@code "handleMessage"} unless it has been changed; the initial
 * value is not shown in this excerpt.
 *
 * @param originalMessage the received message, whose properties supply queue and tag
 * @param extractedMessage the converted message (not used by this implementation)
 * @return the handler method name
 */
protected String getListenerMethodName(Message originalMessage, Object extractedMessage) {
if (this.queueOrTagToMethodName.size() > 0) {
MessageProperties props = originalMessage.getMessageProperties();
String methodName = this.queueOrTagToMethodName.get(props.getConsumerQueue());
if (methodName == null) {
methodName = this.queueOrTagToMethodName.get(props.getConsumerTag());
}
if (methodName != null) {
return methodName;
}
}
return getDefaultListenerMethod();
}
/**
 * Returns the configured default listener method name.
 *
 * @return the value of the {@code defaultListenerMethod} field
 */
protected String getDefaultListenerMethod() {
return this.defaultListenerMethod;
}
MessageListenerAdapter调用指定对象的方法时传入的参数:
// Call site inside onMessage: the converted message, channel and raw message are passed in
Object[] listenerArguments = buildListenerArguments(convertedMessage, channel, message);
/**
 * Builds the argument array passed to the target listener method.
 *
 * <p>This implementation returns a one-element array holding only the converted message;
 * {@code channel} and {@code message} are not used.
 *
 * @param extractedMessage the converted message
 * @param channel the channel the message was received on (unused here)
 * @param message the raw message (unused here)
 * @return a single-element array containing {@code extractedMessage}
 */
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[] { extractedMessage };
}
convertedMessage是byte[]类型(发送消息时没有设置contentType,默认的消息转换器会直接返回消息体的字节数组),而listenerArguments是Object[]类型,并且只有一个数组元素,就是convertedMessage。
最后MessageListenerAdapter通过反射调用指定对象的指定方法。
/**
 * Invokes the named listener method on the delegate.
 *
 * <p>A {@code MethodInvoker} is configured with the delegate, the method name and the
 * arguments, then prepared and invoked. An exception thrown by the target method is
 * rethrown as {@code AmqpIOException} when it is an {@code IOException}, otherwise as
 * {@code ListenerExecutionFailedException}. Any other failure (for example while preparing
 * the invoker) is wrapped in a {@code ListenerExecutionFailedException} whose message lists
 * the argument types and values.
 *
 * @param methodName name of the method to call on the delegate
 * @param arguments arguments to pass to the method
 * @param originalMessage the raw message, attached to any exception that is thrown
 * @return the value returned by the listener method
 */
protected Object invokeListenerMethod(String methodName, Object[] arguments, Message originalMessage) {
try {
MethodInvoker methodInvoker = new MethodInvoker();
methodInvoker.setTargetObject(getDelegate());
methodInvoker.setTargetMethod(methodName);
methodInvoker.setArguments(arguments);
methodInvoker.prepare();
return methodInvoker.invoke();
}
catch (InvocationTargetException ex) {
// The listener method itself threw: unwrap and translate the cause
Throwable targetEx = ex.getTargetException();
if (targetEx instanceof IOException) {
throw new AmqpIOException((IOException) targetEx); // NOSONAR lost stack trace
}
else {
throw new ListenerExecutionFailedException("Listener method '" // NOSONAR lost stack trace
+ methodName + "' threw exception", targetEx, originalMessage);
}
}
catch (Exception ex) {
// The invocation could not be performed: report the argument types and values
ArrayList<String> arrayClass = new ArrayList<>();
if (arguments != null) {
for (Object argument : arguments) {
// NOTE(review): a null element in arguments would throw NullPointerException here
arrayClass.add(argument.getClass().toString());
}
}
throw new ListenerExecutionFailedException("Failed to invoke target method '" + methodName
+ "' with argument type = [" + StringUtils.collectionToCommaDelimitedString(arrayClass)
+ "], value = [" + ObjectUtils.nullSafeToString(arguments) + "]", ex, originalMessage);
}
}
MessageListenerAdapter(消息监听器适配器)的使用与源码分析就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。