|
|
|
|
@ -65,6 +65,7 @@ |
|
|
|
|
#include "funcapi.h" |
|
|
|
|
#include "libpq/libpq.h" |
|
|
|
|
#include "libpq/pqformat.h" |
|
|
|
|
#include "libpq/protocol.h" |
|
|
|
|
#include "miscadmin.h" |
|
|
|
|
#include "nodes/replnodes.h" |
|
|
|
|
#include "pgstat.h" |
|
|
|
|
@ -735,13 +736,13 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset, |
|
|
|
|
|
|
|
|
|
switch (mtype) |
|
|
|
|
{ |
|
|
|
|
case 'd': /* CopyData */ |
|
|
|
|
case PqMsg_CopyData: |
|
|
|
|
maxmsglen = PQ_LARGE_MESSAGE_LIMIT; |
|
|
|
|
break; |
|
|
|
|
case 'c': /* CopyDone */ |
|
|
|
|
case 'f': /* CopyFail */ |
|
|
|
|
case 'H': /* Flush */ |
|
|
|
|
case 'S': /* Sync */ |
|
|
|
|
case PqMsg_CopyDone: |
|
|
|
|
case PqMsg_CopyFail: |
|
|
|
|
case PqMsg_Flush: |
|
|
|
|
case PqMsg_Sync: |
|
|
|
|
maxmsglen = PQ_SMALL_MESSAGE_LIMIT; |
|
|
|
|
break; |
|
|
|
|
default: |
|
|
|
|
@ -763,19 +764,19 @@ HandleUploadManifestPacket(StringInfo buf, off_t *offset, |
|
|
|
|
/* Process the message */ |
|
|
|
|
switch (mtype) |
|
|
|
|
{ |
|
|
|
|
case 'd': /* CopyData */ |
|
|
|
|
case PqMsg_CopyData: |
|
|
|
|
AppendIncrementalManifestData(ib, buf->data, buf->len); |
|
|
|
|
return true; |
|
|
|
|
|
|
|
|
|
case 'c': /* CopyDone */ |
|
|
|
|
case PqMsg_CopyDone: |
|
|
|
|
return false; |
|
|
|
|
|
|
|
|
|
case 'H': /* Sync */ |
|
|
|
|
case 'S': /* Flush */ |
|
|
|
|
case PqMsg_Sync: |
|
|
|
|
case PqMsg_Flush: |
|
|
|
|
/* Ignore these while in CopyOut mode as we do elsewhere. */ |
|
|
|
|
return true; |
|
|
|
|
|
|
|
|
|
case 'f': |
|
|
|
|
case PqMsg_CopyFail: |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_QUERY_CANCELED), |
|
|
|
|
errmsg("COPY from stdin failed: %s", |
|
|
|
|
@ -1569,7 +1570,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, |
|
|
|
|
tmpbuf.data, sizeof(int64)); |
|
|
|
|
|
|
|
|
|
/* output previously gathered data in a CopyData packet */ |
|
|
|
|
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); |
|
|
|
|
pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len); |
|
|
|
|
|
|
|
|
|
CHECK_FOR_INTERRUPTS(); |
|
|
|
|
|
|
|
|
|
@ -2305,7 +2306,7 @@ ProcessRepliesIfAny(void) |
|
|
|
|
case PqMsg_CopyDone: |
|
|
|
|
if (!streamingDoneSending) |
|
|
|
|
{ |
|
|
|
|
pq_putmessage_noblock('c', NULL, 0); |
|
|
|
|
pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0); |
|
|
|
|
streamingDoneSending = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -2758,7 +2759,7 @@ ProcessStandbyPSRequestMessage(void) |
|
|
|
|
pq_sendint64(&output_message, GetCurrentTimestamp()); |
|
|
|
|
|
|
|
|
|
/* ... and send it wrapped in CopyData */ |
|
|
|
|
pq_putmessage_noblock('d', output_message.data, output_message.len); |
|
|
|
|
pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -3306,7 +3307,7 @@ XLogSendPhysical(void) |
|
|
|
|
wal_segment_close(xlogreader); |
|
|
|
|
|
|
|
|
|
/* Send CopyDone */ |
|
|
|
|
pq_putmessage_noblock('c', NULL, 0); |
|
|
|
|
pq_putmessage_noblock(PqMsg_CopyDone, NULL, 0); |
|
|
|
|
streamingDoneSending = true; |
|
|
|
|
|
|
|
|
|
WalSndCaughtUp = true; |
|
|
|
|
@ -3434,7 +3435,7 @@ retry: |
|
|
|
|
memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], |
|
|
|
|
tmpbuf.data, sizeof(int64)); |
|
|
|
|
|
|
|
|
|
pq_putmessage_noblock('d', output_message.data, output_message.len); |
|
|
|
|
pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); |
|
|
|
|
|
|
|
|
|
sentPtr = endptr; |
|
|
|
|
|
|
|
|
|
@ -4140,7 +4141,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr) |
|
|
|
|
pq_sendbyte(&output_message, requestReply ? 1 : 0); |
|
|
|
|
|
|
|
|
|
/* ... and send it wrapped in CopyData */ |
|
|
|
|
pq_putmessage_noblock('d', output_message.data, output_message.len); |
|
|
|
|
pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); |
|
|
|
|
|
|
|
|
|
/* Set local flag */ |
|
|
|
|
if (requestReply) |
|
|
|
|
|