一、引言
那么,Nacos 服务端是如何感知客户端实例是否可用的呢?
二、目录
目录
三、服务端实例心跳接口源码分析
主线任务:实例心跳接口做了哪些事情?
/**
 * Sends one heartbeat for the given instance to the server's {@code /instance/beat} endpoint
 * using an HTTP PUT.
 *
 * @param beatInfo         heartbeat payload; service name, cluster, ip and port are read from it
 * @param lightBeatEnabled when {@code true}, the serialized beat is left out of the request body
 * @return the server response as parsed by {@code JacksonUtils.toObj}
 * @throws NacosException declared by the signature; presumably raised by {@code reqApi} — confirm
 */
public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
    if (NAMING_LOGGER.isDebugEnabled()) {
        NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
    }

    // Request body: only a full (non-light) beat carries the serialized BeatInfo.
    final Map<String, String> body = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        body.put("beat", JacksonUtils.toJson(beatInfo));
    }

    // Request parameters that identify the instance this beat belongs to.
    final Map<String, String> query = new HashMap<String, String>(8);
    query.put(CommonParams.NAMESPACE_ID, namespaceId);
    query.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    query.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    query.put("ip", beatInfo.getIp());
    query.put("port", String.valueOf(beatInfo.getPort()));

    // Issue the heartbeat request and parse whatever the server answers.
    final String beatUrl = UtilAndComs.nacosUrlBase + "/instance/beat";
    final String response = reqApi(beatUrl, query, body, HttpMethod.PUT);
    return JacksonUtils.toObj(response);
}
/**
 * Server-side handler for {@code PUT /beat}: processes one client heartbeat.
 *
 * <p>Looks the instance up in the service manager; if it is missing and the request carried
 * beat data, the instance is rebuilt from that data and registered again. The beat is then
 * handed to {@code Service.processClientBeat}.
 *
 * <p>NOTE: this is an abridged excerpt. {@code clientBeat}, {@code result}, {@code clusterName},
 * {@code ip} and {@code port} are declared in the omitted parts of the method.
 *
 * @param request the incoming HTTP request carrying the beat parameters
 * @return the {@code result} node built in the omitted code; when the instance is unknown and no
 *         beat data was sent, it carries {@code NamingResponseCode.RESOURCE_NOT_FOUND}
 * @throws Exception a {@code NacosException} with {@code SERVER_ERROR} when the service cannot
 *                   be found; other failures propagate from the called helpers
 */
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
// (part of the code omitted)
// Read the namespaceId (falls back to the default namespace) and the mandatory serviceName.
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
// Look up the Instance by namespaceId, serviceName, clusterName, ip and port
// (per the original note, this reads the in-memory registry).
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// If the instance is unknown, register it again from the beat data.
if (instance == null) {
// Without beat data there is nothing to rebuild the instance from: report RESOURCE_NOT_FOUND.
if (clientBeat == null) {
result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
// NOTE(review): writes back the value of the instance's own getter — presumably the getter
// derives the id from the fields set above; confirm against Instance.getInstanceId().
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
// Re-register the rebuilt instance.
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
// Fetch the Service that corresponds to namespaceId and serviceName.
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found: " + serviceName + "@" + namespaceId);
}
// No beat data in the request: build a minimal RsInfo from ip, port and clusterName.
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// Key step: hand the beat to the service, which schedules a task that refreshes lastBeat.
service.processClientBeat(clientBeat);
// (part of the code omitted)
return result;
}
/**
 * Wraps a client heartbeat in a {@link ClientBeatProcessor} bound to this service and submits
 * it through {@code HealthCheckReactor.scheduleNow}.
 *
 * @param rsInfo the heartbeat data received from the client
 */
public void processClientBeat(final RsInfo rsInfo) {
    final ClientBeatProcessor beatTask = new ClientBeatProcessor();
    beatTask.setRsInfo(rsInfo);
    beatTask.setService(this);
    // Per the original note, scheduleNow runs the task immediately.
    HealthCheckReactor.scheduleNow(beatTask);
}
/**
 * Runnable that applies one client heartbeat ({@link RsInfo}) to the matching instance of a
 * {@link Service}: it refreshes the instance's last-beat timestamp and, if the instance was
 * unhealthy and is not marked, flips it back to healthy and notifies the push service.
 */
public class ClientBeatProcessor implements Runnable {

    public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);

    private RsInfo rsInfo;

    private Service service;

    /**
     * Resolves the {@link PushService} bean on demand via {@code ApplicationUtils}.
     *
     * @return the push service bean
     */
    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }

    /** @return the heartbeat data this processor will apply */
    public RsInfo getRsInfo() {
        return rsInfo;
    }

    /** @param rsInfo the heartbeat data to apply */
    public void setRsInfo(RsInfo rsInfo) {
        this.rsInfo = rsInfo;
    }

    /** @return the service whose instances are searched */
    public Service getService() {
        return service;
    }

    /** @param service the service whose instances are searched */
    public void setService(Service service) {
        this.service = service;
    }

    /**
     * Finds the instance whose ip and port match the beat, within the beat's cluster, and
     * refreshes its heartbeat state.
     */
    @Override
    public void run() {
        final Service owner = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }

        // Coordinates of the instance that sent the beat.
        final String beatIp = rsInfo.getIp();
        final String beatCluster = rsInfo.getCluster();
        final int beatPort = rsInfo.getPort();

        // NOTE(review): the lookup result is used unguarded; an unknown cluster name would
        // raise a NullPointerException on the next line — confirm callers guarantee it exists.
        final Cluster cluster = owner.getClusterMap().get(beatCluster);

        // Per the original note, allIPs(true) yields the ephemeral instances of the cluster.
        for (Instance candidate : cluster.allIPs(true)) {
            // Only the instance with the same ip and port is the sender of this beat.
            if (!candidate.getIp().equals(beatIp) || candidate.getPort() != beatPort) {
                continue;
            }

            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }

            // Record "now" as the time of the last heartbeat.
            candidate.setLastBeat(System.currentTimeMillis());

            // Marked instances keep their health flag; healthy ones need no change.
            if (candidate.isMarked() || candidate.isHealthy()) {
                continue;
            }

            // The instance was unhealthy: a fresh beat makes it healthy again.
            candidate.setHealthy(true);
            Loggers.EVT_LOG
                    .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), beatIp, beatPort, cluster.getName(),
                            UtilsAndCommons.LOCALHOST_SITE);
            getPushService().serviceChanged(owner);
        }
    }
}
小结:
四、服务端实例心跳健康检查定时任务源码分析
主线任务:服务端是如何维护不健康实例的?又是如何将不健康实例下线的?期间做了哪些操作?
/**
 * Registers an instance under the given namespace and service.
 *
 * @param namespaceId namespace the service belongs to
 * @param serviceName name of the service
 * @param instance    instance to register; its ephemeral flag is passed to the helper calls
 * @throws NacosException with {@code INVALID_PARAM} when the service still cannot be found
 *                        after {@code createEmptyService}
 */
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Side branch: presumably creates the service when it does not exist yet — confirm in
    // createEmptyService.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    // Side branch: the service must be resolvable by now, otherwise registration is rejected.
    final Service target = getService(namespaceId, serviceName);
    if (target == null) {
        final String detail = "service not found, namespace: " + namespaceId + ", service: " + serviceName;
        throw new NacosException(NacosException.INVALID_PARAM, detail);
    }

    // Main line: add the instance to the service.
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
/**
 * Initializes this service: schedules its client-beat check task and initializes every
 * cluster in {@code clusterMap} after binding it to this service.
 */
public void init() {
    // Per the original note, this schedules a recurring check that runs every 5 seconds —
    // the interval itself is defined inside HealthCheckReactor; confirm there.
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

    for (Cluster cluster : clusterMap.values()) {
        cluster.setService(this);
        cluster.init();
    }
}
/**
 * Heartbeat check over the instances returned by {@code service.allIPs(true)}.
 *
 * <p>Pass 1 marks instances unhealthy when their last beat is older than their heartbeat
 * timeout. Pass 2, which runs only when the global config enables instance expiry, deletes
 * instances whose last beat is older than their delete timeout. Any exception is logged as a
 * warning and not rethrown.
 */
@Override
public void run() {
    try {
        // Per the original note, allIPs(true) yields all ephemeral instances.
        final List<Instance> ephemerals = service.allIPs(true);

        // Pass 1: flag timed-out instances as unhealthy.
        for (Instance candidate : ephemerals) {
            // Original note: the heartbeat timeout defaults to 15s — confirm in Instance.
            final boolean beatOverdue =
                    System.currentTimeMillis() - candidate.getLastBeat() > candidate.getInstanceHeartBeatTimeOut();
            // Skip instances that are on time, marked, or already unhealthy.
            if (!beatOverdue || candidate.isMarked() || !candidate.isHealthy()) {
                continue;
            }

            candidate.setHealthy(false);
            Loggers.EVT_LOG
                    .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                            candidate.getIp(), candidate.getPort(), candidate.getClusterName(),
                            service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                            candidate.getInstanceHeartBeatTimeOut(), candidate.getLastBeat());
            // Notify the push service and publish a timeout event (the original note says the
            // notification goes out over UDP — confirm in PushService).
            getPushService().serviceChanged(service);
            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, candidate));
        }

        // Pass 2: only when expiry is enabled, remove instances that stayed silent too long.
        if (getGlobalConfig().isExpireInstance()) {
            for (Instance candidate : ephemerals) {
                if (candidate.isMarked()) {
                    continue;
                }
                // Original note: the delete timeout defaults to 30s — confirm in Instance.
                if (System.currentTimeMillis() - candidate.getLastBeat() > candidate.getIpDeleteTimeout()) {
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                            JacksonUtils.toJson(candidate));
                    // Remove the instance (per the original note, straight from the registry).
                    deleteIp(candidate);
                }
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
小结:
五、总结
总结:
最后的最后,别忘了把源码分析图补充完整: