@ -90,7 +90,7 @@ WaitLSNShmemInit(void)
for ( i = 0 ; i < WAIT_LSN_TYPE_COUNT ; i + + )
{
pg_atomic_init_u64 ( & waitLSNState - > minWaitedLSN [ i ] , PG_UINT64_MAX ) ;
pairingheap_initialize ( & waitLSNState - > waitersHeap [ i ] , waitlsn_cmp , ( void * ) ( uintptr_t ) i ) ;
pairingheap_initialize ( & waitLSNState - > waitersHeap [ i ] , waitlsn_cmp , NULL ) ;
}
/* Initialize process info array */
@ -106,9 +106,8 @@ WaitLSNShmemInit(void)
static int
waitlsn_cmp ( const pairingheap_node * a , const pairingheap_node * b , void * arg )
{
int i = ( uintptr_t ) arg ;
const WaitLSNProcInfo * aproc = pairingheap_const_container ( WaitLSNProcInfo , heapNode [ i ] , a ) ;
const WaitLSNProcInfo * bproc = pairingheap_const_container ( WaitLSNProcInfo , heapNode [ i ] , b ) ;
const WaitLSNProcInfo * aproc = pairingheap_const_container ( WaitLSNProcInfo , heapNode , a ) ;
const WaitLSNProcInfo * bproc = pairingheap_const_container ( WaitLSNProcInfo , heapNode , b ) ;
if ( aproc - > waitLSN < bproc - > waitLSN )
return 1 ;
@ -132,7 +131,7 @@ updateMinWaitedLSN(WaitLSNType lsnType)
if ( ! pairingheap_is_empty ( & waitLSNState - > waitersHeap [ i ] ) )
{
pairingheap_node * node = pairingheap_first ( & waitLSNState - > waitersHeap [ i ] ) ;
WaitLSNProcInfo * procInfo = pairingheap_container ( WaitLSNProcInfo , heapNode [ i ] , node ) ;
WaitLSNProcInfo * procInfo = pairingheap_container ( WaitLSNProcInfo , heapNode , node ) ;
minWaitedLSN = procInfo - > waitLSN ;
}
@ -154,10 +153,11 @@ addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
procInfo - > procno = MyProcNumber ;
procInfo - > waitLSN = lsn ;
procInfo - > lsnType = lsnType ;
Assert ( ! procInfo - > inHeap [ i ] ) ;
pairingheap_add ( & waitLSNState - > waitersHeap [ i ] , & procInfo - > heapNode [ i ] ) ;
procInfo - > inHeap [ i ] = true ;
Assert ( ! procInfo - > inHeap ) ;
pairingheap_add ( & waitLSNState - > waitersHeap [ i ] , & procInfo - > heapNode ) ;
procInfo - > inHeap = true ;
updateMinWaitedLSN ( lsnType ) ;
LWLockRelease ( WaitLSNLock ) ;
@ -176,10 +176,12 @@ deleteLSNWaiter(WaitLSNType lsnType)
LWLockAcquire ( WaitLSNLock , LW_EXCLUSIVE ) ;
if ( procInfo - > inHeap [ i ] )
Assert ( procInfo - > lsnType = = lsnType ) ;
if ( procInfo - > inHeap )
{
pairingheap_remove ( & waitLSNState - > waitersHeap [ i ] , & procInfo - > heapNode [ i ] ) ;
procInfo - > inHeap [ i ] = false ;
pairingheap_remove ( & waitLSNState - > waitersHeap [ i ] , & procInfo - > heapNode ) ;
procInfo - > inHeap = false ;
updateMinWaitedLSN ( lsnType ) ;
}
@ -228,7 +230,7 @@ wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
WaitLSNProcInfo * procInfo ;
/* Get procInfo using appropriate heap node */
procInfo = pairingheap_container ( WaitLSNProcInfo , heapNode [ i ] , node ) ;
procInfo = pairingheap_container ( WaitLSNProcInfo , heapNode , node ) ;
if ( XLogRecPtrIsValid ( currentLSN ) & & procInfo - > waitLSN > currentLSN )
break ;
@ -238,7 +240,7 @@ wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
( void ) pairingheap_remove_first ( & waitLSNState - > waitersHeap [ i ] ) ;
/* Update appropriate flag */
procInfo - > inHeap [ i ] = false ;
procInfo - > inHeap = false ;
if ( numWakeUpProcs = = WAKEUP_PROC_STATIC_ARRAY_SIZE )
break ;
@ -289,20 +291,14 @@ WaitLSNCleanup(void)
{
if ( waitLSNState )
{
int i ;
/*
* We do a fast - path check of the heap flags without the lock . These
* flags are set to true only by the process itself . So , it ' s only
* We do a fast - path check of the inHeap flag without the lock . This
* flag is set to true only by the process itself . So , it ' s only
* possible to get a false positive . But that will be eliminated by a
* recheck inside deleteLSNWaiter ( ) .
*/
for ( i = 0 ; i < ( int ) WAIT_LSN_TYPE_COUNT ; i + + )
{
if ( waitLSNState - > procInfos [ MyProcNumber ] . inHeap [ i ] )
deleteLSNWaiter ( ( WaitLSNType ) i ) ;
}
if ( waitLSNState - > procInfos [ MyProcNumber ] . inHeap )
deleteLSNWaiter ( waitLSNState - > procInfos [ MyProcNumber ] . lsnType ) ;
}
}