@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $ PostgreSQL : pgsql / src / backend / storage / ipc / sinvaladt . c , v 1.66 2008 / 01 / 01 19 : 45 : 51 momjian Exp $
* $ PostgreSQL : pgsql / src / backend / storage / ipc / sinvaladt . c , v 1.67 2008 / 03 / 16 19 : 47 : 33 alvherre Exp $
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
@ -24,7 +24,82 @@
# include "storage/sinvaladt.h"
SISeg * shmInvalBuffer ;
/*
* Conceptually , the shared cache invalidation messages are stored in an
* infinite array , where maxMsgNum is the next array subscript to store a
* submitted message in , minMsgNum is the smallest array subscript containing a
* message not yet read by all backends , and we always have maxMsgNum > =
* minMsgNum . ( They are equal when there are no messages pending . ) For each
* active backend , there is a nextMsgNum pointer indicating the next message it
* needs to read ; we have maxMsgNum > = nextMsgNum > = minMsgNum for every
* backend .
*
* In reality , the messages are stored in a circular buffer of MAXNUMMESSAGES
* entries . We translate MsgNum values into circular - buffer indexes by
* computing MsgNum % MAXNUMMESSAGES ( this should be fast as long as
* MAXNUMMESSAGES is a constant and a power of 2 ) . As long as maxMsgNum
* doesn ' t exceed minMsgNum by more than MAXNUMMESSAGES , we have enough space
* in the buffer . If the buffer does overflow , we reset it to empty and
* force each backend to " reset " , ie , discard all its invalidatable state .
*
* We would have problems if the MsgNum values overflow an integer , so
* whenever minMsgNum exceeds MSGNUMWRAPAROUND , we subtract MSGNUMWRAPAROUND
* from all the MsgNum variables simultaneously . MSGNUMWRAPAROUND can be
* large so that we don ' t need to do this often . It must be a multiple of
* MAXNUMMESSAGES so that the existing circular - buffer entries don ' t need
* to be moved when we do it .
*/
/*
* Configurable parameters .
*
* MAXNUMMESSAGES : max number of shared - inval messages we can buffer .
* Must be a power of 2 for speed .
*
* MSGNUMWRAPAROUND : how often to reduce MsgNum variables to avoid overflow .
* Must be a multiple of MAXNUMMESSAGES . Should be large .
*/
# define MAXNUMMESSAGES 4096
# define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
/* Shared cache invalidation memory segment */
typedef struct SISeg
{
/*
* General state information
*/
int minMsgNum ; /* oldest message still needed */
int maxMsgNum ; /* next message number to be assigned */
int lastBackend ; /* index of last active procState entry, +1 */
int maxBackends ; /* size of procState array */
int freeBackends ; /* number of empty procState slots */
/*
* Next LocalTransactionId to use for each idle backend slot . We keep
* this here because it is indexed by BackendId and it is convenient to
* copy the value to and from local memory when MyBackendId is set .
*/
LocalTransactionId * nextLXID ; /* array of maxBackends entries */
/*
* Circular buffer holding shared - inval messages
*/
SharedInvalidationMessage buffer [ MAXNUMMESSAGES ] ;
/*
* Per - backend state info .
*
* We declare procState as 1 entry because C wants a fixed - size array , but
* actually it is maxBackends entries long .
*/
ProcState procState [ 1 ] ; /* reflects the invalidation state */
} SISeg ;
static SISeg * shmInvalBuffer ; /* pointer to the shared inval buffer */
static LocalTransactionId nextLocalTransactionId ;
@ -49,13 +124,12 @@ SInvalShmemSize(void)
}
/*
* SIBufferInit
* Create and initialize a new SI message buffer
* Shared Inval BufferInit
* Create and initialize the SI message buffer
*/
void
SIBufferInit ( void )
CreateSharedInvalidationState ( void )
{
SISeg * segP ;
Size size ;
int i ;
bool found ;
@ -64,49 +138,43 @@ SIBufferInit(void)
size = offsetof ( SISeg , procState ) ;
size = add_size ( size , mul_size ( sizeof ( ProcState ) , MaxBackends ) ) ;
shmInvalBuffer = segP = ( SISeg * )
shmInvalBuffer = ( SISeg * )
ShmemInitStruct ( " shmInvalBuffer " , size , & found ) ;
if ( found )
return ;
segP - > nextLXID = ShmemAlloc ( sizeof ( LocalTransactionId ) * MaxBackends ) ;
shmInvalBuffer - > nextLXID = ShmemAlloc ( sizeof ( LocalTransactionId ) * MaxBackends ) ;
/* Clear message counters, save size of procState array */
segP - > minMsgNum = 0 ;
segP - > maxMsgNum = 0 ;
segP - > lastBackend = 0 ;
segP - > maxBackends = MaxBackends ;
segP - > freeBackends = MaxBackends ;
shmInvalBuffer - > minMsgNum = 0 ;
shmInvalBuffer - > maxMsgNum = 0 ;
shmInvalBuffer - > lastBackend = 0 ;
shmInvalBuffer - > maxBackends = MaxBackends ;
shmInvalBuffer - > freeBackends = MaxBackends ;
/* The buffer[] array is initially all unused, so we need not fill it */
/* Mark all backends inactive, and initialize nextLXID */
for ( i = 0 ; i < segP - > maxBackends ; i + + )
for ( i = 0 ; i < shmInvalBuffer - > maxBackends ; i + + )
{
segP - > procState [ i ] . nextMsgNum = - 1 ; /* inactive */
segP - > procState [ i ] . resetState = false ;
segP - > nextLXID [ i ] = InvalidLocalTransactionId ;
shmInvalBuffer - > procState [ i ] . nextMsgNum = - 1 ; /* inactive */
shmInvalBuffer - > procState [ i ] . resetState = false ;
shmInvalBuffer - > nextLXID [ i ] = InvalidLocalTransactionId ;
}
}
/*
* SIBackendInit
* Shared Inval BackendInit
* Initialize a new backend to operate on the sinval buffer
*
* Returns :
* > 0 A - OK
* 0 Failed to find a free procState slot ( ie , MaxBackends exceeded )
* < 0 Some other failure ( not currently used )
*
* NB : this routine , and all following ones , must be executed with the
* SInvalLock lock held , since there may be multiple backends trying
* to access the buffer .
*/
int
SIBackendInit ( SISeg * segP )
void
SharedInvalBackendInit ( void )
{
int index ;
ProcState * stateP = NULL ;
SISeg * segP = shmInvalBuffer ;
LWLockAcquire ( SInvalLock , LW_EXCLUSIVE ) ;
/* Look for a free entry in the procState array */
for ( index = 0 ; index < segP - > lastBackend ; index + + )
@ -128,9 +196,14 @@ SIBackendInit(SISeg *segP)
}
else
{
/* out of procState slots */
/*
* out of procState slots : MaxBackends exceeded - - report normally
*/
MyBackendId = InvalidBackendId ;
return 0 ;
LWLockRelease ( SInvalLock ) ;
ereport ( FATAL ,
( errcode ( ERRCODE_TOO_MANY_CONNECTIONS ) ,
errmsg ( " sorry, too many clients already " ) ) ) ;
}
}
@ -153,10 +226,10 @@ SIBackendInit(SISeg *segP)
stateP - > nextMsgNum = segP - > maxMsgNum ;
stateP - > resetState = false ;
LWLockRelease ( SInvalLock ) ;
/* register exit routine to mark my entry inactive at exit */
on_shmem_exit ( CleanupInvalidationState , PointerGetDatum ( segP ) ) ;
return 1 ;
}
/*
@ -210,9 +283,16 @@ CleanupInvalidationState(int status, Datum arg)
* Returns true for normal successful insertion , false if had to reset .
*/
bool
SIInsertDataEntry ( SISeg * segP , S haredInvalidationMessage * data )
SIInsertDataEntry ( SharedInvalidationMessage * data )
{
int numMsgs = segP - > maxMsgNum - segP - > minMsgNum ;
int numMsgs ;
bool signal_postmaster = false ;
SISeg * segP ;
LWLockAcquire ( SInvalLock , LW_EXCLUSIVE ) ;
segP = shmInvalBuffer ;
numMsgs = segP - > maxMsgNum - segP - > minMsgNum ;
/* Is the buffer full? */
if ( numMsgs > = MAXNUMMESSAGES )
@ -222,12 +302,13 @@ SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
* messages but not yet have done SIDelExpiredDataEntries ( ) to advance
* minMsgNum . So , make sure minMsgNum is up - to - date .
*/
SIDelExpiredDataEntries ( segP ) ;
SIDelExpiredDataEntries ( true ) ;
numMsgs = segP - > maxMsgNum - segP - > minMsgNum ;
if ( numMsgs > = MAXNUMMESSAGES )
{
/* Yup, it's definitely full, no choice but to reset */
SISetProcStateInvalid ( segP ) ;
LWLockRelease ( SInvalLock ) ;
return false ;
}
}
@ -246,7 +327,7 @@ SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
IsUnderPostmaster )
{
elog ( DEBUG4 , " SI table is 70%% full, signaling postmaster " ) ;
SendPostmasterSignal ( PMSIGNAL_WAKEN_CHILDREN ) ;
signal_postmaster = true ;
}
/*
@ -255,6 +336,11 @@ SIInsertDataEntry(SISeg *segP, SharedInvalidationMessage *data)
segP - > buffer [ segP - > maxMsgNum % MAXNUMMESSAGES ] = * data ;
segP - > maxMsgNum + + ;
LWLockRelease ( SInvalLock ) ;
if ( signal_postmaster )
SendPostmasterSignal ( PMSIGNAL_WAKEN_CHILDREN ) ;
return true ;
}
@ -293,14 +379,23 @@ SISetProcStateInvalid(SISeg *segP)
* - 1 : SI reset message extracted
*
* NB : this can run in parallel with other instances of SIGetDataEntry
* executing on behalf of other backends . See comments in sinval . c in
* ReceiveSharedInvalidMessages ( ) .
* executing on behalf of other backends , since each instance will modify only
* fields of its own backend ' s ProcState , and no instance will look at fields
* of other backends ' ProcStates . We express this by grabbing SInvalLock in
* shared mode . Note that this is not exactly the normal ( read - only )
* interpretation of a shared lock ! Look closely at the interactions before
* allowing SInvalLock to be grabbed in shared mode for any other reason !
*/
int
SIGetDataEntry ( SISeg * segP , int backendId ,
SharedInvalidationMessage * data )
SIGetDataEntry ( int backendId , SharedInvalidationMessage * data )
{
ProcState * stateP = & segP - > procState [ backendId - 1 ] ;
ProcState * stateP ;
SISeg * segP ;
LWLockAcquire ( SInvalLock , LW_SHARED ) ;
segP = shmInvalBuffer ;
stateP = & segP - > procState [ backendId - 1 ] ;
if ( stateP - > resetState )
{
@ -310,11 +405,15 @@ SIGetDataEntry(SISeg *segP, int backendId,
*/
stateP - > resetState = false ;
stateP - > nextMsgNum = segP - > maxMsgNum ;
LWLockRelease ( SInvalLock ) ;
return - 1 ;
}
if ( stateP - > nextMsgNum > = segP - > maxMsgNum )
{
LWLockRelease ( SInvalLock ) ;
return 0 ; /* nothing to read */
}
/*
* Retrieve message and advance my counter .
@ -327,6 +426,8 @@ SIGetDataEntry(SISeg *segP, int backendId,
* delete it here . SIDelExpiredDataEntries ( ) should be called to remove
* dead messages .
*/
LWLockRelease ( SInvalLock ) ;
return 1 ; /* got a message */
}
@ -335,15 +436,23 @@ SIGetDataEntry(SISeg *segP, int backendId,
* Remove messages that have been consumed by all active backends
*/
void
SIDelExpiredDataEntries ( SISeg * segP )
SIDelExpiredDataEntries ( bool locked )
{
SISeg * segP = shmInvalBuffer ;
int min ,
i ,
h ;
if ( ! locked )
LWLockAcquire ( SInvalLock , LW_EXCLUSIVE ) ;
min = segP - > maxMsgNum ;
if ( min = = segP - > minMsgNum )
{
if ( ! locked )
LWLockRelease ( SInvalLock ) ;
return ; /* fast path if no messages exist */
}
/* Recompute minMsgNum = minimum of all backends' nextMsgNum */
@ -372,6 +481,9 @@ SIDelExpiredDataEntries(SISeg *segP)
segP - > procState [ i ] . nextMsgNum - = MSGNUMWRAPAROUND ;
}
}
if ( ! locked )
LWLockRelease ( SInvalLock ) ;
}