@ -247,7 +247,7 @@ typedef struct QueueBackendStatus
{
{
int32 pid ; /* either a PID or InvalidPid */
int32 pid ; /* either a PID or InvalidPid */
Oid dboid ; /* backend's database OID, or InvalidOid */
Oid dboid ; /* backend's database OID, or InvalidOid */
BackendId nextListener ; /* id of next listener, or InvalidBackendId */
ProcNumber nextListener ; /* id of next listener, or INVALID_PROC_NUMBER */
QueuePosition pos ; /* backend has read queue up to here */
QueuePosition pos ; /* backend has read queue up to here */
} QueueBackendStatus ;
} QueueBackendStatus ;
@ -273,13 +273,12 @@ typedef struct QueueBackendStatus
* NotifyQueueTailLock , then NotifyQueueLock , and lastly SLRU bank lock .
* NotifyQueueTailLock , then NotifyQueueLock , and lastly SLRU bank lock .
*
*
* Each backend uses the backend [ ] array entry with index equal to its
* Each backend uses the backend [ ] array entry with index equal to its
* BackendId ( which can range from 1 to MaxBackends ) . We rely on this to make
* ProcNumber . We rely on this to make SendProcSignal fast .
* SendProcSignal fast .
*
*
* The backend [ ] array entries for actively - listening backends are threaded
* The backend [ ] array entries for actively - listening backends are threaded
* together using firstListener and the nextListener links , so that we can
* together using firstListener and the nextListener links , so that we can
* scan them without having to iterate over inactive entries . We keep this
* scan them without having to iterate over inactive entries . We keep this
* list in order by BackendId so that the scan is cache - friendly when there
* list in order by ProcNumber so that the scan is cache - friendly when there
* are many active entries .
* are many active entries .
*/
*/
typedef struct AsyncQueueControl
typedef struct AsyncQueueControl
@ -289,10 +288,10 @@ typedef struct AsyncQueueControl
* listening backend */
* listening backend */
int stopPage ; /* oldest unrecycled page; must be <=
int stopPage ; /* oldest unrecycled page; must be <=
* tail . page */
* tail . page */
BackendId firstListener ; /* id of first listener, or InvalidBackendId */
ProcNumber firstListener ; /* id of first listener, or
* INVALID_PROC_NUMBER */
TimestampTz lastQueueFillWarn ; /* time of last queue-full msg */
TimestampTz lastQueueFillWarn ; /* time of last queue-full msg */
QueueBackendStatus backend [ FLEXIBLE_ARRAY_MEMBER ] ;
QueueBackendStatus backend [ FLEXIBLE_ARRAY_MEMBER ] ;
/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
} AsyncQueueControl ;
} AsyncQueueControl ;
static AsyncQueueControl * asyncQueueControl ;
static AsyncQueueControl * asyncQueueControl ;
@ -491,7 +490,7 @@ AsyncShmemSize(void)
Size size ;
Size size ;
/* This had better match AsyncShmemInit */
/* This had better match AsyncShmemInit */
size = mul_size ( MaxBackends + 1 , sizeof ( QueueBackendStatus ) ) ;
size = mul_size ( MaxBackends , sizeof ( QueueBackendStatus ) ) ;
size = add_size ( size , offsetof ( AsyncQueueControl , backend ) ) ;
size = add_size ( size , offsetof ( AsyncQueueControl , backend ) ) ;
size = add_size ( size , SimpleLruShmemSize ( notify_buffers , 0 ) ) ;
size = add_size ( size , SimpleLruShmemSize ( notify_buffers , 0 ) ) ;
@ -510,11 +509,8 @@ AsyncShmemInit(void)
/*
/*
* Create or attach to the AsyncQueueControl structure .
* Create or attach to the AsyncQueueControl structure .
*
* The used entries in the backend [ ] array run from 1 to MaxBackends ; the
* zero ' th entry is unused but must be allocated .
*/
*/
size = mul_size ( MaxBackends + 1 , sizeof ( QueueBackendStatus ) ) ;
size = mul_size ( MaxBackends , sizeof ( QueueBackendStatus ) ) ;
size = add_size ( size , offsetof ( AsyncQueueControl , backend ) ) ;
size = add_size ( size , offsetof ( AsyncQueueControl , backend ) ) ;
asyncQueueControl = ( AsyncQueueControl * )
asyncQueueControl = ( AsyncQueueControl * )
@ -526,14 +522,13 @@ AsyncShmemInit(void)
SET_QUEUE_POS ( QUEUE_HEAD , 0 , 0 ) ;
SET_QUEUE_POS ( QUEUE_HEAD , 0 , 0 ) ;
SET_QUEUE_POS ( QUEUE_TAIL , 0 , 0 ) ;
SET_QUEUE_POS ( QUEUE_TAIL , 0 , 0 ) ;
QUEUE_STOP_PAGE = 0 ;
QUEUE_STOP_PAGE = 0 ;
QUEUE_FIRST_LISTENER = InvalidBackendId ;
QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER ;
asyncQueueControl - > lastQueueFillWarn = 0 ;
asyncQueueControl - > lastQueueFillWarn = 0 ;
/* zero'th entry won't be used, but let's initialize it anyway */
for ( int i = 0 ; i < MaxBackends ; i + + )
for ( int i = 0 ; i < = MaxBackends ; i + + )
{
{
QUEUE_BACKEND_PID ( i ) = InvalidPid ;
QUEUE_BACKEND_PID ( i ) = InvalidPid ;
QUEUE_BACKEND_DBOID ( i ) = InvalidOid ;
QUEUE_BACKEND_DBOID ( i ) = InvalidOid ;
QUEUE_NEXT_LISTENER ( i ) = InvalidBackendId ;
QUEUE_NEXT_LISTENER ( i ) = INVALID_PROC_NUMBER ;
SET_QUEUE_POS ( QUEUE_BACKEND_POS ( i ) , 0 , 0 ) ;
SET_QUEUE_POS ( QUEUE_BACKEND_POS ( i ) , 0 , 0 ) ;
}
}
}
}
@ -1050,7 +1045,7 @@ Exec_ListenPreCommit(void)
{
{
QueuePosition head ;
QueuePosition head ;
QueuePosition max ;
QueuePosition max ;
BackendId prevListener ;
ProcNumber prevListener ;
/*
/*
* Nothing to do if we are already listening to something , nor if we
* Nothing to do if we are already listening to something , nor if we
@ -1095,28 +1090,28 @@ Exec_ListenPreCommit(void)
LWLockAcquire ( NotifyQueueLock , LW_EXCLUSIVE ) ;
LWLockAcquire ( NotifyQueueLock , LW_EXCLUSIVE ) ;
head = QUEUE_HEAD ;
head = QUEUE_HEAD ;
max = QUEUE_TAIL ;
max = QUEUE_TAIL ;
prevListener = InvalidBackendId ;
prevListener = INVALID_PROC_NUMBER ;
for ( BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER ( i ) )
for ( ProcNumber i = QUEUE_FIRST_LISTENER ; i ! = INVALID_PROC_NUMBER ; i = QUEUE_NEXT_LISTENER ( i ) )
{
{
if ( QUEUE_BACKEND_DBOID ( i ) = = MyDatabaseId )
if ( QUEUE_BACKEND_DBOID ( i ) = = MyDatabaseId )
max = QUEUE_POS_MAX ( max , QUEUE_BACKEND_POS ( i ) ) ;
max = QUEUE_POS_MAX ( max , QUEUE_BACKEND_POS ( i ) ) ;
/* Also find last listening backend before this one */
/* Also find last listening backend before this one */
if ( i < MyBackendId )
if ( i < MyProcNumber )
prevListener = i ;
prevListener = i ;
}
}
QUEUE_BACKEND_POS ( MyBackendId ) = max ;
QUEUE_BACKEND_POS ( MyProcNumber ) = max ;
QUEUE_BACKEND_PID ( MyBackendId ) = MyProcPid ;
QUEUE_BACKEND_PID ( MyProcNumber ) = MyProcPid ;
QUEUE_BACKEND_DBOID ( MyBackendId ) = MyDatabaseId ;
QUEUE_BACKEND_DBOID ( MyProcNumber ) = MyDatabaseId ;
/* Insert backend into list of listeners at correct position */
/* Insert backend into list of listeners at correct position */
if ( prevListener > 0 )
if ( prevListener ! = INVALID_PROC_NUMBER )
{
{
QUEUE_NEXT_LISTENER ( MyBackendId ) = QUEUE_NEXT_LISTENER ( prevListener ) ;
QUEUE_NEXT_LISTENER ( MyProcNumber ) = QUEUE_NEXT_LISTENER ( prevListener ) ;
QUEUE_NEXT_LISTENER ( prevListener ) = MyBackendId ;
QUEUE_NEXT_LISTENER ( prevListener ) = MyProcNumber ;
}
}
else
else
{
{
QUEUE_NEXT_LISTENER ( MyBackendId ) = QUEUE_FIRST_LISTENER ;
QUEUE_NEXT_LISTENER ( MyProcNumber ) = QUEUE_FIRST_LISTENER ;
QUEUE_FIRST_LISTENER = MyBackendId ;
QUEUE_FIRST_LISTENER = MyProcNumber ;
}
}
LWLockRelease ( NotifyQueueLock ) ;
LWLockRelease ( NotifyQueueLock ) ;
@ -1248,23 +1243,23 @@ asyncQueueUnregister(void)
*/
*/
LWLockAcquire ( NotifyQueueLock , LW_EXCLUSIVE ) ;
LWLockAcquire ( NotifyQueueLock , LW_EXCLUSIVE ) ;
/* Mark our entry as invalid */
/* Mark our entry as invalid */
QUEUE_BACKEND_PID ( MyBackendId ) = InvalidPid ;
QUEUE_BACKEND_PID ( MyProcNumber ) = InvalidPid ;
QUEUE_BACKEND_DBOID ( MyBackendId ) = InvalidOid ;
QUEUE_BACKEND_DBOID ( MyProcNumber ) = InvalidOid ;
/* and remove it from the list */
/* and remove it from the list */
if ( QUEUE_FIRST_LISTENER = = MyBackendId )
if ( QUEUE_FIRST_LISTENER = = MyProcNumber )
QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER ( MyBackendId ) ;
QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER ( MyProcNumber ) ;
else
else
{
{
for ( BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER ( i ) )
for ( ProcNumber i = QUEUE_FIRST_LISTENER ; i ! = INVALID_PROC_NUMBER ; i = QUEUE_NEXT_LISTENER ( i ) )
{
{
if ( QUEUE_NEXT_LISTENER ( i ) = = MyBackendId )
if ( QUEUE_NEXT_LISTENER ( i ) = = MyProcNumber )
{
{
QUEUE_NEXT_LISTENER ( i ) = QUEUE_NEXT_LISTENER ( MyBackendId ) ;
QUEUE_NEXT_LISTENER ( i ) = QUEUE_NEXT_LISTENER ( MyProcNumber ) ;
break ;
break ;
}
}
}
}
}
}
QUEUE_NEXT_LISTENER ( MyBackendId ) = InvalidBackendId ;
QUEUE_NEXT_LISTENER ( MyProcNumber ) = INVALID_PROC_NUMBER ;
LWLockRelease ( NotifyQueueLock ) ;
LWLockRelease ( NotifyQueueLock ) ;
/* mark ourselves as no longer listed in the global array */
/* mark ourselves as no longer listed in the global array */
@ -1549,7 +1544,7 @@ asyncQueueFillWarning(void)
QueuePosition min = QUEUE_HEAD ;
QueuePosition min = QUEUE_HEAD ;
int32 minPid = InvalidPid ;
int32 minPid = InvalidPid ;
for ( BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER ( i ) )
for ( ProcNumber i = QUEUE_FIRST_LISTENER ; i ! = INVALID_PROC_NUMBER ; i = QUEUE_NEXT_LISTENER ( i ) )
{
{
Assert ( QUEUE_BACKEND_PID ( i ) ! = InvalidPid ) ;
Assert ( QUEUE_BACKEND_PID ( i ) ! = InvalidPid ) ;
min = QUEUE_POS_MIN ( min , QUEUE_BACKEND_POS ( i ) ) ;
min = QUEUE_POS_MIN ( min , QUEUE_BACKEND_POS ( i ) ) ;
@ -1580,7 +1575,7 @@ asyncQueueFillWarning(void)
* behind . Waken them anyway if they ' re far enough behind , so that they ' ll
* behind . Waken them anyway if they ' re far enough behind , so that they ' ll
* advance their queue position pointers , allowing the global tail to advance .
* advance their queue position pointers , allowing the global tail to advance .
*
*
* Since we know the BackendId and the Pid the signaling is quite cheap .
* Since we know the ProcNumber and the Pid the signaling is quite cheap .
*
*
* This is called during CommitTransaction ( ) , so it ' s important for it
* This is called during CommitTransaction ( ) , so it ' s important for it
* to have very low probability of failure .
* to have very low probability of failure .
@ -1589,7 +1584,7 @@ static void
SignalBackends ( void )
SignalBackends ( void )
{
{
int32 * pids ;
int32 * pids ;
BackendId * id s;
ProcNumber * procno s;
int count ;
int count ;
/*
/*
@ -1601,11 +1596,11 @@ SignalBackends(void)
* preallocate the arrays ? They ' re not that large , though .
* preallocate the arrays ? They ' re not that large , though .
*/
*/
pids = ( int32 * ) palloc ( MaxBackends * sizeof ( int32 ) ) ;
pids = ( int32 * ) palloc ( MaxBackends * sizeof ( int32 ) ) ;
ids = ( BackendId * ) palloc ( MaxBackends * sizeof ( BackendId ) ) ;
procnos = ( ProcNumber * ) palloc ( MaxBackends * sizeof ( ProcNumber ) ) ;
count = 0 ;
count = 0 ;
LWLockAcquire ( NotifyQueueLock , LW_EXCLUSIVE ) ;
LWLockAcquire ( NotifyQueueLock , LW_EXCLUSIVE ) ;
for ( BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER ( i ) )
for ( ProcNumber i = QUEUE_FIRST_LISTENER ; i ! = INVALID_PROC_NUMBER ; i = QUEUE_NEXT_LISTENER ( i ) )
{
{
int32 pid = QUEUE_BACKEND_PID ( i ) ;
int32 pid = QUEUE_BACKEND_PID ( i ) ;
QueuePosition pos ;
QueuePosition pos ;
@ -1633,7 +1628,7 @@ SignalBackends(void)
}
}
/* OK, need to signal this one */
/* OK, need to signal this one */
pids [ count ] = pid ;
pids [ count ] = pid ;
id s[ count ] = i ;
procno s[ count ] = i ;
count + + ;
count + + ;
}
}
LWLockRelease ( NotifyQueueLock ) ;
LWLockRelease ( NotifyQueueLock ) ;
@ -1659,12 +1654,12 @@ SignalBackends(void)
* NotifyQueueLock ; which is unlikely but certainly possible . So we
* NotifyQueueLock ; which is unlikely but certainly possible . So we
* just log a low - level debug message if it happens .
* just log a low - level debug message if it happens .
*/
*/
if ( SendProcSignal ( pid , PROCSIG_NOTIFY_INTERRUPT , id s[ i ] ) < 0 )
if ( SendProcSignal ( pid , PROCSIG_NOTIFY_INTERRUPT , procno s[ i ] ) < 0 )
elog ( DEBUG3 , " could not signal backend with PID %d: %m " , pid ) ;
elog ( DEBUG3 , " could not signal backend with PID %d: %m " , pid ) ;
}
}
pfree ( pids ) ;
pfree ( pids ) ;
pfree ( id s) ;
pfree ( procno s) ;
}
}
/*
/*
@ -1872,8 +1867,8 @@ asyncQueueReadAllNotifications(void)
/* Fetch current state */
/* Fetch current state */
LWLockAcquire ( NotifyQueueLock , LW_SHARED ) ;
LWLockAcquire ( NotifyQueueLock , LW_SHARED ) ;
/* Assert checks that we have a valid state entry */
/* Assert checks that we have a valid state entry */
Assert ( MyProcPid = = QUEUE_BACKEND_PID ( MyBackendId ) ) ;
Assert ( MyProcPid = = QUEUE_BACKEND_PID ( MyProcNumber ) ) ;
pos = QUEUE_BACKEND_POS ( MyBackendId ) ;
pos = QUEUE_BACKEND_POS ( MyProcNumber ) ;
head = QUEUE_HEAD ;
head = QUEUE_HEAD ;
LWLockRelease ( NotifyQueueLock ) ;
LWLockRelease ( NotifyQueueLock ) ;
@ -1995,7 +1990,7 @@ asyncQueueReadAllNotifications(void)
{
{
/* Update shared state */
/* Update shared state */
LWLockAcquire ( NotifyQueueLock , LW_SHARED ) ;
LWLockAcquire ( NotifyQueueLock , LW_SHARED ) ;
QUEUE_BACKEND_POS ( MyBackendId ) = pos ;
QUEUE_BACKEND_POS ( MyProcNumber ) = pos ;
LWLockRelease ( NotifyQueueLock ) ;
LWLockRelease ( NotifyQueueLock ) ;
}
}
PG_END_TRY ( ) ;
PG_END_TRY ( ) ;
@ -2142,7 +2137,7 @@ asyncQueueAdvanceTail(void)
*/
*/
LWLockAcquire ( NotifyQueueLock , LW_EXCLUSIVE ) ;
LWLockAcquire ( NotifyQueueLock , LW_EXCLUSIVE ) ;
min = QUEUE_HEAD ;
min = QUEUE_HEAD ;
for ( BackendId i = QUEUE_FIRST_LISTENER ; i > 0 ; i = QUEUE_NEXT_LISTENER ( i ) )
for ( ProcNumber i = QUEUE_FIRST_LISTENER ; i ! = INVALID_PROC_NUMBER ; i = QUEUE_NEXT_LISTENER ( i ) )
{
{
Assert ( QUEUE_BACKEND_PID ( i ) ! = InvalidPid ) ;
Assert ( QUEUE_BACKEND_PID ( i ) ! = InvalidPid ) ;
min = QUEUE_POS_MIN ( min , QUEUE_BACKEND_POS ( i ) ) ;
min = QUEUE_POS_MIN ( min , QUEUE_BACKEND_POS ( i ) ) ;