0
点赞
收藏
分享

微信扫一扫

通用接口开放平台设计与实现——(25)消息客户端之消息处理器

前面我们介绍了客户端的消息处理链条上各处理器的配置,这部分实际相当于技术框架,今天我们来介绍下处理具体业务消息的处理器。

处理框架由几个主要的部分组成:
1.消息主题:我们根据业务需求,将事件转换为消息主题,消息主题相当于是对不同类型的消息进行分类,是我们实现业务处理的基础支撑部分。
2.消息处理器工厂:根据消息主题来查找对应的处理器完整类名,然后通过反射技术来创建具体的消息处理器对象。
3.消息处理器:真正进行具体业务逻辑处理的地方,消息分为两类,消息处理器也有两类,一方面,可抽取公用部分形成父类,另一方面,可被具体的业务逻辑处理器继承。

下面来一一介绍。

消息主题

上面说了,消息主题是基础数据,实际在平台管理中需要单独定义和维护的,主要属性有以下几个:
code:主题编码,跟业务事件编码一致,能唯一性标识一类消息;
handler:处理器,存放的是该消息主题对应的具体逻辑处理器的完整路径(包名加类名),然后处理器工厂依据该属性通过反射实现处理器的实例化;
responseTopicCode:响应主题编码,对于请求消息,需要配置下默认的响应主题编码,用来实现消息应答。

/**
 * 消息主题
 * @author wqliu
 * @date 2021-08-21
 *
 */
@Data
@EqualsAndHashCode(callSuper = true)
@Accessors(chain = true)
@TableName("ip_api_message_topic")
public class ApiMessageTopic extends BaseEntity {

    private static final long serialVersionUID = 1L;

    /**
     * 编码
     */
    @TableField("code")
    private String code;

    /**
     * 名称
     */
    @TableField("name")
    private String name;

    /**
     * 处理器
     */
    @TableField("handler")
    private String handler;


    /**
     * 响应主题编码
     */
    @TableField("response_topic_code")
    private String responseTopicCode;

    /**
     * 分类
     */
    @TableField("category")
    private String category;

    /**
     * 状态
     */
    @TableField("status")
    private String status;

    /**
     * 备注
     */
    @TableField("remark")
    private String remark;

    /**
     * 排序号
     */
    @TableField("order_no")
    private String orderNo;


}

消息处理器工厂

使用了设计模式中的简单工厂模式,根据消息主题编码拿到处理器的完整路径,通过反射实现处理器的实例化。

/**
 * 消息处理器工厂
 * @author wqliu
 * @date 2021-10-13 9:07
 **/
public  class MessageHandlerFactory {
    private MessageHandlerFactory(){};

    public static MessageHandler createHandler(String topic){
        //使用反射技术获取类
        Class<MessageHandler> messageHandler=null;
        try {
            //根据消息主题获取对应的消息处理类名
            ApiMessageTopicService service = SpringUtil.getBean(ApiMessageTopicService.class);
            String handlerName = service.getHandlerByCode(topic);
            messageHandler = (Class<MessageHandler>) Class.forName(handlerName);
            //返回消息处理类的实例
            return messageHandler.newInstance();
        }
        catch (CustomException e){
            throw new MessageException("S101",e.getMessage());
        }catch (Exception e){
            throw new MessageException("S102","消息处理器不存在");
        }
    }
}

在我们设计的消息处理链条的技术框架中,请求消息处理器和响应消息处理器,都是通过如下代码来实现具体业务消息处理器的实例化的。

     //转具体的消息处理器进行处理
     ResponseMessageHandler handler = (ResponseMessageHandler)MessageHandlerFactory.createHandler(topic);
     handler.handleMessage(message,ctx.channel());

消息处理器

因为消息分了两类,并且对于请求消息和响应消息的处理逻辑是不同的,相应的,消息处理器也有两个,分别是请求消息处理器RequestMessageHandler和响应消息处理器ResponseMessageHandler,并且可以提取这两个处理器的公用操作,形成一个抽象父类MessageHandler。

具体的业务消息处理器,则会继承RequestMessageHandler或ResponseMessageHandler,覆写其中的部分方法即可。

我们先从比较简单的响应消息处理器说起。

响应消息处理器

主要干三件事
1.验证数据
这里的验证比服务端简单得多,只需要验证消息主题。

2.更新日志
根据请求消息标识,查找消息日志,然后填充其响应部分,后面会详细说下消息日志的设计与实现。

3.执行专有逻辑
预留了一个messageOperation方法,当具体的消息处理器有额外的个性化逻辑需要处理时,只需要覆盖该方法即可,这里实际是设计模式中的模板方法的应用。

/**
 * 响应消息处理器
 * @author wqliu
 * @date 2022-1-8 11:07
 **/
@Slf4j
public class ResponseMessageHandler  extends MessageHandler{

    /**
     * 消息处理
     *
     * @param message 消息
     * @param channel 通道
     */
    public void handleMessage(ResponseMessage responseMessage, Channel channel) {


        // 消息主题验证(是否存在及是否可用)
        validateTopic(responseMessage.getTopic());

        // 更新消息日志
        apiMessageLogService.updateResponsePart(responseMessage);

        //特殊处理
        messageOperation(responseMessage, channel);
    }



    /**
     * 响应消息处理
     *
     * @param message
     * @param channel
     */
    protected void messageOperation(ResponseMessage message, Channel channel) {

    }


}

请求消息处理器

请求消息处理器与响应处理相比,除了要验证消息、创建消息日志以及预留的业务逻辑方法外,多了一步,向发送请求的客户端,发送响应消息。

/**
 * 请求消息处理器
 *
 * @author wqliu
 * @date 2022-1-8 10:42
 **/
@Slf4j
public class RequestMessageHandler extends MessageHandler {
    /**
     * 消息处理
     *
     * @param message 消息
     * @param channel 通道
     */
    public void handleMessage(RequestMessage requestMessage, Channel channel) {

        // 记录消息请求日志
        apiMessageLogService.createRequestPart(requestMessage);


        // 消息主题验证(是否存在及是否可用)
        validateTopic(requestMessage.getTopic());

        //特殊处理
        messageOperation(requestMessage, channel);

        //发送响应至消息发送者
        sendResponse(requestMessage, channel);

    }


    private void sendResponse(RequestMessage requestMessage, Channel channel) {
        //获取响应消息的消息主题
        String responseTopicCode = getResponseTopicCode(requestMessage.getTopic());
        //根据消息主题构建发送器
        ResponseMessageSender responseMessageSender = (ResponseMessageSender) MessageSenderFactory.createSender(responseTopicCode);
        //发送消息
        responseMessageSender.sendMessage(channel, requestMessage);
    }
    
    /**
     * 获取响应消息主题
     *
     * @return
     */
    protected String getResponseTopicCode(String topic) {
        //默认从消息主题实体类中获取
        return apiMessageTopicService.getResponseTopicCodeByCode(topic);
    }


   
    /**
     * 请求消息处理
     *
     * @param message
     * @param channel
     */
    protected void messageOperation(RequestMessage message, Channel channel) {

    }


}

公用处理器

请求与响应处理器的父类,主要是公共部分复用,主要完成了数据验证工作。

/**
 * 消息处理抽象类
 *
 * @author wqliu
 * @date 2021-10-13 9:07
 **/
public  class MessageHandler {


    protected ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);

    protected ApiMessageTopicService apiMessageTopicService=SpringUtil.getBean(ApiMessageTopicService.class);



    /**
     * 验证主题编码
     *
     * @param topicCode 主题编码
     */
    protected void validateTopic(String topicCode) {
        try {

            ApiMessageTopic messageTopic = apiMessageTopicService.getByCode(topicCode);
            if(messageTopic.getStatus().equals(StatusEnum.DEAD.name())){
                throw new MessageException("S102", "消息主题不可用");
            }
        }catch (Exception ex){
            throw new MessageException("S101", "消息主题不存在");
        }

    }


}

举报

相关推荐

0 条评论