PostgreSQL FDW: Parquet S3 ParquetS3FdwExecutionState


The create_parquet_execution_state function creates a different ExecutionState subclass depending on the ReaderType, so that each reader type gets its own execution behavior.

ParquetS3FdwExecutionState *create_parquet_execution_state(ReaderType reader_type, MemoryContext reader_cxt,
                                                           const char *dirname, Aws::S3::S3Client *s3_client,
                                                           TupleDesc tuple_desc,
                                                           std::set<int> &attrs_used, std::list<SortSupportData> sort_keys,
                                                           bool use_threads, bool use_mmap, int32_t max_open_files,
                                                           bool schemaless,
                                                           std::set<std::string> slcols, std::set<std::string> sorted_cols)
{
    switch (reader_type)
    {
        case RT_TRIVIAL:
            return new TrivialExecutionStateS3();
        case RT_SINGLE:
            return new SingleFileExecutionStateS3(reader_cxt, dirname, s3_client, tuple_desc, attrs_used, use_threads, use_mmap, schemaless, slcols, sorted_cols);
        case RT_MULTI:
            return new MultifileExecutionStateS3(reader_cxt, dirname, s3_client, tuple_desc, attrs_used, use_threads, use_mmap, schemaless, slcols, sorted_cols);
        case RT_MULTI_MERGE:
            /* additionally passes sort_keys */
            return new MultifileMergeExecutionStateS3(reader_cxt, dirname, s3_client, tuple_desc, attrs_used, sort_keys, use_threads, use_mmap, schemaless, slcols, sorted_cols);
        case RT_CACHING_MULTI_MERGE:
            /* additionally passes sort_keys and max_open_files */
            return new CachingMultifileMergeExecutionStateS3(reader_cxt, dirname, s3_client, tuple_desc, attrs_used, sort_keys, use_threads, use_mmap, max_open_files, schemaless, slcols, sorted_cols);
        default:
            throw std::runtime_error("unknown reader type");
    }
}
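
For orientation, here is a sketch of the ReaderType enum the switch dispatches on. The enumerator names come from the code above; the one-line descriptions are inferences from the extra constructor arguments, not the actual comments in parquet_s3_fdw.

/* Sketch: reader types as implied by the switch above. */
enum ReaderType
{
    RT_TRIVIAL,             /* nothing to read at all */
    RT_SINGLE,              /* a single parquet file */
    RT_MULTI,               /* several files, read one after another */
    RT_MULTI_MERGE,         /* several files, merge-sorted on sort_keys */
    RT_CACHING_MULTI_MERGE  /* merge that also bounds max_open_files */
};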

SingleFileExecutionStateS3

Besides the members initialized by its constructor, SingleFileExecutionStateS3 has two members, reader and coord, that are set through setter functions. As the code below shows, set_coordinator stores coord on the SingleFileExecutionStateS3 object and, if reader has already been set, propagates coord into the reader as well. init_coord calls the ParallelCoordinator object's init_single function, which initializes the data.single field for PC_SINGLE mode and initializes the latch and slock_t; the PC_SINGLE initialization mainly sets nfiles and nrowgroups. The next and rescan functions simply delegate to the corresponding reader functions. A short sketch of the expected call order follows the code.

class SingleFileExecutionStateS3 : public ParquetS3FdwExecutionState
{
private:
    ParquetReader          *reader;
    MemoryContext           cxt;
    ParallelCoordinator    *coord;
    TupleDesc               tuple_desc;
    std::set<int>           attrs_used;
    bool                    use_mmap;
    bool                    use_threads;
    const char             *dirname;
    Aws::S3::S3Client      *s3_client;
    bool                    schemaless;
    std::set<std::string>   slcols;
    std::set<std::string>   sorted_cols;

public:
    void set_coordinator(ParallelCoordinator *coord)
    {
        this->coord = coord;
        if (reader)
            reader->set_coordinator(coord);
    }

    Size estimate_coord_size()
    {
        return sizeof(ParallelCoordinator);
    }

    void init_coord()
    {
        coord->init_single(NULL, 0);
    }

    bool next(TupleTableSlot *slot, bool fake)
    {
        ReadStatus res;

        if ((res = reader->next(slot, fake)) == RS_SUCCESS)
            ExecStoreVirtualTuple(slot);
        return res == RS_SUCCESS;
    }

    void rescan(void)
    {
        reader->rescan();
    }
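
A minimal sketch of the expected call order, assuming coord points at a piece of shared memory sized by estimate_coord_size (state and shmem are hypothetical names):

    ParallelCoordinator *coord = (ParallelCoordinator *) shmem;  /* estimate_coord_size() bytes */
    state->set_coordinator(coord);  /* also forwarded to the reader, if one is already set */
    state->init_coord();            /* calls coord->init_single(NULL, 0) */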

The add_file function takes the rowgroups to scan as a parameter and creates a CachingParquetReader or DefaultParquetReader via create_parquet_reader. It calls set_rowgroups_list to store the rowgroup list in the reader's rowgroups member, calls open to create or fetch the actual arrow::FileReader, copies the schemaless, slcols, and sorted_cols information into the corresponding ParquetReader members, and finally calls create_column_mapping to build the mapping from arrow column types to postgres column types.

    void add_file(const char *filename, List *rowgroups)
    {
        ListCell        *lc;
        std::vector<int> rg;

        foreach (lc, rowgroups)
            rg.push_back(lfirst_int(lc));

        reader = create_parquet_reader(filename, cxt);
        reader->set_options(use_threads, use_mmap);
        reader->set_rowgroups_list(rg);

        if (s3_client)
            reader->open(dirname, s3_client);
        else
            reader->open();
        reader->set_schemaless_info(schemaless, slcols, sorted_cols);
        reader->create_column_mapping(tuple_desc, attrs_used);
    }
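
For reference, a sketch of how a caller builds the List that add_file consumes, using the standard PostgreSQL list API (state and the file name are hypothetical):

    List *rowgroups = NIL;
    for (int rg = 0; rg < 3; rg++)
        rowgroups = lappend_int(rowgroups, rg);    /* scan rowgroups 0..2 */
    state->add_file("part-0.parquet", rowgroups);  /* hypothetical file */
    list_free(rowgroups);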

MultifileExecutionStateS3

Compared with SingleFileExecutionStateS3, MultifileExecutionStateS3 adds a files vector and a cur_reader member. Consider the functions that involve these two members first. coord still runs in PC_SINGLE mode, and cur_reader is the index of the FileRowgroups element in files currently being processed. In get_next_reader, a parallel scan therefore first asks coord which FileRowgroups in files to scan next, whereas a serial scan simply increments the cur_reader counter locally (no coordination is needed to pick the next FileRowgroups); a sketch of the coordinator side follows the code below. add_file just fills in a FileRowgroups struct and appends it to files.

class MultifileExecutionStateS3 : public ParquetS3FdwExecutionState
{
private:
    struct FileRowgroups
    {
        std::string      filename;
        std::vector<int> rowgroups;
    };

    std::vector<FileRowgroups> files;
    uint64_t                   cur_reader;

    ParquetReader *get_next_reader()
    {
        ParquetReader *r;

        /* Under a parallel scan the coordinator decides which file to read
         * next; a -1 "no more files" answer wraps around in the unsigned
         * cur_reader and is caught by the size check below. */
        if (coord)
        {
            coord->lock();
            cur_reader = coord->next_reader();
            coord->unlock();
        }
        if (cur_reader >= files.size() || cur_reader < 0)
            return NULL;

        r = create_parquet_reader(files[cur_reader].filename.c_str(), cxt, cur_reader);
        r->set_rowgroups_list(files[cur_reader].rowgroups);
        r->set_options(use_threads, use_mmap);
        r->set_coordinator(coord);
        if (s3_client)
            r->open(dirname, s3_client);
        else
            r->open();
        r->set_schemaless_info(schemaless, slcols, sorted_cols);
        r->create_column_mapping(tuple_desc, attrs_used);

        cur_reader++;
        return r;
    }

    void add_file(const char *filename, List *rowgroups)
    {
        FileRowgroups fr;
        ListCell     *lc;

        fr.filename = filename;
        foreach (lc, rowgroups)
            fr.rowgroups.push_back(lfirst_int(lc));
        files.push_back(fr);
    }
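
A plausible shape of the coordinator's next_reader under PC_SINGLE, per the description above; this is a sketch only, and the member names are assumptions rather than the actual parquet_s3_fdw layout:

    /* Sketch: hand out file indexes one at a time; -1 signals that every
     * file has been assigned. The caller holds coord->lock(). */
    int32 ParallelCoordinator::next_reader()
    {
        int32 i = data.single.next_reader++;    /* assumed member name */
        return (i < data.single.nfiles) ? i : -1;
    }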

Note that coord is still initialized in PC_SINGLE mode: as init_coord shows, it calls init_single with the number of files and, for each file, the number of rowgroups to scan. A worked example follows the code.

    void init_coord()
    {
        ParallelCoordinator *coord = (ParallelCoordinator *) this->coord;
        int    i = 0;
        int32 *nrowgroups = (int32 *) palloc(sizeof(int32) * files.size());

        for (auto &file : files)
            nrowgroups[i++] = file.rowgroups.size();
        coord->init_single(nrowgroups, files.size());
        pfree(nrowgroups);
    }
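
A worked example, assuming set_coordinator has already been called (file names hypothetical; list_make2_int and list_make3_int are the standard PostgreSQL list constructors):

    state->add_file("part-0.parquet", list_make2_int(0, 1));     /* 2 rowgroups */
    state->add_file("part-1.parquet", list_make3_int(0, 1, 2));  /* 3 rowgroups */
    state->init_coord();  /* coord->init_single(nrowgroups = {2, 3}, nfiles = 2) */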

Finally, the next function calls get_next_reader to create a new parquet reader for each filename it obtains and scans the rowgroups that file contains. When the current file is exhausted, next calls get_next_reader again to obtain a reader for the next file and continues scanning that file's rowgroups.

    bool next(TupleTableSlot *slot, bool fake = false)
    {
        ReadStatus res;

        if (unlikely(reader == NULL))
        {
            if ((reader = this->get_next_reader()) == NULL)
                return false;
        }
        res = reader->next(slot, fake);

        /* Finished reading the current reader? Proceed to the next one. */
        if (unlikely(res != RS_SUCCESS))
        {
            while (true)
            {
                if (reader)
                    delete reader;
                reader = this->get_next_reader();
                if (!reader)
                    return false;
                res = reader->next(slot, fake);
                if (res == RS_SUCCESS)
                    break;
            }
        }

        if (res == RS_SUCCESS)
        {
            /*
             * ExecStoreVirtualTuple doesn't throw postgres exceptions, so
             * there is no need to wrap it in PG_TRY / PG_CATCH.
             */
            ExecStoreVirtualTuple(slot);
        }
        return res == RS_SUCCESS;
    }
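
To see the contract from the caller's side, here is a sketch of a scan loop driving next(). MakeSingleTupleTableSlot, TTSOpsVirtual and ExecClearTuple are the standard PostgreSQL slot API; state and process are hypothetical:

    TupleTableSlot *slot = MakeSingleTupleTableSlot(tuple_desc, &TTSOpsVirtual);
    while (state->next(slot))
    {
        /* slot now holds a virtual tuple (ExecStoreVirtualTuple ran) */
        process(slot);      /* hypothetical consumer */
        ExecClearTuple(slot);
    }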

