@ -75,18 +75,19 @@
* Struct for sharing information to control slot synchronization .
*
* The ' pid ' is either the slot sync worker ' s pid or the backend ' s pid running
* the SQL function pg_sync_replication_slots ( ) . When the startup process sets
* ' stopSignaled ' during promotion , it uses this ' pid ' to wake up the currently
* synchronizing process so that the process can immediately stop its
* synchronizing work on seeing ' stopSignaled ' set .
* Setting ' stopSignaled ' is also used to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker . If ' stopSignaled ' is set , the worker will exit in such a
* case . The SQL function pg_sync_replication_slots ( ) will also error out if
* this flag is set . Note that we don ' t need to reset this variable as after
* promotion the slot sync worker won ' t be restarted because the pmState
* changes to PM_RUN from PM_HOT_STANDBY and we don ' t support demoting
* primary without restarting the server . See MaybeStartSlotSyncWorker .
* the SQL function pg_sync_replication_slots ( ) . On promotion , the startup
* process sets ' stopSignaled ' and uses this ' pid ' to signal the synchronizing
* process with PROCSIG_SLOTSYNC_MESSAGE and also to wake it up so that the
* process can immediately stop its synchronizing work .
* Setting ' stopSignaled ' on the other hand is used to handle the race
* condition when the postmaster has not noticed the promotion yet and thus may
* end up restarting the slot sync worker . If ' stopSignaled ' is set , the worker
* will exit in such a case . The SQL function pg_sync_replication_slots ( ) will
* also error out if this flag is set . Note that we don ' t need to reset this
* variable as after promotion the slot sync worker won ' t be restarted because
* the pmState changes to PM_RUN from PM_HOT_STANDBY and we don ' t support
* demoting primary without restarting the server .
* See MaybeStartSlotSyncWorker .
*
* The ' syncing ' flag is needed to prevent concurrent slot syncs to avoid slot
* overwrites .
@ -131,6 +132,13 @@ static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
*/
static bool syncing_slots = false ;
/*
* Interrupt flag set when PROCSIG_SLOTSYNC_MESSAGE is received , asking the
* slotsync worker or pg_sync_replication_slots ( ) to stop because
* standby promotion has been triggered .
*/
volatile sig_atomic_t SlotSyncShutdownPending = false ;
/*
* Structure to hold information fetched from the primary server about a logical
* replication slot .
@ -1186,36 +1194,52 @@ slotsync_reread_config(void)
}
/*
* Interrupt handler for process performing slot synchronization .
* Handle receipt of an interrupt indicating a slotsync shutdown message .
*
* This is called within the SIGUSR1 handler . All we do here is set a flag
* that will cause the next CHECK_FOR_INTERRUPTS ( ) to invoke
* ProcessSlotSyncMessage ( ) .
*/
static void
ProcessSlotSyncInterrupts ( WalReceiverConn * wrconn )
void
HandleSlotSyncMessageInterrupt ( void )
{
CHECK_FOR_INTERRUPTS ( ) ;
InterruptPending = true ;
SlotSyncShutdownPending = true ;
/* latch will be set by procsignal_sigusr1_handler */
}
if ( SlotSyncCtx - > stopSignaled )
{
if ( AmLogicalSlotSyncWorkerProcess ( ) )
{
ereport ( LOG ,
errmsg ( " replication slot synchronization worker will stop because promotion is triggered " ) ) ;
/*
* Handle a PROCSIG_SLOTSYNC_MESSAGE signal , called from ProcessInterrupts ( ) .
*
* If the current process is the slotsync background worker , log a message
* and exit cleanly . If it is a backend executing pg_sync_replication_slots ( ) ,
* raise an error , unless the sync has already finished , in which case there
* is no need to interrupt the caller .
*/
void
ProcessSlotSyncMessage ( void )
{
SlotSyncShutdownPending = false ;
proc_exit ( 0 ) ;
}
else
{
/*
* For the backend executing SQL function
* pg_sync_replication_slots ( ) .
*/
ereport ( ERROR ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " replication slot synchronization will stop because promotion is triggered " ) ) ;
}
if ( AmLogicalSlotSyncWorkerProcess ( ) )
{
ereport ( LOG ,
errmsg ( " replication slot synchronization worker will stop because promotion is triggered " ) ) ;
proc_exit ( 0 ) ;
}
else
{
/*
* If sync has already completed , there is no need to interrupt the
* caller with an error .
*/
if ( ! IsSyncingReplicationSlots ( ) )
return ;
if ( ConfigReloadPending )
slotsync_reread_config ( ) ;
ereport ( ERROR ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " replication slot synchronization will stop because promotion is triggered " ) ) ;
}
}
/*
@ -1322,6 +1346,34 @@ check_and_set_sync_info(pid_t sync_process_pid)
{
SpinLockAcquire ( & SlotSyncCtx - > mutex ) ;
/*
* Exit immediately if promotion has been triggered . This guards against
* a new worker ( or a call to pg_sync_replication_slots ( ) ) that starts
* after the old worker was stopped by ShutDownSlotSync ( ) .
*/
if ( SlotSyncCtx - > stopSignaled )
{
SpinLockRelease ( & SlotSyncCtx - > mutex ) ;
if ( AmLogicalSlotSyncWorkerProcess ( ) )
{
ereport ( DEBUG1 ,
errmsg ( " replication slot synchronization worker will not start because promotion was triggered " ) ) ;
proc_exit ( 0 ) ;
}
else
{
/*
* For the backend executing SQL function
* pg_sync_replication_slots ( ) .
*/
ereport ( ERROR ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " replication slot synchronization will not start because promotion was triggered " ) ) ;
}
}
if ( SlotSyncCtx - > syncing )
{
SpinLockRelease ( & SlotSyncCtx - > mutex ) ;
@ -1525,7 +1577,10 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
{
bool some_slot_updated = false ;
ProcessSlotSyncInterrupts ( wrconn ) ;
CHECK_FOR_INTERRUPTS ( ) ;
if ( ConfigReloadPending )
slotsync_reread_config ( ) ;
some_slot_updated = synchronize_slots ( wrconn ) ;
@ -1625,11 +1680,11 @@ ShutDownSlotSync(void)
SpinLockRelease ( & SlotSyncCtx - > mutex ) ;
/*
* Signal process doing slotsync , if any . The process will stop upon
* detecting that the stopSignaled flag is set to true .
* Signal process doing slotsync , if any , asking it to stop .
*/
if ( sync_process_pid ! = InvalidPid )
kill ( sync_process_pid , SIGUSR1 ) ;
SendProcSignal ( sync_process_pid , PROCSIG_SLOTSYNC_MESSAGE ,
INVALID_PROC_NUMBER ) ;
/* Wait for slot sync to end */
for ( ; ; )
@ -1774,9 +1829,6 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
check_and_set_sync_info ( MyProcPid ) ;
/* Check for interrupts and config changes */
ProcessSlotSyncInterrupts ( ) ;
validate_remote_info ( wrconn ) ;
synchronize_slots ( wrconn ) ;