目录
一、 Nacos概述
- Nacos是什么?
- Nacos中的相关概念
二、微服务的注册与发现
- Nacos-client提供注册接口的原理
- 服务注册的原理分析
- 服务注册的时机分析
- NacosServiceRegistryAutoConfiguration类以及核心的四个类
- Nacos-server的服务注册原理
- 服务注册接口的逻辑实现
- 服务注册的容器实现
- 服务心跳检测实现
- 服务心跳检测原理分析
- 服务发现原理分析
一、Nacos概述
1.Nacos是什么?
What does it do
Nacos (official site: nacos.io) is an easy-to-use platform designed for dynamic service discovery(动态服务发现) and configuration(动态配置) and service management.(服务管理) It helps you to build cloud native applications and microservices platform easily.
Service is a first-class citizen in Nacos. (服务是Nacos的公民)Nacos supports almost all type of services,for example,Dubbo/gRPC service, Spring Cloud RESTFul service or Kubernetes service.
Nacos provides four major functions.
Service Discovery and Service Health Check 服务发现与服务健康
Nacos makes it simple for services to register themselves and to discover other services via a DNS or HTTP interface. Nacos also provides real-time health checks of services to prevent sending requests to unhealthy hosts or service instances.
Dynamic Configuration Management 动态配置管理
Dynamic Configuration Service allows you to manage configurations of all services in a centralized and dynamic manner across all environments. Nacos eliminates 消除 the need to redeploy重发布 applications and services when configurations are updated, which makes configuration changes more efficient and agile.
Dynamic DNS Service 动态DNS路由服务
Nacos supports weighted routing, making it easier for you to implement mid-tier load balancing, flexible routing policies, flow control, and simple DNS resolution services in the production environment within your data center. It helps you to implement DNS-based service discovery easily and prevent applications from coupling to vendor-specific service discovery APIs.
Service and MetaData Management 服务和元数据管理
Nacos provides an easy-to-use service dashboard 控制面板 to help you manage your services metadata 元数据, configuration 配置, kubernetes DNS, service health 服务健康 and metrics statistics指标统计.
官网:https://nacos.io/zh-cn/docs/what-is-nacos.html有很详细点的QuickStrart,下载一个nacos-server然后执行指令即可开启Nacos服务
GitHub地址:https://github.com/alibaba/nacos,这个地址是下载的源码文件,他是一个Maven构建的SpringBoot工程,包含Nacos-server和Nacos-client
2.Nacos中相关概念
- 地域
物理的数据中心,资源创建成功后不能更换。 - 可用区
同一地域内,电力和网络互相独立的物理区域。同一可用区内,实例的网络延迟较低。 - 接入点
地域的某个服务的入口域名。 - 命名空间
用于进行租户粒度的配置隔离。不同的命名空间下,可以存在相同的 Group 或 Data ID 的配置。Namespace 的常用场景之一是不同环境的配置的区分隔离,例如开发测试环境和生产环境的资源(如配置、服务)隔离等。 - 配置
在系统开发过程中,开发者通常会将一些需要变更的参数、变量等从代码中分离出来独立管理,以独立的配置文件的形式存在。目的是让静态的系统工件或者交付物(如 WAR,JAR 包等)更好地和实际的物理运行环境进行适配。配置管理一般包含在系统部署的过程中,由系统管理员或者运维人员完成。配置变更是调整系统运行时的行为的有效手段。 - 配置管理
系统配置的编辑、存储、分发、变更管理、历史版本管理、变更审计等所有与配置相关的活动。 - 配置项
一个具体的可配置的参数与其值域,通常以 param-key=param-value 的形式存在。例如我们常配置系统的日志输出级别(logLevel=INFO|WARN|ERROR) 就是一个配置项。 - 配置集
一组相关或者不相关的配置项的集合称为配置集。在系统中,一个配置文件通常就是一个配置集,包含了系统各个方面的配置。例如,一个配置集可能包含了数据源、线程池、日志级别等配置项。 - 配置集 ID
Nacos 中的某个配置集的 ID。配置集 ID 是组织划分配置的维度之一。Data ID 通常用于组织划分系统的配置集。一个系统或者应用可以包含多个配置集,每个配置集都可以被一个有意义的名称标识。Data ID 通常采用类 Java 包(如 com.taobao.tc.refund.log.level)的命名规则保证全局唯一性。此命名规则非强制。 - 配置分组
Nacos 中的一组配置集,是组织配置的维度之一。通过一个有意义的字符串(如 Buy 或 Trade )对配置集进行分组,从而区分 Data ID 相同的配置集。当您在 Nacos 上创建一个配置时,如果未填写配置分组的名称,则配置分组的名称默认采用 DEFAULT_GROUP 。配置分组的常见场景:不同的应用或组件使用了相同的配置类型,如 database_url 配置和 MQ_topic 配置。 - 配置快照
Nacos 的客户端 SDK 会在本地生成配置的快照。当客户端无法连接到 Nacos Server 时,可以使用配置快照显示系统的整体容灾能力。配置快照类似于 Git 中的本地 commit,也类似于缓存,会在适当的时机更新,但是并没有缓存过期(expiration)的概念。 - 服务
通过预定义接口网络访问的提供给客户端的软件功能。 - 服务名
服务提供的标识,通过该标识可以唯一确定其指代的服务。 - 服务注册中心
存储服务实例和服务负载均衡策略的数据库。 - 服务发现
在计算机网络上,(通常使用服务名)对服务下的实例的地址和元数据进行探测,并以预先定义的接口提供给客户端进行查询。 - 元信息
Nacos数据(如配置和服务)描述信息,如服务版本、权重、容灾策略、负载均衡策略、鉴权配置、各种自定义标签 (label),从作用范围来看,分为服务级别的元信息、集群的元信息及实例的元信息。 - 应用
用于标识服务提供方的服务的属性。 - 服务分组
不同的服务可以归类到同一分组。 - 虚拟集群
同一个服务下的所有服务实例组成一个默认集群, 集群可以被进一步按需求划分,划分的单位可以是虚拟集群。 - 实例
提供一个或多个服务的具有可访问网络地址(IP:Port)的进程。 - 权重
实例级别的配置。权重为浮点数。权重越大,分配给该实例的流量越大。 - 健康检查
以指定方式检查服务下挂载的实例 (Instance) 的健康度,从而确认该实例 (Instance) 是否能提供服务。根据检查结果,实例 (Instance) 会被判断为健康或不健康。对服务发起解析请求时,不健康的实例 (Instance) 不会返回给客户端。 - 健康保护阈值
为了防止因过多实例 (Instance) 不健康导致流量全部流向健康实例 (Instance) ,继而造成流量压力把健康实例 (Instance) 压垮并形成雪崩效应,应将健康保护阈值定义为一个 0 到 1 之间的浮点数。当域名健康实例数 (Instance) 占总服务实例数 (Instance) 的比例小于该值时,无论实例 (Instance) 是否健康,都会将这个实例 (Instance) 返回给客户端。这样做虽然损失了一部分流量,但是保证了集群中剩余健康实例 (Instance) 能正常工作。
二、微服务注册与发现
1.Nacos-client提供注册接口原理
分析微服务模块是如何借助nacos-client提供的接口完成注册的?以及注册的时机?
// 1.1服务注册的逻辑原理分析
Nacos是一个SpringBoot工程,其他微服务模块也是一个SpringBoot工程,当其他微服务模块引入对应的服务注册与发现的依赖之后
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
查看该依赖,发现会引入nacos-client、nacos-api等依赖
然后在服务启动的时候,会调用nacos-client中提供的OpenAPI接口,像nacos-server发起http请求完成服务注册,OpenAPI注册接口举栗
// 1.2服务注册的时机分析
当微服务启动的时候就要像nacos-server注册,此处探究的是什么时候完成OpenAPI注册接口的调用的?
分析入口从自动装配开始,自动装配原理参考往期博客:《Spring源码深度解析 郝佳 第2版》SpringBoot体系分析、Starter的原理,导入相应的依赖starter依赖之后,内部包含一个spring.factories文件,该文件内容
// 1.3NacosServiceRegistryAutoConfiguration类以及核心的四个类
微服务注册的自动配置类,发现内部主要注册了3个bean,主要涉及的关键类
-
NacosDiscoveryProperties
:承载配置文件的实例 -
NacosServiceRegistry
:实现ServiceRegistry接口,内部包含真正的注册register方法,需要拿到参数NacosRegistration -
NacosRegistration
:实现Registration接口,内部继承ServiceInstance接口,包含getInstanceId、getSrviceId、getHost、getPort等方法 -
NacosAutoServiceRegistration
:继承AbstractAutoServiceRegistration完成注册的核心实现,AbstractAutoServiceRegistration实现了ApplicationListener接口重写onApplicationEvent方法,当监听到NacosAutoServiceRegistration 实例创建的时候会调用相应的bind—>start—>register—>NacosServiceRegistry.register—>NacosNamingService.registerInstance—>NamingProxy.registerService,最后完成配置文件的填充然后注册,注册的话就是调用reqApi—>委托NacosRestTemplate.exchangeForm---->this.requestClient().execute(uri, httpMethod, requestEntity)完成http请求
// 标注配置类
@Configuration(
proxyBeanMethods = false
)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
// 是否开启自动注册,缺省开启
@ConditionalOnProperty(
value = {"spring.cloud.service-registry.auto-registration.enabled"},
matchIfMissing = true
)
@AutoConfigureAfter({AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class})
public class NacosServiceRegistryAutoConfiguration {
public NacosServiceRegistryAutoConfiguration() {
}
// 1. 需要注册的serviceDiscovery封装,内部封装 NacosDiscoveryProperties
@Bean
public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
// 2. NacosRegistration
@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
return new NacosRegistration((List)registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
}
// 3. NacosAutoServiceRegistration
@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
}
}
// NacosDiscoveryProperties配置信息的封装bean
@ConfigurationProperties("spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties {
private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryProperties.class);
public static final String PREFIX = "spring.cloud.nacos.discovery";
private static final Pattern PATTERN = Pattern.compile("-(\\w)");
private String serverAddr;
private String username;
private String password;
private String endpoint;
private String namespace;
private long watchDelay = 30000L;
private String logName;
@Value("${spring.cloud.nacos.discovery.service:${spring.application.name:}}")
private String service;
private float weight = 1.0F;
private String clusterName = "DEFAULT";
private String group = "DEFAULT_GROUP";
private String namingLoadCacheAtStart = "false";
private Map<String, String> metadata = new HashMap();
private boolean registerEnabled = true;
private String ip;
private String networkInterface = "";
private int port = -1;
private boolean secure = false;
private String accessKey;
private String secretKey;
private Integer heartBeatInterval;
private Integer heartBeatTimeout;
private Integer ipDeleteTimeout;
private boolean instanceEnabled = true;
private boolean ephemeral = true;
...
...
}
// AbstractAutoServiceRegistration完成注册的核心实现
AbstractAutoServiceRegistration实现了ApplicationListener接口重写onApplicationEvent方法,当监听到NacosAutoServiceRegistration 实例创建的时候会调用相应的bind—>start—>register—>NacosServiceRegistry.register—>NacosNamingService.registerInstance—>NamingProxy.registerService,最后完成配置文件的填充然后注册,注册的话就是调用reqApi—>委托NacosRestTemplate.exchangeForm---->this.requestClient().execute(uri, httpMethod, requestEntity)完成http请求
// NamingProxy.registerService方法,内部封装注册service信息,调用reqApi完成接口调用
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
Map<String, String> params = new HashMap(16);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
params.put("clusterName", instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// 像对应的服务注册OpenAPI发请求
this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST");
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException {
params.put("namespaceId", this.getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(this.nacosDomain)) {
throw new NacosException(400, "no server available");
} else {
NacosException exception = new NacosException();
if (StringUtils.isNotBlank(this.nacosDomain)) {
int i = 0;
while(i < this.maxRetry) {
try {
return this.callServer(api, params, body, this.nacosDomain, method);
} catch (NacosException var12) {
exception = var12;
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("request {} failed.", this.nacosDomain, var12);
}
++i;
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
int i = 0;
while(i < servers.size()) {
String server = (String)servers.get(index);
try {
// 调用接口
return this.callServer(api, params, body, server, method);
} catch (NacosException var13) {
exception = var13;
if (LogUtils.NAMING_LOGGER.isDebugEnabled()) {
LogUtils.NAMING_LOGGER.debug("request {} failed.", server, var13);
}
index = (index + 1) % servers.size();
++i;
}
}
}
LogUtils.NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", new Object[]{api, servers, exception.getErrCode(), exception.getErrMsg()});
throw new NacosException(exception.getErrCode(), "failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
}
}
后续就是委托NacosRestTemplate.exchangeForm---->this.requestClient().execute(uri, httpMethod, requestEntity)完成http请求
2.Nacos-server的服务注册原理
nacos-server接收到服务注册请求处理过程及原理,在naming模块下可以看到nacos-server处理注册、心跳等请求的controllers
// 2.1服务注册的接口实现
在InstanceController中,可以看到register方法,也就是由各个微服务调用该nacos-server的http接口完成服务注册
// InstanceController
@CanDistro
@PostMapping
@Secured(action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = HttpRequestInstanceBuilder.newBuilder()
.setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
// 委托ServiceManager完成注册
getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
return "ok";
}
// ServiceManager
/**
* Register an instance to a service in AP mode.
*
* <p>This method creates service or cluster silently if they don't exist.
*
* @param namespaceId id of namespace
* @param serviceName service name
* @param instance instance to register
* @throws Exception any error occurred in the process
*/
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 1. 根据namespaceId, serviceName 初始化一个空服务Service结构,便于存放服务的信息Service,乃至注册Instance实例
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 验证上一步据namespaceId, serviceName生成的Service结构,保证不为空,方便下面的注册Instance
Service service = getService(namespaceId, serviceName);
checkServiceIsNull(service, namespaceId, serviceName);
// 2. 真正的服务注册,注册Instance
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
// 2.2服务注册的容器实现
// Service及注册表结构
上面分析了注册接口方法registerInstance内部的实现逻辑,在此方法内部涉及承载注册Service的容器就是nacos的注册表,是一个双层map结构private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
- // 外层map的key可以理解为环境,如dev、prod、test等,对应namespace
- // 内层的key可以理解为group,如订单服务的组,每个组中可以包含集群部署的微服务,对应groupName
- // 内层的value可以理解为就是各个的微服务模块Service信息,对应name,其中每个Service又可以对应多个Instance实例
服务抽象的Service结构
// Service
public class Service implements Serializable {
private static final long serialVersionUID = -3470985546826874460L;
/**
* service name. 服务名
*/
private String name;
/**
* protect threshold. 阈值
*/
private float protectThreshold = 0.0F;
/**
* application name of this service. 服务的应用名
*/
private String appName;
/**
* Service group to classify services into different sets. 服务的所属组名
*/
private String groupName;
private Map<String, String> metadata = new HashMap<String, String>();
}
// 2.2.1. 根据namespaceId, serviceName 初始化一个空服务Service结构,便于存放服务的信息Service,乃至注册Instance实例
首先会根据 namespaceId, serviceName 判断Service是否存在,不存在则生成的空服务Service实例,然后进行属性填充,之后核心在 putServiceAndInit
方法
// ServiceManager
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
Service service = getService(namespaceId, serviceName);
// 还不存在Service实例,需要生成一个空的,然后填充
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// 委托putService 进行Service注册 已及 心跳检测
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
接着分析在上面ServiceManager.createServiceIfAbsent---->putServiceAndInit
内部会调用
-
putService
:像注册表添加服务信息,还不包含Instance实例 -
init
:将服务的状态更新,包括上线、下线,定时任务健康检测,默认5s执行一次检测遍历全部实例,使用当前系统时间减去上一次获取的心跳时间,若大于阈值默认15s,则更新状态为下线
private void putServiceAndInit(Service service) throws NacosException {
// (1) putService像注册表添加服务信息
putService(service);
// 拿到当前Service
service = getService(service.getNamespaceId(), service.getName());
// (2) 内部创建定时任务,更新服务的状态信息
service.init();
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
// (1) putService像注册表添加服务信息
// ServiceManager
// 注册表结构
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
// 像容器中添加一个生成的空服务Service,里面还不包含Instance
serviceMap.putIfAbsent(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
// (2) 服务初始化及定时任务更新服务的状态信息
// core.Service extends pojo.Service
public void init() {
// 定时任务健康检测
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
// 集群递归检测
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
定时检测源码
// HealthCheckReactor
/**
* Schedule client beat check task with a delay.
*
* @param task client beat check task
*/
public static void scheduleCheck(BeatCheckTask task) {
Runnable wrapperTask =
task instanceof NacosHealthCheckTask ? new HealthCheckTaskInterceptWrapper((NacosHealthCheckTask) task)
: task;
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(wrapperTask, 5000, 5000, TimeUnit.MILLISECONDS));
}
// 定时执行
public static ScheduledFuture<?> scheduleNamingHealth(Runnable command, long initialDelay, long delay,
TimeUnit unit) {
return NAMING_HEALTH_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
// 2.2.2. addInstance真正的服务注册,注册服务每个Instance实例
// Instance结构
每个微服务实例需要注册进去的信息由Instance封装,属性有
- instanceId
- ip
- port
- weight = 1.0D
- healthy = true
- enabled = true
- ephemeral = true
- clusterName
- serviceName
- metadata
public class Instance implements Serializable {
private static final long serialVersionUID = -742906310567291979L;
/**
* unique id of this instance. 实例id
*/
private String instanceId;
/**
* instance ip. 实例ip
*/
private String ip;
/**
* instance port. 实例端口
*/
private int port;
/**
* instance weight. 实例权重
*/
private double weight = 1.0D;
/**
* instance health status. 实例状态
*/
private boolean healthy = true;
/**
* If instance is enabled to accept request. 实例是否开启,默认开启
*/
private boolean enabled = true;
/**
* If instance is ephemeral. 是否是临时实例
*
* @since 1.0.0
*/
private boolean ephemeral = true;
/**
* cluster information of instance. 实例集群
*/
private String clusterName;
/**
* Service information of instance. 实例所属服务名
*/
private String serviceName;
/**
* user extended attributes. 实例元信息
*/
private Map<String, String> metadata = new HashMap<String, String>();
}
// 注册每个Instance实例到注册表中的service原理分析
// ServiceManager
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 生成一个key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 当前微服务整体信息,从注册表拿到
Service service = getService(namespaceId, serviceName);
synchronized (service) {
// 当前微服务下的多个实例
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 放入map完成注册,委托DelegateConsistencyServiceImpl的put
consistencyService.put(key, instances);
}
}
// DelegateConsistencyServiceImpl
@Override
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
最终委托给DistroConsistencyServiceImpl的put。内部逻辑:
- // 1.在onPut方法完成真正的注册进map, 最后保存在DataStore中,他的value为Datum类型,datum.value属性为 T,可以包含Recod类型,也就是Instances类型转化的,对应多个Instance实例。然后将注册的实例信息放进一个同步阻塞队列之后就返回,因此Nacos速度很高,然后在使用多线程消费队列中的消息,在
- // 2.在sync方法中完成Nacos集群中实例信息的同步
// DistroConsistencyServiceImpl implements EphemeralConsistencyService
@Override
public void put(String key, Record value) throws NacosException {
// 1. 真正的放入map
onPut(key, value);
// If upgrade to 2.0.X, do not sync for v1.
if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
return;
}
// 2. 一般Nacos都是集群,集群同步信息
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
DistroConfig.getInstance().getSyncDelayMillis());
}
//DistroConsistencyServiceImpl
// 1. onPut
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 1.1 最后保存在DataStore中,datum的value属性为 T,可以包含Recod类型,也就是Instances类型转化的,对应多个Instance实例
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 1.2 添加注册的实例到内部类 高速阻塞队列
notifier.addTask(key, DataOperation.CHANGE);
}
// 核心1:内部类Notifier 完成注册实例消息入阻塞队列、出队消费等任务,立即返回
// 内部类Notifier
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
// 同步阻塞队列,存放需要注册微服务实例
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
/**
* Add new notify task to queue.
*
* @param datumKey data key
* @param action action for data
*/
// 1.2 添加注册的实例到内部类 高速阻塞队列
public void addTask(String datumKey, DataOperation action) {
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// 入队
tasks.offer(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
// 多线程消费队列中的实例,调用handle处理
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
// 取出
Pair<String, DataOperation> pair = tasks.take();
// 消费
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
// 处理
private void handle(Pair<String, DataOperation> pair) {
try {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
return;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == DataOperation.CHANGE) {
// 真正的注册,委托Service完成注册
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
// 日志信息...
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
// 日志信息...
}
} catch (Throwable e) {
// 日志信息...
}
}
// 核心2: 真正的注册,委托Service异步消费同步阻塞队列实现实例注册
// Service
@Override
public void onChange(String key, Instances value) throws Exception {
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
// 遍历
for (Instance instance : value.getInstanceList()) {
if (instance == null) {
// Reject this abnormal instance list:
throw new RuntimeException("got null instance " + key);
}
if (instance.getWeight() > 10000.0D) {
instance.setWeight(10000.0D);
}
if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
instance.setWeight(0.01D);
}
}
// 真正的注册逻辑
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
// 注意:异步消费同步阻塞队列实现实例注册的时机
Notifier实现Runnable接口,可以使用多线程处理注册实例,具体的时机就是在DistroConsistencyServiceImpl的init方法,他标注PostConstruct注解,会在Bean完成初始化之后自动调用
@PostConstruct
public void init() {
// 将Notifier 实例交给定时任务线程池处理
GlobalExecutor.submitDistroNotifyTask(notifier);
}
// GlobalExecutor
public static void submitDistroNotifyTask(Runnable runnable) {
DISTRO_NOTIFY_EXECUTOR.submit(runnable);
}
// GlobalExecutor
private static final ScheduledExecutorService DISTRO_NOTIFY_EXECUTOR = ExecutorFactory.Managed
.newSingleScheduledExecutorService(ClassUtils.getCanonicalName(NamingApp.class),
new NameThreadFactory("com.alibaba.nacos.naming.distro.notifier"));
3. 服务心跳检测原理分析
各个Instance注册进nacos之后,Instance会通过nacos客户端开启定时任务定时的像nacos服务发送心跳检测,反馈上下线的状态,及时更新健康状态,首先看看到openAPI的心跳检测接口
// 心跳发送接口
那么心跳检测机制是如何实现的呢?回到前面AbstractAutoServiceRegistration和三个核心实现类逻辑,发现
AbstractAutoServiceRegistration
实现了ApplicationListener
接口重写onApplicationEvent
方法,当监听到NacosAutoServiceRegistration
实例创建的时候会
- 调用相应的
bind—>start—>register—>NacosServiceRegistry.register—>NacosNamingService.registerInstance—>NamingProxy.registerService
- 最后完成配置文件的填充然后注册,注册的话就是调用
reqApi—>委托NacosRestTemplate.exchangeForm---->this.requestClient().execute(uri, httpMethod, requestEntity)
完成http请求
其中在NacosNamingService.registerInstance方法中,发现一个判断临时实例和封装BeatInfo实例的过程
// NacosServiceRegistry
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
} else {
NamingService namingService = this.namingService();
String serviceId = registration.getServiceId();
String group = this.nacosDiscoveryProperties.getGroup();
// 获取当前service的信息封装成instance实例
Instance instance = this.getNacosInstanceFromRegistration(registration);
try {
// 注册Instance
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
} catch (Exception var7) {
log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7});
ReflectionUtils.rethrowRuntimeException(var7);
}
}
}
// NamingService
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断是不是临时实例,默认是的,也就是不将service持久化到磁盘
if (instance.isEphemeral()) {
// 1. 封装心跳信息
BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
// 2. 进行心跳发送任务设置
this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
this.serverProxy.registerService(groupedServiceName, groupName, instance);
}
// 1. 封装心跳信息
// BeanReactor
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(groupedServiceName);
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
// 默认服务像nacos发送心跳的时间间隔
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
return beatInfo;
}
// 2. 线程池定时发送心跳请求
// 添加定时发送心跳信息
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
BeatInfo existBeat = null;
if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);
}
this.dom2Beat.put(key, beatInfo);
// 线程池,执行BeatTask任务,它是一个Runnable接口
this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
// BeatTask 类
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
public void run() {
if (!this.beatInfo.isStopped()) {
long nextTime = this.beatInfo.getPeriod();
try {
JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has("lightBeatEnabled")) {
lightBeatEnabled = result.get("lightBeatEnabled").asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0L) {
nextTime = interval;
}
int code = 10200;
if (result.has("code")) {
code = result.get("code").asInt();
}
if (code == 20404) {
Instance instance = new Instance();
instance.setPort(this.beatInfo.getPort());
instance.setIp(this.beatInfo.getIp());
instance.setWeight(this.beatInfo.getWeight());
instance.setMetadata(this.beatInfo.getMetadata());
instance.setClusterName(this.beatInfo.getCluster());
instance.setServiceName(this.beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
BeatReactor.this.serverProxy.registerService(this.beatInfo.getServiceName(), NamingUtils.getGroupName(this.beatInfo.getServiceName()), instance);
} catch (Exception var10) {
}
}
} catch (NacosException var11) {
LogUtils.NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", new Object[]{JacksonUtils.toJson(this.beatInfo), var11.getErrCode(), var11.getErrMsg()});
}
BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
}
4. 服务发现原理分析
入口NacosDiscoveryAutoConfiguration类服务发现自动配置类
// 声明配置类
@Configuration(
proxyBeanMethods = false
)
// 服务自动注册发现开关 开启此配置类生效
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {
public NacosDiscoveryAutoConfiguration() {
}
// 1. 承载 NacosDiscoveryProperties配置信息的bean
@Bean
@ConditionalOnMissingBean
public NacosDiscoveryProperties nacosProperties() {
return new NacosDiscoveryProperties();
}
// 2. 构建一个将要注册的ServiceDiscovery
// 参数为上面的NacosDiscoveryProperties 和 NacosServiceManager
@Bean
@ConditionalOnMissingBean
public NacosServiceDiscovery nacosServiceDiscovery(NacosDiscoveryProperties discoveryProperties, NacosServiceManager nacosServiceManager) {
return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
}
}
// 2. 构建一个将要注册的ServiceDiscovery
待更新…