@ -524,7 +524,9 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
LOCKTAG locktag ;
LOCKTAG locktag ;
/* Already processed? */
/* Already processed? */
if ( TransactionIdDidCommit ( xid ) | | TransactionIdDidAbort ( xid ) )
if ( ! TransactionIdIsValid ( xid ) | |
TransactionIdDidCommit ( xid ) | |
TransactionIdDidAbort ( xid ) )
return ;
return ;
elog ( trace_recovery ( DEBUG4 ) ,
elog ( trace_recovery ( DEBUG4 ) ,
@ -606,34 +608,86 @@ StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
}
}
/*
/*
* StandbyReleaseLocksMany
* Called at end of recovery and when we see a shutdown checkpoint .
* Release standby locks held by XIDs < removeXid
*
* If keepPreparedXacts is true , keep prepared transactions even if
* they ' re older than removeXid
*/
*/
static void
void
StandbyReleaseLocksMany ( TransactionId removeXid , bool keepPreparedXacts )
StandbyReleaseAllLocks ( void )
{
ListCell * cell ,
* prev ,
* next ;
LOCKTAG locktag ;
elog ( trace_recovery ( DEBUG2 ) , " release all standby locks " ) ;
prev = NULL ;
for ( cell = list_head ( RecoveryLockList ) ; cell ; cell = next )
{
xl_standby_lock * lock = ( xl_standby_lock * ) lfirst ( cell ) ;
next = lnext ( cell ) ;
elog ( trace_recovery ( DEBUG4 ) ,
" releasing recovery lock: xid %u db %u rel %u " ,
lock - > xid , lock - > dbOid , lock - > relOid ) ;
SET_LOCKTAG_RELATION ( locktag , lock - > dbOid , lock - > relOid ) ;
if ( ! LockRelease ( & locktag , AccessExclusiveLock , true ) )
elog ( LOG ,
" RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u " ,
lock - > xid , lock - > dbOid , lock - > relOid ) ;
RecoveryLockList = list_delete_cell ( RecoveryLockList , cell , prev ) ;
pfree ( lock ) ;
}
}
/*
* StandbyReleaseOldLocks
* Release standby locks held by XIDs that aren ' t running , as long
* as they ' re not prepared transactions .
*/
void
StandbyReleaseOldLocks ( int nxids , TransactionId * xids )
{
{
ListCell * cell ,
ListCell * cell ,
* prev ,
* prev ,
* next ;
* next ;
LOCKTAG locktag ;
LOCKTAG locktag ;
/*
* Release all matching locks .
*/
prev = NULL ;
prev = NULL ;
for ( cell = list_head ( RecoveryLockList ) ; cell ; cell = next )
for ( cell = list_head ( RecoveryLockList ) ; cell ; cell = next )
{
{
xl_standby_lock * lock = ( xl_standby_lock * ) lfirst ( cell ) ;
xl_standby_lock * lock = ( xl_standby_lock * ) lfirst ( cell ) ;
bool remove = false ;
next = lnext ( cell ) ;
next = lnext ( cell ) ;
if ( ! TransactionIdIsValid ( removeXid ) | | TransactionIdPrecedes ( lock - > xid , removeXid ) )
Assert ( TransactionIdIsValid ( lock - > xid ) ) ;
if ( StandbyTransactionIdIsPrepared ( lock - > xid ) )
remove = false ;
else
{
int i ;
bool found = false ;
for ( i = 0 ; i < nxids ; i + + )
{
if ( lock - > xid = = xids [ i ] )
{
found = true ;
break ;
}
}
/*
* If its not a running transaction , remove it .
*/
if ( ! found )
remove = true ;
}
if ( remove )
{
{
if ( keepPreparedXacts & & StandbyTransactionIdIsPrepared ( lock - > xid ) )
continue ;
elog ( trace_recovery ( DEBUG4 ) ,
elog ( trace_recovery ( DEBUG4 ) ,
" releasing recovery lock: xid %u db %u rel %u " ,
" releasing recovery lock: xid %u db %u rel %u " ,
lock - > xid , lock - > dbOid , lock - > relOid ) ;
lock - > xid , lock - > dbOid , lock - > relOid ) ;
@ -650,27 +704,6 @@ StandbyReleaseLocksMany(TransactionId removeXid, bool keepPreparedXacts)
}
}
}
}
/*
* Called at end of recovery and when we see a shutdown checkpoint .
*/
void
StandbyReleaseAllLocks ( void )
{
elog ( trace_recovery ( DEBUG2 ) , " release all standby locks " ) ;
StandbyReleaseLocksMany ( InvalidTransactionId , false ) ;
}
/*
* StandbyReleaseOldLocks
* Release standby locks held by XIDs < removeXid , as long
* as they ' re not prepared transactions .
*/
void
StandbyReleaseOldLocks ( TransactionId removeXid )
{
StandbyReleaseLocksMany ( removeXid , true ) ;
}
/*
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* Recovery handling for Rmgr RM_STANDBY_ID
* Recovery handling for Rmgr RM_STANDBY_ID
@ -812,6 +845,13 @@ standby_desc(StringInfo buf, uint8 xl_info, char *rec)
* Later , when we apply the running xact data we must be careful to ignore
* Later , when we apply the running xact data we must be careful to ignore
* transactions already committed , since those commits raced ahead when
* transactions already committed , since those commits raced ahead when
* making WAL entries .
* making WAL entries .
*
* The loose timing also means that locks may be recorded that have a
* zero xid , since xids are removed from procs before locks are removed .
* So we must prune the lock list down to ensure we hold locks only for
* currently running xids , performed by StandbyReleaseOldLocks ( ) .
* Zero xids should no longer be possible , but we may be replaying WAL
* from a time when they were possible .
*/
*/
void
void
LogStandbySnapshot ( TransactionId * nextXid )
LogStandbySnapshot ( TransactionId * nextXid )