0
点赞
收藏
分享

微信扫一扫

PostgreSQL数据库进程数据结构——ProcGlobal


allProcs数组包含了后端/后台进程的PGPROC槽(一个数组元素算一个槽)。allPgXact数组包含了后端/后台进程的PGXACT槽。freeProcs是指普通后端进程PGPROC空闲链表,autovacFreeProcs是指autovacuum进程PGPROC空闲链表,bgworkerFreeProcs是指bgworker进程PGPROC空闲链表,walsenderFreeProcs是指walsender进程PGPROC空闲链表。

/*
 * Pointers to shared-memory structures.
 *
 * ProcGlobal starts out NULL; InitProcGlobal() points it at the shared
 * "Proc Header" struct it obtains from ShmemInitStruct().
 */
PROC_HDR *ProcGlobal = NULL;
/*
 * There is one ProcGlobal struct for the whole database cluster.
 *
 * Every field below receives its initial value in InitProcGlobal().
 */
typedef struct PROC_HDR {
/*
 * Array of PGPROC structures (not including dummies for prepared txns).
 * InitProcGlobal() allocates the prepared-xact dummies in the same
 * allocation, after the first allProcCount entries.
 */
PGPROC *allProcs;
/*
 * Array of PGXACT structures (not including dummies for prepared txns).
 * Indexed by a PGPROC's pgprocno; one PGXACT exists per PGPROC.
 */
PGXACT *allPgXact;
/*
 * Length of allProcs array.  Set to MaxBackends + NUM_AUXILIARY_PROCS,
 * i.e. it excludes the prepared-xact dummies.
 */
uint32 allProcCount;
/*
 * Head of list of free PGPROC structures (normal backends).  Like the
 * other free lists below, it is chained through links.next and is NULL
 * when empty.
 */
PGPROC *freeProcs;
/* Head of list of autovacuum's free PGPROC structures */
PGPROC *autovacFreeProcs;
/* Head of list of bgworker free PGPROC structures */
PGPROC *bgworkerFreeProcs;
/* Head of list of walsender free PGPROC structures */
PGPROC *walsenderFreeProcs;
/* First pgproc waiting for group XID clear; starts as INVALID_PGPROCNO */
pg_atomic_uint32 procArrayGroupFirst;
/*
 * First pgproc waiting for group transaction status update; starts as
 * INVALID_PGPROCNO
 */
pg_atomic_uint32 clogGroupFirst;
/* WALWriter process's latch; NULL until set */
Latch *walwriterLatch;
/* Checkpointer process's latch; NULL until set */
Latch *checkpointerLatch;
/*
 * Current shared estimate of appropriate spins_per_delay value.
 * InitProcess() copies it to local storage while holding ProcStructLock.
 */
int spins_per_delay;
/* The proc of the Startup process, since not in ProcArray */
PGPROC *startupProc;
/*
 * NOTE(review): presumably the PID of the Startup process; it is
 * initialized to 0 -- confirm against the code that sets it.
 */
int startupProcPid;
/* Buffer id of the buffer that Startup process waits for pin on, or -1 */
int startupBufferPinWaitBufId;
} PROC_HDR;

初始化

/*
 * Add to "size" the space for one array of elemSize-byte entries per slot
 * class: regular backends (MaxBackends, which includes autovacuum workers
 * and the launcher), auxiliary processes, and prepared transactions.
 */
static Size ProcSlotArraysSize(Size size, Size elemSize) {
    size = add_size(size, mul_size(MaxBackends, elemSize));
    size = add_size(size, mul_size(NUM_AUXILIARY_PROCS, elemSize));
    size = add_size(size, mul_size(max_prepared_xacts, elemSize));
    return size;
}

/*
 * ProcGlobalShmemSize
 *      Report how much shared memory InitProcGlobal will ask for.
 *
 * The total covers the PROC_HDR itself, the PGPROC array, the
 * ProcStructLock spinlock, and the parallel PGXACT array.  All arithmetic
 * goes through add_size()/mul_size().
 */
Size ProcGlobalShmemSize(void) {
    /* ProcGlobal header */
    Size total = add_size(0, sizeof(PROC_HDR));

    /* PGPROC slots: MyProcs, AuxiliaryProcs, prepared xacts */
    total = ProcSlotArraysSize(total, sizeof(PGPROC));

    /* ProcStructLock */
    total = add_size(total, sizeof(slock_t));

    /* PGXACT slots, one per PGPROC slot */
    total = ProcSlotArraysSize(total, sizeof(PGXACT));

    return total;
}
/*
 * ProcGlobalSemas
 *      Report how many semaphores InitProcGlobal needs.
 *
 * One semaphore is required for every regular backend slot (MaxBackends,
 * which counts autovacuum too) and one more for each auxiliary process.
 */
int ProcGlobalSemas(void) {
    int semaCount = MaxBackends;

    semaCount += NUM_AUXILIARY_PROCS;
    return semaCount;
}

PostgreSQL数据库进程数据结构——ProcGlobal_数据库

/* InitProcGlobal -
* Initialize the global process table during postmaster or standalone
* backend startup.
* We also create all the per-process semaphores we will need to support
* the requested number of backends. We used to allocate semaphores
* only when backends were actually started up, but that is bad because
* it lets Postgres fail under load --- a lot of Unix systems are
* (mis)configured with small limits on the number of semaphores, and
* running out when trying to start another backend is a common failure.
* So, now we grab enough semaphores to support the desired max number
* of backends immediately at initialization --- if the sysadmin has set
* MaxConnections, max_worker_processes, max_wal_senders, or
* autovacuum_max_workers higher than his kernel will support, he'll
* find out sooner rather than later.
* Another reason for creating semaphores here is that the semaphore
* implementation typically requires us to create semaphores in the
* postmaster, not in backends.
* Note: this is NOT called by individual backends under a postmaster,
* not even in the EXEC_BACKEND case. The ProcGlobal and AuxiliaryProcs
* pointers must be propagated specially for EXEC_BACKEND operation. */
void InitProcGlobal(void) {
PGPROC *procs; PGXACT *pgxacts;
int i, j;
bool found;
uint32 TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;

/* Create the ProcGlobal shared structure */
ProcGlobal = (PROC_HDR *)ShmemInitStruct("Proc Header", sizeof(PROC_HDR), &found);Assert(!found);

/* Initialize the data structures. */
ProcGlobal->spins_per_delay = DEFAULT_SPINS_PER_DELAY;
ProcGlobal->freeProcs = NULL;
ProcGlobal->autovacFreeProcs = NULL;
ProcGlobal->bgworkerFreeProcs = NULL;
ProcGlobal->walsenderFreeProcs = NULL;
ProcGlobal->startupProc = NULL;
ProcGlobal->startupProcPid = 0;
ProcGlobal->startupBufferPinWaitBufId = -1;
ProcGlobal->walwriterLatch = NULL;
ProcGlobal->checkpointerLatch = NULL;
pg_atomic_init_u32(&ProcGlobal->procArrayGroupFirst, INVALID_PGPROCNO);
pg_atomic_init_u32(&ProcGlobal->clogGroupFirst, INVALID_PGPROCNO);

/* Create and initialize all the PGPROC structures we'll need. There are
* five separate consumers: (1) normal backends, (2) autovacuum workers
* and the autovacuum launcher, (3) background workers, (4) auxiliary
* processes, and (5) prepared transactions. Each PGPROC structure is
* dedicated to exactly one of these purposes, and they do not move
* between groups. */
procs = (PGPROC *) ShmemAlloc(TotalProcs * sizeof(PGPROC));
MemSet(procs, 0, TotalProcs * sizeof(PGPROC)); ProcGlobal->allProcs = procs;
/* XXX allProcCount isn't really all of them; it excludes prepared xacts */
ProcGlobal->allProcCount = MaxBackends + NUM_AUXILIARY_PROCS;

/* Also allocate a separate array of PGXACT structures. This is separate
* from the main PGPROC array so that the most heavily accessed data is
* stored contiguously in memory in as few cache lines as possible. This
* provides significant performance benefits, especially on a
* multiprocessor system. There is one PGXACT structure for every PGPROC
* structure. */
pgxacts = (PGXACT *) ShmemAlloc(TotalProcs * sizeof(PGXACT));
MemSet(pgxacts, 0, TotalProcs * sizeof(PGXACT)); ProcGlobal->allPgXact = pgxacts;

for (i = 0; i < TotalProcs; i++) {
/* Common initialization for all PGPROCs, regardless of type. */
/* Set up per-PGPROC semaphore, latch, and backendLock. Prepared xact
* dummy PGPROCs don't need these though - they're never associated
* with a real process */
if (i < MaxBackends + NUM_AUXILIARY_PROCS){
procs[i].sem = PGSemaphoreCreate();
InitSharedLatch(&(procs[i].procLatch));
LWLockInitialize(&(procs[i].backendLock), LWTRANCHE_PROC);
}
procs[i].pgprocno = i;

/* Newly created PGPROCs for normal backends, autovacuum and bgworkers
* must be queued up on the appropriate free list. Because there can
* only ever be a small, fixed number of auxiliary processes, no free
* list is used in that case; InitAuxiliaryProcess() instead uses a
* linear search. PGPROCs for prepared transactions are added to a
* free list by TwoPhaseShmemInit(). */
if (i < MaxConnections){ /* PGPROC for normal backend, add to freeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
ProcGlobal->freeProcs = &procs[i];
procs[i].procgloballist = &ProcGlobal->freeProcs;
}else if (i < MaxConnections + autovacuum_max_workers + 1){ /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
ProcGlobal->autovacFreeProcs = &procs[i];
procs[i].procgloballist = &ProcGlobal->autovacFreeProcs;
}else if (i < MaxConnections + autovacuum_max_workers + 1 + max_worker_processes){ /* PGPROC for bgworker, add to bgworkerFreeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs;
ProcGlobal->bgworkerFreeProcs = &procs[i];
procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs;
}else if (i < MaxBackends){ /* PGPROC for walsender, add to walsenderFreeProcs list */
procs[i].links.next = (SHM_QUEUE *) ProcGlobal->walsenderFreeProcs;
ProcGlobal->walsenderFreeProcs = &procs[i];
procs[i].procgloballist = &ProcGlobal->walsenderFreeProcs;
}

/* Initialize myProcLocks[] shared memory queues. */
for (j = 0; j < NUM_LOCK_PARTITIONS; j++) SHMQueueInit(&(procs[i].myProcLocks[j]));

/* Initialize lockGroupMembers list. */
dlist_init(&procs[i].lockGroupMembers);

/* Initialize the atomic variables, otherwise, it won't be safe to
* access them for backends that aren't currently in use. */
pg_atomic_init_u32(&(procs[i].procArrayGroupNext), INVALID_PGPROCNO);
pg_atomic_init_u32(&(procs[i].clogGroupNext), INVALID_PGPROCNO);
}

/* Save pointers to the blocks of PGPROC structures reserved for auxiliary
* processes and prepared transactions. */
AuxiliaryProcs = &procs[MaxBackends];
PreparedXactProcs = &procs[MaxBackends + NUM_AUXILIARY_PROCS];
/* Create ProcStructLock spinlock, too */
ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t));
SpinLockInit(ProcStructLock);
}

申请一个PGPROC

InitProcess函数为后端进程初始化进程数据结构体PGPROC。它根据进程类型(autovacuum、bgworker、walsender或普通后端)选择对应的空闲链表,然后获取ProcStructLock自旋锁,从该空闲链表头取出一个PGPROC结构体,并将链表头改为指向该结构体links.next所指向的下一个PGPROC结构体,最后释放锁。如果空闲链表为空,说明PGPROC已全部被占用,释放锁后以FATAL级别报告连接数过多的错误。

/* InitProcess -- initialize a per-process data structure for this backend */
void InitProcess(void) {
PGPROC *volatile *procgloballist;
/* Decide which list should supply our PGPROC. */
if (IsAnyAutoVacuumProcess()) procgloballist = &ProcGlobal->autovacFreeProcs;
else if (IsBackgroundWorker) procgloballist = &ProcGlobal->bgworkerFreeProcs;
else if (am_walsender) procgloballist = &ProcGlobal->walsenderFreeProcs;
else procgloballist = &ProcGlobal->freeProcs;

/* Try to get a proc struct from the appropriate free list. If this
* fails, we must be out of PGPROC structures (not to mention semaphores).
* While we are holding the ProcStructLock, also copy the current shared
* estimate of spins_per_delay to local storage. */
SpinLockAcquire(ProcStructLock);
set_spins_per_delay(ProcGlobal->spins_per_delay);
MyProc = *procgloballist;

if (MyProc != NULL){
*procgloballist = (PGPROC *) MyProc->links.next;
SpinLockRelease(ProcStructLock);
}else{
/* If we reach here, all the PGPROCs are in use. This is one of the
* possible places to detect "too many backends", so give the standard
* error message. XXX do we need to give a different failure message
* in the autovacuum case? */
SpinLockRelease(ProcStructLock);
if (am_walsender)
ereport(FATAL,(errcode(ERRCODE_TOO_MANY_CONNECTIONS),errmsg("number of requested standby connections exceeds max_wal_senders (currently %d)",max_wal_senders)));
ereport(FATAL,(errcode(ERRCODE_TOO_MANY_CONNECTIONS),errmsg("sorry, too many clients already")));
}
MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno];

PostgreSQL数据库进程数据结构——ProcGlobal_数据结构_02


举报

相关推荐

0 条评论