@ -463,7 +463,6 @@ static void SignalBackends(void);
static void asyncQueueReadAllNotifications ( void ) ;
static bool asyncQueueProcessPageEntries ( QueuePosition * current ,
QueuePosition stop ,
char * page_buffer ,
Snapshot snapshot ) ;
static void asyncQueueAdvanceTail ( void ) ;
static void ProcessIncomingNotify ( bool flush ) ;
@ -1917,13 +1916,6 @@ asyncQueueReadAllNotifications(void)
QueuePosition head ;
Snapshot snapshot ;
/* page_buffer must be adequately aligned, so use a union */
union
{
char buf [ QUEUE_PAGESIZE ] ;
AsyncQueueEntry align ;
} page_buffer ;
/* Fetch current state */
LWLockAcquire ( NotifyQueueLock , LW_SHARED ) ;
/* Assert checks that we have a valid state entry */
@ -2004,37 +1996,6 @@ asyncQueueReadAllNotifications(void)
do
{
int curpage = QUEUE_POS_PAGE ( pos ) ;
int curoffset = QUEUE_POS_OFFSET ( pos ) ;
int slotno ;
int copysize ;
/*
* We copy the data from SLRU into a local buffer , so as to avoid
* holding the NotifySLRULock while we are examining the entries
* and possibly transmitting them to our frontend . Copy only the
* part of the page we will actually inspect .
*/
slotno = SimpleLruReadPage_ReadOnly ( NotifyCtl , curpage ,
InvalidTransactionId ) ;
if ( curpage = = QUEUE_POS_PAGE ( head ) )
{
/* we only want to read as far as head */
copysize = QUEUE_POS_OFFSET ( head ) - curoffset ;
if ( copysize < 0 )
copysize = 0 ; /* just for safety */
}
else
{
/* fetch all the rest of the page */
copysize = QUEUE_PAGESIZE - curoffset ;
}
memcpy ( page_buffer . buf + curoffset ,
NotifyCtl - > shared - > page_buffer [ slotno ] + curoffset ,
copysize ) ;
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
LWLockRelease ( NotifySLRULock ) ;
/*
* Process messages up to the stop position , end of page , or an
* uncommitted message .
@ -2050,9 +2011,7 @@ asyncQueueReadAllNotifications(void)
* rewrite pages under us . Especially we don ' t want to hold a lock
* while sending the notifications to the frontend .
*/
reachedStop = asyncQueueProcessPageEntries ( & pos , head ,
page_buffer . buf ,
snapshot ) ;
reachedStop = asyncQueueProcessPageEntries ( & pos , head , snapshot ) ;
} while ( ! reachedStop ) ;
/* Update shared state */
@ -2071,13 +2030,6 @@ asyncQueueReadAllNotifications(void)
* Fetch notifications from the shared queue , beginning at position current ,
* and deliver relevant ones to my frontend .
*
* The current page must have been fetched into page_buffer from shared
* memory . ( We could access the page right in shared memory , but that
* would imply holding the NotifySLRULock throughout this routine . )
*
* We stop if we reach the " stop " position , or reach a notification from an
* uncommitted transaction , or reach the end of the page .
*
* The function returns true once we have reached the stop position or an
* uncommitted notification , and false if we have finished with the page .
* In other words : once it returns true there is no need to look further .
@ -2086,16 +2038,34 @@ asyncQueueReadAllNotifications(void)
static bool
asyncQueueProcessPageEntries ( QueuePosition * current ,
QueuePosition stop ,
char * page_buffer ,
Snapshot snapshot )
{
int64 curpage = QUEUE_POS_PAGE ( * current ) ;
int slotno ;
char * page_buffer ;
bool reachedStop = false ;
bool reachedEndOfPage ;
AsyncQueueEntry * qe ;
/*
* We copy the entries into a local buffer to avoid holding the SLRU lock
* while we transmit them to our frontend . The local buffer must be
* adequately aligned , so use a union .
*/
union
{
char buf [ QUEUE_PAGESIZE ] ;
AsyncQueueEntry align ;
} local_buf ;
char * local_buf_end = local_buf . buf ;
slotno = SimpleLruReadPage_ReadOnly ( NotifyCtl , curpage ,
InvalidTransactionId ) ;
page_buffer = NotifyCtl - > shared - > page_buffer [ slotno ] ;
do
{
QueuePosition thisentry = * current ;
AsyncQueueEntry * qe ;
if ( QUEUE_POS_EQUAL ( thisentry , stop ) )
break ;
@ -2137,18 +2107,23 @@ asyncQueueProcessPageEntries(QueuePosition *current,
reachedStop = true ;
break ;
}
else if ( TransactionIdDidCommit ( qe - > xid ) )
{
/* qe->data is the null-terminated channel name */
char * channel = qe - > data ;
if ( IsListeningOn ( channel ) )
{
/* payload follows channel name */
char * payload = qe - > data + strlen ( channel ) + 1 ;
/*
* Quick check for the case that we ' re not listening on any
* channels , before calling TransactionIdDidCommit ( ) . This makes
* that case a little faster , but more importantly , it ensures
* that if there ' s a bad entry in the queue for which
* TransactionIdDidCommit ( ) fails for some reason , we can skip
* over it on the first LISTEN in a session , and not get stuck on
* it indefinitely .
*/
if ( listenChannels = = NIL )
continue ;
NotifyMyFrontEnd ( channel , payload , qe - > srcPid ) ;
}
if ( TransactionIdDidCommit ( qe - > xid ) )
{
memcpy ( local_buf_end , qe , qe - > length ) ;
local_buf_end + = qe - > length ;
}
else
{
@ -2162,6 +2137,32 @@ asyncQueueProcessPageEntries(QueuePosition *current,
/* Loop back if we're not at end of page */
} while ( ! reachedEndOfPage ) ;
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
LWLockRelease ( NotifySLRULock ) ;
/*
* Now that we have let go of the SLRU bank lock , send the notifications
* to our backend
*/
Assert ( local_buf_end - local_buf . buf < = BLCKSZ ) ;
for ( char * p = local_buf . buf ; p < local_buf_end ; )
{
AsyncQueueEntry * qe = ( AsyncQueueEntry * ) p ;
/* qe->data is the null-terminated channel name */
char * channel = qe - > data ;
if ( IsListeningOn ( channel ) )
{
/* payload follows channel name */
char * payload = qe - > data + strlen ( channel ) + 1 ;
NotifyMyFrontEnd ( channel , payload , qe - > srcPid ) ;
}
p + = qe - > length ;
}
if ( QUEUE_POS_EQUAL ( * current , stop ) )
reachedStop = true ;