http://www.light-pg.com/docs/lightdb/13.3-22.2/sql-notify.html
http://www.light-pg.com/docs/lightdb/13.3-22.2/sql-listen.html
https://wiki.postgresql.org/wiki/PgNotificationHelper
https://jdbc.postgresql.org/documentation/head/listennotify.html
https://tapoueh.org/blog/2018/07/postgresql-listen-notify/
在写入clog/xact前那一刻,内核会将通知加入队列。如下:
xact.c
static void
CommitTransaction(void)
{
......
/*
* Insert notifications sent by NOTIFY commands into the queue. This
* should be late in the pre-commit sequence to minimize time spent
* holding the notify-insertion lock. However, this could result in
* creating a snapshot, so we must do it before serializable cleanup.
*/
PreCommit_Notify();
......
asyc.c负责notify相关的实现:
/*
* PreCommit_Notify
*
* This is called at transaction commit, before actually committing to
* clog.
*
* If there are pending LISTEN actions, make sure we are listed in the
* shared-memory listener array. This must happen before commit to
* ensure we don't miss any notifies from transactions that commit
* just after ours.
*
* If there are outbound notify requests in the pendingNotifies list,
* add them to the global queue. We do that before commit so that
* we can still throw error if we run out of queue space.
*/
void
PreCommit_Notify(void)
{
ListCell *p;
if (!pendingActions && !pendingNotifies)
return; /* no relevant statements in this xact */
if (Trace_notify)
elog(DEBUG1, "PreCommit_Notify");
/* Preflight for any pending listen/unlisten actions */
if (pendingActions != NULL)
{
foreach(p, pendingActions->actions)
{
ListenAction *actrec = (ListenAction *) lfirst(p);
switch (actrec->action)
{
case LISTEN_LISTEN:
Exec_ListenPreCommit();
break;
case LISTEN_UNLISTEN:
/* there is no Exec_UnlistenPreCommit() */
break;
case LISTEN_UNLISTEN_ALL:
/* there is no Exec_UnlistenAllPreCommit() */
break;
}
}
}
/* Queue any pending notifies (must happen after the above) */
if (pendingNotifies)
{
ListCell *nextNotify;
/*
* Make sure that we have an XID assigned to the current transaction.
* GetCurrentTransactionId is cheap if we already have an XID, but not
* so cheap if we don't, and we'd prefer not to do that work while
* holding NotifyQueueLock.
*/
(void) GetCurrentTransactionId();
/*
* Serialize writers by acquiring a special lock that we hold till
* after commit. This ensures that queue entries appear in commit
* order, and in particular that there are never uncommitted queue
* entries ahead of committed ones, so an uncommitted transaction
* can't block delivery of deliverable notifications.
*
* We use a heavyweight lock so that it'll automatically be released
* after either commit or abort. This also allows deadlocks to be
* detected, though really a deadlock shouldn't be possible here.
*
* The lock is on "database 0", which is pretty ugly but it doesn't
* seem worth inventing a special locktag category just for this.
* (Historical note: before PG 9.0, a similar lock on "database 0" was
* used by the flatfiles mechanism.)
*/
LockSharedObject(DatabaseRelationId, InvalidOid, 0,
AccessExclusiveLock);
/* Now push the notifications into the queue */
backendHasSentNotifications = true;
nextNotify = list_head(pendingNotifies->events);
while (nextNotify != NULL)
{
/*
* Add the pending notifications to the queue. We acquire and
* release NotifyQueueLock once per page, which might be overkill
* but it does allow readers to get in while we're doing this.
*
* A full queue is very uncommon and should really not happen,
* given that we have so much space available in the SLRU pages.
* Nevertheless we need to deal with this possibility. Note that
* when we get here we are in the process of committing our
* transaction, but we have not yet committed to clog, so at this
* point in time we can still roll the transaction back.
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
asyncQueueFillWarning();
if (asyncQueueIsFull())
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
LWLockRelease(NotifyQueueLock);
}
}
}
调用RecordTransactionCommit()(在此之前,WAL记录已经刷新到pg_wal中)更新事务的提交状态到pg_xact后,会调用AtCommit_Notify发送通知。如下:
true);
AtCommit_Notify();
AtEOXact_GUC(true, 1);
AtEOXact_SPI(true);
async.c中:
/*
* AtCommit_Notify
*
* This is called at transaction commit, after committing to clog.
*
* Update listenChannels and clear transaction-local state.
*/
void
AtCommit_Notify(void)
{
ListCell *p;
/*
* Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
* return as soon as possible
*/
if (!pendingActions && !pendingNotifies)
return;
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify");
/* Perform any pending listen/unlisten actions */
if (pendingActions != NULL)
{
foreach(p, pendingActions->actions)
{
ListenAction *actrec = (ListenAction *) lfirst(p);
switch (actrec->action)
{
case LISTEN_LISTEN:
Exec_ListenCommit(actrec->channel);
break;
case LISTEN_UNLISTEN:
Exec_UnlistenCommit(actrec->channel);
break;
case LISTEN_UNLISTEN_ALL:
Exec_UnlistenAllCommit();
break;
}
}
}
/* If no longer listening to anything, get out of listener array */
if (amRegisteredListener && listenChannels == NIL)
asyncQueueUnregister();
/* And clean up */
ClearPendingActionsAndNotifies();
}
LightDB Enterprise Postgres--金融级关系型数据库,更快、更稳、更懂金融!