一、 预备知识
1. XLOG什么时候需要落盘
- 事务commit之前
- log buffer被覆盖之前
- 后台进程定期落盘
2. 两个核心结构体
这两个结构体定义代码在xlog.c,它们在日志落盘过程中非常重要,会反反复复出现。
- XLogwrtRqst表示请求落盘的XLOG LSN
- XLogwrtResult表示已经落盘的XLOG LSN
typedef struct XLogwrtRqst
{
XLogRecPtr Write; /* last byte + 1 to write out */
XLogRecPtr Flush; /* last byte + 1 to flush */
} XLogwrtRqst;
typedef struct XLogwrtResult
{
XLogRecPtr Write; /* last byte + 1 written out */
XLogRecPtr Flush; /* last byte + 1 flushed */
} XLogwrtResult;
Write与Flush的区别
- Write表示在此位置之前的XLOG已经写入操作系统缓存(不确定是否落盘)。
- Flush表示在此位置之前的XLOG已经落盘。
- 一般的环境使用的都是同步提交,Write和Flush是相等的
源码中关于这两个结构体的注释很长,值得关注的是:
- XLogwrtRqst与XLogwrtResult都是存放于共享内存中被所有进程共享的,所以在读写时需要加锁:读时需要info_lck或者WALWriteLock任意一个锁,写时两个锁都需要。
- 每一个处理进程都有一个本地的LogwrtResult,这是共享LogwrtResult的一个copy,而XLogwrtRqst并没有本地copy。
二、 调试XLogFlush函数
XLogFlush函数用于将record之前的所有XLog全部落盘,它是XLogWrite的上层函数,XLogWrite是真正的落盘函数,下一篇我们会讨论。
在上面三种情况中,最简单也是最可控的就是commit之前的XLOG落盘,为了方便调试排除其他干扰因素,我们需要首先将后台落盘的进程walwriter挂起,以免在commit之前后台进程将XLOG进行落盘。后台进程挂起后,在commit之前,事务中的相关XLOG一定没有落盘,就可以很好地观察了。
1. 挂起walwriter进程
2. 设置断点
断点函数为XLogFlush
3. 执行insert操作(含commit)
insert into t_insert values(1,'11','12','13');
4. debug函数
函数入参,待刷入的LSN 24119528
比较record与本地缓存的XLogwrtResult.Flush值。LogwrtResult.Flush表示已经刷入的LSN 24118560,小于24119528,说明入参的LSN 还未被刷入磁盘。如果已经刷入,则直接结束函数。
LogwrtRqst由info_lck保护,读取需要先获取锁,如果没能获取,则循环等待。参考postgresql源码学习(21)—— 事务日志②-日志初始化_Hehuy
代码注释有提到,fsync是一个昂贵的操作,因此我们尽量每次多刷一些。获取锁后if语句对比请求写入的LSN和入参LSN,并将请求写入的WriteRqstPtr 设为其中较大的值,可以多写一点。这里WriteRqstPtr的值更大,因此直接用这个值即可。
这里遇到一个问题是调试过程中pg挂掉了,怀疑跟挂起了walwriter进程有关。下面重新debug了一次,所以LSN号变了。
加锁后获取全局XLogwrtResult,以更新本地XLogwrtResult。
更新本地XLogwrtResult后再次判断record之前的XLOG是否已经落盘。这是一个典型的乐观锁方式,以此提高并发性。
这里如果没有挂起WAL后台进程,很有可能在调试过程中它已经把record对应LSN给落盘了,就会出现record<=LogwrtResult.Flush 退出循环的情况。因为我们挂起了WAL后台进程,目前record还是大于LogwrtResult.Flush,即还未落盘。
再往下走,遇到一个老朋友WaitXLogInsertionsToFinish,这就是前篇提到的,日志落盘之前必须先确认相关进程XLOG都已写入WAL Buffer。
然后,我们需要获取WALWriteLock锁,前面提到过必须拿到两个锁才能更新。如果还没获取到,则进入下一次循环,继续等待。
获取锁之后,需要再次读取全局XLogwrtResult,然后判断record之前的XLOG是否已经落盘。如果已经刷入,则释放锁,退出循环。
判断有没有设置CommitDelay(组提交)、enableFsync是否为true,最小活跃事务数量是否大于CommitSiblings(值为5)。 这里CommitDelay=0 ,所以不会进这个if。如果进的话需要sleep CommitDelay的时间,另外再次调用WaitXLogInsertionsToFinish函数。
insertpos表示最旧的、仍在进行WAL写入的LSN,在这点之前的所有WAL数据均已写入WAL buffer,可以被刷入磁盘。
因此将insertpos赋值给WriteRqst.Write和WriteRqst.Flush,表示希望将该LSN之前的数据全部刷入磁盘。
下面调用XLogWrite函数,真正将日志刷入磁盘(我们下篇来看)。
最后判断,确保已落盘的LSN大于入参LSN,否则说明落盘失败了。
三、 函数源码
/*
* Ensure that all XLOG data through the given position is flushed to disk.
*
* NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
* already held, and we try to avoid acquiring it if possible.
*/
void
XLogFlush(XLogRecPtr record)
{
XLogRecPtr WriteRqstPtr;
XLogwrtRqst WriteRqst;
/*
* During REDO, we are reading not writing WAL. Therefore, instead of
* trying to flush the WAL, we should update minRecoveryPoint instead. We
* test XLogInsertAllowed(), not InRecovery, because we need checkpointer
* to act this way too, and because when it tries to write the
* end-of-recovery checkpoint, it should indeed flush.
*/
if (!XLogInsertAllowed())
{
UpdateMinRecoveryPoint(record, false);
return;
}
/* Quick exit if already known flushed,比较record LSN与本地缓存的XLogwrtResult.Flush LSN值。如果已经刷入,则直接结束函数。 */
if (record <= LogwrtResult.Flush)
return;
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
LSN_FORMAT_ARGS(record),
LSN_FORMAT_ARGS(LogwrtResult.Write),
LSN_FORMAT_ARGS(LogwrtResult.Flush));
#endif
START_CRIT_SECTION();
/*
* fsync是一个昂贵的操作,因此我们尽量每次多刷一些。获取锁后if语句对比请求写入的LSN和入参LSN,并将请求写入的WriteRqstPtr 设为其中较大的值,可以多写一点。
*/
/* initialize to given target; may increase below,初始化请求写入的位置为传入的参数record */
WriteRqstPtr = record;
/*
* Now wait until we get the write lock, or someone else does the flush for us. 读取需要先获取锁,如果没能获取,则循环等待。或者等到有别人(例如walwriter)替我们完成日志落盘。
*/
for (;;)
{
XLogRecPtr insertpos;
/* read LogwrtResult and update local state,获取info_lck锁,读取全局变量XLogwrtRqst和XLogwrtResult,以更新本地XLogwrtResult */
SpinLockAcquire(&XLogCtl->info_lck);
if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
/* done already? 当前记录是否已经落盘?这就是所谓的等到有别人(例如walwriter)替我们完成日志落盘。*/
if (record <= LogwrtResult.Flush)
break;
/*
* Before actually performing the write, wait for all in-flight
* insertions to the pages we're about to write to finish. 老朋友WaitXLogInsertionsToFinish,这就是前篇提到的,日志落盘之前必须先确认相关进程XLOG都已写入WAL Buffer。
*/
insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
/*
* 然后,我们需要获取WALWriteLock锁,前面提到过必须拿到两个锁才能更新。如果还没获取到,则进入下一次循环,继续等待。
*/
if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
{
/*
* The lock is now free, but we didn't acquire it yet. Before we
* do, loop back to check if someone else flushed the record for
* us already.
*/
continue;
}
/* Got the lock; recheck whether request is satisfied. 获取锁之后,需要再次读取全局XLogwrtResult,然后判断record之前的XLOG是否已经落盘。如果已经刷入,则释放锁,退出循环。 */
LogwrtResult = XLogCtl->LogwrtResult;
if (record <= LogwrtResult.Flush)
{
LWLockRelease(WALWriteLock);
break;
}
/*
* 判断有没有设置CommitDelay(组提交)、enableFsync是否为true,最小活跃事务数量是否大于CommitSiblings
*/
if (CommitDelay > 0 && enableFsync &&
MinimumActiveBackends(CommitSiblings))
{
pg_usleep(CommitDelay);
/*
* 如果进这个if,需要sleep CommitDelay的时间,另外再次调用WaitXLogInsertionsToFinish函数。
*/
insertpos = WaitXLogInsertionsToFinish(insertpos);
}
/* try to write/flush later additions to XLOG as well,insertpos表示最旧的、仍在进行WAL写入的LSN,在这点之前的所有WAL数据均已写入WAL buffer,可以被刷入磁盘。因此将insertpos赋值给WriteRqst.Write和WriteRqst.Flush,表示希望将该LSN之前的数据全部刷入磁盘。*/
WriteRqst.Write = insertpos;
WriteRqst.Flush = insertpos;
// 调用XLogWrite函数,真正将日志刷入磁盘(我们下篇来看)
XLogWrite(WriteRqst, false);
// 释放锁
LWLockRelease(WALWriteLock);
/* done */
break;
}
END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
WalSndWakeupProcessRequests();
/*
* 最后判断,确保已落盘的LSN大于入参LSN,否则说明落盘失败了。
*/
if (LogwrtResult.Flush < record)
elog(ERROR,
"xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
LSN_FORMAT_ARGS(record),
LSN_FORMAT_ARGS(LogwrtResult.Flush));
}