@ -1,7 +1,7 @@
/*-------------------------------------------------------------------------
*
* waitlsn . c
* Implements waiting for the given LSN , which is used in
* Implements waiting for the given replay LSN , which is used in
* CALL pg_wal_replay_wait ( target_lsn pg_lsn , timeout float8 ) .
*
* Copyright ( c ) 2024 , PostgreSQL Global Development Group
@ -26,21 +26,17 @@
# include "storage/latch.h"
# include "storage/proc.h"
# include "storage/shmem.h"
# include "utils/fmgrprotos.h"
# include "utils/pg_lsn.h"
# include "utils/snapmgr.h"
# include "utils/fmgrprotos.h"
# include "utils/wait_event_types.h"
/* Add to / delete from shared memory array */
static void addLSNWaiter ( XLogRecPtr lsn ) ;
static void deleteLSNWaiter ( void ) ;
static int lsn_cmp ( const pairingheap_node * a , const pairingheap_node * b ,
void * arg ) ;
struct WaitLSNState * waitLSN = NULL ;
static volatile sig_atomic_t haveShmemItem = false ;
/*
* Report the amount of shared memory space needed for WaitLSNState
*/
/* Report the amount of shared memory space needed for WaitLSNState. */
Size
WaitLSNShmemSize ( void )
{
@ -51,7 +47,7 @@ WaitLSNShmemSize(void)
return size ;
}
/* Initialize the WaitLSNState in the shared memory */
/* Initialize the WaitLSNState in the shared memory. */
void
WaitLSNShmemInit ( void )
{
@ -62,81 +58,93 @@ WaitLSNShmemInit(void)
& found ) ;
if ( ! found )
{
SpinLockInit ( & waitLSN - > mutex ) ;
waitLSN - > numWaitedProcs = 0 ;
pg_atomic_init_u64 ( & waitLSN - > minLSN , PG_UINT64_MAX ) ;
SpinLockInit ( & waitLSN - > waitersHeapMutex ) ;
pg_atomic_init_u64 ( & waitLSN - > minWaitedLSN , PG_UINT64_MAX ) ;
pairingheap_initialize ( & waitLSN - > waitersHeap , lsn_cmp , NULL ) ;
memset ( & waitLSN - > procInfos , 0 , MaxBackends * sizeof ( WaitLSNProcInfo ) ) ;
}
}
/*
* Add the information about the LSN waiter backend to the shared memory
* array .
* Comparison function for waitLSN - > waitersHeap heap . Waiting processes are
* ordered by lsn , so that the waiter with smallest lsn is at the top .
*/
static void
addLSNWaiter ( XLogRecPtr lsn )
static int
lsn_cmp ( const pairingheap_node * a , const pairingheap_node * b , void * arg )
{
WaitLSNProcInfo cur ;
int i ;
const WaitLSNProcInfo * aproc = pairingheap_const_container ( WaitLSNProcInfo , phNode , a ) ;
const WaitLSNProcInfo * bproc = pairingheap_const_container ( WaitLSNProcInfo , phNode , b ) ;
cur . procnum = MyProcNumber ;
cur . waitLSN = lsn ;
if ( aproc - > waitLSN < bproc - > waitLSN )
return 1 ;
else if ( aproc - > waitLSN > bproc - > waitLSN )
return - 1 ;
else
return 0 ;
}
SpinLockAcquire ( & waitLSN - > mutex ) ;
/*
* Update waitLSN - > minWaitedLSN according to the current state of
* waitLSN - > waitersHeap .
*/
static void
updateMinWaitedLSN ( void )
{
XLogRecPtr minWaitedLSN = PG_UINT64_MAX ;
for ( i = 0 ; i < waitLSN - > numWaitedProcs ; i + + )
if ( ! pairingheap_is_empty ( & waitLSN - > waitersHeap ) )
{
if ( waitLSN - > procInfos [ i ] . waitLSN > = cur . waitLSN )
{
WaitLSNProcInfo tmp ;
pairingheap_node * node = pairingheap_first ( & waitLSN - > waitersHeap ) ;
tmp = waitLSN - > procInfos [ i ] ;
waitLSN - > procInfos [ i ] = cur ;
cur = tmp ;
}
minWaitedLSN = pairingheap_container ( WaitLSNProcInfo , phNode , node ) - > waitLSN ;
}
waitLSN - > procInfos [ i ] = cur ;
waitLSN - > numWaitedProcs + + ;
pg_atomic_write_u64 ( & waitLSN - > minLSN , waitLSN - > procInfos [ i ] . waitLSN ) ;
SpinLockRelease ( & waitLSN - > mutex ) ;
pg_atomic_write_u64 ( & waitLSN - > minWaitedLSN , minWaitedLSN ) ;
}
/*
* Delete the information about the LSN waiter backend from the shared memory
* array .
* Put the current process into the heap of LSN waiters .
*/
static void
deleteLSNWaiter ( void )
addLSNWaiter ( XLogRecPtr lsn )
{
int i ;
bool found = false ;
WaitLSNProcInfo * procInfo = & waitLSN - > procInfos [ MyProcNumber ] ;
SpinLockAcquire ( & waitLSN - > mutex ) ;
Assert ( ! procInfo - > inHeap ) ;
for ( i = 0 ; i < waitLSN - > numWaitedProcs ; i + + )
{
if ( waitLSN - > procInfos [ i ] . procnum = = MyProcNumber )
found = true ;
procInfo - > procnum = MyProcNumber ;
procInfo - > waitLSN = lsn ;
if ( found & & i < waitLSN - > numWaitedProcs - 1 )
{
waitLSN - > procInfos [ i ] = waitLSN - > procInfos [ i + 1 ] ;
}
}
SpinLockAcquire ( & waitLSN - > waitersHeapMutex ) ;
if ( ! found )
pairingheap_add ( & waitLSN - > waitersHeap , & procInfo - > phNode ) ;
procInfo - > inHeap = true ;
updateMinWaitedLSN ( ) ;
SpinLockRelease ( & waitLSN - > waitersHeapMutex ) ;
}
/*
* Remove the current process from the heap of LSN waiters if it ' s there .
*/
static void
deleteLSNWaiter ( void )
{
WaitLSNProcInfo * procInfo = & waitLSN - > procInfos [ MyProcNumber ] ;
SpinLockAcquire ( & waitLSN - > waitersHeapMutex ) ;
if ( ! procInfo - > inHeap )
{
SpinLockRelease ( & waitLSN - > mutex ) ;
SpinLockRelease ( & waitLSN - > waitersHeapM utex) ;
return ;
}
waitLSN - > numWaitedProcs - - ;
if ( waitLSN - > numWaitedProcs ! = 0 )
pg_atomic_write_u64 ( & waitLSN - > minLSN , waitLSN - > procInfos [ i ] . waitLSN ) ;
else
pg_atomic_write_u64 ( & waitLSN - > minLSN , PG_UINT64_MAX ) ;
pairingheap_remove ( & waitLSN - > waitersHeap , & procInfo - > phNode ) ;
procInfo - > inHeap = false ;
updateMinWaitedLSN ( ) ;
SpinLockRelease ( & waitLSN - > mutex ) ;
SpinLockRelease ( & waitLSN - > waitersHeapM utex) ;
}
/*
@ -148,41 +156,33 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
{
int i ;
int * wakeUpProcNums ;
int numWakeUpProcs ;
int numWakeUpProcs = 0 ;
wakeUpProcNums = palloc ( sizeof ( int ) * MaxBackends ) ;
SpinLockAcquire ( & waitLSN - > m utex) ;
SpinLockAcquire ( & waitLSN - > waitersHeapM utex) ;
/*
* Remember processes , whose waited LSNs are already replayed . We should
* set their latches later after spinlock release .
* Iterate the pairing heap of waiting processes till we find LSN not yet
* replayed . Record the process numbers to set their latches later .
*/
for ( i = 0 ; i < waitLSN - > numWaitedProcs ; i + + )
while ( ! pairingheap_is_empty ( & waitLSN - > waitersHeap ) )
{
pairingheap_node * node = pairingheap_first ( & waitLSN - > waitersHeap ) ;
WaitLSNProcInfo * procInfo = pairingheap_container ( WaitLSNProcInfo , phNode , node ) ;
if ( ! XLogRecPtrIsInvalid ( currentLSN ) & &
waitLSN - > procInfos [ i ] . waitLSN > currentLSN )
procInfo - > waitLSN > currentLSN )
break ;
wakeUpProcNums [ i ] = waitLSN - > procInfos [ i ] . procnum ;
wakeUpProcNums [ numWakeUpProcs + + ] = procInfo - > procnum ;
( void ) pairingheap_remove_first ( & waitLSN - > waitersHeap ) ;
procInfo - > inHeap = false ;
}
/*
* Immediately remove those processes from the shmem array . Otherwise ,
* shmem array items will be here till corresponding processes wake up and
* delete themselves .
*/
numWakeUpProcs = i ;
for ( i = 0 ; i < waitLSN - > numWaitedProcs - numWakeUpProcs ; i + + )
waitLSN - > procInfos [ i ] = waitLSN - > procInfos [ i + numWakeUpProcs ] ;
waitLSN - > numWaitedProcs - = numWakeUpProcs ;
if ( waitLSN - > numWaitedProcs ! = 0 )
pg_atomic_write_u64 ( & waitLSN - > minLSN , waitLSN - > procInfos [ i ] . waitLSN ) ;
else
pg_atomic_write_u64 ( & waitLSN - > minLSN , PG_UINT64_MAX ) ;
updateMinWaitedLSN ( ) ;
SpinLockRelease ( & waitLSN - > m utex) ;
SpinLockRelease ( & waitLSN - > waitersHeapMutex ) ;
/*
* Set latches for processes , whose waited LSNs are already replayed . This
@ -204,7 +204,7 @@ WaitLSNSetLatches(XLogRecPtr currentLSN)
void
WaitLSNCleanup ( void )
{
if ( haveShmemItem )
if ( waitLSN - > procInfos [ MyProcNumber ] . inHeap )
deleteLSNWaiter ( ) ;
}
@ -222,7 +222,7 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
Assert ( waitLSN ) ;
/* Should be only called by a backend */
Assert ( MyBackendType = = B_BACKEND ) ;
Assert ( MyBackendType = = B_BACKEND & & MyProcNumber < = MaxBackends ) ;
if ( ! RecoveryInProgress ( ) )
ereport ( ERROR ,
@ -238,7 +238,6 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
endtime = TimestampTzPlusMilliseconds ( GetCurrentTimestamp ( ) , timeout ) ;
addLSNWaiter ( targetLSN ) ;
haveShmemItem = true ;
for ( ; ; )
{
@ -280,17 +279,12 @@ WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
if ( targetLSN > currentLSN )
{
deleteLSNWaiter ( ) ;
haveShmemItem = false ;
ereport ( ERROR ,
( errcode ( ERRCODE_QUERY_CANCELED ) ,
errmsg ( " timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X " ,
LSN_FORMAT_ARGS ( targetLSN ) ,
LSN_FORMAT_ARGS ( currentLSN ) ) ) ) ;
}
else
{
haveShmemItem = false ;
}
}
Datum