@ -25,11 +25,28 @@
* what keeps the XID considered running by TransactionIdIsInProgress .
* It is also convenient as a PGPROC to hook the gxact ' s locks to .
*
* In order to survive crashes and shutdowns , all prepared
* transactions must be stored in permanent storage . This includes
* locking information , pending notifications etc . All that state
* information is written to the per - transaction state file in
* the pg_twophase directory .
* Information to recover prepared transactions in case of crash is
* now stored in WAL for the common case . In some cases there will be
* an extended period between preparing a GXACT and commit / abort , in
* which case we need to separately record prepared transaction data
* in permanent storage . This includes locking information , pending
* notifications etc . All that state information is written to the
* per - transaction state file in the pg_twophase directory .
* All prepared transactions will be written prior to shutdown .
*
* Life track of state data is following :
*
* * On PREPARE TRANSACTION backend writes state data only to the WAL and
* stores pointer to the start of the WAL record in
* gxact - > prepare_start_lsn .
* * If COMMIT occurs before checkpoint then backend reads data from WAL
* using prepare_start_lsn .
* * On checkpoint state data copied to files in pg_twophase directory and
* fsynced
* * If COMMIT happens after checkpoint then backend reads state data from
* files
* * In case of crash replay will move data from xlog to files , if that
* hasn ' t happened before . XXX TODO - move to shmem in replay also
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
@ -51,6 +68,7 @@
# include "access/xlog.h"
# include "access/xloginsert.h"
# include "access/xlogutils.h"
# include "access/xlogreader.h"
# include "catalog/pg_type.h"
# include "catalog/storage.h"
# include "funcapi.h"
@ -117,10 +135,21 @@ typedef struct GlobalTransactionData
int pgprocno ; /* ID of associated dummy PGPROC */
BackendId dummyBackendId ; /* similar to backend id for backends */
TimestampTz prepared_at ; /* time of preparation */
XLogRecPtr prepare_lsn ; /* XLOG offset of prepare record */
/*
* Note that we need to keep track of two LSNs for each GXACT .
* We keep track of the start LSN because this is the address we must
* use to read state data back from WAL when committing a prepared GXACT .
* We keep track of the end LSN because that is the LSN we need to wait
* for prior to commit .
*/
XLogRecPtr prepare_start_lsn ; /* XLOG offset of prepare record start */
XLogRecPtr prepare_end_lsn ; /* XLOG offset of prepare record end */
Oid owner ; /* ID of user that executed the xact */
BackendId locking_backend ; /* backend currently working on the xact */
bool valid ; /* TRUE if PGPROC entry is in proc array */
bool ondisk ; /* TRUE if prepare state file is on disk */
char gid [ GIDSIZE ] ; /* The GID assigned to the prepared xact */
} GlobalTransactionData ;
@ -166,6 +195,7 @@ static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks [ ] ) ;
static void RemoveGXact ( GlobalTransaction gxact ) ;
static void XlogReadTwoPhaseData ( XLogRecPtr lsn , char * * buf , int * len ) ;
/*
* Initialization of shared memory
@ -398,11 +428,13 @@ MarkAsPreparing(TransactionId xid, const char *gid,
pgxact - > nxids = 0 ;
gxact - > prepared_at = prepared_at ;
/* initialize LSN to 0 (start of WAL) */
gxact - > prepare_lsn = 0 ;
/* initialize LSN to InvalidXLogRecPtr */
gxact - > prepare_start_lsn = InvalidXLogRecPtr ;
gxact - > prepare_end_lsn = InvalidXLogRecPtr ;
gxact - > owner = owner ;
gxact - > locking_backend = MyBackendId ;
gxact - > valid = false ;
gxact - > ondisk = false ;
strcpy ( gxact - > gid , gid ) ;
/* And insert it into the active array */
@ -578,41 +610,6 @@ RemoveGXact(GlobalTransaction gxact)
elog ( ERROR , " failed to find %p in GlobalTransaction array " , gxact ) ;
}
/*
* TransactionIdIsPrepared
* True iff transaction associated with the identifier is prepared
* for two - phase commit
*
* Note : only gxacts marked " valid " are considered ; but notice we do not
* check the locking status .
*
* This is not currently exported , because it is only needed internally .
*/
static bool
TransactionIdIsPrepared ( TransactionId xid )
{
bool result = false ;
int i ;
LWLockAcquire ( TwoPhaseStateLock , LW_SHARED ) ;
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
GlobalTransaction gxact = TwoPhaseState - > prepXacts [ i ] ;
PGXACT * pgxact = & ProcGlobal - > allPgXact [ gxact - > pgprocno ] ;
if ( gxact - > valid & & pgxact - > xid = = xid )
{
result = true ;
break ;
}
}
LWLockRelease ( TwoPhaseStateLock ) ;
return result ;
}
/*
* Returns an array of all prepared transactions for the user - level
* function pg_prepared_xact .
@ -1013,21 +1010,13 @@ StartPrepare(GlobalTransaction gxact)
}
/*
* Finish preparing state file .
*
* Calculates CRC and writes state file to WAL and in pg_twophase directory .
* Finish preparing state data and writing it to WAL .
*/
void
EndPrepare ( GlobalTransaction gxact )
{
PGXACT * pgxact = & ProcGlobal - > allPgXact [ gxact - > pgprocno ] ;
TransactionId xid = pgxact - > xid ;
TwoPhaseFileHeader * hdr ;
char path [ MAXPGPATH ] ;
StateFileChunk * record ;
pg_crc32c statefile_crc ;
pg_crc32c bogus_crc ;
int fd ;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord ( TWOPHASE_RM_END_ID , 0 ,
@ -1039,8 +1028,9 @@ EndPrepare(GlobalTransaction gxact)
hdr - > total_len = records . total_len + sizeof ( pg_crc32c ) ;
/*
* If the file size exceeds MaxAllocSize , we won ' t be able to read it in
* ReadTwoPhaseFile . Check for that now , rather than fail at commit time .
* If the data size exceeds MaxAllocSize , we won ' t be able to read it in
* ReadTwoPhaseFile . Check for that now , rather than fail in the case
* where we write data to file and then re - read at commit time .
*/
if ( hdr - > total_len > MaxAllocSize )
ereport ( ERROR ,
@ -1048,70 +1038,8 @@ EndPrepare(GlobalTransaction gxact)
errmsg ( " two-phase state file maximum length exceeded " ) ) ) ;
/*
* Create the 2 PC state file .
*/
TwoPhaseFilePath ( path , xid ) ;
fd = OpenTransientFile ( path ,
O_CREAT | O_EXCL | O_WRONLY | PG_BINARY ,
S_IRUSR | S_IWUSR ) ;
if ( fd < 0 )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not create two-phase state file \" %s \" : %m " ,
path ) ) ) ;
/* Write data to file, and calculate CRC as we pass over it */
INIT_CRC32C ( statefile_crc ) ;
for ( record = records . head ; record ! = NULL ; record = record - > next )
{
COMP_CRC32C ( statefile_crc , record - > data , record - > len ) ;
if ( ( write ( fd , record - > data , record - > len ) ) ! = record - > len )
{
CloseTransientFile ( fd ) ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not write two-phase state file: %m " ) ) ) ;
}
}
FIN_CRC32C ( statefile_crc ) ;
/*
* Write a deliberately bogus CRC to the state file ; this is just paranoia
* to catch the case where four more bytes will run us out of disk space .
*/
bogus_crc = ~ statefile_crc ;
if ( ( write ( fd , & bogus_crc , sizeof ( pg_crc32c ) ) ) ! = sizeof ( pg_crc32c ) )
{
CloseTransientFile ( fd ) ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not write two-phase state file: %m " ) ) ) ;
}
/* Back up to prepare for rewriting the CRC */
if ( lseek ( fd , - ( ( off_t ) sizeof ( pg_crc32c ) ) , SEEK_CUR ) < 0 )
{
CloseTransientFile ( fd ) ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not seek in two-phase state file: %m " ) ) ) ;
}
/*
* The state file isn ' t valid yet , because we haven ' t written the correct
* CRC yet . Before we do that , insert entry in WAL and flush it to disk .
*
* Between the time we have written the WAL entry and the time we write
* out the correct state file CRC , we have an inconsistency : the xact is
* prepared according to WAL but not according to our on - disk state . We
* use a critical section to force a PANIC if we are unable to complete
* the write - - - then , WAL replay should repair the inconsistency . The
* odds of a PANIC actually occurring should be very tiny given that we
* were able to write the bogus CRC above .
* Now writing 2 PC state data to WAL . We let the WAL ' s CRC protection
* cover us , so no need to calculate a separate CRC .
*
* We have to set delayChkpt here , too ; otherwise a checkpoint starting
* immediately after the WAL record is inserted could complete without
@ -1131,24 +1059,13 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert ( ) ;
for ( record = records . head ; record ! = NULL ; record = record - > next )
XLogRegisterData ( record - > data , record - > len ) ;
gxact - > prepare_lsn = XLogInsert ( RM_XACT_ID , XLOG_XACT_PREPARE ) ;
XLogFlush ( gxact - > prepare_lsn ) ;
gxact - > prepare_end_ lsn = XLogInsert ( RM_XACT_ID , XLOG_XACT_PREPARE ) ;
XLogFlush ( gxact - > prepare_end_ lsn ) ;
/* If we crash now, we have prepared: WAL replay will fix things */
/* write correct CRC and close file */
if ( ( write ( fd , & statefile_crc , sizeof ( pg_crc32c ) ) ) ! = sizeof ( pg_crc32c ) )
{
CloseTransientFile ( fd ) ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not write two-phase state file: %m " ) ) ) ;
}
if ( CloseTransientFile ( fd ) ! = 0 )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not close two-phase state file: %m " ) ) ) ;
/* Store record's start location to read that later on Commit */
gxact - > prepare_start_lsn = ProcLastRecPtr ;
/*
* Mark the prepared transaction as valid . As soon as xact . c marks
@ -1186,7 +1103,7 @@ EndPrepare(GlobalTransaction gxact)
* Note that at this stage we have marked the prepare , but still show as
* running in the procarray ( twice ! ) and continue to hold locks .
*/
SyncRepWaitForLSN ( gxact - > prepare_lsn ) ;
SyncRepWaitForLSN ( gxact - > prepare_end_ lsn ) ;
records . tail = records . head = NULL ;
records . num_chunks = 0 ;
@ -1315,6 +1232,57 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf ;
}
/*
* Reads 2 PC data from xlog . During checkpoint this data will be moved to
* twophase files and ReadTwoPhaseFile should be used instead .
*
* Note clearly that this function accesses WAL during normal operation , similarly
* to the way WALSender or Logical Decoding would do . It does not run during
* crash recovery or standby processing .
*/
static void
XlogReadTwoPhaseData ( XLogRecPtr lsn , char * * buf , int * len )
{
XLogRecord * record ;
XLogReaderState * xlogreader ;
char * errormsg ;
Assert ( ! RecoveryInProgress ( ) ) ;
xlogreader = XLogReaderAllocate ( & read_local_xlog_page , NULL ) ;
if ( ! xlogreader )
ereport ( ERROR ,
( errcode ( ERRCODE_OUT_OF_MEMORY ) ,
errmsg ( " out of memory " ) ,
errdetail ( " Failed while allocating an XLog reading processor. " ) ) ) ;
record = XLogReadRecord ( xlogreader , lsn , & errormsg ) ;
if ( record = = NULL )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not read two-phase state from xlog at %X/%X " ,
( uint32 ) ( lsn > > 32 ) ,
( uint32 ) lsn ) ) ) ;
if ( XLogRecGetRmid ( xlogreader ) ! = RM_XACT_ID | |
( XLogRecGetInfo ( xlogreader ) & XLOG_XACT_OPMASK ) ! = XLOG_XACT_PREPARE )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " expected two-phase state data is not present in xlog at %X/%X " ,
( uint32 ) ( lsn > > 32 ) ,
( uint32 ) lsn ) ) ) ;
if ( len ! = NULL )
* len = XLogRecGetDataLen ( xlogreader ) ;
* buf = palloc ( sizeof ( char ) * XLogRecGetDataLen ( xlogreader ) ) ;
memcpy ( * buf , XLogRecGetData ( xlogreader ) , sizeof ( char ) * XLogRecGetDataLen ( xlogreader ) ) ;
XLogReaderFree ( xlogreader ) ;
}
/*
* Confirms an xid is prepared , during recovery
*/
@ -1375,14 +1343,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
xid = pgxact - > xid ;
/*
* Read and validate the state file
* Read and validate 2 PC state data .
* State data will typically be stored in WAL files if the LSN is after the
* last checkpoint record , or moved to disk if for some reason they have
* lived for a long time .
*/
buf = ReadTwoPhaseFile ( xid , true ) ;
if ( buf = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_DATA_CORRUPTED ) ,
errmsg ( " two-phase state file for transaction %u is corrupt " ,
xid ) ) ) ;
if ( gxact - > ondisk )
buf = ReadTwoPhaseFile ( xid , true ) ;
else
XlogReadTwoPhaseData ( gxact - > prepare_start_lsn , & buf , NULL ) ;
/*
* Disassemble the header area
@ -1482,9 +1452,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
AtEOXact_PgStat ( isCommit ) ;
/*
* And now we can clean up our mess .
* And now we can clean up any files we may have left .
*/
RemoveTwoPhaseFile ( xid , true ) ;
if ( gxact - > ondisk )
RemoveTwoPhaseFile ( xid , true ) ;
RemoveGXact ( gxact ) ;
MyLockedGxact = NULL ;
@ -1493,8 +1464,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
}
/*
* Scan a 2 PC state file ( already read into memory by ReadTwoPhaseFile )
* and call the indicated callbacks for each 2 PC record .
* Scan 2 PC state data in memory and call the indicated callbacks for each 2 PC record .
*/
static void
ProcessRecords ( char * bufptr , TransactionId xid ,
@ -1539,7 +1509,8 @@ RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
}
/*
* Recreates a state file . This is used in WAL replay .
* Recreates a state file . This is used in WAL replay and during
* checkpoint creation .
*
* Note : content and len don ' t include CRC .
*/
@ -1610,97 +1581,71 @@ RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
* This is deliberately run as late as possible in the checkpoint sequence ,
* because GXACTs ordinarily have short lifespans , and so it is quite
* possible that GXACTs that were valid at checkpoint start will no longer
* exist if we wait a little bit .
* exist if we wait a little bit . With typical checkpoint settings this
* will be about 3 minutes for an online checkpoint , so as a result we
* we expect that there will be no GXACTs that need to be copied to disk .
*
* If a GXACT remains valid across multiple checkpoints , it ' ll be fsynced
* each time . This is considered unusual enough that we don ' t bother to
* expend any extra code to avoid the redundant fsyncs . ( They should be
* reasonably cheap anyway , since they won ' t cause I / O . )
* If a GXACT remains valid across multiple checkpoints , it will already
* be on disk so we don ' t bother to repeat that write .
*/
void
CheckPointTwoPhase ( XLogRecPtr redo_horizon )
{
TransactionId * xids ;
int nxids ;
char path [ MAXPGPATH ] ;
int i ;
int serialized_xacts = 0 ;
/*
* We don ' t want to hold the TwoPhaseStateLock while doing I / O , so we grab
* it just long enough to make a list of the XIDs that require fsyncing ,
* and then do the I / O afterwards .
*
* This approach creates a race condition : someone else could delete a
* GXACT between the time we release TwoPhaseStateLock and the time we try
* to open its state file . We handle this by special - casing ENOENT
* failures : if we see that , we verify that the GXACT is no longer valid ,
* and if so ignore the failure .
*/
if ( max_prepared_xacts < = 0 )
return ; /* nothing to do */
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START ( ) ;
xids = ( TransactionId * ) palloc ( max_prepared_xacts * sizeof ( TransactionId ) ) ;
nxids = 0 ;
/*
* We are expecting there to be zero GXACTs that need to be
* copied to disk , so we perform all I / O while holding
* TwoPhaseStateLock for simplicity . This prevents any new xacts
* from preparing while this occurs , which shouldn ' t be a problem
* since the presence of long - lived prepared xacts indicates the
* transaction manager isn ' t active .
*
* It ' s also possible to move I / O out of the lock , but on
* every error we should check whether somebody commited our
* transaction in different backend . Let ' s leave this optimisation
* for future , if somebody will spot that this place cause
* bottleneck .
*
* Note that it isn ' t possible for there to be a GXACT with
* a prepare_end_lsn set prior to the last checkpoint yet
* is marked invalid , because of the efforts with delayChkpt .
*/
LWLockAcquire ( TwoPhaseStateLock , LW_SHARED ) ;
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
GlobalTransaction gxact = TwoPhaseState - > prepXacts [ i ] ;
PGXACT * pgxact = & ProcGlobal - > allPgXact [ gxact - > pgprocno ] ;
if ( gxact - > valid & &
gxact - > prepare_lsn < = redo_horizon )
xids [ nxids + + ] = pgxact - > xid ;
}
LWLockRelease ( TwoPhaseStateLock ) ;
for ( i = 0 ; i < nxids ; i + + )
{
TransactionId xid = xids [ i ] ;
int fd ;
TwoPhaseFilePath ( path , xid ) ;
fd = OpenTransientFile ( path , O_RDWR | PG_BINARY , 0 ) ;
if ( fd < 0 )
! gxact - > ondisk & &
gxact - > prepare_end_lsn < = redo_horizon )
{
if ( errno = = ENOENT )
{
/* OK if gxact is no longer valid */
if ( ! TransactionIdIsPrepared ( xid ) )
continue ;
/* Restore errno in case it was changed */
errno = ENOENT ;
}
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not open two-phase state file \" %s \" : %m " ,
path ) ) ) ;
}
char * buf ;
int len ;
if ( pg_fsync ( fd ) ! = 0 )
{
CloseTransientFile ( fd ) ;
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not fsync two-phase state file \" %s \" : %m " ,
path ) ) ) ;
XlogReadTwoPhaseData ( gxact - > prepare_start_lsn , & buf , & len ) ;
RecreateTwoPhaseFile ( pgxact - > xid , buf , len ) ;
gxact - > ondisk = true ;
pfree ( buf ) ;
serialized_xacts + + ;
}
if ( CloseTransientFile ( fd ) ! = 0 )
ereport ( ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not close two-phase state file \" %s \" : %m " ,
path ) ) ) ;
}
pfree ( xids ) ;
LWLockRelease ( TwoPhaseStateLock ) ;
TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE ( ) ;
if ( log_checkpoints & & serialized_xacts > 0 )
ereport ( LOG ,
( errmsg ( " %u two-phase state files were written "
" for long-running prepared transactions " ,
serialized_xacts ) ) ) ;
}
/*
@ -2029,17 +1974,11 @@ RecoverPreparedTransactions(void)
/*
* Recreate its GXACT and dummy PGPROC
*
* Note : since we don ' t have the PREPARE record ' s WAL location at
* hand , we leave prepare_lsn zeroes . This means the GXACT will
* be fsync ' d on every future checkpoint . We assume this
* situation is infrequent enough that the performance cost is
* negligible ( especially since we know the state file has already
* been fsynced ) .
*/
gxact = MarkAsPreparing ( xid , hdr - > gid ,
hdr - > prepared_at ,
hdr - > owner , hdr - > database ) ;
gxact - > ondisk = true ;
GXactLoadSubxactData ( gxact , hdr - > nsubxacts , subxids ) ;
MarkAsPrepared ( gxact ) ;