0
点赞
收藏
分享

微信扫一扫

通用接口开放平台设计与实现——(24)消息客户端之自定义处理器

处理器清单

上篇说了,我们除了使用netty内置的处理器外,还自定义了10个处理器用于实现自己的业务处理,所有处理器清单如下:

序号处理器类型职责实现说明
1SslHandler处理可靠安全连接内置仅在生产环境,需要进行ssl加解密
2HttpClientCodecHTTP 编解码内置对Http请求进行解码与编码
3HttpObjectAggregator聚合HTTP 请求或响应内置将http请求或响应聚合为一个完整对象
4IdleStateHandler空闲监测内置监测空闲状态,触发后续超时处理
5HeartbeatTimeoutHandler心跳超时处理自定义心跳超时执行关闭连接,触发重连
6WebSocketClientHandshakerHandlerWebSocket专用处理自定义处理WebSocket的握手以及Ping、Pong、Close消息
7HeartbeatRequestHandler发送心跳请求自定义客户端向服务端定时发送心跳
8ValidateMessageHandler验证消息自定义对消息进行基本验证,如非空,日期格式正确等
9DistinctMessageHandler过滤掉重复消息自定义根据消息唯一性标识判断,如曾接收过,则直接返回成功处理,不再进行后续处理
10MessageTypeDecodeHandler文本反序列化成消息对象自定义将文本按消息类型转换为请求消息或响应消息
11RequestMessageBusinessHandler处理请求消息自定义请求消息业务逻辑处理器
12ResponseMessageBusinessHandler处理响应消息自定义响应消息业务逻辑处理器
13TextWebSocketFrameEncodeHandlerJSON格式转文本帧自定义将json格式字符串编码为TextWebSocketFrame
14JsonEncodeHandler对象序列化为JSON字符串自定义将对象序列化为json格式字符串

心跳超时处理

我们的心跳策略是客户端来发送心跳,服务端响应心跳,客户端发现服务端未及时响应心跳时,认为通道异常,进行重连。

这个处理器实际是配合netty内置的空闲检测处理器IdleStateHandler使用的,只有满足IdleStateHandler中设置的触发条件,才会触发本处理器中的userEventTriggered方法,执行自定义的逻辑操作。

客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame,服务端收到后马上会回复PongWebSocketFrame,如通道失效或服务端无响应情况下,就会触发客户端读空闲。

同时,我们还覆写了channelInactive方法,在通道失效时,启动重连。虽然处理链条上任何一个处理器的这个方法都可以覆写,但从职责上来说,放到这个处理器里更合理一些。

/**
 * 心跳超时处理器
 * @author wqliu
 * @date 2021-10-2 14:25
 **/
@Slf4j
public class HeartbeatTimeoutHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state().equals(IdleState.READER_IDLE)) {
                log.info("读空闲");
                //关闭连接
                ctx.channel().close();
            }
        } else {
            //非空闲事件,传递到下个处理器
            super.userEventTriggered(ctx, evt);
        }

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端检测通道失效,启动重连");
        MessageClient messageClient= SpringUtil.getBean(MessageClient.class);
        messageClient.reconnect();

    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("与服务端建立连接,通道开启!");
        //必须调用父类方法,避免其他处理器的channelActive事件不再触发
        super.channelActive(ctx);

    }


}

WebSocket专用处理

这是很关键的一个处理器,自身也比较复杂。

主要实现是借助netty提供的一个WebSocketClientHandshaker类,在初始化时设置websocket服务端连接信息,然后在客户端启动时,调用该类的发起握手方法handshake,服务器端收到该握手请求后,会进行后续处理,响应一个协议升级,将http协议升级为WebSocket协议。

同时需要注意的是,这里还有一个我们自定义的操作,即在握手成功,协议升级后,客户端发出一个登录服务端的请求消息。

/**
 * 处理web socket握手
 *
 * @author wqliu
 * @date 2021-9-28 16:33
 **/
@EqualsAndHashCode(callSuper = true)
@Slf4j
@Data
public class WebSocketClientHandshakerHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * 握手
     */
    private WebSocketClientHandshaker handshaker;
    /**
     * 握手 异步处理
     */
    private ChannelPromise handshakeFuture;

    public WebSocketClientHandshakerHandler() {
        //初始化握手处理者
        MessageClientGlobalHolder config = SpringUtil.getBean(MessageClientGlobalHolder.class);
        URI webSocketUri = null;
        try {
            webSocketUri = new URI(config.getWebSocketUrl());
        } catch (URISyntaxException e) {
            log.error("解析远程服务器地址出错", e);
        }
        WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(webSocketUri, WebSocketVersion.V13, (String) null, true, new DefaultHttpHeaders());
        this.setHandshaker(handshaker);


    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {

        Channel ch = ctx.channel();
        FullHttpResponse response;
        //进行握手操作
        if (!this.handshaker.isHandshakeComplete()) {
            try {
                response = (FullHttpResponse) msg;
                //握手协议返回,设置结束握手
                this.handshaker.finishHandshake(ch, response);
                //设置成功
                this.handshakeFuture.setSuccess();

            } catch (WebSocketHandshakeException var7) {
                FullHttpResponse res = (FullHttpResponse) msg;
                String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
                this.handshakeFuture.setFailure(new Exception(errorMsg));
            }
        } else if (msg instanceof FullHttpResponse) {
            //已握手成功并将http协议升级为了WebSocket协议,不应再收到Http消息,发生这种情况则抛出异常
            response = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
        } else if (msg instanceof CloseWebSocketFrame) {
            log.info("收到关闭信息");

        } else if (msg instanceof TextWebSocketFrame) {            
            ReferenceCountUtil.retain(msg);
            ctx.fireChannelRead(msg);
        }

    }


    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {

        //设置握手成功后,发起登录请求
        this.handshakeFuture = ctx.newPromise();
        ChannelFuture handshakeFuture = this.handshakeFuture;
        handshakeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    //发送登录请求
                    log.info("握手成功");
                    LoginRequestSender login = new LoginRequestSender();
                    login.sendMessage();

                } else {
                    //握手失败
                    log.error("握手失败", future.cause());
                }
            }
        });


    }

    /**
     * 发起握手
     */
    public void handshake(Channel channel) {
        this.getHandshaker().handshake(channel);
    }


}

发送心跳请求

心跳机制是客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame。

这里是从ChannelHandlerContext拿到EventLoop对象,调用了任务调度方法scheduleWithFixedDelay,实现了定时心跳发送功能。

/**
 * 心跳请求处理器
 * @author wqliu
 * @date 2021-10-2 13:24
 **/
@Slf4j
public class HeartbeatRequestHandler extends ChannelInboundHandlerAdapter {

    /**
     * 心跳发送间隔,单位秒
     */
    private int heartbeatInterval=5;

    public HeartbeatRequestHandler(int heartbeatInterval){
        this.heartbeatInterval=heartbeatInterval;
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.scheduleWithFixedDelay(new Runnable() {
            private Channel channel;
            @Override
            public void run() {
                // log.info("发送心跳");
                PingWebSocketFrame frame=new PingWebSocketFrame();
                ChannelFuture channelFuture = channel.writeAndFlush(frame);
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        // log.error(future.isSuccess()+"",future.cause());
                    }
                });
            }

            public Runnable setChannel(Channel channel){
                this.channel=channel;
                return this;
            }

        }.setChannel(ctx.channel()),15,heartbeatInterval, TimeUnit.SECONDS);

        //必须调用父类方法,避免其他处理器的channelActive事件不再触发
        super.channelActive(ctx);
    }
}

验证消息

对输入有效验证,可以避免后续大量的异常问题,这里实际是基本的数据验证,主要验证属性是否为空、日期格式是否合法,消息类型在枚举类定义的范围内。

这里面我们加了一句日志,打印消息,这样可以保证在系统调试阶段,所有收到的消息可以在控制台打印;系统运行阶段,所有收到的消息可以通过磁盘日志文件追溯查看。

/**
 * 消息验证处理器
 * @author wqliu
 * @date 2022-1-19 13:50
 **/
@Slf4j
public class ValidateMessageHandler  extends ChannelInboundHandlerAdapter {


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        TextWebSocketFrame textWebSocketFrame=(TextWebSocketFrame)msg;
        String content = textWebSocketFrame.text();
        log.info("收到消息:{}",content);
        BaseMessage message = JSON.parseObject(content, BaseMessage.class);
        if(message!=null) {
            //验证消息属性
            validateProperty(message);
            ctx.fireChannelRead(textWebSocketFrame);
        }else {
           // 当收到未按约定格式发送过来的无法解析的消息时,仅记录错误日志
           log.error("收到无法解析的消息:{}",content);
        }

    }

    /**
     * 验证消息属性
     *
     * @param message 消息
     */
    protected void validateProperty(BaseMessage message) {

        String errorCode;
        String errorMessage;
        // id
        if (StringUtils.isBlank(message.getId())){

            errorCode = "S001";
            errorMessage = "消息标识不能为空";
            throw new MessageException(errorCode, errorMessage);
        }
        // 主题
        if (StringUtils.isBlank(message.getTopic())) {

            errorCode = "S002";
            errorMessage = "消息主题不能为空";
            throw new MessageException(errorCode, errorMessage);
        }
        // 发布者
        if (StringUtils.isBlank(message.getPublishAppCode())) {

            errorCode = "S003";
            errorMessage = "消息发布者不能为空";
            throw new MessageException(errorCode, errorMessage);
        }

        // 发布时间
        String publishTimeString = message.getPublishTime();
        if (StringUtils.isBlank(publishTimeString)) {
            errorCode = "S004";
            errorMessage = "发布时间不能为空";
            throw new MessageException(errorCode, errorMessage);

        } else if (!ValidateUtil.dateIsFormat(publishTimeString))
        {
            errorCode = "S005";
            errorMessage = "发布时间格式不正确";
            throw new MessageException(errorCode, errorMessage);

        }
        // 消息类型
        if (StringUtils.isBlank(message.getMessageType())) {

            errorCode = "S006";
            errorMessage = "消息类型不能为空";
            throw new MessageException(errorCode, errorMessage);
        }else{
            if(EnumUtils.isValidEnum(MessageTypeEnum.class,message.getMessageType())==false){
                errorCode = "S007";
                errorMessage = "消息类型无效";
                throw new MessageException(errorCode, errorMessage);
            }
        }
    }

}

过滤重复消息

我们为了保证消息传输的可靠性,采用请求应答机制,当发送方未及时收到响应时,会进行消息重发,而接收方,则需要去重。

本处理器的职责就是去重,实现机制是根据消息日志查询消息标识是否已存在,若不存在,则继续往下处理,若已存在,则返回处理成功的响应。

/**
 * 去重处理器
 * @author wqliu
 * @date 2022-1-19 13:50
 **/
public class DistinctMessageHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);


    private ApiMessageTopicService apiMessageTopicService = SpringUtil.getBean(ApiMessageTopicService.class);


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        String content = textWebSocketFrame.text();
        BaseMessage message = JSON.parseObject(content, BaseMessage.class);
        String messageType = message.getMessageType();
        String messageId = message.getId();
        String topic=message.getTopic();
        boolean hasReceived=false;
        if(messageType.equals(MessageTypeEnum.REQUEST.name())){
            hasReceived = apiMessageLogService.checkRequestMessageExist(messageId);
            if(hasReceived){
                //发送响应,终止流程
                String responseTopic = apiMessageTopicService.getResponseTopicCodeByCode(topic);
                RequestMessageSender sender = (RequestMessageSender) MessageSenderFactory.createSender(responseTopic);
                ApiMessageLog log = apiMessageLogService.getByRequestMessageId(messageId);
                sender.sendMessage(log.getResponseData(), log.getResponseId());


            }else{
                //继续往下传递
                ReferenceCountUtil.retain(textWebSocketFrame);
                ctx.fireChannelRead(textWebSocketFrame);
            }

        }else if(messageType.equals(MessageTypeEnum.RESPONSE.name())){
            hasReceived =apiMessageLogService.checkResponseMessageExist(messageId);
            if(hasReceived){
                //不做处理,终止流程
            }else{
                //继续往下传递
                ReferenceCountUtil.retain(textWebSocketFrame);
                ctx.fireChannelRead(textWebSocketFrame);
            }
        }
    }


}

文本反序列化成消息对象

我们将消息设计为两大类,请求消息和响应消息,这里通过自己实现的一个处理器,将客户端传来的文本帧,通过消息类型属性反序列化成请求消息对象或响应消息对象,然后继续往下传递。这里调用的是公用的处理器,即服务端也使用相同的处理器。

/**
 * 消息类型解码
 * @author wqliu
 * @date 2021-10-6 11:23
 **/
public class MessageTypeDecodeHandler extends MessageToMessageDecoder<TextWebSocketFrame> {
    @Override
    protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame msg, List<Object> out) throws Exception {
        String message=msg.text();
        //消息解析
        JSONObject jsonObject = JSONObject.parseObject(message);
        String messageType = jsonObject.getString("messageType");
        if (messageType.equals(MessageTypeEnum.REQUEST.name())) {
            RequestMessage requestMessage = JSON.parseObject(message, RequestMessage.class);
            out.add(requestMessage);

        }else if (messageType.equals(MessageTypeEnum.RESPONSE.name())) {

            ResponseMessage responseMessage = JSON.parseObject(message, ResponseMessage.class);
            out.add(responseMessage);
        }
    }

}

处理请求消息/响应消息

上一步把消息内容通过解码形成了请求消息或响应消息,而这两个处理器只需加入到链条中即可,根据传入的消息类型,也就是泛型参数类型,会自动识别处理或者往下传递。

这里采用了类似的处理机制,通过简单工厂模式,根据消息主题来获取到对应的业务消息处理器,由具体的消息处理器来进行业务逻辑处理

/**
 * 客户端请求消息业务逻辑处理
 * @author wqliu
 * @date 2021-2-5 16:25
 **/
@Slf4j
public class RequestMessageBusinessHandler extends SimpleChannelInboundHandler<RequestMessage> {

    private ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage message) throws Exception {
        String requestMessageId= StringUtils.EMPTY;
        String topic =StringUtils.EMPTY;
        try {

            //获取消息主题
            topic = message.getTopic();
            //获取消息标识
            requestMessageId=message.getId();
            //转具体的消息处理器进行处理
            RequestMessageHandler handler = (RequestMessageHandler) MessageHandlerFactory.createHandler(topic);
            handler.handleMessage(message,ctx.channel());
        } catch (Exception e) {
            // 记录消息请求日志
            apiMessageLogService.createRequestPart(message);
            String errorMessage = e.getMessage();
            //统一响应错误
            ResponseError responseError=new ResponseError();
            responseError.sendResponse(ctx.channel(), MessageResponseResultEnum.ERROR.name(),errorMessage,requestMessageId,
                    topic);
            log.error("发生异常:" , e);
        }
    }


}

下面这个响应消息处理器跟请求消息处理器非常类似,区别在于这是入站的最后一个处理器,因此我们覆写了exceptionCaught方法,来打印错误日志。

/**
 * 客户端响应消息业务逻辑处理
 * @author wqliu
 * @date 2021-2-5 16:25
 **/
@Slf4j
public class ResponseMessageBusinessHandler extends SimpleChannelInboundHandler<ResponseMessage> {

    private ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage message) throws Exception {
        String requestMessageId= StringUtils.EMPTY;
        String topic =StringUtils.EMPTY;
        try {

            //获取消息主题
            topic = message.getTopic();
            //获取消息标识
            requestMessageId=message.getId();
            //转具体的消息处理器进行处理
            ResponseMessageHandler handler = (ResponseMessageHandler) MessageHandlerFactory.createHandler(topic);
            handler.handleMessage(message,ctx.channel());
        } catch (Exception e) {
            // 更新消息日志
            apiMessageLogService.updateResponsePart(message);

            String errorMessage = e.getMessage();
            //统一响应错误
            ResponseError responseError=new ResponseError();
            responseError.sendResponse(ctx.channel(), MessageResponseResultEnum.ERROR.name(),errorMessage,requestMessageId,topic);
            log.error("发生异常:" , e);
        }


    }







    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("发生异常", cause);

    }
}

JSON格式转文本帧

这个非常简单,就是把JSON格式字符串放到文本帧中,是一个公用的处理器。

这里面我们加了一句日志,打印消息,这样可以保证在系统调试阶段,所有发送的消息可以在控制台打印;系统运行阶段,所有发送的消息可以通过磁盘日志文件追溯查看。

/**
 * 将json格式字符串编码为TextWebSocketFrame
 * @author wqliu
 * @date 2021-10-6 11:23
 **/
@Slf4j
public class TextWebSocketFrameEncodeHandler extends MessageToMessageEncoder<String> {
    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
        log.info("发出消息:{}",msg);
        TextWebSocketFrame frame=new TextWebSocketFrame(msg);
        out.add(frame);

    }

}

对象序列化为JSON字符串

这个比较简单,就是把JSON格式字符串放到文本帧中,是一个公用的处理器。

/**
 * 将对象序列化为json格式字符串
 * @author wqliu
 * @date 2021-10-6 11:23
 **/
public class JsonEncodeHandler extends MessageToMessageEncoder<BaseMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, BaseMessage msg, List<Object> out) throws Exception {

        if(msg instanceof BaseMessage) {
            out.add(JSONObject.toJSONString(msg));
        }else{
            out.add(msg);
        }
    }


}

举报

相关推荐

0 条评论