@ -75,8 +75,10 @@
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
* to every listening backend ( we don ' t know which backend is listening on
* which channel so we must signal them all ) . We can exclude backends that
* are already up to date , though . We don ' t bother with a self - signal
* either , but just process the queue directly .
* are already up to date , though , and we can also exclude backends that
* are in other databases ( unless they are way behind and should be kicked
* to make them advance their pointers ) . We don ' t bother with a
* self - signal either , but just process the queue directly .
*
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal , the signal handler
* sets the process ' s latch , which triggers the event to be processed
@ -89,13 +91,14 @@
* Inbound - notify processing consists of reading all of the notifications
* that have arrived since scanning last time . We read every notification
* until we reach either a notification from an uncommitted transaction or
* the head pointer ' s position . Then we check if we were the laziest
* backend : if our pointer is set to the same position as the global tail
* pointer is set , then we move the global tail pointer ahead to where the
* second - laziest backend is ( in general , we take the MIN of the current
* head position and all active backends ' new tail pointers ) . Whenever we
* move the global tail pointer we also truncate now - unused pages ( i . e . ,
* delete files in pg_notify / that are no longer used ) .
* the head pointer ' s position .
*
* 6. To avoid SLRU wraparound and limit disk space consumption , the tail
* pointer needs to be advanced so that old pages can be truncated .
* This is relatively expensive ( notably , it requires an exclusive lock ) ,
* so we don ' t want to do it often . We make sending backends do this work
* if they advanced the queue head into a new page , but only once every
* QUEUE_CLEANUP_DELAY pages .
*
* An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs . These can be ignored , if not useful ,
@ -211,6 +214,19 @@ typedef struct QueuePosition
( x ) . page ! = ( y ) . page ? ( x ) : \
( x ) . offset > ( y ) . offset ? ( x ) : ( y ) )
/*
* Parameter determining how often we try to advance the tail pointer :
* we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data . This is
* also the distance by which a backend in another database needs to be
* behind before we ' ll decide we need to wake it up to advance its pointer .
*
* Resist the temptation to make this really large . While that would save
* work in some places , it would add cost in others . In particular , this
* should likely be less than NUM_ASYNC_BUFFERS , to ensure that backends
* catch up before the pages they ' ll need to read fall out of SLRU cache .
*/
# define QUEUE_CLEANUP_DELAY 4
/*
* Struct describing a listening backend ' s status
*/
@ -252,8 +268,8 @@ typedef struct QueueBackendStatus
typedef struct AsyncQueueControl
{
QueuePosition head ; /* head points to the next free location */
QueuePosition tail ; /* the global tail is equivalent to the pos of
* the " slowest " backend */
QueuePosition tail ; /* tail must be <= the queue position of every
* listening backend */
BackendId firstListener ; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn ; /* time of last queue-full msg */
QueueBackendStatus backend [ FLEXIBLE_ARRAY_MEMBER ] ;
@ -402,10 +418,14 @@ static bool amRegisteredListener = false;
/* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false ;
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool backendTryAdvanceTail = false ;
/* GUC parameter */
bool Trace_notify = false ;
/* local function prototypes */
static int asyncQueuePageDiff ( int p , int q ) ;
static bool asyncQueuePagePrecedes ( int p , int q ) ;
static void queue_listen ( ListenActionKind action , const char * channel ) ;
static void Async_UnlistenOnExit ( int code , Datum arg ) ;
@ -421,7 +441,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell * asyncQueueAddEntries ( ListCell * nextNotify ) ;
static double asyncQueueUsage ( void ) ;
static void asyncQueueFillWarning ( void ) ;
static bool SignalBackends ( void ) ;
static void SignalBackends ( void ) ;
static void asyncQueueReadAllNotifications ( void ) ;
static bool asyncQueueProcessPageEntries ( volatile QueuePosition * current ,
QueuePosition stop ,
@ -436,10 +456,11 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies ( void ) ;
/*
* We will work on the page range of 0. . QUEUE_MAX_PAGE .
* Compute the difference between two queue page numbers ( i . e . , p - q ) ,
* accounting for wraparound .
*/
static bool
asyncQueuePagePrecedes ( int p , int q )
static int
asyncQueuePageDiff ( int p , int q )
{
int diff ;
@ -455,7 +476,14 @@ asyncQueuePagePrecedes(int p, int q)
diff - = QUEUE_MAX_PAGE + 1 ;
else if ( diff < - ( ( QUEUE_MAX_PAGE + 1 ) / 2 ) )
diff + = QUEUE_MAX_PAGE + 1 ;
return diff < 0 ;
return diff ;
}
/* Is p < q, accounting for wraparound? */
static bool
asyncQueuePagePrecedes ( int p , int q )
{
return asyncQueuePageDiff ( p , q ) < 0 ;
}
/*
@ -1051,8 +1079,6 @@ Exec_ListenPreCommit(void)
* notification to the frontend . Also , although our transaction might
* have executed NOTIFY , those message ( s ) aren ' t queued yet so we can ' t
* see them in the queue .
*
* This will also advance the global tail pointer if possible .
*/
if ( ! QUEUE_POS_EQUAL ( max , head ) )
asyncQueueReadAllNotifications ( ) ;
@ -1138,6 +1164,8 @@ Exec_UnlistenAllCommit(void)
* of a transaction . If we issued any notifications in the just - completed
* transaction , send signals to other backends to process them , and also
* process the queue ourselves to send messages to our own frontend .
* Also , if we filled enough queue pages with new notifies , try to advance
* the queue tail pointer .
*
* The reason that this is not done in AtCommit_Notify is that there is
* a nonzero chance of errors here ( for example , encoding conversion errors
@ -1156,7 +1184,6 @@ void
ProcessCompletedNotifies ( void )
{
MemoryContext caller_context ;
bool signalled ;
/* Nothing to do if we didn't send any notifications */
if ( ! backendHasSentNotifications )
@ -1185,23 +1212,20 @@ ProcessCompletedNotifies(void)
StartTransactionCommand ( ) ;
/* Send signals to other backends */
signalled = SignalBackends ( ) ;
SignalBackends ( ) ;
if ( listenChannels ! = NIL )
{
/* Read the queue ourselves, and send relevant stuff to the frontend */
asyncQueueReadAllNotifications ( ) ;
}
else if ( ! signalled )
/*
* If it ' s time to try to advance the global tail pointer , do that .
*/
if ( backendTryAdvanceTail )
{
/*
* If we found no other listening backends , and we aren ' t listening
* ourselves , then we must execute asyncQueueAdvanceTail to flush the
* queue , because ain ' t nobody else gonna do it . This prevents queue
* overflow when we ' re sending useless notifies to nobody . ( A new
* listener could have joined since we looked , but if so this is
* harmless . )
*/
backendTryAdvanceTail = false ;
asyncQueueAdvanceTail ( ) ;
}
@ -1242,8 +1266,6 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister ( void )
{
bool advanceTail ;
Assert ( listenChannels = = NIL ) ; /* else caller error */
if ( ! amRegisteredListener ) /* nothing to do */
@ -1253,10 +1275,7 @@ asyncQueueUnregister(void)
* Need exclusive lock here to manipulate list links .
*/
LWLockAcquire ( AsyncQueueLock , LW_EXCLUSIVE ) ;
/* check if entry is valid and oldest ... */
advanceTail = ( MyProcPid = = QUEUE_BACKEND_PID ( MyBackendId ) ) & &
QUEUE_POS_EQUAL ( QUEUE_BACKEND_POS ( MyBackendId ) , QUEUE_TAIL ) ;
/* ... then mark it invalid */
/* Mark our entry as invalid */
QUEUE_BACKEND_PID ( MyBackendId ) = InvalidPid ;
QUEUE_BACKEND_DBOID ( MyBackendId ) = InvalidOid ;
/* and remove it from the list */
@ -1278,10 +1297,6 @@ asyncQueueUnregister(void)
/* mark ourselves as no longer listed in the global array */
amRegisteredListener = false ;
/* If we were the laziest backend, try to advance the tail pointer */
if ( advanceTail )
asyncQueueAdvanceTail ( ) ;
}
/*
@ -1467,6 +1482,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
* page without overrunning the queue .
*/
slotno = SimpleLruZeroPage ( AsyncCtl , QUEUE_POS_PAGE ( queue_head ) ) ;
/*
* If the new page address is a multiple of QUEUE_CLEANUP_DELAY ,
* set flag to remember that we should try to advance the tail
* pointer ( we don ' t want to actually do that right here ) .
*/
if ( QUEUE_POS_PAGE ( queue_head ) % QUEUE_CLEANUP_DELAY = = 0 )
backendTryAdvanceTail = true ;
/* And exit the loop */
break ;
}
@ -1570,31 +1594,30 @@ asyncQueueFillWarning(void)
}
/*
* Send signals to all listening backends ( except our own ) .
* Send signals to listening backends .
*
* Returns true if we sent at least one signal .
* We never signal our own process ; that should be handled by our caller .
*
* Since we need EXCLUSIVE lock anyway we also check the position of the other
* backends and in case one is already up - to - date we don ' t signal it .
* This can happen if concurrent notifying transactions have sent a signal and
* the signaled backend has read the other notifications and ours in the same
* step .
* Normally we signal only backends in our own database , since only those
* backends could be interested in notifies we send . However , if there ' s
* notify traffic in our database but no traffic in another database that
* does have listener ( s ) , those listeners will fall further and further
* behind . Waken them anyway if they ' re far enough behind , so that they ' ll
* advance their queue position pointers , allowing the global tail to advance .
*
* Since we know the BackendId and the Pid the signalling is quite cheap .
*/
static bool
static void
SignalBackends ( void )
{
bool signalled = false ;
int32 * pids ;
BackendId * ids ;
int count ;
int32 pid ;
/*
* Identify all backends that are listening and not already up - to - date . We
* don ' t want to send signals while holding the AsyncQueueLock , so we just
* build a list of target PIDs .
* Identify backends that we need to signal . We don ' t want to send
* signals while holding the AsyncQueueLock , so this loop just builds a
* list of target PIDs .
*
* XXX in principle these pallocs could fail , which would be bad . Maybe
* preallocate the arrays ? But in practice this is only run in trivial
@ -1607,26 +1630,43 @@ SignalBackends(void)
LWLockAcquire ( AsyncQueueLock , LW_EXCLUSIVE ) ;
for ( BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER ( i ) )
{
pid = QUEUE_BACKEND_PID ( i ) ;
int32 pid = QUEUE_BACKEND_PID ( i ) ;
QueuePosition pos ;
Assert ( pid ! = InvalidPid ) ;
if ( pid ! = MyProcPid )
if ( pid = = MyProcPid )
continue ; /* never signal self */
pos = QUEUE_BACKEND_POS ( i ) ;
if ( QUEUE_BACKEND_DBOID ( i ) = = MyDatabaseId )
{
QueuePosition pos = QUEUE_BACKEND_POS ( i ) ;
if ( ! QUEUE_POS_EQUAL ( pos , QUEUE_HEAD ) )
{
pids [ count ] = pid ;
ids [ count ] = i ;
count + + ;
}
/*
* Always signal listeners in our own database , unless they ' re
* already caught up ( unlikely , but possible ) .
*/
if ( QUEUE_POS_EQUAL ( pos , QUEUE_HEAD ) )
continue ;
}
else
{
/*
* Listeners in other databases should be signaled only if they
* are far behind .
*/
if ( asyncQueuePageDiff ( QUEUE_POS_PAGE ( QUEUE_HEAD ) ,
QUEUE_POS_PAGE ( pos ) ) < QUEUE_CLEANUP_DELAY )
continue ;
}
/* OK, need to signal this one */
pids [ count ] = pid ;
ids [ count ] = i ;
count + + ;
}
LWLockRelease ( AsyncQueueLock ) ;
/* Now send signals */
for ( int i = 0 ; i < count ; i + + )
{
pid = pids [ i ] ;
int32 pid = pids [ i ] ;
/*
* Note : assuming things aren ' t broken , a signal failure here could
@ -1636,14 +1676,10 @@ SignalBackends(void)
*/
if ( SendProcSignal ( pid , PROCSIG_NOTIFY_INTERRUPT , ids [ i ] ) < 0 )
elog ( DEBUG3 , " could not signal backend with PID %d: %m " , pid ) ;
else
signalled = true ;
}
pfree ( pids ) ;
pfree ( ids ) ;
return signalled ;
}
/*
@ -1844,7 +1880,6 @@ asyncQueueReadAllNotifications(void)
QueuePosition oldpos ;
QueuePosition head ;
Snapshot snapshot ;
bool advanceTail ;
/* page_buffer must be adequately aligned, so use a union */
union
@ -1966,13 +2001,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire ( AsyncQueueLock , LW_SHARED ) ;
QUEUE_BACKEND_POS ( MyBackendId ) = pos ;
advanceTail = QUEUE_POS_EQUAL ( oldpos , QUEUE_TAIL ) ;
LWLockRelease ( AsyncQueueLock ) ;
/* If we were the laziest backend, try to advance the tail pointer */
if ( advanceTail )
asyncQueueAdvanceTail ( ) ;
PG_RE_THROW ( ) ;
}
PG_END_TRY ( ) ;
@ -1980,13 +2010,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire ( AsyncQueueLock , LW_SHARED ) ;
QUEUE_BACKEND_POS ( MyBackendId ) = pos ;
advanceTail = QUEUE_POS_EQUAL ( oldpos , QUEUE_TAIL ) ;
LWLockRelease ( AsyncQueueLock ) ;
/* If we were the laziest backend, try to advance the tail pointer */
if ( advanceTail )
asyncQueueAdvanceTail ( ) ;
/* Done with snapshot */
UnregisterSnapshot ( snapshot ) ;
}