总体设计
消息日志是监控系统正常运行及异常排查的重要依据。日志分为两部分,一部分是磁盘日志,记录收到和发出的所有消息具体内容,这部分日志以文本的形式存储在磁盘文件上,主要用于异常排查,但实际查看并不方便。为了实现系统监控的的目的,建立消息日志的库表,在消息发送与接收时,更新到库表记录中,同时,保存消息状态和发送次数等辅助信息,依据该库表和辅助信息,实现消息的重发功能。
我们的消息是异步的,请求消息和响应消息是成对出现的,为了方便查看和监控,日志记录并没有保存原始的日志记录,而是从设计层面做了调整,同一条日志记录,来记录请求消息以及对应的响应消息。
日志类
/**
* 消息日志
* @author wqliu
* @date 2021-08-21
*
*/
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@TableName("ip_api_message_log")
public class ApiMessageLog extends BaseEntity {
private static final long serialVersionUID = 1L;
/**
* 请求消息标识
*/
@TableField("request_id")
private String requestId;
/**
* 请求应用编码
*/
@TableField("request_app_code")
private String requestAppCode;
/**
* 请求消息主题编码
*/
@TableField("request_topic_code")
private String requestTopicCode;
/**
* 请求时间
*/
@TableField("request_time")
private LocalDateTime requestTime;
/**
* 请求内容
*/
@TableField("request_data")
private String requestData;
/**
* 响应消息标识
*/
@TableField("response_id")
private String responseId;
/**
* 响应应用编码
*/
@TableField("response_app_code")
private String responseAppCode;
/**
* 响应消息主题编码
*/
@TableField("response_topic_code")
private String responseTopicCode;
/**
* 响应时间
*/
@TableField("response_time")
private LocalDateTime responseTime;
/**
* 响应内容
*/
@TableField("response_data")
private String responseData;
/**
* 响应结果
*/
@TableField("response_result")
private String responseResult;
/**
* 错误信息
*/
@TableField("error_message")
private String errorMessage;
/**
* 当前状态
*/
@TableField("status")
private String status;
/**
* 发送次数
*/
@TableField("send_count")
private Integer sendCount;
}
日志对象主要分为三部分:
请求消息部分:消息标识、应用编码、消息主题编码、请求时间、请求内容
响应消息部分:消息标识、应用编码、消息主题编码、响应时间、响应内容、响应结果、错误信息
辅助信息 :当前状态 、发送次数
下
消息状态
首先,与我们的接口平台对接,有两种模式,一种是基于websocket方式,另外一种是基于api服务轮询的方式,因此消息状态实际是两套。
/**
* 消息状态
* @author wqliu
* @date 2021-10-5 10:07
*/
public enum MessageStatus
{
/**
* 待请求
*/
WAIT_REQUEST,
/**
* 已请求
*/
REQUESTED,
/**
* 已响应
*/
RESPONSED,
/**
* 无需请求(消息类型为响应消息或客户端未订阅消息)
*/
NOT_TO_REQUEST,
/**
* 待处理(消息接口专用)
*/
WAIT_HANDLE
}
基于websocket方式的状态多一些,首先,消息创建时,状态设置为待请求WAIT_REQUEST,然后进一步判断,如果消息类型为响应消息或客户端未订阅消息,则设置为无需请求NOT_TO_REQUEST,否则,查询订阅消息的客户端进行消息推送,将状态更新为已请求REQUESTED,收到客户端消息确认请求后,将状态更新为已响应RESPONSED。
基于api服务轮询的方式比较简单,需要上面的两个状态,消息产生后是WAIT_HANDLE,即等待对接方查询和处理,收到消息确认的接口调用后,将状态更新为RESPONSED。
创建日志
创建日志记录,该动作发生在发起请求消息时,填充日志对象的请求消息部分,以及初始化消息状态和发送次数。
/**
* 保存请求消息日志
* @param requestMessage
*/
protected void saveLog(RequestMessage message) {
ApiMessageLog log=new ApiMessageLog();
log.setRequestId(message.getId());
log.setRequestAppCode(message.getPublishAppCode());
log.setRequestTopicCode(message.getTopic());
log.setRequestTime(LocalDateTime.now());
log.setRequestData(message.getContent());
log.setResponseAppCode(message.getResponseAppCode());
log.setStatus(message.getStatus());
log.setSendCount(message.getSendCount());
apiMessageLogService.add(log);
}
更新日志
更新日志记录,该动作发生在服务端向客户端发送响应消息时或服务端收到客户的响应消息时,根据请求消息标识查找对应的日志记录,更新响应消息部分。
/**
* 更新消息日志
* @param messageResponse 响应消息
*/
protected void updateLog(ResponseMessage message) {
ApiMessageLog log = apiMessageLogService.getByRequestMessageId(message.getRequestMessageId());
//响应消息的发布者,对应请求消息的响应
log.setResponseAppCode(message.getPublishAppCode());
log.setResponseTopicCode(message.getTopic());
log.setResponseTime(LocalDateTime.now());
log.setResponseData(message.getContent());
log.setResponseResult(message.getResult());
log.setErrorMessage(message.getErrorMessage());
log.setResponseId(message.getId());
//将消息更新为已响应
log.setStatus(MessageStatus.RESPONSED.name());
apiMessageLogService.modify(log);
}