@ -78,13 +78,21 @@ static SlruCtlData CommitTsCtlData;
# define CommitTsCtl (&CommitTsCtlData)
/*
* We keep a cache of the last value set in shared memory . This is protected
* by CommitTsLock .
* We keep a cache of the last value set in shared memory .
*
* This is also good place to keep the activation status . We keep this
* separate from the GUC so that the standby can activate the module if the
* primary has it active independently of the value of the GUC .
*
* This is protected by CommitTsLock . In some places , we use commitTsActive
* without acquiring the lock ; where this happens , a comment explains the
* rationale for it .
*/
typedef struct CommitTimestampShared
{
TransactionId xidLastCommit ;
CommitTimestampEntry dataLastCommit ;
bool commitTsActive ;
} CommitTimestampShared ;
CommitTimestampShared * commitTsShared ;
@ -93,14 +101,6 @@ CommitTimestampShared *commitTsShared;
/* GUC variable */
bool track_commit_timestamp ;
/*
* When this is set , commit_ts is force - enabled during recovery . This is so
* that a standby can replay WAL records coming from a master with the setting
* enabled . ( Note that this doesn ' t enable SQL access to the data ; it ' s
* effectively write - only until the GUC itself is enabled . )
*/
static bool enable_during_recovery ;
static void SetXidCommitTsInPage ( TransactionId xid , int nsubxids ,
TransactionId * subxids , TimestampTz ts ,
RepOriginId nodeid , int pageno ) ;
@ -109,7 +109,7 @@ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
static int ZeroCommitTsPage ( int pageno , bool writeXlog ) ;
static bool CommitTsPagePrecedes ( int page1 , int page2 ) ;
static void ActivateCommitTs ( void ) ;
static void DeactivateCommitTs ( bool do_wal ) ;
static void DeactivateCommitTs ( void ) ;
static void WriteZeroPageXlogRec ( int pageno ) ;
static void WriteTruncateXlogRec ( int pageno ) ;
static void WriteSetTimestampXlogRec ( TransactionId mainxid , int nsubxids ,
@ -149,10 +149,14 @@ TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId newestXact ;
/*
* No - op if the module is not enabled , but allow writes in a standby
* during recovery .
* No - op if the module is not active .
*
* An unlocked read here is fine , because in a standby ( the only place
* where the flag can change in flight ) this routine is only called by
* the recovery process , which is also the only process which can change
* the flag .
*/
if ( ! track_commit_timestamp & & ! enable_during_recovery )
if ( ! commitTsShared - > commitTsActive )
return ;
/*
@ -283,30 +287,45 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
TransactionId oldestCommitTs ;
TransactionId newestCommitTs ;
/* error if the given Xid doesn't normally commit */
if ( ! TransactionIdIsNormal ( xid ) )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " cannot retrieve commit timestamp for transaction %u " , xid ) ) ) ;
LWLockAcquire ( CommitTsLock , LW_SHARED ) ;
/* Error if module not enabled */
if ( ! track_commit_timestamp )
if ( ! commitTsShared - > commitTsActive )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " could not get commit timestamp data " ) ,
errhint ( " Make sure the configuration parameter \" %s \" is set. " ,
" track_commit_timestamp " ) ) ) ;
/* error if the given Xid doesn't normally commit */
if ( ! TransactionIdIsNormal ( xid ) )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " cannot retrieve commit timestamp for transaction %u " , xid ) ) ) ;
/*
* Return empty if the requested value is outside our valid range .
* If we ' re asked for the cached value , return that . Otherwise , fall
* through to read from SLRU .
*/
LWLockAcquire ( CommitTsLock , LW_SHARED ) ;
if ( commitTsShared - > xidLastCommit = = xid )
{
* ts = commitTsShared - > dataLastCommit . time ;
if ( nodeid )
* nodeid = commitTsShared - > dataLastCommit . nodeid ;
LWLockRelease ( CommitTsLock ) ;
return * ts ! = 0 ;
}
oldestCommitTs = ShmemVariableCache - > oldestCommitTs ;
newestCommitTs = ShmemVariableCache - > newestCommitTs ;
/* neither is invalid, or both are */
Assert ( TransactionIdIsValid ( oldestCommitTs ) = = TransactionIdIsValid ( newestCommitTs ) ) ;
LWLockRelease ( CommitTsLock ) ;
/*
* Return empty if the requested value is outside our valid range .
*/
if ( ! TransactionIdIsValid ( oldestCommitTs ) | |
TransactionIdPrecedes ( xid , oldestCommitTs ) | |
TransactionIdPrecedes ( newestCommitTs , xid ) )
@ -317,27 +336,6 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
return false ;
}
/*
* Use an unlocked atomic read on our cached value in shared memory ; if
* it ' s a hit , acquire a lock and read the data , after verifying that it ' s
* still what we initially read . Otherwise , fall through to read from
* SLRU .
*/
if ( commitTsShared - > xidLastCommit = = xid )
{
LWLockAcquire ( CommitTsLock , LW_SHARED ) ;
if ( commitTsShared - > xidLastCommit = = xid )
{
* ts = commitTsShared - > dataLastCommit . time ;
if ( nodeid )
* nodeid = commitTsShared - > dataLastCommit . nodeid ;
LWLockRelease ( CommitTsLock ) ;
return * ts ! = 0 ;
}
LWLockRelease ( CommitTsLock ) ;
}
/* lock is acquired by SimpleLruReadPage_ReadOnly */
slotno = SimpleLruReadPage_ReadOnly ( CommitTsCtl , pageno , xid ) ;
memcpy ( & entry ,
@ -366,15 +364,16 @@ GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
{
TransactionId xid ;
LWLockAcquire ( CommitTsLock , LW_SHARED ) ;
/* Error if module not enabled */
if ( ! track_commit_timestamp )
if ( ! commitTsShared - > commitTsActive )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " could not get commit timestamp data " ) ,
errhint ( " Make sure the configuration parameter \" %s \" is set. " ,
" track_commit_timestamp " ) ) ) ;
LWLockAcquire ( CommitTsLock , LW_SHARED ) ;
xid = commitTsShared - > xidLastCommit ;
if ( ts )
* ts = commitTsShared - > dataLastCommit . time ;
@ -493,6 +492,7 @@ CommitTsShmemInit(void)
commitTsShared - > xidLastCommit = InvalidTransactionId ;
TIMESTAMP_NOBEGIN ( commitTsShared - > dataLastCommit . time ) ;
commitTsShared - > dataLastCommit . nodeid = InvalidRepOriginId ;
commitTsShared - > commitTsActive = false ;
}
else
Assert ( found ) ;
@ -566,7 +566,7 @@ CompleteCommitTsInitialization(void)
* any leftover data .
*/
if ( ! track_commit_timestamp )
DeactivateCommitTs ( true ) ;
DeactivateCommitTs ( ) ;
}
/*
@ -588,11 +588,11 @@ CommitTsParameterChange(bool newvalue, bool oldvalue)
*/
if ( newvalue )
{
if ( ! track_commit_timestamp & & ! oldvalu e)
if ( ! commitTsShared - > commitTsActiv e)
ActivateCommitTs ( ) ;
}
else if ( ! track_commit_timestamp & & oldvalu e)
DeactivateCommitTs ( false ) ;
else if ( commitTsShared - > commitTsActiv e)
DeactivateCommitTs ( ) ;
}
/*
@ -645,7 +645,7 @@ ActivateCommitTs(void)
}
LWLockRelease ( CommitTsLock ) ;
/* Finally, c reate the current segment file, if necessary */
/* C reate the current segment file, if necessary */
if ( ! SimpleLruDoesPhysicalPageExist ( CommitTsCtl , pageno ) )
{
int slotno ;
@ -657,8 +657,10 @@ ActivateCommitTs(void)
LWLockRelease ( CommitTsControlLock ) ;
}
/* We can now replay xlog records from this module */
enable_during_recovery = true ;
/* Change the activation status in shared memory. */
LWLockAcquire ( CommitTsLock , LW_EXCLUSIVE ) ;
commitTsShared - > commitTsActive = true ;
LWLockRelease ( CommitTsLock ) ;
}
/*
@ -672,21 +674,25 @@ ActivateCommitTs(void)
* possibly - invalid data ; also removes segments of old data .
*/
static void
DeactivateCommitTs ( bool do_wal )
DeactivateCommitTs ( void )
{
TransactionId xid = ShmemVariableCache - > nextXid ;
int pageno = TransactionIdToCTsPage ( xid ) ;
/*
* Re - Initialize our idea of the latest page number .
* Cleanup the status in the shared memory .
*
* We reset everything in the commitTsShared record to prevent user from
* getting confusing data about last committed transaction on the standby
* when the module was activated repeatedly on the primary .
*/
LWLockAcquire ( CommitTsControlLock , LW_EXCLUSIVE ) ;
CommitTsCtl - > shared - > latest_page_number = pageno ;
LWLockRelease ( CommitTsControlLock ) ;
LWLockAcquire ( CommitTsLock , LW_EXCLUSIVE ) ;
commitTsShared - > commitTsActive = false ;
commitTsShared - > xidLastCommit = InvalidTransactionId ;
TIMESTAMP_NOBEGIN ( commitTsShared - > dataLastCommit . time ) ;
commitTsShared - > dataLastCommit . nodeid = InvalidRepOriginId ;
ShmemVariableCache - > oldestCommitTs = InvalidTransactionId ;
ShmemVariableCache - > newestCommitTs = InvalidTransactionId ;
LWLockRelease ( CommitTsLock ) ;
/*
@ -697,10 +703,9 @@ DeactivateCommitTs(bool do_wal)
* be overwritten anyway when we wrap around , but it seems better to be
* tidy . )
*/
LWLockAcquire ( CommitTsControlLock , LW_EXCLUSIVE ) ;
( void ) SlruScanDirectory ( CommitTsCtl , SlruScanDirCbDeleteAll , NULL ) ;
/* No longer enabled on recovery */
enable_during_recovery = false ;
LWLockRelease ( CommitTsControlLock ) ;
}
/*
@ -739,8 +744,13 @@ ExtendCommitTs(TransactionId newestXact)
{
int pageno ;
/* nothing to do if module not enabled */
if ( ! track_commit_timestamp & & ! enable_during_recovery )
/*
* Nothing to do if module not enabled . Note we do an unlocked read of the
* flag here , which is okay because this routine is only called from
* GetNewTransactionId , which is never called in a standby .
*/
Assert ( ! InRecovery ) ;
if ( ! commitTsShared - > commitTsActive )
return ;
/*
@ -768,7 +778,7 @@ ExtendCommitTs(TransactionId newestXact)
* Note that we don ' t need to flush XLOG here .
*/
void
TruncateCommitTs ( TransactionId oldestXact , bool do_wal )
TruncateCommitTs ( TransactionId oldestXact )
{
int cutoffPage ;
@ -784,8 +794,7 @@ TruncateCommitTs(TransactionId oldestXact, bool do_wal)
return ; /* nothing to remove */
/* Write XLOG record */
if ( do_wal )
WriteTruncateXlogRec ( cutoffPage ) ;
WriteTruncateXlogRec ( cutoffPage ) ;
/* Now we can remove the old CommitTs segment(s) */
SimpleLruTruncate ( CommitTsCtl , cutoffPage ) ;