|
|
|
@ -41,8 +41,8 @@ static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream, |
|
|
|
|
XLogRecPtr *stoppos); |
|
|
|
|
static int CopyStreamPoll(PGconn *conn, long timeout_ms); |
|
|
|
|
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); |
|
|
|
|
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, |
|
|
|
|
XLogRecPtr blockpos, int64 *last_status); |
|
|
|
|
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, |
|
|
|
|
int len, XLogRecPtr blockpos, int64 *last_status); |
|
|
|
|
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, |
|
|
|
|
XLogRecPtr *blockpos); |
|
|
|
|
static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf, |
|
|
|
@ -56,7 +56,7 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, |
|
|
|
|
uint32 *timeline); |
|
|
|
|
|
|
|
|
|
static bool |
|
|
|
|
mark_file_as_archived(const char *basedir, const char *fname) |
|
|
|
|
mark_file_as_archived(const char *basedir, const char *fname, bool do_sync) |
|
|
|
|
{ |
|
|
|
|
int fd; |
|
|
|
|
static char tmppath[MAXPGPATH]; |
|
|
|
@ -74,10 +74,10 @@ mark_file_as_archived(const char *basedir, const char *fname) |
|
|
|
|
|
|
|
|
|
close(fd); |
|
|
|
|
|
|
|
|
|
if (fsync_fname(tmppath, false, progname) != 0) |
|
|
|
|
if (do_sync && fsync_fname(tmppath, false, progname) != 0) |
|
|
|
|
return false; |
|
|
|
|
|
|
|
|
|
if (fsync_parent_path(tmppath, progname) != 0) |
|
|
|
|
if (do_sync && fsync_parent_path(tmppath, progname) != 0) |
|
|
|
|
return false; |
|
|
|
|
|
|
|
|
|
return true; |
|
|
|
@ -134,9 +134,9 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) |
|
|
|
|
* fsync, in case of a previous crash between padding and fsyncing the |
|
|
|
|
* file. |
|
|
|
|
*/ |
|
|
|
|
if (fsync_fname(fn, false, progname) != 0) |
|
|
|
|
if (stream->do_sync && fsync_fname(fn, false, progname) != 0) |
|
|
|
|
return false; |
|
|
|
|
if (fsync_parent_path(fn, progname) != 0) |
|
|
|
|
if (stream->do_sync && fsync_parent_path(fn, progname) != 0) |
|
|
|
|
return false; |
|
|
|
|
|
|
|
|
|
return true; |
|
|
|
@ -173,9 +173,9 @@ open_walfile(StreamCtl *stream, XLogRecPtr startpoint) |
|
|
|
|
* using synchronous mode, where the file is modified and fsynced |
|
|
|
|
* in-place, without a directory fsync. |
|
|
|
|
*/ |
|
|
|
|
if (fsync_fname(fn, false, progname) != 0) |
|
|
|
|
if (stream->do_sync && fsync_fname(fn, false, progname) != 0) |
|
|
|
|
return false; |
|
|
|
|
if (fsync_parent_path(fn, progname) != 0) |
|
|
|
|
if (stream->do_sync && fsync_parent_path(fn, progname) != 0) |
|
|
|
|
return false; |
|
|
|
|
|
|
|
|
|
if (lseek(f, SEEK_SET, 0) != 0) |
|
|
|
@ -212,7 +212,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (fsync(walfile) != 0) |
|
|
|
|
if (stream->do_sync && fsync(walfile) != 0) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), |
|
|
|
|
progname, current_walfile_name, strerror(errno)); |
|
|
|
@ -258,7 +258,8 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) |
|
|
|
|
if (currpos == XLOG_SEG_SIZE && stream->mark_done) |
|
|
|
|
{ |
|
|
|
|
/* writes error message if failed */ |
|
|
|
|
if (!mark_file_as_archived(stream->basedir, current_walfile_name)) |
|
|
|
|
if (!mark_file_as_archived(stream->basedir, current_walfile_name, |
|
|
|
|
stream->do_sync)) |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -378,7 +379,8 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) |
|
|
|
|
if (stream->mark_done) |
|
|
|
|
{ |
|
|
|
|
/* writes error message if failed */ |
|
|
|
|
if (!mark_file_as_archived(stream->basedir, histfname)) |
|
|
|
|
if (!mark_file_as_archived(stream->basedir, histfname, |
|
|
|
|
stream->do_sync)) |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -836,7 +838,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, |
|
|
|
|
*/ |
|
|
|
|
if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1) |
|
|
|
|
{ |
|
|
|
|
if (fsync(walfile) != 0) |
|
|
|
|
if (stream->do_sync && fsync(walfile) != 0) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), |
|
|
|
|
progname, current_walfile_name, strerror(errno)); |
|
|
|
@ -890,7 +892,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, |
|
|
|
|
/* Check the message type. */ |
|
|
|
|
if (copybuf[0] == 'k') |
|
|
|
|
{ |
|
|
|
|
if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos, |
|
|
|
|
if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos, |
|
|
|
|
&last_status)) |
|
|
|
|
goto error; |
|
|
|
|
} |
|
|
|
@ -1043,7 +1045,7 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer) |
|
|
|
|
* Process the keepalive message. |
|
|
|
|
*/ |
|
|
|
|
static bool |
|
|
|
|
ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, |
|
|
|
|
ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, |
|
|
|
|
XLogRecPtr blockpos, int64 *last_status) |
|
|
|
|
{ |
|
|
|
|
int pos; |
|
|
|
@ -1079,7 +1081,7 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, |
|
|
|
|
* data has been successfully replicated or not, at the normal |
|
|
|
|
* shutdown of the server. |
|
|
|
|
*/ |
|
|
|
|
if (fsync(walfile) != 0) |
|
|
|
|
if (stream->do_sync && fsync(walfile) != 0) |
|
|
|
|
{ |
|
|
|
|
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), |
|
|
|
|
progname, current_walfile_name, strerror(errno)); |
|
|
|
|