@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void)
/* everything else is zeroed by the memset above */
SpinLockInit ( & slot - > mutex ) ;
LWLockInitialize ( & slot - > io_in_progress_lock , LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS ) ;
ConditionVariableInit ( & slot - > active_cv ) ;
}
}
}
@ -313,25 +314,35 @@ ReplicationSlotCreate(const char *name, bool db_specific,
LWLockRelease ( ReplicationSlotControlLock ) ;
/*
* Now that the slot has been marked as in_use and in_ active, it ' s safe to
* Now that the slot has been marked as in_use and active , it ' s safe to
* let somebody else try to allocate a slot .
*/
LWLockRelease ( ReplicationSlotAllocationLock ) ;
/* Let everybody know we've modified this slot */
ConditionVariableBroadcast ( & slot - > active_cv ) ;
}
/*
 * Find a previously created slot and mark it as used by this backend.
 *
 * name:   name to match against the in-use entries of the slot array.
 * nowait: selects what happens when the slot is found but is already active
 *         in another backend.  If true, raise ERROR (ERRCODE_OBJECT_IN_USE)
 *         naming the owning PID.  If false, sleep on the slot's active_cv
 *         and, once woken, redo the whole lookup from the "retry" label.
 *
 * Raises ERROR (ERRCODE_UNDEFINED_OBJECT) if no in-use slot has this name.
 * On success the slot's active_pid has been set to MyProcPid and
 * MyReplicationSlot points at the slot.
 *
 * NOTE(review): this text appears to interleave the pre-change and
 * post-change lines of a patch (both signatures and both sets of local
 * declarations are present below, and a hunk header hides at least one
 * line inside the loop).  The comments here describe the two-argument
 * version -- confirm against the applied file.
 */
void
ReplicationSlotAcquire ( const char * name )
ReplicationSlotAcquire ( const char * name , bool nowait )
{
ReplicationSlot * slot = NULL ;
ReplicationSlot * slot ;
int active_pid ;
int i ;
int active_pid = 0 ; /* Keep compiler quiet */
/* Re-entered via "goto retry" after waiting for a busy slot. */
retry :
Assert ( MyReplicationSlot = = NULL ) ;
/* Search for the named slot and mark it active if we find it. */
/*
* Search for the named slot and mark it active if we find it . If the
* slot is already active , we exit the loop with active_pid set to the PID
* of the backend that owns it .
*/
/* Reset per-attempt state, since we may arrive here again from "retry". */
active_pid = 0 ;
slot = NULL ;
LWLockAcquire ( ReplicationSlotControlLock , LW_SHARED ) ;
for ( i = 0 ; i < max_replication_slots ; i + + )
{
@ -339,35 +350,66 @@ ReplicationSlotAcquire(const char *name)
if ( s - > in_use & & strcmp ( name , NameStr ( s - > data . name ) ) = = 0 )
{
/*
* This is the slot we want . We don ' t know yet if it ' s active ,
* so get ready to sleep on it in case it is . ( We may end up not
* sleeping , but we don ' t want to do this while holding the
* spinlock . )
*/
ConditionVariablePrepareToSleep ( & s - > active_cv ) ;
SpinLockAcquire ( & s - > mutex ) ;
/*
 * Claim the slot only if nobody owns it; otherwise active_pid keeps
 * the current owner's PID for the check after the loop.
 */
active_pid = s - > active_pid ;
if ( active_pid = = 0 )
active_pid = s - > active_pid = MyProcPid ;
SpinLockRelease ( & s - > mutex ) ;
slot = s ;
break ;
}
}
LWLockRelease ( ReplicationSlotControlLock ) ;
/* If we did not find the slot or it was already active , error out. */
/* If we did not find the slot, error out. */
if ( slot = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_UNDEFINED_OBJECT ) ,
errmsg ( " replication slot \" %s \" does not exist " , name ) ) ) ;
/*
* If we found the slot but it ' s already active in another backend , we
* either error out or retry after a short wait , as caller specified .
*/
if ( active_pid ! = MyProcPid )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_IN_USE ) ,
errmsg ( " replication slot \" %s \" is active for PID %d " ,
name , active_pid ) ) ) ;
{
if ( nowait )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_IN_USE ) ,
errmsg ( " replication slot \" %s \" is active for PID %d " ,
name , active_pid ) ) ) ;
/* Wait here until we get signaled, and then restart */
ConditionVariableSleep ( & slot - > active_cv , PG_WAIT_LOCK ) ;
/* The retry pass prepares to sleep again itself, so cancel this one first. */
ConditionVariableCancelSleep ( ) ;
goto retry ;
}
else
ConditionVariableCancelSleep ( ) ; /* no sleep needed after all */
/* Let everybody know we've modified this slot */
ConditionVariableBroadcast ( & slot - > active_cv ) ;
/* We made this slot active, so it's ours now. */
MyReplicationSlot = slot ;
}
/*
* Release a replication slot , this or another backend can ReAcquire it
* later . Resources this slot requires will be preserved .
* Release the replication slot that this backend considers itself to own .
*
* This or another backend can re - acquire the slot later .
* Resources this slot requires will be preserved .
*/
void
ReplicationSlotRelease ( void )
@ -385,17 +427,6 @@ ReplicationSlotRelease(void)
*/
ReplicationSlotDropAcquired ( ) ;
}
else if ( slot - > data . persistency = = RS_PERSISTENT )
{
/*
* Mark persistent slot inactive . We ' re not freeing it , just
* disconnecting .
*/
SpinLockAcquire ( & slot - > mutex ) ;
slot - > active_pid = 0 ;
SpinLockRelease ( & slot - > mutex ) ;
}
/*
* If slot needed to temporarily restrain both data and catalog xmin to
@ -412,6 +443,18 @@ ReplicationSlotRelease(void)
ReplicationSlotsComputeRequiredXmin ( false ) ;
}
if ( slot - > data . persistency = = RS_PERSISTENT )
{
/*
* Mark persistent slot inactive . We ' re not freeing it , just
* disconnecting , but wake up others that may be waiting for it .
*/
SpinLockAcquire ( & slot - > mutex ) ;
slot - > active_pid = 0 ;
SpinLockRelease ( & slot - > mutex ) ;
ConditionVariableBroadcast ( & slot - > active_cv ) ;
}
MyReplicationSlot = NULL ;
/* might not have been set when we've been a plain slot */
@ -430,32 +473,43 @@ ReplicationSlotCleanup(void)
Assert ( MyReplicationSlot = = NULL ) ;
/*
* No need for locking as we are only interested in slots active in
* current process and those are not touched by other processes .
*/
restart :
LWLockAcquire ( ReplicationSlotControlLock , LW_SHARED ) ;
for ( i = 0 ; i < max_replication_slots ; i + + )
{
ReplicationSlot * s = & ReplicationSlotCtl - > replication_slots [ i ] ;
if ( ! s - > in_use )
continue ;
SpinLockAcquire ( & s - > mutex ) ;
if ( s - > active_pid = = MyProcPid )
{
Assert ( s - > in_use & & s - > data . persistency = = RS_TEMPORARY ) ;
Assert ( s - > data . persistency = = RS_TEMPORARY ) ;
SpinLockRelease ( & s - > mutex ) ;
LWLockRelease ( ReplicationSlotControlLock ) ; /* avoid deadlock */
ReplicationSlotDropPtr ( s ) ;
ConditionVariableBroadcast ( & s - > active_cv ) ;
goto restart ;
}
else
SpinLockRelease ( & s - > mutex ) ;
}
LWLockRelease ( ReplicationSlotControlLock ) ;
}
/*
 * Permanently drop replication slot identified by the passed in name.
 *
 * name:   name of the slot to drop.
 * nowait: passed through unchanged to ReplicationSlotAcquire().  There it
 *         decides whether a slot that is active in another backend causes
 *         an immediate error (true) or a wait-and-retry (false).
 *
 * Must be called with no slot currently acquired by this backend (asserted
 * below).  The slot is first acquired by name and then dropped through
 * ReplicationSlotDropAcquired().
 *
 * NOTE(review): both the one-argument and two-argument forms of the
 * signature and of the ReplicationSlotAcquire() call appear below; this
 * looks like the old and new lines of a patch -- confirm against the
 * applied file.
 */
void
ReplicationSlotDrop ( const char * name )
ReplicationSlotDrop ( const char * name , bool nowait )
{
Assert ( MyReplicationSlot = = NULL ) ;
ReplicationSlotAcquire ( name ) ;
ReplicationSlotAcquire ( name , nowait ) ;
ReplicationSlotDropAcquired ( ) ;
}
@ -527,6 +581,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
slot - > active_pid = 0 ;
SpinLockRelease ( & slot - > mutex ) ;
/* wake up anyone waiting on this slot */
ConditionVariableBroadcast ( & slot - > active_cv ) ;
ereport ( fail_softly ? WARNING : ERROR ,
( errcode_for_file_access ( ) ,
errmsg ( " could not rename file \" %s \" to \" %s \" : %m " ,
@ -535,15 +592,18 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
/*
* The slot is definitely gone . Lock out concurrent scans of the array
* long enough to kill it . It ' s OK to clear the active flag here without
* long enough to kill it . It ' s OK to clear the active PID here without
* grabbing the mutex because nobody else can be scanning the array here ,
* and nobody can be attached to this slot and thus access it without
* scanning the array .
*
* Also wake up processes waiting for it .
*/
LWLockAcquire ( ReplicationSlotControlLock , LW_EXCLUSIVE ) ;
slot - > active_pid = 0 ;
slot - > in_use = false ;
LWLockRelease ( ReplicationSlotControlLock ) ;
ConditionVariableBroadcast ( & slot - > active_cv ) ;
/*
* Slot is dead and doesn ' t prevent resource removal anymore , recompute