Automatically terminate replication connections that are idle for more

than replication_timeout (a new GUC) milliseconds. The TCP timeout is often
too long, you want the master to notice a dead connection much sooner.
People complained about that in 9.0 too, but with synchronous replication
it's even more important to notice dead connections promptly.

Fujii Masao and Heikki Linnakangas
pull/1/head
Heikki Linnakangas 15 years ago
parent bc03c5937d
commit 754baa21f7
  1. 28
      doc/src/sgml/config.sgml
  2. 240
      src/backend/libpq/pqcomm.c
  3. 26
      src/backend/port/unix_latch.c
  4. 13
      src/backend/port/win32/socket.c
  5. 30
      src/backend/port/win32_latch.c
  6. 143
      src/backend/replication/walsender.c
  7. 10
      src/backend/utils/misc/guc.c
  8. 1
      src/backend/utils/misc/postgresql.conf.sample
  9. 3
      src/include/libpq/libpq.h
  10. 1
      src/include/replication/walsender.h
  11. 2
      src/include/storage/latch.h

@ -2019,6 +2019,29 @@ SET ENABLE_SEQSCAN TO OFF;
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
<term><varname>replication_timeout</varname> (<type>integer</type>)</term>
<indexterm>
<primary><varname>replication_timeout</> configuration parameter</primary>
</indexterm>
<listitem>
<para>
Terminate replication connections that are inactive longer
than the specified number of milliseconds. This is useful for
the primary server to detect a standby crash or network outage.
A value of zero means wait forever. This parameter can only be set in
the <filename>postgresql.conf</> file or on the server command line.
The default value is 60 seconds.
</para>
<para>
To prevent connections from being terminated prematurely,
<xref linkend="guc-wal-receiver-status-interval">
must be enabled on the standby, and its value must be less than the
value of <varname>replication_timeout</>.
</para>
</listitem>
</varlistentry>
</variablelist> </variablelist>
</sect2> </sect2>
@ -2216,6 +2239,11 @@ SET ENABLE_SEQSCAN TO OFF;
the <filename>postgresql.conf</> file or on the server command line. the <filename>postgresql.conf</> file or on the server command line.
The default value is 10 seconds. The default value is 10 seconds.
</para> </para>
<para>
When <xref linkend="guc-replication-timeout"> is enabled on the primary,
<varname>wal_receiver_status_interval</> must be enabled, and its value
must be less than the value of <varname>replication_timeout</>.
</para>
</listitem> </listitem>
</varlistentry> </varlistentry>

@ -55,10 +55,12 @@
* pq_peekbyte - peek at next byte from connection * pq_peekbyte - peek at next byte from connection
* pq_putbytes - send bytes to connection (not flushed until pq_flush) * pq_putbytes - send bytes to connection (not flushed until pq_flush)
* pq_flush - flush pending output * pq_flush - flush pending output
* pq_flush_if_writable - flush pending output if writable without blocking
* pq_getbyte_if_available - get a byte if available without blocking * pq_getbyte_if_available - get a byte if available without blocking
* *
* message-level I/O (and old-style-COPY-OUT cruft): * message-level I/O (and old-style-COPY-OUT cruft):
* pq_putmessage - send a normal message (suppressed in COPY OUT mode) * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
* pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
* pq_startcopyout - inform libpq that a COPY OUT transfer is beginning * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
* pq_endcopyout - end a COPY OUT transfer * pq_endcopyout - end a COPY OUT transfer
* *
@ -92,6 +94,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "utils/guc.h" #include "utils/guc.h"
#include "utils/memutils.h"
/* /*
* Configuration options * Configuration options
@ -105,15 +108,21 @@ static char sock_path[MAXPGPATH];
/* /*
* Buffers for low-level I/O * Buffers for low-level I/O.
*
* The receive buffer is fixed size. Send buffer is usually 8k, but can be
* enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
*/ */
#define PQ_BUFFER_SIZE 8192 #define PQ_SEND_BUFFER_SIZE 8192
#define PQ_RECV_BUFFER_SIZE 8192
static char PqSendBuffer[PQ_BUFFER_SIZE]; static char *PqSendBuffer;
static int PqSendBufferSize; /* Size send buffer */
static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */ static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
static int PqSendStart; /* Next index to send a byte in PqSendBuffer */
static char PqRecvBuffer[PQ_BUFFER_SIZE]; static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength; /* End of data available in PqRecvBuffer */ static int PqRecvLength; /* End of data available in PqRecvBuffer */
@ -128,6 +137,7 @@ static bool DoingCopyOut;
static void pq_close(int code, Datum arg); static void pq_close(int code, Datum arg);
static int internal_putbytes(const char *s, size_t len); static int internal_putbytes(const char *s, size_t len);
static int internal_flush(void); static int internal_flush(void);
static void pq_set_nonblocking(bool nonblocking);
#ifdef HAVE_UNIX_SOCKETS #ifdef HAVE_UNIX_SOCKETS
static int Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName); static int Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
@ -142,7 +152,9 @@ static int Setup_AF_UNIX(void);
void void
pq_init(void) pq_init(void)
{ {
PqSendPointer = PqRecvPointer = PqRecvLength = 0; PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
PqCommBusy = false; PqCommBusy = false;
DoingCopyOut = false; DoingCopyOut = false;
on_proc_exit(pq_close, 0); on_proc_exit(pq_close, 0);
@ -732,6 +744,42 @@ TouchSocketFile(void)
* -------------------------------- * --------------------------------
*/ */
/* --------------------------------
* pq_set_nonblocking - set socket blocking/non-blocking
*
* Sets the socket non-blocking if nonblocking is TRUE, or sets it
* blocking otherwise.
* --------------------------------
*/
static void
pq_set_nonblocking(bool nonblocking)
{
if (MyProcPort->noblock == nonblocking)
return;
#ifdef WIN32
pgwin32_noblock = nonblocking ? 1 : 0;
#else
/*
* Use COMMERROR on failure, because ERROR would try to send the error
* to the client, which might require changing the mode again, leading
* to infinite recursion.
*/
if (nonblocking)
{
if (!pg_set_noblock(MyProcPort->sock))
ereport(COMMERROR,
(errmsg("could not set socket to non-blocking mode: %m")));
}
else
{
if (!pg_set_block(MyProcPort->sock))
ereport(COMMERROR,
(errmsg("could not set socket to blocking mode: %m")));
}
#endif
MyProcPort->noblock = nonblocking;
}
/* -------------------------------- /* --------------------------------
* pq_recvbuf - load some bytes into the input buffer * pq_recvbuf - load some bytes into the input buffer
@ -756,13 +804,16 @@ pq_recvbuf(void)
PqRecvLength = PqRecvPointer = 0; PqRecvLength = PqRecvPointer = 0;
} }
/* Ensure that we're in blocking mode */
pq_set_nonblocking(false);
/* Can fill buffer from PqRecvLength and upwards */ /* Can fill buffer from PqRecvLength and upwards */
for (;;) for (;;)
{ {
int r; int r;
r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength, r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
PQ_BUFFER_SIZE - PqRecvLength); PQ_RECV_BUFFER_SIZE - PqRecvLength);
if (r < 0) if (r < 0)
{ {
@ -825,7 +876,6 @@ pq_peekbyte(void)
return (unsigned char) PqRecvBuffer[PqRecvPointer]; return (unsigned char) PqRecvBuffer[PqRecvPointer];
} }
/* -------------------------------- /* --------------------------------
* pq_getbyte_if_available - get a single byte from connection, * pq_getbyte_if_available - get a single byte from connection,
* if available * if available
@ -845,72 +895,38 @@ pq_getbyte_if_available(unsigned char *c)
return 1; return 1;
} }
/* Temporarily put the socket into non-blocking mode */ /* Put the socket into non-blocking mode */
#ifdef WIN32 pq_set_nonblocking(true);
pgwin32_noblock = 1;
#else r = secure_read(MyProcPort, c, 1);
if (!pg_set_noblock(MyProcPort->sock)) if (r < 0)
ereport(ERROR,
(errmsg("could not set socket to non-blocking mode: %m")));
#endif
MyProcPort->noblock = true;
PG_TRY();
{ {
r = secure_read(MyProcPort, c, 1); /*
if (r < 0) * Ok if no data available without blocking or interrupted (though
* EINTR really shouldn't happen with a non-blocking socket).
* Report other errors.
*/
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
r = 0;
else
{ {
/* /*
* Ok if no data available without blocking or interrupted (though * Careful: an ereport() that tries to write to the client
* EINTR really shouldn't happen with a non-blocking socket). * would cause recursion to here, leading to stack overflow
* Report other errors. * and core dump! This message must go *only* to the
* postmaster log.
*/ */
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) ereport(COMMERROR,
r = 0; (errcode_for_socket_access(),
else errmsg("could not receive data from client: %m")));
{
/*
* Careful: an ereport() that tries to write to the client
* would cause recursion to here, leading to stack overflow
* and core dump! This message must go *only* to the
* postmaster log.
*/
ereport(COMMERROR,
(errcode_for_socket_access(),
errmsg("could not receive data from client: %m")));
r = EOF;
}
}
else if (r == 0)
{
/* EOF detected */
r = EOF; r = EOF;
} }
} }
PG_CATCH(); else if (r == 0)
{ {
/* /* EOF detected */
* The rest of the backend code assumes the socket is in blocking r = EOF;
* mode, so treat failure as FATAL.
*/
#ifdef WIN32
pgwin32_noblock = 0;
#else
if (!pg_set_block(MyProcPort->sock))
ereport(FATAL,
(errmsg("could not set socket to blocking mode: %m")));
#endif
MyProcPort->noblock = false;
PG_RE_THROW();
} }
PG_END_TRY();
#ifdef WIN32
pgwin32_noblock = 0;
#else
if (!pg_set_block(MyProcPort->sock))
ereport(FATAL,
(errmsg("could not set socket to blocking mode: %m")));
#endif
MyProcPort->noblock = false;
return r; return r;
} }
@ -1138,10 +1154,13 @@ internal_putbytes(const char *s, size_t len)
while (len > 0) while (len > 0)
{ {
/* If buffer is full, then flush it out */ /* If buffer is full, then flush it out */
if (PqSendPointer >= PQ_BUFFER_SIZE) if (PqSendPointer >= PqSendBufferSize)
{
pq_set_nonblocking(false);
if (internal_flush()) if (internal_flush())
return EOF; return EOF;
amount = PQ_BUFFER_SIZE - PqSendPointer; }
amount = PqSendBufferSize - PqSendPointer;
if (amount > len) if (amount > len)
amount = len; amount = len;
memcpy(PqSendBuffer + PqSendPointer, s, amount); memcpy(PqSendBuffer + PqSendPointer, s, amount);
@ -1167,17 +1186,25 @@ pq_flush(void)
if (PqCommBusy) if (PqCommBusy)
return 0; return 0;
PqCommBusy = true; PqCommBusy = true;
pq_set_nonblocking(false);
res = internal_flush(); res = internal_flush();
PqCommBusy = false; PqCommBusy = false;
return res; return res;
} }
/* --------------------------------
* internal_flush - flush pending output
*
* Returns 0 if OK (meaning everything was sent, or operation would block
* and the socket is in non-blocking mode), or EOF if trouble.
* --------------------------------
*/
static int static int
internal_flush(void) internal_flush(void)
{ {
static int last_reported_send_errno = 0; static int last_reported_send_errno = 0;
char *bufptr = PqSendBuffer; char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer; char *bufend = PqSendBuffer + PqSendPointer;
while (bufptr < bufend) while (bufptr < bufend)
@ -1191,6 +1218,16 @@ internal_flush(void)
if (errno == EINTR) if (errno == EINTR)
continue; /* Ok if we were interrupted */ continue; /* Ok if we were interrupted */
/*
* Ok if no data writable without blocking, and the socket
* is in non-blocking mode.
*/
if (errno == EAGAIN ||
errno == EWOULDBLOCK)
{
return 0;
}
/* /*
* Careful: an ereport() that tries to write to the client would * Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core * cause recursion to here, leading to stack overflow and core
@ -1212,18 +1249,56 @@ internal_flush(void)
* We drop the buffered data anyway so that processing can * We drop the buffered data anyway so that processing can
* continue, even though we'll probably quit soon. * continue, even though we'll probably quit soon.
*/ */
PqSendPointer = 0; PqSendStart = PqSendPointer = 0;
return EOF; return EOF;
} }
last_reported_send_errno = 0; /* reset after any successful send */ last_reported_send_errno = 0; /* reset after any successful send */
bufptr += r; bufptr += r;
PqSendStart += r;
} }
PqSendPointer = 0; PqSendStart = PqSendPointer = 0;
return 0; return 0;
} }
/* --------------------------------
* pq_flush_if_writable - flush pending output if writable without blocking
*
* Returns 0 if OK, or EOF if trouble.
* --------------------------------
*/
int
pq_flush_if_writable(void)
{
int res;
/* Quick exit if nothing to do */
if (PqSendPointer == PqSendStart)
return 0;
/* No-op if reentrant call */
if (PqCommBusy)
return 0;
/* Temporarily put the socket into non-blocking mode */
pq_set_nonblocking(true);
PqCommBusy = true;
res = internal_flush();
PqCommBusy = false;
return res;
}
/* --------------------------------
* pq_is_send_pending - is there any pending data in the output buffer?
* --------------------------------
*/
bool
pq_is_send_pending(void)
{
return (PqSendStart < PqSendPointer);
}
/* -------------------------------- /* --------------------------------
* Message-level I/O routines begin here. * Message-level I/O routines begin here.
@ -1285,6 +1360,33 @@ fail:
return EOF; return EOF;
} }
/* --------------------------------
* pq_putmessage_noblock - like pq_putmessage, but never blocks
*
* If the output buffer is too small to hold the message, the buffer
* is enlarged.
*/
void
pq_putmessage_noblock(char msgtype, const char *s, size_t len)
{
int res;
int required;
/*
* Ensure we have enough space in the output buffer for the message header
* as well as the message itself.
*/
required = PqSendPointer + 1 + 4 + len;
if (required > PqSendBufferSize)
{
PqSendBuffer = repalloc(PqSendBuffer, required);
PqSendBufferSize = required;
}
res = pq_putmessage(msgtype, s, len);
Assert(res == 0); /* should not fail when the message fits in buffer */
}
/* -------------------------------- /* --------------------------------
* pq_startcopyout - inform libpq that an old-style COPY OUT transfer * pq_startcopyout - inform libpq that an old-style COPY OUT transfer
* is beginning * is beginning

@ -193,19 +193,21 @@ DisownLatch(volatile Latch *latch)
bool bool
WaitLatch(volatile Latch *latch, long timeout) WaitLatch(volatile Latch *latch, long timeout)
{ {
return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0; return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
} }
/* /*
* Like WaitLatch, but will also return when there's data available in * Like WaitLatch, but will also return when there's data available in
* 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch * 'sock' for reading or writing. Returns 0 if timeout was reached,
* was set, or 2 if the scoket became readable. * 1 if the latch was set, 2 if the socket became readable or writable.
*/ */
int int
WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout) WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
bool forWrite, long timeout)
{ {
struct timeval tv, *tvp = NULL; struct timeval tv, *tvp = NULL;
fd_set input_mask; fd_set input_mask;
fd_set output_mask;
int rc; int rc;
int result = 0; int result = 0;
@ -241,14 +243,22 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
FD_ZERO(&input_mask); FD_ZERO(&input_mask);
FD_SET(selfpipe_readfd, &input_mask); FD_SET(selfpipe_readfd, &input_mask);
hifd = selfpipe_readfd; hifd = selfpipe_readfd;
if (sock != PGINVALID_SOCKET) if (sock != PGINVALID_SOCKET && forRead)
{ {
FD_SET(sock, &input_mask); FD_SET(sock, &input_mask);
if (sock > hifd) if (sock > hifd)
hifd = sock; hifd = sock;
} }
rc = select(hifd + 1, &input_mask, NULL, NULL, tvp); FD_ZERO(&output_mask);
if (sock != PGINVALID_SOCKET && forWrite)
{
FD_SET(sock, &output_mask);
if (sock > hifd)
hifd = sock;
}
rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
if (rc < 0) if (rc < 0)
{ {
if (errno == EINTR) if (errno == EINTR)
@ -263,7 +273,9 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
result = 0; result = 0;
break; break;
} }
if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask)) if (sock != PGINVALID_SOCKET &&
((forRead && FD_ISSET(sock, &input_mask)) ||
(forWrite && FD_ISSET(sock, &output_mask))))
{ {
result = 2; result = 2;
break; /* data available in socket */ break; /* data available in socket */

@ -14,7 +14,8 @@
#include "postgres.h" #include "postgres.h"
/* /*
* Indicate if pgwin32_recv() should operate in non-blocking mode. * Indicate if pgwin32_recv() and pgwin32_send() should operate
* in non-blocking mode.
* *
* Since the socket emulation layer always sets the actual socket to * Since the socket emulation layer always sets the actual socket to
* non-blocking mode in order to be able to deliver signals, we must * non-blocking mode in order to be able to deliver signals, we must
@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags)
return -1; return -1;
} }
if (pgwin32_noblock)
{
/*
* No data sent, and we are in "emulated non-blocking mode", so
* return indicating that we'd block if we were to continue.
*/
errno = EWOULDBLOCK;
return -1;
}
/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */ /* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0) if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)

@ -85,11 +85,12 @@ DisownLatch(volatile Latch *latch)
bool bool
WaitLatch(volatile Latch *latch, long timeout) WaitLatch(volatile Latch *latch, long timeout)
{ {
return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0; return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
} }
int int
WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout) WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
bool forWrite, long timeout)
{ {
DWORD rc; DWORD rc;
HANDLE events[3]; HANDLE events[3];
@ -103,10 +104,17 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
events[0] = latchevent; events[0] = latchevent;
events[1] = pgwin32_signal_event; events[1] = pgwin32_signal_event;
numevents = 2; numevents = 2;
if (sock != PGINVALID_SOCKET) if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{ {
int flags = 0;
if (forRead)
flags |= FD_READ;
if (forWrite)
flags |= FD_WRITE;
sockevent = WSACreateEvent(); sockevent = WSACreateEvent();
WSAEventSelect(sock, sockevent, FD_READ); WSAEventSelect(sock, sockevent, flags);
events[numevents++] = sockevent; events[numevents++] = sockevent;
} }
@ -139,8 +147,18 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
pgwin32_dispatch_queued_signals(); pgwin32_dispatch_queued_signals();
else if (rc == WAIT_OBJECT_0 + 2) else if (rc == WAIT_OBJECT_0 + 2)
{ {
WSANETWORKEVENTS resEvents;
Assert(sock != PGINVALID_SOCKET); Assert(sock != PGINVALID_SOCKET);
result = 2;
ZeroMemory(&resEvents, sizeof(resEvents));
if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
ereport(FATAL,
(errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
if ((forRead && resEvents.lNetworkEvents & FD_READ) ||
(forWrite && resEvents.lNetworkEvents & FD_WRITE))
result = 2;
break; break;
} }
else if (rc != WAIT_OBJECT_0) else if (rc != WAIT_OBJECT_0)
@ -148,7 +166,7 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
} }
/* Clean up the handle we created for the socket */ /* Clean up the handle we created for the socket */
if (sock != PGINVALID_SOCKET) if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{ {
WSAEventSelect(sock, sockevent, 0); WSAEventSelect(sock, sockevent, 0);
WSACloseEvent(sockevent); WSACloseEvent(sockevent);

@ -74,6 +74,7 @@ bool am_walsender = false; /* Am I a walsender process ? */
/* User-settable parameters for walsender */ /* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 1000; /* max sleep time between some actions */ int WalSndDelay = 1000; /* max sleep time between some actions */
int replication_timeout = 60 * 1000; /* maximum time to send one WAL data message */
/* /*
* These variables are used similarly to openLogFile/Id/Seg/Off, * These variables are used similarly to openLogFile/Id/Seg/Off,
@ -95,6 +96,11 @@ static XLogRecPtr sentPtr = {0, 0};
*/ */
static StringInfoData reply_message; static StringInfoData reply_message;
/*
* Timestamp of the last receipt of the reply from the standby.
*/
static TimestampTz last_reply_timestamp;
/* Flags set by signal handlers for later service in main loop */ /* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t walsender_shutdown_requested = false; volatile sig_atomic_t walsender_shutdown_requested = false;
@ -113,7 +119,7 @@ static int WalSndLoop(void);
static void InitWalSnd(void); static void InitWalSnd(void);
static void WalSndHandshake(void); static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg); static void WalSndKill(int code, Datum arg);
static bool XLogSend(char *msgbuf, bool *caughtup); static void XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void); static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd); static void StartReplication(StartReplicationCmd * cmd);
static void ProcessStandbyMessage(void); static void ProcessStandbyMessage(void);
@ -469,6 +475,7 @@ ProcessRepliesIfAny(void)
{ {
unsigned char firstchar; unsigned char firstchar;
int r; int r;
int received = false;
for (;;) for (;;)
{ {
@ -484,7 +491,7 @@ ProcessRepliesIfAny(void)
if (r == 0) if (r == 0)
{ {
/* no data available without blocking */ /* no data available without blocking */
return; break;
} }
/* Handle the very limited subset of commands expected in this phase */ /* Handle the very limited subset of commands expected in this phase */
@ -495,6 +502,7 @@ ProcessRepliesIfAny(void)
*/ */
case 'd': case 'd':
ProcessStandbyMessage(); ProcessStandbyMessage();
received = true;
break; break;
/* /*
@ -510,6 +518,12 @@ ProcessRepliesIfAny(void)
firstchar))); firstchar)));
} }
} }
/*
* Save the last reply timestamp if we've received at least
* one reply.
*/
if (received)
last_reply_timestamp = GetCurrentTimestamp();
} }
/* /*
@ -688,6 +702,9 @@ WalSndLoop(void)
*/ */
initStringInfo(&reply_message); initStringInfo(&reply_message);
/* Initialize the last reply timestamp */
last_reply_timestamp = GetCurrentTimestamp();
/* Loop forever, unless we get an error */ /* Loop forever, unless we get an error */
for (;;) for (;;)
{ {
@ -706,19 +723,6 @@ WalSndLoop(void)
SyncRepInitConfig(); SyncRepInitConfig();
} }
/*
* When SIGUSR2 arrives, we send all outstanding logs up to the
* shutdown checkpoint record (i.e., the latest record) and exit.
*/
if (walsender_ready_to_stop)
{
if (!XLogSend(output_message, &caughtup))
break;
ProcessRepliesIfAny();
if (caughtup)
walsender_shutdown_requested = true;
}
/* Normal exit from the walsender is here */ /* Normal exit from the walsender is here */
if (walsender_shutdown_requested) if (walsender_shutdown_requested)
{ {
@ -730,11 +734,13 @@ WalSndLoop(void)
} }
/* /*
* If we had sent all accumulated WAL in last round, nap for the * If we don't have any pending data in the output buffer, try to
* configured time before retrying. * send some more.
*/ */
if (caughtup) if (!pq_is_send_pending())
{ {
XLogSend(output_message, &caughtup);
/* /*
* Even if we wrote all the WAL that was available when we started * Even if we wrote all the WAL that was available when we started
* sending, more might have arrived while we were sending this * sending, more might have arrived while we were sending this
@ -742,28 +748,79 @@ WalSndLoop(void)
* received any signals from that time. Let's arm the latch * received any signals from that time. Let's arm the latch
* again, and after that check that we're still up-to-date. * again, and after that check that we're still up-to-date.
*/ */
ResetLatch(&MyWalSnd->latch); if (caughtup && !pq_is_send_pending())
if (!XLogSend(output_message, &caughtup))
break;
if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
{ {
/* ResetLatch(&MyWalSnd->latch);
* XXX: We don't really need the periodic wakeups anymore,
* WaitLatchOrSocket should reliably wake up as soon as
* something interesting happens.
*/
/* Sleep */ XLogSend(output_message, &caughtup);
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
WalSndDelay * 1000L);
} }
} }
else
/* Flush pending output to the client */
if (pq_flush_if_writable() != 0)
break;
/*
* When SIGUSR2 arrives, we send any outstanding logs up to the
* shutdown checkpoint record (i.e., the latest record) and exit.
*/
if (walsender_ready_to_stop && !pq_is_send_pending())
{ {
/* Attempt to send the log once every loop */ XLogSend(output_message, &caughtup);
if (!XLogSend(output_message, &caughtup)) ProcessRepliesIfAny();
if (caughtup && !pq_is_send_pending())
walsender_shutdown_requested = true;
}
if ((caughtup || pq_is_send_pending()) &&
!got_SIGHUP &&
!walsender_shutdown_requested)
{
TimestampTz finish_time;
long sleeptime;
/* Reschedule replication timeout */
if (replication_timeout > 0)
{
long secs;
int usecs;
finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
replication_timeout);
TimestampDifference(GetCurrentTimestamp(),
finish_time, &secs, &usecs);
sleeptime = secs * 1000 + usecs / 1000;
if (WalSndDelay < sleeptime)
sleeptime = WalSndDelay;
}
else
{
/*
* XXX: Without timeout, we don't really need the periodic
* wakeups anymore, WaitLatchOrSocket should reliably wake up
* as soon as something interesting happens.
*/
sleeptime = WalSndDelay;
}
/* Sleep */
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
true, pq_is_send_pending(),
sleeptime * 1000L);
/* Check for replication timeout */
if (replication_timeout > 0 &&
GetCurrentTimestamp() >= finish_time)
{
/*
* Since typically expiration of replication timeout means
* communication problem, we don't send the error message
* to the standby.
*/
ereport(COMMERROR,
(errmsg("terminating walsender process due to replication timeout")));
break; break;
}
} }
/* /*
@ -993,7 +1050,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
/* /*
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and send it. * but not yet sent to the client, and buffer it in the libpq output
* buffer.
* *
* msgbuf is a work area in which the output message is constructed. It's * msgbuf is a work area in which the output message is constructed. It's
* passed in just so we can avoid re-palloc'ing the buffer on each cycle. * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
@ -1001,10 +1059,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
* *
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false. * *caughtup is set to false.
*
* Returns true if OK, false if trouble.
*/ */
static bool static void
XLogSend(char *msgbuf, bool *caughtup) XLogSend(char *msgbuf, bool *caughtup)
{ {
XLogRecPtr SendRqstPtr; XLogRecPtr SendRqstPtr;
@ -1027,7 +1084,7 @@ XLogSend(char *msgbuf, bool *caughtup)
if (XLByteLE(SendRqstPtr, sentPtr)) if (XLByteLE(SendRqstPtr, sentPtr))
{ {
*caughtup = true; *caughtup = true;
return true; return;
} }
/* /*
@ -1099,11 +1156,7 @@ XLogSend(char *msgbuf, bool *caughtup)
memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader)); memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes); pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
/* Flush pending output to the client */
if (pq_flush())
return false;
sentPtr = endptr; sentPtr = endptr;
@ -1127,7 +1180,7 @@ XLogSend(char *msgbuf, bool *caughtup)
set_ps_display(activitymsg, false); set_ps_display(activitymsg, false);
} }
return true; return;
} }
/* SIGHUP: set flag to re-read config file at next convenient time */ /* SIGHUP: set flag to re-read config file at next convenient time */

@ -1855,6 +1855,16 @@ static struct config_int ConfigureNamesInt[] =
1000, 1, 10000, NULL, NULL 1000, 1, 10000, NULL, NULL
}, },
{
{"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
gettext_noop("Sets the maximum time to wait for WAL replication."),
NULL,
GUC_UNIT_MS
},
&replication_timeout,
60 * 1000, 0, INT_MAX, NULL, NULL
},
{ {
{"commit_delay", PGC_USERSET, WAL_SETTINGS, {"commit_delay", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the delay in microseconds between transaction commit and " gettext_noop("Sets the delay in microseconds between transaction commit and "

@ -200,6 +200,7 @@
#wal_sender_delay = 1s # walsender cycle time, 1-10000 milliseconds #wal_sender_delay = 1s # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables #wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed #vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
#replication_timeout = 60s # in milliseconds, 0 is disabled
# - Standby Servers - # - Standby Servers -

@ -60,7 +60,10 @@ extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c); extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len); extern int pq_putbytes(const char *s, size_t len);
extern int pq_flush(void); extern int pq_flush(void);
extern int pq_flush_if_writable(void);
extern bool pq_is_send_pending(void);
extern int pq_putmessage(char msgtype, const char *s, size_t len); extern int pq_putmessage(char msgtype, const char *s, size_t len);
extern void pq_putmessage_noblock(char msgtype, const char *s, size_t len);
extern void pq_startcopyout(void); extern void pq_startcopyout(void);
extern void pq_endcopyout(bool errorAbort); extern void pq_endcopyout(bool errorAbort);

@ -98,6 +98,7 @@ extern volatile sig_atomic_t walsender_ready_to_stop;
/* user-settable parameters */ /* user-settable parameters */
extern int WalSndDelay; extern int WalSndDelay;
extern int max_wal_senders; extern int max_wal_senders;
extern int replication_timeout;
extern int WalSenderMain(void); extern int WalSenderMain(void);
extern void WalSndSignals(void); extern void WalSndSignals(void);

@ -40,7 +40,7 @@ extern void OwnLatch(volatile Latch *latch);
extern void DisownLatch(volatile Latch *latch); extern void DisownLatch(volatile Latch *latch);
extern bool WaitLatch(volatile Latch *latch, long timeout); extern bool WaitLatch(volatile Latch *latch, long timeout);
extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
long timeout); bool forRead, bool forWrite, long timeout);
extern void SetLatch(volatile Latch *latch); extern void SetLatch(volatile Latch *latch);
extern void ResetLatch(volatile Latch *latch); extern void ResetLatch(volatile Latch *latch);
#define TestLatch(latch) (((volatile Latch *) latch)->is_set) #define TestLatch(latch) (((volatile Latch *) latch)->is_set)

Loading…
Cancel
Save