@ -407,6 +407,7 @@ CreateSharedProcArray(void)
procArray - > lastOverflowedXid = InvalidTransactionId ;
procArray - > lastOverflowedXid = InvalidTransactionId ;
procArray - > replication_slot_xmin = InvalidTransactionId ;
procArray - > replication_slot_xmin = InvalidTransactionId ;
procArray - > replication_slot_catalog_xmin = InvalidTransactionId ;
procArray - > replication_slot_catalog_xmin = InvalidTransactionId ;
ShmemVariableCache - > xactCompletionCount = 1 ;
}
}
allProcs = ProcGlobal - > allProcs ;
allProcs = ProcGlobal - > allProcs ;
@ -534,6 +535,9 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
/* Advance global latestCompletedXid while holding the lock */
/* Advance global latestCompletedXid while holding the lock */
MaintainLatestCompletedXid ( latestXid ) ;
MaintainLatestCompletedXid ( latestXid ) ;
/* Same with xactCompletionCount */
ShmemVariableCache - > xactCompletionCount + + ;
ProcGlobal - > xids [ proc - > pgxactoff ] = 0 ;
ProcGlobal - > xids [ proc - > pgxactoff ] = 0 ;
ProcGlobal - > subxidStates [ proc - > pgxactoff ] . overflowed = false ;
ProcGlobal - > subxidStates [ proc - > pgxactoff ] . overflowed = false ;
ProcGlobal - > subxidStates [ proc - > pgxactoff ] . count = 0 ;
ProcGlobal - > subxidStates [ proc - > pgxactoff ] . count = 0 ;
@ -667,6 +671,7 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
{
{
size_t pgxactoff = proc - > pgxactoff ;
size_t pgxactoff = proc - > pgxactoff ;
Assert ( LWLockHeldByMe ( ProcArrayLock ) ) ;
Assert ( TransactionIdIsValid ( ProcGlobal - > xids [ pgxactoff ] ) ) ;
Assert ( TransactionIdIsValid ( ProcGlobal - > xids [ pgxactoff ] ) ) ;
Assert ( ProcGlobal - > xids [ pgxactoff ] = = proc - > xid ) ;
Assert ( ProcGlobal - > xids [ pgxactoff ] = = proc - > xid ) ;
@ -698,6 +703,9 @@ ProcArrayEndTransactionInternal(PGPROC *proc, TransactionId latestXid)
/* Also advance global latestCompletedXid while holding the lock */
/* Also advance global latestCompletedXid while holding the lock */
MaintainLatestCompletedXid ( latestXid ) ;
MaintainLatestCompletedXid ( latestXid ) ;
/* Same with xactCompletionCount */
ShmemVariableCache - > xactCompletionCount + + ;
}
}
/*
/*
@ -1916,6 +1924,93 @@ GetMaxSnapshotSubxidCount(void)
return TOTAL_MAX_CACHED_SUBXIDS ;
return TOTAL_MAX_CACHED_SUBXIDS ;
}
}
/*
* Initialize old_snapshot_threshold specific parts of a newly build snapshot .
*/
static void
GetSnapshotDataInitOldSnapshot ( Snapshot snapshot )
{
if ( ! OldSnapshotThresholdActive ( ) )
{
/*
* If not using " snapshot too old " feature , fill related fields with
* dummy values that don ' t require any locking .
*/
snapshot - > lsn = InvalidXLogRecPtr ;
snapshot - > whenTaken = 0 ;
}
else
{
/*
* Capture the current time and WAL stream location in case this
* snapshot becomes old enough to need to fall back on the special
* " old snapshot " logic .
*/
snapshot - > lsn = GetXLogInsertRecPtr ( ) ;
snapshot - > whenTaken = GetSnapshotCurrentTimestamp ( ) ;
MaintainOldSnapshotTimeMapping ( snapshot - > whenTaken , snapshot - > xmin ) ;
}
}
/*
* Helper function for GetSnapshotData ( ) that checks if the bulk of the
* visibility information in the snapshot is still valid . If so , it updates
* the fields that need to change and returns true . Otherwise it returns
* false .
*
* This very likely can be evolved to not need ProcArrayLock held ( at very
* least in the case we already hold a snapshot ) , but that ' s for another day .
*/
static bool
GetSnapshotDataReuse ( Snapshot snapshot )
{
uint64 curXactCompletionCount ;
Assert ( LWLockHeldByMe ( ProcArrayLock ) ) ;
if ( unlikely ( snapshot - > snapXactCompletionCount = = 0 ) )
return false ;
curXactCompletionCount = ShmemVariableCache - > xactCompletionCount ;
if ( curXactCompletionCount ! = snapshot - > snapXactCompletionCount )
return false ;
/*
* If the current xactCompletionCount is still the same as it was at the
* time the snapshot was built , we can be sure that rebuilding the
* contents of the snapshot the hard way would result in the same snapshot
* contents :
*
* As explained in transam / README , the set of xids considered running by
* GetSnapshotData ( ) cannot change while ProcArrayLock is held . Snapshot
* contents only depend on transactions with xids and xactCompletionCount
* is incremented whenever a transaction with an xid finishes ( while
* holding ProcArrayLock ) exclusively ) . Thus the xactCompletionCount check
* ensures we would detect if the snapshot would have changed .
*
* As the snapshot contents are the same as it was before , it is is safe
* to re - enter the snapshot ' s xmin into the PGPROC array . None of the rows
* visible under the snapshot could already have been removed ( that ' d
* require the set of running transactions to change ) and it fulfills the
* requirement that concurrent GetSnapshotData ( ) calls yield the same
* xmin .
*/
if ( ! TransactionIdIsValid ( MyProc - > xmin ) )
MyProc - > xmin = TransactionXmin = snapshot - > xmin ;
RecentXmin = snapshot - > xmin ;
Assert ( TransactionIdPrecedesOrEquals ( TransactionXmin , RecentXmin ) ) ;
snapshot - > curcid = GetCurrentCommandId ( false ) ;
snapshot - > active_count = 0 ;
snapshot - > regd_count = 0 ;
snapshot - > copied = false ;
GetSnapshotDataInitOldSnapshot ( snapshot ) ;
return true ;
}
/*
/*
* GetSnapshotData - - returns information about running transactions .
* GetSnapshotData - - returns information about running transactions .
*
*
@ -1963,6 +2058,7 @@ GetSnapshotData(Snapshot snapshot)
TransactionId oldestxid ;
TransactionId oldestxid ;
int mypgxactoff ;
int mypgxactoff ;
TransactionId myxid ;
TransactionId myxid ;
uint64 curXactCompletionCount ;
TransactionId replication_slot_xmin = InvalidTransactionId ;
TransactionId replication_slot_xmin = InvalidTransactionId ;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId ;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId ;
@ -2007,12 +2103,19 @@ GetSnapshotData(Snapshot snapshot)
*/
*/
LWLockAcquire ( ProcArrayLock , LW_SHARED ) ;
LWLockAcquire ( ProcArrayLock , LW_SHARED ) ;
if ( GetSnapshotDataReuse ( snapshot ) )
{
LWLockRelease ( ProcArrayLock ) ;
return snapshot ;
}
latest_completed = ShmemVariableCache - > latestCompletedXid ;
latest_completed = ShmemVariableCache - > latestCompletedXid ;
mypgxactoff = MyProc - > pgxactoff ;
mypgxactoff = MyProc - > pgxactoff ;
myxid = other_xids [ mypgxactoff ] ;
myxid = other_xids [ mypgxactoff ] ;
Assert ( myxid = = MyProc - > xid ) ;
Assert ( myxid = = MyProc - > xid ) ;
oldestxid = ShmemVariableCache - > oldestXid ;
oldestxid = ShmemVariableCache - > oldestXid ;
curXactCompletionCount = ShmemVariableCache - > xactCompletionCount ;
/* xmax is always latestCompletedXid + 1 */
/* xmax is always latestCompletedXid + 1 */
xmax = XidFromFullTransactionId ( latest_completed ) ;
xmax = XidFromFullTransactionId ( latest_completed ) ;
@ -2266,6 +2369,7 @@ GetSnapshotData(Snapshot snapshot)
snapshot - > xcnt = count ;
snapshot - > xcnt = count ;
snapshot - > subxcnt = subcount ;
snapshot - > subxcnt = subcount ;
snapshot - > suboverflowed = suboverflowed ;
snapshot - > suboverflowed = suboverflowed ;
snapshot - > snapXactCompletionCount = curXactCompletionCount ;
snapshot - > curcid = GetCurrentCommandId ( false ) ;
snapshot - > curcid = GetCurrentCommandId ( false ) ;
@ -2277,26 +2381,7 @@ GetSnapshotData(Snapshot snapshot)
snapshot - > regd_count = 0 ;
snapshot - > regd_count = 0 ;
snapshot - > copied = false ;
snapshot - > copied = false ;
if ( old_snapshot_threshold < 0 )
GetSnapshotDataInitOldSnapshot ( snapshot ) ;
{
/*
* If not using " snapshot too old " feature , fill related fields with
* dummy values that don ' t require any locking .
*/
snapshot - > lsn = InvalidXLogRecPtr ;
snapshot - > whenTaken = 0 ;
}
else
{
/*
* Capture the current time and WAL stream location in case this
* snapshot becomes old enough to need to fall back on the special
* " old snapshot " logic .
*/
snapshot - > lsn = GetXLogInsertRecPtr ( ) ;
snapshot - > whenTaken = GetSnapshotCurrentTimestamp ( ) ;
MaintainOldSnapshotTimeMapping ( snapshot - > whenTaken , xmin ) ;
}
return snapshot ;
return snapshot ;
}
}