0
点赞
收藏
分享

微信扫一扫

通用接口开放平台设计与实现——(23)消息客户端之处理器配置

上篇说了,消息客户端的启动框架基本是netty的标准模式,关键在于自定义的处理器链条的配置上,也就是MessageClientChannelInitializer的实现。

一共涉及到14个消息处理器,与服务端相比,一是多了1个心跳发送的处理器,二是WebSocket专用处理器,不是内置而是自定义实现的。

其中4个是netty内置的,只是进行了参数配置,其他10个是自己实现的,用于处理逻辑和数据的,依次如下:

序号处理器类型职责实现说明
1SslHandler处理可靠安全连接内置仅在生产环境,需要进行ssl加解密
2HttpClientCodecHTTP 编解码内置对Http请求进行解码与编码
3HttpObjectAggregator聚合HTTP 请求或响应内置将http请求或响应聚合为一个完整对象
4IdleStateHandler空闲监测内置监测空闲状态,触发后续超时处理
5HeartbeatTimeoutHandler心跳超时处理自定义心跳超时执行关闭连接,触发重连
6WebSocketClientHandshakerHandlerWebSocket专用处理自定义处理WebSocket的握手以及Ping、Pong、Close消息
7HeartbeatRequestHandler发送心跳请求自定义客户端向服务端定时发送心跳
8ValidateMessageHandler验证消息自定义对消息进行基本验证,如非空,日期格式正确等
9DistinctMessageHandler过滤掉重复消息自定义根据消息唯一性标识判断,如曾接收过,则直接返回成功处理,不再进行后续处理
10MessageTypeDecodeHandler文本反序列化成消息对象自定义将文本按消息类型转换为请求消息或响应消息
11RequestMessageBusinessHandler处理请求消息自定义请求消息业务逻辑处理器
12ResponseMessageBusinessHandler处理响应消息自定义响应消息业务逻辑处理器
13TextWebSocketFrameEncodeHandlerJSON格式转文本帧自定义将json格式字符串编码为TextWebSocketFrame
14JsonEncodeHandler对象序列化为JSON字符串自定义将对象序列化为json格式字符串

自定义的10个处理器中,最后两个13和14是出站处理器,注意实际执行顺序是先14后13,也就是,业务逻辑处理器11或12的处理结果是一个对象,先由出站处理器14将其序列化为json字符串,然后再由出站处理器13将其包装为一个WebSocket协议约定的文本帧TextWebSocketFrame。

netty的对于消息的处理,实际使用的是设计模式中的职责链模式,我们这里的自定义处理器数量多一点,但是每个职责都单一,从而部分通用功能的处理器可复用。

整个链条上处理器的装配实现代码如下

/**
 * 初始化通道
 *
 * @author wqliu
 * @date 2021-2-5 15:12
 */
@Slf4j
@Component
public class MessageClientChannelInitializer extends ChannelInitializer<SocketChannel> {


    @Autowired
    private MessageClientGlobalHolder config;

    @Autowired
    private Environment environment;

    /**
     * 生产运行模式
     */
    private final String PRD_MODE="prd";

    /**
     * 初始化channel
     */
    @Override
    public void initChannel(SocketChannel socketChannel) throws Exception {



        //获取通道链路
        ChannelPipeline pipeline = socketChannel.pipeline();

        //仅在生产模式下加载ssl过滤器
        String mode=environment.getProperty("spring.profiles.active");
        if(PRD_MODE.equals(mode)){
            //ssl
            SSLContext sslContext = createSslContext();
            SSLEngine engine = sslContext.createSSLEngine();
            engine.setNeedClientAuth(false);
            engine.setUseClientMode(false);
            pipeline.addLast(new SslHandler(engine));
        }


        //HTTP 编解码
        pipeline.addLast(new HttpClientCodec());

        // 聚合为单个 FullHttpRequest 或者 FullHttpResponse
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));

        /**
         * 注意,因WebSocketClientHandshakerHandler继承自SimpleChannelInboundHandler,会自动释放消息
         * 对于收到服务端的pong消息,默认情况下不会往通道后续的处理器传递,所以若放到WebSocketClientHandshakerHandler之后,
         * 则会产生读空闲,导致心跳超时失效。
         */
        // 添加读写通道空闲处理器,当空闲满足设置时,会触发userEventTrigger,由下个处理器获取到
        pipeline.addLast(new IdleStateHandler(config.getReadIdleTimeOut(), 0,
                0, TimeUnit.SECONDS));

        // 心跳超时处理
        pipeline.addLast(new HeartbeatTimeoutHandler());


        //处理web socket协议与握手
        pipeline.addLast("hookedHandler", new WebSocketClientHandshakerHandler());


        //心跳发送
        pipeline.addLast(new HeartbeatRequestHandler(config.getHeartbeatRate()));


        //数据基本验证
        pipeline.addLast(new ValidateMessageHandler());

        //去重
        pipeline.addLast(new DistinctMessageHandler());


        //将文本按消息类型转换为请求消息或响应消息
        pipeline.addLast(new MessageTypeDecodeHandler());

        //请求消息业务逻辑处理器
        pipeline.addLast(new RequestMessageBusinessHandler());

        //响应消息业务逻辑处理器
        pipeline.addLast(new ResponseMessageBusinessHandler());

        //编码为TextWebSocketFrame
        pipeline.addLast(new TextWebSocketFrameEncodeHandler());

        //json序列化
        pipeline.addLast(new JsonEncodeHandler());


    }

    /**
     * 创建ssl上下文对象
     * @param type
     * @param path
     * @param password
     * @return
     * @throws Exception
     */
    public SSLContext createSslContext() throws Exception {

        //读取配置信息
        String path=environment.getProperty("server.ssl.key-store");
        log.info("证书地址:{}",path);
        String password=environment.getProperty("server.ssl.key-store-password");
        String type=environment.getProperty("server.ssl.key-store-type");

        //构建证书上下文对象
        KeyStore ks = KeyStore.getInstance(type);
        path=path.replace("classpath:","");
        log.info("处理后的证书地址:{}",path);
        ClassPathResource resource = new ClassPathResource(path);
        InputStream ksInputStream = resource.getInputStream();
        ks.load(ksInputStream, password.toCharArray());
        //KeyManagerFactory充当基于密钥内容源的密钥管理器的工厂。
        KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
        kmf.init(ks, password.toCharArray());
        //SSLContext的实例表示安全套接字协议的实现,它充当用于安全套接字工厂或 SSLEngine 的工厂。
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(kmf.getKeyManagers(), null, null);
        return sslContext;
    }


}

举报

相关推荐

0 条评论