|
|
|
@ -1428,10 +1428,8 @@ WalSndWaitForWal(XLogRecPtr loc) |
|
|
|
|
/*
|
|
|
|
|
* We only send regular messages to the client for full decoded |
|
|
|
|
* transactions, but a synchronous replication and walsender shutdown |
|
|
|
|
* possibly are waiting for a later location. So, before sleeping, we |
|
|
|
|
* send a ping containing the flush location. If the receiver is |
|
|
|
|
* otherwise idle, this keepalive will trigger a reply. Processing the |
|
|
|
|
* reply will update these MyWalSnd locations. |
|
|
|
|
* possibly are waiting for a later location. So we send pings |
|
|
|
|
* containing the flush location every now and then. |
|
|
|
|
*/ |
|
|
|
|
if (MyWalSnd->flush < sentPtr && |
|
|
|
|
MyWalSnd->write < sentPtr && |
|
|
|
@ -2316,16 +2314,20 @@ WalSndLoop(WalSndSendDataCallback send_data) |
|
|
|
|
WalSndKeepaliveIfNecessary(); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Block if we have unsent data. Let WalSndWaitForWal() handle any |
|
|
|
|
* other blocking; idle receivers need its additional actions. |
|
|
|
|
* We don't block if not caught up, unless there is unsent data |
|
|
|
|
* pending in which case we'd better block until the socket is |
|
|
|
|
* write-ready. This test is only needed for the case where the |
|
|
|
|
* send_data callback handled a subset of the available data but then |
|
|
|
|
* pq_flush_if_writable flushed it all --- we should immediately try |
|
|
|
|
* to send more. |
|
|
|
|
*/ |
|
|
|
|
if (pq_is_send_pending()) |
|
|
|
|
if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending()) |
|
|
|
|
{ |
|
|
|
|
long sleeptime; |
|
|
|
|
int wakeEvents; |
|
|
|
|
|
|
|
|
|
wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT | |
|
|
|
|
WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; |
|
|
|
|
WL_SOCKET_READABLE; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Use fresh timestamp, not last_processing, to reduce the chance |
|
|
|
@ -2333,6 +2335,9 @@ WalSndLoop(WalSndSendDataCallback send_data) |
|
|
|
|
*/ |
|
|
|
|
sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); |
|
|
|
|
|
|
|
|
|
if (pq_is_send_pending()) |
|
|
|
|
wakeEvents |= WL_SOCKET_WRITEABLE; |
|
|
|
|
|
|
|
|
|
/* Sleep until something happens or we time out */ |
|
|
|
|
(void) WaitLatchOrSocket(MyLatch, wakeEvents, |
|
|
|
|
MyProcPort->sock, sleeptime, |
|
|
|
|