@ -241,6 +241,33 @@ struct SnapBuild
*/
TransactionId * xip ;
} committed ;
/*
* Array of transactions and subtransactions that had modified catalogs
* and were running when the snapshot was serialized .
*
* We normally rely on some WAL record types such as HEAP2_NEW_CID to know
* if the transaction has changed the catalog . But it could happen that
* the logical decoding decodes only the commit record of the transaction
* after restoring the previously serialized snapshot in which case we
* will miss adding the xid to the snapshot and end up looking at the
* catalogs with the wrong snapshot .
*
* To avoid the above problem, we serialize the transactions that had
* modified the catalogs and are still running at the time of snapshot
* serialization. We fill this array while restoring the snapshot and then
* refer to it while decoding a commit to check whether the xact has
* modified the catalog. We discard this array when all the xids in the
* list become too old to matter. See SnapBuildPurgeOlderTxn for details.
*/
struct
{
/* number of transactions */
size_t xcnt ;
/* This array must be sorted in xidComparator order */
TransactionId * xip ;
} catchange ;
} ;
/*
@ -250,8 +277,8 @@ struct SnapBuild
static ResourceOwner SavedResourceOwnerDuringExport = NULL ;
static bool ExportInProgress = false ;
/* ->committed manipulation */
static void SnapBuildPurgeCommitted Txn ( SnapBuild * builder ) ;
/* ->committed and ->catchange manipulation */
static void SnapBuildPurgeOlder Txn ( SnapBuild * builder ) ;
/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot ( SnapBuild * builder ) ;
@ -262,6 +289,9 @@ static void SnapBuildSnapIncRefcount(Snapshot snap);
static void SnapBuildDistributeNewCatalogSnapshot ( SnapBuild * builder , XLogRecPtr lsn ) ;
static inline bool SnapBuildXidHasCatalogChanges ( SnapBuild * builder , TransactionId xid ,
uint32 xinfo ) ;
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
static bool SnapBuildFindSnapshot ( SnapBuild * builder , XLogRecPtr lsn , xl_running_xacts * running ) ;
static void SnapBuildWaitSnapshot ( xl_running_xacts * running , TransactionId cutoff ) ;
@ -269,6 +299,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof
/* serialization functions */
static void SnapBuildSerialize ( SnapBuild * builder , XLogRecPtr lsn ) ;
static bool SnapBuildRestore ( SnapBuild * builder , XLogRecPtr lsn ) ;
static void SnapBuildRestoreContents ( int fd , char * dest , Size size , const char * path ) ;
/*
* Allocate a new snapshot builder .
@ -306,6 +337,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
palloc0 ( builder - > committed . xcnt_space * sizeof ( TransactionId ) ) ;
builder - > committed . includes_all_transactions = true ;
builder - > catchange . xcnt = 0 ;
builder - > catchange . xip = NULL ;
builder - > initial_xmin_horizon = xmin_horizon ;
builder - > start_decoding_at = start_lsn ;
builder - > building_full_snapshot = need_full_snapshot ;
@ -888,12 +922,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
}
/*
* Remove knowledge about transactions we treat as committed that are smaller
* than - > xmin . Those won ' t ever get checked via the - > committed array but via
* the clog machinery , so we don ' t need to waste memory on them .
* Remove knowledge about transactions we treat as committed or containing catalog
* changes that are smaller than - > xmin . Those won ' t ever get checked via
* the - > committed or - > catchange array , respectively . The committed xids will
* get checked via the clog machinery .
*
* Ideally we would remove a transaction from the catchange array as soon as
* it finishes (commits or aborts), but that could be costly because we need
* to keep the xids in the array sorted.
*/
static void
SnapBuildPurgeCommittedTxn ( SnapBuild * builder )
SnapBuildPurgeOlder Txn ( SnapBuild * builder )
{
int off ;
TransactionId * workspace ;
@ -928,6 +967,30 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
builder - > committed . xcnt = surviving_xids ;
pfree ( workspace ) ;
/*
* Either all the xacts got purged or none . It is only possible to
* partially remove the xids from this array if one or more of the xids
* are still running but not all . That can happen if we start decoding
* from a point ( LSN where the snapshot state became consistent ) where all
* the xacts in this were running and then at least one of those got
* committed and a few are still running . We will never start from such a
* point because we won ' t move the slot ' s restart_lsn past the point where
* the oldest running transaction ' s restart_decoding_lsn is .
*/
if ( builder - > catchange . xcnt = = 0 | |
TransactionIdFollowsOrEquals ( builder - > catchange . xip [ 0 ] ,
builder - > xmin ) )
return ;
Assert ( TransactionIdFollows ( builder - > xmin ,
builder - > catchange . xip [ builder - > catchange . xcnt - 1 ] ) ) ;
pfree ( builder - > catchange . xip ) ;
builder - > catchange . xip = NULL ;
builder - > catchange . xcnt = 0 ;
elog ( DEBUG3 , " purged catalog modifying transactions, oldest running xid %u " ,
builder - > xmin ) ;
}
/*
@ -935,7 +998,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
*/
void
SnapBuildCommitTxn ( SnapBuild * builder , XLogRecPtr lsn , TransactionId xid ,
int nsubxacts , TransactionId * subxacts )
int nsubxacts , TransactionId * subxacts , uint32 xinfo )
{
int nxact ;
@ -983,7 +1046,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
* Add subtransaction to base snapshot if catalog modifying , we don ' t
* distinguish to toplevel transactions there .
*/
if ( ReorderBufferXidHasCatalogChanges ( builder - > reorder , subxid ) )
if ( SnapBuildXidHasCatalogChanges ( builder , subxid , xinfo ) )
{
sub_needs_timetravel = true ;
needs_snapshot = true ;
@ -1012,7 +1075,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
}
/* if top-level modified catalog, it'll need a snapshot */
if ( ReorderBufferXidHasCatalogChanges ( builder - > reorder , xid ) )
if ( SnapBuildXidHasCatalogChanges ( builder , xid , xinfo ) )
{
elog ( DEBUG2 , " found top level transaction %u, with catalog changes " ,
xid ) ;
@ -1089,6 +1152,29 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
}
}
/*
 * Report whether transaction 'xid' is known to have modified catalogs.
 *
 * Two sources are consulted, in order: the reorder buffer, and then the
 * builder's sorted ->catchange.xip array.  'xinfo' is the xinfo flag word
 * passed in by the caller; the array is only searched when it has
 * XACT_XINFO_HAS_INVALS set.
 */
static inline bool
SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
							  uint32 xinfo)
{
	TransactionId *match;

	/* The reorder buffer already knows about this transaction's changes. */
	if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
		return true;

	/*
	 * A transaction that changed catalogs must carry invalidation info, so
	 * without that flag there is no need to search the array.
	 */
	if ((xinfo & XACT_XINFO_HAS_INVALS) == 0)
		return false;

	/* Nothing recorded in the catchange XID array. */
	if (builder->catchange.xcnt == 0)
		return false;

	/* Binary search; relies on the array being in xidComparator order. */
	match = bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
					sizeof(TransactionId), xidComparator);

	return match != NULL;
}
/* -----------------------------------
* Snapshot building functions dealing with xlog records
@ -1135,7 +1221,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
builder - > xmin = running - > oldestRunningXid ;
/* Remove transactions we don't need to keep track off anymore */
SnapBuildPurgeCommitted Txn ( builder ) ;
SnapBuildPurgeOlder Txn ( builder ) ;
/*
* Advance the xmin limit for the current replication slot , to allow
@ -1438,6 +1524,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
*
* struct SnapBuildOnDisk ;
* TransactionId * committed . xcnt ; ( * not xcnt_space * )
* TransactionId * catchange . xcnt ;
*
*/
typedef struct SnapBuildOnDisk
@ -1467,7 +1554,7 @@ typedef struct SnapBuildOnDisk
offsetof ( SnapBuildOnDisk , version )
# define SNAPBUILD_MAGIC 0x51A1E001
# define SNAPBUILD_VERSION 4
# define SNAPBUILD_VERSION 5
/*
* Store / Load a snapshot from disk , depending on the snapshot builder ' s state .
@ -1493,6 +1580,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
{
Size needed_length ;
SnapBuildOnDisk * ondisk = NULL ;
TransactionId * catchange_xip = NULL ;
MemoryContext old_ctx ;
size_t catchange_xcnt ;
char * ondisk_c ;
int fd ;
char tmppath [ MAXPGPATH ] ;
@ -1578,10 +1668,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
( errcode_for_file_access ( ) ,
errmsg ( " could not remove file \" %s \" : %m " , tmppath ) ) ) ;
old_ctx = MemoryContextSwitchTo ( builder - > context ) ;
/* Get the catalog modifying transactions that are not yet committed */
catchange_xip = ReorderBufferGetCatalogChangesXacts ( builder - > reorder ) ;
catchange_xcnt = builder - > reorder - > catchange_ntxns ;
needed_length = sizeof ( SnapBuildOnDisk ) +
sizeof ( TransactionId ) * builder - > committed . xcnt ;
sizeof ( TransactionId ) * ( builder - > committed . xcnt + catchange_xcnt ) ;
ondisk_c = MemoryContextAllocZero ( builder - > context , needed_length ) ;
ondisk_c = palloc0 ( needed_length ) ;
ondisk = ( SnapBuildOnDisk * ) ondisk_c ;
ondisk - > magic = SNAPBUILD_MAGIC ;
ondisk - > version = SNAPBUILD_VERSION ;
@ -1598,16 +1694,31 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
ondisk - > builder . snapshot = NULL ;
ondisk - > builder . reorder = NULL ;
ondisk - > builder . committed . xip = NULL ;
ondisk - > builder . catchange . xip = NULL ;
/* update catchange only on disk data */
ondisk - > builder . catchange . xcnt = catchange_xcnt ;
COMP_CRC32C ( ondisk - > checksum ,
& ondisk - > builder ,
sizeof ( SnapBuild ) ) ;
/* copy committed xacts */
sz = sizeof ( TransactionId ) * builder - > committed . xcnt ;
memcpy ( ondisk_c , builder - > committed . xip , sz ) ;
COMP_CRC32C ( ondisk - > checksum , ondisk_c , sz ) ;
ondisk_c + = sz ;
if ( builder - > committed . xcnt > 0 )
{
sz = sizeof ( TransactionId ) * builder - > committed . xcnt ;
memcpy ( ondisk_c , builder - > committed . xip , sz ) ;
COMP_CRC32C ( ondisk - > checksum , ondisk_c , sz ) ;
ondisk_c + = sz ;
}
/* copy catalog modifying xacts */
if ( catchange_xcnt > 0 )
{
sz = sizeof ( TransactionId ) * catchange_xcnt ;
memcpy ( ondisk_c , catchange_xip , sz ) ;
COMP_CRC32C ( ondisk - > checksum , ondisk_c , sz ) ;
ondisk_c + = sz ;
}
FIN_CRC32C ( ondisk - > checksum ) ;
@ -1688,12 +1799,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
*/
builder - > last_serialized_snapshot = lsn ;
MemoryContextSwitchTo ( old_ctx ) ;
out :
ReorderBufferSetRestartPoint ( builder - > reorder ,
builder - > last_serialized_snapshot ) ;
/* be tidy */
if ( ondisk )
pfree ( ondisk ) ;
if ( catchange_xip )
pfree ( catchange_xip ) ;
}
/*
@ -1707,7 +1822,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
int fd ;
char path [ MAXPGPATH ] ;
Size sz ;
int readBytes ;
pg_crc32c checksum ;
/* no point in loading a snapshot if we're already there */
@ -1739,29 +1853,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
/* read statically sized portion of snapshot */
pgstat_report_wait_start ( WAIT_EVENT_SNAPBUILD_READ ) ;
readBytes = read ( fd , & ondisk , SnapBuildOnDiskConstantSize ) ;
pgstat_report_wait_end ( ) ;
if ( readBytes ! = SnapBuildOnDiskConstantSize )
{
int save_errno = errno ;
CloseTransientFile ( fd ) ;
if ( readBytes < 0 )
{
errno = save_errno ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not read file \" %s \" : %m " , path ) ) ) ;
}
else
ereport ( ERROR ,
( errcode ( ERRCODE_DATA_CORRUPTED ) ,
errmsg ( " could not read file \" %s \" : read %d of %zu " ,
path , readBytes ,
( Size ) SnapBuildOnDiskConstantSize ) ) ) ;
}
SnapBuildRestoreContents ( fd , ( char * ) & ondisk , SnapBuildOnDiskConstantSize , path ) ;
if ( ondisk . magic ! = SNAPBUILD_MAGIC )
ereport ( ERROR ,
@ -1781,56 +1873,26 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize ) ;
/* read SnapBuild */
pgstat_report_wait_start ( WAIT_EVENT_SNAPBUILD_READ ) ;
readBytes = read ( fd , & ondisk . builder , sizeof ( SnapBuild ) ) ;
pgstat_report_wait_end ( ) ;
if ( readBytes ! = sizeof ( SnapBuild ) )
{
int save_errno = errno ;
CloseTransientFile ( fd ) ;
if ( readBytes < 0 )
{
errno = save_errno ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not read file \" %s \" : %m " , path ) ) ) ;
}
else
ereport ( ERROR ,
( errcode ( ERRCODE_DATA_CORRUPTED ) ,
errmsg ( " could not read file \" %s \" : read %d of %zu " ,
path , readBytes , sizeof ( SnapBuild ) ) ) ) ;
}
SnapBuildRestoreContents ( fd , ( char * ) & ondisk . builder , sizeof ( SnapBuild ) , path ) ;
COMP_CRC32C ( checksum , & ondisk . builder , sizeof ( SnapBuild ) ) ;
/* restore committed xacts information */
sz = sizeof ( TransactionId ) * ondisk . builder . committed . xcnt ;
ondisk . builder . committed . xip = MemoryContextAllocZero ( builder - > context , sz ) ;
pgstat_report_wait_start ( WAIT_EVENT_SNAPBUILD_READ ) ;
readBytes = read ( fd , ondisk . builder . committed . xip , sz ) ;
pgstat_report_wait_end ( ) ;
if ( readBytes ! = sz )
if ( ondisk . builder . committed . xcnt > 0 )
{
int save_errno = errno ;
CloseTransientFile ( fd ) ;
sz = sizeof ( TransactionId ) * ondisk . builder . committed . xcnt ;
ondisk . builder . committed . xip = MemoryContextAllocZero ( builder - > context , sz ) ;
SnapBuildRestoreContents ( fd , ( char * ) ondisk . builder . committed . xip , sz , path ) ;
COMP_CRC32C ( checksum , ondisk . builder . committed . xip , sz ) ;
}
if ( readBytes < 0 )
{
errno = save_errno ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not read file \" %s \" : %m " , path ) ) ) ;
}
else
ereport ( ERROR ,
( errcode ( ERRCODE_DATA_CORRUPTED ) ,
errmsg ( " could not read file \" %s \" : read %d of %zu " ,
path , readBytes , sz ) ) ) ;
/* restore catalog modifying xacts information */
if ( ondisk . builder . catchange . xcnt > 0 )
{
sz = sizeof ( TransactionId ) * ondisk . builder . catchange . xcnt ;
ondisk . builder . catchange . xip = MemoryContextAllocZero ( builder - > context , sz ) ;
SnapBuildRestoreContents ( fd , ( char * ) ondisk . builder . catchange . xip , sz , path ) ;
COMP_CRC32C ( checksum , ondisk . builder . catchange . xip , sz ) ;
}
COMP_CRC32C ( checksum , ondisk . builder . committed . xip , sz ) ;
if ( CloseTransientFile ( fd ) ! = 0 )
ereport ( ERROR ,
@ -1885,6 +1947,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
}
ondisk . builder . committed . xip = NULL ;
/* set catalog modifying transactions */
if ( builder - > catchange . xip )
pfree ( builder - > catchange . xip ) ;
builder - > catchange . xcnt = ondisk . builder . catchange . xcnt ;
builder - > catchange . xip = ondisk . builder . catchange . xip ;
ondisk . builder . catchange . xip = NULL ;
/* our snapshot is not interesting anymore, build a new one */
if ( builder - > snapshot ! = NULL )
{
@ -1906,9 +1975,43 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
snapshot_not_interesting :
if ( ondisk . builder . committed . xip ! = NULL )
pfree ( ondisk . builder . committed . xip ) ;
if ( ondisk . builder . catchange . xip ! = NULL )
pfree ( ondisk . builder . catchange . xip ) ;
return false ;
}
/*
 * Read exactly 'size' bytes of the serialized snapshot from 'fd' into 'dest'.
 *
 * 'path' is used only for error messages.  If the read fails or returns
 * fewer bytes than requested, the file descriptor is closed with
 * CloseTransientFile() and an ERROR is reported: a file-access error when
 * read() itself failed, ERRCODE_DATA_CORRUPTED on a short read.
 */
static void
SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
{
	int			readBytes;

	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
	readBytes = read(fd, dest, size);
	pgstat_report_wait_end();

	if (readBytes < 0 || (Size) readBytes != size)
	{
		/* Save errno first; closing the file may overwrite it. */
		int			save_errno = errno;

		CloseTransientFile(fd);

		if (readBytes < 0)
		{
			errno = save_errno;
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		}
		else
			ereport(ERROR,
					(errcode(ERRCODE_DATA_CORRUPTED),
			/* Report the size actually requested, not sizeof(SnapBuild). */
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, size)));
	}
}
/*
* Remove all serialized snapshots that are not required anymore because no
* slot can need them . This doesn ' t actually have to run during a checkpoint ,