A Brief Introduction to Dremio FormatMatcher

云竹文斋 2023-03-09

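At its core, FormatMatcher performs format matching against the file system, so that the query and execution engines know which data format they are dealing with and can process the data accordingly.

Every FormatPlugin must provide a format matcher.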
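To make the plugin/matcher relationship concrete, here is a minimal sketch. The types `SimplePlugin` and `SimpleMatcher`, the helper `byExtension`, and the "first match wins" loop in `detect` are all hypothetical simplifications for illustration; they are not Dremio's real interfaces, which also take a file system, a file selection and a codec factory.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical, simplified stand-ins; these are not Dremio's real interfaces. */
final class MatcherSketch {

  /** A format plugin owns exactly one matcher. */
  interface SimplePlugin {
    String name();
    SimpleMatcher matcher();
  }

  /** Decides whether a path looks like the plugin's format. */
  interface SimpleMatcher {
    boolean matches(String path);
  }

  /** Builds a plugin that recognizes files by extension (illustrative only). */
  static SimplePlugin byExtension(String name, String extension) {
    return new SimplePlugin() {
      @Override public String name() { return name; }
      @Override public SimpleMatcher matcher() {
        return path -> path.endsWith("." + extension);
      }
    };
  }

  /** Returns the name of the first plugin whose matcher accepts the path. */
  static Optional<String> detect(List<SimplePlugin> plugins, String path) {
    for (SimplePlugin plugin : plugins) {
      if (plugin.matcher().matches(path)) {
        return Optional.of(plugin.name());
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<SimplePlugin> plugins =
        List.of(byExtension("json", "json"), byExtension("parquet", "parquet"));
    System.out.println(detect(plugins, "/data/events.parquet"));
    System.out.println(detect(plugins, "/data/readme.md"));
  }
}
```

The point of the sketch is only the shape of the contract: the engine asks each plugin's matcher in turn and hands the data to the plugin that claims it.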


Class diagram

[Figure: FormatMatcher class diagram]

How IcebergFormatMatcher works

The Iceberg table layout, for reference:

[Figure: Iceberg table format]

Reference code

// Note: the criteria used to identify an Iceberg table
/**
 * Matcher for iceberg format. We expect :
 *
 * a. directory with name "metadata",
 * (and)
 * b. file with pattern v\d*.metadata.json in (a)
 * (and)
 * c. file with name "version-hint.text" in (a)
 *
 */
public class IcebergFormatMatcher extends FormatMatcher {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IcebergFormatMatcher.class);
  public static final String METADATA_DIR_NAME = "metadata";
  // Regex: "v", optional digits, then ".metadata.json"
  private static final Pattern METADATA_FILE_PATTERN = Pattern.compile("v\\d*\\.metadata\\.json$");
  private static final String VERSION_HINT_FILE_NAME = "version-hint.text";
  private final FormatPlugin plugin;

  public IcebergFormatMatcher(FormatPlugin plugin) {
    this.plugin = plugin;
  }

  @Override
  public FormatPlugin getFormatPlugin() {
    return this.plugin;
  }

  @Override
  public boolean matches(FileSystem fs, FileSelection fileSelection, CompressionCodecFactory codecFactory) throws IOException {
    return isIcebergTable(fs, fileSelection.getSelectionRoot());
  }

  // Utility method provided for format plugins to call
  public boolean isFileSystemSupportedIcebergTable(FileSystem fs, String tableRootPath) throws IOException {
    if (!isIcebergTable(fs, tableRootPath)) {
      return false;
    }

    Path rootDir = Path.of(tableRootPath);
    Path metaDir = rootDir.resolve(METADATA_DIR_NAME);
    Path versionHintPath = metaDir.resolve(VERSION_HINT_FILE_NAME);
    if (!fs.exists(versionHintPath) || !fs.isFile(versionHintPath)) {
      return false;
    }

    for (FileAttributes file : fs.list(metaDir)) {
      if (METADATA_FILE_PATTERN.matcher(file.getPath().getName()).matches()) {
        return true;
      }
    }
    return false;
  }

  // Simple format check: only the directory and its name are verified here;
  // isFileSystemSupportedIcebergTable is the more complete check
  private boolean isIcebergTable(FileSystem fs, String tableRootPath) throws IOException {
    Path rootDir = Path.of(tableRootPath);
    Path metaDir = rootDir.resolve(METADATA_DIR_NAME);
    return fs.isDirectory(rootDir) && fs.exists(metaDir) && fs.isDirectory(metaDir);
  }
}
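The two levels of checking can be tried out on a local directory. The sketch below is an approximation under stated assumptions: it uses `java.nio.file` instead of Dremio's own `FileSystem` abstraction, and the names `IcebergLayoutCheck`, `hasMetadataDir` and `looksLikeIcebergTable` are made up for this example. Only the three conditions (a `metadata` directory, a `version-hint.text` file, and a `vN.metadata.json` file) come from the code above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Pattern;

/** Local-filesystem sketch of the three checks; not Dremio's implementation. */
final class IcebergLayoutCheck {
  // "v", optional digits, then ".metadata.json"
  static final Pattern METADATA_FILE = Pattern.compile("v\\d*\\.metadata\\.json$");

  /** Cheap check: the table root contains a "metadata" directory. */
  static boolean hasMetadataDir(Path tableRoot) {
    return Files.isDirectory(tableRoot.resolve("metadata"));
  }

  /** Full check: metadata dir, version-hint.text, and a vN.metadata.json file. */
  static boolean looksLikeIcebergTable(Path tableRoot) throws IOException {
    if (!hasMetadataDir(tableRoot)) {
      return false;
    }
    Path metaDir = tableRoot.resolve("metadata");
    if (!Files.isRegularFile(metaDir.resolve("version-hint.text"))) {
      return false;
    }
    try (DirectoryStream<Path> files = Files.newDirectoryStream(metaDir)) {
      for (Path file : files) {
        if (METADATA_FILE.matcher(file.getFileName().toString()).matches()) {
          return true;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) throws IOException {
    Path root = Files.createTempDirectory("iceberg-sketch");
    Path metaDir = Files.createDirectory(root.resolve("metadata"));
    // Only the directory exists at this point.
    System.out.println(hasMetadataDir(root) + " " + looksLikeIcebergTable(root));
    Files.createFile(metaDir.resolve("version-hint.text"));
    Files.createFile(metaDir.resolve("v1.metadata.json"));
    // All three conditions now hold.
    System.out.println(hasMetadataDir(root) + " " + looksLikeIcebergTable(root));
  }
}
```

A directory that merely contains a `metadata` folder passes the cheap check but fails the full one, which is the gap between `isIcebergTable` and `isFileSystemSupportedIcebergTable` noted in the comments above.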

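Actual usage

FormatMatcher is currently used in two places: FormatCreator, which creates and manages format plugins (it is mostly a utility class), and FileSystemPlugin, which relies on FormatCreator.

FormatCreator is mainly responsible for creating format plugins.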
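As a rough picture of that division of labour, the sketch below builds one plugin per configuration entry and then exposes the plugins' matchers as a list. Everything in it is hypothetical: the class `CreatorSketch`, the `Plugin` holder, and the idea of describing a format by a single file extension are simplifications for illustration and do not mirror Dremio's actual FormatCreator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Hypothetical sketch of a creator that builds plugins from named configs. */
final class CreatorSketch {

  /** Illustrative plugin: a name plus the matcher it owns. */
  static final class Plugin {
    final String name;
    final Predicate<String> matcher;

    Plugin(String name, Predicate<String> matcher) {
      this.name = name;
      this.matcher = matcher;
    }
  }

  private final Map<String, Plugin> pluginsByName = new TreeMap<>();

  /** Builds one plugin per config entry (format name -> file extension). */
  CreatorSketch(Map<String, String> extensionByFormat) {
    extensionByFormat.forEach((format, ext) ->
        pluginsByName.put(format, new Plugin(format, path -> path.endsWith("." + ext))));
  }

  /** Collects the matcher of every plugin, in plugin-name order. */
  List<Predicate<String>> getMatchers() {
    List<Predicate<String>> matchers = new ArrayList<>();
    for (Plugin plugin : pluginsByName.values()) {
      matchers.add(plugin.matcher);
    }
    return matchers;
  }

  public static void main(String[] args) {
    CreatorSketch creator = new CreatorSketch(Map.of("json", "json", "csv", "csv"));
    // A consumer copies the matchers into its own list, as a storage plugin might.
    List<Predicate<String>> matchers = new ArrayList<>(creator.getMatchers());
    System.out.println(matchers.stream().anyMatch(m -> m.test("/data/a.csv")));
  }
}
```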


Usage in FileSystemPlugin

@Override
public void start() throws IOException {
  List<Property> properties = getProperties();
  if (properties != null) {
    for (Property prop : properties) {
      fsConf.set(prop.name, prop.value);
    }
  }

  if (!Strings.isNullOrEmpty(config.getConnection())) {
    org.apache.hadoop.fs.FileSystem.setDefaultUri(fsConf, config.getConnection());
  }

  Map<String,String> map = ImmutableMap.of(
      "fs.classpath.impl", ClassPathFileSystem.class.getName(),
      "fs.dremio-local.impl", LocalSyncableFileSystem.class.getName()
  );
  for(Entry<String, String> prop : map.entrySet()) {
    fsConf.set(prop.getKey(), prop.getValue());
  }

  this.optionExtractor = new FormatPluginOptionExtractor(context.getClasspathScan());
  this.matchers = Lists.newArrayList();
  this.layeredMatchers = Lists.newArrayList();
  this.formatCreator = new FormatCreator(context, config, context.getClasspathScan(), this);
  // Use default Hadoop implementation
  this.codecFactory = HadoopCompressionCodecFactory.DEFAULT;
  // Obtain the format matchers through formatCreator
  matchers.addAll(formatCreator.getFormatMatchers());
  layeredMatchers.addAll(formatCreator.getLayeredFormatMatchers());

  // boolean footerNoSeek = contetMutext.getOptionManager().getOption(ExecConstants.PARQUET_FOOTER_NOSEEK);
  // NOTE: Add fallback format matcher if given in the configuration. Make sure fileMatchers is an order-preserving list.
  this.systemUserFS = createFS(SYSTEM_USERNAME);
  dropFileMatchers = matchers.subList(0, matchers.size());
  this.fsHealthChecker = FSHealthChecker.getInstance(config.getPath(), config.getConnection(), getFsConf()).orElse((p,m) -> healthCheck(p, m));

  createIfNecessary();
}
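One detail in start() is the call matchers.subList(0, matchers.size()). In plain Java, subList returns a view backed by the original list rather than a copy. The sketch below illustrates that general behaviour with an ArrayList of strings; the class and method names are invented for the example, and it makes no claim about whether or how Dremio depends on this.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

/** Plain-Java illustration of List.subList view semantics. */
final class SubListViewDemo {

  /** Returns true if a write through the view is visible in the parent list. */
  static boolean writesPassThrough() {
    List<String> matchers = new ArrayList<>(List.of("parquet", "json"));
    List<String> view = matchers.subList(0, matchers.size());
    view.set(0, "delta");
    return matchers.get(0).equals("delta");
  }

  /** Returns true if reading the view fails after the parent list grows. */
  static boolean viewBreaksAfterParentGrows() {
    List<String> matchers = new ArrayList<>(List.of("parquet", "json"));
    List<String> view = matchers.subList(0, matchers.size());
    matchers.add("iceberg"); // structural change to the backing list
    try {
      view.size();
      return false;
    } catch (ConcurrentModificationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(writesPassThrough());
    System.out.println(viewBreaksAfterParentGrows());
  }
}
```

In other words, a list obtained this way is only safe to use as long as the backing list is not structurally modified afterwards.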

Formats currently supported by Dremio

public static Map<String, FormatPluginConfig> getDefaultFormats() {
  Map<String, FormatPluginConfig> defaultFormats = new TreeMap<>();
  defaultFormats.put("csv", createTextFormatPlugin(false, ',', Lists.newArrayList("csv")));
  defaultFormats.put("csvh", createTextFormatPlugin(true, ',', Lists.newArrayList("csvh")));
  defaultFormats.put("tsv", createTextFormatPlugin(false, '\t', Lists.newArrayList("tsv")));
  defaultFormats.put("psv", createTextFormatPlugin(false, '|', Lists.newArrayList("psv", "tbl")));
  defaultFormats.put("txt", createTextFormatPlugin(false, '\u0000', Lists.newArrayList("txt")));
  TextFormatConfig psva = createTextFormatPlugin(false, '|', Lists.newArrayList("psva", "tbla"));
  psva.autoGenerateColumnNames = true;
  defaultFormats.put("psva", psva);

  defaultFormats.put("parquet", new ParquetFormatConfig());
  defaultFormats.put("json", new JSONFormatPlugin.JSONFormatConfig());
  defaultFormats.put("dremarrow1", new ArrowFormatPluginConfig());
  defaultFormats.put("iceberg", new IcebergFormatConfig());
  defaultFormats.put("delta", new DeltaLakeFormatConfig());
  defaultFormats.put("xls", new ExcelFormatPluginConfig(true));
  defaultFormats.put("excel", new ExcelFormatPluginConfig(false));
  return defaultFormats;
}
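The text formats in this map are each registered with a list of file extensions. As a small illustration of how such a table could be consulted, the sketch below copies the extension lists from the text entries above and looks up a format by file name. The class `DefaultFormatLookup` and the method `formatFor` are hypothetical helpers, and real matching in Dremio involves more than the extension.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical lookup from file extension to format name. */
final class DefaultFormatLookup {

  /** Extension lists copied from the text entries of getDefaultFormats(). */
  static Map<String, List<String>> textFormatExtensions() {
    Map<String, List<String>> formats = new TreeMap<>();
    formats.put("csv", List.of("csv"));
    formats.put("csvh", List.of("csvh"));
    formats.put("tsv", List.of("tsv"));
    formats.put("psv", List.of("psv", "tbl"));
    formats.put("txt", List.of("txt"));
    formats.put("psva", List.of("psva", "tbla"));
    return formats;
  }

  /** Finds the format whose extension list contains the file's extension. */
  static Optional<String> formatFor(String fileName) {
    int dot = fileName.lastIndexOf('.');
    if (dot < 0) {
      return Optional.empty();
    }
    String extension = fileName.substring(dot + 1);
    return textFormatExtensions().entrySet().stream()
        .filter(entry -> entry.getValue().contains(extension))
        .map(Map.Entry::getKey)
        .findFirst();
  }

  public static void main(String[] args) {
    System.out.println(formatFor("lineitem.tbl"));
    System.out.println(formatFor("notes.md"));
  }
}
```

Here a file named lineitem.tbl resolves to the psv format, because "tbl" is listed as a second extension for it.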

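Notes

The above is only a brief overview. Dremio in fact still reuses Apache Drill's easy plugin approach for format handling and has built a good deal of wrapping on top of it; I will cover that in more detail in a later post.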


References

sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatMatcher.java

sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatPlugin.java

sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatCreator.java

