处理器清单
上篇说了,我们除了使用netty内置的处理器外,还自定义了10个处理器用于实现自己的业务处理,所有处理器清单如下:
序号 | 处理器类型 | 职责 | 实现 | 说明 |
---|---|---|---|---|
1 | SslHandler | 处理可靠安全连接 | 内置 | 仅在生产环境,需要进行ssl加解密 |
2 | HttpClientCodec | HTTP 编解码 | 内置 | 对Http请求进行解码与编码 |
3 | HttpObjectAggregator | 聚合HTTP 请求或响应 | 内置 | 将http请求或响应聚合为一个完整对象 |
4 | IdleStateHandler | 空闲监测 | 内置 | 监测空闲状态,触发后续超时处理 |
5 | HeartbeatTimeoutHandler | 心跳超时处理 | 自定义 | 心跳超时执行关闭连接,触发重连 |
6 | WebSocketClientHandshakerHandler | WebSocket专用处理 | 自定义 | 处理WebSocket的握手以及Ping、Pong、Close消息 |
7 | HeartbeatRequestHandler | 发送心跳请求 | 自定义 | 客户端向服务端定时发送心跳 |
8 | ValidateMessageHandler | 验证消息 | 自定义 | 对消息进行基本验证,如非空,日期格式正确等 |
9 | DistinctMessageHandler | 过滤掉重复消息 | 自定义 | 根据消息唯一性标识判断,如曾接收过,则直接返回成功处理,不再进行后续处理 |
10 | MessageTypeDecodeHandler | 文本反序列化成消息对象 | 自定义 | 将文本按消息类型转换为请求消息或响应消息 |
11 | RequestMessageBusinessHandler | 处理请求消息 | 自定义 | 请求消息业务逻辑处理器 |
12 | ResponseMessageBusinessHandler | 处理响应消息 | 自定义 | 响应消息业务逻辑处理器 |
13 | TextWebSocketFrameEncodeHandler | JSON格式转文本帧 | 自定义 | 将json格式字符串编码为TextWebSocketFrame |
14 | JsonEncodeHandler | 对象序列化为JSON字符串 | 自定义 | 将对象序列化为json格式字符串 |
心跳超时处理
我们的心跳策略是客户端来发送心跳,服务端响应心跳,客户端发现服务端未及时响应心跳时,认为通道异常,进行重连。
这个处理器实际是配合netty内置的空闲检测处理器IdleStateHandler使用的,只有满足IdleStateHandler中设置的触发条件,才会触发本处理器中的userEventTriggered方法,执行自定义的逻辑操作。
客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame,服务端收到后马上会回复PongWebSocketFrame,如通道失效或服务端无响应情况下,就会触发客户端读空闲。
同时,我们还覆写了channelInactive方法,在通道失效时,启动重连。虽然处理链条上任何一个处理器的这个方法都可以覆写,但从职责上来说,放到这个处理器里更合理一些。
/**
* 心跳超时处理器
* @author wqliu
* @date 2021-10-2 14:25
**/
@Slf4j
public class HeartbeatTimeoutHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
log.info("读空闲");
//关闭连接
ctx.channel().close();
}
} else {
//非空闲事件,传递到下个处理器
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端检测通道失效,启动重连");
MessageClient messageClient= SpringUtil.getBean(MessageClient.class);
messageClient.reconnect();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("与服务端建立连接,通道开启!");
//必须调用父类方法,避免其他处理器的channelActive事件不再触发
super.channelActive(ctx);
}
}
WebSocket专用处理
这是很关键的一个处理器,自身也比较复杂。
主要实现是借助netty提供的一个WebSocketClientHandshaker类,在初始化时设置websocket服务端连接信息,然后在客户端启动时,调用该类的发起握手方法handshake,服务器端收到该握手请求后,会进行后续处理,响应一个协议升级,将http协议升级为WebSocket协议。
同时需要注意的是,这里还有一个我们自定义的操作,即在握手成功,协议升级后,客户端发出一个登录服务端的请求消息。
/**
* 处理web socket握手
*
* @author wqliu
* @date 2021-9-28 16:33
**/
@EqualsAndHashCode(callSuper = true)
@Slf4j
@Data
public class WebSocketClientHandshakerHandler extends SimpleChannelInboundHandler<Object> {
/**
* 握手
*/
private WebSocketClientHandshaker handshaker;
/**
* 握手 异步处理
*/
private ChannelPromise handshakeFuture;
public WebSocketClientHandshakerHandler() {
//初始化握手处理者
MessageClientGlobalHolder config = SpringUtil.getBean(MessageClientGlobalHolder.class);
URI webSocketUri = null;
try {
webSocketUri = new URI(config.getWebSocketUrl());
} catch (URISyntaxException e) {
log.error("解析远程服务器地址出错", e);
}
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(webSocketUri, WebSocketVersion.V13, (String) null, true, new DefaultHttpHeaders());
this.setHandshaker(handshaker);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
FullHttpResponse response;
//进行握手操作
if (!this.handshaker.isHandshakeComplete()) {
try {
response = (FullHttpResponse) msg;
//握手协议返回,设置结束握手
this.handshaker.finishHandshake(ch, response);
//设置成功
this.handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException var7) {
FullHttpResponse res = (FullHttpResponse) msg;
String errorMsg = String.format("握手失败,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8));
this.handshakeFuture.setFailure(new Exception(errorMsg));
}
} else if (msg instanceof FullHttpResponse) {
//已握手成功并将http协议升级为了WebSocket协议,不应再收到Http消息,发生这种情况则抛出异常
response = (FullHttpResponse) msg;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
} else if (msg instanceof CloseWebSocketFrame) {
log.info("收到关闭信息");
} else if (msg instanceof TextWebSocketFrame) {
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
//设置握手成功后,发起登录请求
this.handshakeFuture = ctx.newPromise();
ChannelFuture handshakeFuture = this.handshakeFuture;
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
//发送登录请求
log.info("握手成功");
LoginRequestSender login = new LoginRequestSender();
login.sendMessage();
} else {
//握手失败
log.error("握手失败", future.cause());
}
}
});
}
/**
* 发起握手
*/
public void handshake(Channel channel) {
this.getHandshaker().handshake(channel);
}
}
发送心跳请求
心跳机制是客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame。
这里是从ChannelHandlerContext拿到EventLoop对象,调用了任务调度方法scheduleWithFixedDelay,实现了定时心跳发送功能。
/**
* 心跳请求处理器
* @author wqliu
* @date 2021-10-2 13:24
**/
@Slf4j
public class HeartbeatRequestHandler extends ChannelInboundHandlerAdapter {
/**
* 心跳发送间隔,单位秒
*/
private int heartbeatInterval=5;
public HeartbeatRequestHandler(int heartbeatInterval){
this.heartbeatInterval=heartbeatInterval;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.scheduleWithFixedDelay(new Runnable() {
private Channel channel;
@Override
public void run() {
// log.info("发送心跳");
PingWebSocketFrame frame=new PingWebSocketFrame();
ChannelFuture channelFuture = channel.writeAndFlush(frame);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// log.error(future.isSuccess()+"",future.cause());
}
});
}
public Runnable setChannel(Channel channel){
this.channel=channel;
return this;
}
}.setChannel(ctx.channel()),15,heartbeatInterval, TimeUnit.SECONDS);
//必须调用父类方法,避免其他处理器的channelActive事件不再触发
super.channelActive(ctx);
}
}
验证消息
对输入有效验证,可以避免后续大量的异常问题,这里实际是基本的数据验证,主要验证属性是否为空、日期格式是否合法,消息类型在枚举类定义的范围内。
这里面我们加了一句日志,打印消息,这样可以保证在系统调试阶段,所有收到的消息可以在控制台打印;系统运行阶段,所有收到的消息可以通过磁盘日志文件追溯查看。
/**
* 消息验证处理器
* @author wqliu
* @date 2022-1-19 13:50
**/
@Slf4j
public class ValidateMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
TextWebSocketFrame textWebSocketFrame=(TextWebSocketFrame)msg;
String content = textWebSocketFrame.text();
log.info("收到消息:{}",content);
BaseMessage message = JSON.parseObject(content, BaseMessage.class);
if(message!=null) {
//验证消息属性
validateProperty(message);
ctx.fireChannelRead(textWebSocketFrame);
}else {
// 当收到未按约定格式发送过来的无法解析的消息时,仅记录错误日志
log.error("收到无法解析的消息:{}",content);
}
}
/**
* 验证消息属性
*
* @param message 消息
*/
protected void validateProperty(BaseMessage message) {
String errorCode;
String errorMessage;
// id
if (StringUtils.isBlank(message.getId())){
errorCode = "S001";
errorMessage = "消息标识不能为空";
throw new MessageException(errorCode, errorMessage);
}
// 主题
if (StringUtils.isBlank(message.getTopic())) {
errorCode = "S002";
errorMessage = "消息主题不能为空";
throw new MessageException(errorCode, errorMessage);
}
// 发布者
if (StringUtils.isBlank(message.getPublishAppCode())) {
errorCode = "S003";
errorMessage = "消息发布者不能为空";
throw new MessageException(errorCode, errorMessage);
}
// 发布时间
String publishTimeString = message.getPublishTime();
if (StringUtils.isBlank(publishTimeString)) {
errorCode = "S004";
errorMessage = "发布时间不能为空";
throw new MessageException(errorCode, errorMessage);
} else if (!ValidateUtil.dateIsFormat(publishTimeString))
{
errorCode = "S005";
errorMessage = "发布时间格式不正确";
throw new MessageException(errorCode, errorMessage);
}
// 消息类型
if (StringUtils.isBlank(message.getMessageType())) {
errorCode = "S006";
errorMessage = "消息类型不能为空";
throw new MessageException(errorCode, errorMessage);
}else{
if(EnumUtils.isValidEnum(MessageTypeEnum.class,message.getMessageType())==false){
errorCode = "S007";
errorMessage = "消息类型无效";
throw new MessageException(errorCode, errorMessage);
}
}
}
}
过滤重复消息
我们为了保证消息传输的可靠性,采用请求应答机制,当发送方未及时收到响应时,会进行消息重发,而接收方,则需要去重。
本处理器的职责就是去重,实现机制是根据消息日志查询消息标识是否已存在,若不存在,则继续往下处理,若已存在,则返回处理成功的响应。
/**
* 去重处理器
* @author wqliu
* @date 2022-1-19 13:50
**/
public class DistinctMessageHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);
private ApiMessageTopicService apiMessageTopicService = SpringUtil.getBean(ApiMessageTopicService.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
String content = textWebSocketFrame.text();
BaseMessage message = JSON.parseObject(content, BaseMessage.class);
String messageType = message.getMessageType();
String messageId = message.getId();
String topic=message.getTopic();
boolean hasReceived=false;
if(messageType.equals(MessageTypeEnum.REQUEST.name())){
hasReceived = apiMessageLogService.checkRequestMessageExist(messageId);
if(hasReceived){
//发送响应,终止流程
String responseTopic = apiMessageTopicService.getResponseTopicCodeByCode(topic);
RequestMessageSender sender = (RequestMessageSender) MessageSenderFactory.createSender(responseTopic);
ApiMessageLog log = apiMessageLogService.getByRequestMessageId(messageId);
sender.sendMessage(log.getResponseData(), log.getResponseId());
}else{
//继续往下传递
ReferenceCountUtil.retain(textWebSocketFrame);
ctx.fireChannelRead(textWebSocketFrame);
}
}else if(messageType.equals(MessageTypeEnum.RESPONSE.name())){
hasReceived =apiMessageLogService.checkResponseMessageExist(messageId);
if(hasReceived){
//不做处理,终止流程
}else{
//继续往下传递
ReferenceCountUtil.retain(textWebSocketFrame);
ctx.fireChannelRead(textWebSocketFrame);
}
}
}
}
文本反序列化成消息对象
我们将消息设计为两大类,请求消息和响应消息,这里通过自己实现的一个处理器,将客户端传来的文本帧,通过消息类型属性反序列化成请求消息对象或响应消息对象,然后继续往下传递。这里调用的是公用的处理器,即服务端也使用相同的处理器。
/**
* 消息类型解码
* @author wqliu
* @date 2021-10-6 11:23
**/
public class MessageTypeDecodeHandler extends MessageToMessageDecoder<TextWebSocketFrame> {
@Override
protected void decode(ChannelHandlerContext ctx, TextWebSocketFrame msg, List<Object> out) throws Exception {
String message=msg.text();
//消息解析
JSONObject jsonObject = JSONObject.parseObject(message);
String messageType = jsonObject.getString("messageType");
if (messageType.equals(MessageTypeEnum.REQUEST.name())) {
RequestMessage requestMessage = JSON.parseObject(message, RequestMessage.class);
out.add(requestMessage);
}else if (messageType.equals(MessageTypeEnum.RESPONSE.name())) {
ResponseMessage responseMessage = JSON.parseObject(message, ResponseMessage.class);
out.add(responseMessage);
}
}
}
处理请求消息/响应消息
上一步把消息内容通过解码形成了请求消息或响应消息,而这两个处理器只需加入到链条中即可,根据传入的消息类型,也就是泛型参数类型,会自动识别处理或者往下传递。
这里采用了类似的处理机制,通过简单工厂模式,根据消息主题来获取到对应的业务消息处理器,由具体的消息处理器来进行业务逻辑处理
/**
* 客户端请求消息业务逻辑处理
* @author wqliu
* @date 2021-2-5 16:25
**/
@Slf4j
public class RequestMessageBusinessHandler extends SimpleChannelInboundHandler<RequestMessage> {
private ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage message) throws Exception {
String requestMessageId= StringUtils.EMPTY;
String topic =StringUtils.EMPTY;
try {
//获取消息主题
topic = message.getTopic();
//获取消息标识
requestMessageId=message.getId();
//转具体的消息处理器进行处理
RequestMessageHandler handler = (RequestMessageHandler) MessageHandlerFactory.createHandler(topic);
handler.handleMessage(message,ctx.channel());
} catch (Exception e) {
// 记录消息请求日志
apiMessageLogService.createRequestPart(message);
String errorMessage = e.getMessage();
//统一响应错误
ResponseError responseError=new ResponseError();
responseError.sendResponse(ctx.channel(), MessageResponseResultEnum.ERROR.name(),errorMessage,requestMessageId,
topic);
log.error("发生异常:" , e);
}
}
}
下面这个响应消息处理器跟请求消息处理器非常类似,区别在于这是入站的最后一个处理器,因此我们覆写了exceptionCaught方法,来打印错误日志。
/**
* 客户端响应消息业务逻辑处理
* @author wqliu
* @date 2021-2-5 16:25
**/
@Slf4j
public class ResponseMessageBusinessHandler extends SimpleChannelInboundHandler<ResponseMessage> {
private ApiMessageLogService apiMessageLogService = SpringUtil.getBean(ApiMessageLogService.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage message) throws Exception {
String requestMessageId= StringUtils.EMPTY;
String topic =StringUtils.EMPTY;
try {
//获取消息主题
topic = message.getTopic();
//获取消息标识
requestMessageId=message.getId();
//转具体的消息处理器进行处理
ResponseMessageHandler handler = (ResponseMessageHandler) MessageHandlerFactory.createHandler(topic);
handler.handleMessage(message,ctx.channel());
} catch (Exception e) {
// 更新消息日志
apiMessageLogService.updateResponsePart(message);
String errorMessage = e.getMessage();
//统一响应错误
ResponseError responseError=new ResponseError();
responseError.sendResponse(ctx.channel(), MessageResponseResultEnum.ERROR.name(),errorMessage,requestMessageId,topic);
log.error("发生异常:" , e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("发生异常", cause);
}
}
JSON格式转文本帧
这个非常简单,就是把JSON格式字符串放到文本帧中,是一个公用的处理器。
这里面我们加了一句日志,打印消息,这样可以保证在系统调试阶段,所有发送的消息可以在控制台打印;系统运行阶段,所有发送的消息可以通过磁盘日志文件追溯查看。
/**
* 将json格式字符串编码为TextWebSocketFrame
* @author wqliu
* @date 2021-10-6 11:23
**/
@Slf4j
public class TextWebSocketFrameEncodeHandler extends MessageToMessageEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
log.info("发出消息:{}",msg);
TextWebSocketFrame frame=new TextWebSocketFrame(msg);
out.add(frame);
}
}
对象序列化为JSON字符串
这个比较简单,就是把JSON格式字符串放到文本帧中,是一个公用的处理器。
/**
* 将对象序列化为json格式字符串
* @author wqliu
* @date 2021-10-6 11:23
**/
public class JsonEncodeHandler extends MessageToMessageEncoder<BaseMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, BaseMessage msg, List<Object> out) throws Exception {
if(msg instanceof BaseMessage) {
out.add(JSONObject.toJSONString(msg));
}else{
out.add(msg);
}
}
}