消息服务采用的发布订阅者模式,由消息生产者将消息发送到消息服务中心,由消息服务中心完成消息的复制及转发至消息消费者。从过程看,可以看成两段交互,一是生产者到消息中心,二是从消息中心到消费者。
以委托单创建这一业务消息为例,我们来实现这两个过程。
在前面已经实现了消息客户端接收器的情况下,我们在客户端和服务端完成登录认证的情况下,访问以下地址,
http://localhost:10001/event?event=lms.transportbill.consignmentbill.create&id=WT202101210005
可以模拟业务系统,向我们的消息客户端推送了一个委托单创建请求,事件编码为lms.transportbill.consignmentbill.create,单号为WT202101210005
生产者推送消息至消息中心
生产者,实际也是一个消息客户端,消息中心则是消息服务端。
接收到业务系统的调用后,生产者会根据事件编码,查找对应的消息发送器,因为主要的逻辑都在父类中实现了,大大简化了业务功能的开发,这里只需要设置消息主题就行了。
/**
* 委托单创建发送器
* @author wqliu
* @date 2022-1-18 11:02
**/
public class ConsignmentBillCreateSender extends RequestMessageSender {
public ConsignmentBillCreateSender(){
super("lms.transportbill.consignmentbill.create");
}
}
依据事件编码和业务单据标识,构建一条请求消息,发送给消息中心。
{"content":"WT202101210005","id":"1489152492813930497","messageType":"REQUEST","publishAppCode":"SCS","publishTime":"2022-02-03 16:23:16","sendCount":0,"topic":"lms.transportbill.consignmentbill.create"}
[
](http://localhost:10001/event?event=lms.transportbill.consignmentbill.create&id=WT202101210005)
消息中心收到该消息时,通过messageType属性确定是一条请求消息,然后依据topic属性,查找对应的消息处理器,同样,这里只需要继承父类即可。
/**
* 委托单创建请求处理器
* @author wqliu
* @date 2022-1-21 19:19
**/
@Slf4j
public class ConsignmentBillCreateRequestHandler extends RequestMessageHandler {
@Override
protected void messageOperation(RequestMessage message, Channel channel) {
}
}
该消息的整体处理流程,则是在父类中定义和实现的。
/**
* 消息处理
*
* @param message 消息
* @param channel 通道
*/
public void handleMessage(RequestMessage requestMessage, Channel channel) {
// 记录消息请求日志
apiMessageLogService.createRequestPart(requestMessage);
//验证框架
validateFramework(requestMessage);
//将请求消息状态默认设置为无需发送
apiMessageLogService.updateStatus(MessageStatusEnum.NOT_TO_REQUEST.name(),requestMessage.getId());
//特殊处理
messageOperation(requestMessage, channel);
//发送响应至消息发送者
sendResponse(requestMessage, channel);
//消息处理(复制及转发)
if(isNeedRepost()){
repostMessage(requestMessage.getTopic(),requestMessage.getContent());
}
}
其中发送响应至消息发送者的实现如下
private void sendResponse(RequestMessage requestMessage, Channel channel) {
//获取响应消息的消息主题
String responseTopicCode = getResponseTopicCode(requestMessage.getTopic());
//根据消息主题构建发送器
ResponseMessageSender responseMessageSender = (ResponseMessageSender)MessageSenderFactory.createSender(responseTopicCode);
//发送消息
responseMessageSender.sendMessage(channel, requestMessage);
}
对于大多数业务消息,如无特殊情况,统一回复一条确认消息,告知客户端,消息已成功接收,主题为framework.message.confirm
/**
* 向请求方发送发生消息确认的响应
* @author wqliu
* @date 2021-10-14 10:38
**/
public class MessageConfirmResponseSender extends ResponseMessageSender {
public MessageConfirmResponseSender() {
super("framework.message.confirm.response");
}
}
完整消息如下:
{"id":"1489152495347277825","messageType":"RESPONSE","publishAppCode":"MessageServer","publishTime":"2022-02-03 16:23:16","requestMessageId":"1489152492813930497","result":"SUCCESS","topic":"framework.message.confirm.response"}
客户端收到上面消息后,根据通过messageType属性确定是一条响应消息,然后依据topic属性,查找对应的消息处理器
/**
* 消息确认 响应处理器
* @author wqliu
* @date 2022-1-16 8:53
**/
@Slf4j
public class MessageConfirmResponseHandler extends ResponseMessageHandler {
}
这个处理器只需要继承父类即可,实际要完成的工作,就是进行数据验证和更新日志。
消息中心推送消息至消费者
消息中心收到生产者推送过来的业务消息外,一方面,需要给生产者推送一条消息确认的响应消息;另一方面,则需要根据消息主题,查找所有订阅该主题的消费者(实际就是消息客户端),将消息复制及转发出去。
这种情况下,实际消息中心在消息的生产者和消费者中间,充当了经纪人的角色,生产者和消费者只需要跟消息中心交互就行了,并不知道对方的存在,也就是实现生产者和消费者的解耦。
下面我们来实现从消息中心推送业务消息至消费者的过程。
以下方法是消息中心处理请求消息的过程,前面几个步骤记录日志、验证框架、设置消息状态、该处理器的个性化处理和发送响应,前面都提到过,而本节的主角是repostMessage方法。
/**
* 消息处理
*
* @param message 消息
* @param channel 通道
*/
public void handleMessage(RequestMessage requestMessage, Channel channel) {
// 记录消息请求日志
apiMessageLogService.createRequestPart(requestMessage);
//验证框架
validateFramework(requestMessage);
//将请求消息状态默认设置为无需发送
apiMessageLogService.updateStatus(MessageStatusEnum.NOT_TO_REQUEST.name(),requestMessage.getId());
//特殊处理
messageOperation(requestMessage, channel);
//发送响应至消息发送者
sendResponse(requestMessage, channel);
//消息处理(复制及转发)
if(isNeedRepost()){
repostMessage(requestMessage.getTopic(),requestMessage.getContent());
}
}
repostMessage方法内容如下:
/**
* 消息转发
* @param topic
* @param content
*/
private void repostMessage(String topic,String content) {
//根据消息主题构建发送器
RequestMessageSender requestMessageSender = (RequestMessageSender) MessageSenderFactory.createSender(topic);
//传入原请求的消息标识和消息内容
requestMessageSender.sendMessage(content);
}
查找订阅主题的消息客户端以及构建转发消息逻辑比较简单,是在消息发送器中实现的。
一个消息主题可能有多个消息客户端订阅,又分为两种情况,一是这些消息客户端能收到所有消息,即不需要控制数据权限,例如将所有的运输任务单创建消息发送给做在途跟踪的服务商;二是某个特定的消息客户端,只能收到其中一部分消息,需要进行数据权限过滤,例如委托单指定了承运商,我们应该将委托单创建消息,只发送给被委托的这家承运商。
委托单创建发送器实现如下:
/**
* 委托单创建发送器
* @author wqliu
* @date 2022-2-1 8:14
**/
public class ConsignmentBillCreateRequestSender extends RequestMessageSender {
public ConsignmentBillCreateRequestSender() {
super("lms.transportbill.consignmentbill.create");
}
@Override
protected boolean dataPermissionFilter(String content, String appCode) {
//获取业务单据标识
String id = content;
//通过api调用,获取该业务单据的承运商编码,此处模拟为001
String carrierCode="001";
//查找当前应用拥有的承运商数据角色列表
List<ApiDataPermission> list = apiDataPermissionService.getPermissionByRoleCode(DataRoleEnum.CARRIER.name(), appCode);
AtomicBoolean hasPermission= new AtomicBoolean(false);
list.stream().forEach(x->{
//如数据权限记录的业务编码与单据编码一致,或者使用了通配符,则有权限
if(x.getBusinessCode().equals(carrierCode) || x.getBusinessCode().equals(DATA_PERMISSION_ALL)){
hasPermission.set(true);
return;
}
});
return hasPermission.get();
}
}
消费者收到服务中心推送过来的消息时,使用如下处理器进行处理
/**
* 委托单创建请求消息处理器
* @author wqliu
* @date 2022-1-22 10:20
**/
@Slf4j
public class ConsignmentBillCreateRequestHandler extends RequestMessageHandler {
@Override
protected void messageOperation(RequestMessage message, Channel channel) {
// 进行业务处理
}
}
其父类,会发送一条消息确认的响应消息给消息中心,以通知消息中心,业务消息已经收到。
而消息中心收到这条消息后,更新消息日志。
/**
* 消息确认 响应处理器
* @author wqliu
* @date 2022-1-16 8:53
**/
@Slf4j
public class MessageConfirmResponseHandler extends ResponseMessageHandler {
}
以上就是整体处理流程,消息中心发业务消息给消费者,消费者进行业务处理,反馈一条消息确认的响应消息,消息中心收到后,更新消息日志的状态,从而完成闭环。