ParallelCoordinator类定义在src/reader.hpp文件中,用于支持PostgreSQL的parallel foreign scan(并行外部表扫描)能力。该类支持两种模式:PC_SINGLE(单个parquet文件以及简单的多文件扫描)和PC_MULTI(多文件归并,即multimerge扫描)。latch是用于多个进程间同步的自旋锁(slock_t),用于保护data数据域;各进程通过data确定自己所使用的ParquetReader接下来要扫描的parquet文件和rowgroup。data联合体保存了PC_SINGLE对应的结构体single和PC_MULTI对应的结构体multi。
/*
 * ParallelCoordinator
 *
 * Bookkeeping object describing which reader / rowgroup should be handed
 * out next. `type` selects which member of the `data` union is in use and
 * `latch` is the spinlock taken by lock() and released by unlock().
 *
 * NOTE(review): both union members end in a FLEXIBLE_ARRAY_MEMBER array, so
 * the object is presumably placed in a buffer that is larger than
 * sizeof(ParallelCoordinator) -- confirm against the allocation site.
 * The member order below is part of the memory layout; do not reorder.
 */
class ParallelCoordinator
{
private:
    /* Discriminator for the `data` union. */
    enum Type
    {
        PC_SINGLE = 0,
        PC_MULTI
    };

    Type    type;
    slock_t latch;

    union
    {
        /* single file and simple multifile case */
        struct
        {
            int32   reader;     /* current reader */
            int32   nfiles;     /* number of parquet files to read */
            int32   rowgroup;   /* current rowgroup */
            /* per-file rowgroups numbers */
            int32   nrowgroups[FLEXIBLE_ARRAY_MEMBER];
        } single;

        /* multimerge case */
        struct
        {
            /* per-reader counters */
            int32   next_rowgroup[FLEXIBLE_ARRAY_MEMBER];
        } multi;
    } data;

public:
    /* Acquire the coordinator spinlock. */
    void lock()
    {
        SpinLockAcquire(&latch);
    }

    /* Release the coordinator spinlock. */
    void unlock()
    {
        SpinLockRelease(&latch);
    }
};

init
init_single函数用于为PC_SINGLE初始化data.single数据域、初始化latch slock_t。PC_SINGLE初始化主要是设置nfiles和nrowgroups。init_multi函数用于为PC_MULTI初始化data.multi.next_rowgroup域。
/*
 * Put the coordinator into PC_SINGLE mode.
 *
 * nrowgroups - array holding the rowgroup count of each file; `nfiles`
 *              entries are copied into data.single.nrowgroups
 * nfiles     - number of parquet files to read; when it is zero nothing is
 *              copied and `nrowgroups` is not touched
 *
 * The current reader and the current rowgroup both start at -1, i.e.
 * nothing has been handed out yet. The latch is initialized here as well.
 */
void init_single(int32 *nrowgroups, int32 nfiles)
{
    SpinLockInit(&latch);

    type = PC_SINGLE;
    data.single.nfiles = nfiles;
    data.single.reader = -1;
    data.single.rowgroup = -1;

    if (nfiles != 0)
    {
        memcpy(data.single.nrowgroups,
               nrowgroups,
               sizeof(int32) * nfiles);
    }
}
    void init_multi(int nfiles){
        type = PC_MULTI;
        for (int i = 0; i < nfiles; ++i)
            data.multi.next_rowgroup[i] = 0;
    }next_reader
next_reader函数仅用于PC_SINGLE,主要考虑两个情况:所有parquet文件是否已全部读取、当前parquet文件包含的rowgroup是否已全部读取。如果当前reader有效(0 <= reader < nfiles)且当前parquet文件还有未读取的rowgroup,则返回当前reader;否则递增当前reader标识符、将当前rowgroup计数器重置为-1,并返回递增后的reader。
/*
 * Get the next reader id. Caller must hold the lock.
 *
 * Only the PC_SINGLE mode is handled. The current reader is returned again
 * while it lies in [0, nfiles) and still has rowgroups after the current
 * one. Otherwise the reader index is advanced by one, the rowgroup counter
 * is reset to -1 and the new index is returned; the returned value is not
 * clamped to nfiles here.
 *
 * For any other mode the assertion fires and -1 is returned.
 */
int32 next_reader()
{
    if (type != PC_SINGLE)
    {
        Assert(false && "unsupported");
        return -1;
    }

    const int32 current = data.single.reader;
    const bool  in_range = current >= 0 && current < data.single.nfiles;

    /* nrowgroups[] is only consulted once `current` is known to be valid */
    if (in_range && data.single.rowgroup + 1 < data.single.nrowgroups[current])
        return current;

    /* Current reader is exhausted (or not started yet): move on */
    data.single.rowgroup = -1;
    return ++data.single.reader;
}
next_rowgroup
next_rowgroup函数可以用于PC_SINGLE和PC_MULTI。对于PC_MULTI,返回per-reader计数器next_rowgroup[reader_id]的当前值,然后将该计数器递增。对于PC_SINGLE,如果reader_id不是当前reader则返回-1,否则递增data.single.rowgroup计数器并返回递增后的值。
/*
 * Get the next rowgroup id for the given reader. Caller must hold the lock.
 *
 * reader_id - id of the reader asking for a rowgroup
 *
 * PC_SINGLE: returns -1 when `reader_id` is not the current reader;
 *            otherwise increments the shared rowgroup counter and returns
 *            the incremented value.
 * PC_MULTI:  returns the current value of the counter belonging to
 *            `reader_id` and then increments it. `reader_id` is used as an
 *            array index without a bounds check.
 */
int32 next_rowgroup(int32 reader_id)
{
    if (type == PC_SINGLE)
    {
        /* Only the current reader may claim rowgroups */
        if (reader_id != data.single.reader)
            return -1;

        return ++data.single.rowgroup;
    }

    /* multimerge case */
    Assert(type == PC_MULTI);
    return data.multi.next_rowgroup[reader_id]++;
}
                










