上篇我们对心跳的发送时机做了优化,客户端从握手成功即发送心跳,调整为在登录成功后,才启动心跳发送,从而使心跳机制起到保障客户端与服务端真正建立并保持逻辑上的业务消息通道的有效性。
我们再回顾下心跳机制的策略:
客户端来发送心跳,服务端响应心跳,客户端发现服务端未及时响应心跳时,认为通道异常,进行重连。
具体来说,实现就是客户端每隔固定时间频率向服务器端发送心跳,WebSocket协议约定的PingWebSocketFrame,服务端收到后马上会回复PongWebSocketFrame,如通道失效或服务端无响应情况下,就会触发客户端读空闲。
我们原先的实现,是通过netty内置的IdleStateHandler处理器来设置读空闲的超时时间
// 添加读写通道空闲处理器,当空闲满足设置时,会触发userEventTrigger,由下个处理器获取到
pipeline.addLast(new IdleStateHandler(config.getReadIdleTimeOut(), 0,
0, TimeUnit.SECONDS));
// 心跳超时处理
pipeline.addLast(new HeartbeatTimeoutHandler());
由自己实现的HeartbeatTimeoutHandler处理器来处理超时事件
/**
* 心跳超时处理器
* @author wqliu
* @date 2021-10-2 14:25
**/
@Slf4j
public class HeartbeatTimeoutHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.READER_IDLE)) {
log.info("读空闲");
//关闭连接
ctx.channel().close();
}
} else {
//非空闲事件,传递到下个处理器
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//置空连接通道
MessageClientGlobalHolder.channel=null;
log.info("客户端检测通道失效,启动重连");
MessageClient messageClient= SpringUtil.getBean(MessageClient.class);
messageClient.reconnect();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("与服务端建立连接,通道开启!");
//必须调用父类方法,避免其他处理器的channelActive事件不再触发
super.channelActive(ctx);
}
}
这样实现并没有什么问题,实际上,netty还封装了一个更为遍历的处理器ReadTimeoutHandler,源码如下:
public class ReadTimeoutHandler extends IdleStateHandler {
private boolean closed;
/**
* Creates a new instance.
*
* @param timeoutSeconds
* read timeout in seconds
*/
public ReadTimeoutHandler(int timeoutSeconds) {
this(timeoutSeconds, TimeUnit.SECONDS);
}
/**
* Creates a new instance.
*
* @param timeout
* read timeout
* @param unit
* the {@link TimeUnit} of {@code timeout}
*/
public ReadTimeoutHandler(long timeout, TimeUnit unit) {
super(timeout, 0, 0, unit);
}
@Override
protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
assert evt.state() == IdleState.READER_IDLE;
readTimedOut(ctx);
}
/**
* Is called when a read timeout was detected.
*/
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
}
可以看到,这个处理器实际是继承了我们原先使用的IdleStateHandler,特别是关于读超时时间的设置,实际就是调用父类的构造方法实现的。
不过这个处理器默认空闲是触发异常,关闭通道,而我们的设计是自动自动重连,因此我们只要自己定义一个类,继承自ReadTimeoutHandler,并且覆写readTimedOut方法,实现我们的逻辑即可。
/**
* 自定义的读超时处理器
* @author wqliu
* @date 2022-2-10 8:43
**/
@Slf4j
public class CustomReadTimeoutHandler extends ReadTimeoutHandler {
public CustomReadTimeoutHandler(int timeoutSeconds) {
super(timeoutSeconds);
}
@Override
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
log.info("读空闲");
//关闭连接
ctx.channel().close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//置空连接通道
MessageClientGlobalHolder.channel=null;
log.info("客户端检测通道失效,启动重连");
MessageClient messageClient= SpringUtil.getBean(MessageClient.class);
messageClient.reconnect();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("与服务端建立连接,通道开启!");
//必须调用父类方法,避免其他处理器的channelActive事件不再触发
super.channelActive(ctx);
}
}
然后在处理器装配环节,可以取消之前的两个处理器,只装入新的处理器,即可实现心跳超时的处理。
// 添加读写通道空闲处理器,当空闲满足设置时,会触发userEventTrigger,由下个处理器获取到
// pipeline.addLast(new IdleStateHandler(config.getReadIdleTimeOut(), 0,
// 0, TimeUnit.SECONDS));
//
// // 心跳超时处理
// pipeline.addLast(new HeartbeatTimeoutHandler());
// 读超时处理
pipeline.addLast(new CustomReadTimeoutHandler(config.getReadIdleTimeOut()));
同理,原先实现的服务端的两个处理器,也可以使用新实现的1个处理器CustomReadTimeoutHandler来替换。
/**
* 自定义的读超时处理器
* @author wqliu
* @date 2022-2-10 8:43
**/
@Slf4j
public class CustomReadTimeoutHandler extends ReadTimeoutHandler {
public CustomReadTimeoutHandler(int timeoutSeconds) {
super(timeoutSeconds);
}
@Override
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
//发生读空闲超时,说明客户端不再发送心跳,关闭该连接
ctx.channel().close();
log.info("服务端检测到客户端不再发送心跳,主动关闭连接");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("与客户端建立连接,通道开启!");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("与客户端连接失效,通道关闭!");
MessageServerHolder.removeChannel(ctx.channel());
}
}