上篇说了,消息客户端的启动框架基本是netty的标准模式,关键在于自定义的处理器链条的配置上,也就是MessageClientChannelInitializer的实现。
一共涉及到14个消息处理器,与服务端相比,一是多了1个心跳发送的处理器,二是WebSocket专用处理器,不是内置而是自定义实现的。
其中4个是netty内置的,只是进行了参数配置,其他10个是自己实现的,用于处理逻辑和数据的,依次如下:
序号 | 处理器类型 | 职责 | 实现 | 说明 |
---|---|---|---|---|
1 | SslHandler | 处理可靠安全连接 | 内置 | 仅在生产环境,需要进行ssl加解密 |
2 | HttpClientCodec | HTTP 编解码 | 内置 | 对Http请求进行解码与编码 |
3 | HttpObjectAggregator | 聚合HTTP 请求或响应 | 内置 | 将http请求或响应聚合为一个完整对象 |
4 | IdleStateHandler | 空闲监测 | 内置 | 监测空闲状态,触发后续超时处理 |
5 | HeartbeatTimeoutHandler | 心跳超时处理 | 自定义 | 心跳超时执行关闭连接,触发重连 |
6 | WebSocketClientHandshakerHandler | WebSocket专用处理 | 自定义 | 处理WebSocket的握手以及Ping、Pong、Close消息 |
7 | HeartbeatRequestHandler | 发送心跳请求 | 自定义 | 客户端向服务端定时发送心跳 |
8 | ValidateMessageHandler | 验证消息 | 自定义 | 对消息进行基本验证,如非空,日期格式正确等 |
9 | DistinctMessageHandler | 过滤掉重复消息 | 自定义 | 根据消息唯一性标识判断,如曾接收过,则直接返回成功处理,不再进行后续处理 |
10 | MessageTypeDecodeHandler | 文本反序列化成消息对象 | 自定义 | 将文本按消息类型转换为请求消息或响应消息 |
11 | RequestMessageBusinessHandler | 处理请求消息 | 自定义 | 请求消息业务逻辑处理器 |
12 | ResponseMessageBusinessHandler | 处理响应消息 | 自定义 | 响应消息业务逻辑处理器 |
13 | TextWebSocketFrameEncodeHandler | JSON格式转文本帧 | 自定义 | 将json格式字符串编码为TextWebSocketFrame |
14 | JsonEncodeHandler | 对象序列化为JSON字符串 | 自定义 | 将对象序列化为json格式字符串 |
自定义的10个处理器中,最后两个13和14是出站处理器,注意实际执行顺序是先14后13,也就是,业务逻辑处理器11或12的处理结果是一个对象,先由出站处理器14将其序列化为json字符串,然后再由出站处理器13将其包装为一个WebSocket协议约定的文本帧TextWebSocketFrame。
netty的对于消息的处理,实际使用的是设计模式中的职责链模式,我们这里的自定义处理器数量多一点,但是每个职责都单一,从而部分通用功能的处理器可复用。
整个链条上处理器的装配实现代码如下
/**
* 初始化通道
*
* @author wqliu
* @date 2021-2-5 15:12
*/
@Slf4j
@Component
public class MessageClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private MessageClientGlobalHolder config;
@Autowired
private Environment environment;
/**
* 生产运行模式
*/
private final String PRD_MODE="prd";
/**
* 初始化channel
*/
@Override
public void initChannel(SocketChannel socketChannel) throws Exception {
//获取通道链路
ChannelPipeline pipeline = socketChannel.pipeline();
//仅在生产模式下加载ssl过滤器
String mode=environment.getProperty("spring.profiles.active");
if(PRD_MODE.equals(mode)){
//ssl
SSLContext sslContext = createSslContext();
SSLEngine engine = sslContext.createSSLEngine();
engine.setNeedClientAuth(false);
engine.setUseClientMode(false);
pipeline.addLast(new SslHandler(engine));
}
//HTTP 编解码
pipeline.addLast(new HttpClientCodec());
// 聚合为单个 FullHttpRequest 或者 FullHttpResponse
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
/**
* 注意,因WebSocketClientHandshakerHandler继承自SimpleChannelInboundHandler,会自动释放消息
* 对于收到服务端的pong消息,默认情况下不会往通道后续的处理器传递,所以若放到WebSocketClientHandshakerHandler之后,
* 则会产生读空闲,导致心跳超时失效。
*/
// 添加读写通道空闲处理器,当空闲满足设置时,会触发userEventTrigger,由下个处理器获取到
pipeline.addLast(new IdleStateHandler(config.getReadIdleTimeOut(), 0,
0, TimeUnit.SECONDS));
// 心跳超时处理
pipeline.addLast(new HeartbeatTimeoutHandler());
//处理web socket协议与握手
pipeline.addLast("hookedHandler", new WebSocketClientHandshakerHandler());
//心跳发送
pipeline.addLast(new HeartbeatRequestHandler(config.getHeartbeatRate()));
//数据基本验证
pipeline.addLast(new ValidateMessageHandler());
//去重
pipeline.addLast(new DistinctMessageHandler());
//将文本按消息类型转换为请求消息或响应消息
pipeline.addLast(new MessageTypeDecodeHandler());
//请求消息业务逻辑处理器
pipeline.addLast(new RequestMessageBusinessHandler());
//响应消息业务逻辑处理器
pipeline.addLast(new ResponseMessageBusinessHandler());
//编码为TextWebSocketFrame
pipeline.addLast(new TextWebSocketFrameEncodeHandler());
//json序列化
pipeline.addLast(new JsonEncodeHandler());
}
/**
* 创建ssl上下文对象
* @param type
* @param path
* @param password
* @return
* @throws Exception
*/
public SSLContext createSslContext() throws Exception {
//读取配置信息
String path=environment.getProperty("server.ssl.key-store");
log.info("证书地址:{}",path);
String password=environment.getProperty("server.ssl.key-store-password");
String type=environment.getProperty("server.ssl.key-store-type");
//构建证书上下文对象
KeyStore ks = KeyStore.getInstance(type);
path=path.replace("classpath:","");
log.info("处理后的证书地址:{}",path);
ClassPathResource resource = new ClassPathResource(path);
InputStream ksInputStream = resource.getInputStream();
ks.load(ksInputStream, password.toCharArray());
//KeyManagerFactory充当基于密钥内容源的密钥管理器的工厂。
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(ks, password.toCharArray());
//SSLContext的实例表示安全套接字协议的实现,它充当用于安全套接字工厂或 SSLEngine 的工厂。
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), null, null);
return sslContext;
}
}