@ -46,20 +46,82 @@
# include "access/transam.h"
# include "access/xact.h"
# include "access/xlog.h"
# include "catalog/catalog.h"
# include "lib/pairingheap.h"
# include "miscadmin.h"
# include "storage/predicate.h"
# include "storage/proc.h"
# include "storage/procarray.h"
# include "storage/sinval.h"
# include "storage/spin.h"
# include "utils/builtins.h"
# include "utils/memutils.h"
# include "utils/rel.h"
# include "utils/resowner_private.h"
# include "utils/snapmgr.h"
# include "utils/syscache.h"
# include "utils/tqual.h"
/*
* GUC parameters
*/
int old_snapshot_threshold ; /* number of minutes, -1 disables */
/*
* Structure for dealing with old_snapshot_threshold implementation .
*/
typedef struct OldSnapshotControlData
{
/*
* Variables for old snapshot handling are shared among processes and are
* only allowed to move forward .
*/
slock_t mutex_current ; /* protect current timestamp */
int64 current_timestamp ; /* latest snapshot timestamp */
slock_t mutex_latest_xmin ; /* protect latest snapshot xmin */
TransactionId latest_xmin ; /* latest snapshot xmin */
slock_t mutex_threshold ; /* protect threshold fields */
int64 threshold_timestamp ; /* earlier snapshot is old */
TransactionId threshold_xid ; /* earlier xid may be gone */
/*
* Keep one xid per minute for old snapshot error handling .
*
* Use a circular buffer with a head offset , a count of entries currently
* used , and a timestamp corresponding to the xid at the head offset . A
* count_used value of zero means that there are no times stored ; a
* count_used value of old_snapshot_threshold means that the buffer is
* full and the head must be advanced to add new entries . Use timestamps
* aligned to minute boundaries , since that seems less surprising than
* aligning based on the first usage timestamp .
*
* It is OK if the xid for a given time slot is from earlier than
* calculated by adding the number of minutes corresponding to the
* ( possibly wrapped ) distance from the head offset to the time of the
* head entry , since that just results in the vacuuming of old tuples
* being slightly less aggressive . It would not be OK for it to be off in
* the other direction , since it might result in vacuuming tuples that are
* still expected to be there .
*
* Use of an SLRU was considered but not chosen because it is more
* heavyweight than is needed for this , and would probably not be any less
* code to implement .
*
* Persistence is not needed .
*/
int head_offset ; /* subscript of oldest tracked time */
int64 head_timestamp ; /* time corresponding to head xid */
int count_used ; /* how many slots are in use */
TransactionId xid_by_minute [ FLEXIBLE_ARRAY_MEMBER ] ;
} OldSnapshotControlData ;
typedef struct OldSnapshotControlData * OldSnapshotControl ;
static volatile OldSnapshotControl oldSnapshotControl ;
/*
* CurrentSnapshot points to the only snapshot taken in transaction - snapshot
* mode , and to the latest one taken in a read - committed transaction .
@ -153,6 +215,7 @@ static Snapshot FirstXactSnapshot = NULL;
static List * exportedSnapshots = NIL ;
/* Prototypes for local functions */
static int64 AlignTimestampToMinuteBoundary ( int64 ts ) ;
static Snapshot CopySnapshot ( Snapshot snapshot ) ;
static void FreeSnapshot ( Snapshot snapshot ) ;
static void SnapshotResetXmin ( void ) ;
@ -174,6 +237,49 @@ typedef struct SerializedSnapshotData
CommandId curcid ;
} SerializedSnapshotData ;
Size
SnapMgrShmemSize ( void )
{
Size size ;
size = offsetof ( OldSnapshotControlData , xid_by_minute ) ;
if ( old_snapshot_threshold > 0 )
size = add_size ( size , mul_size ( sizeof ( TransactionId ) ,
old_snapshot_threshold ) ) ;
return size ;
}
/*
* Initialize for managing old snapshot detection .
*/
void
SnapMgrInit ( void )
{
bool found ;
/*
* Create or attach to the OldSnapshotControl structure .
*/
oldSnapshotControl = ( OldSnapshotControl )
ShmemInitStruct ( " OldSnapshotControlData " ,
SnapMgrShmemSize ( ) , & found ) ;
if ( ! found )
{
SpinLockInit ( & oldSnapshotControl - > mutex_current ) ;
oldSnapshotControl - > current_timestamp = 0 ;
SpinLockInit ( & oldSnapshotControl - > mutex_latest_xmin ) ;
oldSnapshotControl - > latest_xmin = InvalidTransactionId ;
SpinLockInit ( & oldSnapshotControl - > mutex_threshold ) ;
oldSnapshotControl - > threshold_timestamp = 0 ;
oldSnapshotControl - > threshold_xid = InvalidTransactionId ;
oldSnapshotControl - > head_offset = 0 ;
oldSnapshotControl - > head_timestamp = 0 ;
oldSnapshotControl - > count_used = 0 ;
}
}
/*
* GetTransactionSnapshot
* Get the appropriate snapshot for a new query in a transaction .
@ -1405,6 +1511,304 @@ ThereAreNoPriorRegisteredSnapshots(void)
return false ;
}
/*
* Return an int64 timestamp which is exactly on a minute boundary .
*
* If the argument is already aligned , return that value , otherwise move to
* the next minute boundary following the given time .
*/
static int64
AlignTimestampToMinuteBoundary ( int64 ts )
{
int64 retval = ts + ( USECS_PER_MINUTE - 1 ) ;
return retval - ( retval % USECS_PER_MINUTE ) ;
}
/*
* Get current timestamp for snapshots as int64 that never moves backward .
*/
int64
GetSnapshotCurrentTimestamp ( void )
{
int64 now = GetCurrentIntegerTimestamp ( ) ;
/*
* Don ' t let time move backward ; if it hasn ' t advanced , use the old value .
*/
SpinLockAcquire ( & oldSnapshotControl - > mutex_current ) ;
if ( now < = oldSnapshotControl - > current_timestamp )
now = oldSnapshotControl - > current_timestamp ;
else
oldSnapshotControl - > current_timestamp = now ;
SpinLockRelease ( & oldSnapshotControl - > mutex_current ) ;
return now ;
}
/*
* Get timestamp through which vacuum may have processed based on last stored
* value for threshold_timestamp .
*
* XXX : So far , we never trust that a 64 - bit value can be read atomically ; if
* that ever changes , we could get rid of the spinlock here .
*/
int64
GetOldSnapshotThresholdTimestamp ( void )
{
int64 threshold_timestamp ;
SpinLockAcquire ( & oldSnapshotControl - > mutex_threshold ) ;
threshold_timestamp = oldSnapshotControl - > threshold_timestamp ;
SpinLockRelease ( & oldSnapshotControl - > mutex_threshold ) ;
return threshold_timestamp ;
}
static void
SetOldSnapshotThresholdTimestamp ( int64 ts , TransactionId xlimit )
{
SpinLockAcquire ( & oldSnapshotControl - > mutex_threshold ) ;
oldSnapshotControl - > threshold_timestamp = ts ;
oldSnapshotControl - > threshold_xid = xlimit ;
SpinLockRelease ( & oldSnapshotControl - > mutex_threshold ) ;
}
/*
* TransactionIdLimitedForOldSnapshots
*
* Apply old snapshot limit , if any . This is intended to be called for page
* pruning and table vacuuming , to allow old_snapshot_threshold to override
* the normal global xmin value . Actual testing for snapshot too old will be
* based on whether a snapshot timestamp is prior to the threshold timestamp
* set in this function .
*/
TransactionId
TransactionIdLimitedForOldSnapshots ( TransactionId recentXmin ,
Relation relation )
{
if ( TransactionIdIsNormal ( recentXmin )
& & old_snapshot_threshold > = 0
& & RelationNeedsWAL ( relation )
& & ! IsCatalogRelation ( relation )
& & ! RelationIsAccessibleInLogicalDecoding ( relation ) )
{
int64 ts = GetSnapshotCurrentTimestamp ( ) ;
TransactionId xlimit = recentXmin ;
TransactionId latest_xmin = oldSnapshotControl - > latest_xmin ;
bool same_ts_as_threshold = false ;
/*
* Zero threshold always overrides to latest xmin , if valid . Without
* some heuristic it will find its own snapshot too old on , for
* example , a simple UPDATE - - which would make it useless for most
* testing , but there is no principled way to ensure that it doesn ' t
* fail in this way . Use a five - second delay to try to get useful
* testing behavior , but this may need adjustment .
*/
if ( old_snapshot_threshold = = 0 )
{
if ( TransactionIdPrecedes ( latest_xmin , MyPgXact - > xmin )
& & TransactionIdFollows ( latest_xmin , xlimit ) )
xlimit = latest_xmin ;
ts - = 5 * USECS_PER_SEC ;
SetOldSnapshotThresholdTimestamp ( ts , xlimit ) ;
return xlimit ;
}
ts = AlignTimestampToMinuteBoundary ( ts )
- ( old_snapshot_threshold * USECS_PER_MINUTE ) ;
/* Check for fast exit without LW locking. */
SpinLockAcquire ( & oldSnapshotControl - > mutex_threshold ) ;
if ( ts = = oldSnapshotControl - > threshold_timestamp )
{
xlimit = oldSnapshotControl - > threshold_xid ;
same_ts_as_threshold = true ;
}
SpinLockRelease ( & oldSnapshotControl - > mutex_threshold ) ;
if ( ! same_ts_as_threshold )
{
LWLockAcquire ( OldSnapshotTimeMapLock , LW_SHARED ) ;
if ( oldSnapshotControl - > count_used > 0
& & ts > = oldSnapshotControl - > head_timestamp )
{
int offset ;
offset = ( ( ts - oldSnapshotControl - > head_timestamp )
/ USECS_PER_MINUTE ) ;
if ( offset > oldSnapshotControl - > count_used - 1 )
offset = oldSnapshotControl - > count_used - 1 ;
offset = ( oldSnapshotControl - > head_offset + offset )
% old_snapshot_threshold ;
xlimit = oldSnapshotControl - > xid_by_minute [ offset ] ;
if ( NormalTransactionIdFollows ( xlimit , recentXmin ) )
SetOldSnapshotThresholdTimestamp ( ts , xlimit ) ;
}
LWLockRelease ( OldSnapshotTimeMapLock ) ;
}
/*
* Failsafe protection against vacuuming work of active transaction .
*
* This is not an assertion because we avoid the spinlock for
* performance , leaving open the possibility that xlimit could advance
* and be more current ; but it seems prudent to apply this limit . It
* might make pruning a tiny bit less agressive than it could be , but
* protects against data loss bugs .
*/
if ( TransactionIdIsNormal ( latest_xmin )
& & TransactionIdPrecedes ( latest_xmin , xlimit ) )
xlimit = latest_xmin ;
if ( NormalTransactionIdFollows ( xlimit , recentXmin ) )
return xlimit ;
}
return recentXmin ;
}
/*
* Take care of the circular buffer that maps time to xid .
*/
void
MaintainOldSnapshotTimeMapping ( int64 whenTaken , TransactionId xmin )
{
int64 ts ;
/* Fast exit when old_snapshot_threshold is not used. */
if ( old_snapshot_threshold < 0 )
return ;
/* Keep track of the latest xmin seen by any process. */
SpinLockAcquire ( & oldSnapshotControl - > mutex_latest_xmin ) ;
if ( TransactionIdFollows ( xmin , oldSnapshotControl - > latest_xmin ) )
oldSnapshotControl - > latest_xmin = xmin ;
SpinLockRelease ( & oldSnapshotControl - > mutex_latest_xmin ) ;
/* No further tracking needed for 0 (used for testing). */
if ( old_snapshot_threshold = = 0 )
return ;
/*
* We don ' t want to do something stupid with unusual values , but we don ' t
* want to litter the log with warnings or break otherwise normal
* processing for this feature ; so if something seems unreasonable , just
* log at DEBUG level and return without doing anything .
*/
if ( whenTaken < 0 )
{
elog ( DEBUG1 ,
" MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld " ,
( long ) whenTaken ) ;
return ;
}
if ( ! TransactionIdIsNormal ( xmin ) )
{
elog ( DEBUG1 ,
" MaintainOldSnapshotTimeMapping called with xmin = %lu " ,
( unsigned long ) xmin ) ;
return ;
}
ts = AlignTimestampToMinuteBoundary ( whenTaken ) ;
LWLockAcquire ( OldSnapshotTimeMapLock , LW_EXCLUSIVE ) ;
Assert ( oldSnapshotControl - > head_offset > = 0 ) ;
Assert ( oldSnapshotControl - > head_offset < old_snapshot_threshold ) ;
Assert ( ( oldSnapshotControl - > head_timestamp % USECS_PER_MINUTE ) = = 0 ) ;
Assert ( oldSnapshotControl - > count_used > = 0 ) ;
Assert ( oldSnapshotControl - > count_used < = old_snapshot_threshold ) ;
if ( oldSnapshotControl - > count_used = = 0 )
{
/* set up first entry for empty mapping */
oldSnapshotControl - > head_offset = 0 ;
oldSnapshotControl - > head_timestamp = ts ;
oldSnapshotControl - > count_used = 1 ;
oldSnapshotControl - > xid_by_minute [ 0 ] = xmin ;
}
else if ( ts < oldSnapshotControl - > head_timestamp )
{
/* old ts; log it at DEBUG */
LWLockRelease ( OldSnapshotTimeMapLock ) ;
elog ( DEBUG1 ,
" MaintainOldSnapshotTimeMapping called with old whenTaken = %ld " ,
( long ) whenTaken ) ;
return ;
}
else if ( ts < = ( oldSnapshotControl - > head_timestamp +
( ( oldSnapshotControl - > count_used - 1 )
* USECS_PER_MINUTE ) ) )
{
/* existing mapping; advance xid if possible */
int bucket = ( oldSnapshotControl - > head_offset
+ ( ( ts - oldSnapshotControl - > head_timestamp )
/ USECS_PER_MINUTE ) )
% old_snapshot_threshold ;
if ( TransactionIdPrecedes ( oldSnapshotControl - > xid_by_minute [ bucket ] , xmin ) )
oldSnapshotControl - > xid_by_minute [ bucket ] = xmin ;
}
else
{
/* We need a new bucket, but it might not be the very next one. */
int advance = ( ( ts - oldSnapshotControl - > head_timestamp )
/ USECS_PER_MINUTE ) ;
oldSnapshotControl - > head_timestamp = ts ;
if ( advance > = old_snapshot_threshold )
{
/* Advance is so far that all old data is junk; start over. */
oldSnapshotControl - > head_offset = 0 ;
oldSnapshotControl - > count_used = 1 ;
oldSnapshotControl - > xid_by_minute [ 0 ] = xmin ;
}
else
{
/* Store the new value in one or more buckets. */
int i ;
for ( i = 0 ; i < advance ; i + + )
{
if ( oldSnapshotControl - > count_used = = old_snapshot_threshold )
{
/* Map full and new value replaces old head. */
int old_head = oldSnapshotControl - > head_offset ;
if ( old_head = = ( old_snapshot_threshold - 1 ) )
oldSnapshotControl - > head_offset = 0 ;
else
oldSnapshotControl - > head_offset = old_head + 1 ;
oldSnapshotControl - > xid_by_minute [ old_head ] = xmin ;
}
else
{
/* Extend map to unused entry. */
int new_tail = ( oldSnapshotControl - > head_offset
+ oldSnapshotControl - > count_used )
% old_snapshot_threshold ;
oldSnapshotControl - > count_used + + ;
oldSnapshotControl - > xid_by_minute [ new_tail ] = xmin ;
}
}
}
}
LWLockRelease ( OldSnapshotTimeMapLock ) ;
}
/*
* Setup a snapshot that replaces normal catalog snapshots that allows catalog
* access to behave just like it did at a certain point in the past .