|
|
|
@ -23,6 +23,7 @@ |
|
|
|
|
#include <signal.h> |
|
|
|
|
|
|
|
|
|
#include "access/xlog_internal.h" |
|
|
|
|
#include "pgstat.h" |
|
|
|
|
#include "postmaster/startup.h" |
|
|
|
|
#include "replication/walreceiver.h" |
|
|
|
|
#include "storage/pmsignal.h" |
|
|
|
@ -62,6 +63,7 @@ WalRcvShmemInit(void) |
|
|
|
|
/* First time through, so initialize */ |
|
|
|
|
MemSet(WalRcv, 0, WalRcvShmemSize()); |
|
|
|
|
WalRcv->walRcvState = WALRCV_STOPPED; |
|
|
|
|
ConditionVariableInit(&WalRcv->walRcvStoppedCV); |
|
|
|
|
SpinLockInit(&WalRcv->mutex); |
|
|
|
|
pg_atomic_init_u64(&WalRcv->writtenUpto, 0); |
|
|
|
|
WalRcv->latch = NULL; |
|
|
|
@ -95,12 +97,18 @@ WalRcvRunning(void) |
|
|
|
|
|
|
|
|
|
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) |
|
|
|
|
{ |
|
|
|
|
SpinLockAcquire(&walrcv->mutex); |
|
|
|
|
bool stopped = false; |
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&walrcv->mutex); |
|
|
|
|
if (walrcv->walRcvState == WALRCV_STARTING) |
|
|
|
|
{ |
|
|
|
|
state = walrcv->walRcvState = WALRCV_STOPPED; |
|
|
|
|
|
|
|
|
|
stopped = true; |
|
|
|
|
} |
|
|
|
|
SpinLockRelease(&walrcv->mutex); |
|
|
|
|
|
|
|
|
|
if (stopped) |
|
|
|
|
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -140,12 +148,18 @@ WalRcvStreaming(void) |
|
|
|
|
|
|
|
|
|
if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) |
|
|
|
|
{ |
|
|
|
|
SpinLockAcquire(&walrcv->mutex); |
|
|
|
|
bool stopped = false; |
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&walrcv->mutex); |
|
|
|
|
if (walrcv->walRcvState == WALRCV_STARTING) |
|
|
|
|
{ |
|
|
|
|
state = walrcv->walRcvState = WALRCV_STOPPED; |
|
|
|
|
|
|
|
|
|
stopped = true; |
|
|
|
|
} |
|
|
|
|
SpinLockRelease(&walrcv->mutex); |
|
|
|
|
|
|
|
|
|
if (stopped) |
|
|
|
|
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -165,6 +179,7 @@ ShutdownWalRcv(void) |
|
|
|
|
{ |
|
|
|
|
WalRcvData *walrcv = WalRcv; |
|
|
|
|
pid_t walrcvpid = 0; |
|
|
|
|
bool stopped = false; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED |
|
|
|
@ -178,6 +193,7 @@ ShutdownWalRcv(void) |
|
|
|
|
break; |
|
|
|
|
case WALRCV_STARTING: |
|
|
|
|
walrcv->walRcvState = WALRCV_STOPPED; |
|
|
|
|
stopped = true; |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
case WALRCV_STREAMING: |
|
|
|
@ -191,6 +207,10 @@ ShutdownWalRcv(void) |
|
|
|
|
} |
|
|
|
|
SpinLockRelease(&walrcv->mutex); |
|
|
|
|
|
|
|
|
|
/* Unnecessary but consistent. */ |
|
|
|
|
if (stopped) |
|
|
|
|
ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Signal walreceiver process if it was still running. |
|
|
|
|
*/ |
|
|
|
@ -201,16 +221,11 @@ ShutdownWalRcv(void) |
|
|
|
|
* Wait for walreceiver to acknowledge its death by setting state to |
|
|
|
|
* WALRCV_STOPPED. |
|
|
|
|
*/ |
|
|
|
|
ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV); |
|
|
|
|
while (WalRcvRunning()) |
|
|
|
|
{ |
|
|
|
|
/*
|
|
|
|
|
* This possibly-long loop needs to handle interrupts of startup |
|
|
|
|
* process. |
|
|
|
|
*/ |
|
|
|
|
HandleStartupProcInterrupts(); |
|
|
|
|
|
|
|
|
|
pg_usleep(100000); /* 100ms */ |
|
|
|
|
} |
|
|
|
|
ConditionVariableSleep(&walrcv->walRcvStoppedCV, |
|
|
|
|
WAIT_EVENT_WALRCV_EXIT); |
|
|
|
|
ConditionVariableCancelSleep(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|