0
点赞
收藏
分享

微信扫一扫

SpringBoot 整合ActiveMQ_企业实战


SpringBoot 整合ActiveMQ_企业实战_字符串

文章目录

  • ​​1. 新建Springboot工程​​
  • ​​2. 引入maven依赖​​
  • ​​3. ActiveMq配置类​​
  • ​​4. MQ生产者​​
  • ​​5. MQ 点对点消费者​​
  • ​​6. MQ 发布订阅消费者A​​
  • ​​7. MQ 发布订阅消费者B​​
  • ​​8. 统一测试类​​

1. 新建Springboot工程

SpringBoot 整合ActiveMQ_企业实战_字符串_02


SpringBoot 整合ActiveMQ_企业实战_字符串_03

SpringBoot 整合ActiveMQ_企业实战_java_04


SpringBoot 整合ActiveMQ_企业实战_ActiveMQ_05

2. 引入maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + ActiveMQ demo module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.gblfy</groupId>
<artifactId>springboot-activemq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>springboot-activemq</name>
<description>Spring Boot集成Activemq</description>

<!-- Dependencies declared below without a <version> rely on this parent. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.7.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<!-- Global encoding settings -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- JDK version -->
<java.version>1.8</java.version>
<!-- Versions referenced by the dependencies below -->
<fastjson.version>1.2.58</fastjson.version>
<lombok.version>1.18.8</lombok.version>
</properties>

<dependencies>

<!-- Spring MVC starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- ActiveMQ: start -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<!-- ActiveMQ: end -->

<!-- Data processing (fastjson) -->
<!-- NOTE(review): none of the Java sources shown alongside this POM import fastjson; confirm it is still needed. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>

<!-- Lombok (used by the User entity) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>

<!-- Unit testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Spring Boot Maven plugin -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

3. ActiveMq配置类

package com.gblfy.activemq.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

/**
 * Declares the two JMS listener container factories that the
 * {@code @JmsListener} methods select through their {@code containerFactory}
 * attribute: one for queues and one for topics.
 *
 * <p>Created 2019/9/3 18:05.
 *
 * @author gblfy
 * @version 1.0
 */
@Configuration
public class ActiveMqConfig {

    /**
     * Listener container factory for queue destinations. The pub/sub flag is
     * never set here, so the factory keeps its default domain.
     *
     * @param activeMQConnectionFactory connection factory supplied by the container
     * @return the factory bound to the given connection factory
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        return newFactory(activeMQConnectionFactory);
    }

    /**
     * Listener container factory for topic destinations; identical to the
     * queue factory except that the pub/sub domain is switched on.
     *
     * @param activeMQConnectionFactory connection factory supplied by the container
     * @return the factory bound to the given connection factory, in pub/sub mode
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory topicFactory = newFactory(activeMQConnectionFactory);
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }

    /** Creates a fresh factory and wires the connection factory into it. */
    private static DefaultJmsListenerContainerFactory newFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}

4. MQ生产者

package com.gblfy.activemq.producer;

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
 * Thin wrapper around {@link JmsMessagingTemplate} offering one send method per
 * payload shape (string, string list, map, object, object list) for both queue
 * and topic destinations.
 *
 * <p>Every method builds the destination object from the given name and hands
 * the payload to {@code convertAndSend}; no validation is performed here.
 *
 * <p>Created 2019/9/3 18:05.
 *
 * @author gblfy
 * @version 1.0
 */
@Service
public class MqProducer {

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /* ---------------------------- queue senders ---------------------------- */

    /**
     * Sends a string to a queue.
     *
     * @param queueName name of the target queue
     * @param message   string payload
     */
    public void sendStringQueue(String queueName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);
    }

    /**
     * Sends a list of strings to a queue.
     *
     * @param queueName name of the target queue
     * @param list      list payload
     */
    public void sendStringListQueue(String queueName, List<String> list) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), list);
    }

    /**
     * Sends a map to a queue.
     *
     * @param queueName name of the target queue
     * @param headers   map sent as the message payload (despite its name it is
     *                  not passed as message headers)
     */
    public void sendMapQueue(String queueName, Map<String, Object> headers) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), headers);
    }

    /**
     * Sends a serializable object to a queue.
     *
     * @param queueName name of the target queue
     * @param obj       object payload
     */
    public void sendObjQueue(String queueName, Serializable obj) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), obj);
    }

    /**
     * Sends a list of serializable objects to a queue.
     *
     * @param queueName name of the target queue
     * @param objList   list payload
     */
    public void sendObjListQueue(String queueName, List<Serializable> objList) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), objList);
    }

    /* ---------------------------- topic senders ---------------------------- */

    /**
     * Publishes a string to a topic.
     *
     * @param topicName name of the target topic
     * @param message   string payload
     */
    public void sendStringTopic(String topicName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), message);
    }

    /**
     * Publishes a list of strings to a topic.
     *
     * @param topicName name of the target topic
     * @param list      list payload
     */
    public void sendStringListTopic(String topicName, List<String> list) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), list);
    }

    /**
     * Publishes a map to a topic.
     *
     * @param queueName name of the target <em>topic</em> (parameter name kept
     *                  for compatibility)
     * @param headers   map sent as the message payload (despite its name it is
     *                  not passed as message headers)
     */
    public void sendMapTopic(String queueName, Map<String, Object> headers) {
        // Must be a topic destination: this is the pub/sub variant of sendMapQueue.
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(queueName), headers);
    }

    /**
     * Publishes a serializable object to a topic.
     *
     * @param topicName name of the target topic
     * @param obj       object payload
     */
    public void sendObjTopic(String topicName, Serializable obj) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), obj);
    }

    /**
     * Publishes a list of serializable objects to a topic.
     *
     * @param topicName name of the target topic
     * @param objList   list payload
     */
    public void sendObjListTopic(String topicName, List<Serializable> objList) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQTopic(topicName), objList);
    }

}

5. MQ 点对点消费者

package com.gblfy.activemq.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;
import java.util.Map;

/**
 * Point-to-point consumer: one listener per queue, each bound to the
 * {@code jmsListenerContainerQueue} container factory. Every listener only
 * prints what it received to standard output.
 *
 * <p>Created 2019/9/3 18:05.
 *
 * @author gblfy
 * @version 1.0
 */
@Component
public class QueueConsumer {

    /**
     * Handles messages from {@code stringQueue}.
     *
     * @param msg received string payload
     */
    @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringQueue(String msg) {
        System.out.println("接收到消息...." + msg);
    }

    /**
     * Handles messages from {@code stringListQueue}.
     *
     * @param list received list payload
     */
    @JmsListener(destination = "stringListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringListQueue(List<String> list) {
        System.out.println("接收到集合队列消息...." + list);
    }

    /**
     * Handles messages from {@code mapQueue}.
     *
     * @param headers received map payload
     */
    @JmsListener(destination = "mapQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveMapQueue(Map<String, Object> headers) {
        // Label names the Map payload so this output is distinguishable from the list listener's.
        System.out.println("接收到Map队列消息...." + headers);
    }

    /**
     * Handles messages from {@code objQueue}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the wrapped object cannot be read from the message
     */
    @JmsListener(destination = "objQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjQueue(ObjectMessage objectMessage) throws Exception {
        System.out.println("接收到对象队列消息...." + objectMessage.getObject());
    }

    /**
     * Handles messages from {@code objListQueue}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the wrapped object cannot be read from the message
     */
    @JmsListener(destination = "objListQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveObjListQueue(ObjectMessage objectMessage) throws Exception {
        // Label says "object collection" so it is not confused with the single-object listener.
        System.out.println("接收到的对象集合队列消息..." + objectMessage.getObject());
    }
}

6. MQ 发布订阅消费者A

package com.gblfy.activemq.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;
import java.util.Map;
/**
 * Topic subscriber A: one listener per topic, each bound to the
 * {@code jmsListenerContainerTopic} container factory. Every listener only
 * prints what it received to standard output.
 *
 * <p>Created 2019/9/3 18:05.
 *
 * @author gblfy
 * @version 1.0
 */
@Component
public class ATopicConsumer {

    /**
     * Handles messages from {@code stringTopic}.
     *
     * @param msg received string payload
     */
    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("ATopicConsumer接收到消息...." + msg);
    }

    /**
     * Handles messages from {@code stringListTopic}.
     *
     * @param list received list payload
     */
    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("ATopicConsumer接收到集合主题消息...." + list);
    }

    /**
     * Handles messages from {@code mapTopic}.
     *
     * @param map received map payload; declared as a {@code Map} because that is
     *            the payload type the producer's map sender publishes
     */
    @JmsListener(destination = "mapTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveMapTopic(Map<String, Object> map) {
        System.out.println("ATopicConsumer接收到Map主题消息...." + map);
    }

    /**
     * Handles messages from {@code objTopic}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the wrapped object cannot be read from the message
     */
    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到对象主题消息...." + objectMessage.getObject());
    }

    /**
     * Handles messages from {@code objListTopic}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the wrapped object cannot be read from the message
     */
    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        System.out.println("ATopicConsumer接收到的对象集合主题消息..." + objectMessage.getObject());
    }

}

7. MQ 发布订阅消费者B

package com.gblfy.activemq.consumer;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.ObjectMessage;
import java.util.List;

/**
 * Topic subscriber B. Listens on the string, string-list, object and
 * object-list topics through the {@code jmsListenerContainerTopic} factory and
 * echoes every payload to standard output.
 *
 * <p>Created 2019/9/3 18:05.
 *
 * @author gblfy
 * @version 1.0
 */
@Component
public class BTopicConsumer {

    /**
     * Listener for {@code stringTopic}.
     *
     * @param msg received string payload
     */
    @JmsListener(destination = "stringTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        print("BTopicConsumer接收到消息....", msg);
    }

    /**
     * Listener for {@code stringListTopic}.
     *
     * @param list received list payload
     */
    @JmsListener(destination = "stringListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        print("BTopicConsumer接收到集合主题消息....", list);
    }

    /**
     * Listener for {@code objTopic}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the wrapped object cannot be read from the message
     */
    @JmsListener(destination = "objTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        print("BTopicConsumer接收到对象主题消息....", objectMessage.getObject());
    }

    /**
     * Listener for {@code objListTopic}.
     *
     * @param objectMessage raw JMS object message
     * @throws Exception if the wrapped object cannot be read from the message
     */
    @JmsListener(destination = "objListTopic", containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        print("BTopicConsumer接收到的对象集合主题消息...", objectMessage.getObject());
    }

    /** Writes the label immediately followed by the payload's string form. */
    private static void print(String label, Object payload) {
        System.out.println(label + payload);
    }

}

8. 统一测试类

package com.gblfy.activemq.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * Sample entity used as the object payload in the send/receive demos.
 *
 * <p>Accessors, builder and constructors are generated by the Lombok
 * annotations on the class.
 *
 * <p>Created 2019/9/3 18:05.
 *
 * @author gblfy
 * @version 1.0
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {

    // Pinned explicitly so the serialized form does not depend on a
    // compiler-derived default that can change whenever the class is edited.
    private static final long serialVersionUID = 1L;

    /** Identifier. */
    private String id;
    /** Display name. */
    private String name;
    /** Age. */
    private Integer age;
}

package com.gblfy.activemq;

import com.gblfy.activemq.producer.MqProducer;
import com.gblfy.activemq.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Smoke tests that push each supported payload type through {@link MqProducer},
 * first to queues and then to topics.
 *
 * <p>The tests contain no assertions: each one only prints a progress line and
 * sends; any received output is printed by the listener classes.
 *
 * <p>Created 2019/9/3 18:05.
 *
 * @author gblfy
 * @version 1.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqdemoApplicationTests {

    @Autowired
    private MqProducer mqProducer;

    /* ==================== Point-to-point (queue) scenarios ==================== */

    /**
     * Queue scenario 01 - payload type: String.
     * Sends 100 numbered string messages to {@code stringQueue}.
     */
    @Test
    public void testStringQueue() {

        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次发送字符串队列消息");
            mqProducer.sendStringQueue("stringQueue", "消息:" + i);
        }
    }

    /**
     * Queue scenario 02 - payload type: {@code List<String>}.
     */
    @Test
    public void testStringListQueue() {

        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");

        System.out.println("正在发送集合队列消息ing......");
        mqProducer.sendStringListQueue("stringListQueue", idList);
    }

    /**
     * Queue scenario 03 - payload type: {@code Map<String, Object>}.
     */
    @Test
    public void testMapQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("1", "sxh");
        map.put("2", "ljy");
        map.put("3", "sh");
        map.put("4", "qjj");
        map.put("5", "ygf");
        map.put("6", "lxj");
        map.put("7", "gblfy");
        System.out.println("正在发送Map队列消息ing......");
        mqProducer.sendMapQueue("mapQueue", map);
    }

    /**
     * Queue scenario 04 - payload type: a single serializable object.
     */
    @Test
    public void testObjQueue() {

        System.out.println("正在发送对象队列消息......");
        mqProducer.sendObjQueue("objQueue", new User("1", "小明", 20));
    }

    /**
     * Queue scenario 05 - payload type: list of serializable objects.
     */
    @Test
    public void testObjListQueue() {

        System.out.println("正在发送对象集合队列消息......");

        List<Serializable> userList = new ArrayList<>();
        // Plain decimal 1: a leading zero would make the literal octal.
        userList.add(new User("1", "雨昕", 1));
        userList.add(new User("2", "刘英", 26));
        userList.add(new User("3", "振振", 12));

        mqProducer.sendObjListQueue("objListQueue", userList);
    }

    /* ==================== Publish/subscribe (topic) scenarios ==================== */

    /**
     * Topic scenario 01 - payload type: String.
     * Publishes 100 numbered string messages to {@code stringTopic}.
     */
    @Test
    public void testStringTopic() {

        for (int i = 1; i <= 100; i++) {
            System.out.println("第" + i + "次发送字符串主题消息");
            mqProducer.sendStringTopic("stringTopic", "消息:" + i);
        }
    }

    /**
     * Topic scenario 02 - payload type: {@code List<String>}.
     */
    @Test
    public void testStringListTopic() {

        List<String> idList = new ArrayList<>();
        idList.add("id1");
        idList.add("id2");
        idList.add("id3");

        System.out.println("正在发送集合主题消息ing......");
        mqProducer.sendStringListTopic("stringListTopic", idList);
    }

    /**
     * Topic scenario 03 - payload type: {@code Map<String, Object>}.
     */
    @Test
    public void testMapTopic() {
        Map<String, Object> map = new HashMap<>();
        map.put("1", "sxh");
        map.put("2", "ljy");
        map.put("3", "sh");
        map.put("4", "qjj");
        map.put("5", "ygf");
        map.put("6", "lxj");
        map.put("7", "gblfy");
        System.out.println("正在发送Map主题消息ing......");
        // "mapTopic" is the destination name the map topic listener subscribes to.
        mqProducer.sendMapTopic("mapTopic", map);
    }

    /**
     * Topic scenario 04 - payload type: a single serializable object.
     */
    @Test
    public void testObjTopic() {

        System.out.println("正在发送对象主题消息......");
        mqProducer.sendObjTopic("objTopic", new User("1", "小明", 20));
    }

    /**
     * Topic scenario 05 - payload type: list of serializable objects.
     */
    @Test
    public void testObjListTopic() {

        System.out.println("正在发送对象集合主题消息......");

        List<Serializable> userList = new ArrayList<>();
        // Plain decimal 1: a leading zero would make the literal octal.
        userList.add(new User("1", "雨昕", 1));
        userList.add(new User("2", "刘英", 26));
        userList.add(new User("3", "振振", 12));

        mqProducer.sendObjListTopic("objListTopic", userList);
    }
}

GitLab地址:​​https://gitlab.com/gb-heima/springboot-activemq​​ Git下载方式:

git clone git@gitlab.com:gb-heima/springboot-activemq.git

zip包下载方式:
​​​https://gitlab.com/gb-heima/springboot-activemq/-/archive/master/springboot-activemq-master.zip​​


举报

相关推荐

0 条评论