0
点赞
收藏
分享

微信扫一扫

dremio datastore简单说明

承蒙不弃 2023-01-08 阅读 199

datastore 实际上是进行数据存储的实现(主要是配置以及元数据相关的)不少服务都使用到了此功能(namespace,catalog,user,job )
实际上dremio 官方对于dremio 的部署(软件版,尤其是是社区版)有比较明确的说明,需要zk 以及本地存储(或者nas)对于协调节点
的元数据进行存储,同时如果需要实现ha 模式,可以支持对于协调节点配置主从抢占模式,从节点可以进行数据查询处理,但是不进行
元数据操作(比如比较典型的反射处理,元数据刷新。。。。),通过基本的阅读datastore

datastore 提供的能力

  • key,value 存储
  • key,value 序列化,反序列化处理,当然部分上就有了protobuf
  • 索引能力提供,这部分实际上应该是加速key,value 检索的,实现上基于了lucene,同时自己包装了,实际上我们实际使用的kv存储,基本都会包含存储以及索引
  • rpc 能力,提供通用rpc 包装,方便不通client 使用DatastoreRpcService,DatastoreRpcClient,rpc 整体是基于FabricProtocol 协议的,dremio 通用的rpc 处理

参加实现图

从下图可以看出面向不同场景实现的还是不少的

dremio  datastore简单说明_java

 

 

KVStoreProviderHelper 对于kv 存储的初始化

KVStoreProviderHelper 包含了不少判断,基于系统配置 ,以及基于动态类创建机制,在DACDaemonModule 模块中

 

private static KVStoreProvider internalKVStoreProvider(DACConfig dacConfig,

BootStrapContext bootstrap,

Provider<FabricService> fabricService,

Provider<NodeEndpoint> endPoint) {

DremioConfig dremioConfig = dacConfig.getConfig();

Map<String, Object> config = new HashMap<>();

String thisNode = dremioConfig.getThisNode();

 

// instantiate NoopKVStoreProvider on all non-coordinator nodes.

boolean isCoordinator = dremioConfig.getBoolean(DremioConfig.ENABLE_COORDINATOR_BOOL);

if (!isCoordinator) {

return new NoopKVStoreProvider(bootstrap.getClasspathScan(), fabricService, endPoint, bootstrap.getAllocator(), config);

}

 

// Configure the default KVStore , 通过配置约定

String datastoreType = System.getProperty(KVSTORE_TYPE_PROPERTY_NAME, DEFAULT_DB);

config.put(DremioConfig.DEBUG_USE_MEMORY_STRORAGE_BOOL, dacConfig.inMemoryStorage);

config.put(LocalKVStoreProvider.CONFIG_DISABLEOCC, "false");

config.put(LocalKVStoreProvider.CONFIG_VALIDATEOCC, "true");

config.put(LocalKVStoreProvider.CONFIG_TIMED, "true");

config.put(LocalKVStoreProvider.CONFIG_BASEDIRECTORY, dremioConfig.getString(DremioConfig.DB_PATH_STRING));

config.put(LocalKVStoreProvider.CONFIG_HOSTNAME, System.getProperty(KVSTORE_HOSTNAME_PROPERTY_NAME, thisNode));

config.put(RemoteKVStoreProvider.HOSTNAME, thisNode);

config.put(DremioConfig.REMOTE_DATASTORE_RPC_TIMEOUT_SECS, dremioConfig.getLong(DremioConfig.REMOTE_DATASTORE_RPC_TIMEOUT_SECS));

 

// find the appropriate KVStoreProvider from path

// first check for the default behavior (if services.datastore.type is set to "default")

// if services.datastore.type is set, check ClassPath for associated KVStoreProvider type

Class<? extends KVStoreProvider> cls = null;

switch (datastoreType) {

// 默认因为就没有配置KVSTORE_TYPE_PROPERTY_NAME 的定义,所以就只能是本地LocalKVStoreProvider 的

case DEFAULT_DB:

config.put(LocalKVStoreProvider.CONFIG_HOSTNAME, thisNode);

// fall through to TEST_CLUSTER_DB

// 从命名上主要是测试使用的

case TEST_CLUSTER_DB:

boolean isMasterless = dremioConfig.isMasterlessEnabled();

boolean isMaster = (!isMasterless && dremioConfig.getBoolean(DremioConfig.ENABLE_MASTER_BOOL));

boolean needsLocalKVStore = (isMasterless && thisNode.equals(config.get(LocalKVStoreProvider.CONFIG_HOSTNAME)));

cls = (isMaster || needsLocalKVStore)? LocalKVStoreProvider.class : RemoteKVStoreProvider.class;

break;

 

default:

// 基于ClassPathScanner 查找实现,所以我们可以自己扩展

final ScanResult results = ClassPathScanner.fromPrescan(dremioConfig.getSabotConfig());

final Set<Class<? extends KVStoreProvider>> classes = results.getImplementations(KVStoreProvider.class);

for (Class<? extends KVStoreProvider> it : classes) {

try {

KVStoreProviderType anno = it.getAnnotation(KVStoreProviderType.class);

if (anno != null && anno.type().equals(datastoreType)) {

cls = it;

break;

}

} catch (Exception e) {

logger.info(String.format("Unable to find KVStoreProviderType annotation in %s during search, skipping", cls.getName()));

continue;

}

}

break;

}

 

// not able to find a KVStoreProvider for the requested services.datastore.type

if (cls == null) {

throw new RuntimeException("Unable to find appropriate KVStoreProvider for " + datastoreType);

}

 

try {

// 动态创建KVStoreProvider 的定义

final Constructor<? extends KVStoreProvider> con = cls.getDeclaredConstructor(

ScanResult.class,

Provider.class,

Provider.class,

BufferAllocator.class,

Map.class

);

 

return con.newInstance(bootstrap.getClasspathScan(),

fabricService,

endPoint,

bootstrap.getAllocator(),

config

);

} catch (Exception e) {

throw new RuntimeException(e);

}

}

历史遗留问题

从官方代码上可以看出,官方包含了一个遗留实现,具体通过的了一个adapter 进行适配解决
DACDaemonModule 参考

 

registry.bind(

LegacyKVStoreProvider.class,

new LegacyKVStoreProviderAdapter(

registry.provider(KVStoreProvider.class).get()) // i此处就使用到了上边的初始化定义了

);

说明

以上只是一个简单的说明,具体会通过namespaceservice 以及其他模块进行集成说明,关于datastore服务的一些配置实际上官方文档并没有介绍
但是通过学习源码,我们可以了解一些内部处理,方便自己扩展以及研究

参考资料

dac/backend/src/main/java/com/dremio/dac/daemon/KVStoreProviderHelper.java
services/datastore/src/main/java/com/dremio/datastore/RemoteKVStoreProvider.java
services/datastore/src/main/java/com/dremio/datastore/DatastoreRpcClient.java
services/datastore/src/main/java/com/dremio/datastore/DatastoreRpcService.java

举报

相关推荐

0 条评论