最近在使用Apollp
这个框架时出现了一些问题,所以抽空翻了一下该框架的源码,并且将翻后的结果稍作整理,如果恰巧您也使用,希望这篇粗浅的架构分析能对您有所帮助。
本文行文将会分为三个部分:
- 首先分析
Apollo
整个框架内的组件设计,并谈及组件间协作。 - 其次会基于
SpringBoot
分析本地配置文件的加载流程。 - 再则分析
SpringBoot
与Apollo
之间如何协作来完成整个应用配置的加载。 - 最后探秘
Apollo
的动态更新
前置条件
-
Apollo
的github
:https://github.com/ctripcorp/apollo -
SpringBoot-1.5.x
版本 - 应用集成
Apollo
组件版本1.2.0
Apollo整体框架分析
以上架构图来自Apollo
官方作者提供。这个架构图第一眼不是很好理解,我在官方Github
上找到一篇架构分析的文章,其中对Apollo
架构,包括如何演进做了重点分析。不管是框架还是协议,乃至任何一门技术,架构设计的演进过程对深入学习框架有至关重要的作用,原文传送门:https://mp.weixin.qq.com/s/-hUaQPzfsl9Lm3IqQW3VDQ
-
NginxLB
对应原来架构图中的(Sofware) Load Balancer
,其实可以选择性过滤掉这个组件,因为它只是作为一个负载均衡存在,如果架构图上的服务发现全部指向Meta Server
更容易理解。 -
Meta Server
在实现上是CofigServer
组件中的一部分,只是作为Eureka
对外的一个Proxy
。 -
Eureka
作为整个架构核心的注册中心。 -
Portal
是一个接触较多的组件,即整个配置入口,直连Portal DB
,以及对Admin Service
发起调用,IP则来自于Meta Server
。 -
Client
作为日常开始时业务应用依赖的包,配置一般包括apollo.app.id
、apollo.meta
以及apollo.bootstrap.enabled
。通过Meta Server
获取到ConfigServer
的IP信息,并进行服务调用。 -
ConfigServer
在真实的实现上其实包括ConfigServer
、Eureka
、Meta Server
三者,所以这里的ConfigServer
与项目包中的config server
部署并不同,更恰当的理解是Config Controller
接口 -
AdminServer
为Portal
项目提供对config db
的操作接口
我倾向于认为主要还是因为Portal
与Client
没有直接注册到注册中心,如果是配置的域名,那么就会走域名解析到进入Nginx
层做负载均衡的路程。如果配置的IP地址,那就直接定向某一个Meta Server
副本,所以一般会选择配置域名
- 不同语言
- 不同的架构体系
- 考虑到
Client
的宿主环境已经有了自己的Eureka
- ...
Eureka
作为Spring Cloud
体系中核心的一环,对于非该体系下的语言或者服务想要集成进来,都小有成本。
Github
官网做了详细的解释,传送门:https://ctripcorp.github.io/apollo/#/zh/design/apollo-design
往下会有较多的代码分析,写的比较乱,比较耗费心神
如果不想往下看,推荐:https://ctripcorp.github.io/apollo/#/zh/design/apollo-design
SpringBoot 如何加载配置文件
由于代码流程极多,如下展示了相关调用的入口,以及整个流程所经过的核心类,核心方法等
SpringApplication#run()
SpringApplication#prepareEnvironment(listeners, applicationArguments);
SpringApplicationRunListeners#environmentPrepared(environment);
SpringApplicationRunListeners#environmentPrepared(environment);
SimpleApplicationEventMulticaster#multicastEvent()
SimpleApplicationEventMulticaster#invokeListener(listener, event);
SimpleApplicationEventMulticaster#doInvokeListener()
ConfigFileApplicationListener#onApplicationEvent()
ConfigFileApplicationListener#postProcessEnvironment()
ConfigFileApplicationListener#addPropertySources()
// ApolloApplicationContextInitializer#addPropertySources()
ConfigFileApplicationListener#load()
ConfigFileApplicationListener#doLoadIntoGroup()
PropertySourcesLoader#load()
YamlPropertySourceLoader#load()
ConfigFileApplicationListener#addConfigurationProperties()
其中最需要关注的类是ConfigFileApplicationListener
private Set<String> getSearchNames() {
// 取到spring.config.name 配置的 bootstrap
if (this.environment.containsProperty(CONFIG_NAME_PROPERTY)) {
return asResolvedSet(this.environment.getProperty(CONFIG_NAME_PROPERTY),
null);
}
// 默认application
return asResolvedSet(ConfigFileApplicationListener.this.names, DEFAULT_NAMES);
}
private Set<String> getSearchLocations() {
Set<String> locations = new LinkedHashSet<String>();
// User-configured settings take precedence, so we do them first
if (this.environment.containsProperty(CONFIG_LOCATION_PROPERTY)) {
for (String path : asResolvedSet(
this.environment.getProperty(CONFIG_LOCATION_PROPERTY), null)) {
if (!path.contains("$")) {
path = StringUtils.cleanPath(path);
if (!ResourceUtils.isUrl(path)) {
path = ResourceUtils.FILE_URL_PREFIX + path;
}
}
locations.add(path);
}
}
// "classpath:/,classpath:/config/,file:./,file:./config/" 加载文件的目录
// 排序后变成 file:./,file:./config/,classpath:/,classpath:/config/
locations.addAll(
asResolvedSet(ConfigFileApplicationListener.this.searchLocations,
DEFAULT_SEARCH_LOCATIONS));
return locations;
}
private void load(String location, String name, Profile profile) {
// ....
else {
// Search for a file with the given name
for (String ext : this.propertiesLoader.getAllFileExtensions()) {
// properties、yaml两种处理方式
}
}
//....
}
public void load() {
// ....
// Pre-existing active profiles set via Environment.setActiveProfiles()
// are additional profiles and config files are allowed to add more if
// they want to, so don't call addActiveProfiles() here.
Set<Profile> initialActiveProfiles = initializeActiveProfiles();
this.profiles.addAll(getUnprocessedActiveProfiles(initialActiveProfiles));
if (this.profiles.isEmpty()) {
for (String defaultProfileName : this.environment.getDefaultProfiles()) {
Profile defaultProfile = new Profile(defaultProfileName, true);
if (!this.profiles.contains(defaultProfile)) {
this.profiles.add(defaultProfile);
}
}
}
// The default profile for these purposes is represented as null. We add it
// last so that it is first out of the queue (active profiles will then
// override any settings in the defaults when the list is reversed later).
this.profiles.add(null);
// .... 取到配置项为local, null,下文去profile时用poll,则先null后local,因为指定了spring.profile.active=local
}
while (!this.profiles.isEmpty()) {
Profile profile = this.profiles.poll(); // local null, pop会先null后local
for (String location : getSearchLocations()) { // 获取路径前缀
if (!location.endsWith("/")) {
// location is a filename already, so don't search for more
// filenames
load(location, null, profile);
}
else {
for (String name : getSearchNames()) { // 获取bootstarp, application
load(location, name, profile);
}
}
}
this.processedProfiles.add(profile);
}
如上所示,加载的流程大致为:
file:./bootstrao.properties
file:./bootstrao.yaml
file:./config/bootstrao.properties
file:./config/bootstrao.yaml
classpath:/bootstrao.properties
classpath:/bootstrao.yaml
classpath:/config/bootstrao.properties
classpath:/config/bootstrao.yaml
file:./bootstrao-local.properties
file:./bootstrao-local.yaml
file:./config/bootstrao-local.properties
file:./config/bootstrao-local.yaml
classpath:/bootstrao-local.properties
classpath:/bootstrao-local.yaml
classpath:/config/bootstrao-local.properties
classpath:/config/bootstrao-local.yaml
一般项目文件加载顺序如下:
Apollo加载
ApolloApplicationContextInitializer
就是入口,其余ConfigFileApplicationListener
相似,均实现了EnvironmentPostProcessor
接口
/**
* Initialize Apollo Configurations Just after environment is ready.
*
* @param environment
*/
protected void initialize(ConfigurableEnvironment environment) {
if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
//already initialized
return;
}
// 命名空间namespace,参考:https://ctripcorp.github.io/apollo/#/zh/design/apollo-core-concept-namespace?id=_544-changelistener
String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);
logger.debug("Apollo bootstrap namespaces: {}", namespaces);
// 默认application,也可以配合多个,通过多个,来隔离
List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);
CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
for (String namespace : namespaceList) {
// 获取config配置信息
Config config = ConfigService.getConfig(namespace);
composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
}
// 将配置项加载到environment环境中,即spring容器上下文
environment.getPropertySources().addFirst(composite);
}
由上可知,初始化时最重要的就是ConfigServer.getConfig(namespace)
// 初始化
ConfigService#getConfig()
DefaultConfigManager#getConfig()
DefaultConfigFactory#create()
DefaultConfig#initialize()
RemoteConfigRepository#getConfig()
RemoteConfigRepository#sync()
RemoteConfigRepository#loadApolloConfig()
// 启动定时任务
RemoteConfigRepository() // 有些地方称之为推拉结合,其实本质上都是拉,一个是定时一个是长连接
this.trySync(); // 及时进行一次调用,拉取配置信息
this.schedulePeriodicRefresh(); // 定时拉取
this.scheduleLongPollingRefresh(); // 长连接拉取
拉取配置信息如下:
@Override
protected synchronized void sync() {
// ....
try {
ApolloConfig previous = m_configCache.get();
ApolloConfig current = loadApolloConfig(); // 远程加载
//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig()); // 触发监听器,对bean进行赋值
}
// ....
}
private ApolloConfig loadApolloConfig() {
// ...
List<ServiceDTO> configServices = getConfigServices(); // 获取ConfigServer的信息(从meta server获取)
String url = null;
for (int i = 0; i < maxRetries; i++) { // 重试
List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
Collections.shuffle(randomConfigServices);
//Access the server which notifies the client first
if (m_longPollServiceDto.get() != null) {
randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
}
for (ServiceDTO configService : randomConfigServices) {
if (onErrorSleepTime > 0) {
logger.warn(
"Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime); // 发生错误时睡眠时间
} catch (InterruptedException e) {
//ignore
}
}
// 构建访问config service的url
url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_remoteMessages.get(), m_configCache.get());
// ....
try {
// 发起远程http调用
HttpResponse<ApolloConfig> response = m_httpUtil.doGet(request, ApolloConfig.class);
m_configNeedForceRefresh.set(false);
m_loadConfigFailSchedulePolicy.success();
// ....
if (response.getStatusCode() == 304) {
logger.debug("Config server responds with 304 HTTP status code.");
return m_configCache.get(); // 从缓存取,认为没有发生变更
}
ApolloConfig result = response.getBody();
logger.debug("Loaded config for {}: {}", m_namespace, result);
return result;
} catch (ApolloConfigStatusCodeException ex) {
// ....
} catch (Throwable ex) {
// ....
} finally {
// ....
}
// if force refresh, do normal sleep, if normal config load, do exponential sleep 计算发生错误时睡眠的时间
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
}
}
// ....
}
定时任务如下:
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
trySync();
Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(), // 关注定时时间节点
m_configUtil.getRefreshIntervalTimeUnit());
}
// 默认每5min执行一次
长连接推的设计如下:
// 客户端实现
private void startLongPolling() {
// ....
try {
// ...
final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
if (longPollingInitialDelayInMills > 0) {
try {
logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills); // 默认是2000
} catch (InterruptedException e) {
//ignore
}
}
doLongPollingRefresh(appId, cluster, dataCenter);
}
});
} catch (Throwable ex) {
// ....
}
}
// 服务端设计时采用Spring的DeferredResult组件
// 调用服务端生成一个阻塞的DeferredResultWrapper,超时时间默认60s
// 另外一个线程不断的轮询数据库,当有更多的ReleaseMessage时,触发监听器ReleaseMessageListener,相关逻辑都在NotificationControllerV2类中
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "notifications") String notificationsAsString,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "ip", required = false) String clientIp) {
List<ApolloConfigNotification> notifications = null;
// ...
DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli()); // 6000
// ....
return deferredResultWrapper.getResult();
}
/**ReleaseMessageScanner
* scan messages and send
*
* @return whether there are more messages
*/
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
fireMessageScanned(releaseMessages); // 触发NotificationControllerV2#handleMessage
int messageScanned = releaseMessages.size();
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
return messageScanned == 500;
}
// NotificationControllerV2
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
String content = message.getMessage();
// ....
if (!deferredResults.containsKey(content)) {
return;
}
//create a new list to avoid ConcurrentModificationException
List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
configNotification.addMessage(content, message.getId());
//do async notification if too many clients 客户端请求较多时,异步处理
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
bizConfig.releaseMessageNotificationBatch());
for (int i = 0; i < results.size(); i++) {
if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
} catch (InterruptedException e) {
//ignore
}
}
logger.debug("Async notify {}", results.get(i));
results.get(i).setResult(configNotification);
}
});
return;
}
logger.debug("Notify {} clients for key {}", results.size(), content);
for (DeferredResultWrapper result : results) {
result.setResult(configNotification); // 设置通知结果
}
logger.debug("Notification completed");
}
PS:所谓的推拉结合,其实都是拉,只是通过长连接做到准实时的拉,接近推的时效性。
加入了Apollo
后的配置文件加载顺序如下:
Apollo如何实现动态更新
RemoteConfigRepository#sync()
RemoteConfigRepository#fireRepositoryChange()
DedaultConfig#onRepositoryChange()
AutoUpdateConfigChangeListener#onChange()
看AutoUpdateConfigChangeListener
如何实现ConfigChangeListener.onChange()
方法,当ApolloConfig
事件触发时:
@Override
public void onChange(ConfigChangeEvent changeEvent) {
Set<String> keys = changeEvent.changedKeys();
if (CollectionUtils.isEmpty(keys)) {
return;
}
for (String key : keys) {
// 1. check whether the changed key is relevant
// springValueRegistry 注册表
Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
if (targetValues == null || targetValues.isEmpty()) {
continue;
}
// 2. check whether the value is really changed or not (since spring property sources have hierarchies)
if (!shouldTriggerAutoUpdate(changeEvent, key)) {
continue;
}
// 3. update the value
for (SpringValue val : targetValues) {
updateSpringValue(val); // 更新值 SpringValue哪里来的?
}
}
}
SpringValueProcessor
类在启动时将会注册到Spring
容器中,在对象被实例化的过程中作为钩子函数被调用,包括创建对象的工厂,创建对象初始化前等,核心钩子函数接口包括BeanPostProcessor
、BeanFactoryPostProcessor
、BeanFactoryAware
@Override
protected void processField(Object bean, String beanName, Field field) {
// register @Value on field
Value value = field.getAnnotation(Value.class); // 注解了@Value
if (value == null) {
return;
}
Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value()); // 对注解上的key进行处理
if (keys.isEmpty()) {
return;
}
for (String key : keys) {
// 将对象信息放到springValueRegistry注册表中
SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
springValueRegistry.register(beanFactory, key, springValue);
logger.debug("Monitoring {}", springValue);
}
}
回头再看updateSpringValue()
方法,可谓是一目了然
public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {
if (isField()) {
injectField(newVal);
} else {
injectMethod(newVal);
}
}
private void injectField(Object newVal) throws IllegalAccessException {
Object bean = beanRef.get();
if (bean == null) {
return;
}
boolean accessible = field.isAccessible();
field.setAccessible(true);
field.set(bean, newVal);// 设置值
field.setAccessible(accessible);
}
private void injectMethod(Object newVal)
throws InvocationTargetException, IllegalAccessException {
Object bean = beanRef.get();
if (bean == null) {
return;
}
methodParameter.getMethod().invoke(bean, newVal); // 调用方法进行设置
}
小结:通过Spring
容器自身提供的钩子函数,将包含@Value
的对象加入到本地的注册表中,当长连接(定时)
发现远程变更后进行新数据的拉取,拉取成功后对ApolloConfig
进行缓存并触发监听器ConfigChangeListener
接口,从注册表中找到已经注册进去的包含@Value
的对象,并对其内的值或者方法进行更新或调用,以此完成Apollo
的动态更新。