@ -45,8 +45,27 @@
* fsynced
* * If COMMIT happens after checkpoint then backend reads state data from
* files
* * In case of crash replay will move data from xlog to files , if that
* hasn ' t happened before . XXX TODO - move to shmem in replay also
*
* During replay and replication , TwoPhaseState also holds information
* about active prepared transactions that haven ' t been moved to disk yet .
*
* Replay of twophase records happens by the following rules :
*
* * At the beginning of recovery , pg_twophase is scanned once , filling
* TwoPhaseState with entries marked with gxact - > inredo and
* gxact - > ondisk . Two - phase file data older than the XID horizon of
* the redo position are discarded .
* * On PREPARE redo , the transaction is added to TwoPhaseState - > prepXacts .
* gxact - > inredo is set to true for such entries .
* * On Checkpoint we iterate through TwoPhaseState - > prepXacts entries
* that have gxact - > inredo set and are behind the redo_horizon . We
* save them to disk and then switch gxact - > ondisk to true .
* * On COMMIT / ABORT we delete the entry from TwoPhaseState - > prepXacts .
* If gxact - > ondisk is true , the corresponding entry from the disk
* is additionally deleted .
* * RecoverPreparedTransactions ( ) , StandbyRecoverPreparedTransactions ( )
* and PrescanPreparedTransactions ( ) have been modified to go through
* gxact - > inredo entries that have not made it to disk .
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
@ -147,11 +166,13 @@ typedef struct GlobalTransactionData
*/
XLogRecPtr prepare_start_lsn ; /* XLOG offset of prepare record start */
XLogRecPtr prepare_end_lsn ; /* XLOG offset of prepare record end */
TransactionId xid ; /* The GXACT id */
Oid owner ; /* ID of user that executed the xact */
BackendId locking_backend ; /* backend currently working on the xact */
bool valid ; /* TRUE if PGPROC entry is in proc array */
bool ondisk ; /* TRUE if prepare state file is on disk */
bool inredo ; /* TRUE if entry was added via xlog_redo */
char gid [ GIDSIZE ] ; /* The GID assigned to the prepared xact */
} GlobalTransactionData ;
@ -198,6 +219,15 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
static void RemoveGXact ( GlobalTransaction gxact ) ;
static void XlogReadTwoPhaseData ( XLogRecPtr lsn , char * * buf , int * len ) ;
static char * ProcessTwoPhaseBuffer ( TransactionId xid ,
XLogRecPtr prepare_start_lsn ,
bool fromdisk , bool overwriteOK , bool setParent ,
TransactionId * result , TransactionId * maxsubxid ) ;
static void MarkAsPreparingGuts ( GlobalTransaction gxact , TransactionId xid ,
const char * gid , TimestampTz prepared_at , Oid owner ,
Oid databaseid ) ;
static void RemoveTwoPhaseFile ( TransactionId xid , bool giveWarning ) ;
static void RecreateTwoPhaseFile ( TransactionId xid , void * content , int len ) ;
/*
* Initialization of shared memory
@ -342,18 +372,12 @@ PostPrepare_Twophase(void)
/*
* MarkAsPreparing
* Reserve the GID for the given transaction .
*
* Internally , this creates a gxact struct and puts it into the active array .
* NOTE : this is also used when reloading a gxact after a crash ; so avoid
* assuming that we can use very much backend context .
*/
GlobalTransaction
MarkAsPreparing ( TransactionId xid , const char * gid ,
TimestampTz prepared_at , Oid owner , Oid databaseid )
{
GlobalTransaction gxact ;
PGPROC * proc ;
PGXACT * pgxact ;
int i ;
if ( strlen ( gid ) > = GIDSIZE )
@ -401,6 +425,37 @@ MarkAsPreparing(TransactionId xid, const char *gid,
gxact = TwoPhaseState - > freeGXacts ;
TwoPhaseState - > freeGXacts = gxact - > next ;
MarkAsPreparingGuts ( gxact , xid , gid , prepared_at , owner , databaseid ) ;
gxact - > ondisk = false ;
/* And insert it into the active array */
Assert ( TwoPhaseState - > numPrepXacts < max_prepared_xacts ) ;
TwoPhaseState - > prepXacts [ TwoPhaseState - > numPrepXacts + + ] = gxact ;
LWLockRelease ( TwoPhaseStateLock ) ;
return gxact ;
}
/*
* MarkAsPreparingGuts
*
* This uses a gxact struct and puts it into the active array .
* NOTE : this is also used when reloading a gxact after a crash ; so avoid
* assuming that we can use very much backend context .
*
* Note : This function should be called with appropriate locks held .
*/
static void
MarkAsPreparingGuts ( GlobalTransaction gxact , TransactionId xid , const char * gid ,
TimestampTz prepared_at , Oid owner , Oid databaseid )
{
PGPROC * proc ;
PGXACT * pgxact ;
int i ;
Assert ( gxact ! = NULL ) ;
proc = & ProcGlobal - > allProcs [ gxact - > pgprocno ] ;
pgxact = & ProcGlobal - > allPgXact [ gxact - > pgprocno ] ;
@ -431,28 +486,18 @@ MarkAsPreparing(TransactionId xid, const char *gid,
pgxact - > nxids = 0 ;
gxact - > prepared_at = prepared_at ;
/* initialize LSN to InvalidXLogRecPtr */
gxact - > prepare_start_lsn = InvalidXLogRecPtr ;
gxact - > prepare_end_lsn = InvalidXLogRecPtr ;
gxact - > xid = xid ;
gxact - > owner = owner ;
gxact - > locking_backend = MyBackendId ;
gxact - > valid = false ;
gxact - > ondisk = false ;
gxact - > inredo = false ;
strcpy ( gxact - > gid , gid ) ;
/* And insert it into the active array */
Assert ( TwoPhaseState - > numPrepXacts < max_prepared_xacts ) ;
TwoPhaseState - > prepXacts [ TwoPhaseState - > numPrepXacts + + ] = gxact ;
/*
* Remember that we have this GlobalTransaction entry locked for us . If we
* abort after this , we must release it .
*/
MyLockedGxact = gxact ;
LWLockRelease ( TwoPhaseStateLock ) ;
return gxact ;
}
/*
@ -1244,9 +1289,9 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
* Reads 2PC data from xlog . During checkpoint this data will be moved to
* twophase files and ReadTwoPhaseFile should be used instead .
*
* Note clearly that this function accesses WAL during normal operation , similarly
* to the way WALSender or Logical Decoding would do . It does not run during
* crash recovery or standby processing .
* Note clearly that this function can access WAL during normal operation ,
* similarly to the way WALSender or Logical Decoding would do .
*
*/
static void
XlogReadTwoPhaseData ( XLogRecPtr lsn , char * * buf , int * len )
@ -1255,8 +1300,6 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
XLogReaderState * xlogreader ;
char * errormsg ;
Assert ( ! RecoveryInProgress ( ) ) ;
xlogreader = XLogReaderAllocate ( & read_local_xlog_page , NULL ) ;
if ( ! xlogreader )
ereport ( ERROR ,
@ -1501,7 +1544,7 @@ ProcessRecords(char *bufptr, TransactionId xid,
* If giveWarning is false , do not complain about file - not - present ;
* this is an expected case during WAL replay .
*/
void
static void
RemoveTwoPhaseFile ( TransactionId xid , bool giveWarning )
{
char path [ MAXPGPATH ] ;
@ -1521,7 +1564,7 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
*
* Note : content and len don ' t include CRC .
*/
void
static void
RecreateTwoPhaseFile ( TransactionId xid , void * content , int len )
{
char path [ MAXPGPATH ] ;
@ -1587,9 +1630,11 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
/*
* CheckPointTwoPhase - - handle 2 PC component of checkpointing .
*
* We must fsync the state file of any GXACT that is valid and has a PREPARE
* LSN < = the checkpoint ' s redo horizon . ( If the gxact isn ' t valid yet or
* has a later LSN , this checkpoint is not responsible for fsyncing it . )
* We must fsync the state file of any GXACT that is valid or has been
* generated during redo and has a PREPARE LSN < = the checkpoint ' s redo
* horizon . ( If the gxact isn ' t valid yet , has not been generated in
* redo , or has a later LSN , this checkpoint is not responsible for
* fsyncing it . )
*
* This is deliberately run as late as possible in the checkpoint sequence ,
* because GXACTs ordinarily have short lifespans , and so it is quite
@ -1631,10 +1676,10 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
LWLockAcquire ( TwoPhaseStateLock , LW_SHARED ) ;
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
/* Note that we are using gxact not pgxact so this works in recovery also */
GlobalTransaction gxact = TwoPhaseState - > prepXacts [ i ] ;
PGXACT * pgxact = & ProcGlobal - > allPgXact [ gxact - > pgprocno ] ;
if ( gxact - > valid & &
if ( ( gxact - > valid | | gxact - > inredo ) & &
! gxact - > ondisk & &
gxact - > prepare_end_lsn < = redo_horizon )
{
@ -1642,8 +1687,10 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
int len ;
XlogReadTwoPhaseData ( gxact - > prepare_start_lsn , & buf , & len ) ;
RecreateTwoPhaseFile ( p gxact- > xid , buf , len ) ;
RecreateTwoPhaseFile ( gxact - > xid , buf , len ) ;
gxact - > ondisk = true ;
gxact - > prepare_start_lsn = InvalidXLogRecPtr ;
gxact - > prepare_end_lsn = InvalidXLogRecPtr ;
pfree ( buf ) ;
serialized_xacts + + ;
}
@ -1670,13 +1717,50 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
serialized_xacts ) ) ) ;
}
/*
 * restoreTwoPhaseData
 *
 * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
 * This is called once at the beginning of recovery, saving any extra
 * lookups in the future.
 *
 * Only directory entries whose name consists of exactly eight upper-case
 * hexadecimal digits are considered; the name is parsed as the transaction
 * id.  Each candidate is checked by ProcessTwoPhaseBuffer(); if that returns
 * NULL the entry is skipped.  Otherwise the transaction is registered with
 * PrepareRedoAdd() using invalid WAL positions, which marks the resulting
 * entry as being on disk.
 */
void
restoreTwoPhaseData(void)
{
	DIR		   *cldir;
	struct dirent *clde;

	cldir = AllocateDir(TWOPHASE_DIR);
	while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
	{
		TransactionId xid;
		char	   *buf;

		/* Ignore anything that is not named like a two-phase state file */
		if (strlen(clde->d_name) != 8 ||
			strspn(clde->d_name, "0123456789ABCDEF") != 8)
			continue;

		xid = (TransactionId) strtoul(clde->d_name, NULL, 16);

		buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
									true, false, false,
									NULL, NULL);
		if (buf == NULL)
			continue;

		PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);

		/*
		 * PrepareRedoAdd() copies what it needs out of the buffer, so
		 * release it here rather than leaking one buffer per state file.
		 */
		pfree(buf);
	}
	FreeDir(cldir);
}
/*
* PrescanPreparedTransactions
*
* Scan the pg_twophase directory and determine the range of valid XIDs
* present . This is run during database startup , after we have completed
* reading WAL . ShmemVariableCache - > nextXid has been set to one more than
* the highest XID for which evidence exists in WAL .
* Scan the shared memory entries of TwoPhaseState and determine the range
* of valid XIDs present . This is run during database startup , after we
* have completed reading WAL . ShmemVariableCache - > nextXid has been set to
* one more than the highest XID for which evidence exists in WAL .
*
* We throw away any prepared xacts with main XID beyond nextXid - - - if any
* are present , it suggests that the DBA has done a PITR recovery to an
@ -1702,97 +1786,30 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
{
TransactionId origNextXid = ShmemVariableCache - > nextXid ;
TransactionId result = origNextXid ;
DIR * cldir ;
struct dirent * clde ;
TransactionId maxsubxid = origNextXid ;
TransactionId * xids = NULL ;
int nxids = 0 ;
int allocsize = 0 ;
int i ;
cldir = AllocateDir ( TWOPHASE_DIR ) ;
while ( ( clde = ReadDir ( cldir , TWOPHASE_DIR ) ) ! = NULL )
{
if ( strlen ( clde - > d_name ) = = 8 & &
strspn ( clde - > d_name , " 0123456789ABCDEF " ) = = 8 )
LWLockAcquire ( TwoPhaseStateLock , LW_SHARED ) ;
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
TransactionId xid ;
char * buf ;
TwoPhaseFileHeader * hdr ;
TransactionId * subxids ;
int i ;
GlobalTransaction gxact = TwoPhaseState - > prepXacts [ i ] ;
xid = ( TransactionId ) strtoul ( clde - > d_name , NULL , 16 ) ;
Assert ( gxact - > inredo ) ;
/* Reject XID if too new */
if ( TransactionIdFollowsOrEquals ( xid , origNextXid ) )
{
ereport ( WARNING ,
( errmsg ( " removing future two-phase state file \" %s \" " ,
clde - > d_name ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
continue ;
}
xid = gxact - > xid ;
/*
* Note : we can ' t check if already processed because clog
* subsystem isn ' t up yet .
*/
buf = ProcessTwoPhaseBuffer ( xid ,
gxact - > prepare_start_lsn ,
gxact - > ondisk , false , false ,
& result , & maxsubxid ) ;
/* Read and validate file */
buf = ReadTwoPhaseFile ( xid , true ) ;
if ( buf = = NULL )
{
ereport ( WARNING ,
( errmsg ( " removing corrupt two-phase state file \" %s \" " ,
clde - > d_name ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
continue ;
}
/* Deconstruct header */
hdr = ( TwoPhaseFileHeader * ) buf ;
if ( ! TransactionIdEquals ( hdr - > xid , xid ) )
{
ereport ( WARNING ,
( errmsg ( " removing corrupt two-phase state file \" %s \" " ,
clde - > d_name ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
pfree ( buf ) ;
continue ;
}
/*
* OK , we think this file is valid . Incorporate xid into the
* running - minimum result .
*/
if ( TransactionIdPrecedes ( xid , result ) )
result = xid ;
/*
* Examine subtransaction XIDs . . . they should all follow main
* XID , and they may force us to advance nextXid .
*
* We don ' t expect anyone else to modify nextXid , hence we don ' t
* need to hold a lock while examining it . We still acquire the
* lock to modify it , though .
*/
subxids = ( TransactionId * ) ( buf +
MAXALIGN ( sizeof ( TwoPhaseFileHeader ) ) +
MAXALIGN ( hdr - > gidlen ) ) ;
for ( i = 0 ; i < hdr - > nsubxacts ; i + + )
{
TransactionId subxid = subxids [ i ] ;
Assert ( TransactionIdFollows ( subxid , xid ) ) ;
if ( TransactionIdFollowsOrEquals ( subxid ,
ShmemVariableCache - > nextXid ) )
{
LWLockAcquire ( XidGenLock , LW_EXCLUSIVE ) ;
ShmemVariableCache - > nextXid = subxid ;
TransactionIdAdvance ( ShmemVariableCache - > nextXid ) ;
LWLockRelease ( XidGenLock ) ;
}
}
if ( xids_p )
{
@ -1814,8 +1831,7 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
pfree ( buf ) ;
}
}
FreeDir ( cldir ) ;
LWLockRelease ( TwoPhaseStateLock ) ;
if ( xids_p )
{
@ -1823,14 +1839,25 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
* nxids_p = nxids ;
}
/* update nextXid if needed */
if ( TransactionIdFollowsOrEquals ( maxsubxid , ShmemVariableCache - > nextXid ) )
{
LWLockAcquire ( XidGenLock , LW_EXCLUSIVE ) ;
ShmemVariableCache - > nextXid = maxsubxid ;
TransactionIdAdvance ( ShmemVariableCache - > nextXid ) ;
LWLockRelease ( XidGenLock ) ;
}
return result ;
}
/*
* StandbyRecoverPreparedTransactions
*
* Scan the pg_twophase directory and setup all the required information to
* allow standby queries to treat prepared transactions as still active .
* Scan the shared memory entries of TwoPhaseState and setup all the required
* information to allow standby queries to treat prepared transactions as still
* active .
*
* This is never called at the end of recovery - we use
* RecoverPreparedTransactions ( ) at that point .
*
@ -1841,136 +1868,69 @@ PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
void
StandbyRecoverPreparedTransactions ( bool overwriteOK )
{
DIR * cldir ;
struct dirent * clde ;
int i ;
cldir = AllocateDir ( TWOPHASE_DIR ) ;
while ( ( clde = ReadDir ( cldir , TWOPHASE_DIR ) ) ! = NULL )
{
if ( strlen ( clde - > d_name ) = = 8 & &
strspn ( clde - > d_name , " 0123456789ABCDEF " ) = = 8 )
LWLockAcquire ( TwoPhaseStateLock , LW_SHARED ) ;
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
TransactionId xid ;
char * buf ;
TwoPhaseFileHeader * hdr ;
TransactionId * subxids ;
int i ;
xid = ( TransactionId ) strtoul ( clde - > d_name , NULL , 16 ) ;
/* Already processed? */
if ( TransactionIdDidCommit ( xid ) | | TransactionIdDidAbort ( xid ) )
{
ereport ( WARNING ,
( errmsg ( " removing stale two-phase state file \" %s \" " ,
clde - > d_name ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
continue ;
}
/* Read and validate file */
buf = ReadTwoPhaseFile ( xid , true ) ;
if ( buf = = NULL )
{
ereport ( WARNING ,
( errmsg ( " removing corrupt two-phase state file \" %s \" " ,
clde - > d_name ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
continue ;
}
/* Deconstruct header */
hdr = ( TwoPhaseFileHeader * ) buf ;
if ( ! TransactionIdEquals ( hdr - > xid , xid ) )
{
ereport ( WARNING ,
( errmsg ( " removing corrupt two-phase state file \" %s \" " ,
clde - > d_name ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
pfree ( buf ) ;
continue ;
}
GlobalTransaction gxact = TwoPhaseState - > prepXacts [ i ] ;
/*
* Examine subtransaction XIDs . . . they should all follow main
* XID .
*/
subxids = ( TransactionId * ) ( buf +
MAXALIGN ( sizeof ( TwoPhaseFileHeader ) ) +
MAXALIGN ( hdr - > gidlen ) ) ;
for ( i = 0 ; i < hdr - > nsubxacts ; i + + )
{
TransactionId subxid = subxids [ i ] ;
Assert ( gxact - > inredo ) ;
Assert ( TransactionIdFollows ( subxid , xid ) ) ;
SubTransSetParent ( xid , subxid , overwriteOK ) ;
}
xid = gxact - > xid ;
buf = ProcessTwoPhaseBuffer ( xid ,
gxact - > prepare_start_lsn ,
gxact - > ondisk , overwriteOK , true ,
NULL , NULL ) ;
if ( buf ! = NULL )
pfree ( buf ) ;
}
}
FreeDir ( cldir ) ;
LWLockRelease ( TwoPhaseStateLock ) ;
}
/*
* RecoverPreparedTransactions
*
* Scan the pg_twophase directory and reload shared - memory state for each
* prepared transaction ( reacquire locks , etc ) . This is run during database
* startup .
* Scan the shared memory entries of TwoPhaseState and reload the state for
* each prepared transaction ( reacquire locks , etc ) .
*
* This is run during database startup .
*/
void
RecoverPreparedTransactions ( void )
{
char dir [ MAXPGPATH ] ;
DIR * cldir ;
struct dirent * clde ;
bool overwriteOK = false ;
snprintf ( dir , MAXPGPATH , " %s " , TWOPHASE_DIR ) ;
int i ;
cldir = AllocateDir ( dir ) ;
while ( ( clde = ReadDir ( cldir , dir ) ) ! = NULL )
{
if ( strlen ( clde - > d_name ) = = 8 & &
strspn ( clde - > d_name , " 0123456789ABCDEF " ) = = 8 )
/*
* Don ' t need a lock in the recovery phase .
*/
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
TransactionId xid ;
char * buf ;
GlobalTransaction gxact = TwoPhaseState - > prepXacts [ i ] ;
char * bufptr ;
TwoPhaseFileHeader * hdr ;
TransactionId * subxids ;
GlobalTransaction gxact ;
const char * gid ;
bool overwriteOK = false ;
int i ;
xid = ( TransactionId ) strtoul ( clde - > d_name , NULL , 16 ) ;
/* Already processed? */
if ( TransactionIdDidCommit ( xid ) | | TransactionIdDidAbort ( xid ) )
{
ereport ( WARNING ,
( errmsg ( " removing stale two-phase state file \" %s \" " ,
clde - > d_name ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
continue ;
}
xid = gxact - > xid ;
/* Read and validate file */
buf = ReadTwoPhaseFile ( xid , true ) ;
buf = ProcessTwoPhaseBuffer ( xid ,
gxact - > prepare_start_lsn ,
gxact - > ondisk , false , false ,
NULL , NULL ) ;
if ( buf = = NULL )
{
ereport ( WARNING ,
( errmsg ( " removing corrupt two-phase state file \" %s \" " ,
clde - > d_name ) ) ) ;
RemoveTwoPhaseFile ( xid , true ) ;
continue ;
}
ereport ( LOG ,
( errmsg ( " recovering prepared transaction %u " , xid ) ) ) ;
( errmsg ( " recovering prepared transaction %u from shared memory " , xid ) ) ) ;
/* Deconstruct header */
hdr = ( TwoPhaseFileHeader * ) buf ;
Assert ( TransactionIdEquals ( hdr - > xid , xid ) ) ;
bufptr = buf + MAXALIGN ( sizeof ( TwoPhaseFileHeader ) ) ;
@ -2002,12 +1962,20 @@ RecoverPreparedTransactions(void)
SubTransSetParent ( subxids [ i ] , xid , overwriteOK ) ;
/*
* Recreate its GXACT and dummy PGPROC
* Recreate its GXACT and dummy PGPROC . But , check whether
* it was added in redo and already has a shmem entry for
* it .
*/
gxact = MarkAsPreparing ( xid , gid ,
LWLockAcquire ( TwoPhaseStateLock , LW_EXCLUSIVE ) ;
MarkAsPreparingGuts ( gxact , xid , gid ,
hdr - > prepared_at ,
hdr - > owner , hdr - > database ) ;
gxact - > ondisk = true ;
/* recovered, so reset the flag for entries generated by redo */
gxact - > inredo = false ;
LWLockRelease ( TwoPhaseStateLock ) ;
GXactLoadSubxactData ( gxact , hdr - > nsubxacts , subxids ) ;
MarkAsPrepared ( gxact ) ;
@ -2034,9 +2002,158 @@ RecoverPreparedTransactions(void)
pfree ( buf ) ;
}
}
FreeDir ( cldir ) ;
/*
 * ProcessTwoPhaseBuffer
 *
 * Given a transaction id, read its two-phase state either from disk
 * (fromdisk = true) or directly from WAL using the record pointer
 * "prepare_start_lsn" (fromdisk = false), and validate it.
 *
 * The state is rejected, and its file or shared-memory entry removed with a
 * WARNING, when the transaction is already committed or aborted, when its
 * xid is not older than ShmemVariableCache->nextXid, when the state file
 * cannot be read, or when the xid in the header does not match.  NULL is
 * returned in all of those cases.
 *
 * If setParent is true, subtransaction parent linkages are set up, passing
 * overwriteOK through to SubTransSetParent().
 *
 * If result is not NULL, *result is lowered to xid when xid precedes it.
 * If maxsubxid is not NULL, *maxsubxid is raised to the largest
 * subtransaction id of this transaction that follows or equals it.  Neither
 * is touched when NULL is returned.
 *
 * On success the state buffer is returned; callers pfree() it when done.
 *
 * NOTE(review): the shared-memory removal path calls PrepareRedoRemove(),
 * which takes TwoPhaseStateLock itself and shrinks the prepXacts array.
 * Callers that iterate over prepXacts while holding that lock should be
 * checked against this.
 */
static char *
ProcessTwoPhaseBuffer(TransactionId xid,
					  XLogRecPtr prepare_start_lsn,
					  bool fromdisk, bool overwriteOK,
					  bool setParent, TransactionId *result,
					  TransactionId *maxsubxid)
{
	TransactionId origNextXid = ShmemVariableCache->nextXid;
	TransactionId *subxids;
	char	   *buf;
	TwoPhaseFileHeader *hdr;
	int			i;

	/* Reading from WAL requires a usable record position */
	Assert(fromdisk || prepare_start_lsn != InvalidXLogRecPtr);

	/* Already processed? */
	if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
	{
		if (fromdisk)
		{
			ereport(WARNING,
					(errmsg("removing stale two-phase state file for \"%u\"",
							xid)));
			RemoveTwoPhaseFile(xid, true);
		}
		else
		{
			ereport(WARNING,
					(errmsg("removing stale two-phase state from "
							"shared memory for \"%u\"", xid)));
			PrepareRedoRemove(xid, true);
		}
		return NULL;
	}

	/* Reject XID if too new */
	if (TransactionIdFollowsOrEquals(xid, origNextXid))
	{
		if (fromdisk)
		{
			ereport(WARNING,
					(errmsg("removing future two-phase state file for \"%u\"",
							xid)));
			RemoveTwoPhaseFile(xid, true);
		}
		else
		{
			ereport(WARNING,
					(errmsg("removing future two-phase state from memory for \"%u\"",
							xid)));
			PrepareRedoRemove(xid, true);
		}
		return NULL;
	}

	if (fromdisk)
	{
		/* Read and validate file */
		buf = ReadTwoPhaseFile(xid, true);
		if (buf == NULL)
		{
			ereport(WARNING,
					(errmsg("removing corrupt two-phase state file for \"%u\"",
							xid)));
			RemoveTwoPhaseFile(xid, true);
			return NULL;
		}
	}
	else
	{
		/*
		 * Read xlog data.  The length is not needed here, so NULL is passed
		 * (assumes XlogReadTwoPhaseData accepts a NULL len -- TODO confirm).
		 */
		XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
	}

	/* Deconstruct header */
	hdr = (TwoPhaseFileHeader *) buf;
	if (!TransactionIdEquals(hdr->xid, xid))
	{
		if (fromdisk)
		{
			ereport(WARNING,
					(errmsg("removing corrupt two-phase state file for \"%u\"",
							xid)));
			RemoveTwoPhaseFile(xid, true);
		}
		else
		{
			ereport(WARNING,
					(errmsg("removing corrupt two-phase state from memory for \"%u\"",
							xid)));
			PrepareRedoRemove(xid, true);
		}
		pfree(buf);
		return NULL;
	}

	/*
	 * OK, we think this buffer is valid.  Incorporate xid into the
	 * running-minimum result, but only if the caller asked for it; there is
	 * no running value to compare against otherwise.
	 */
	if (result != NULL && TransactionIdPrecedes(xid, *result))
		*result = xid;

	/*
	 * Examine subtransaction XIDs ... they should all follow main XID.
	 * Track the largest one for the caller, if requested, and link each
	 * subtransaction to its parent, if requested.
	 */
	subxids = (TransactionId *) (buf +
								 MAXALIGN(sizeof(TwoPhaseFileHeader)) +
								 MAXALIGN(hdr->gidlen));
	for (i = 0; i < hdr->nsubxacts; i++)
	{
		TransactionId subxid = subxids[i];

		Assert(TransactionIdFollows(subxid, xid));

		if (maxsubxid != NULL &&
			TransactionIdFollowsOrEquals(subxid, *maxsubxid))
			*maxsubxid = subxid;

		/* The subtransaction is the child; the main xid is its parent */
		if (setParent)
			SubTransSetParent(subxid, xid, overwriteOK);
	}

	return buf;
}
/*
* RecordTransactionCommitPrepared
*
@ -2187,3 +2304,111 @@ RecordTransactionAbortPrepared(TransactionId xid,
*/
SyncRepWaitForLSN ( recptr , false ) ;
}
/*
 * PrepareRedoAdd
 *
 * Register a prepared transaction seen during recovery in the shared
 * TwoPhaseState array.
 *
 * "buf" points at the two-phase state data, beginning with its
 * TwoPhaseFileHeader and followed (after alignment) by the GID string.
 * start_lsn/end_lsn are the WAL positions of the PREPARE record and are
 * stored in the new entry.  If the caller passes InvalidXLogRecPtr as the
 * start position, the entry is marked as located on disk.
 *
 * In redo the entry is only used to track PREPARE / COMMIT in shared
 * memory, so just the bare minimum of fields is filled in.  The entry is
 * flagged with inredo = true to show it was added in the redo phase.
 *
 * Raises an ERROR if no free gxact slot is left.
 */
void
PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
{
	TwoPhaseFileHeader *hdr;
	const char *gid;
	GlobalTransaction gxact;

	Assert(RecoveryInProgress());

	/* The GID immediately follows the aligned header */
	hdr = (TwoPhaseFileHeader *) buf;
	gid = (const char *) (buf + MAXALIGN(sizeof(TwoPhaseFileHeader)));

	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

	/* Take the first free gxact off the freelist, if there is one */
	gxact = TwoPhaseState->freeGXacts;
	if (gxact == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OUT_OF_MEMORY),
				 errmsg("maximum number of prepared transactions reached"),
				 errhint("Increase max_prepared_transactions (currently %d).",
						 max_prepared_xacts)));
	TwoPhaseState->freeGXacts = gxact->next;

	/* Identity of the prepared transaction, taken from the header */
	gxact->xid = hdr->xid;
	gxact->owner = hdr->owner;
	gxact->prepared_at = hdr->prepared_at;
	strcpy(gxact->gid, gid);

	/* Where the PREPARE record lives in WAL, if anywhere */
	gxact->prepare_start_lsn = start_lsn;
	gxact->prepare_end_lsn = end_lsn;

	/* State flags for an entry created by redo */
	gxact->locking_backend = InvalidBackendId;
	gxact->valid = false;
	gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
	gxact->inredo = true;

	/* Append to the active array */
	Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
	TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts] = gxact;
	TwoPhaseState->numPrepXacts++;

	LWLockRelease(TwoPhaseStateLock);

	elog(DEBUG2, "Adding 2PC data to shared memory %u", gxact->xid);
}
/*
 * PrepareRedoRemove
 *
 * Remove the gxact entry for "xid" from TwoPhaseState.  If the entry is
 * marked as being on disk, its two-phase state file is removed as well;
 * giveWarning is passed through to RemoveTwoPhaseFile().
 *
 * If no entry for xid exists, nothing is done; this is expected during WAL
 * replay.
 */
void
PrepareRedoRemove(TransactionId xid, bool giveWarning)
{
	GlobalTransaction gxact = NULL;
	int			i;

	Assert(RecoveryInProgress());

	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
	for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
	{
		GlobalTransaction candidate = TwoPhaseState->prepXacts[i];

		if (candidate->xid == xid)
		{
			Assert(candidate->inredo);

			/*
			 * Only remember the entry once it is known to match, so that
			 * gxact stays NULL when the search comes up empty.
			 */
			gxact = candidate;
			break;
		}
	}
	LWLockRelease(TwoPhaseStateLock);

	/*
	 * Just leave if there is nothing, this is expected during WAL replay.
	 */
	if (gxact == NULL)
		return;

	/*
	 * And now we can clean up any files we may have left.
	 */
	elog(DEBUG2, "Removing 2PC data from shared memory %u", xid);
	if (gxact->ondisk)
		RemoveTwoPhaseFile(xid, giveWarning);
	RemoveGXact(gxact);
}