@ -56,6 +56,7 @@
# include "pg_trace.h"
# include "pgstat.h"
# include "storage/fd.h"
# include "storage/ipc.h"
# include "storage/procarray.h"
# include "storage/smgr.h"
# include "utils/builtins.h"
@ -81,25 +82,25 @@ int max_prepared_xacts = 0;
*
* The lifecycle of a global transaction is :
*
* 1. After checking that the requested GID is not in use , set up an
* entry in the TwoPhaseState - > prepXacts array with the correct XID and GID ,
* with locking_xid = my own XID and valid = false .
* 1. After checking that the requested GID is not in use , set up an entry in
* the TwoPhaseState - > prepXacts array with the correct GID and valid = false ,
* and mark it as locked by my backend .
*
* 2. After successfully completing prepare , set valid = true and enter the
* contained PGPROC into the global ProcArray .
*
* 3. To begin COMMIT PREPARED or ROLLBACK PREPARED , check that the entry
* is valid and its locking_xid is no longer active , then store my current
* XID into locking_xi d. This prevents concurrent attempts to commit or
* rollback the same prepared xact .
* 3. To begin COMMIT PREPARED or ROLLBACK PREPARED , check that the entry is
* valid and not locked , then mark the entry as locked by storing my current
* backend ID into locking_backen d. This prevents concurrent attempts to
* commit or rollback the same prepared xact .
*
* 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED , remove the entry
* from the ProcArray and the TwoPhaseState - > prepXacts array and return it to
* the freelist .
*
* Note that if the preparing transaction fails between steps 1 and 2 , the
* entry will remain in prepXacts until recycled . We can detect recyclable
* entries by checking for valid = false and locking_xid no longer active .
* entry must be removed so that the GID and the GlobalTransaction struct
* can be reused . See AtAbort_Twophase ( ) .
*
* typedef struct GlobalTransactionData * GlobalTransaction appears in
* twophase . h
@ -113,8 +114,8 @@ typedef struct GlobalTransactionData
TimestampTz prepared_at ; /* time of preparation */
XLogRecPtr prepare_lsn ; /* XLOG offset of prepare record */
Oid owner ; /* ID of user that executed the xact */
TransactionId locking_xid ; /* top-level XID of backend working on xact */
bool valid ; /* TRUE if fully prepared */
BackendId locking_backend ; /* backend currently working on the xact */
bool valid ; /* TRUE if PGPROC entry is in proc array */
char gid [ GIDSIZE ] ; /* The GID assigned to the prepared xact */
} GlobalTransactionData ;
@ -139,6 +140,12 @@ typedef struct TwoPhaseStateData
static TwoPhaseStateData * TwoPhaseState ;
/*
* Global transaction entry currently locked by us , if any .
*/
static GlobalTransaction MyLockedGxact = NULL ;
static bool twophaseExitRegistered = false ;
static void RecordTransactionCommitPrepared ( TransactionId xid ,
int nchildren ,
@ -152,6 +159,7 @@ static void RecordTransactionAbortPrepared(TransactionId xid,
RelFileNode * rels ) ;
static void ProcessRecords ( char * bufptr , TransactionId xid ,
const TwoPhaseCallback callbacks [ ] ) ;
static void RemoveGXact ( GlobalTransaction gxact ) ;
/*
@ -221,6 +229,74 @@ TwoPhaseShmemInit(void)
Assert ( found ) ;
}
/*
* Exit hook to unlock the global transaction entry we ' re working on .
*/
static void
AtProcExit_Twophase ( int code , Datum arg )
{
/* same logic as abort */
AtAbort_Twophase ( ) ;
}
/*
* Abort hook to unlock the global transaction entry we ' re working on .
*/
void
AtAbort_Twophase ( void )
{
if ( MyLockedGxact = = NULL )
return ;
/*
* What to do with the locked global transaction entry ? If we were in
* the process of preparing the transaction , but haven ' t written the WAL
* record and state file yet , the transaction must not be considered as
* prepared . Likewise , if we are in the process of finishing an
* already - prepared transaction , and fail after having already written
* the 2 nd phase commit or rollback record to the WAL , the transaction
* should not be considered as prepared anymore . In those cases , just
* remove the entry from shared memory .
*
* Otherwise , the entry must be left in place so that the transaction
* can be finished later , so just unlock it .
*
* If we abort during prepare , after having written the WAL record , we
* might not have transfered all locks and other state to the prepared
* transaction yet . Likewise , if we abort during commit or rollback ,
* after having written the WAL record , we might not have released
* all the resources held by the transaction yet . In those cases , the
* in - memory state can be wrong , but it ' s too late to back out .
*/
if ( ! MyLockedGxact - > valid )
{
RemoveGXact ( MyLockedGxact ) ;
}
else
{
LWLockAcquire ( TwoPhaseStateLock , LW_EXCLUSIVE ) ;
MyLockedGxact - > locking_backend = InvalidBackendId ;
LWLockRelease ( TwoPhaseStateLock ) ;
}
MyLockedGxact = NULL ;
}
/*
* This is called after we have finished transfering state to the prepared
* PGXACT entry .
*/
void
PostPrepare_Twophase ( )
{
LWLockAcquire ( TwoPhaseStateLock , LW_EXCLUSIVE ) ;
MyLockedGxact - > locking_backend = InvalidBackendId ;
LWLockRelease ( TwoPhaseStateLock ) ;
MyLockedGxact = NULL ;
}
/*
* MarkAsPreparing
@ -250,29 +326,15 @@ MarkAsPreparing(TransactionId xid, const char *gid,
errmsg ( " prepared transactions are disabled " ) ,
errhint ( " Set max_prepared_transactions to a nonzero value. " ) ) ) ;
LWLockAcquire ( TwoPhaseStateLock , LW_EXCLUSIVE ) ;
/*
* First , find and recycle any gxacts that failed during prepare . We do
* this partly to ensure we don ' t mistakenly say their GIDs are still
* reserved , and partly so we don ' t fail on out - of - slots unnecessarily .
*/
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
gxact = TwoPhaseState - > prepXacts [ i ] ;
if ( ! gxact - > valid & & ! TransactionIdIsActive ( gxact - > locking_xid ) )
/* on first call, register the exit hook */
if ( ! twophaseExitRegistered )
{
/* It's dead Jim ... remove from the active array */
TwoPhaseState - > numPrepXacts - - ;
TwoPhaseState - > prepXacts [ i ] = TwoPhaseState - > prepXacts [ TwoPhaseState - > numPrepXacts ] ;
/* and put it back in the freelist */
gxact - > proc . links . next = ( SHM_QUEUE * ) TwoPhaseState - > freeGXacts ;
TwoPhaseState - > freeGXacts = gxact ;
/* Back up index count too, so we don't miss scanning one */
i - - ;
}
on_shmem_exit ( AtProcExit_Twophase , 0 ) ;
twophaseExitRegistered = true ;
}
LWLockAcquire ( TwoPhaseStateLock , LW_EXCLUSIVE ) ;
/* Check for conflicting GID */
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
{
@ -326,7 +388,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
gxact - > prepare_lsn . xlogid = 0 ;
gxact - > prepare_lsn . xrecoff = 0 ;
gxact - > owner = owner ;
gxact - > locking_xid = xi d ;
gxact - > locking_backend = MyBackendI d ;
gxact - > valid = false ;
strcpy ( gxact - > gid , gid ) ;
@ -334,6 +396,12 @@ MarkAsPreparing(TransactionId xid, const char *gid,
Assert ( TwoPhaseState - > numPrepXacts < max_prepared_xacts ) ;
TwoPhaseState - > prepXacts [ TwoPhaseState - > numPrepXacts + + ] = gxact ;
/*
* Remember that we have this GlobalTransaction entry locked for us .
* If we abort after this , we must release it .
*/
MyLockedGxact = gxact ;
LWLockRelease ( TwoPhaseStateLock ) ;
return gxact ;
@ -393,6 +461,13 @@ LockGXact(const char *gid, Oid user)
{
int i ;
/* on first call, register the exit hook */
if ( ! twophaseExitRegistered )
{
on_shmem_exit ( AtProcExit_Twophase , 0 ) ;
twophaseExitRegistered = true ;
}
LWLockAcquire ( TwoPhaseStateLock , LW_EXCLUSIVE ) ;
for ( i = 0 ; i < TwoPhaseState - > numPrepXacts ; i + + )
@ -406,15 +481,11 @@ LockGXact(const char *gid, Oid user)
continue ;
/* Found it, but has someone else got it locked? */
if ( TransactionIdIsValid ( gxact - > locking_xid ) )
{
if ( TransactionIdIsActive ( gxact - > locking_xid ) )
if ( gxact - > locking_backend ! = InvalidBackendId )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " prepared transaction with identifier \" %s \" is busy " ,
gid ) ) ) ;
gxact - > locking_xid = InvalidTransactionId ;
}
if ( user ! = gxact - > owner & & ! superuser_arg ( user ) )
ereport ( ERROR ,
@ -435,7 +506,8 @@ LockGXact(const char *gid, Oid user)
errhint ( " Connect to the database where the transaction was prepared to finish it. " ) ) ) ;
/* OK for me to lock it */
gxact - > locking_xid = GetTopTransactionId ( ) ;
gxact - > locking_backend = MyBackendId ;
MyLockedGxact = gxact ;
LWLockRelease ( TwoPhaseStateLock ) ;
@ -1041,6 +1113,13 @@ EndPrepare(GlobalTransaction gxact)
*/
MyProc - > inCommit = false ;
/*
* Remember that we have this GlobalTransaction entry locked for us . If
* we crash after this point , it ' s too late to abort , but we must unlock
* it so that the prepared transaction can be committed or rolled back .
*/
MyLockedGxact = gxact ;
END_CRIT_SECTION ( ) ;
records . tail = records . head = NULL ;
@ -1240,8 +1319,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
/*
* In case we fail while running the callbacks , mark the gxact invalid so
* no one else will try to commit / rollback , and so it can be recycled
* properly later . It is still locked by our XID so it won ' t go away yet .
* no one else will try to commit / rollback , and so it will be recycled
* if we fail after this point . It is still locked by our backend so it
* won ' t go away yet .
*
* ( We assume it ' s safe to do this without taking TwoPhaseStateLock . )
*/
@ -1291,6 +1371,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
RemoveTwoPhaseFile ( xid , true ) ;
RemoveGXact ( gxact ) ;
MyLockedGxact = NULL ;
pfree ( buf ) ;
}