Zero-Intrusion Global Logging of RabbitMQ Message Send and Consume Status in Spring Boot


2022-10-30


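0. Background

  1. A Spring Boot project uses RabbitMQ, with many places that send and receive messages.
  2. The project's logs are stored in Elasticsearch (ES) and viewed in Kibana.
  3. ES storage fills up so quickly that part of the logs has to be deleted every few days, so logs survive only a few days and anything older is gone for good.
  4. The goal is to keep each MQ message for longer, so that earlier problems can still be investigated later (for example, a month afterwards).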

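1. Approach Overview

Approach 1

  1. Add business code at every place that sends a message and store a row in a persistent database, for example a MySQL table, recording the exchange, routing key, message body and so on. Mark the row as successful if the send succeeds and as failed if it fails.
  2. Do the same at every place that receives a message.

Verdict: an embarrassment, bordering on criminal.

Approach 2

  1. A cross-cutting operation like this is exactly what AOP is for. Where the data ends up does not matter much; a MySQL table will do again.
  2. Use this approach for both sending and receiving.

Verdict: elegant, concise, zero-intrusion, and classy to boot.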

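2. Implementation

2.1 Either way, first create a MySQL table to hold the data.

If you would rather use MongoDB or something else, go ahead; that is not the point of this article.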

CREATE TABLE `mq_record` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id; primary key',
  `exchange` varchar(255) NOT NULL COMMENT 'exchange',
  `routingkey` varchar(255) NOT NULL COMMENT 'routingKey',
  `message_body` json NOT NULL COMMENT 'json body',
  `message_id` varchar(64) DEFAULT NULL COMMENT 'messageId, unique identifier of each message',
  `send_status` tinyint(4) DEFAULT NULL COMMENT 'send status',
  `receive_status` tinyint(4) DEFAULT NULL COMMENT 'consume status',
  `receive_channel` json DEFAULT NULL COMMENT 'channel information',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  PRIMARY KEY (`id`) USING BTREE,
  KEY `message_id` (`message_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT='All MQ message send and receive records in the CRM system';

The table looks like this (it already contains some data):

[Figure: screenshot of the mq_record table]

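2.2 Intercepting in the business code

Many people have already written a lot of message-sending code in the project, and it all looks like this: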

@Resource(name = "rabbitTemplate")
private AmqpTemplate rabbitTemplate;

MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(RabbitMqUtil.objectStr(mqMessage).getBytes("UTF-8"), messageProperties);
rabbitTemplate.send(xxxxxExchange, xxxxxRoutingKey, message);

Jumping into rabbitTemplate.send(xxxxxExchange, xxxxxRoutingKey, message) shows that it is the send method declared on the org.springframework.amqp.core.AmqpTemplate interface, which looks like this.

/**
* Send a message to a specific exchange with a specific routing key.
*
* @param exchange the name of the exchange
* @param routingKey the routing key
* @param message a message to send
* @throws AmqpException if there is a problem
*/
void send(String exchange, String routingKey, Message message) throws AmqpException;

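So, can this send method be intercepted with AOP?
Answer: yes. rabbitTemplate is a Spring-managed bean, and Spring AOP can proxy a Spring-managed bean even when its class comes from a third-party package.
So, how?

2.2.1 Intercepting a method from a third-party package

1. Add an Aspect to the project that intercepts the code sending to MQ

Prerequisites: the @Pointcut, @Around, @Before and @After annotations and how to use them.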

package cn.xuetian.crm.manager.mq.rabbit.aop;

import cn.xuetian.crm.dao.dataobject.mq.MqRecord;
import cn.xuetian.crm.dao.mapper.mq.MqRecordMapper;
import cn.xuetian.framework.plugin.enums.msg.mq.MqSendStatusEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Records MQ message sends via AOP.
 *
 * @author zss
 * @date 2022/10/22 14:15
 * @Email:451682479@qq.com
 */
@Slf4j
@Aspect
@Component
public class MqSendAspect {

    @Autowired
    private MqRecordMapper mqRecordMapper;

    // Matches only: void send(String exchange, String routingKey, Message message) throws AmqpException;
    // The one- and two-argument send overloads are left out: they carry no exchange to record.
    @Pointcut("execution(* org.springframework.amqp.core.AmqpTemplate.send(java.lang.String, java.lang.String, org.springframework.amqp.core.Message))")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
        Object[] args = pjp.getArgs();
        String exchange = String.valueOf(args[0]);
        String routingKey = String.valueOf(args[1]);
        Message message = (Message) args[2];

        // Stamp a unique messageId on the message so the consumer side can find this record later.
        // args[2] is this same object, so the id travels with the message when it is sent.
        String messageId = UUID.randomUUID().toString();
        message.getMessageProperties().setMessageId(messageId);

        // Body to persist (the senders encode it as UTF-8)
        String messageBodyJsonStr = new String(message.getBody(), StandardCharsets.UTF_8);

        // Waiting to be sent
        MqSendStatusEnum sendStatus = MqSendStatusEnum.WAIT_SEND;
        try {
            // Send as usual
            Object result = pjp.proceed(args);

            // Sent successfully
            sendStatus = MqSendStatusEnum.ALREADY_SEND;
            return result;
        } catch (Throwable throwable) {
            log.error("[mq] send failed: exchange = {}, routingKey = {}, messageId = {}",
                    exchange, routingKey, messageId, throwable);

            // Send failed
            sendStatus = MqSendStatusEnum.SEND_FAIL;
            // Rethrow so the caller sees exactly what it would have seen without the aspect
            throw throwable;
        } finally {
            // Save the record; a failure to record must never break the send itself
            try {
                MqRecord build = MqRecord.builder()
                        .exchange(exchange)
                        .routingkey(routingKey)
                        .messageId(messageId)
                        .messageBody(messageBodyJsonStr)
                        .receiveStatus(null)
                        .sendStatus(sendStatus)
                        .build();
                mqRecordMapper.insert(build);
            } catch (Exception e) {
                log.error("[mq] failed to save send record: messageId = {}", messageId, e);
            }
        }
    }
}

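Highlight: everything above the line result = pjp.proceed(args); runs before the message is sent and has access to the intercepted arguments.
Those arguments can be modified and then handed back, so that the original method carries on with them.
Here the messageId property of the third argument (the Message) is given a value, and the message is then sent as usual.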
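To see why changing an argument before proceed works, it may help to look at the same pattern without Spring. The sketch below uses a plain JDK dynamic proxy. Sender, Msg and RecordingProxy are hypothetical stand-ins invented for this illustration (they are not Spring AMQP types), and an in-memory list stands in for the mq_record table. Spring AOP is proxy-based as well, but this is only an analogy, not Spring's actual implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for AmqpTemplate: only the three-argument send.
interface Sender {
    void send(String exchange, String routingKey, Msg message);
}

// Hypothetical stand-in for Message plus its messageId property.
class Msg {
    String messageId;
    final String body;

    Msg(String body) {
        this.body = body;
    }
}

class RecordingProxy {
    // Stand-in for the mq_record table: one "messageId:status" entry per send.
    static final List<String> RECORDS = new ArrayList<>();

    static Sender wrap(Sender target) {
        InvocationHandler handler = (proxy, method, args) -> {
            boolean isSend = "send".equals(method.getName()) && args != null && args.length == 3;
            if (!isSend) {
                // toString, hashCode, equals: pass straight through
                return method.invoke(target, args);
            }
            // "Before" half of the around advice: stamp the id on the argument.
            Msg msg = (Msg) args[2];
            msg.messageId = UUID.randomUUID().toString();
            String status = "WAIT_SEND";
            try {
                // Plays the role of pjp.proceed(args)
                Object result = method.invoke(target, args);
                status = "ALREADY_SEND";
                return result;
            } catch (InvocationTargetException e) {
                status = "SEND_FAIL";
                // Rethrow what the real sender threw, so the caller still sees it
                throw e.getCause();
            } finally {
                RECORDS.add(msg.messageId + ":" + status);
            }
        };
        return (Sender) Proxy.newProxyInstance(
                Sender.class.getClassLoader(), new Class<?>[] {Sender.class}, handler);
    }
}
```

Callers keep using the Sender interface and never learn that a proxy sits in between. Because the interceptor passes on the same Msg instance it just stamped, the real sender sees the messageId.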

2. Intercepting MQ consumption

package cn.xuetian.crm.boss.mq.rabbit.aop;

import cn.xuetian.crm.dao.dataobject.mq.MqRecord;
import cn.xuetian.crm.dao.mapper.mq.MqRecordMapper;
import cn.xuetian.framework.plugin.enums.msg.mq.MqReceiveStatusEnum;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Records message consumption in boss via AOP.
 *
 * @author zss
 * @date 2022/10/22 14:15
 * @Email:451682479@qq.com
 */
@Slf4j
@Aspect
@Component
public class BossMqReceiveAspect {

    @Autowired
    private MqRecordMapper mqRecordMapper;

    // cn.xuetian.crm.boss.mq.rabbit.receiver.BossMqReceiver
    // xxx(Channel channel, Message message)
    @Pointcut("execution(* cn.xuetian.crm.boss.mq.rabbit.receiver.BossMqReceiver.*(..))")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
        String channelJsonStr = "";
        String messageId = "";
        MqReceiveStatusEnum receiveStatus = MqReceiveStatusEnum.CONSUME_SUCCESS;

        try {
            Object[] args = pjp.getArgs();

            if (null != args && args.length == 2) {
                Channel channel = (Channel) args[0];
                // Note: this serializes the live Channel object; if it fails, the message is not consumed
                channelJsonStr = JSON.toJSONString(channel);

                Message message = (Message) args[1];
                messageId = message.getMessageProperties().getMessageId();
            }

            // Consume as usual
            Object result = pjp.proceed();

            // Consumed successfully
            receiveStatus = MqReceiveStatusEnum.CONSUME_SUCCESS;
            return result;
        } catch (Throwable throwable) {
            log.error("[mq] consume failed: messageId = {}", messageId, throwable);

            // Consume failed
            receiveStatus = MqReceiveStatusEnum.CONSUME_FAIL;
            // Rethrow so the listener container's own error handling still applies
            throw throwable;
        } finally {
            // Update the consume status; skip messages that carry no messageId
            try {
                if (null != messageId && !messageId.isEmpty()) {
                    MqRecord mqRecord = mqRecordMapper.selectOneByRecord(MqRecord.builder().messageId(messageId).build());
                    if (null != mqRecord) {
                        mqRecord.setReceiveChannel(channelJsonStr);
                        mqRecord.setReceiveStatus(receiveStatus);
                        mqRecord.setUpdateTime(new Date());
                        mqRecordMapper.updateByPrimaryKey(mqRecord);
                    }
                }
            } catch (Exception e) {
                log.error("[mq] failed to update consume status: messageId = {}", messageId, e);
            }
        }
    }
}

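As above, on the receiving side the aspect saves the receive information and updates the consume status as the situation requires.
The receiver uses the messageId to identify the message and then updates its status (which is why the sending side went to the trouble of grabbing the arguments, setting the messageId, and only then sending).

The two simple enums used here are omitted; write them however you like.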
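For readers who want something to start from, here is one possible shape for the two enums. The constant names are the ones used in the aspects; the numeric codes, getCode and fromCode are assumptions made for this sketch, since the article does not show what is actually stored in send_status and receive_status.

```java
import java.util.Arrays;

// Hypothetical codes: the real values stored in send_status are not shown in the article.
enum MqSendStatusEnum {
    WAIT_SEND(0),
    ALREADY_SEND(1),
    SEND_FAIL(2);

    private final int code;

    MqSendStatusEnum(int code) {
        this.code = code;
    }

    int getCode() {
        return code;
    }

    // Maps a stored tinyint back to the enum constant.
    static MqSendStatusEnum fromCode(int code) {
        return Arrays.stream(values())
                .filter(s -> s.code == code)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("unknown send status: " + code));
    }
}

// Hypothetical codes: the real values stored in receive_status are not shown in the article.
enum MqReceiveStatusEnum {
    CONSUME_SUCCESS(1),
    CONSUME_FAIL(2);

    private final int code;

    MqReceiveStatusEnum(int code) {
        this.code = code;
    }

    int getCode() {
        return code;
    }

    // Maps a stored tinyint back to the enum constant.
    static MqReceiveStatusEnum fromCode(int code) {
        return Arrays.stream(values())
                .filter(s -> s.code == code)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("unknown receive status: " + code));
    }
}
```

Storing a small integer code rather than the enum name fits the tinyint columns in the table, at the cost of needing a mapping like fromCode when reading rows back.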

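3. Going Further

At first glance the code above looks pretty good:

  1. Zero intrusion into other people's code: nothing they wrote has to change
  2. Good coverage: every send call made through the Spring-managed rabbitTemplate bean is captured (calls that never pass through the proxy, such as a send invoked internally by the template itself, are not)
  3. It applies to past, present and future code alike, without affecting anyone else

First reaction: it feels perfect.

So, is there room for optimization?
Answer: yes.
Where?
Performance. The insert and update above are clearly blocking code, and they can be made asynchronous.
Hmm... fair point.

Below is the code after switching to asynchronous processing.
Because it has to be asynchronous, it uses the Event mechanism built into Spring Framework, which brings in a few extra classes. Note that @Async only takes effect when async support is enabled (for example with @EnableAsync on a configuration class); otherwise the listeners still run synchronously on the publishing thread.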
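Before the Spring version, here is a framework-free sketch of what "asynchronous" buys: the calling thread only hands the row to a queue and returns, while a worker thread does the slow write. AsyncRecorder is a hypothetical helper written for this illustration, the in-memory list stands in for the mq_record table, and the pool size and queue capacity are arbitrary assumptions.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the article's code.
class AsyncRecorder {
    // Stand-in for the mq_record table.
    private final List<String> saved = new CopyOnWriteArrayList<>();

    // One worker and a bounded queue. If the queue is full, CallerRunsPolicy makes
    // the calling thread do the write itself instead of dropping the record.
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy());

    // Returns immediately; the "insert" happens later on the worker thread.
    void record(String row) {
        pool.execute(() -> saved.add(row));
    }

    // Waits for queued writes to finish and returns everything that was saved.
    List<String> shutdownAndGet() {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return saved;
    }
}
```

The bounded queue is a deliberate choice in this sketch: if the database slows down, pending records cannot pile up without limit, and the fallback degrades to the original synchronous behaviour rather than losing data.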

3.1 Event

3.1.1 Sending side

Event

package cn.xuetian.crm.manager.event.message;

import cn.xuetian.crm.dao.dataobject.mq.MqRecord;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;

@Setter
@Getter
public class MqSendEvent extends ApplicationEvent {

    private MqRecord mqRecord;

    public MqSendEvent(MqRecord mqRecord) {
        super(mqRecord);
        this.mqRecord = mqRecord;
    }
}

EventListener

package cn.xuetian.crm.manager.event;

import cn.xuetian.crm.manager.event.message.MqSendEvent;
import cn.xuetian.crm.dao.dataobject.mq.MqRecord;
import cn.xuetian.crm.dao.mapper.mq.MqRecordMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Saves MQ records asynchronously.
 */
@Slf4j
@Component
public class MqSendEventListener implements ApplicationListener<MqSendEvent> {

    @Autowired
    private MqRecordMapper mqRecordMapper;

    @Async
    @Override
    public void onApplicationEvent(MqSendEvent event) {
        MqRecord mqRecord = event.getMqRecord();
        if (null != mqRecord) {
            mqRecordMapper.insert(mqRecord);
        }
    }
}

The sending code

package cn.xuetian.crm.manager.mq.rabbit.aop;

import cn.xuetian.crm.dao.dataobject.mq.MqRecord;
import cn.xuetian.crm.manager.event.message.MqSendEvent;
import cn.xuetian.framework.plugin.enums.msg.mq.MqSendStatusEnum;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * Records MQ message sends via AOP.
 *
 * @author zss
 * @date 2022/10/22 14:15
 * @Email:451682479@qq.com
 */
@Slf4j
@Aspect
@Component
public class MqSendAspect {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    // Matches only: void send(String exchange, String routingKey, Message message) throws AmqpException;
    // The one- and two-argument send overloads are left out: they carry no exchange to record.
    @Pointcut("execution(* org.springframework.amqp.core.AmqpTemplate.send(java.lang.String, java.lang.String, org.springframework.amqp.core.Message))")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
        Object[] args = pjp.getArgs();
        String exchange = String.valueOf(args[0]);
        String routingKey = String.valueOf(args[1]);
        Message message = (Message) args[2];

        // Stamp a unique messageId on the message so the consumer side can find this record later.
        // args[2] is this same object, so the id travels with the message when it is sent.
        String messageId = UUID.randomUUID().toString();
        message.getMessageProperties().setMessageId(messageId);

        // Body to persist (the senders encode it as UTF-8)
        String messageBodyJsonStr = new String(message.getBody(), StandardCharsets.UTF_8);

        // Waiting to be sent
        MqSendStatusEnum sendStatus = MqSendStatusEnum.WAIT_SEND;
        try {
            // Send as usual
            Object result = pjp.proceed(args);

            // Sent successfully
            sendStatus = MqSendStatusEnum.ALREADY_SEND;
            return result;
        } catch (Throwable throwable) {
            log.error("[mq] send failed: exchange = {}, routingKey = {}, messageId = {}",
                    exchange, routingKey, messageId, throwable);

            // Send failed
            sendStatus = MqSendStatusEnum.SEND_FAIL;
            // Rethrow so the caller sees exactly what it would have seen without the aspect
            throw throwable;
        } finally {
            // Asynchronously add the MQ send record; a failure here must never break the send itself
            try {
                MqRecord mqRecord = MqRecord.builder()
                        .exchange(exchange)
                        .routingkey(routingKey)
                        .messageId(messageId)
                        .messageBody(messageBodyJsonStr)
                        .receiveStatus(null)
                        .sendStatus(sendStatus)
                        .build();
                MqSendEvent sendEvent = new MqSendEvent(mqRecord);
                applicationEventPublisher.publishEvent(sendEvent);
            } catch (Exception e) {
                log.error("[mq] failed to publish send record: messageId = {}", messageId, e);
            }
        }
    }
}

3.1.2 Receiving side

Event

package cn.xuetian.crm.boss.event.mq;

import cn.xuetian.framework.plugin.enums.msg.mq.MqReceiveStatusEnum;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;

@Setter
@Getter
public class MqReceiveEvent extends ApplicationEvent {

    private String messageId;

    private String channelJsonStr;

    private MqReceiveStatusEnum receiveStatus;

    public MqReceiveEvent(String messageId, String channelJsonStr, MqReceiveStatusEnum receiveStatus) {
        super(messageId);
        this.messageId = messageId;
        this.channelJsonStr = channelJsonStr;
        this.receiveStatus = receiveStatus;
    }
}

EventListener

package cn.xuetian.crm.boss.event.mq.listener;

import cn.xuetian.crm.boss.event.mq.MqReceiveEvent;
import cn.xuetian.crm.dao.dataobject.mq.MqRecord;
import cn.xuetian.crm.dao.mapper.mq.MqRecordMapper;
import cn.xuetian.framework.plugin.enums.msg.mq.MqReceiveStatusEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Date;

/**
 * Updates MQ records asynchronously.
 */
@Slf4j
@Component
public class MqReceiveEventListener implements ApplicationListener<MqReceiveEvent> {

    @Autowired
    private MqRecordMapper mqRecordMapper;

    @Async
    @Override
    public void onApplicationEvent(MqReceiveEvent event) {
        try {
            String messageId = event.getMessageId();
            if (StringUtils.hasText(messageId)) {
                MqRecord mqRecord = mqRecordMapper.selectOneByRecord(MqRecord.builder().messageId(messageId).build());
                if (null != mqRecord) {
                    String channelJsonStr = event.getChannelJsonStr();
                    MqReceiveStatusEnum receiveStatus = event.getReceiveStatus();

                    mqRecord.setReceiveChannel(channelJsonStr);
                    mqRecord.setReceiveStatus(receiveStatus);
                    mqRecord.setUpdateTime(new Date());
                    mqRecordMapper.updateByPrimaryKey(mqRecord);
                }
            }
        } catch (Exception e) {
            log.error("[mq] failed to update consume status: messageId = {}", event.getMessageId(), e);
        }
    }
}

The consuming code

package cn.xuetian.crm.boss.mq.rabbit.aop;

import cn.xuetian.crm.boss.event.mq.MqReceiveEvent;
import cn.xuetian.framework.plugin.enums.msg.mq.MqReceiveStatusEnum;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

/**
 * Records message consumption in boss via AOP.
 *
 * @author zss
 * @date 2022/10/22 14:15
 * @Email:451682479@qq.com
 */
@Slf4j
@Aspect
@Component
public class BossMqReceiveAspect {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    // cn.xuetian.crm.boss.mq.rabbit.receiver.BossMqReceiver
    // xxx(Channel channel, Message message)
    @Pointcut("execution(* cn.xuetian.crm.boss.mq.rabbit.receiver.BossMqReceiver.*(..))")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
        String channelJsonStr = "";
        String messageId = "";
        MqReceiveStatusEnum receiveStatus = MqReceiveStatusEnum.CONSUME_SUCCESS;

        try {
            Object[] args = pjp.getArgs();

            if (null != args && args.length == 2) {
                Channel channel = (Channel) args[0];
                // Note: this serializes the live Channel object; if it fails, the message is not consumed
                channelJsonStr = JSON.toJSONString(channel);

                Message message = (Message) args[1];
                messageId = message.getMessageProperties().getMessageId();
            }

            // Consume as usual
            Object result = pjp.proceed();

            // Consumed successfully
            receiveStatus = MqReceiveStatusEnum.CONSUME_SUCCESS;
            return result;
        } catch (Throwable throwable) {
            log.error("[mq] consume failed: messageId = {}", messageId, throwable);

            // Consume failed
            receiveStatus = MqReceiveStatusEnum.CONSUME_FAIL;
            // Rethrow so the listener container's own error handling still applies
            throw throwable;
        } finally {
            // Asynchronously update the consume status.
            // ApplicationEvent rejects a null source, so skip messages that carry no messageId.
            try {
                if (null != messageId && !messageId.isEmpty()) {
                    MqReceiveEvent receiveEvent = new MqReceiveEvent(messageId, channelJsonStr, receiveStatus);
                    applicationEventPublisher.publishEvent(receiveEvent);
                }
            } catch (Exception e) {
                log.error("[mq] failed to publish consume status: messageId = {}", messageId, e);
            }
        }
    }
}

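Note that the Event mechanism built into Spring Framework is used here mainly to make the processing asynchronous.
That is why the extra Event and EventListener classes had to be introduced.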
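One thing worth checking in this design: the send record is inserted only after the message has been sent, and the consumer's status update runs independently, so nothing guarantees that the insert has completed by the time the update looks the row up by messageId. When the row is not there yet, the update finds nothing and the consume status is silently lost. A possible remedy, sketched below with an in-memory map rather than MySQL, is to make both writes upserts keyed by messageId so that their order no longer matters. MqRecordStore and Row are hypothetical names used only for this illustration; with MySQL the same idea would presumably need a unique index on message_id, which the table definition above does not declare.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the mq_record table, keyed by messageId.
class MqRecordStore {
    static final class Row {
        String sendStatus;
        String receiveStatus;
    }

    private final Map<String, Row> rows = new ConcurrentHashMap<>();

    // Send side: creates the row unless the consumer side already did.
    void onSend(String messageId, String sendStatus) {
        rows.compute(messageId, (id, row) -> {
            Row r = (row == null) ? new Row() : row;
            r.sendStatus = sendStatus;
            return r;
        });
    }

    // Receive side: creates the row unless the send side already did.
    void onReceive(String messageId, String receiveStatus) {
        rows.compute(messageId, (id, row) -> {
            Row r = (row == null) ? new Row() : row;
            r.receiveStatus = receiveStatus;
            return r;
        });
    }

    Row find(String messageId) {
        return rows.get(messageId);
    }
}
```

Whichever side arrives first creates the row and the other one fills in its own column, so a fast consumer no longer loses its status.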

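4. Result

Once everything is in place, the result looks like the figure below; the data in the table looks rather nice.

[Figure: screenshot of the populated mq_record table]

No more worrying that MQ messages become untraceable once ES is purged.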

