@ -143,6 +143,7 @@ typedef struct ProcState
int nextMsgNum ; /* next message number to read */
int nextMsgNum ; /* next message number to read */
bool resetState ; /* backend needs to reset its state */
bool resetState ; /* backend needs to reset its state */
bool signaled ; /* backend has been sent catchup signal */
bool signaled ; /* backend has been sent catchup signal */
bool hasMessages ; /* backend has unread messages */
/*
/*
* Backend only sends invalidations , never receives them . This only makes
* Backend only sends invalidations , never receives them . This only makes
@ -248,6 +249,7 @@ CreateSharedInvalidationState(void)
shmInvalBuffer - > procState [ i ] . nextMsgNum = 0 ; /* meaningless */
shmInvalBuffer - > procState [ i ] . nextMsgNum = 0 ; /* meaningless */
shmInvalBuffer - > procState [ i ] . resetState = false ;
shmInvalBuffer - > procState [ i ] . resetState = false ;
shmInvalBuffer - > procState [ i ] . signaled = false ;
shmInvalBuffer - > procState [ i ] . signaled = false ;
shmInvalBuffer - > procState [ i ] . hasMessages = false ;
shmInvalBuffer - > procState [ i ] . nextLXID = InvalidLocalTransactionId ;
shmInvalBuffer - > procState [ i ] . nextLXID = InvalidLocalTransactionId ;
}
}
}
}
@ -264,11 +266,9 @@ SharedInvalBackendInit(bool sendOnly)
SISeg * segP = shmInvalBuffer ;
SISeg * segP = shmInvalBuffer ;
/*
/*
* This can run in parallel with read operations , and for that matter with
* This can run in parallel with read operations , but not with write
* write operations ; but not in parallel with additions and removals of
* operations , since SIInsertDataEntries relies on lastBackend to set
* backends , nor in parallel with SICleanupQueue . It doesn ' t seem worth
* hasMessages appropriately .
* having a third lock , so we choose to use SInvalWriteLock to serialize
* additions / removals .
*/
*/
LWLockAcquire ( SInvalWriteLock , LW_EXCLUSIVE ) ;
LWLockAcquire ( SInvalWriteLock , LW_EXCLUSIVE ) ;
@ -316,6 +316,7 @@ SharedInvalBackendInit(bool sendOnly)
stateP - > nextMsgNum = segP - > maxMsgNum ;
stateP - > nextMsgNum = segP - > maxMsgNum ;
stateP - > resetState = false ;
stateP - > resetState = false ;
stateP - > signaled = false ;
stateP - > signaled = false ;
stateP - > hasMessages = false ;
stateP - > sendOnly = sendOnly ;
stateP - > sendOnly = sendOnly ;
LWLockRelease ( SInvalWriteLock ) ;
LWLockRelease ( SInvalWriteLock ) ;
@ -417,6 +418,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
int nthistime = Min ( n , WRITE_QUANTUM ) ;
int nthistime = Min ( n , WRITE_QUANTUM ) ;
int numMsgs ;
int numMsgs ;
int max ;
int max ;
int i ;
n - = nthistime ;
n - = nthistime ;
@ -459,6 +461,19 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
SpinLockRelease ( & vsegP - > msgnumLock ) ;
SpinLockRelease ( & vsegP - > msgnumLock ) ;
}
}
/*
* Now that the maxMsgNum change is globally visible , we give
* everyone a swift kick to make sure they read the newly added
* messages . Releasing SInvalWriteLock will enforce a full memory
* barrier , so these ( unlocked ) changes will be committed to memory
* before we exit the function .
*/
for ( i = 0 ; i < segP - > lastBackend ; i + + )
{
ProcState * stateP = & segP - > procState [ i ] ;
stateP - > hasMessages = TRUE ;
}
LWLockRelease ( SInvalWriteLock ) ;
LWLockRelease ( SInvalWriteLock ) ;
}
}
}
}
@ -499,11 +514,36 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
int max ;
int max ;
int n ;
int n ;
LWLockAcquire ( SInvalReadLock , LW_SHARED ) ;
segP = shmInvalBuffer ;
segP = shmInvalBuffer ;
stateP = & segP - > procState [ MyBackendId - 1 ] ;
stateP = & segP - > procState [ MyBackendId - 1 ] ;
/*
* Before starting to take locks , do a quick , unlocked test to see whether
* there can possibly be anything to read . On a multiprocessor system ,
* it ' s possible that this load could migrate backwards and occur before we
* actually enter this function , so we might miss a sinval message that
* was just added by some other processor . But they can ' t migrate
* backwards over a preceding lock acquisition , so it should be OK . If
* we haven ' t acquired a lock preventing against further relevant
* invalidations , any such occurrence is not much different than if the
* invalidation had arrived slightly later in the first place .
*/
if ( ! stateP - > hasMessages )
return 0 ;
LWLockAcquire ( SInvalReadLock , LW_SHARED ) ;
/*
* We must reset hasMessages before determining how many messages we ' re
* going to read . That way , if new messages arrive after we have
* determined how many we ' re reading , the flag will get reset and we ' ll
* notice those messages part - way through .
*
* Note that , if we don ' t end up reading all of the messages , we had
* better be certain to reset this flag before exiting !
*/
stateP - > hasMessages = FALSE ;
/* Fetch current value of maxMsgNum using spinlock */
/* Fetch current value of maxMsgNum using spinlock */
{
{
/* use volatile pointer to prevent code rearrangement */
/* use volatile pointer to prevent code rearrangement */
@ -544,10 +584,16 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
}
}
/*
/*
* Reset our " signaled " flag whenever we have caught up completely .
* If we have caught up completely , reset our " signaled " flag so that
* we ' ll get another signal if we fall behind again .
*
* If we haven ' t catch up completely , reset the hasMessages flag so that
* we see the remaining messages next time .
*/
*/
if ( stateP - > nextMsgNum > = max )
if ( stateP - > nextMsgNum > = max )
stateP - > signaled = false ;
stateP - > signaled = false ;
else
stateP - > hasMessages = TRUE ;
LWLockRelease ( SInvalReadLock ) ;
LWLockRelease ( SInvalReadLock ) ;
return n ;
return n ;