0
点赞
收藏
分享

微信扫一扫

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush


一、 预备知识

1. XLOG什么时候需要落盘

  • 事务commit之前
  • log buffer被覆盖之前
  • 后台进程定期落盘

2. 两个核心结构体

这两个结构体定义代码在xlog.c,它们在日志落盘过程中非常重要,会反反复复出现。

  • XLogwrtRqst表示请求落盘的XLOG LSN
  • XLogwrtResult表示已经落盘的XLOG LSN

typedef struct XLogwrtRqst
{
	XLogRecPtr	Write;			/* last byte + 1 to write out */
	XLogRecPtr	Flush;			/* last byte + 1 to flush */
} XLogwrtRqst;

typedef struct XLogwrtResult
{
	XLogRecPtr	Write;			/* last byte + 1 written out */
	XLogRecPtr	Flush;			/* last byte + 1 flushed */
} XLogwrtResult;

Write与Flush的区别

  • Write表示在此位置之前的XLOG已经写入操作系统缓存(不确定是否落盘)。
  • Flush表示在此位置之前的XLOG已经落盘。
  • 一般的环境使用的都是同步提交,Write和Flush是相等的

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_事务

源码中关于这两个结构体的注释很长,值得关注的是:

  • XLogwrtRqst与XLogwrtResult都是存放于共享内存中被所有进程共享的,所以在读写时需要加锁:读时需要info_lck或者WALWriteLock任意一个锁,写时两个锁都需要。
  • 每一个处理进程都有一个本地的LogwrtResult,这是共享LogwrtResult的一个copy,而XLogwrtRqst并没有本地copy。

 二、 调试XLogFlush函数

XLogFlush函数用于将record之前的所有XLog全部落盘,它是XLogWrite的上层函数,XLogWrite是真正的落盘函数,下一篇我们会讨论。

       在上面三种情况中,最简单也是最可控的就是commit之前的XLOG落盘,为了方便调试排除其他干扰因素,我们需要首先将后台落盘的进程walwriter挂起,以免在commit之前后台进程将XLOG进行落盘。后台进程挂起后,在commit之前,事务中的相关XLOG一定没有落盘,就可以很好地观察了。

1. 挂起walwriter进程

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_源码学习_02

2. 设置断点

断点函数为XLogFlush

3. 执行insert操作(含commit)

insert into t_insert values(1,'11','12','13');

4. debug函数

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_postgresql_03

函数入参,待刷入的LSN  24119528

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_调试_04

        比较record与本地缓存的XLogwrtResult.Flush值。LogwrtResult.Flush表示已经刷入的LSN 24118560,小于24119528,说明入参的LSN 还未被刷入磁盘。如果已经刷入,则直接结束函数。

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_源码学习_05

       LogwrtRqst由info_lck保护,读取需要先获取锁,如果没能获取,则循环等待。参考postgresql源码学习(21)—— 事务日志②-日志初始化_Hehuy

       代码注释有提到,fsync是一个昂贵的操作,因此我们尽量每次多刷一些。获取锁后if语句对比请求写入的LSN和入参LSN,并将请求写入的WriteRqstPtr 设为其中较大的值,可以多写一点。这里WriteRqstPtr的值更大,因此直接用这个值即可。  

这里遇到一个问题是调试过程中pg挂掉了,怀疑跟挂起了walwriter进程有关。下面重新debug了一次,所以LSN号变了。

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_源码学习_06

加锁后获取全局XLogwrtResult,以更新本地XLogwrtResult。

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_源码学习_07

       更新本地XLogwrtResult后再次判断record之前的XLOG是否已经落盘。这是一个典型的乐观锁方式,以此提高并发性。

这里如果没有挂起WAL后台进程,很有可能在调试过程中它已经把record对应LSN给落盘了,就会出现record<=LogwrtResult.Flush 退出循环的情况。因为我们挂起了WAL后台进程,目前record还是大于LogwrtResult.Flush,即还未落盘。

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_事务日志_08

       再往下走,遇到一个老朋友WaitXLogInsertionsToFinish,这就是前篇提到的,日志落盘之前必须先确认相关进程XLOG都已写入WAL Buffer。


postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_事务_09

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_调试_10

然后,我们需要获取WALWriteLock锁,前面提到过必须拿到两个锁才能更新。如果还没获取到,则进入下一次循环,继续等待。

       获取锁之后,需要再次读取全局XLogwrtResult,然后判断record之前的XLOG是否已经落盘。如果已经刷入,则释放锁,退出循环。

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_事务_11

       判断有没有设置CommitDelay(组提交)、enableFsync是否为true,最小活跃事务数量是否大于CommitSiblings(值为5)。 这里CommitDelay=0 ,所以不会进这个if。如果进的话需要sleep CommitDelay的时间,另外再次调用WaitXLogInsertionsToFinish函数。

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_调试_12

insertpos表示最旧的、仍在进行WAL写入的LSN,在这点之前的所有WAL数据均已写入WAL buffer,可以被刷入磁盘。

      因此将insertpos赋值给WriteRqst.Write和WriteRqst.Flush,表示希望将该LSN之前的数据全部刷入磁盘。

下面调用XLogWrite函数,真正将日志刷入磁盘(我们下篇来看)。

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_postgresql_13

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_源码学习_14

postgresql源码学习(27)—— 事务日志⑦-日志落盘上层函数 XLogFlush_源码学习_15

最后判断,确保已落盘的LSN大于入参LSN,否则说明落盘失败了。

三、 函数源码

/*
 * Ensure that all XLOG data through the given position is flushed to disk.
 *
 * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
 * already held, and we try to avoid acquiring it if possible.
 */
void
XLogFlush(XLogRecPtr record)
{
    XLogRecPtr  WriteRqstPtr;
    XLogwrtRqst WriteRqst;

    /*
     * During REDO, we are reading not writing WAL.  Therefore, instead of
     * trying to flush the WAL, we should update minRecoveryPoint instead. We
     * test XLogInsertAllowed(), not InRecovery, because we need checkpointer
     * to act this way too, and because when it tries to write the
     * end-of-recovery checkpoint, it should indeed flush.
     */
    if (!XLogInsertAllowed())
    {
        UpdateMinRecoveryPoint(record, false);
        return;
    }

    /* Quick exit if already known flushed,比较record LSN与本地缓存的XLogwrtResult.Flush LSN值。如果已经刷入,则直接结束函数。 */
    if (record <= LogwrtResult.Flush)
        return;

#ifdef WAL_DEBUG
    if (XLOG_DEBUG)
        elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
             LSN_FORMAT_ARGS(record),
             LSN_FORMAT_ARGS(LogwrtResult.Write),
             LSN_FORMAT_ARGS(LogwrtResult.Flush));
#endif

    START_CRIT_SECTION();

    /*
     * fsync是一个昂贵的操作,因此我们尽量每次多刷一些。获取锁后if语句对比请求写入的LSN和入参LSN,并将请求写入的WriteRqstPtr 设为其中较大的值,可以多写一点。     
*/

    /* initialize to given target; may increase below,初始化请求写入的位置为传入的参数record */
    WriteRqstPtr = record;

    /*
     * Now wait until we get the write lock, or someone else does the flush for us. 读取需要先获取锁,如果没能获取,则循环等待。或者等到有别人(例如walwriter)替我们完成日志落盘。
     */
    for (;;)
    {
        XLogRecPtr  insertpos;

        /* read LogwrtResult and update local state,获取info_lck锁,读取全局变量XLogwrtRqst和XLogwrtResult,以更新本地XLogwrtResult */
        SpinLockAcquire(&XLogCtl->info_lck);
        if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
            WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
        LogwrtResult = XLogCtl->LogwrtResult;
        SpinLockRelease(&XLogCtl->info_lck);

        /* done already? 当前记录是否已经落盘?这就是所谓的等到有别人(例如walwriter)替我们完成日志落盘。*/
        if (record <= LogwrtResult.Flush)
            break;

        /*
         * Before actually performing the write, wait for all in-flight
         * insertions to the pages we're about to write to finish. 老朋友WaitXLogInsertionsToFinish,这就是前篇提到的,日志落盘之前必须先确认相关进程XLOG都已写入WAL Buffer。
         */
        insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);

        /*
         * 然后,我们需要获取WALWriteLock锁,前面提到过必须拿到两个锁才能更新。如果还没获取到,则进入下一次循环,继续等待。
         */
        if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
        {
            /*
             * The lock is now free, but we didn't acquire it yet. Before we
             * do, loop back to check if someone else flushed the record for
             * us already.
             */
            continue;
        }

        /* Got the lock; recheck whether request is satisfied. 获取锁之后,需要再次读取全局XLogwrtResult,然后判断record之前的XLOG是否已经落盘。如果已经刷入,则释放锁,退出循环。 */
        LogwrtResult = XLogCtl->LogwrtResult;
        if (record <= LogwrtResult.Flush)
        {
            LWLockRelease(WALWriteLock);
            break;
        }

        /*
         * 判断有没有设置CommitDelay(组提交)、enableFsync是否为true,最小活跃事务数量是否大于CommitSiblings
         */
        if (CommitDelay > 0 && enableFsync &&
            MinimumActiveBackends(CommitSiblings))
        {
            pg_usleep(CommitDelay);

            /*
             * 如果进这个if,需要sleep CommitDelay的时间,另外再次调用WaitXLogInsertionsToFinish函数。             
*/
            insertpos = WaitXLogInsertionsToFinish(insertpos);
        }

        /* try to write/flush later additions to XLOG as well,insertpos表示最旧的、仍在进行WAL写入的LSN,在这点之前的所有WAL数据均已写入WAL buffer,可以被刷入磁盘。因此将insertpos赋值给WriteRqst.Write和WriteRqst.Flush,表示希望将该LSN之前的数据全部刷入磁盘。*/
        WriteRqst.Write = insertpos;
        WriteRqst.Flush = insertpos;

// 调用XLogWrite函数,真正将日志刷入磁盘(我们下篇来看)
        XLogWrite(WriteRqst, false);
        // 释放锁
        LWLockRelease(WALWriteLock);
        /* done */
        break;
    }

    END_CRIT_SECTION();

    /* wake up walsenders now that we've released heavily contended locks */
    WalSndWakeupProcessRequests();

    /*
     * 最后判断,确保已落盘的LSN大于入参LSN,否则说明落盘失败了。
     */
    if (LogwrtResult.Flush < record)
        elog(ERROR,
             "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
             LSN_FORMAT_ARGS(record),
             LSN_FORMAT_ARGS(LogwrtResult.Flush));
}


举报

相关推荐

0 条评论