接上一篇:(企业内部需求实战_进阶_01)SSM集成RabbitMQ 关键代码讲解、开发、测试
文章目录
- 一、RabbitMQ配置文件
- 1. RabbitMQ生产者配置文件
- 2. RabbitMQ消费者配置文件
- 3. 连接配置文件
- 二、生产者Java代码Conding
- 2.1. 生产者代码①
- 2.2. 生产者代码②
- 三、消费者Java代码Conding
- 3.1. 生产者代码①
- 3.2. 生产者代码②
- 四、项目准备
- 4.1. 启动项目
- 4.2. 清空控制台
- 五、管控台 队列绑定交换机
- 5.1. 启动 RabbitMQ
- 5.2. 管控台总览
- 5.3. 队列①绑定
- 5.4. 队列②绑定
- 六、管控台绑定后纵览
- 6.1. 在交换机菜单查看 绑定后的队列
- 6.2. 在队列①菜单中 查看 绑定后的交换机
- 6.3. 在队列②菜单中 查看 绑定后的交换机
- 七、生产者请求测试
- 7.1. 生产者①请求
- 7.2. 生产者②请求
- 7.3. 消费者①请求
- 7.4. 消费者②请求
一、RabbitMQ配置文件
1. RabbitMQ生产者配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<!--生产者者配置如下:-->
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"/>
<!-- 管理消息队列 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 定义交换机 自动声明-->
<rabbit:direct-exchange name="ORDER-TRACE-EXCHANGE"
auto-declare="true" durable="true"/>
<!-- 定义MQ消息模板 -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory" exchange="ORDER-TRACE-EXCHANGE"/>
</beans>
2. RabbitMQ消费者配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
<!--消费者配置如下:-->
<!-- 定义RabbitMQ的连接工厂 -->
<rabbit:connection-factory id="connectionFactory"
host="${rabbitmq.host}" port="${rabbitmq.port}" username="${rabbitmq.username}"
password="${rabbitmq.password}" virtual-host="${rabbitmq.vhost}"/>
<!-- 管理消息队列 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 定义一个队列或者多个队列 -->
<rabbit:queue name="FIS-TRACE-QUEUE" auto-declare="true" durable="true"/>
<rabbit:queue name="FIS-TRACE-MONITOR-QUEUE" auto-declare="true" durable="true"/>
<rabbit:queue name="ORDER-MENU-CATEGORY-QUEUE" auto-declare="true" durable="true"/>
<!-- 声明多个消费者对象 -->
<bean id="fisMQMsgHandler" class="com.gblfy.order.mqhandler.FisMQMsgHandler"/>
<bean id="fisMQMonitorMsgHandler" class="com.gblfy.order.mqhandler.FisMQMonitorMsgHandler"/>
<bean id="mQSimpleMsgHandler" class="com.gblfy.order.mqhandler.MQSimpleMsgHandler"/>
<!-- 监听队列 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="fisMQMsgHandler" method="execute" queue-names="FIS-TRACE-QUEUE"/>
<rabbit:listener ref="fisMQMonitorMsgHandler" method="onMessage" queue-names="FIS-TRACE-MONITOR-QUEUE"/>
<rabbit:listener ref="mQSimpleMsgHandler" method="execute" queue-names="ORDER-MENU-CATEGORY-QUEUE"/>
</rabbit:listener-container>
</beans>
3. 连接配置文件
rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.vhost=/admin
二、生产者Java代码Conding
此次案例:主要演示对MQ消息的两种不同方式。
有2个生产者和2个消费者,2个队列分别对应一个交换机,路由key和队列名称不一样,消费者处理MQ的消息的2种不同步处理方法而已!
2.1. 生产者代码①
package com.gblfy.order.controller;
import com.gblfy.order.pojo.FisCallingTrace;
import com.gblfy.order.utils.MQSendMsgUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
@Controller
@Slf4j
public class FisSendMQControllor {
public static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");// 日期格式
public static final DateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");// 日期格式
@Autowired
private MQSendMsgUtils mqSendMsgUtils;
/**
* 发送轨迹数据 MQ异步存储轨迹
*
* @return
*/
@RequestMapping(value = "/sendMQObjMsg", method = RequestMethod.GET)
@ResponseBody
public String sendObj() throws Exception {
Date tStartDate = new Date();// 记录转发结束时间
Date tEndDate = new Date();// 记录转发结束时间
//模拟请求和响应报文
String reqXml = "my name is reqXml";
String resXml = "my name is resXml";
String uuid = UUID.randomUUID().toString();
//模拟 轨迹储存数据
FisCallingTrace mFisCallingTrace = new FisCallingTrace().builder()
.servicename("myServiceNme is A")
.servicetype("2")
.interfacetype("2")
.resstatus("1")
.resremark("1")
.reqdate(dateFormat.parse(dateFormat.format(tStartDate)))
.reqtime(timeFormat.format(tStartDate))
.resdate(dateFormat.parse(dateFormat.format(tEndDate)))
.restime(timeFormat.format(tEndDate))
.reqxml("")
.resxml("")
.build();
//定义路由routingKey
String routingKey = "trace";
//调用MQ松松消息公共方法
mqSendMsgUtils.sendMsg(mFisCallingTrace, routingKey, reqXml, resXml, uuid);
return "send sendMQObjMsg success !!!";
}
}
2.2. 生产者代码②
package com.gblfy.order.controller;
import com.gblfy.order.pojo.FisCallingTrace;
import com.gblfy.order.utils.MQSendMsgUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
@Controller
@Slf4j
public class FisSendMQMsgControllor {
public static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");// 日期格式
public static final DateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");// 日期格式
@Autowired
private MQSendMsgUtils mqSendMsgUtils;
/**
* 发送轨迹数据 MQ异步存储轨迹
*
* @return
*/
@RequestMapping(value = "/sendMQObjMsg2", method = RequestMethod.GET)
@ResponseBody
public String sendObj() throws Exception {
Date tStartDate = new Date();// 记录转发结束时间
Date tEndDate = new Date();// 记录转发结束时间
//模拟请求和响应报文
String reqXml = "my name is reqXml";
String resXml = "my name is resXml";
String uuid = UUID.randomUUID().toString();
//模拟 轨迹储存数据
FisCallingTrace mFisCallingTrace = new FisCallingTrace().builder()
.servicename("myServiceNme is A")
.servicetype("2")
.interfacetype("2")
.resstatus("1")
.resremark("1")
.reqdate(dateFormat.parse(dateFormat.format(tStartDate)))
.reqtime(timeFormat.format(tStartDate))
.resdate(dateFormat.parse(dateFormat.format(tEndDate)))
.restime(timeFormat.format(tEndDate))
.reqxml("")
.resxml("")
.build();
//定义路由routingKey
String routingKey = "trace2";
//调用MQ松松消息公共方法
mqSendMsgUtils.sendMsg(mFisCallingTrace, routingKey, reqXml, resXml, uuid);
return "send sendMQObjMsg success !!!";
}
}
三、消费者Java代码Conding
3.1. 生产者代码①
package com.gblfy.order.mqhandler;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gblfy.order.pojo.FisCallingTrace;
import com.gblfy.order.pojo.RequestInfo;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FisMQMsgHandler {
/**
* 接收MQ消息,保存轨迹
*
* @param msg
*/
public void execute(String msg) {
try {
//通过 判断路由routingKey是否等于trace相同即可
//fastjson解析MQ接收的json字符串 转换成RequestInfo对象
JSONObject jsonObject = JSON.parseObject(msg);
RequestInfo requestInfo = JSON.toJavaObject(jsonObject, RequestInfo.class);
log.info("请求报文 mReqXml:" + requestInfo.getMReqXml());
log.info("响应报文 mResXml:" + requestInfo.getMResXml());
log.info("接口名称 serviceName:" + requestInfo.getServiceName());
log.info("路由routingKey:" + requestInfo.getType());
log.info("生成的 mUUID:" + requestInfo.getMUUID());
/**
* 1.从requestInfo对象中,获取fisCallingTrace轨迹对象
* 2.请求报文和响应报文需要添加进去 fisCallingTrace对象中的请求报文和响应报文默认是空字符串
* 3.将fisCallingTrace 轨技数据保存数据库
*/
FisCallingTrace fisCallingTrace = requestInfo.getFisCallingTrace();
fisCallingTrace.setTraceId(requestInfo.getMUUID());
fisCallingTrace.setReqxml(requestInfo.getMReqXml());
fisCallingTrace.setResxml(requestInfo.getMResXml());
log.info("从MQ接收消息并封装完成!!!");
log.info("开始进行插入数据库操作!!!");
//把MQ接收消息的数据进行 保存轨迹数据库操作 todo
//注入mqpper 插入数据库 todo
} catch (Exception e) {
log.info("如果对象中没有,指定的元素,一般会导致空指针异常!!!");
e.printStackTrace();
}
}
}
3.2. 生产者代码②
package com.gblfy.order.mqhandler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MQSimpleMsgHandler {
private static final ObjectMapper MAPPER = new ObjectMapper();
/**
* 接收MQ消息
*
* @param msg
*/
public void execute(String msg) {
try {
JsonNode jsonNode = MAPPER.readTree(msg);
String serviceName = jsonNode.get("serviceName").asText();
String routingKey = jsonNode.get("routingKey").asText();
String currentDate = jsonNode.get("currentDate").asText();
log.info("接口名称:" + serviceName);
log.info("路由routingKey:" + routingKey);
log.info("当前时间:" + currentDate);
} catch (Exception e) {
e.printStackTrace();
}
}
}
四、项目准备
4.1. 启动项目
4.2. 清空控制台
五、管控台 队列绑定交换机
5.1. 启动 RabbitMQ
双击运行
5.2. 管控台总览
5.3. 队列①绑定
5.4. 队列②绑定
六、管控台绑定后纵览
6.1. 在交换机菜单查看 绑定后的队列
6.2. 在队列①菜单中 查看 绑定后的交换机
6.3. 在队列②菜单中 查看 绑定后的交换机
七、生产者请求测试
7.1. 生产者①请求
7.2. 生产者②请求
其他和生产①一样的,知识路由key值不一样
7.3. 消费者①请求
7.4. 消费者②请求