0
点赞
收藏
分享

微信扫一扫

dremio SourceCatalog 服务说明

伢赞 2023-01-08 阅读 52

SourceCatalog 主要进行source 的管理,包含了获取信息,创建,更新,删除,包含了不同的实现

SourceCatalog 服务定义

/**

* Interface to perform actions on sources.

*/

// PrivilegeCatalog 可以实现权限处理,具体实际上是SqlGrant 提供的能力

public interface SourceCatalog extends PrivilegeCatalog {

 

SourceState refreshSourceStatus(NamespaceKey key) throws Exception;

 

/**

* Get a source based on the provided name. If the source doesn't exist, synchronize with the

* KVStore to confirm creation status.

*

* @param name

* @return A StoragePlugin casted to the expected output.

*/

<T extends StoragePlugin> T getSource(String name);

 

/**

* Create a source based on the provided configuration. Includes both the creation as well the

* startup of the source. If the source fails to start, no creation will be done. The provided

* configuration should have a null version. Failure to create or failure to start with throw an

* exception. Additionally, if "store.plugin.check_state" is enabled, a plugin that starts but

* then reveals a bad state, will also result in exception.

*

* @param config Configuration for the source.

* @param attributes Optional namespace attributes to pass to namespace entity creation

*/

void createSource(SourceConfig config, NamespaceAttribute... attributes);

 

/**

* Update an existing source with the given config. The config version must be the same as the

* currently active source. If it isn't, this call will fail with an exception.

*

* @param config Configuration for the source.

* @param attributes Optional namespace attributes to pass to namespace entity creation

*/

void updateSource(SourceConfig config, NamespaceAttribute... attributes);

 

/**

* Delete a source with the provided config. If the source doesn't exist or the config doesn't

* match, the method with throw an exception.

*

* @param config

*/

void deleteSource(SourceConfig config);

}

实现子类

参考下图,还是比较复杂的,实际实现是CatalogImpl,内部会调用CatalogServiceImpl

dremio SourceCatalog 服务说明_java

 

 

创建source创建处理

参考处理

private void createSource(SourceConfig config, CatalogIdentity subject, NamespaceAttribute... attributes) {

boolean afterUnknownEx = false;

 

try(final AutoCloseable sourceDistributedLock = getDistributedLock(config.getName())) {

logger.debug("Obtained distributed lock for source {}", "-source-"+config.getName());

setInfluxSource(config.getName());

// 通过 PluginsManager 插件管理进行实际source 的创建

getPlugins().create(config, subject.getName(), attributes);

communicateChange(config, RpcType.REQ_SOURCE_CONFIG);

} catch (SourceAlreadyExistsException e) {

throw UserException.concurrentModificationError(e).message("Source already exists with name %s.", config.getName()).buildSilently();

} catch (ConcurrentModificationException ex) {

throw ex;

} catch (UserException ue) {

// If it's a UserException, message is probably helpful, so rethrow

throw ue;

} catch (IllegalArgumentException e) {

throw e;

} catch (Exception ex) {

afterUnknownEx = true;

logger.error("Exception encountered: {}", ex.getMessage(), ex);

throw UserException.validationError(ex).message("Failed to create source with name %s.", config.getName()).buildSilently();

} finally {

logger.debug("Releasing distributed lock for source {}", "-source-"+config.getName());

removeInfluxSource(config.getName(), afterUnknownEx);

}

}

PluginsManager start 部分会对于注册插件的获取基于了NamespaceService.Factory

ImmutableMap.Builder<String, CompletableFuture<SourceState>> futuresBuilder = ImmutableMap.builder();

// 此处获取已经注册的插件,转化为dremio 托管的存储插件

for (SourceConfig source : datasetListing.getSources(SystemUser.SYSTEM_USERNAME)) {

ManagedStoragePlugin plugin = newPlugin(source);

 

futuresBuilder.put(source.getName(), plugin.startAsync());

plugins.put(c(source.getName()), plugin);

}

ConnectionReader 进行插件信息获取

ConnectionReaderImpl 中,方便页面加载
参考处理

 

protected static Collection<Class<? extends ConnectionConf<?, ?>>> getCandidateSources(ScanResult scanResult) {

ImmutableList.Builder<Class<? extends ConnectionConf<?, ?>>> candidates = new ImmutableList.Builder<>();

for(Class<?> input : scanResult.getAnnotatedClasses(SourceType.class)) {

try {

if (Modifier.isAbstract(input.getModifiers())

|| Modifier.isInterface(input.getModifiers())

|| !ConnectionConf.class.isAssignableFrom(input)) {

logger.warn("Failure trying to recognize SourceConf for {}. Expected a concrete implementation of SourceConf.", input.getName());

continue;

}

} catch (Exception e) {

logger.warn("Failure trying to recognize SourceConf for {}", input.getName(), e);

continue;

}

// Check done just above

candidates.add((Class<? extends ConnectionConf<?, ?>>) input);

}

return candidates.build();

}

说明

source 类型,主要面向的是可以界面可见的存储扩展处理,实际上dremio 还包含了不少内置的存储扩展,系统存储扩展(比如加速,home)
同时通过阅读我们发现官方还是提供了一个PrivilegeCatalog 支持权限能力的扩展的,自己扩展下就可以实现一些企业版的特性了

参考资料

sabot/kernel/src/main/java/com/dremio/exec/catalog/SourceCatalog.java
sabot/kernel/src/main/java/com/dremio/exec/planner/sql/parser/SqlGrant.java
dac/backend/src/main/java/com/dremio/dac/api/SourceResource.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/ConnectionReader.java
sabot/kernel/src/main/java/com/dremio/exec/catalog/ConnectionReaderImpl.java

举报

相关推荐

0 条评论