@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0 ; /* the maximum number of replication
* slots */
/* Prototypes for static helper functions used within this file. */
static ReplicationSlot * SearchNamedReplicationSlot ( const char * name ) ;
static int ReplicationSlotAcquireInternal ( ReplicationSlot * slot ,
const char * name , SlotAcquireBehavior behavior ) ;
static void ReplicationSlotDropAcquired ( void ) ;
static void ReplicationSlotDropPtr ( ReplicationSlot * slot ) ;
@ -322,38 +325,89 @@ ReplicationSlotCreate(const char *name, bool db_specific,
}
/*
* Find a previously created slot and mark it as used by this backend .
* Search for the named replication slot .
*
* Return the replication slot if found , otherwise NULL .
*
* The caller must hold ReplicationSlotControlLock in shared mode .
*/
static ReplicationSlot *
SearchNamedReplicationSlot ( const char * name )
{
int i ;
ReplicationSlot * slot = NULL ;
Assert ( LWLockHeldByMeInMode ( ReplicationSlotControlLock ,
LW_SHARED ) ) ;
for ( i = 0 ; i < max_replication_slots ; i + + )
{
ReplicationSlot * s = & ReplicationSlotCtl - > replication_slots [ i ] ;
if ( s - > in_use & & strcmp ( name , NameStr ( s - > data . name ) ) = = 0 )
{
slot = s ;
break ;
}
}
return slot ;
}
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * The return value is only useful if behavior is SAB_Inquire, in which case
 * it's zero if we successfully acquired the slot, -1 if the slot no longer
 * exists, or the PID of the owning process otherwise.  If behavior is
 * SAB_Error, then trying to acquire an owned slot is an error.
 * If SAB_Block, we sleep until the slot is released by the owning process.
 *
 * This simply delegates to ReplicationSlotAcquireInternal(), passing a NULL
 * slot pointer so that the slot is looked up by name.
 */
int
ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
{
	return ReplicationSlotAcquireInternal(NULL, name, behavior);
}
/*
* Mark the specified slot as used by this process .
*
* Only one of slot and name can be specified .
* If slot = = NULL , search for the slot with the given name .
*
* See comments about the return value in ReplicationSlotAcquire ( ) .
*/
static int
ReplicationSlotAcquireInternal ( ReplicationSlot * slot , const char * name ,
SlotAcquireBehavior behavior )
{
ReplicationSlot * s ;
int active_pid ;
int i ;
AssertArg ( ( slot = = NULL ) ^ ( name = = NULL ) ) ;
retry :
Assert ( MyReplicationSlot = = NULL ) ;
LWLockAcquire ( ReplicationSlotControlLock , LW_SHARED ) ;
/*
* Search for the named slot and mark it active if we find it . If the
* slot is already active , we exit the loop with active_pid set to the PID
* of the backend that owns it .
* Search for the slot with the specified name if the slot to acquire is
* not given . If the slot is not found , we either return - 1 or error out .
*/
active_pid = 0 ;
slot = NULL ;
LWLockAcquire ( ReplicationSlotControlLock , LW_SHARED ) ;
for ( i = 0 ; i < max_replication_slots ; i + + )
s = slot ? slot : SearchNamedReplicationSlot ( name ) ;
if ( s = = NULL | | ! s - > in_use )
{
ReplicationSlot * s = & ReplicationSlotCtl - > replication_slots [ i ] ;
LWLockRelease ( ReplicationSlotControlLock ) ;
if ( behavior = = SAB_Inquire )
return - 1 ;
ereport ( ERROR ,
( errcode ( ERRCODE_UNDEFINED_OBJECT ) ,
errmsg ( " replication slot \" %s \" does not exist " ,
name ? name : NameStr ( slot - > data . name ) ) ) ) ;
}
if ( s - > in_use & & strcmp ( name , NameStr ( s - > data . name ) ) = = 0 )
{
/*
* This is the slot we want ; check if it ' s active under some other
* process . In single user mode , we don ' t need this check .
@ -361,38 +415,27 @@ retry:
if ( IsUnderPostmaster )
{
/*
* Get ready to sleep on it in case it is active . ( We may end
* up not sleeping , but we don ' t want to do this while holding
* the spinlock . )
* Get ready to sleep on the slot in case it is active if SAB_Block .
* ( We may end up not sleeping , but we don ' t want to do this while
* holding the spinlock . )
*/
if ( behavior = = SAB_Block )
ConditionVariablePrepareToSleep ( & s - > active_cv ) ;
SpinLockAcquire ( & s - > mutex ) ;
if ( s - > active_pid = = 0 )
s - > active_pid = MyProcPid ;
active_pid = s - > active_pid ;
if ( active_pid = = 0 )
active_pid = s - > active_pid = MyProcPid ;
SpinLockRelease ( & s - > mutex ) ;
}
else
active_pid = MyProcPid ;
slot = s ;
break ;
}
}
LWLockRelease ( ReplicationSlotControlLock ) ;
/* If we did not find the slot, error out. */
if ( slot = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_UNDEFINED_OBJECT ) ,
errmsg ( " replication slot \" %s \" does not exist " , name ) ) ) ;
/*
* If we found the slot but it ' s already active in another backend , we
* either error out or retry after a short wait , as caller specified .
* If we found the slot but it ' s already active in another process , we
* either error out , return the PID of the owning process , or retry
* after a short wait , as caller specified .
*/
if ( active_pid ! = MyProcPid )
{
@ -400,24 +443,24 @@ retry:
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_IN_USE ) ,
errmsg ( " replication slot \" %s \" is active for PID %d " ,
name , active_pid ) ) ) ;
NameStr ( s - > data . name ) , active_pid ) ) ) ;
else if ( behavior = = SAB_Inquire )
return active_pid ;
/* Wait here until we get signaled, and then restart */
ConditionVariableSleep ( & slot - > active_cv ,
ConditionVariableSleep ( & s - > active_cv ,
WAIT_EVENT_REPLICATION_SLOT_DROP ) ;
ConditionVariableCancelSleep ( ) ;
goto retry ;
}
else
else if ( behavior = = SAB_Block )
ConditionVariableCancelSleep ( ) ; /* no sleep needed after all */
/* Let everybody know we've modified this slot */
ConditionVariableBroadcast ( & slot - > active_cv ) ;
ConditionVariableBroadcast ( & s - > active_cv ) ;
/* We made this slot active, so it's ours now. */
MyReplicationSlot = slot ;
MyReplicationSlot = s ;
/* success */
return 0 ;
@ -1100,43 +1143,82 @@ restart:
ReplicationSlot * s = & ReplicationSlotCtl - > replication_slots [ i ] ;
XLogRecPtr restart_lsn = InvalidXLogRecPtr ;
NameData slotname ;
int wspid ;
int last_signaled_pid = 0 ;
if ( ! s - > in_use )
continue ;
SpinLockAcquire ( & s - > mutex ) ;
if ( s - > data . restart_lsn = = InvalidXLogRecPtr | |
s - > data . restart_lsn > = oldestLSN )
{
SpinLockRelease ( & s - > mutex ) ;
continue ;
}
slotname = s - > data . name ;
restart_lsn = s - > data . restart_lsn ;
SpinLockRelease ( & s - > mutex ) ;
if ( XLogRecPtrIsInvalid ( restart_lsn ) | | restart_lsn > = oldestLSN )
continue ;
LWLockRelease ( ReplicationSlotControlLock ) ;
/* Get ready to sleep on the slot in case it is active */
ConditionVariablePrepareToSleep ( & s - > active_cv ) ;
for ( ; ; )
{
int wspid = ReplicationSlotAcquire ( NameStr ( slotname ) ,
SAB_Inquire ) ;
/*
 * Try to mark this slot as used by this process.
 *
 * Note that ReplicationSlotAcquireInternal(SAB_Inquire) must not
 * cancel the prepared condition variable if this slot is active in
 * another process, because in that case we still have to wait on
 * that CV until the process owning the slot has been terminated.
 */
wspid = ReplicationSlotAcquireInternal ( s , NULL , SAB_Inquire ) ;
/* no walsender? success! */
if ( wspid = = 0 )
/*
 * Exit the loop if we successfully acquired the slot, or if the
 * slot was dropped while we were waiting for the owning process
 * to be terminated. The latter case is likely to happen when the
 * slot is temporary, for example, because such a slot is
 * automatically dropped when the owning process terminates.
 */
if ( wspid < = 0 )
break ;
/*
 * Signal to terminate the process that owns the slot.
 *
 * There is a race condition where another process may acquire
 * the slot after the process using it was terminated and before
 * this process acquires it. To handle this case, we signal again
 * if the PID of the owning process differs from the last one we
 * signaled.
 *
 * XXX This logic assumes that the same PID is not reused
 * very quickly.
 */
if ( last_signaled_pid ! = wspid )
{
ereport ( LOG ,
( errmsg ( " terminating walsender %d because replication slot \" %s \" is too far behind " ,
( errmsg ( " terminating process %d because replication slot \" %s \" is too far behind " ,
wspid , NameStr ( slotname ) ) ) ) ;
( void ) kill ( wspid , SIGTERM ) ;
last_signaled_pid = wspid ;
}
ConditionVariableTimedSleep ( & s - > active_cv , 10 ,
WAIT_EVENT_REPLICATION_SLOT_DROP ) ;
}
ConditionVariableCancelSleep ( ) ;
/*
* Do nothing here and start from scratch if the slot has
* already been dropped .
*/
if ( wspid = = - 1 )
{
CHECK_FOR_INTERRUPTS ( ) ;
goto restart ;
}
ereport ( LOG ,
( errmsg ( " invalidating slot \" %s \" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size " ,
NameStr ( slotname ) ,