Springboot整合ActiveMQ
安装ActiveMQ
官网下载地址,以apache-activemq-5.16.4-bin.zip为例。将压缩包解压,进入\bin\win64目录,双击activemq.bat运行启动。
打开浏览器,访问localhost:8161,输入账户名密码(admin/admin),出现ActiveMQ的管理界面。
添加ActiveMQ依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
ActiveMQ配置
# ActiveMQ配置
# ActiveMQ通信地址
spring.activemq.broker-url=tcp://localhost:61616
# 不以内存方式启动,额外启动ActiveMQ服务
spring.activemq.in-memory=false
# 信任所有包
spring.activemq.packages.trust-all=true
# 账户名、密码
spring.activemq.user=admin
spring.activemq.password=admin
消息发送与接收
Queue(点对点)
1. 定义消息模式(Queue)对象
@Component
public class ActiveMQConfig{
@Bean
Queue queue(){
return new ActiveMQQueue("my-queue");
}
}
2. Queue消息发送端
@RestController
public class JmsController {
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
Queue queue;
/**
* 发送queue消息
* @param msg 消息
*/
@RequestMapping("/queue/sendMsg")
public void send(String msg) {
/**
* 消息发送 jmsMessagingTemplate
* -destination:目标对象
* -payload:消息数据
*/
jmsMessagingTemplate.convertAndSend(queue, msg);
}
}
3. Queue消息接收端
@Component
public class QueueListener {
@JmsListener(destination = "my-queue")
public void consumer1(String message){
System.out.println("consumer1:message = " + message);
}
@JmsListener(destination = "my-queue")
public void consumer2(String message){
System.out.println("consumer2:message = " + message);
}
}
这里定义了两个消费者。Queue模式是点对点发送,多个消费者会轮询接收消息
consumer1:message = hello Queue!
Topic(发布-订阅)
1. 定义消息模式(Topic)对象
@Component
public class ActiveMQConfig{
@Bean
Topic topic(){
return new ActiveMQTopic("my-topic");
}
}
2. Topic消息发送端
@RestController
public class JmsController {
@Autowired
JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
Topic topic;
/**
* 发送topic消息
* @param msg 消息
*/
@RequestMapping("/topic/sendMsg")
public void sendTopic(String msg) {
/**
* 消息发送 jmsMessagingTemplate
* -destination:目标对象
* -payload:消息数据
*/
jmsMessagingTemplate.convertAndSend(topic, msg);
}
}
3. Topic消息接收端,需要自定义topic消息监听器
问题
Springboot整合ActiveMQ,默认只能监听一种模式消息进行处理,默认只能处理Queue消息
# 开启发布订阅模式 ,开启后只能处理Topic消息
spring.jms.pub-sub-domain=true
自定义Topic消息监听器,实现同时处理Queue和Topic消息
自定义Topic消息监听器
@Component
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
/**
* 消息工厂配置
*/
@Bean
public ActiveMQConnectionFactory activeMqConnectionFactory() {
ActiveMQConnectionFactory activeMqConnectionFactory =
new ActiveMQConnectionFactory(brokerUrl);
return activeMqConnectionFactory;
}
/**
* Queue 消息监听器
* @return
*/
@Bean(name = "jmsTemplate")
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMqConnectionFactory) {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(activeMqConnectionFactory);
//进行持久化配置 1表示非持久化,2表示持久化
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
//客户端签收模式
jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
return jmsTemplate;
}
/**
* Topic 消息监听器
* @return
*/
@Bean(name = "jmsTopicListener")
public JmsListenerContainerFactory<?> jmsTopicListener(ActiveMQConnectionFactory activeMqConnectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMqConnectionFactory);
//这里必须设置为true,false则表示是queue类型
factory.setPubSubDomain(true);
//重连间隔时间
factory.setRecoveryInterval(1000L);
return factory;
}
@Bean
Queue queue(){
return new ActiveMQQueue("my-queue");
}
@Bean
Topic topic(){
return new ActiveMQTopic("my-topic");
}
}
Topic消息接收端,需要指定topic消息监听器
@Component
public class TopicListener {
@JmsListener(destination = "my-topic", containerFactory = "jmsTopicListener")
public void consumer1(String message){
System.out.println("consumer1:message = " + message);
}
@JmsListener(destination = "my-topic", containerFactory = "jmsTopicListener")
public void consumer2(String message){
System.out.println("consumer2:message = " + message);
}
}
这里定义了两个消费者。Topic模式是发布-订阅模式,多个消费者都会接收到消息。
consumer1:message = hello Topic!
consumer2:message = hello Topic!