Nacos服务发现原理
文章目录
1、NacosServiceDiscovery
public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,
NacosServiceManager nacosServiceManager) {
this.discoveryProperties = discoveryProperties;
this.nacosServiceManager = nacosServiceManager;
}
通过构造方法以及方法内容得知,它是nacos服务发现的服务层,底层获取服务列表信息也是通过前面我们提过的NacosServiceManager的namingService进行操作。值得注意的是
/**
* Return all instances for the given service.
* @param serviceId id of service
* @return list of instances
* @throws NacosException nacosException
*/
//这个方法是通过本地缓存 serviceInfoMap拿的
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
...
}
/**
* Return the names of all services.
* @return list of service names
* @throws NacosException nacosException
*/
//这个方法数据是直连nacos server使用api请求获取
public List<String> getServices() throws NacosException {
...
}
为什么这么做?不可以都从本地缓存获取吗?
2、NacosDiscoveryClient
实现DiscoveryClient接口,此接口是Spring cloud 服务发现规范,可以把它理解成 NacosServiceDiscovery的包装类,后面的服务注册也采用的这个设计模式。Spring cloud的负载均衡会通过 DiscoveryClient 进行服务列表的获取从而进行选择。具体后面研究Spring cloud LoadBalancer时再说。
3、NacosWatch
通过名字猜到是nacos的观察服务,那么观察啥呢?
//构造方法只是简单的赋值以及初始化 线程池
public NacosWatch(NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties properties,
ObjectProvider<ThreadPoolTaskScheduler> taskScheduler) {
this.nacosServiceManager = nacosServiceManager;
this.properties = properties;
this.taskScheduler = taskScheduler.stream().findAny()
.orElseGet(NacosWatch::getTaskScheduler);
}
此类实现SmartLifecycle 接口,通过源码得知,当bean初始化完成后,会执行start方法。
public void start() {
if (this.running.compareAndSet(false, true)) {
//注册事件监听器,根据后面的subscribe得知,监听的是InstanceChangeEvent,回调listener时 //会转换成namingEvent。 //com.alibaba.nacos.client.naming.event.InstancesChangeNotifier#onEvent
EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
event -> new EventListener() {
@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event)
.getInstances();
Optional<Instance> instanceOptional = selectCurrentInstance(
instances);
instanceOptional.ifPresent(currentInstance -> {
//对比重置元数据
resetIfNeeded(currentInstance);
});
}
}
});
NamingService namingService = nacosServiceManager
.getNamingService(properties.getNacosProperties());
try {
//serviceName+groupName组成key,放到InstanceChangeNotifier map中,订阅事件
//这里同时还会执行HostReactor#getServiceInfo
namingService.subscribe(properties.getService(), properties.getGroup(),
Arrays.asList(properties.getClusterName()), eventListener);
}
catch (Exception e) {
log.error("namingService subscribe failed, properties:{}", properties, e);
}
//每隔30S发送HeartbeatEvent
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay());
}
}
//HostReactor方法,注册instanceChangeEvent的监听器
public void subscribe(String serviceName, String clusters, EventListener eventListener) {
notifier.registerListener(serviceName, clusters, eventListener);
getServiceInfo(serviceName, clusters);
}
//同步当前服务信息到本地缓存,并启动任务
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
//之前说过的故障转移开关
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
//从本地缓存 serviceInfoMap中获取服务
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
//服务不存在,则先将服务放入本地缓存,并且发起http请求从服务端拉取该服务的具体信息
if (null == serviceObj) {
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
//这块的设计是因为不存在时,先放入了 updateingMap,这里锁住一会 不用重复发起请求
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
//给当前服务添加更新定时任务 1S执行一次,
//任务内容:从server端同步服务最小信息到本地缓存
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}
4、总结
- 给当前服务注册 InstanceChangeEvent事件监听器
- 每隔30S发送心跳事件,这个事件的监听者有 routeRefreshListener,并根据atomic值判断是否需要更新路由
- 更新本地服务缓存,并启动一个每隔1S执行的定时任务,同步服务端该服务最新信息