|
|
|
@ -211,7 +211,7 @@ typedef struct |
|
|
|
|
#define LAG_TRACKER_BUFFER_SIZE 8192 |
|
|
|
|
|
|
|
|
|
/* A mechanism for tracking replication lag. */ |
|
|
|
|
static struct |
|
|
|
|
typedef struct |
|
|
|
|
{ |
|
|
|
|
XLogRecPtr last_lsn; |
|
|
|
|
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]; |
|
|
|
@ -220,6 +220,8 @@ static struct |
|
|
|
|
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]; |
|
|
|
|
} LagTracker; |
|
|
|
|
|
|
|
|
|
static LagTracker *lag_tracker; |
|
|
|
|
|
|
|
|
|
/* Signal handlers */ |
|
|
|
|
static void WalSndLastCycleHandler(SIGNAL_ARGS); |
|
|
|
|
|
|
|
|
@ -282,7 +284,7 @@ InitWalSender(void) |
|
|
|
|
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); |
|
|
|
|
|
|
|
|
|
/* Initialize empty timestamp buffer for lag tracking. */ |
|
|
|
|
memset(&LagTracker, 0, sizeof(LagTracker)); |
|
|
|
|
lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -3439,9 +3441,9 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) |
|
|
|
|
* If the lsn hasn't advanced since last time, then do nothing. This way |
|
|
|
|
* we only record a new sample when new WAL has been written. |
|
|
|
|
*/ |
|
|
|
|
if (LagTracker.last_lsn == lsn) |
|
|
|
|
if (lag_tracker->last_lsn == lsn) |
|
|
|
|
return; |
|
|
|
|
LagTracker.last_lsn = lsn; |
|
|
|
|
lag_tracker->last_lsn = lsn; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If advancing the write head of the circular buffer would crash into any |
|
|
|
@ -3449,11 +3451,11 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) |
|
|
|
|
* slowest reader (presumably apply) is the one that controls the release |
|
|
|
|
* of space. |
|
|
|
|
*/ |
|
|
|
|
new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE; |
|
|
|
|
new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE; |
|
|
|
|
buffer_full = false; |
|
|
|
|
for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i) |
|
|
|
|
{ |
|
|
|
|
if (new_write_head == LagTracker.read_heads[i]) |
|
|
|
|
if (new_write_head == lag_tracker->read_heads[i]) |
|
|
|
|
buffer_full = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -3464,17 +3466,17 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) |
|
|
|
|
*/ |
|
|
|
|
if (buffer_full) |
|
|
|
|
{ |
|
|
|
|
new_write_head = LagTracker.write_head; |
|
|
|
|
if (LagTracker.write_head > 0) |
|
|
|
|
LagTracker.write_head--; |
|
|
|
|
new_write_head = lag_tracker->write_head; |
|
|
|
|
if (lag_tracker->write_head > 0) |
|
|
|
|
lag_tracker->write_head--; |
|
|
|
|
else |
|
|
|
|
LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1; |
|
|
|
|
lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Store a sample at the current write head position. */ |
|
|
|
|
LagTracker.buffer[LagTracker.write_head].lsn = lsn; |
|
|
|
|
LagTracker.buffer[LagTracker.write_head].time = local_flush_time; |
|
|
|
|
LagTracker.write_head = new_write_head; |
|
|
|
|
lag_tracker->buffer[lag_tracker->write_head].lsn = lsn; |
|
|
|
|
lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time; |
|
|
|
|
lag_tracker->write_head = new_write_head; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -3496,14 +3498,14 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) |
|
|
|
|
TimestampTz time = 0; |
|
|
|
|
|
|
|
|
|
/* Read all unread samples up to this LSN or end of buffer. */ |
|
|
|
|
while (LagTracker.read_heads[head] != LagTracker.write_head && |
|
|
|
|
LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn) |
|
|
|
|
while (lag_tracker->read_heads[head] != lag_tracker->write_head && |
|
|
|
|
lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn) |
|
|
|
|
{ |
|
|
|
|
time = LagTracker.buffer[LagTracker.read_heads[head]].time; |
|
|
|
|
LagTracker.last_read[head] = |
|
|
|
|
LagTracker.buffer[LagTracker.read_heads[head]]; |
|
|
|
|
LagTracker.read_heads[head] = |
|
|
|
|
(LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE; |
|
|
|
|
time = lag_tracker->buffer[lag_tracker->read_heads[head]].time; |
|
|
|
|
lag_tracker->last_read[head] = |
|
|
|
|
lag_tracker->buffer[lag_tracker->read_heads[head]]; |
|
|
|
|
lag_tracker->read_heads[head] = |
|
|
|
|
(lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -3513,8 +3515,8 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) |
|
|
|
|
* interpolation at the beginning of the next burst of WAL after a period |
|
|
|
|
* of idleness. |
|
|
|
|
*/ |
|
|
|
|
if (LagTracker.read_heads[head] == LagTracker.write_head) |
|
|
|
|
LagTracker.last_read[head].time = 0; |
|
|
|
|
if (lag_tracker->read_heads[head] == lag_tracker->write_head) |
|
|
|
|
lag_tracker->last_read[head].time = 0; |
|
|
|
|
|
|
|
|
|
if (time > now) |
|
|
|
|
{ |
|
|
|
@ -3532,17 +3534,17 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) |
|
|
|
|
* eventually start moving again and cross one of our samples before |
|
|
|
|
* we can show the lag increasing. |
|
|
|
|
*/ |
|
|
|
|
if (LagTracker.read_heads[head] == LagTracker.write_head) |
|
|
|
|
if (lag_tracker->read_heads[head] == lag_tracker->write_head) |
|
|
|
|
{ |
|
|
|
|
/* There are no future samples, so we can't interpolate. */ |
|
|
|
|
return -1; |
|
|
|
|
} |
|
|
|
|
else if (LagTracker.last_read[head].time != 0) |
|
|
|
|
else if (lag_tracker->last_read[head].time != 0) |
|
|
|
|
{ |
|
|
|
|
/* We can interpolate between last_read and the next sample. */ |
|
|
|
|
double fraction; |
|
|
|
|
WalTimeSample prev = LagTracker.last_read[head]; |
|
|
|
|
WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]]; |
|
|
|
|
WalTimeSample prev = lag_tracker->last_read[head]; |
|
|
|
|
WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]]; |
|
|
|
|
|
|
|
|
|
if (lsn < prev.lsn) |
|
|
|
|
{ |
|
|
|
@ -3579,7 +3581,7 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) |
|
|
|
|
* standby reaches the future sample the best we can do is report |
|
|
|
|
* the hypothetical lag if that sample were to be replayed now. |
|
|
|
|
*/ |
|
|
|
|
time = LagTracker.buffer[LagTracker.read_heads[head]].time; |
|
|
|
|
time = lag_tracker->buffer[lag_tracker->read_heads[head]].time; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|