@ -71,11 +71,14 @@
/*
* Struct for sharing information to control slot synchronization .
*
* The slot sync worker ' s pid is needed by the startup process to shut it
* down during promotion . The startup process shuts down the slot sync worker
* and also sets stopSignaled = true to handle the race condition when the
* The 'pid' is either the slot sync worker's pid or the pid of the backend
* running the SQL function pg_sync_replication_slots(). When the startup
* process sets 'stopSignaled' during promotion, it uses this 'pid' to wake up
* the currently synchronizing process so that the process can immediately stop
* its synchronizing work on seeing 'stopSignaled' set.
* Setting ' stopSignaled ' is also used to handle the race condition when the
* postmaster has not noticed the promotion yet and thus may end up restarting
* the slot sync worker . If stopSignaled is set , the worker will exit in such a
* the slot sync worker . If ' stopSignaled ' is set , the worker will exit in such a
* case . The SQL function pg_sync_replication_slots ( ) will also error out if
* this flag is set . Note that we don ' t need to reset this variable as after
* promotion the slot sync worker won ' t be restarted because the pmState
@ -1195,10 +1198,10 @@ ValidateSlotSyncParams(int elevel)
}
/*
* Re - read the config file .
* Re - read the config file for slot synchronization .
*
* Exit if any of the slot sync GUCs have changed . The postmaster will
* restart it .
* Exit or throw an error if relevant GUCs have changed, depending on whether
* this is called from the slot sync worker or from the SQL function
* pg_sync_replication_slots().
*/
static void
slotsync_reread_config ( void )
@ -1209,7 +1212,10 @@ slotsync_reread_config(void)
bool old_hot_standby_feedback = hot_standby_feedback ;
bool conninfo_changed ;
bool primary_slotname_changed ;
bool is_slotsync_worker = AmLogicalSlotSyncWorkerProcess ( ) ;
bool parameter_changed = false ;
if ( is_slotsync_worker )
Assert ( sync_replication_slots ) ;
ConfigReloadPending = false ;
@ -1221,33 +1227,61 @@ slotsync_reread_config(void)
pfree ( old_primary_slotname ) ;
if ( old_sync_replication_slots ! = sync_replication_slots )
{
if ( is_slotsync_worker )
{
ereport ( LOG ,
/* translator: %s is a GUC variable name */
errmsg ( " replication slot synchronization worker will shut down because \" %s \" is disabled " , " sync_replication_slots " ) ) ;
errmsg ( " replication slot synchronization worker will stop because \" %s \" is disabled " ,
" sync_replication_slots " ) ) ;
proc_exit ( 0 ) ;
}
parameter_changed = true ;
}
else
{
if ( conninfo_changed | |
primary_slotname_changed | |
( old_hot_standby_feedback ! = hot_standby_feedback ) )
{
if ( is_slotsync_worker )
{
ereport ( LOG ,
errmsg ( " replication slot synchronization worker will restart because of a parameter change " ) ) ;
/*
* Reset the last - start time for this worker so that the postmaster
* can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC .
* Reset the last - start time for this worker so that the
* postmaster can restart it without waiting for
* SLOTSYNC_RESTART_INTERVAL_SEC .
*/
SlotSyncCtx - > last_start_time = 0 ;
proc_exit ( 0 ) ;
}
parameter_changed = true ;
}
}
/*
* Interrupt handler for main loop of slot sync worker .
* If we have reached here with a parameter change, we must be running in
* the SQL function; emit an error in such a case.
*/
if ( parameter_changed )
{
Assert ( ! is_slotsync_worker ) ;
ereport ( ERROR ,
errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " replication slot synchronization will stop because of a parameter change " ) ) ;
}
}
/*
* Interrupt handler for process performing slot synchronization .
*/
static void
ProcessSlotSyncInterrupts ( void )
@ -1255,12 +1289,25 @@ ProcessSlotSyncInterrupts(void)
CHECK_FOR_INTERRUPTS ( ) ;
if ( SlotSyncCtx - > stopSignaled )
{
if ( AmLogicalSlotSyncWorkerProcess ( ) )
{
ereport ( LOG ,
errmsg ( " replication slot synchronization worker is shutting down because promotion is triggered " ) ) ;
errmsg ( " replication slot synchronization worker will stop because promotion is triggered " ) ) ;
proc_exit ( 0 ) ;
}
else
{
/*
* For the backend executing SQL function
* pg_sync_replication_slots ( ) .
*/
ereport ( ERROR ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " replication slot synchronization will stop because promotion is triggered " ) ) ;
}
}
if ( ConfigReloadPending )
slotsync_reread_config ( ) ;
@ -1362,29 +1409,14 @@ wait_for_slot_activity(bool some_slot_updated)
}
/*
* Emit an error if a promotion or a concurrent sync call is in progress .
* Emit an error if a concurrent sync call is in progress .
* Otherwise , advertise that a sync is in progress .
*/
static void
check_and_set_sync_info ( pid_t worker _pid)
check_and_set_sync_info ( pid_t sync_process _pid)
{
SpinLockAcquire ( & SlotSyncCtx - > mutex ) ;
/* The worker pid must not be already assigned in SlotSyncCtx */
Assert ( worker_pid = = InvalidPid | | SlotSyncCtx - > pid = = InvalidPid ) ;
/*
* Emit an error if startup process signaled the slot sync machinery to
* stop . See comments atop SlotSyncCtxStruct .
*/
if ( SlotSyncCtx - > stopSignaled )
{
SpinLockRelease ( & SlotSyncCtx - > mutex ) ;
ereport ( ERROR ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " cannot synchronize replication slots when standby promotion is ongoing " ) ) ;
}
if ( SlotSyncCtx - > syncing )
{
SpinLockRelease ( & SlotSyncCtx - > mutex ) ;
@ -1393,13 +1425,16 @@ check_and_set_sync_info(pid_t worker_pid)
errmsg ( " cannot synchronize replication slots concurrently " ) ) ;
}
/* The pid must not be already assigned in SlotSyncCtx */
Assert ( SlotSyncCtx - > pid = = InvalidPid ) ;
SlotSyncCtx - > syncing = true ;
/*
* Advertise the required PID so that the startup process can kill the
* slot sync worker on promotion .
* slot sync process on promotion .
*/
SlotSyncCtx - > pid = worker _pid;
SlotSyncCtx - > pid = sync_process _pid;
SpinLockRelease ( & SlotSyncCtx - > mutex ) ;
@ -1414,6 +1449,7 @@ reset_syncing_flag(void)
{
SpinLockAcquire ( & SlotSyncCtx - > mutex ) ;
SlotSyncCtx - > syncing = false ;
SlotSyncCtx - > pid = InvalidPid ;
SpinLockRelease ( & SlotSyncCtx - > mutex ) ;
syncing_slots = false ;
@ -1622,7 +1658,7 @@ update_synced_slots_inactive_since(void)
if ( ! StandbyMode )
return ;
/* The slot sync worker or SQL function mustn't be running by now */
/* The slot sync worker or the SQL function mustn't be running by now */
Assert ( ( SlotSyncCtx - > pid = = InvalidPid ) & & ! SlotSyncCtx - > syncing ) ;
LWLockAcquire ( ReplicationSlotControlLock , LW_SHARED ) ;
@ -1651,16 +1687,18 @@ update_synced_slots_inactive_since(void)
}
/*
* Shut down the slot sync worker .
* Shut down slot synchronization .
*
* This function sends signal to shutdown slot sync worker , if required . It
* also waits till the slot sync worker has exited or
* This function sets stopSignaled=true and wakes up the slot sync process
* (either the worker or the backend running the SQL function
* pg_sync_replication_slots()) so that the worker can exit or the SQL function
* pg_sync_replication_slots() can finish. It also waits until the slot sync
* worker has exited or pg_sync_replication_slots() has finished.
*/
void
ShutDownSlotSync ( void )
{
pid_t worker _pid;
pid_t sync_process _pid;
SpinLockAcquire ( & SlotSyncCtx - > mutex ) ;
@ -1677,16 +1715,16 @@ ShutDownSlotSync(void)
return ;
}
worker _pid = SlotSyncCtx - > pid ;
sync_process _pid = SlotSyncCtx - > pid ;
SpinLockRelease ( & SlotSyncCtx - > mutex ) ;
/*
* Signal slotsync worker if it was still running . The worker will stop
* upon detecting that the stopSignaled flag is set to true .
* Signal the process performing slot synchronization, if any. The process
* will stop upon detecting that the stopSignaled flag is set to true.
*/
if ( worker _pid ! = InvalidPid )
kill ( worker _pid, SIGUSR1 ) ;
if ( sync_process _pid ! = InvalidPid )
kill ( sync_process _pid, SIGUSR1 ) ;
/* Wait for slot sync to end */
for ( ; ; )
@ -1835,7 +1873,10 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP ( slotsync_failure_callback , PointerGetDatum ( wrconn ) ) ;
{
check_and_set_sync_info ( InvalidPid ) ;
check_and_set_sync_info ( MyProcPid ) ;
/* Check for interrupts and config changes */
ProcessSlotSyncInterrupts ( ) ;
validate_remote_info ( wrconn ) ;