@ -37,7 +37,7 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline , char * basedir ,
stream_stop_callback stream_stop , int standby_message_timeout ,
char * partial_suffix , XLogRecPtr * stoppos ,
bool synchronous ) ;
bool synchronous , bool mark_done ) ;
static int CopyStreamPoll ( PGconn * conn , long timeout_ms ) ;
static int CopyStreamReceive ( PGconn * conn , long timeout , char * * buffer ) ;
static bool ProcessKeepaliveMsg ( PGconn * conn , char * copybuf , int len ,
@ -45,20 +45,50 @@ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
static bool ProcessXLogDataMsg ( PGconn * conn , char * copybuf , int len ,
XLogRecPtr * blockpos , uint32 timeline ,
char * basedir , stream_stop_callback stream_stop ,
char * partial_suffix ) ;
char * partial_suffix , bool mark_done ) ;
static PGresult * HandleEndOfCopyStream ( PGconn * conn , char * copybuf ,
XLogRecPtr blockpos , char * basedir , char * partial_suffix ,
XLogRecPtr * stoppos ) ;
XLogRecPtr * stoppos , bool mark_done ) ;
static bool CheckCopyStreamStop ( PGconn * conn , XLogRecPtr blockpos ,
uint32 timeline , char * basedir ,
stream_stop_callback stream_stop ,
char * partial_suffix , XLogRecPtr * stoppos ) ;
char * partial_suffix , XLogRecPtr * stoppos ,
bool mark_done ) ;
static long CalculateCopyStreamSleeptime ( int64 now , int standby_message_timeout ,
int64 last_status ) ;
static bool ReadEndOfStreamingResult ( PGresult * res , XLogRecPtr * startpos ,
uint32 * timeline ) ;
static bool
mark_file_as_archived ( const char * basedir , const char * fname )
{
int fd ;
static char tmppath [ MAXPGPATH ] ;
snprintf ( tmppath , sizeof ( tmppath ) , " %s/archive_status/%s.done " ,
basedir , fname ) ;
fd = open ( tmppath , O_WRONLY | O_CREAT | PG_BINARY , S_IRUSR | S_IWUSR ) ;
if ( fd < 0 )
{
fprintf ( stderr , _ ( " %s: could not create archive status file \" %s \" : %s \n " ) ,
progname , tmppath , strerror ( errno ) ) ;
return false ;
}
if ( fsync ( fd ) ! = 0 )
{
fprintf ( stderr , _ ( " %s: could not fsync file \" %s \" : %s \n " ) ,
progname , tmppath , strerror ( errno ) ) ;
return false ;
}
close ( fd ) ;
return true ;
}
/*
* Open a new WAL file in the specified directory .
*
@ -152,7 +182,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false , otherwise returns true .
*/
static bool
close_walfile ( char * basedir , char * partial_suffix , XLogRecPtr pos )
close_walfile ( char * basedir , char * partial_suffix , XLogRecPtr pos , bool mark_done )
{
off_t currpos ;
@ -206,6 +236,19 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
_ ( " %s: not renaming \" %s%s \" , segment is not complete \n " ) ,
progname , current_walfile_name , partial_suffix ) ;
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
* to do so as files can otherwise get archived again after promotion of a
* new node . This is in line with walreceiver . c always doing a
* XLogArchiveForceDone ( ) after a complete segment .
*/
if ( currpos = = XLOG_SEG_SIZE & & mark_done )
{
/* writes error message if failed */
if ( ! mark_file_as_archived ( basedir , current_walfile_name ) )
return false ;
}
lastFlushPosition = pos ;
return true ;
}
@ -248,7 +291,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
}
static bool
writeTimeLineHistoryFile ( char * basedir , TimeLineID tli , char * filename , char * content )
writeTimeLineHistoryFile ( char * basedir , TimeLineID tli , char * filename ,
char * content , bool mark_done )
{
int size = strlen ( content ) ;
char path [ MAXPGPATH ] ;
@ -327,6 +371,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
return false ;
}
/* Maintain archive_status, check close_walfile() for details. */
if ( mark_done )
{
/* writes error message if failed */
if ( ! mark_file_as_archived ( basedir , histfname ) )
return false ;
}
return true ;
}
@ -447,7 +499,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char * sysidentifier , char * basedir ,
stream_stop_callback stream_stop ,
int standby_message_timeout , char * partial_suffix ,
bool synchronous )
bool synchronous , bool mark_done )
{
char query [ 128 ] ;
char slotcmd [ 128 ] ;
@ -562,7 +614,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Write the history file to disk */
writeTimeLineHistoryFile ( basedir , timeline ,
PQgetvalue ( res , 0 , 0 ) ,
PQgetvalue ( res , 0 , 1 ) ) ;
PQgetvalue ( res , 0 , 1 ) ,
mark_done ) ;
PQclear ( res ) ;
}
@ -592,7 +645,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream ( conn , startpos , timeline , basedir , stream_stop ,
standby_message_timeout , partial_suffix ,
& stoppos , synchronous ) ;
& stoppos , synchronous , mark_done ) ;
if ( res = = NULL )
goto error ;
@ -757,7 +810,7 @@ static PGresult *
HandleCopyStream ( PGconn * conn , XLogRecPtr startpos , uint32 timeline ,
char * basedir , stream_stop_callback stream_stop ,
int standby_message_timeout , char * partial_suffix ,
XLogRecPtr * stoppos , bool synchronous )
XLogRecPtr * stoppos , bool synchronous , bool mark_done )
{
char * copybuf = NULL ;
int64 last_status = - 1 ;
@ -775,7 +828,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Check if we should continue streaming , or abort at this point .
*/
if ( ! CheckCopyStreamStop ( conn , blockpos , timeline , basedir ,
stream_stop , partial_suffix , stoppos ) )
stream_stop , partial_suffix , stoppos ,
mark_done ) )
goto error ;
now = feGetCurrentTimestamp ( ) ;
@ -830,7 +884,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if ( r = = - 2 )
{
PGresult * res = HandleEndOfCopyStream ( conn , copybuf , blockpos ,
basedir , partial_suffix , stoppos ) ;
basedir , partial_suffix ,
stoppos , mark_done ) ;
if ( res = = NULL )
goto error ;
else
@ -847,14 +902,16 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
else if ( copybuf [ 0 ] = = ' w ' )
{
if ( ! ProcessXLogDataMsg ( conn , copybuf , r , & blockpos ,
timeline , basedir , stream_stop , partial_suffix ) )
timeline , basedir , stream_stop ,
partial_suffix , true ) )
goto error ;
/*
* Check if we should continue streaming , or abort at this point .
*/
if ( ! CheckCopyStreamStop ( conn , blockpos , timeline , basedir ,
stream_stop , partial_suffix , stoppos ) )
stream_stop , partial_suffix , stoppos ,
mark_done ) )
goto error ;
}
else
@ -1055,7 +1112,7 @@ static bool
ProcessXLogDataMsg ( PGconn * conn , char * copybuf , int len ,
XLogRecPtr * blockpos , uint32 timeline ,
char * basedir , stream_stop_callback stream_stop ,
char * partial_suffix )
char * partial_suffix , bool mark_done )
{
int xlogoff ;
int bytes_left ;
@ -1163,7 +1220,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
/* Did we reach the end of a WAL segment? */
if ( * blockpos % XLOG_SEG_SIZE = = 0 )
{
if ( ! close_walfile ( basedir , partial_suffix , * blockpos ) )
if ( ! close_walfile ( basedir , partial_suffix , * blockpos , mark_done ) )
/* Error message written in close_walfile() */
return false ;
@ -1193,7 +1250,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
static PGresult *
HandleEndOfCopyStream ( PGconn * conn , char * copybuf ,
XLogRecPtr blockpos , char * basedir , char * partial_suffix ,
XLogRecPtr * stoppos )
XLogRecPtr * stoppos , bool mark_done )
{
PGresult * res = PQgetResult ( conn ) ;
@ -1204,7 +1261,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
*/
if ( still_sending )
{
if ( ! close_walfile ( basedir , partial_suffix , blockpos ) )
if ( ! close_walfile ( basedir , partial_suffix , blockpos , mark_done ) )
{
/* Error message written in close_walfile() */
PQclear ( res ) ;
@ -1236,11 +1293,11 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
static bool
CheckCopyStreamStop ( PGconn * conn , XLogRecPtr blockpos , uint32 timeline ,
char * basedir , stream_stop_callback stream_stop ,
char * partial_suffix , XLogRecPtr * stoppos )
char * partial_suffix , XLogRecPtr * stoppos , bool mark_done )
{
if ( still_sending & & stream_stop ( blockpos , timeline , false ) )
{
if ( ! close_walfile ( basedir , partial_suffix , blockpos ) )
if ( ! close_walfile ( basedir , partial_suffix , blockpos , mark_done ) )
{
/* Potential error message is written by close_walfile */
return false ;