Dremio's InformationSchemaCatalog Service, Part 2

GhostInMatrix 2022-07-27

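Earlier today I gave a brief introduction to the InformationSchemaCatalog service. The InformationSchemaCatalog described there is really a foundational capability (at its core, the class is used internally by Dremio).
BI tools and other external systems instead reach the information schema through a dedicated storage plugin (Dremio's storage plugin mechanism is convenient, which makes Dremio easy to extend flexibly).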

InfoSchemaConf definition

 

@SourceType(value = "INFORMATION_SCHEMA", configurable = false)
public class InfoSchemaConf extends ConnectionConf<InfoSchemaConf, InfoSchemaStoragePlugin> {

  @Override
  public InfoSchemaStoragePlugin newPlugin(SabotContext context, String name, Provider<StoragePluginId> pluginIdProvider) {
    // InfoSchemaStoragePlugin is the storage plugin that actually serves queries over this data.
    // "name" is the source name, which must be INFORMATION_SCHEMA here, much like the
    // source name we configure for a plugin we develop ourselves.
    return new InfoSchemaStoragePlugin(context, name);
  }

  @Override
  public boolean isInternal() {
    return true;
  }
}
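In other words, the conf class is a factory that produces the plugin, while the annotation carries the source type. The sketch below reproduces that shape with simplified, hypothetical stand-ins: SimpleSourceType, SimpleConnectionConf, SimplePlugin, SimpleInfoSchemaPlugin and SimpleInfoSchemaConf are not Dremio classes, and the real newPlugin also receives a SabotContext and a StoragePluginId provider.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-in for Dremio's @SourceType annotation (not the real API).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SimpleSourceType {
  String value();
  boolean configurable() default true;
}

// Hypothetical plugin interface; a real storage plugin has many more methods.
interface SimplePlugin {
  String name();
}

// Hypothetical base class mirroring the self-typed ConnectionConf<C, P> shape:
// a conf object acts as the factory for its plugin.
abstract class SimpleConnectionConf<C extends SimpleConnectionConf<C, P>, P extends SimplePlugin> {
  abstract P newPlugin(String name);

  // Mirrors ConnectionConf#isInternal; defaults to false in this sketch.
  boolean isInternal() {
    return false;
  }
}

final class SimpleInfoSchemaPlugin implements SimplePlugin {
  private final String name;

  SimpleInfoSchemaPlugin(String name) {
    this.name = name;
  }

  @Override
  public String name() {
    return name;
  }
}

@SimpleSourceType(value = "INFORMATION_SCHEMA", configurable = false)
final class SimpleInfoSchemaConf extends SimpleConnectionConf<SimpleInfoSchemaConf, SimpleInfoSchemaPlugin> {
  @Override
  SimpleInfoSchemaPlugin newPlugin(String name) {
    return new SimpleInfoSchemaPlugin(name);
  }

  @Override
  boolean isInternal() {
    return true;
  }
}

class ConfFactoryDemo {
  public static void main(String[] args) {
    SimpleInfoSchemaConf conf = new SimpleInfoSchemaConf();
    // The source type is read from the class annotation via reflection.
    SimpleSourceType type = SimpleInfoSchemaConf.class.getAnnotation(SimpleSourceType.class);
    SimpleInfoSchemaPlugin plugin = conf.newPlugin(type.value());
    // Prints: INFORMATION_SCHEMA internal=true configurable=false
    System.out.println(plugin.name() + " internal=" + conf.isInternal() + " configurable=" + type.configurable());
  }
}
```

Keeping the plugin construction behind the conf object means the framework only needs the conf type and its annotation to instantiate a source.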

InfoSchemaStoragePlugin processing

The core of InfoSchemaStoragePlugin is the read path, because the actual data is already handled directly by the different catalogs (which depend on the InformationSchemaCatalog service).

Class diagram

[Figure: class diagram]

The core part is mainly the dataset handle logic:

 

@Override
public DatasetHandleListing listDatasetHandles(GetDatasetOption... options) {
  // Every InformationSchemaTable enum constant is itself a dataset handle.
  return () -> Arrays.stream(InformationSchemaTable.values()).iterator();
}

@Override
public Optional<DatasetHandle> getDatasetHandle(EntityPath datasetPath, GetDatasetOption... options) {
  // Only a two-part path (source name + table name) can refer to an information schema table.
  if (datasetPath.size() != 2) {
    return Optional.empty();
  }

  // The last path component is lower-cased before the TABLE_MAP lookup.
  final InformationSchemaTable table = TABLE_MAP.get(datasetPath.getName().toLowerCase());
  if (table == null) {
    return Optional.empty();
  }

  return Optional.of(table);
}

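For dataset handles, the information schema wraps its own tables as an enum type (the InformationSchemaTable class); each enum constant is returned directly as a DatasetHandle.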
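A minimal sketch of the enum-as-handle idea, assuming simplified types: SimpleDatasetHandle, SimpleInfoSchemaTable and InfoSchemaHandleLookup are hypothetical, paths are plain List<String> values instead of EntityPath, and the enum lists only the five tables handled by the reader's switch statement. It shows how a lower-cased lookup map in the spirit of TABLE_MAP can be built once from values().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical minimal handle interface; Dremio's DatasetHandle is richer.
interface SimpleDatasetHandle {
  List<String> path();
}

// Each constant doubles as the dataset handle for one information schema table.
enum SimpleInfoSchemaTable implements SimpleDatasetHandle {
  CATALOGS, SCHEMATA, TABLES, VIEWS, COLUMNS;

  @Override
  public List<String> path() {
    return Arrays.asList("INFORMATION_SCHEMA", name());
  }
}

final class InfoSchemaHandleLookup {
  // Lower-cased table name -> constant, built once from values().
  private static final Map<String, SimpleInfoSchemaTable> TABLE_MAP = Collections.unmodifiableMap(
      Arrays.stream(SimpleInfoSchemaTable.values())
          .collect(Collectors.toMap(t -> t.name().toLowerCase(Locale.ROOT), Function.identity())));

  // The set of handles is fixed: it is exactly the enum's constants.
  static List<SimpleDatasetHandle> listDatasetHandles() {
    return Arrays.<SimpleDatasetHandle>asList(SimpleInfoSchemaTable.values());
  }

  // Only a two-part path (source name, table name) can name a table;
  // the table part is matched case-insensitively.
  static Optional<SimpleDatasetHandle> getDatasetHandle(List<String> datasetPath) {
    if (datasetPath.size() != 2) {
      return Optional.empty();
    }
    SimpleInfoSchemaTable table = TABLE_MAP.get(datasetPath.get(1).toLowerCase(Locale.ROOT));
    return Optional.<SimpleDatasetHandle>ofNullable(table);
  }

  public static void main(String[] args) {
    System.out.println(listDatasetHandles().size());                                     // 5
    System.out.println(getDatasetHandle(Arrays.asList("INFORMATION_SCHEMA", "Tables"))); // Optional[TABLES]
    System.out.println(getDatasetHandle(Arrays.asList("INFORMATION_SCHEMA", "nope")));   // Optional.empty
  }
}
```

Because the table set is fixed at compile time, an enum gives both the listing (values()) and the handle objects for free, with no metadata store involved.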

Reading the data is done by InformationSchemaRecordReader, which internally calls a wrapped RPC service (InformationSchemaServiceGrpc) defined in the catalog-api module.
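For server-streaming calls, a grpc-java blocking stub returns an Iterator, so the reader can pull results lazily in batches; in the reader code, next() asks the table writer for at most numRowsPerBatch rows per call. The sketch below imitates that contract over a plain Iterator. BatchWriter and its List "output" are hypothetical stand-ins for Dremio's TableWriter and its Arrow vectors, and treating a 0-row batch as end of data is the convention assumed here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a TableWriter: copies at most maxRows records per call.
final class BatchWriter<T> {
  private final Iterator<T> source;                 // e.g. the Iterator from a blocking gRPC stub
  private final List<T> output = new ArrayList<>(); // stands in for the output vectors

  BatchWriter(Iterator<T> source) {
    this.source = source;
  }

  // Writes up to maxRows records and returns how many were written; 0 means the stream is exhausted.
  int write(int maxRows) {
    output.clear();
    int count = 0;
    while (count < maxRows && source.hasNext()) {
      output.add(source.next());
      count++;
    }
    return count;
  }

  List<T> lastBatch() {
    return output;
  }
}

class BatchDemo {
  public static void main(String[] args) {
    Iterator<String> stream = Arrays.asList("a", "b", "c", "d", "e").iterator();
    BatchWriter<String> writer = new BatchWriter<>(stream);
    int n;
    // Like repeated calls to the reader's next(): stop when a batch comes back empty.
    // Prints "2 [a, b]", "2 [c, d]", "1 [e]".
    while ((n = writer.write(2)) > 0) {
      System.out.println(n + " " + writer.lastBatch());
    }
  }
}
```

Pulling only as many elements as fit in one batch keeps memory bounded, since the remaining results stay in the RPC stream until they are asked for.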


Code

 

public class InformationSchemaRecordReader extends AbstractRecordReader {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InformationSchemaRecordReader.class);

  // gRPC blocking stub (InformationSchemaServiceBlockingStub)
  private final InformationSchemaServiceBlockingStub catalogStub;
  // The InformationSchemaTable being read
  private final InformationSchemaTable table;
  private final String catalogName;
  private final String username;
  private final SearchQuery searchQuery;
  private final boolean complexTypeSupport;

  private Context.CancellableContext context;
  private TableWriter<?> tableWriter;

  public InformationSchemaRecordReader(
      OperatorContext context,
      List<SchemaPath> fields,
      InformationSchemaServiceBlockingStub catalogStub,
      InformationSchemaTable table,
      String catalogName,
      String username,
      SearchQuery searchQuery,
      boolean complexTypeSupport
  ) {
    super(context, fields);
    this.catalogStub = catalogStub;
    this.table = table;
    this.catalogName = catalogName;
    this.username = username;
    this.searchQuery = searchQuery;
    this.complexTypeSupport = complexTypeSupport;
  }

  @Override
  public void setup(OutputMutator output) {
    context = Context.current().withCancellation();
    context.run(() -> {
      tableWriter = createTableWriter();
      tableWriter.init(output);
    });
  }

  @Override
  public int next() {
    Preconditions.checkNotNull(tableWriter, "Reader must be #setup first");
    return tableWriter.write(numRowsPerBatch);
  }

  @Override
  public void close() throws Exception {
    if (context != null) {
      context.close();
    }

    context = null;
    tableWriter = null;
  }

  @Override
  protected boolean supportsSkipAllQuery() {
    return true;
  }

  private Set<String> getGivenFields() {
    return getColumns()
        .stream()
        .peek(path -> Preconditions.checkArgument(path.isSimplePath()))
        .map(path -> path.getAsUnescapedPath().toUpperCase())
        .collect(Collectors.toSet());
  }

  private TableWriter<?> createTableWriter() {
    final Set<String> selectedFields =
        isStarQuery() ? InformationSchemaMetadata.getAllFieldNames(table.getRecordSchema()) : getGivenFields();

    switch (table) {

      case CATALOGS: {
        final ListCatalogsRequest.Builder catalogsRequest = ListCatalogsRequest.newBuilder()
            .setUsername(username);
        if (searchQuery != null) {
          catalogsRequest.setQuery(searchQuery);
        }

        // start Catalog stream from catalog service
        return new CatalogsTableWriter(catalogStub.listCatalogs(catalogsRequest.build()), selectedFields, catalogName);
      }

      case SCHEMATA: {
        final ListSchemataRequest.Builder schemataRequest = ListSchemataRequest.newBuilder()
            .setUsername(username);
        if (searchQuery != null) {
          schemataRequest.setQuery(searchQuery);
        }

        // start Schema stream from catalog service
        return new SchemataTableWriter(catalogStub.listSchemata(schemataRequest.build()), selectedFields, catalogName);
      }

      case TABLES: {
        final ListTablesRequest.Builder tablesRequest = ListTablesRequest.newBuilder()
            .setUsername(username);
        if (searchQuery != null) {
          tablesRequest.setQuery(searchQuery);
        }

        // start Table stream from catalog service
        return new TablesTableWriter(catalogStub.listTables(tablesRequest.build()), selectedFields, catalogName);
      }

      case VIEWS: {
        final ListViewsRequest.Builder viewsRequest = ListViewsRequest.newBuilder()
            .setUsername(username);
        if (searchQuery != null) {
          viewsRequest.setQuery(searchQuery);
        }

        // start View stream from catalog service
        return new ViewsTableWriter(catalogStub.listViews(viewsRequest.build()), selectedFields, catalogName);
      }

      case COLUMNS: {
        final ListTableSchemataRequest.Builder columnsRequest = ListTableSchemataRequest.newBuilder()
            .setUsername(username);
        if (searchQuery != null) {
          columnsRequest.setQuery(searchQuery);
        }
        // start TableSchema stream from catalog service
        final Iterator<TableSchema> tableSchemata = catalogStub.listTableSchemata(columnsRequest.build());

        // For each TableSchema, iterates over #flatMap of batch_schema field, which represents the records in the
        // "COLUMNS" table, and not the TableSchema message itself (unlike other tables).
        final Iterator<Column> columnIterator = new AbstractIterator<Column>() {
          Iterator<Column> currentIterator = null;

          @Override
          protected Column computeNext() {
            while (true) {
              if (currentIterator != null && currentIterator.hasNext()) {
                return currentIterator.next();
              }

              if (!tableSchemata.hasNext()) {
                return endOfData();
              }

              // Gets next TableSchema from the catalog service only after exhausting the current one. See comment in
              // TableWriter#write.
              final TableSchema currentSchema = tableSchemata.next();
              BatchSchema bs = BatchSchema.deserialize(currentSchema.getBatchSchema().toByteArray());
              // If an inconsistency is detected don't attempt converting to Arrow format since it will cause an assertion failure. Put out a warning and move on to next row.
              if (bs.getFieldCount() == 0) {
                // Add a warning message to indicate this table has missing fields
                logger.warn("{}.{}.{} has missing fields or incorrect format. ", currentSchema.getCatalogName(), currentSchema.getSchemaName(), currentSchema.getTableName());
                continue;
              }
              final RelDataType rowType =
                  CalciteArrowHelper.wrap(bs)
                      .toCalciteRecordType(JavaTypeFactoryImpl.INSTANCE, complexTypeSupport);
              //noinspection ConstantConditions
              currentIterator = Iterators.transform(rowType.getFieldList().iterator(),
                  field -> new Column(Strings.isNullOrEmpty(catalogName) ? currentSchema.getCatalogName() : catalogName,
                      currentSchema.getSchemaName(),
                      currentSchema.getTableName(),
                      field));
            }
          }
        };
        return new ColumnsTableWriter(columnIterator, selectedFields, catalogName);
      }

      default:
        throw UserException.unsupportedError()
            .message("InformationSchemaRecordReader does not support table of '%s' type", table)
            .buildSilently();
    }
  }
}
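The COLUMNS case differs from the others: each TableSchema message expands into one row per field, the next schema is fetched only after the current one is exhausted, and schemas with no fields are skipped. Below is a stdlib-only sketch of the same lazy flattening, written as a plain Iterator instead of Guava's AbstractIterator. TableSchemaLite, ColumnRow and ColumnFlattener are hypothetical, and field names are plain strings rather than a deserialized BatchSchema converted to a Calcite row type.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, simplified stand-in for the TableSchema message.
final class TableSchemaLite {
  final String catalogName;
  final String schemaName;
  final String tableName;
  final List<String> fieldNames; // the real message carries a serialized BatchSchema instead

  TableSchemaLite(String catalogName, String schemaName, String tableName, List<String> fieldNames) {
    this.catalogName = catalogName;
    this.schemaName = schemaName;
    this.tableName = tableName;
    this.fieldNames = fieldNames;
  }
}

// Hypothetical stand-in for one row of the COLUMNS table.
final class ColumnRow {
  final String catalog;
  final String schema;
  final String table;
  final String column;

  ColumnRow(String catalog, String schema, String table, String column) {
    this.catalog = catalog;
    this.schema = schema;
    this.table = table;
    this.column = column;
  }

  @Override
  public String toString() {
    return catalog + "." + schema + "." + table + "." + column;
  }
}

// Lazily flattens a stream of table schemas into one ColumnRow per field.
final class ColumnFlattener implements Iterator<ColumnRow> {
  private final Iterator<TableSchemaLite> schemas;
  private final String catalogOverride;
  private TableSchemaLite current;
  private Iterator<String> currentFields = Collections.emptyIterator();

  ColumnFlattener(Iterator<TableSchemaLite> schemas, String catalogOverride) {
    this.schemas = schemas;
    this.catalogOverride = catalogOverride;
  }

  @Override
  public boolean hasNext() {
    // Fetch the next schema only once the current one is exhausted.
    while (!currentFields.hasNext()) {
      if (!schemas.hasNext()) {
        return false;
      }
      TableSchemaLite next = schemas.next();
      if (next.fieldNames.isEmpty()) {
        continue; // the original logs a warning here; the sketch just skips the table
      }
      current = next;
      currentFields = next.fieldNames.iterator();
    }
    return true;
  }

  @Override
  public ColumnRow next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // As in the original: use the reader's catalog name if set, else the schema's own.
    String catalog = (catalogOverride == null || catalogOverride.isEmpty()) ? current.catalogName : catalogOverride;
    return new ColumnRow(catalog, current.schemaName, current.tableName, currentFields.next());
  }
}

class ColumnFlattenerDemo {
  public static void main(String[] args) {
    Iterator<TableSchemaLite> stream = Arrays.asList(
        new TableSchemaLite("cat", "s1", "t1", Arrays.asList("id", "name")),
        new TableSchemaLite("cat", "s1", "broken", Collections.<String>emptyList()),
        new TableSchemaLite("cat", "s2", "t2", Arrays.asList("ts"))).iterator();
    // Prints cat.s1.t1.id, cat.s1.t1.name, cat.s2.t2.ts (the empty table is skipped).
    new ColumnFlattener(stream, "").forEachRemaining(System.out::println);
  }
}
```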

To learn more about this call chain, see my earlier articles.

Reference call chain:

[Figure: call chain]

Notes

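If you are familiar with query engines such as Presto and Trino, you will know that their information_schema exists at the plugin (catalog) level, rather than at the global level as in tools like Dremio and Drill.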
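A toy model of that difference, under deliberately simplified assumptions: sources are just a map from source name to table names, and InfoSchemaScopes is hypothetical code that models neither engine's real implementation. A global information schema answers one query across every source, while a per-catalog one only sees the catalog it belongs to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class InfoSchemaScopes {
  // Global style (Dremio, Drill): one TABLES listing spans every source.
  static List<String> globalTables(Map<String, List<String>> tablesBySource) {
    List<String> rows = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : tablesBySource.entrySet()) {
      for (String table : e.getValue()) {
        rows.add(e.getKey() + "." + table);
      }
    }
    return rows;
  }

  // Per-catalog style (Presto, Trino): each catalog's information_schema sees only that catalog.
  static List<String> catalogTables(Map<String, List<String>> tablesBySource, String catalog) {
    List<String> rows = new ArrayList<>();
    for (String table : tablesBySource.getOrDefault(catalog, Collections.<String>emptyList())) {
      rows.add(catalog + "." + table);
    }
    return rows;
  }

  public static void main(String[] args) {
    Map<String, List<String>> sources = new LinkedHashMap<>();
    sources.put("hive", Arrays.asList("orders"));
    sources.put("pg", Arrays.asList("users", "events"));
    System.out.println(globalTables(sources));        // [hive.orders, pg.users, pg.events]
    System.out.println(catalogTables(sources, "pg")); // [pg.users, pg.events]
  }
}
```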

References

sabot/kernel/src/main/java/com/dremio/exec/store/ischema/InfoSchemaConf.java
sabot/kernel/src/main/java/com/dremio/exec/store/ischema/InformationSchemaTable.java
sabot/kernel/src/main/java/com/dremio/exec/store/ischema/InfoSchemaStoragePlugin.java
sabot/kernel/src/main/java/com/dremio/exec/store/ischema/InformationSchemaRecordReader.java
services/catalog-api/target/generated-sources/protobuf/com/dremio/service/catalog/InformationSchemaServiceGrpc.java
