消息服务通信机制为异步,且网络连接不是100%可靠,会因为网络闪断问题丢失消息,作为企业应用,需要保证业务消息传输的可靠性,需实现以下机制:
a)发送方重发机制:消息发送方对未收到响应的消息进行重发,重发时保证消息唯一性标识、消息内容不变
b)接收方消息去重机制:消息接收方依据消息的唯一性标识,对收到的消息进行验证,判断是否已处理过,如已处理过则不再进行处理
实现思路比较简单,通过消息日志表,来缓存待发送或发送失败的消息,然后通过定时器,来执行一段逻辑,从消息日志重建消息,找到要接收消息的客户端连接,然后推送消息。
需要注意的是,消息客户端的角色可能是消息发送者(生产者),也可能是消息接收者(消费者),甚至兼具两种角色。为了保证消息的可靠性,实际消息重发也需要分别实现两段,即从消息生产者到消息中心和从消息中心到消费者。
在消息服务端之消息重发一文中,我们介绍了从消息中心到消费者这一段的消息重发,今天来介绍下从生产者到消息中心这一段的消息重发。
从生产者到消息中心这一段的重发策略如下:
1.消息日志除了要保存消息内容,并可以依据日志重新构建出要发送的消息。
2.基于消息日志重新构建的消息,消息标识需要保持不变,发送时间需要更新为最新,前者是为了接收方能依据唯一性的消息标识去重,后者则是需要通过时效性验证。
3.当通信通道长时间失效时,如接收方宕机,导致发送方积压了大量消息,当重新建立连接后,发送方短时间内推送大量消息会导致接收方负荷过大,因此需要发送方控制重发消息的频率和数量,例如每隔30秒推送10条(可配置)。
4.如消息无限重发,会随着时间累积逐步积压,影响性能甚至堵塞网络,因此需要设置重发次数上限,默认发送1次,重发3次,如仍未收到响应,则停止重发,转人工处理。
5.消息重发需要遵循时间升序处理,即先产生的消息先发送,尽可能避免接收方先收到作废再收到创建消息(注:仅是降低发生的概率,受网络传输的不确定性影响,先发未必先至,并且接收方也是多线程处理)。
定时器有很多种实现方式,例如jdk自带的timer,spring框架的task以及专门的任务调度组件quartz。netty框架自身也考虑了这方面,在EventLoop对象中封装了相关功能。
如下面代码所示,我们在客户端的启动代码里,调用workerGroup对象的scheduleAtFixedRate方法,来启动消息重发。
/**
* 启动客户端方法
*/
public void start() {
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(messageClientChannelInitializer);
//客户端与服务端连接的通道,final修饰表示只会有一个
ChannelFuture channelFuture = bootstrap.connect(config.getHost(), config.getPort());
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
//未成功
log.error("连接失败", future.cause());
//执行重连
reconnect();
} else {
log.info("连接成功");
Channel channel = future.channel();
//将channel保存到全局变量
config.setChannel(channel);
//发起握手
WebSocketClientHandshakerHandler handler = (WebSocketClientHandshakerHandler) channel.pipeline().get("hookedHandler");
handler.handshake(config.getChannel());
}
}
});
//消息重发
if(config.isEnableResend()){
log.info("启动消息重发机制");
//延迟30秒启动,给建立连接预留充足的时间
workerGroup.scheduleAtFixedRate(()->{
resendMessage.resend();
},30,config.getSendMessageSpan(), TimeUnit.SECONDS);
}
//等待服务器端关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.error("消息客户端启动失败:{}" + e.getMessage(), e);
//执行重连
reconnect();
} finally {
workerGroup.shutdownGracefully();
}
}
重发消息的关键实现如下,获取要重发的消息,最后根据消息主题构建发送器,传入消息标识和内容后对外发送
/**
* 重发消息
*
* @author wqliu
* @date 2022-1-24 8:22
**/
@Component
@Slf4j
public class ResendMessage {
@Autowired
private ApiMessageLogService apiMessageLogService;
@Autowired
private MessageClientGlobalHolder config;
public void resend() {
//需要进行异常处理,否则某次异常会导致定时器停止运行
try {
//判断连接状态
if(config.getChannel()!=null){
//查找待重发的消息
List<ApiMessageLog> list =
apiMessageLogService.getResendMessage(config.getSendMessageCount(),config.getMaxSendCount());
for (int i = 0; i < list.size(); i++) {
ApiMessageLog log=list.get(i);
//根据消息主题构建发送器
RequestMessageSender sender = (RequestMessageSender)MessageSenderFactory.createSender(log.getRequestTopicCode());
//传入原请求的消息标识和消息内容
sender.sendMessage(log.getRequestData(),log.getRequestId());
}
}
} catch (Exception e) {
log.error("消息重发处理异常",e);
}
}
}
获取需重发消息的应用列表的实现方法如下,注释中标注了不少需要注意的点
@Override
public List<ApiMessageLog> getResendMessage(int messageCount,int maxSendCount) {
LocalDateTime now=LocalDateTime.now();
//获取当前时间之前15秒的时间点
LocalDateTime beforeNow = now.minusSeconds(15);
try {
LambdaQueryChainWrapper<ApiMessageLog> query = this.lambdaQuery()
//消息状态为待发送或已发送
.and(x -> x.eq(ApiMessageLog::getStatus, MessageStatusEnum.WAIT_REQUEST.name())
.or(y -> y.eq(ApiMessageLog::getStatus, MessageStatusEnum.REQUESTED.name())))
// 排除掉登录请求
.ne(ApiMessageLog::getRequestTopicCode, "framework.login.request")
//发送次数小于设置的最大发送次数
.lt(ApiMessageLog::getSendCount, maxSendCount)
//请求时间小于当前时间15秒,避免刚产生的消息尚未收到服务端响应时就进行重发
.lt(ApiMessageLog::getRequestTime, beforeNow)
//按照请求时间升序排列
.orderByAsc(ApiMessageLog::getRequestTime)
//只取指定数量的消息
.last("limit " + messageCount);
return query.list();
}catch (Exception e) {
log.error("获取发送信息失败",e);
return null;
}
}