@ -221,13 +221,13 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
static void RemoveGXact ( GlobalTransaction gxact ) ;
static void XlogReadTwoPhaseData ( XLogRecPtr lsn , char * * buf , int * len ) ;
static char * ProcessTwoPhaseBuffer ( TransactionId xid ,
static char * ProcessTwoPhaseBuffer ( Full TransactionId xid ,
XLogRecPtr prepare_start_lsn ,
bool fromdisk , bool setParent , bool setNextXid ) ;
static void MarkAsPreparingGuts ( GlobalTransaction gxact , TransactionId xid ,
const char * gid , TimestampTz prepared_at , Oid owner ,
Oid databaseid ) ;
static void RemoveTwoPhaseFile ( TransactionId xid , bool giveWarning ) ;
static void RemoveTwoPhaseFile ( Full TransactionId f xid, bool giveWarning ) ;
static void RecreateTwoPhaseFile ( TransactionId xid , void * content , int len ) ;
/*
@ -927,41 +927,26 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
/************************************************************************/
/*
* Compute the FullTransactionId for the given TransactionId .
*
* The wrap logic is safe here because the span of active xids cannot exceed one
* epoch at any given time .
* Compute FullTransactionId for the given TransactionId , using the current
* epoch .
*/
static inline FullTransactionId
AdjustTo FullTransactionId( TransactionId xid )
FullTransactionIdFromCurrentEpoch ( TransactionId xid )
{
FullTransactionId fxid ;
FullTransactionId nextFullXid ;
TransactionId nextXid ;
uint32 epoch ;
Assert ( TransactionIdIsValid ( xid ) ) ;
LWLockAcquire ( XidGenLock , LW_SHARED ) ;
nextFullXid = TransamVariables - > nextXid ;
LWLockRelease ( XidGenLock ) ;
nextXid = XidFromFullTransactionId ( nextFullXid ) ;
nextFullXid = ReadNextFullTransactionId ( ) ;
epoch = EpochFromFullTransactionId ( nextFullXid ) ;
if ( unlikely ( xid > nextXid ) )
{
/* Wraparound occurred, must be from a prev epoch. */
Assert ( epoch > 0 ) ;
epoch - - ;
}
return FullTransactionIdFromEpochAndXid ( epoch , xid ) ;
fxid = FullTransactionIdFromEpochAndXid ( epoch , xid ) ;
return fxid ;
}
static inline int
TwoPhaseFilePath ( char * path , TransactionId xid )
TwoPhaseFilePath ( char * path , Full TransactionId f xid)
{
FullTransactionId fxid = AdjustToFullTransactionId ( xid ) ;
return snprintf ( path , MAXPGPATH , TWOPHASE_DIR " /%08X%08X " ,
EpochFromFullTransactionId ( fxid ) ,
XidFromFullTransactionId ( fxid ) ) ;
@ -1297,7 +1282,8 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
* If it looks OK ( has a valid magic number and CRC ) , return the palloc ' d
* contents of the file , issuing an error when finding corrupted data . If
* missing_ok is true , which indicates that missing files can be safely
* ignored , then return NULL . This state can be reached when doing recovery .
* ignored , then return NULL . This state can be reached when doing recovery
* after discarding two - phase files from other epochs .
*/
static char *
ReadTwoPhaseFile ( TransactionId xid , bool missing_ok )
@ -1311,8 +1297,10 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
pg_crc32c calc_crc ,
file_crc ;
int r ;
FullTransactionId fxid ;
TwoPhaseFilePath ( path , xid ) ;
fxid = FullTransactionIdFromCurrentEpoch ( xid ) ;
TwoPhaseFilePath ( path , fxid ) ;
fd = OpenTransientFile ( path , O_RDONLY | PG_BINARY ) ;
if ( fd < 0 )
@ -1677,10 +1665,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
AtEOXact_PgStat ( isCommit , false ) ;
/*
* And now we can clean up any files we may have left .
* And now we can clean up any files we may have left . These should be
* from the current epoch .
*/
if ( ondisk )
RemoveTwoPhaseFile ( xid , true ) ;
{
FullTransactionId fxid ;
fxid = FullTransactionIdFromCurrentEpoch ( xid ) ;
RemoveTwoPhaseFile ( fxid , true ) ;
}
MyLockedGxact = NULL ;
@ -1719,13 +1713,17 @@ ProcessRecords(char *bufptr, TransactionId xid,
*
* If giveWarning is false , do not complain about file - not - present ;
* this is an expected case during WAL replay .
*
* This routine is used at early stages at recovery where future and
* past orphaned files are checked , hence the FullTransactionId to build
* a complete file name fit for the removal .
*/
static void
RemoveTwoPhaseFile ( TransactionId xid , bool giveWarning )
RemoveTwoPhaseFile ( Full TransactionId f xid, bool giveWarning )
{
char path [ MAXPGPATH ] ;
TwoPhaseFilePath ( path , xid ) ;
TwoPhaseFilePath ( path , f xid) ;
if ( unlink ( path ) )
if ( errno ! = ENOENT | | giveWarning )
ereport ( WARNING ,
@ -1745,13 +1743,16 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
char path [ MAXPGPATH ] ;
pg_crc32c statefile_crc ;
int fd ;
FullTransactionId fxid ;
/* Recompute CRC */
INIT_CRC32C ( statefile_crc ) ;
COMP_CRC32C ( statefile_crc , content , len ) ;
FIN_CRC32C ( statefile_crc ) ;
TwoPhaseFilePath ( path , xid ) ;
/* Use current epoch */
fxid = FullTransactionIdFromCurrentEpoch ( xid ) ;
TwoPhaseFilePath ( path , fxid ) ;
fd = OpenTransientFile ( path ,
O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY ) ;
@ -1899,7 +1900,9 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
* Scan pg_twophase and fill TwoPhaseState depending on the on - disk data .
* This is called once at the beginning of recovery , saving any extra
* lookups in the future . Two - phase files that are newer than the
* minimum XID horizon are discarded on the way .
* minimum XID horizon are discarded on the way . Two - phase files with
* an epoch older or newer than the current checkpoint ' s record epoch
* are also discarded .
*/
void
restoreTwoPhaseData ( void )
@ -1914,14 +1917,11 @@ restoreTwoPhaseData(void)
if ( strlen ( clde - > d_name ) = = 16 & &
strspn ( clde - > d_name , " 0123456789ABCDEF " ) = = 16 )
{
TransactionId xid ;
FullTransactionId fxid ;
char * buf ;
fxid = FullTransactionIdFromU64 ( strtou64 ( clde - > d_name , NULL , 16 ) ) ;
xid = XidFromFullTransactionId ( fxid ) ;
buf = ProcessTwoPhaseBuffer ( xid , InvalidXLogRecPtr ,
buf = ProcessTwoPhaseBuffer ( fxid , InvalidXLogRecPtr ,
true , false , false ) ;
if ( buf = = NULL )
continue ;
@ -1972,6 +1972,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
TransactionId origNextXid = XidFromFullTransactionId ( nextXid ) ;
TransactionId result = origNextXid ;
TransactionId * xids = NULL ;
uint32 epoch = EpochFromFullTransactionId ( nextXid ) ;
int nxids = 0 ;
int allocsize = 0 ;
int i ;
@ -1980,6 +1981,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
TransactionId xid ;
FullTransactionId fxid ;
char * buf ;
GlobalTransaction gxact = TwoPhaseState - > prepXacts [ i ] ;
@ -1987,7 +1989,12 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
xid = gxact - > xid ;
buf = ProcessTwoPhaseBuffer ( xid ,
/*
* All two - phase files with past and future epoch in pg_twophase are
* gone at this point , so we ' re OK to rely on only the current epoch .
*/
fxid = FullTransactionIdFromEpochAndXid ( epoch , xid ) ;
buf = ProcessTwoPhaseBuffer ( fxid ,
gxact - > prepare_start_lsn ,
gxact - > ondisk , false , true ) ;
@ -2049,11 +2056,18 @@ void
StandbyRecoverPreparedTransactions ( void )
{
int i ;
uint32 epoch ;
FullTransactionId nextFullXid ;
/* get current epoch */
nextFullXid = ReadNextFullTransactionId ( ) ;
epoch = EpochFromFullTransactionId ( nextFullXid ) ;
LWLockAcquire ( TwoPhaseStateLock , LW_EXCLUSIVE ) ;
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
TransactionId xid ;
FullTransactionId fxid ;
char * buf ;
GlobalTransaction gxact = TwoPhaseState - > prepXacts [ i ] ;
@ -2061,7 +2075,12 @@ StandbyRecoverPreparedTransactions(void)
xid = gxact - > xid ;
buf = ProcessTwoPhaseBuffer ( xid ,
/*
* At this stage , we ' re OK to work with the current epoch as all past
* and future files have been already discarded .
*/
fxid = FullTransactionIdFromEpochAndXid ( epoch , xid ) ;
buf = ProcessTwoPhaseBuffer ( fxid ,
gxact - > prepare_start_lsn ,
gxact - > ondisk , true , false ) ;
if ( buf ! = NULL )
@ -2090,11 +2109,18 @@ void
RecoverPreparedTransactions ( void )
{
int i ;
uint32 epoch ;
FullTransactionId nextFullXid ;
/* get current epoch */
nextFullXid = ReadNextFullTransactionId ( ) ;
epoch = EpochFromFullTransactionId ( nextFullXid ) ;
LWLockAcquire ( TwoPhaseStateLock , LW_EXCLUSIVE ) ;
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
TransactionId xid ;
FullTransactionId fxid ;
char * buf ;
GlobalTransaction gxact = TwoPhaseState - > prepXacts [ i ] ;
char * bufptr ;
@ -2102,6 +2128,10 @@ RecoverPreparedTransactions(void)
TransactionId * subxids ;
const char * gid ;
/*
* At this stage , we ' re OK to work with the current epoch as all past
* and future files have been already discarded .
*/
xid = gxact - > xid ;
/*
@ -2113,7 +2143,8 @@ RecoverPreparedTransactions(void)
* SubTransSetParent has been set before , if the prepared transaction
* generated xid assignment records .
*/
buf = ProcessTwoPhaseBuffer ( xid ,
fxid = FullTransactionIdFromEpochAndXid ( epoch , xid ) ;
buf = ProcessTwoPhaseBuffer ( fxid ,
gxact - > prepare_start_lsn ,
gxact - > ondisk , true , false ) ;
if ( buf = = NULL )
@ -2181,7 +2212,7 @@ RecoverPreparedTransactions(void)
/*
* ProcessTwoPhaseBuffer
*
* Given a transaction i d, read it either from disk or read it directly
* Given a FullTransactionI d, read it either from disk or read it directly
* via shmem xlog record pointer using the provided " prepare_start_lsn " .
*
* If setParent is true , set up subtransaction parent linkages .
@ -2190,32 +2221,35 @@ RecoverPreparedTransactions(void)
* value scanned .
*/
static char *
ProcessTwoPhaseBuffer ( TransactionId xid ,
ProcessTwoPhaseBuffer ( Full TransactionId f xid,
XLogRecPtr prepare_start_lsn ,
bool fromdisk ,
bool setParent , bool setNextXid )
{
FullTransactionId nextXid = TransamVariables - > nextXid ;
TransactionId origNextXid = XidFromFullTransactionId ( nextXid ) ;
TransactionId * subxids ;
char * buf ;
TwoPhaseFileHeader * hdr ;
int i ;
TransactionId xid = XidFromFullTransactionId ( fxid ) ;
Assert ( LWLockHeldByMeInMode ( TwoPhaseStateLock , LW_EXCLUSIVE ) ) ;
if ( ! fromdisk )
Assert ( prepare_start_lsn ! = InvalidXLogRecPtr ) ;
/* Reject XID if too new */
if ( TransactionIdFollowsOrEquals ( xid , origNextXid ) )
/*
* Reject full XID if too new . Note that this discards files from future
* epochs .
*/
if ( FullTransactionIdFollowsOrEquals ( fxid , nextXid ) )
{
if ( fromdisk )
{
ereport ( WARNING ,
( errmsg ( " removing future two-phase state file for transaction %u " ,
xid ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
( errmsg ( " removing future two-phase state file of epoch %u for transaction %u " ,
EpochFromFullTransactionId ( fxid ) , xid ) ) ) ;
RemoveTwoPhaseFile ( f xid, true ) ;
}
else
{
@ -2227,6 +2261,26 @@ ProcessTwoPhaseBuffer(TransactionId xid,
return NULL ;
}
/* Discard files from past epochs */
if ( EpochFromFullTransactionId ( fxid ) < EpochFromFullTransactionId ( nextXid ) )
{
if ( fromdisk )
{
ereport ( WARNING ,
( errmsg ( " removing past two-phase state file of epoch %u for transaction %u " ,
EpochFromFullTransactionId ( fxid ) , xid ) ) ) ;
RemoveTwoPhaseFile ( fxid , true ) ;
}
else
{
ereport ( WARNING ,
( errmsg ( " removing past two-phase state from memory for transaction %u " ,
xid ) ) ) ;
PrepareRedoRemove ( xid , true ) ;
}
return NULL ;
}
/* Already processed? */
if ( TransactionIdDidCommit ( xid ) | | TransactionIdDidAbort ( xid ) )
{
@ -2235,7 +2289,7 @@ ProcessTwoPhaseBuffer(TransactionId xid,
ereport ( WARNING ,
( errmsg ( " removing stale two-phase state file for transaction %u " ,
xid ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
RemoveTwoPhaseFile ( f xid, true ) ;
}
else
{
@ -2521,8 +2575,11 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
if ( ! XLogRecPtrIsInvalid ( start_lsn ) )
{
char path [ MAXPGPATH ] ;
FullTransactionId fxid ;
TwoPhaseFilePath ( path , hdr - > xid ) ;
/* Use current epoch */
fxid = FullTransactionIdFromCurrentEpoch ( hdr - > xid ) ;
TwoPhaseFilePath ( path , fxid ) ;
if ( access ( path , F_OK ) = = 0 )
{
@ -2617,7 +2674,15 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
*/
elog ( DEBUG2 , " removing 2PC data for transaction %u " , xid ) ;
if ( gxact - > ondisk )
RemoveTwoPhaseFile ( xid , giveWarning ) ;
{
FullTransactionId fxid ;
/*
* We should deal with a file at the current epoch here .
*/
fxid = FullTransactionIdFromCurrentEpoch ( xid ) ;
RemoveTwoPhaseFile ( fxid , giveWarning ) ;
}
RemoveGXact ( gxact ) ;
}