0
点赞
收藏
分享

微信扫一扫

Spring cloud Alibaba Nacos 注册中心(4) 服务发现原理

绪风 2022-04-25 阅读 95

Nacos服务发现原理

文章目录

1、NacosServiceDiscovery

public NacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
		this.discoveryProperties = discoveryProperties;
		this.nacosServiceManager = nacosServiceManager;
	}

​ 通过构造方法以及方法内容得知,它是nacos服务发现的服务层,底层获取服务列表信息也是通过前面我们提过的NacosServiceManager的namingService进行操作。值得注意的是

/**
	 * Return all instances for the given service.
	 * @param serviceId id of service
	 * @return list of instances
	 * @throws NacosException nacosException
	 */
//这个方法是通过本地缓存 serviceInfoMap拿的
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
		...
	}

	/**
	 * Return the names of all services.
	 * @return list of service names
	 * @throws NacosException nacosException
	 */
//这个方法数据是直连nacos server使用api请求获取
	public List<String> getServices() throws NacosException {
		...
	}


为什么这么做?不可以都从本地缓存获取吗?

2、NacosDiscoveryClient

​ 实现DiscoveryClient接口,此接口是Spring cloud 服务发现规范,可以把它理解成 NacosServiceDiscovery的包装类,后面的服务注册也采用的这个设计模式。Spring cloud的负载均衡会通过 DiscoveryClient 进行服务列表的获取从而进行选择。具体后面研究Spring cloud LoadBalancer时再说。

3、NacosWatch

​ 通过名字猜到是nacos的观察服务,那么观察啥呢?

//构造方法只是简单的赋值以及初始化 线程池
public NacosWatch(NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties properties,
			ObjectProvider<ThreadPoolTaskScheduler> taskScheduler) {
		this.nacosServiceManager = nacosServiceManager;
		this.properties = properties;
		this.taskScheduler = taskScheduler.stream().findAny()
				.orElseGet(NacosWatch::getTaskScheduler);
	}

​ 此类实现SmartLifecycle 接口,通过源码得知,当bean初始化完成后,会执行start方法。

public void start() {
		if (this.running.compareAndSet(false, true)) {
            //注册事件监听器,根据后面的subscribe得知,监听的是InstanceChangeEvent,回调listener时				//会转换成namingEvent。	//com.alibaba.nacos.client.naming.event.InstancesChangeNotifier#onEvent
			EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
					event -> new EventListener() {
						@Override
						public void onEvent(Event event) {
							if (event instanceof NamingEvent) {
								List<Instance> instances = ((NamingEvent) event)
										.getInstances();
								Optional<Instance> instanceOptional = selectCurrentInstance(
										instances);
								instanceOptional.ifPresent(currentInstance -> {
                                    //对比重置元数据
									resetIfNeeded(currentInstance);
								});
							}
						}
					});

			NamingService namingService = nacosServiceManager
					.getNamingService(properties.getNacosProperties());
			try {
                //serviceName+groupName组成key,放到InstanceChangeNotifier map中,订阅事件
                //这里同时还会执行HostReactor#getServiceInfo
				namingService.subscribe(properties.getService(), properties.getGroup(),
						Arrays.asList(properties.getClusterName()), eventListener);
			}
			catch (Exception e) {
				log.error("namingService subscribe failed, properties:{}", properties, e);
			}
			//每隔30S发送HeartbeatEvent
			this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
					this::nacosServicesWatch, this.properties.getWatchDelay());
		}
	}

 //HostReactor方法,注册instanceChangeEvent的监听器
 public void subscribe(String serviceName, String clusters, EventListener eventListener) {
        notifier.registerListener(serviceName, clusters, eventListener);
        getServiceInfo(serviceName, clusters);
 }

//同步当前服务信息到本地缓存,并启动任务
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
    	//之前说过的故障转移开关
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        //从本地缓存 serviceInfoMap中获取服务
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        //服务不存在,则先将服务放入本地缓存,并且发起http请求从服务端拉取该服务的具体信息
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            
            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
            //这块的设计是因为不存在时,先放入了 updateingMap,这里锁住一会 不用重复发起请求
        } else if (updatingMap.containsKey(serviceName)) {
            
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        //给当前服务添加更新定时任务 1S执行一次,
    	//任务内容:从server端同步服务最小信息到本地缓存
        scheduleUpdateIfAbsent(serviceName, clusters);
        
        return serviceInfoMap.get(serviceObj.getKey());
    }

4、总结

  1. 给当前服务注册 InstanceChangeEvent事件监听器
  2. 每隔30S发送心跳事件,这个事件的监听者有 routeRefreshListener,并根据atomic值判断是否需要更新路由
  3. 更新本地服务缓存,并启动一个每隔1S执行的定时任务,同步服务端该服务最新信息
举报

相关推荐

0 条评论