|
|
|
|
@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); |
|
|
|
|
static void XLogWalRcvFlush(bool dying); |
|
|
|
|
static void XLogWalRcvSendReply(void); |
|
|
|
|
static void XLogWalRcvSendHSFeedback(void); |
|
|
|
|
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); |
|
|
|
|
|
|
|
|
|
/* Signal handlers */ |
|
|
|
|
static void WalRcvSigHupHandler(SIGNAL_ARGS); |
|
|
|
|
@ -218,6 +219,10 @@ WalReceiverMain(void) |
|
|
|
|
/* Fetch information required to start streaming */ |
|
|
|
|
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); |
|
|
|
|
startpoint = walrcv->receiveStart; |
|
|
|
|
|
|
|
|
|
/* Initialise to a sanish value */ |
|
|
|
|
walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp(); |
|
|
|
|
|
|
|
|
|
SpinLockRelease(&walrcv->mutex); |
|
|
|
|
|
|
|
|
|
/* Arrange to clean up at walreceiver exit */ |
|
|
|
|
@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) |
|
|
|
|
errmsg_internal("invalid WAL message received from primary"))); |
|
|
|
|
/* memcpy is required here for alignment reasons */ |
|
|
|
|
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader)); |
|
|
|
|
|
|
|
|
|
ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime); |
|
|
|
|
|
|
|
|
|
buf += sizeof(WalDataMessageHeader); |
|
|
|
|
len -= sizeof(WalDataMessageHeader); |
|
|
|
|
|
|
|
|
|
XLogWalRcvWrite(buf, len, msghdr.dataStart); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
case 'k': /* Keepalive */ |
|
|
|
|
{ |
|
|
|
|
PrimaryKeepaliveMessage keepalive; |
|
|
|
|
|
|
|
|
|
if (len != sizeof(PrimaryKeepaliveMessage)) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_PROTOCOL_VIOLATION), |
|
|
|
|
errmsg_internal("invalid keepalive message received from primary"))); |
|
|
|
|
/* memcpy is required here for alignment reasons */ |
|
|
|
|
memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage)); |
|
|
|
|
|
|
|
|
|
ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime); |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
default: |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_PROTOCOL_VIOLATION), |
|
|
|
|
@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void) |
|
|
|
|
memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage)); |
|
|
|
|
walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Keep track of important messages from primary. |
|
|
|
|
*/ |
|
|
|
|
static void |
|
|
|
|
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) |
|
|
|
|
{ |
|
|
|
|
/* use volatile pointer to prevent code rearrangement */ |
|
|
|
|
volatile WalRcvData *walrcv = WalRcv; |
|
|
|
|
|
|
|
|
|
TimestampTz lastMsgReceiptTime = GetCurrentTimestamp(); |
|
|
|
|
|
|
|
|
|
/* Update shared-memory status */ |
|
|
|
|
SpinLockAcquire(&walrcv->mutex); |
|
|
|
|
walrcv->lastMsgSendTime = sendTime; |
|
|
|
|
walrcv->lastMsgReceiptTime = lastMsgReceiptTime; |
|
|
|
|
SpinLockRelease(&walrcv->mutex); |
|
|
|
|
|
|
|
|
|
elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d", |
|
|
|
|
timestamptz_to_str(sendTime), |
|
|
|
|
timestamptz_to_str(lastMsgReceiptTime), |
|
|
|
|
GetReplicationApplyDelay(), |
|
|
|
|
GetReplicationTransferLatency()); |
|
|
|
|
} |
|
|
|
|
|