@ -144,11 +144,7 @@ int max_prepared_xacts = 0;
*
* typedef struct GlobalTransactionData * GlobalTransaction appears in
* twophase . h
*
* Note that the max value of GIDSIZE must fit in the uint16 gidlen ,
* specified in TwoPhaseFileHeader .
*/
# define GIDSIZE 200
typedef struct GlobalTransactionData
{
@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode * rels ,
int ninvalmsgs ,
SharedInvalidationMessage * invalmsgs ,
bool initfileinval ) ;
bool initfileinval ,
const char * gid ) ;
static void RecordTransactionAbortPrepared ( TransactionId xid ,
int nchildren ,
TransactionId * children ,
int nrels ,
RelFileNode * rels ) ;
RelFileNode * rels ,
const char * gid ) ;
static void ProcessRecords ( char * bufptr , TransactionId xid ,
const TwoPhaseCallback callbacks [ ] ) ;
static void RemoveGXact ( GlobalTransaction gxact ) ;
@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
/*
* Header for a 2 PC state file
*/
# define TWOPHASE_MAGIC 0x57F94533 /* format identifier */
# define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
typedef struct TwoPhaseFileHeader
{
@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader
int32 ninvalmsgs ; /* number of cache invalidation messages */
bool initfileinval ; /* does relcache init file need invalidation? */
uint16 gidlen ; /* length of the GID - GID follows the header */
XLogRecPtr origin_lsn ; /* lsn of this record at origin node */
TimestampTz origin_timestamp ; /* time of prepare at origin node */
} TwoPhaseFileHeader ;
/*
@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact)
{
TwoPhaseFileHeader * hdr ;
StateFileChunk * record ;
bool replorigin ;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord ( TWOPHASE_RM_END_ID , 0 ,
@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact)
Assert ( hdr - > magic = = TWOPHASE_MAGIC ) ;
hdr - > total_len = records . total_len + sizeof ( pg_crc32c ) ;
replorigin = ( replorigin_session_origin ! = InvalidRepOriginId & &
replorigin_session_origin ! = DoNotReplicateId ) ;
if ( replorigin )
{
Assert ( replorigin_session_origin_lsn ! = InvalidXLogRecPtr ) ;
hdr - > origin_lsn = replorigin_session_origin_lsn ;
hdr - > origin_timestamp = replorigin_session_origin_timestamp ;
}
else
{
hdr - > origin_lsn = InvalidXLogRecPtr ;
hdr - > origin_timestamp = 0 ;
}
/*
* If the data size exceeds MaxAllocSize , we won ' t be able to read it in
* ReadTwoPhaseFile . Check for that now , rather than fail in the case
@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert ( ) ;
for ( record = records . head ; record ! = NULL ; record = record - > next )
XLogRegisterData ( record - > data , record - > len ) ;
XLogSetRecordFlags ( XLOG_INCLUDE_ORIGIN ) ;
gxact - > prepare_end_lsn = XLogInsert ( RM_XACT_ID , XLOG_XACT_PREPARE ) ;
if ( replorigin )
/* Move LSNs forward for this replication origin */
replorigin_session_advance ( replorigin_session_origin_lsn ,
gxact - > prepare_end_lsn ) ;
XLogFlush ( gxact - > prepare_end_lsn ) ;
/* If we crash now, we have prepared: WAL replay will fix things */
@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf ;
}
/*
* ParsePrepareRecord
*/
void
ParsePrepareRecord ( uint8 info , char * xlrec , xl_xact_parsed_prepare * parsed )
{
TwoPhaseFileHeader * hdr ;
char * bufptr ;
hdr = ( TwoPhaseFileHeader * ) xlrec ;
bufptr = xlrec + MAXALIGN ( sizeof ( TwoPhaseFileHeader ) ) ;
parsed - > origin_lsn = hdr - > origin_lsn ;
parsed - > origin_timestamp = hdr - > origin_timestamp ;
parsed - > twophase_xid = hdr - > xid ;
parsed - > dbId = hdr - > database ;
parsed - > nsubxacts = hdr - > nsubxacts ;
parsed - > nrels = hdr - > ncommitrels ;
parsed - > nabortrels = hdr - > nabortrels ;
parsed - > nmsgs = hdr - > ninvalmsgs ;
strncpy ( parsed - > twophase_gid , bufptr , hdr - > gidlen ) ;
bufptr + = MAXALIGN ( hdr - > gidlen ) ;
parsed - > subxacts = ( TransactionId * ) bufptr ;
bufptr + = MAXALIGN ( hdr - > nsubxacts * sizeof ( TransactionId ) ) ;
parsed - > xnodes = ( RelFileNode * ) bufptr ;
bufptr + = MAXALIGN ( hdr - > ncommitrels * sizeof ( RelFileNode ) ) ;
parsed - > abortnodes = ( RelFileNode * ) bufptr ;
bufptr + = MAXALIGN ( hdr - > nabortrels * sizeof ( RelFileNode ) ) ;
parsed - > msgs = ( SharedInvalidationMessage * ) bufptr ;
bufptr + = MAXALIGN ( hdr - > ninvalmsgs * sizeof ( SharedInvalidationMessage ) ) ;
}
/*
* Reads 2 PC data from xlog . During checkpoint this data will be moved to
@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr - > nsubxacts , children ,
hdr - > ncommitrels , commitrels ,
hdr - > ninvalmsgs , invalmsgs ,
hdr - > initfileinval ) ;
hdr - > initfileinval , gid ) ;
else
RecordTransactionAbortPrepared ( xid ,
hdr - > nsubxacts , children ,
hdr - > nabortrels , abortrels ) ;
hdr - > nabortrels , abortrels ,
gid ) ;
ProcArrayRemove ( proc , latestXid ) ;
@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void)
if ( buf = = NULL )
continue ;
PrepareRedoAdd ( buf , InvalidXLogRecPtr , InvalidXLogRecPtr ) ;
PrepareRedoAdd ( buf , InvalidXLogRecPtr ,
InvalidXLogRecPtr , InvalidRepOriginId ) ;
}
}
LWLockRelease ( TwoPhaseStateLock ) ;
@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode * rels ,
int ninvalmsgs ,
SharedInvalidationMessage * invalmsgs ,
bool initfileinval )
bool initfileinval ,
const char * gid )
{
XLogRecPtr recptr ;
TimestampTz committs = GetCurrentTimestamp ( ) ;
@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
ninvalmsgs , invalmsgs ,
initfileinval , false ,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK ,
xid ) ;
xid , gid ) ;
if ( replorigin )
@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nchildren ,
TransactionId * children ,
int nrels ,
RelFileNode * rels )
RelFileNode * rels ,
const char * gid )
{
XLogRecPtr recptr ;
@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
nchildren , children ,
nrels , rels ,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK ,
xid ) ;
xid , gid ) ;
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush ( recptr ) ;
@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
* data , the entry is marked as located on disk .
*/
void
PrepareRedoAdd ( char * buf , XLogRecPtr start_lsn , XLogRecPtr end_lsn )
PrepareRedoAdd ( char * buf , XLogRecPtr start_lsn ,
XLogRecPtr end_lsn , RepOriginId origin_id )
{
TwoPhaseFileHeader * hdr = ( TwoPhaseFileHeader * ) buf ;
char * bufptr ;
@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
Assert ( TwoPhaseState - > numPrepXacts < max_prepared_xacts ) ;
TwoPhaseState - > prepXacts [ TwoPhaseState - > numPrepXacts + + ] = gxact ;
if ( origin_id ! = InvalidRepOriginId )
{
/* recover apply progress */
replorigin_advance ( origin_id , hdr - > origin_lsn , end_lsn ,
false /* backward */ , false /* WAL */ ) ;
}
elog ( DEBUG2 , " added 2PC data in shared memory for transaction %u " , gxact - > xid ) ;
}