0
点赞
收藏
分享

微信扫一扫

PG虚拟文件描述符(VFD)机制——封装的文件接口:postgresql-8.4.1/src/backend/storage/file/fd.c

左小米z 2022-07-28 阅读 3


本篇博客讲解fd.c文件中对C函数库文件操作API的相关封装。(相关C函数库文件操作API参见博主linux分类下的文章)InitFileAccess函数用于在postgresql启动时初始化VFD LRU池,并向系统注册proc-exit勾子以确保退出时清理临时文件。
InitFileAccess函数在后端启动初始化阶段调用(normal or standalone backend),在数据库运行过程中只能调用一次。主要用于VFD LRU池中的头元素的空间,并设置LRU池的大小为1。最后注册proc-exit勾子以帮助确保退出时临时文件丢弃(register proc-exit hook to ensure temp files are dropped at exit)。on_proc_exit向proc_exit()函数调用的函数列表中添加回调函数。

/*
* InitFileAccess --- initialize this module during backend startup
*
* This is called during either normal or standalone backend start.
* It is *not* called in the postmaster.
*/
void
InitFileAccess(void)
{
Assert(SizeVfdCache == 0); /* call me only once */

/* initialize cache header entry */
VfdCache = (Vfd *) malloc(sizeof(Vfd));
if (VfdCache == NULL)
ereport(FATAL,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));

MemSet((char *) &(VfdCache[0]), 0, sizeof(Vfd));
VfdCache->fd = VFD_CLOSED;

SizeVfdCache = 1;

/* register proc-exit hook to ensure temp files are dropped at exit */
on_proc_exit(AtProcExit_Files, 0);
}

函数on_proc_exit处于backend/storage/ipc/ipc.c文件中

PG虚拟文件描述符(VFD)机制——封装的文件接口:postgresql-8.4.1/src/backend/storage/file/fd.c_临时文件

回调函数指针类型

PG虚拟文件描述符(VFD)机制——封装的文件接口:postgresql-8.4.1/src/backend/storage/file/fd.c_临时文件_02


需要注册的回调函数,用于在关闭后端时清理临时文件(包括interXact文件)

/*
* AtProcExit_Files
*
* on_proc_exit hook to clean up temp files during backend shutdown.
* Here, we want to clean up *all* temp files including interXact ones.
*/
static void
AtProcExit_Files(int code, Datum arg)
{
CleanupTempFiles(true);
}

BasicOpenFile — 除了可以根据需要释放其他FD,该函数与open(2)相同
导出该文件供真正需要普通内核FD的地方使用,但需要证明不会耗尽FD。成功返回FD之后,调用者有责任确保它不会在ereport()上泄漏! 大多数用户不应该直接调用该例程,而应使用VFD抽象级别,该级别提供了防止描述符泄漏以及对需要短时间打开的文件进行管理保护。 理想情况下,这应该是后端中open()的* only *直接调用。 实际上,postmaster直接调用open(),并且在后端启动的早期就完成了一些直接的open()调用。 这样就可以了,因为无论如何该模块都不会关闭任何打开的文件。

也就是该模块不使用Lru池功能,直接用C函数库API open打开文件。但是如果系统fd不足,可能需要释放lru池中的FD,并重新调用open。

int BasicOpenFile(FileName fileName, int fileFlags, int fileMode)
{
int fd;

tryAgain:
fd = open(fileName, fileFlags, fileMode);

if (fd >= 0)
return fd; /* success! */

if (errno == EMFILE || errno == ENFILE)
{
int save_errno = errno;

ereport(LOG,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("out of file descriptors: %m; release and retry")));
errno = 0;
if (ReleaseLruFile())
goto tryAgain;
errno = save_errno;
}

return -1; /* failure */
}

FileAccess成功返回0,重新打开失败返回-1且设置errno。如果文件没有打开(不拥有FD,即FD为-1),使用LruInsert函数(函数中BasicOpenFile打开文件,并将fd放入vfd中对应的成员中)。如果打开了且不是最近使用的vfd,需要将该vfd移动到LRU池的头部称为最近使用的。

static int FileAccess(File file)
{
int returnValue;

DO_DB(elog(LOG, "FileAccess %d (%s)",
file, VfdCache[file].fileName));

/*
* Is the file open? If not, open it and put it at the head of the LRU
* ring (possibly closing the least recently used file to get an FD).
*/

if (FileIsNotOpen(file))
{
returnValue = LruInsert(file);
if (returnValue != 0)
return returnValue;
}
else if (VfdCache[0].lruLessRecently != file)
{
/*
* We now know that the file is open and that it is not the last one
* accessed, so we need to move it to the head of the Lru ring.
*/

Delete(file);
Insert(file);
}

return 0;
}

PG虚拟文件描述符(VFD)机制——封装的文件接口:postgresql-8.4.1/src/backend/storage/file/fd.c_sed_03


PathNameOpenFile函数使用绝对路径打开文件,并使用打开模式和标志参数。当传入相对参数,将会使用进程工作目录的路径作为前缀($PGDATA中存储的路径)。一般流程:分配VFD,使用BasicOpenFile打开文件,将fd与VFD关联,将vfd插入LRU池,配置VFDD的参数(取消O_CREAT | O_TRUNC | O_EXCL模式)。

File PathNameOpenFile(FileName fileName, int fileFlags, int fileMode)
{
char *fnamecopy;
File file;
Vfd *vfdP;

DO_DB(elog(LOG, "PathNameOpenFile: %s %x %o",
fileName, fileFlags, fileMode));

/*
* We need a malloc'd copy of the file name; fail cleanly if no room.
*/
fnamecopy = strdup(fileName);
if (fnamecopy == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));

file = AllocateVfd();
vfdP = &VfdCache[file];

while (nfile + numAllocatedDescs >= max_safe_fds)
{
if (!ReleaseLruFile())
break;
}

vfdP->fd = BasicOpenFile(fileName, fileFlags, fileMode);

if (vfdP->fd < 0)
{
FreeVfd(file);
free(fnamecopy);
return -1;
}
++nfile;
DO_DB(elog(LOG, "PathNameOpenFile: success %d",
vfdP->fd));

Insert(file);

vfdP->fileName = fnamecopy;
/* Saved flags are adjusted to be OK for re-opening file */
vfdP->fileFlags = fileFlags & ~(O_CREAT | O_TRUNC | O_EXCL);
vfdP->fileMode = fileMode;
vfdP->seekPos = 0;
vfdP->fdstate = 0x0;

return file;
}

FileClosse关闭文件,如果文件打开,则从lru池中删除相应的VFD节点,关闭相应的fd,减少nfile计数。如果文件是临时文件则删除临时文件(vfdP->fdstate & FD_TEMPORARY)。将vfd放入空闲链表。

void FileClose(File file)
{
Vfd *vfdP;
struct stat filestats;

Assert(FileIsValid(file));

DO_DB(elog(LOG, "FileClose: %d (%s)",
file, VfdCache[file].fileName));

vfdP = &VfdCache[file];

if (!FileIsNotOpen(file))
{
/* remove the file from the lru ring */
Delete(file);

/* close the file */
if (close(vfdP->fd))
elog(ERROR, "could not close file \"%s\": %m", vfdP->fileName);

--nfile;
vfdP->fd = VFD_CLOSED;
}

/*
* Delete the file if it was temporary
*/
if (vfdP->fdstate & FD_TEMPORARY)
{
/* reset flag so that die() interrupt won't cause problems */
vfdP->fdstate &= ~FD_TEMPORARY;
if (log_temp_files >= 0)
{
if (stat(vfdP->fileName, &filestats) == 0)
{
if (filestats.st_size >= log_temp_files)
ereport(LOG,
(errmsg("temporary file: path \"%s\", size %lu",
vfdP->fileName,
(unsigned long) filestats.st_size)));
}
else
elog(LOG, "could not stat file \"%s\": %m", vfdP->fileName);
}
if (unlink(vfdP->fileName))
elog(LOG, "could not unlink file \"%s\": %m", vfdP->fileName);
}

/*
* Return the Vfd slot to the free list
*/
FreeVfd(file);
}

FileInvalidate函数检查file对应vfd的fd是否为VFD_CLOSED,并从Lru池中删除该vfd

1 FileInvalidate(File file)
2 {
3 Assert(FileIsValid(file));
4 if (!FileIsNotOpen(file))
5 LruDelete(file);
6 }
#define
#define

FilePrefetch-启动文件给定范围的异步读取(initiate asynchronous read of a given range of the file)。 逻辑查找位置logical seek position不受影响。 当前,此功能的唯一实现是使用posix_fadvise,它是完成此功能的最简单的标准化接口。 我们将来可以使用libaio添加一个实现。 但请注意,此API不适合libaio,因为libaio希望提供一个缓冲区以供读取。

int FilePrefetch(File file, off_t offset, int amount)
{
#if
int returnCode;

Assert(FileIsValid(file));

DO_DB(elog(LOG, "FilePrefetch: %d (%s) " INT64_FORMAT " %d",
file, VfdCache[file].fileName,
(int64) offset, amount));

returnCode = FileAccess(file);
if (returnCode < 0)
return returnCode;

returnCode = posix_fadvise(VfdCache[file].fd, offset, amount,
POSIX_FADV_WILLNEED);

return returnCode;
#else
Assert(FileIsValid(file));
return 0;
#endif
}

int FileWrite(File file, char *buffer, int amount)
{
int returnCode;

Assert(FileIsValid(file));

DO_DB(elog(LOG, "FileWrite: %d (%s) " INT64_FORMAT " %d %p",
file, VfdCache[file].fileName,
(int64) VfdCache[file].seekPos,
amount, buffer));

returnCode = FileAccess(file);
if (returnCode < 0)
return returnCode;

retry:
errno = 0;
returnCode = write(VfdCache[file].fd, buffer, amount);

/* if write didn't set errno, assume problem is no disk space */
if (returnCode != amount && errno == 0)
errno = ENOSPC;

if (returnCode >= 0)
VfdCache[file].seekPos += returnCode;
else
{
/*
* See comments in FileRead()
*/
#ifdef
DWORD error = GetLastError();

switch (error)
{
case ERROR_NO_SYSTEM_RESOURCES:
pg_usleep(1000L);
errno = EINTR;
break;
default:
_dosmaperr(error);
break;
}
#endif
/* OK to retry if interrupted */
if (errno == EINTR)
goto retry;

/* Trouble, so assume we don't know the file position anymore */
VfdCache[file].seekPos = FileUnknownPos;
}

return returnCode;
}

int FileSync(File file)
{
int returnCode;

Assert(FileIsValid(file));

DO_DB(elog(LOG, "FileSync: %d (%s)",
file, VfdCache[file].fileName));

returnCode = FileAccess(file);
if (returnCode < 0)
return returnCode;

return pg_fsync(VfdCache[file].fd);
}

off_t FileSeek(File file, off_t offset, int whence)
{
int returnCode;

Assert(FileIsValid(file));

DO_DB(elog(LOG, "FileSeek: %d (%s) " INT64_FORMAT " " INT64_FORMAT " %d",
file, VfdCache[file].fileName,
(int64) VfdCache[file].seekPos,
(int64) offset, whence));

if (FileIsNotOpen(file))
{
switch (whence)
{
case SEEK_SET:
if (offset < 0)
elog(ERROR, "invalid seek offset: " INT64_FORMAT,
(int64) offset);
VfdCache[file].seekPos = offset;
break;
case SEEK_CUR:
VfdCache[file].seekPos += offset;
break;
case SEEK_END:
returnCode = FileAccess(file);
if (returnCode < 0)
return returnCode;
VfdCache[file].seekPos = lseek(VfdCache[file].fd,
offset, whence);
break;
default:
elog(ERROR, "invalid whence: %d", whence);
break;
}
}
else
{
switch (whence)
{
case SEEK_SET:
if (offset < 0)
elog(ERROR, "invalid seek offset: " INT64_FORMAT,
(int64) offset);
if (VfdCache[file].seekPos != offset)
VfdCache[file].seekPos = lseek(VfdCache[file].fd,
offset, whence);
break;
case SEEK_CUR:
if (offset != 0 || VfdCache[file].seekPos == FileUnknownPos)
VfdCache[file].seekPos = lseek(VfdCache[file].fd,
offset, whence);
break;
case SEEK_END:
VfdCache[file].seekPos = lseek(VfdCache[file].fd,
offset, whence);
break;
default:
elog(ERROR, "invalid whence: %d", whence);
break;
}
}
return VfdCache[file].seekPos;
}

off_t FileTell(File file)
{
Assert(FileIsValid(file));
DO_DB(elog(LOG, "FileTell %d (%s)",
file, VfdCache[file].fileName));
return VfdCache[file].seekPos;
}
#endif


举报

相关推荐

0 条评论