A Brief Look at Dremio's Acceleration File System Plugin

王传学 2023-01-08

One of Dremio's most powerful features is reflection-based acceleration. To store reflection data, Dremio ships its own file system plugin.

Plugin class diagram

[Figure: class diagram of the acceleration file system plugin]

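Parquet and Iceberg support

The start entry point shows that both formats are set up. In practice, Dremio has used Iceberg-based storage by default since version 21 (this also requires distributed storage).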

@Override
public void start() throws IOException {
    super.start();
    // Also initialize the MaterializationStore, which mainly holds reflection metadata
    materializationStore = new MaterializationStore(DirectProvider.<LegacyKVStoreProvider>wrap(getContext().getKVStoreProvider()));
    formatPlugin = (ParquetFormatPlugin) formatCreator.getFormatPluginByConfig(new ParquetFormatConfig());
    icebergFormatPlugin = (IcebergFormatPlugin) formatCreator.getFormatPluginByConfig(new IcebergFormatConfig());
}
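
To make the idea concrete, here is a minimal, self-contained sketch of the pattern that start() suggests: create one handler per storage format up front, then pick one per dataset from a flag. Every name in it (FormatSelectionSketch, FormatReader, readerFor) is hypothetical and invented for illustration; none of it is Dremio's API, and the real plugin classes do far more than return a string.

```java
import java.util.Objects;

public class FormatSelectionSketch {

    /** Hypothetical stand-in for a format plugin; NOT Dremio's FormatPlugin interface. */
    interface FormatReader {
        String describe(String selectionRoot);
    }

    static final class ParquetReader implements FormatReader {
        @Override
        public String describe(String selectionRoot) {
            return "parquet:" + selectionRoot;
        }
    }

    static final class IcebergReader implements FormatReader {
        @Override
        public String describe(String selectionRoot) {
            return "iceberg:" + selectionRoot;
        }
    }

    private FormatReader parquetReader;
    private FormatReader icebergReader;

    /** Mirrors the idea of start(): build both readers once, before any lookup. */
    public void start() {
        parquetReader = new ParquetReader();
        icebergReader = new IcebergReader();
    }

    /** Pick the reader from a per-dataset flag (assumed to be known by the caller). */
    public FormatReader readerFor(boolean icebergDataset) {
        FormatReader reader = icebergDataset ? icebergReader : parquetReader;
        return Objects.requireNonNull(reader, "start() must be called first");
    }

    public static void main(String[] args) {
        FormatSelectionSketch plugin = new FormatSelectionSketch();
        plugin.start();
        System.out.println(plugin.readerFor(false).describe("/store/r1")); // parquet:/store/r1
        System.out.println(plugin.readerFor(true).describe("/store/r1"));  // iceberg:/store/r1
    }
}
```

Keeping both readers alive at the same time is what would let data written in the older Parquet layout stay readable next to newer Iceberg data.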

How the plugin handles the different data formats (mainly in getDatasetHandle)

@Override
public Optional<DatasetHandle> getDatasetHandle(EntityPath datasetPath, GetDatasetOption... options) throws ConnectorException {
    // Reflection data paths follow their own convention, which is checked here.
    // After the source name there are effectively only two parts: the reflection ID and the materialization ID.
    List<String> components = normalizeComponents(datasetPath.getComponents());
    if (components == null) {
        return Optional.empty();
    }
    Preconditions.checkState(components.size() == 3, "Unexpected number of components in path");

    ReflectionId reflectionId = new ReflectionId(components.get(1));
    MaterializationId materializationId = new MaterializationId(components.get(2));
    Materialization materialization = getMaterialization(materializationId);
    if (materialization == null) {
        return Optional.empty();
    }

    FluentIterable<Refresh> refreshes = getSlices(materialization, reflectionId);
    if (refreshes == null) {
        return Optional.empty();
    }

    final String selectionRoot = getConfig().getPath().resolve(refreshes.first().get().getReflectionId().getId()).toString();

    BatchSchema currentSchema = CurrentSchemaOption.getSchema(options);
    FileConfig fileConfig = FileConfigOption.getFileConfig(options);
    List<String> sortColumns = SortColumnsOption.getSortColumns(options);
    Integer fieldCount = MaxLeafFieldCount.getCount(options);

    boolean icebergDataset = isUsingIcebergDataset(materialization);
    final FileSelection selection = getFileSelection(refreshes, selectionRoot, icebergDataset);

    final PreviousDatasetInfo pdi = new PreviousDatasetInfo(fileConfig, currentSchema, sortColumns, null, null, true);
    if (!icebergDataset) {
        FileDatasetHandle.checkMaxFiles(datasetPath.getName(), selection.getFileAttributesList().size(), getContext(), getConfig().isInternal());
    }
    // This method does the actual work (see the source for details): it checks whether
    // the dataset is Parquet or Iceberg and uses the corresponding accessor.
    return getDatasetHandle(datasetPath, fieldCount, icebergDataset, selection, pdi);
}
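
The path convention used at the top of getDatasetHandle can be illustrated in isolation. The sketch below assumes a three-part path of the form [source name, reflection ID, materialization ID] and derives a selection root from the store root plus the reflection ID. The class and method names (ReflectionPathSketch, parse, selectionRoot) and the sample values are hypothetical. Unlike the original, which fails a Preconditions.checkState on an unexpected size, this simplified version returns an empty Optional for any malformed path.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ReflectionPathSketch {

    /** Hypothetical value type for the two IDs carried by the dataset path. */
    static final class ReflectionPath {
        final String reflectionId;
        final String materializationId;

        ReflectionPath(String reflectionId, String materializationId) {
            this.reflectionId = reflectionId;
            this.materializationId = materializationId;
        }
    }

    /**
     * Parse [source, reflectionId, materializationId].
     * Returns empty when the path is null or does not have exactly three parts
     * (a simplification: the original code throws on a wrong size).
     */
    static Optional<ReflectionPath> parse(List<String> components) {
        if (components == null || components.size() != 3) {
            return Optional.empty();
        }
        return Optional.of(new ReflectionPath(components.get(1), components.get(2)));
    }

    /** Assumed layout: data for a reflection lives under <storeRoot>/<reflectionId>. */
    static String selectionRoot(String storeRoot, ReflectionPath path) {
        return storeRoot + "/" + path.reflectionId;
    }

    public static void main(String[] args) {
        // "accel" is a placeholder source name, not a documented Dremio value.
        Optional<ReflectionPath> parsed = parse(Arrays.asList("accel", "r-123", "m-456"));
        parsed.ifPresent(p -> System.out.println(selectionRoot("/store", p))); // /store/r-123
        System.out.println(parse(Arrays.asList("accel", "r-123")).isPresent()); // false
    }
}
```

Returning Optional.empty for anything that cannot be resolved follows the same early-exit style as the original method, where a missing materialization or missing refreshes also produce an empty result.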

References

services/accelerator/src/main/java/com/dremio/service/reflection/materialization/AccelerationStoragePlugin.java
services/accelerator/src/main/java/com/dremio/service/reflection/materialization/AccelerationFileSystem.java
services/accelerator/src/main/java/com/dremio/service/reflection/materialization/AccelerationStoragePluginConfig.java
sabot/kernel/src/main/java/com/dremio/exec/store/iceberg/IcebergExecutionDatasetAccessor.java
sabot/kernel/src/main/java/com/dremio/exec/store/parquet/ParquetFormatDatasetAccessor.java
