A brief note on manual format handling in the Dremio web UI

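I previously covered format handling through auto-promotion. Dremio also supports setting the format by hand, for both files and folders; this post focuses on the folder case.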

Overview of the flow

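On the UI side, a pre-processing step runs first to obtain the likely format of the path; the format preview service then processes the data using that format.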

How folder_format is handled

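  • Code
    There are two paths, a fast one and a regular (default) one; in the default configuration, fast detection is the one that is used.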

 @GET
  @Path("/folder_format/{path: .*}")
  @Produces(MediaType.APPLICATION_JSON)
  public FileFormatUI getFolderFormat(@PathParam("path") String path)
      throws PhysicalDatasetNotFoundException, NamespaceException, SourceNotFoundException, IOException {
    if (useFastPreview()) {
      SourceFolderPath folderPath = asFolderPath(path);
      // Handled by the formatTools helper class
      return new FileFormatUI(formatTools.getOrDetectFormat(folderPath, DatasetType.PHYSICAL_DATASET_SOURCE_FOLDER), folderPath);
    }
 
    SourceFolderPath folderPath = SourceFolderPath.fromURLPath(sourceName, path);
    sourceService.checkSourceExists(folderPath.getSourceName());
 
    FileFormat fileFormat;
    try {
      final PhysicalDatasetConfig physicalDatasetConfig = sourceService.getFilesystemPhysicalDataset(folderPath);
      fileFormat = FileFormat.getForFolder(physicalDatasetConfig.getFormatSettings());
      fileFormat.setVersion(physicalDatasetConfig.getTag());
    } catch (PhysicalDatasetNotFoundException nfe) {
      fileFormat = sourceService.getDefaultFileFormat(sourceName, folderPath, securityContext.getUserPrincipal().getName());
    }
    return new FileFormatUI(fileFormat, folderPath);
  }
FormatTools getOrDetectFormat handling
 public FileFormat getOrDetectFormat(NamespacePath folderPath, DatasetType expectedType) throws NamespaceException {
    // folder
    final FileFormat fileFormat;
    try {
      final PhysicalDatasetConfig physicalDatasetConfig = sourceService.getFilesystemPhysicalDataset(folderPath, expectedType);
 
      if (!expectedType.equals(physicalDatasetConfig.getType())) {
        throw new IllegalStateException(
            String.format(
                "Expected format of type %s but actually of format %s.",
                expectedType,
                physicalDatasetConfig.getType()
                )
            );
      }
 
      // determine whether folder or file.
      final boolean isFolder;
      final DatasetType datasetType = physicalDatasetConfig.getType();
      Span.current().setAttribute("formattools.getOrDetectFormat.datasetType", datasetType.name());
 
      switch(datasetType) {
        case PHYSICAL_DATASET_HOME_FILE:
        case PHYSICAL_DATASET_SOURCE_FILE:
          isFolder = false;
          break;
        case PHYSICAL_DATASET_SOURCE_FOLDER:
        case PHYSICAL_DATASET_HOME_FOLDER:
          isFolder = true;
          break;
        case PHYSICAL_DATASET:
        case VIRTUAL_DATASET:
        default:
          throw new IllegalStateException("Dataset is neither a file nor a folder.");
      }
 
      final FileConfig fileConfig = physicalDatasetConfig.getFormatSettings();
      // The core work is done by the FileFormat helper class
      fileFormat = isFolder ? FileFormat.getForFolder(fileConfig) : FileFormat.getForFile(fileConfig);
      fileFormat.setVersion(physicalDatasetConfig.getTag());
      return fileFormat;
    } catch (PhysicalDatasetNotFoundException nfe) {
      // ignore and fall through to detect the format, so we don't have extra nested blocks.
    }
 
    final NamespaceKey key = folderPath.toNamespaceKey();
    return detectFileFormat(key);
  }
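
The "saved format first, detection second" shape of getOrDetectFormat can be sketched on its own. This is a minimal illustration, not Dremio code: the class name, the SAVED map, and the extension table are all hypothetical, and detectFileFormat is not shown in this post, so the extension-based guess below is only an assumption about what such a fallback might look like.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the "look up, else detect" flow; not Dremio's API. */
class FormatLookupSketch {
  // Pretend catalog of paths whose format has already been saved (made-up data).
  static final Map<String, String> SAVED = Map.of("Samples/zips.json", "JSON");

  /** Returns the saved format if one exists, otherwise falls back to detection. */
  static String getOrDetect(String path) {
    return Optional.ofNullable(SAVED.get(path)).orElseGet(() -> detect(path));
  }

  /** Assumed fallback: guess from the file extension, UNKNOWN when nothing matches. */
  static String detect(String path) {
    int dot = path.lastIndexOf('.');
    String ext = dot < 0 ? "" : path.substring(dot + 1).toLowerCase(Locale.ROOT);
    switch (ext) {
      case "csv": return "CSV";
      case "json": return "JSON";
      case "parquet": return "PARQUET";
      default: return "UNKNOWN";
    }
  }

  public static void main(String[] args) {
    System.out.println(getOrDetect("Samples/zips.json")); // saved entry is used
    System.out.println(getOrDetect("Samples/trips.csv")); // falls through to detection
  }
}
```

As in the real method, the "not found" case is not treated as an error; it simply routes the call to the detection branch.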
The FileFormat get method
  private static FileFormat get(FileConfig fileConfig) {
    // TODO (Amit H) Remove after defining classes for tsv, csv, and psv
    FileType fileType = fileConfig.getType();
    if (fileType == FileType.CSV || fileType == FileType.TSV || fileType == FileType.PSV) {
      fileType = FileType.TEXT;
    }
    final Class<? extends FileFormat> fileFormatClass = FileFormatDefinitions.CLASS_TYPES.get(fileType);
    final Schema<FileFormat> schema = (Schema<FileFormat>) FileFormatDefinitions.SCHEMAS.get(fileFormatClass);
 
    final FileFormat fileFormat = schema.newMessage();
    if (fileConfig.getExtendedConfig() != null) {
      ProtobufIOUtil.mergeFrom(fileConfig.getExtendedConfig().toByteArray(), fileFormat, schema);
    }
 
    fileFormat.setCtime(fileConfig.getCtime());
    fileFormat.setName(fileConfig.getName());
    fileFormat.setOwner(fileConfig.getOwner());
    fileFormat.setFullPath(fileConfig.getFullPathList());
    fileFormat.setVersion(fileConfig.getTag());
    fileFormat.setLocation(fileConfig.getLocation());
    return fileFormat;
  }
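
Two things happen in get: CSV, TSV and PSV are folded into TEXT, and the concrete class is then looked up by type in a registry. A self-contained sketch of that pattern follows; the enum, the Format interface and the FACTORIES map are hypothetical simplifications and do not mirror FileFormatDefinitions.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical type-to-class registry with delimiter variants folded into TEXT. */
class FormatRegistrySketch {
  enum FileType { TEXT, CSV, TSV, PSV, JSON, UNKNOWN }

  interface Format { String describe(); }
  static class TextFormat implements Format { public String describe() { return "text"; } }
  static class JsonFormat implements Format { public String describe() { return "json"; } }

  // One factory per concrete format; CSV/TSV/PSV deliberately have no entry of their own.
  static final Map<FileType, Supplier<Format>> FACTORIES = new EnumMap<>(FileType.class);
  static {
    FACTORIES.put(FileType.TEXT, TextFormat::new);
    FACTORIES.put(FileType.JSON, JsonFormat::new);
  }

  static Format create(FileType type) {
    // Delimiter-separated variants all share the text implementation.
    if (type == FileType.CSV || type == FileType.TSV || type == FileType.PSV) {
      type = FileType.TEXT;
    }
    Supplier<Format> factory = FACTORIES.get(type);
    if (factory == null) {
      throw new IllegalArgumentException("No format registered for " + type);
    }
    return factory.get();
  }

  public static void main(String[] args) {
    System.out.println(create(FileType.TSV).describe());
  }
}
```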
Response format of the endpoint
For reference:
{
    "fileFormat": {
        "type": "Iceberg", // format type
        "name": "NYC-taxi-trips-iceberg",
        "fullPath": [
            "Samples",
            "samples.dremio.com",
            "NYC-taxi-trips-iceberg"
        ],
        "ctime": 1708002674824,
        "isFolder": true
    },
    "links": {
        "format_preview": "/source/Samples/folder_preview/samples.dremio.com/NYC-taxi-trips-iceberg", // format preview URL
        "self": "/source/Samples/folder_format/samples.dremio.com/NYC-taxi-trips-iceberg"
    },
    "id": "/source/Samples/folder_format/samples.dremio.com/NYC-taxi-trips-iceberg"
}
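
The links in this response follow a visible pattern: /source/, the first element of fullPath, the endpoint name, then the remaining path elements. The sketch below reproduces that pattern from the sample response only; the helper name is hypothetical, and it skips URL encoding, which real paths containing special characters would presumably need.

```java
import java.util.List;

/** Hypothetical helper that rebuilds the link pattern seen in the sample response. */
class FormatLinksSketch {
  /** kind is the endpoint segment, e.g. "folder_format" or "folder_preview". */
  static String link(String kind, List<String> fullPath) {
    String source = fullPath.get(0); // first element is the source name
    String rest = String.join("/", fullPath.subList(1, fullPath.size()));
    return "/source/" + source + "/" + kind + "/" + rest;
  }

  public static void main(String[] args) {
    List<String> fullPath = List.of("Samples", "samples.dremio.com", "NYC-taxi-trips-iceberg");
    System.out.println(link("folder_format", fullPath));
    System.out.println(link("folder_preview", fullPath));
  }
}
```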
Format preview handling
  protected JobDataFragment previewFormat(FileFormat format, NamespacePath path) {
    if (useFastPreview()) {
      try {
        // As with format pre-processing, there is a fast path and a default path; fast is the default
        return formatTools.previewData(format, path, false);
      } catch (AccessControlException e) {
        throw UserException.validationError()
          .message(e.getMessage())
          .buildSilently();
      }
    }
    SourceFilePath filePath = SourceFilePath.fromURLPath(sourceName, path.toPathString());
    return executor.previewPhysicalDataset(filePath.toString(), format, getOrCreateAllocator("previewFileFormat"));
  }
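Fast preview goes through the format tools class (FormatTools)
In practice the work is done by a format plugin: Dremio implements handling for the different data formats, and the plugin reads the data directly from the file system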

 public JobDataFragment previewData(FileFormat format, NamespacePath namespacePath, boolean useFormatLocation) {
    final NamespaceKey key = namespacePath.toNamespaceKey();
    final FileSystemPlugin<?> plugin = getPlugin(key);
    FileSystem fs;
    try {
      fs = plugin.createFS(securityContext.getUserPrincipal().getName());
    } catch (IOException ex) {
      throw new IllegalStateException("No files detected or unable to read data.", ex);
    }
    final Path path = FileSelection.getPathBasedOnFullPath(plugin.resolveTableNameToValidPath(key.getPathComponents()));
 
    // for now, existing rudimentary behavior that uses extension detection.
    final FileAttributes attributes;
    try {
      attributes = fs.getFileAttributes(path);
    } catch(IOException ex) {
      // we could return unknown but if there are no files, what's the point.
      throw new IllegalStateException("No files detected or unable to read data.", ex);
    }
 
    try {
      final FormatPluginConfig formatPluginConfig = PhysicalDatasetUtils.toFormatPlugin(format.asFileConfig(), Collections.<String>emptyList());
      final FormatPlugin formatPlugin = plugin.getFormatPlugin(formatPluginConfig);
 
      if(attributes.isRegularFile()) {
        return getData(formatPlugin, fs, Collections.singleton(attributes).iterator());
      }
 
      try(DirectoryStream<FileAttributes> files = formatPlugin.getFilesForSamples(fs, plugin, path)) {
        Iterator<FileAttributes> iter = files.iterator();
        return getData(formatPlugin, fs, Iterators.filter(iter, FileAttributes::isRegularFile));
      }
    } catch (DirectoryIteratorException ex) {
      throw new RuntimeException(ex.getCause());
    } catch (Exception ex) {
      throw Throwables.propagate(ex);
    }
  }
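
The branch at the end of previewData (a regular file is sampled as is, a folder is sampled through its regular files) can be tried out with plain java.nio. This is only an analogy under that assumption: Dremio uses its own FileSystem and FileAttributes abstractions, and the class and method names below are hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical java.nio analogue of "file: itself, folder: its regular files". */
class SampleFilesSketch {
  static List<Path> filesToSample(Path path) throws IOException {
    if (Files.isRegularFile(path)) {
      return Collections.singletonList(path);
    }
    List<Path> result = new ArrayList<>();
    // Only direct children are listed; sub-directories are filtered out, not descended into.
    try (DirectoryStream<Path> children = Files.newDirectoryStream(path)) {
      for (Path child : children) {
        if (Files.isRegularFile(child)) {
          result.add(child);
        }
      }
    }
    Collections.sort(result); // directory order is unspecified, so sort for stable output
    return result;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(filesToSample(Path.of(".")));
  }
}
```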
Default preview handling
The default path goes through previewPhysicalDataset on QueryExecutor, which internally relies on an implementation of the JobsService interface

  public JobDataFragment previewPhysicalDataset(String table, FileFormat formatOptions, BufferAllocator allocator) {
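    // Note: no metadata exists at this point, so Dremio runs the SQL query through a table function (macro); this removes the need for metadata, otherwise Dremio would report that the table cannot be found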
    final com.dremio.service.job.SqlQuery query = JobRequestUtil.createSqlQuery(format("select * from table(%s (%s))", table, formatOptions.toTableOptions()),
      null, context.getUserPrincipal().getName());
    // We still need to truncate the results to 500 as the preview physical datasets doesn't support pagination yet
    final CompletionListener listener = new CompletionListener();
    final JobSubmission jobSubmission = jobsService.submitJob(
      SubmitJobRequest.newBuilder().setSqlQuery(query).setQueryType(com.dremio.service.job.QueryType.UI_INITIAL_PREVIEW).build(),
      listener);
    listener.awaitUnchecked();
 
    return new JobDataWrapper(jobsService, jobSubmission.getJobId(), jobSubmission.getSessionId(), SystemUser.SYSTEM_USERNAME)
      .truncate(allocator, 500);
  }
}

SQL generated with the table function
An example (the path can be queried directly this way, with no metadata needed up front)
select * from table(s3v2.data (type => 'text', fieldDelimiter => ',', comment => '#', "escape" => '"', quote => '"', lineDelimiter => '
', extractHeader => false, skipFirstLine => false, autoGenerateColumnNames => true, trimHeader => true))
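
A rough sketch of how such a statement could be assembled from a set of format options is shown below. It is not what toTableOptions does internally, which this post does not show; the helper names are hypothetical, and option names that are reserved words (such as "escape" in the example above) would have to be passed in already quoted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical builder for a "select * from table(path (options))" statement. */
class TableFunctionSqlSketch {
  /** Booleans are emitted bare; everything else becomes a single-quoted SQL string. */
  static String literal(Object value) {
    if (value instanceof Boolean) {
      return value.toString();
    }
    return "'" + value.toString().replace("'", "''") + "'";
  }

  static String previewSql(String table, Map<String, Object> options) {
    String args = options.entrySet().stream()
        .map(e -> e.getKey() + " => " + literal(e.getValue()))
        .collect(Collectors.joining(", "));
    return String.format("select * from table(%s (%s))", table, args);
  }

  public static void main(String[] args) {
    Map<String, Object> options = new LinkedHashMap<>(); // keeps insertion order
    options.put("type", "text");
    options.put("fieldDelimiter", ",");
    options.put("extractHeader", false);
    System.out.println(previewSql("s3v2.data", options));
  }
}
```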
JobsService handling
In practice this issues a gRPC request to fetch the data

  public JobSubmission submitJob(SubmitJobRequest jobRequest, JobStatusListener statusListener) {
    final JobStatusListenerAdapter adapter = new JobStatusListenerAdapter(statusListener);
    getAsyncStub().submitJob(jobRequest, adapter);
    JobSubmission jobSubmission = adapter.getJobSubmission();
    enableTraces(jobSubmission);
    return jobSubmission;
  }
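
Both previewPhysicalDataset (listener.awaitUnchecked()) and submitJob (the listener adapter) turn an asynchronous call into a blocking one by waiting on a listener. A minimal sketch of that pattern using only the JDK follows; every name in it is hypothetical, and the fake submitAsync merely stands in for the gRPC stub call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch of blocking on an asynchronous submission via a listener. */
class BlockingSubmitSketch {
  interface StatusListener { void completed(String jobId); }

  /** Listener that lets the caller wait until the completion callback has fired. */
  static class CompletionLatch implements StatusListener {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String jobId;

    @Override
    public void completed(String jobId) {
      this.jobId = jobId;
      latch.countDown();
    }

    String await() {
      try {
        latch.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for job", e);
      }
      return jobId;
    }
  }

  /** Stand-in for the async stub: reports completion from another thread. */
  static void submitAsync(String sql, StatusListener listener) {
    CompletableFuture.runAsync(() -> listener.completed("job-for:" + sql));
  }

  public static void main(String[] args) {
    CompletionLatch listener = new CompletionLatch();
    submitAsync("select 1", listener);
    System.out.println(listener.await());
  }
}
```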

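  • Follow-up

After the preview, Dremio also issues a PUT request that updates the format of the actual dataset. What this really changes is the dataset's metadata, so that later queries can treat the path directly as a standard Dremio table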

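Notes

The above is a brief account of how the UI layer handles format processing; for the internal details, read the source code. Format is a very nice feature of the Dremio UI layer that enables flexible data handling, and its design is well worth studying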

References

dac/backend/src/main/java/com/dremio/dac/resource/SourceResource.java
dac/backend/src/main/java/com/dremio/dac/model/sources/FormatTools.java
dac/backend/src/main/java/com/dremio/dac/service/source/SourceService.java
services/namespace/src/main/java/com/dremio/service/namespace/file/FileFormat.java
services/namespace/src/main/java/com/dremio/service/namespace/file/FileFormatDefinitions.java
sabot/kernel/src/main/java/com/dremio/exec/store/dfs/FormatPlugin.java
dac/backend/src/main/java/com/dremio/dac/explore/QueryExecutor.java
services/jobs/src/main/java/com/dremio/service/jobs/HybridJobsService.java
services/jobs/src/main/java/com/dremio/service/jobs/JobsService.java
services/jobs/target/generated-sources/protobuf/grpc-java/com/dremio/service/job/JobsServiceGrpc.java
