@ -41,7 +41,6 @@ static List *RecoveryLockList;
static void ResolveRecoveryConflictWithVirtualXIDs ( VirtualTransactionId * waitlist ,
ProcSignalReason reason ) ;
static void ResolveRecoveryConflictWithLock ( Oid dbOid , Oid relOid ) ;
static void SendRecoveryConflictWithBufferPin ( ProcSignalReason reason ) ;
static XLogRecPtr LogCurrentRunningXacts ( RunningTransactions CurrRunningXacts ) ;
static void LogAccessExclusiveLocks ( int nlocks , xl_standby_lock * locks ) ;
@ -339,39 +338,65 @@ ResolveRecoveryConflictWithDatabase(Oid dbid)
}
}
static void
ResolveRecoveryConflictWithLock ( Oid dbOid , Oid relOid )
/*
* ResolveRecoveryConflictWithLock is called from ProcSleep ( )
* to resolve conflicts with other backends holding relation locks .
*
* The WaitLatch sleep normally done in ProcSleep ( )
* ( when not InHotStandby ) is performed here , for code clarity .
*
* We either resolve conflicts immediately or set a timeout to wake us at
* the limit of our patience .
*
* Resolve conflicts by cancelling to all backends holding a conflicting
* lock . As we are already queued to be granted the lock , no new lock
* requests conflicting with ours will be granted in the meantime .
*
* Deadlocks involving the Startup process and an ordinary backend process
* will be detected by the deadlock detector within the ordinary backend .
*/
void
ResolveRecoveryConflictWithLock ( LOCKTAG locktag )
{
VirtualTransactionId * backends ;
bool lock_acquired = false ;
int num_attempts = 0 ;
LOCKTAG locktag ;
TimestampTz ltime ;
SET_LOCKTAG_RELATION ( locktag , dbOid , relOid ) ;
Assert ( InHotStandby ) ;
/*
* If blowing away everybody with conflicting locks doesn ' t work , after
* the first two attempts then we just start blowing everybody away until
* it does work . We do this because its likely that we either have too
* many locks and we just can ' t get one at all , or that there are many
* people crowding for the same table . Recovery must win ; the end
* justifies the means .
*/
while ( ! lock_acquired )
{
if ( + + num_attempts < 3 )
backends = GetLockConflicts ( & locktag , AccessExclusiveLock ) ;
else
backends = GetConflictingVirtualXIDs ( InvalidTransactionId ,
InvalidOid ) ;
ltime = GetStandbyLimitTime ( ) ;
if ( GetCurrentTimestamp ( ) > = ltime )
{
/*
* We ' re already behind , so clear a path as quickly as possible .
*/
VirtualTransactionId * backends ;
backends = GetLockConflicts ( & locktag , AccessExclusiveLock ) ;
ResolveRecoveryConflictWithVirtualXIDs ( backends ,
PROCSIG_RECOVERY_CONFLICT_LOCK ) ;
}
else
{
/*
* Wait ( or wait again ) until ltime
*/
EnableTimeoutParams timeouts [ 1 ] ;
if ( LockAcquireExtended ( & locktag , AccessExclusiveLock , true , true , false )
! = LOCKACQUIRE_NOT_AVAIL )
lock_acquired = true ;
timeouts [ 0 ] . id = STANDBY_LOCK_TIMEOUT ;
timeouts [ 0 ] . type = TMPARAM_AT ;
timeouts [ 0 ] . fin_time = ltime ;
enable_timeouts ( timeouts , 1 ) ;
}
/* Wait to be signaled by the release of the Relation Lock */
ProcWaitForSignal ( ) ;
/*
* Clear any timeout requests established above . We assume here that the
* Startup process doesn ' t have any other outstanding timeouts than those
* used by this function . If that stops being true , we could cancel the
* timeouts individually , but that ' d be slower .
*/
disable_all_timeouts ( false ) ;
}
/*
@ -534,6 +559,14 @@ StandbyTimeoutHandler(void)
SendRecoveryConflictWithBufferPin ( PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ) ;
}
/*
* StandbyLockTimeoutHandler ( ) will be called if STANDBY_LOCK_TIMEOUT is exceeded .
* This doesn ' t need to do anything , simply waking up is enough .
*/
void
StandbyLockTimeoutHandler ( void )
{
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@ -547,7 +580,7 @@ StandbyTimeoutHandler(void)
* process is the proxy by which the original locks are implemented .
*
* We only keep track of AccessExclusiveLocks , which are only ever held by
* one transaction on one relation , and don ' t worry about lock queuing .
* one transaction on one relation .
*
* We keep a single dynamically expandible list of locks in local memory ,
* RelationLockList , so we can keep track of the various entries made by
@ -589,14 +622,9 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
newlock - > relOid = relOid ;
RecoveryLockList = lappend ( RecoveryLockList , newlock ) ;
/*
* Attempt to acquire the lock as requested , if not resolve conflict
*/
SET_LOCKTAG_RELATION ( locktag , newlock - > dbOid , newlock - > relOid ) ;
if ( LockAcquireExtended ( & locktag , AccessExclusiveLock , true , true , false )
= = LOCKACQUIRE_NOT_AVAIL )
ResolveRecoveryConflictWithLock ( newlock - > dbOid , newlock - > relOid ) ;
LockAcquireExtended ( & locktag , AccessExclusiveLock , true , false , false ) ;
}
static void