Send new protocol keepalive messages to standby servers.

Allows streaming replication users to calculate transfer latency
and apply delay via internal functions. No external functions yet.
pull/1/head
Simon Riggs 14 years ago
parent 2ae2e9c007
commit 64233902d2
  1. 48
      doc/src/sgml/protocol.sgml
  2. 43
      src/backend/access/transam/xlog.c
  3. 47
      src/backend/replication/walreceiver.c
  4. 63
      src/backend/replication/walreceiverfuncs.c
  5. 42
      src/backend/replication/walsender.c
  6. 1
      src/include/access/xlog.h
  7. 22
      src/include/replication/walprotocol.h
  8. 8
      src/include/replication/walreceiver.h

@ -1463,6 +1463,54 @@ The commands accepted in walsender mode are:
CopyData message):
</para>
<para>
<variablelist>
<varlistentry>
<term>
Primary keepalive message (B)
</term>
<listitem>
<para>
<variablelist>
<varlistentry>
<term>
Byte1('k')
</term>
<listitem>
<para>
Identifies the message as a sender keepalive.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Byte8
</term>
<listitem>
<para>
The current end of WAL on the server, given in
XLogRecPtr format.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>
Byte8
</term>
<listitem>
<para>
The server's system clock at the time of transmission,
given in TimestampTz format.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
</listitem>
</varlistentry>
</variablelist>
</para>
<para>
<variablelist>
<varlistentry>

@ -452,6 +452,9 @@ typedef struct XLogCtlData
XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime;
/* timestamp of when we started replaying the current chunk of WAL data,
* only relevant for replication or archive recovery */
TimestampTz currentChunkStartTime;
/* end of the last record restored from the archive */
XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */
@ -606,6 +609,7 @@ static void exitArchiveRecovery(TimeLineID endTLI,
static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static void recoveryPausesHere(void);
static void SetLatestXTime(TimestampTz xtime);
static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void);
static void XLogReportParameters(void);
static void LocalSetXLogInsertAllowed(void);
@ -5847,6 +5851,41 @@ GetLatestXTime(void)
return xtime;
}
/*
 * Record the timestamp associated with the chunk of WAL records that is
 * about to be applied.
 *
 * The value lives in the shared XLogCtl structure, rather than in a plain
 * static variable, so that it is visible to all backends.  The field is
 * written while holding info_lck.
 */
static void
SetCurrentChunkStartTime(TimestampTz xtime)
{
	/* go through a volatile pointer to prevent code rearrangement */
	volatile XLogCtlData *shared = XLogCtl;

	SpinLockAcquire(&shared->info_lck);
	shared->currentChunkStartTime = xtime;
	SpinLockRelease(&shared->info_lck);
}
/*
 * Fetch the timestamp of when replay of the current chunk of WAL data
 * started, i.e. the value most recently stored in
 * XLogCtl->currentChunkStartTime.
 *
 * This is NOT the timestamp of the latest commit/abort record; use
 * GetLatestXTime() for that.
 *
 * The field is read under info_lck.  StartupXLOG initializes it to 0, so a
 * result of 0 means that no chunk start time has been recorded yet.
 */
TimestampTz
GetCurrentChunkReplayStartTime(void)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile XLogCtlData *xlogctl = XLogCtl;
	TimestampTz chunkStartTime;

	SpinLockAcquire(&xlogctl->info_lck);
	chunkStartTime = xlogctl->currentChunkStartTime;
	SpinLockRelease(&xlogctl->info_lck);

	return chunkStartTime;
}
/*
* Returns time of receipt of current chunk of XLOG data, as well as
* whether it was received from streaming replication or from archives.
@ -6390,6 +6429,7 @@ StartupXLOG(void)
xlogctl->replayEndRecPtr = ReadRecPtr;
xlogctl->recoveryLastRecPtr = ReadRecPtr;
xlogctl->recoveryLastXTime = 0;
xlogctl->currentChunkStartTime = 0;
xlogctl->recoveryPause = false;
SpinLockRelease(&xlogctl->info_lck);
@ -9696,7 +9736,10 @@ retry:
{
havedata = true;
if (!XLByteLT(*RecPtr, latestChunkStart))
{
XLogReceiptTime = GetCurrentTimestamp();
SetCurrentChunkStartTime(XLogReceiptTime);
}
}
else
havedata = false;

@ -124,6 +124,7 @@ static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
static void XLogWalRcvSendReply(void);
static void XLogWalRcvSendHSFeedback(void);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
@ -218,6 +219,10 @@ WalReceiverMain(void)
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receiveStart;
/* Initialise to a sanish value */
walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = GetCurrentTimestamp();
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */
@ -433,12 +438,28 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
errmsg_internal("invalid WAL message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
ProcessWalSndrMessage(msghdr.walEnd, msghdr.sendTime);
buf += sizeof(WalDataMessageHeader);
len -= sizeof(WalDataMessageHeader);
XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
case 'k': /* Keepalive */
{
PrimaryKeepaliveMessage keepalive;
if (len != sizeof(PrimaryKeepaliveMessage))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid keepalive message received from primary")));
/* memcpy is required here for alignment reasons */
memcpy(&keepalive, buf, sizeof(PrimaryKeepaliveMessage));
ProcessWalSndrMessage(keepalive.walEnd, keepalive.sendTime);
break;
}
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
@ -711,3 +732,27 @@ XLogWalRcvSendHSFeedback(void)
memcpy(&buf[1], &feedback_message, sizeof(StandbyHSFeedbackMessage));
walrcv_send(buf, sizeof(StandbyHSFeedbackMessage) + 1);
}
/*
 * Keep track of important messages from primary.
 *
 * Stores the sender's transmission time and our local receipt time in the
 * shared WalRcv structure (under its mutex), then emits a DEBUG2 message
 * showing both timestamps together with the current apply delay and
 * transfer latency.
 *
 * walEnd is accepted so that callers can pass the full message header, but
 * it is not used by this function.
 */
static void
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile WalRcvData *walrcv = WalRcv;
	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();

	/* Local copies of the formatted timestamps; far larger than needed */
	char		sendtime_str[128];
	char		receipttime_str[128];

	/* Update shared-memory status */
	SpinLockAcquire(&walrcv->mutex);
	walrcv->lastMsgSendTime = sendTime;
	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
	SpinLockRelease(&walrcv->mutex);

	/*
	 * timestamptz_to_str() returns a pointer to a static buffer, so two
	 * calls within one argument list would make both %s fields print the
	 * same value.  Copy each result before formatting the next one.
	 */
	snprintf(sendtime_str, sizeof(sendtime_str), "%s",
			 timestamptz_to_str(sendTime));
	snprintf(receipttime_str, sizeof(receipttime_str), "%s",
			 timestamptz_to_str(lastMsgReceiptTime));

	elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d transfer latency %d",
		 sendtime_str,
		 receipttime_str,
		 GetReplicationApplyDelay(),
		 GetReplicationTransferLatency());
}

@ -28,6 +28,7 @@
#include "replication/walreceiver.h"
#include "storage/pmsignal.h"
#include "storage/shmem.h"
#include "utils/timestamp.h"
WalRcvData *WalRcv = NULL;
@ -238,3 +239,65 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
return recptr;
}
/*
* Returns the replication apply delay in ms
*/
int
GetReplicationApplyDelay(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
XLogRecPtr receivePtr;
XLogRecPtr replayPtr;
long secs;
int usecs;
SpinLockAcquire(&walrcv->mutex);
receivePtr = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
replayPtr = GetXLogReplayRecPtr(NULL);
if (XLByteLE(receivePtr, replayPtr))
return 0;
TimestampDifference(GetCurrentChunkReplayStartTime(),
GetCurrentTimestamp(),
&secs, &usecs);
return (((int) secs * 1000) + (usecs / 1000));
}
/*
* Returns the network latency in ms, note that this includes any
* difference in clock settings between the servers, as well as timezone.
*/
int
GetReplicationTransferLatency(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
TimestampTz lastMsgSendTime;
TimestampTz lastMsgReceiptTime;
long secs = 0;
int usecs = 0;
int ms;
SpinLockAcquire(&walrcv->mutex);
lastMsgSendTime = walrcv->lastMsgSendTime;
lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
SpinLockRelease(&walrcv->mutex);
TimestampDifference(lastMsgSendTime,
lastMsgReceiptTime,
&secs, &usecs);
ms = ((int) secs * 1000) + (usecs / 1000);
return ms;
}

@ -131,6 +131,7 @@ static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void WalSndKeepalive(char *msgbuf);
/* Main entry point for walsender process */
@ -823,30 +824,24 @@ WalSndLoop(void)
*/
if (caughtup || pq_is_send_pending())
{
TimestampTz finish_time = 0;
long sleeptime = -1;
TimestampTz timeout = 0;
long sleeptime = 10000; /* 10 s */
int wakeEvents;
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
WL_SOCKET_READABLE;
WL_SOCKET_READABLE | WL_TIMEOUT;
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
else
WalSndKeepalive(output_message);
/* Determine time until replication timeout */
if (replication_timeout > 0)
{
long secs;
int usecs;
finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
replication_timeout);
TimestampDifference(GetCurrentTimestamp(),
finish_time, &secs, &usecs);
sleeptime = secs * 1000 + usecs / 1000;
/* Avoid Assert in WaitLatchOrSocket if timeout is past */
if (sleeptime < 0)
sleeptime = 0;
wakeEvents |= WL_TIMEOUT;
sleeptime = 1 + (replication_timeout / 10);
}
/* Sleep until something happens or replication timeout */
@ -859,7 +854,7 @@ WalSndLoop(void)
* timeout ... he's supposed to reply *before* that.
*/
if (replication_timeout > 0 &&
GetCurrentTimestamp() >= finish_time)
GetCurrentTimestamp() >= timeout)
{
/*
* Since typically expiration of replication timeout means
@ -1627,6 +1622,23 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
return (Datum) 0;
}
/*
 * Build a primary keepalive message ('k') and queue it for sending inside a
 * CopyData ('d') message.
 *
 * The message carries the current value of sentPtr and the sender's clock
 * at the time of the call.  msgbuf is scratch space supplied by the caller;
 * it must have room for 1 + sizeof(PrimaryKeepaliveMessage) bytes.
 */
static void
WalSndKeepalive(char *msgbuf)
{
	PrimaryKeepaliveMessage msg;

	/* Fill in the message payload */
	msg.walEnd = sentPtr;
	msg.sendTime = GetCurrentTimestamp();

	elog(DEBUG2, "sending replication keepalive");

	/* Type byte first, then the payload, then hand it to libpq */
	msgbuf[0] = 'k';
	memcpy(&msgbuf[1], &msg, sizeof(msg));
	pq_putmessage_noblock('d', msgbuf, 1 + sizeof(msg));
}
/*
* This isn't currently used for anything. Monitoring tools might be
* interested in the future, and we'll need something like this in the

@ -293,6 +293,7 @@ extern XLogRecPtr GetXLogWriteRecPtr(void);
extern bool RecoveryIsPaused(void);
extern void SetRecoveryPause(bool recoveryPause);
extern TimestampTz GetLatestXTime(void);
extern TimestampTz GetCurrentChunkReplayStartTime(void);
extern void UpdateControlFile(void);
extern uint64 GetSystemIdentifier(void);

@ -16,6 +16,20 @@
#include "datatype/timestamp.h"
/*
 * All messages from WalSender must contain these fields to allow us to
 * correctly calculate the replication delay.
 *
 * The struct is copied to and from message buffers with memcpy, so the
 * order and types of the fields define the layout of the message; do not
 * reorder them.
 */
typedef struct
{
/* Current end of WAL on the sender */
XLogRecPtr walEnd;
/* Sender's system clock at the time of transmission */
TimestampTz sendTime;
} WalSndrMessage;
/*
* Header for a WAL data message (message type 'w'). This is wrapped within
* a CopyData message at the FE/BE protocol level.
@ -39,6 +53,14 @@ typedef struct
TimestampTz sendTime;
} WalDataMessageHeader;
/*
 * Keepalive message from primary (message type 'k'). (lowercase k)
 * This is wrapped within a CopyData message at the FE/BE protocol level.
 *
 * The message body is exactly a WalSndrMessage: the sender's current end of
 * WAL followed by the sender's clock at the time of transmission.
 *
 * Note that the data length is not specified here.
 */
typedef WalSndrMessage PrimaryKeepaliveMessage;
/*
* Reply message from standby (message type 'r'). This is wrapped within
* a CopyData message at the FE/BE protocol level.

@ -78,6 +78,12 @@ typedef struct
*/
XLogRecPtr latestChunkStart;
/*
* Time of send and receive of any message received.
*/
TimestampTz lastMsgSendTime;
TimestampTz lastMsgReceiptTime;
/*
* connection string; is used for walreceiver to connect with the primary.
*/
@ -112,5 +118,7 @@ extern void ShutdownWalRcv(void);
extern bool WalRcvInProgress(void);
extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
#endif /* _WALRECEIVER_H */

Loading…
Cancel
Save