GetSnapshotData函数返回运行事务的信息。返回的快照包含了事件范围(Event horizon)[xmin(最低的仍在运行的xact ID)、xmax(最高的已完成xact ID + 1)],以及处于范围 xmin <= xid < xmax 内的运行中xact ID的列表。
- 所有xact IDs < xmin认为是完成的
- 所有xact IDs >= xmax认为是正在运行的
- 对于处于范围xmin <= xid < xmax内的xact ID,需要查询运行中的xact ID列表来判断其是正在运行还是已经完成
所有运行的top-level XID包含在快照中,除了lazy VACUUM进程。
All running top-level XIDs are included in the snapshot, except for lazy VACUUM processes. We also try to include running subtransaction XIDs, but since PGPROC has only a limited cache area for subxact XIDs, full information may not be available. If we find any overflowed subxid arrays, we have to mark the snapshot’s subxid data as overflowed, and extra work may need to be done to determine what’s running (see XidInMVCCSnapshot() in heapam_visibility.c).
该函数会更新如下的后台进程全局变量:
- TransactionXmin:使用中的当前事务的任何快照的最老的xmin,其和MyPgXact->xmin相同
- RecentXmin:为最近的快照计算出来的xmin,比它更老的XID被认为已不在运行
- RecentGlobalXmin:全局xmin(所有运行事务中的最老的TransactionXmin,除了运行中的lazy VACUUM进程),和GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM)计算出来的相同
- RecentGlobalDataXmin:对于非系统表的全局xmin,>= RecentGlobalXmin
SnapshotData
/* Snapshot is a pointer to a SnapshotData structure. */
typedef struct SnapshotData *Snapshot;
/*
 * SnapshotData holds the xmin/xmax bounds of a snapshot, the arrays of
 * in-progress transaction and subtransaction IDs, and the book-keeping
 * fields (reference counts, heap link, timestamp, WAL position).
 */
typedef struct SnapshotData
{
SnapshotType snapshot_type; /* type of snapshot */
TransactionId xmin; /* all XID < xmin are visible to me */
TransactionId xmax; /* all XID >= xmax are invisible to me */
TransactionId *xip; // array of currently active transaction ids
uint32 xcnt; /* # of xact ids in xip[] */ // number of entries in the active-transaction array
/* For non-historic MVCC snapshots, this contains subxact IDs that are in progress (and other transactions that are in progress if taken during recovery). For historic snapshot it contains *all* xids assigned to the replayed transaction, including the toplevel xid.
* note: all ids in subxip[] are >= xmin, but we don't bother filtering out any that are >= xmax */
TransactionId *subxip; // array of currently active subtransaction ids
int32 subxcnt; /* # of xact ids in subxip[] */ // number of currently active subtransactions
bool suboverflowed; /* has the subxip array overflowed? */
bool takenDuringRecovery; /* recovery-shaped snapshot? */
bool copied; /* false if it's a static snapshot */
CommandId curcid; /* in my xact, CID < curcid are visible */ // id of the current command
/* An extra return value for HeapTupleSatisfiesDirty, not used in MVCC snapshots. */
uint32 speculativeToken;
/* Book-keeping information, used by the snapshot manager */
uint32 active_count; /* refcount on ActiveSnapshot stack */ // reference count held by the active-snapshot stack
uint32 regd_count; /* refcount on RegisteredSnapshots */ // reference count held by the registered snapshots
pairingheap_node ph_node; /* link in the RegisteredSnapshots heap */
TimestampTz whenTaken; /* timestamp when snapshot was taken */
XLogRecPtr lsn; /* position in the WAL stream when taken */
} SnapshotData;
函数分析
为快照中的xip(当前活跃事务的数组)和subxip(当前活跃子事务的数组)分配内存
Snapshot GetSnapshotData(Snapshot snapshot) {
ProcArrayStruct *arrayP = procArray;
TransactionId xmin; TransactionId xmax; TransactionId globalxmin;
int index;
int count = 0;
int subcount = 0;
bool suboverflowed = false;
TransactionId replication_slot_xmin = InvalidTransactionId;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
if (snapshot->xip == NULL){ // 当前获取事务的链表
snapshot->xip = (TransactionId *)malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
if (snapshot->xip == NULL)
ereport(ERROR,(errcode(ERRCODE_OUT_OF_MEMORY),errmsg("out of memory")));
Assert(snapshot->subxip == NULL);
snapshot->subxip = (TransactionId *)malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
if (snapshot->subxip == NULL)
ereport(ERROR,(errcode(ERRCODE_OUT_OF_MEMORY),errmsg("out of memory")));
}
获取ProcArrayLock共享锁:
LWLockAcquire(ProcArrayLock, LW_SHARED);
获取xmax(最高完成xact ID + 1):
xmax = ShmemVariableCache->latestCompletedXid;
TransactionIdAdvance(xmax);
RecoveryInProgress --> false
通过arrayP->pgprocnos数组遍历PgXact的序号,获取PGXACT结构体。
/* initialize xmin calculation with xmax */
globalxmin = xmin = xmax;
int *pgprocnos = arrayP->pgprocnos;
int numProcs = arrayP->numProcs;
for (index = 0; index < numProcs; index++)
{
int pgprocno = pgprocnos[index];
PGXACT *pgxact = &allPgXact[pgprocno];
TransactionId xid;
跳过正在进行逻辑解码的后端进程(其xmin单独管理,见后文检查)以及正在运行lazy VACUUM的后端进程
/* Skip over backends doing logical decoding which manages xmin separately (check below) and ones running LAZY VACUUM. */
if (pgxact->vacuumFlags & (PROC_IN_LOGICAL_DECODING | PROC_IN_VACUUM))
continue;
将globalxmin更新为最小的有效xmin
/* Update globalxmin to be the smallest valid xmin */
xid = UINT32_ACCESS_ONCE(pgxact->xmin);
if (TransactionIdIsNormal(xid) && NormalTransactionIdPrecedes(xid, globalxmin))
globalxmin = xid;
获取进程的xid,剔除没有XID的和XID大于xmax的
/* Fetch xid just once - see GetNewTransactionId */
xid = UINT32_ACCESS_ONCE(pgxact->xid);
/* If the transaction has no XID assigned, we can skip it; it won't have sub-XIDs either. If the XID is >= xmax, we can also skip it; such transactions will be treated as running anyway (and any sub-XIDs will also be >= xmax). */
if (!TransactionIdIsNormal(xid) || !NormalTransactionIdPrecedes(xid, xmax))
continue;
将xmin更新为最小的有效xmin
/* We don't include our own XIDs (if any) in the snapshot, but we must include them in xmin. */
if (NormalTransactionIdPrecedes(xid, xmin))
xmin = xid;
跳过当前后端进程自身(自己的XID不加入快照,但上面已计入xmin),其余后端继续往下处理
if (pgxact == MyPgXact)
continue;
向运行的xact IDs的列表范围添加xid
/* Add XID to snapshot. */
snapshot->xip[count++] = xid;
保存子事务XID(如果已经溢出则没有必要)。subxact XID必定晚于其parent,所以不需要检查其和xmin的大小关系。本可以按xmax进行过滤,但在持有ProcArrayLock时最好不要做这么多工作。其他backend能并发添加subxids,但是不能删除任何一个,因此nxids只能读取一次;使用memcpy拷贝是安全的(不必担心漏掉并发添加的xid,因为它们必定晚于xmax)。同样,快照中不包含当前后端自己的XID。
/* Save subtransaction XIDs if possible (if we've already overflowed, there's no point). Note that the subxact XIDs must be later than their parent, so no need to check them against xmin. We could filter against xmax, but it seems better not to do that much work while holding the ProcArrayLock.
* The other backend can add more subxids concurrently, but cannot remove any. Hence it's important to fetch nxids just once. Should be safe to use memcpy, though. (We needn't worry about missing any xids added concurrently, because they must postdate xmax.)
* Again, our own XIDs are not included in the snapshot. */
if (!suboverflowed){
if (pgxact->overflowed) suboverflowed = true;
else{
int nxids = pgxact->nxids;
if (nxids > 0){
PGPROC *proc = &allProcs[pgprocno];
pg_read_barrier(); /* pairs with GetNewTransactionId */
memcpy(snapshot->subxip + subcount, (void *) proc->subxids.xids, nxids * sizeof(TransactionId));
subcount += nxids;
RecoveryInProgress --> true
* We're in hot standby, so get XIDs from KnownAssignedXids.
*
* We store all xids directly into subxip[]. Here's why:
*
* In recovery we don't know which xids are top-level and which are
* subxacts, a design choice that greatly simplifies xid processing.
*
* It seems like we would want to try to put xids into xip[] only, but
* that is fairly small. We would either need to make that bigger or
* to increase the rate at which we WAL-log xid assignment; neither is
* an appealing choice.
*
* We could try to store xids into xip[] first and then into subxip[]
* if there are too many xids. That only works if the snapshot doesn't
* overflow because we do not search subxip[] in that case. A simpler
* way is to just store all xids in the subxact array because this is
* by far the bigger array. We just leave the xip array empty.
*
* Either way we need to change the way XidInMVCCSnapshot() works
* depending upon when the snapshot was taken, or change normal
* snapshot processing so it matches.
*
* Note: It is possible for recovery to end before we finish taking
* the snapshot, and for newly assigned transaction ids to be added to
* the ProcArray. xmax cannot change while we hold ProcArrayLock, so
* those newly added transaction ids would be filtered away, so we
* need not be concerned about them.
/* initialize xmin calculation with xmax */
globalxmin = xmin = xmax;
subcount = KnownAssignedXidsGetAndSetXmin(snapshot->subxip, &xmin,
xmax);
if (TransactionIdPrecedesOrEquals(xmin, procArray->lastOverflowedXid))
suboverflowed = true;
将procArray->replication_slot_xmin和procArray->replication_slot_catalog_xmin更新到局部变量中
/* Fetch into local variable while ProcArrayLock is held - the LWLockRelease below is a barrier, ensuring this happens inside the lock. */
replication_slot_xmin = procArray->replication_slot_xmin;
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
if (!TransactionIdIsValid(MyPgXact->xmin)) MyPgXact->xmin = TransactionXmin = xmin;
LWLockRelease(ProcArrayLock);
最终更新globalxmin为全局最小的xmin,并且更新如下全局变量:
TransactionXmin:使用中的当前事务的任何快照的最老的xmin,其和MyPgXact->xmin相同;RecentXmin:为最近的快照计算出来的xmin,比它更老的XID被认为已不在运行;RecentGlobalXmin:全局xmin(所有运行事务中的最老的TransactionXmin,除了运行中的lazy VACUUM进程),和GetOldestXmin(NULL, PROCARRAY_FLAGS_VACUUM)计算出来的相同;RecentGlobalDataXmin:对于非系统表的全局xmin,>= RecentGlobalXmin
/* Update globalxmin to include actual process xids. This is a slightly different way of computing it than GetOldestXmin uses, but should give the same result. */
if (TransactionIdPrecedes(xmin, globalxmin))
globalxmin = xmin;
/* Update global variables too */
RecentGlobalXmin = globalxmin - vacuum_defer_cleanup_age;
if (!TransactionIdIsNormal(RecentGlobalXmin))
RecentGlobalXmin = FirstNormalTransactionId;
/* Check whether there's a replication slot requiring an older xmin. */
if (TransactionIdIsValid(replication_slot_xmin) && NormalTransactionIdPrecedes(replication_slot_xmin, RecentGlobalXmin))
RecentGlobalXmin = replication_slot_xmin;
/* Non-catalog tables can be vacuumed if older than this xid */
RecentGlobalDataXmin = RecentGlobalXmin;
/* Check whether there's a replication slot requiring an older catalog xmin. */
if (TransactionIdIsNormal(replication_slot_catalog_xmin) && NormalTransactionIdPrecedes(replication_slot_catalog_xmin, RecentGlobalXmin))
RecentGlobalXmin = replication_slot_catalog_xmin;
RecentXmin = xmin;
更新快照中的xmin、xmax、xcnt、subxcnt、suboverflowed、curcid等信息
snapshot->xmin = xmin;
snapshot->xmax = xmax;
snapshot->xcnt = count;
snapshot->subxcnt = subcount;
snapshot->suboverflowed = suboverflowed;
snapshot->curcid = GetCurrentCommandId(false);
/* This is a new snapshot, so set both refcounts are zero, and mark it as not copied in persistent memory. */
snapshot->active_count = 0;
snapshot->regd_count = 0;
snapshot->copied = false;
如果没有开启old_snapshot_threshold GUC参数(old_snapshot_threshold < 0),将lsn和whenTaken设置为不需要加锁的初始值。如果开启了,需要设置lsn和whenTaken为当前WAL位置和当前时间戳,然后调用MaintainOldSnapshotTimeMapping函数维护旧快照的时间映射。
if (old_snapshot_threshold < 0){
/* If not using "snapshot too old" feature, fill related fields with dummy values that don't require any locking. */
snapshot->lsn = InvalidXLogRecPtr;
snapshot->whenTaken = 0;
}else{
/* Capture the current time and WAL stream location in case this snapshot becomes old enough to need to fall back on the special "old snapshot" logic. */
snapshot->lsn = GetXLogInsertRecPtr();
snapshot->whenTaken = GetSnapshotCurrentTimestamp();
MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
}