@ -24,11 +24,17 @@
* are treated as not a crash but approximately normal termination ;
* the walsender will exit quickly without sending any more XLOG records .
*
* If the server is shut down , postmaster sends us SIGUSR2 after all
* regular backends have exited and the shutdown checkpoint has been written .
* This instructs walsender to send any outstanding WAL , including the
* shutdown checkpoint record , wait for it to be replicated to the standby ,
* and then exit .
* If the server is shut down , checkpointer sends us
* PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited . If
* the backend is idle or runs an SQL query this causes the backend to
* shutdown , if logical replication is in progress all existing WAL records
* are processed followed by a shutdown . Otherwise this causes the walsender
* to switch to the " stopping " state . In this state , the walsender will reject
* any further replication commands . The checkpointer begins the shutdown
* checkpoint once all walsenders are confirmed as stopping . When the shutdown
* checkpoint finishes , the postmaster sends us SIGUSR2 . This instructs
* walsender to send any outstanding WAL , including the shutdown checkpoint
* record , wait for it to be replicated to the standby , and then exit .
*
*
* Portions Copyright ( c ) 2010 - 2017 , PostgreSQL Global Development Group
@ -177,13 +183,14 @@ static bool WalSndCaughtUp = false;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false ;
static volatile sig_atomic_t walsender_ready_to_stop = false ;
static volatile sig_atomic_t got_SIGUSR2 = false ;
static volatile sig_atomic_t got_STOPPING = false ;
/*
* This is set while we are streaming . When not set , SIGUSR2 signal will be
* handled like SIGTERM . When set , the main loop is responsible for checking
* walsender_ready_to_stop and terminating when it ' s set ( after streaming any
* remaining WAL ) .
* This is set while we are streaming . When not set
* PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM . When set ,
* the main loop is responsible for checking got_STOPPING and terminating when
* it ' s set ( after streaming any remaining WAL ) .
*/
static volatile sig_atomic_t replication_active = false ;
@ -300,7 +307,8 @@ WalSndErrorCleanup(void)
ReplicationSlotCleanup ( ) ;
replication_active = false ;
if ( walsender_ready_to_stop )
if ( got_STOPPING | | got_SIGUSR2 )
proc_exit ( 0 ) ;
/* Revert back to startup state */
@ -677,7 +685,7 @@ StartReplication(StartReplicationCmd *cmd)
WalSndLoop ( XLogSendPhysical ) ;
replication_active = false ;
if ( walsender_ready_to_stop )
if ( got_STOPPING )
proc_exit ( 0 ) ;
WalSndSetState ( WALSNDSTATE_STARTUP ) ;
@ -1055,7 +1063,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
{
ereport ( LOG ,
( errmsg ( " terminating walsender process after promotion " ) ) ) ;
walsender_ready_to_stop = true ;
got_STOPPING = true ;
}
WalSndSetState ( WALSNDSTATE_CATCHUP ) ;
@ -1106,7 +1114,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
ReplicationSlotRelease ( ) ;
replication_active = false ;
if ( walsender_ready_to_stop )
if ( got_STOPPING )
proc_exit ( 0 ) ;
WalSndSetState ( WALSNDSTATE_STARTUP ) ;
@ -1311,6 +1319,14 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Check for input from the client */
ProcessRepliesIfAny ( ) ;
/*
* If we ' re shutting down , trigger pending WAL to be written out ,
* otherwise we ' d possibly end up waiting for WAL that never gets
* written , because walwriter has shut down already .
*/
if ( got_STOPPING )
XLogBackgroundFlush ( ) ;
/* Update our idea of the currently flushed position. */
if ( ! RecoveryInProgress ( ) )
RecentFlushPtr = GetFlushRecPtr ( ) ;
@ -1326,7 +1342,7 @@ WalSndWaitForWal(XLogRecPtr loc)
* RecentFlushPtr , so we can send all remaining data before shutting
* down .
*/
if ( walsender_ready_to_stop )
if ( got_STOPPING )
break ;
/*
@ -1400,6 +1416,22 @@ exec_replication_command(const char *cmd_string)
MemoryContext cmd_context ;
MemoryContext old_context ;
/*
* If WAL sender has been told that shutdown is getting close , switch its
* status accordingly to handle the next replication commands correctly .
*/
if ( got_STOPPING )
WalSndSetState ( WALSNDSTATE_STOPPING ) ;
/*
* Throw error if in stopping mode . We need prevent commands that could
* generate WAL while the shutdown checkpoint is being written . To be
* safe , we just prohibit all new commands .
*/
if ( MyWalSnd - > state = = WALSNDSTATE_STOPPING )
ereport ( ERROR ,
( errmsg ( " cannot execute new commands while WAL sender is in stopping mode " ) ) ) ;
/*
* CREATE_REPLICATION_SLOT . . . LOGICAL exports a snapshot until the next
* command arrives . Clean up the old stuff if there ' s anything .
@ -2128,7 +2160,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
* normal termination at shutdown , or a promotion , the walsender
* is not sure which .
*/
if ( walsender_ready_to_stop )
if ( got_SIGUSR2 )
WalSndDone ( send_data ) ;
}
@ -2443,6 +2475,10 @@ XLogSendPhysical(void)
XLogRecPtr endptr ;
Size nbytes ;
/* If requested switch the WAL sender to the stopping state. */
if ( got_STOPPING )
WalSndSetState ( WALSNDSTATE_STOPPING ) ;
if ( streamingDoneSending )
{
WalSndCaughtUp = true ;
@ -2733,7 +2769,16 @@ XLogSendLogical(void)
* point , then we ' re caught up .
*/
if ( logical_decoding_ctx - > reader - > EndRecPtr > = GetFlushRecPtr ( ) )
{
WalSndCaughtUp = true ;
/*
* Have WalSndLoop ( ) terminate the connection in an orderly
* manner , after writing out all the pending data .
*/
if ( got_STOPPING )
got_SIGUSR2 = true ;
}
}
/* Update shared memory status */
@ -2843,6 +2888,26 @@ WalSndRqstFileReload(void)
}
}
/*
* Handle PROCSIG_WALSND_INIT_STOPPING signal .
*/
void
HandleWalSndInitStopping ( void )
{
Assert ( am_walsender ) ;
/*
* If replication has not yet started , die like with SIGTERM . If
* replication is active , only set a flag and wake up the main loop . It
* will send any outstanding WAL , wait for it to be replicated to the
* standby , and then exit gracefully .
*/
if ( ! replication_active )
kill ( MyProcPid , SIGTERM ) ;
else
got_STOPPING = true ;
}
/* SIGHUP: set flag to re-read config file at next convenient time */
static void
WalSndSigHupHandler ( SIGNAL_ARGS )
@ -2856,22 +2921,17 @@ WalSndSigHupHandler(SIGNAL_ARGS)
errno = save_errno ;
}
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */
/*
* SIGUSR2 : set flag to do a last cycle and shut down afterwards . The WAL
* sender should already have been switched to WALSNDSTATE_STOPPING at
* this point .
*/
static void
WalSndLastCycleHandler ( SIGNAL_ARGS )
{
int save_errno = errno ;
/*
* If replication has not yet started , die like with SIGTERM . If
* replication is active , only set a flag and wake up the main loop . It
* will send any outstanding WAL , wait for it to be replicated to the
* standby , and then exit gracefully .
*/
if ( ! replication_active )
kill ( MyProcPid , SIGTERM ) ;
walsender_ready_to_stop = true ;
got_SIGUSR2 = true ;
SetLatch ( MyLatch ) ;
errno = save_errno ;
@ -2969,6 +3029,77 @@ WalSndWakeup(void)
}
}
/*
* Signal all walsenders to move to stopping state .
*
* This will trigger walsenders to move to a state where no further WAL can be
* generated . See this file ' s header for details .
*/
void
WalSndInitStopping ( void )
{
int i ;
for ( i = 0 ; i < max_wal_senders ; i + + )
{
WalSnd * walsnd = & WalSndCtl - > walsnds [ i ] ;
pid_t pid ;
SpinLockAcquire ( & walsnd - > mutex ) ;
pid = walsnd - > pid ;
SpinLockRelease ( & walsnd - > mutex ) ;
if ( pid = = 0 )
continue ;
SendProcSignal ( pid , PROCSIG_WALSND_INIT_STOPPING , InvalidBackendId ) ;
}
}
/*
* Wait that all the WAL senders have quit or reached the stopping state . This
* is used by the checkpointer to control when the shutdown checkpoint can
* safely be performed .
*/
void
WalSndWaitStopping ( void )
{
for ( ; ; )
{
int i ;
bool all_stopped = true ;
for ( i = 0 ; i < max_wal_senders ; i + + )
{
WalSndState state ;
WalSnd * walsnd = & WalSndCtl - > walsnds [ i ] ;
SpinLockAcquire ( & walsnd - > mutex ) ;
if ( walsnd - > pid = = 0 )
{
SpinLockRelease ( & walsnd - > mutex ) ;
continue ;
}
state = walsnd - > state ;
SpinLockRelease ( & walsnd - > mutex ) ;
if ( state ! = WALSNDSTATE_STOPPING )
{
all_stopped = false ;
break ;
}
}
/* safe to leave if confirmation is done for all WAL senders */
if ( all_stopped )
return ;
pg_usleep ( 10000L ) ; /* wait for 10 msec */
}
}
/* Set state for current walsender (only called in walsender) */
void
WalSndSetState ( WalSndState state )
@ -3002,6 +3133,8 @@ WalSndGetStateString(WalSndState state)
return " catchup " ;
case WALSNDSTATE_STREAMING :
return " streaming " ;
case WALSNDSTATE_STOPPING :
return " stopping " ;
}
return " UNKNOWN " ;
}