0
点赞
收藏
分享

微信扫一扫

Apollo架构分析

飞鸟不急 2021-09-21 阅读 85

最近在使用Apollp这个框架时出现了一些问题,所以抽空翻了一下该框架的源码,并且将翻后的结果稍作整理,如果恰巧您也使用,希望这篇粗浅的架构分析能对您有所帮助。

本文行文将会分为三个部分:

  • 首先分析Apollo整个框架内的组件设计,并谈及组件间协作。
  • 其次会基于SpringBoot分析本地配置文件的加载流程。
  • 再则分析SpringBootApollo之间如何协作来完成整个应用配置的加载。
  • 最后探秘Apollo的动态更新

前置条件

Apollo整体框架分析

以上架构图来自Apollo官方作者提供。这个架构图第一眼不是很好理解,我在官方Github上找到一篇架构分析的文章,其中对Apollo架构,包括如何演进做了重点分析。不管是框架还是协议,乃至任何一门技术,架构设计的演进过程对深入学习框架有至关重要的作用,原文传送门:https://mp.weixin.qq.com/s/-hUaQPzfsl9Lm3IqQW3VDQ

  • NginxLB 对应原来架构图中的(Sofware) Load Balancer,其实可以选择性过滤掉这个组件,因为它只是作为一个负载均衡存在,如果架构图上的服务发现全部指向Meta Server更容易理解。
  • Meta Server在实现上是CofigServer组件中的一部分,只是作为Eureka对外的一个Proxy
  • Eureka作为整个架构核心的注册中心。
  • Portal是一个接触较多的组件,即整个配置入口,直连Portal DB,以及对Admin Service发起调用,IP则来自于Meta Server
  • Client作为日常开始时业务应用依赖的包,配置一般包括apollo.app.idapollo.meta以及apollo.bootstrap.enabled。通过Meta Server获取到ConfigServer的IP信息,并进行服务调用。
  • ConfigServer在真实的实现上其实包括ConfigServerEurekaMeta Server三者,所以这里的ConfigServer与项目包中的config server部署并不同,更恰当的理解是Config Controller接口
  • AdminServerPortal项目提供对config db的操作接口

我倾向于认为主要还是因为PortalClient没有直接注册到注册中心,如果是配置的域名,那么就会走域名解析到进入Nginx层做负载均衡的路程。如果配置的IP地址,那就直接定向某一个Meta Server副本,所以一般会选择配置域名

  • 不同语言
  • 不同的架构体系
  • 考虑到Client的宿主环境已经有了自己的Eureka
  • ...

Eureka作为Spring Cloud体系中核心的一环,对于非该体系下的语言或者服务想要集成进来,都小有成本。

Github官网做了详细的解释,传送门:https://ctripcorp.github.io/apollo/#/zh/design/apollo-design


往下会有较多的代码分析,写的比较乱,比较耗费心神
如果不想往下看,推荐:https://ctripcorp.github.io/apollo/#/zh/design/apollo-design


SpringBoot 如何加载配置文件

由于代码流程极多,如下展示了相关调用的入口,以及整个流程所经过的核心类,核心方法等

SpringApplication#run()
SpringApplication#prepareEnvironment(listeners, applicationArguments);
SpringApplicationRunListeners#environmentPrepared(environment);
SpringApplicationRunListeners#environmentPrepared(environment);
SimpleApplicationEventMulticaster#multicastEvent()
SimpleApplicationEventMulticaster#invokeListener(listener, event);

SimpleApplicationEventMulticaster#doInvokeListener()
ConfigFileApplicationListener#onApplicationEvent()
  
ConfigFileApplicationListener#postProcessEnvironment()
ConfigFileApplicationListener#addPropertySources()
// ApolloApplicationContextInitializer#addPropertySources()
ConfigFileApplicationListener#load()
ConfigFileApplicationListener#doLoadIntoGroup()
PropertySourcesLoader#load()
YamlPropertySourceLoader#load()
ConfigFileApplicationListener#addConfigurationProperties()

其中最需要关注的类是ConfigFileApplicationListener


private Set<String> getSearchNames() {
    // 取到spring.config.name 配置的 bootstrap
    if (this.environment.containsProperty(CONFIG_NAME_PROPERTY)) {
        return asResolvedSet(this.environment.getProperty(CONFIG_NAME_PROPERTY),
                        null);
    }
    // 默认application
    return asResolvedSet(ConfigFileApplicationListener.this.names, DEFAULT_NAMES);
}

private Set<String> getSearchLocations() {
    Set<String> locations = new LinkedHashSet<String>();
    // User-configured settings take precedence, so we do them first
    if (this.environment.containsProperty(CONFIG_LOCATION_PROPERTY)) {
        for (String path : asResolvedSet(
                this.environment.getProperty(CONFIG_LOCATION_PROPERTY), null)) {
            if (!path.contains("$")) {
                path = StringUtils.cleanPath(path);
                if (!ResourceUtils.isUrl(path)) {
                    path = ResourceUtils.FILE_URL_PREFIX + path;
                }
            }
            locations.add(path);
        }
    }
    // "classpath:/,classpath:/config/,file:./,file:./config/" 加载文件的目录
    // 排序后变成 file:./,file:./config/,classpath:/,classpath:/config/
    locations.addAll(
            asResolvedSet(ConfigFileApplicationListener.this.searchLocations,
                    DEFAULT_SEARCH_LOCATIONS));
    return locations;
}

private void load(String location, String name, Profile profile) {
    // ....
    else {
        // Search for a file with the given name
        for (String ext : this.propertiesLoader.getAllFileExtensions()) {
            // properties、yaml两种处理方式
        }
    }
    //....
}

public void load() {
    // .... 

    // Pre-existing active profiles set via Environment.setActiveProfiles()
    // are additional profiles and config files are allowed to add more if
    // they want to, so don't call addActiveProfiles() here.
    Set<Profile> initialActiveProfiles = initializeActiveProfiles();
    this.profiles.addAll(getUnprocessedActiveProfiles(initialActiveProfiles));
    if (this.profiles.isEmpty()) {
        for (String defaultProfileName : this.environment.getDefaultProfiles()) {
            Profile defaultProfile = new Profile(defaultProfileName, true);
            if (!this.profiles.contains(defaultProfile)) {
                this.profiles.add(defaultProfile);
            }
        }
    }

    // The default profile for these purposes is represented as null. We add it
    // last so that it is first out of the queue (active profiles will then
    // override any settings in the defaults when the list is reversed later).
    this.profiles.add(null);

    // .... 取到配置项为local, null,下文去profile时用poll,则先null后local,因为指定了spring.profile.active=local
}

while (!this.profiles.isEmpty()) {
    Profile profile = this.profiles.poll(); // local null, pop会先null后local
    for (String location : getSearchLocations()) { // 获取路径前缀
        if (!location.endsWith("/")) {
            // location is a filename already, so don't search for more
            // filenames
            load(location, null, profile);
        }
        else {
            for (String name : getSearchNames()) { // 获取bootstarp, application
                load(location, name, profile);
            }
        }
    }
    this.processedProfiles.add(profile);
}

如上所示,加载的流程大致为:

file:./bootstrao.properties
file:./bootstrao.yaml
file:./config/bootstrao.properties
file:./config/bootstrao.yaml
classpath:/bootstrao.properties
classpath:/bootstrao.yaml
classpath:/config/bootstrao.properties
classpath:/config/bootstrao.yaml
file:./bootstrao-local.properties
file:./bootstrao-local.yaml
file:./config/bootstrao-local.properties
file:./config/bootstrao-local.yaml
classpath:/bootstrao-local.properties
classpath:/bootstrao-local.yaml
classpath:/config/bootstrao-local.properties
classpath:/config/bootstrao-local.yaml

一般项目文件加载顺序如下:


Apollo加载

ApolloApplicationContextInitializer就是入口,其余ConfigFileApplicationListener相似,均实现了EnvironmentPostProcessor接口

/**
 * Initialize Apollo Configurations Just after environment is ready.
 *
 * @param environment
 */
protected void initialize(ConfigurableEnvironment environment) {

  if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
    //already initialized
    return;
  }

  // 命名空间namespace,参考:https://ctripcorp.github.io/apollo/#/zh/design/apollo-core-concept-namespace?id=_544-changelistener
  String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);
  logger.debug("Apollo bootstrap namespaces: {}", namespaces);
// 默认application,也可以配合多个,通过多个,来隔离
  List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);

  CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
  for (String namespace : namespaceList) {
    // 获取config配置信息
    Config config = ConfigService.getConfig(namespace);
 
 composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
  }
  // 将配置项加载到environment环境中,即spring容器上下文
  environment.getPropertySources().addFirst(composite);
}

由上可知,初始化时最重要的就是ConfigServer.getConfig(namespace)

// 初始化
ConfigService#getConfig()
DefaultConfigManager#getConfig()
DefaultConfigFactory#create()
DefaultConfig#initialize()
RemoteConfigRepository#getConfig()
RemoteConfigRepository#sync()
RemoteConfigRepository#loadApolloConfig()

// 启动定时任务
RemoteConfigRepository() // 有些地方称之为推拉结合,其实本质上都是拉,一个是定时一个是长连接
  this.trySync(); // 及时进行一次调用,拉取配置信息
  this.schedulePeriodicRefresh(); // 定时拉取
  this.scheduleLongPollingRefresh(); // 长连接拉取

拉取配置信息如下:

@Override
protected synchronized void sync() {
// ....
  try {
    ApolloConfig previous = m_configCache.get();
    ApolloConfig current = loadApolloConfig(); // 远程加载

    //reference equals means HTTP 304
    if (previous != current) {
      logger.debug("Remote Config refreshed!");
      m_configCache.set(current);
      this.fireRepositoryChange(m_namespace, this.getConfig()); // 触发监听器,对bean进行赋值
    }
  // ....
}

private ApolloConfig loadApolloConfig() {
  // ...
  List<ServiceDTO> configServices = getConfigServices(); // 获取ConfigServer的信息(从meta server获取)
  String url = null;
  for (int i = 0; i < maxRetries; i++) { // 重试
    List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
    Collections.shuffle(randomConfigServices);
    //Access the server which notifies the client first
    if (m_longPollServiceDto.get() != null) {
      randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
    }

    for (ServiceDTO configService : randomConfigServices) {
      if (onErrorSleepTime > 0) {
        logger.warn(
            "Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
            onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);

        try {
          m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime); // 发生错误时睡眠时间
        } catch (InterruptedException e) {
          //ignore
        }
      }

      // 构建访问config service的url
      url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
              dataCenter, m_remoteMessages.get(), m_configCache.get());
      // ....
      try {
        // 发起远程http调用
        HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
        m_configNeedForceRefresh.set(false);
        m_loadConfigFailSchedulePolicy.success();
        // ....
        if (response.getStatusCode() == 304) {
          logger.debug("Config server responds with 304 HTTP status code.");
          return m_configCache.get(); // 从缓存取,认为没有发生变更
        }

        ApolloConfig result = response.getBody();

        logger.debug("Loaded config for {}: {}", m_namespace, result);

        return result;
      } catch (ApolloConfigStatusCodeException ex) {
        // ....
      } catch (Throwable ex) {
         // ....
      } finally {
        // ....
      }

      // if force refresh, do normal sleep, if normal config load, do exponential sleep 计算发生错误时睡眠的时间
      onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
          m_loadConfigFailSchedulePolicy.fail();
    }
  }
  // ....
}

定时任务如下:

private void schedulePeriodicRefresh() {
  logger.debug("Schedule periodic refresh with interval: {} {}",
      m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
  m_executorService.scheduleAtFixedRate(
      new Runnable() {
        @Override
        public void run() {
          Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
          logger.debug("refresh config for namespace: {}", m_namespace);
          trySync();
          Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
        }
      }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(), // 关注定时时间节点
      m_configUtil.getRefreshIntervalTimeUnit());
}
// 默认每5min执行一次

长连接推的设计如下:

// 客户端实现
private void startLongPolling() {
  // ....
  try {
    // ...
    final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
    m_longPollingService.submit(new Runnable() {
      @Override
      public void run() {
        if (longPollingInitialDelayInMills > 0) {
          try {
            logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
            TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills); // 默认是2000
          } catch (InterruptedException e) {
            //ignore
          }
        }
        doLongPollingRefresh(appId, cluster, dataCenter);
      }
    });
  } catch (Throwable ex) {
    // ....
  }
}

// 服务端设计时采用Spring的DeferredResult组件
// 调用服务端生成一个阻塞的DeferredResultWrapper,超时时间默认60s
// 另外一个线程不断的轮询数据库,当有更多的ReleaseMessage时,触发监听器ReleaseMessageListener,相关逻辑都在NotificationControllerV2类中

public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
    @RequestParam(value = "appId") String appId,
    @RequestParam(value = "cluster") String cluster,
    @RequestParam(value = "notifications") String notificationsAsString,
    @RequestParam(value = "dataCenter", required = false) String dataCenter,
    @RequestParam(value = "ip", required = false) String clientIp) {
  List<ApolloConfigNotification> notifications = null;

  // ...
  DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli()); // 6000
  // ....
  return deferredResultWrapper.getResult();
}

/**ReleaseMessageScanner
 * scan messages and send
 *
 * @return whether there are more messages
 */
private boolean scanAndSendMessages() {
  //current batch is 500
  List<ReleaseMessage> releaseMessages =
      releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
  if (CollectionUtils.isEmpty(releaseMessages)) {
    return false;
  }
  fireMessageScanned(releaseMessages); // 触发NotificationControllerV2#handleMessage
  int messageScanned = releaseMessages.size();
  maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
  return messageScanned == 500;
}

// NotificationControllerV2
public void handleMessage(ReleaseMessage message, String channel) {
  logger.info("message received - channel: {}, message: {}", channel, message);

  String content = message.getMessage();
  // ....

  if (!deferredResults.containsKey(content)) {
    return;
  }

  //create a new list to avoid ConcurrentModificationException
  List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));

  ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
  configNotification.addMessage(content, message.getId());

  //do async notification if too many clients 客户端请求较多时,异步处理
  if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
    largeNotificationBatchExecutorService.submit(() -> {
      logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
          bizConfig.releaseMessageNotificationBatch());
      for (int i = 0; i < results.size(); i++) {
        if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
          try {
            TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
          } catch (InterruptedException e) {
            //ignore
          }
        }
        logger.debug("Async notify {}", results.get(i));
        results.get(i).setResult(configNotification);
      }
    });
    return;
  }

  logger.debug("Notify {} clients for key {}", results.size(), content);

  for (DeferredResultWrapper result : results) {
    result.setResult(configNotification); // 设置通知结果
  }
  logger.debug("Notification completed");
}

PS:所谓的推拉结合,其实都是拉,只是通过长连接做到准实时的拉,接近推的时效性。

加入了Apollo后的配置文件加载顺序如下:

Apollo如何实现动态更新

RemoteConfigRepository#sync()
RemoteConfigRepository#fireRepositoryChange()
DedaultConfig#onRepositoryChange()
AutoUpdateConfigChangeListener#onChange()

AutoUpdateConfigChangeListener如何实现ConfigChangeListener.onChange()方法,当ApolloConfig事件触发时:

@Override
public void onChange(ConfigChangeEvent changeEvent) {
  Set<String> keys = changeEvent.changedKeys();
  if (CollectionUtils.isEmpty(keys)) {
    return;
  }
  for (String key : keys) {
    // 1. check whether the changed key is relevant
    // springValueRegistry 注册表
    Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
    if (targetValues == null || targetValues.isEmpty()) {
      continue;
    }

    // 2. check whether the value is really changed or not (since spring property sources have hierarchies)
    if (!shouldTriggerAutoUpdate(changeEvent, key)) {
      continue;
    }

    // 3. update the value
    for (SpringValue val : targetValues) {
      updateSpringValue(val); // 更新值 SpringValue哪里来的?
    }
  }
}

SpringValueProcessor类在启动时将会注册到Spring容器中,在对象被实例化的过程中作为钩子函数被调用,包括创建对象的工厂,创建对象初始化前等,核心钩子函数接口包括BeanPostProcessorBeanFactoryPostProcessorBeanFactoryAware

@Override
protected void processField(Object bean, String beanName, Field field) {
  // register @Value on field
  Value value = field.getAnnotation(Value.class); // 注解了@Value
  if (value == null) {
    return;
  }
  Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value()); // 对注解上的key进行处理

  if (keys.isEmpty()) {
    return;
  }

  for (String key : keys) {
    // 将对象信息放到springValueRegistry注册表中
    SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
    springValueRegistry.register(beanFactory, key, springValue);
    logger.debug("Monitoring {}", springValue);
  }
}

回头再看updateSpringValue()方法,可谓是一目了然

public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {
  if (isField()) {
    injectField(newVal);
  } else {
    injectMethod(newVal);
  }
}

private void injectField(Object newVal) throws IllegalAccessException {
  Object bean = beanRef.get();
  if (bean == null) {
    return;
  }
  boolean accessible = field.isAccessible();
  field.setAccessible(true);
  field.set(bean, newVal);// 设置值
  field.setAccessible(accessible);
}

private void injectMethod(Object newVal)
    throws InvocationTargetException, IllegalAccessException {
  Object bean = beanRef.get();
  if (bean == null) {
    return;
  }
  methodParameter.getMethod().invoke(bean, newVal); // 调用方法进行设置
}

小结:通过Spring容器自身提供的钩子函数,将包含@Value的对象加入到本地的注册表中,当长连接(定时)发现远程变更后进行新数据的拉取,拉取成功后对ApolloConfig进行缓存并触发监听器ConfigChangeListener接口,从注册表中找到已经注册进去的包含@Value的对象,并对其内的值或者方法进行更新或调用,以此完成Apollo的动态更新。

举报

相关推荐

0 条评论