@ -5,11 +5,10 @@
* The WAL sender process ( walsender ) is new as of Postgres 9.0 . It takes
* charge of the sending side of XLOG streaming in the primary server . At first , it is
* started by the postmaster when the walreceiver in the standby server
* connects to the primary server and requests XLOG streaming replication ,
* i . e . , unlike any auxiliary process , it is not an always - running process .
* connects to the primary server and requests XLOG streaming replication .
* It attempts to keep reading XLOG records from the disk and sending them
* to the standby server , as long as the connection is alive ( i . e . , like
* any backend , there is an one to one relationship between a connection
* any backend , there is a one - to - one relationship between a connection
* and a walsender process ) .
*
* Normal termination is by SIGTERM , which instructs the walsender to
@ -30,7 +29,7 @@
*
*
* IDENTIFICATION
* $ PostgreSQL : pgsql / src / backend / replication / walsender . c , v 1.24 2010 / 06 / 03 21 : 02 : 12 petere Exp $
* $ PostgreSQL : pgsql / src / backend / replication / walsender . c , v 1.25 2010 / 06 / 03 22 : 17 : 32 tgl Exp $
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
@ -44,6 +43,7 @@
# include "libpq/pqformat.h"
# include "libpq/pqsignal.h"
# include "miscadmin.h"
# include "replication/walprotocol.h"
# include "replication/walsender.h"
# include "storage/fd.h"
# include "storage/ipc.h"
@ -80,7 +80,7 @@ static uint32 sendOff = 0;
/*
* How far have we sent WAL already ? This is also advertised in
* MyWalSnd - > sentPtr .
* MyWalSnd - > sentPtr . ( Actually , this is the next WAL location to send . )
*/
static XLogRecPtr sentPtr = { 0 , 0 } ;
@ -100,19 +100,9 @@ static void InitWalSnd(void);
static void WalSndHandshake ( void ) ;
static void WalSndKill ( int code , Datum arg ) ;
static void XLogRead ( char * buf , XLogRecPtr recptr , Size nbytes ) ;
static bool XLogSend ( StringInfo outMsg , bool * caughtup ) ;
static bool XLogSend ( char * msgbuf , bool * caughtup ) ;
static void CheckClosedConnection ( void ) ;
/*
* How much WAL to send in one message ? Must be > = XLOG_BLCKSZ .
*
* We don ' t have a good idea of what a good value would be ; there ' s some
* overhead per message in both walsender and walreceiver , but on the other
* hand sending large batches makes walsender less responsive to signals
* because signals are checked only between messages . 128 kB ( with
* default 8 k blocks ) seems like a reasonable guess for now .
*/
# define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
/* Main entry point for walsender process */
int
@ -157,6 +147,9 @@ WalSenderMain(void)
return WalSndLoop ( ) ;
}
/*
* Execute commands from walreceiver , until we enter streaming mode .
*/
static void
WalSndHandshake ( void )
{
@ -172,6 +165,13 @@ WalSndHandshake(void)
/* Wait for a command to arrive */
firstchar = pq_getbyte ( ) ;
/*
* Emergency bailout if postmaster has died . This is to avoid the
* necessity for manual cleanup of all postmaster children .
*/
if ( ! PostmasterIsAlive ( true ) )
exit ( 1 ) ;
/*
* Check for any other interesting events that happened while we
* slept .
@ -211,7 +211,7 @@ WalSndHandshake(void)
/*
* Reply with a result set with one row , two columns .
* First col is system ID , and second if timeline ID
* First col is system ID , and second is timeline ID
*/
snprintf ( sysid , sizeof ( sysid ) , UINT64_FORMAT ,
@ -253,6 +253,7 @@ WalSndHandshake(void)
/* Send CommandComplete and ReadyForQuery messages */
EndCommand ( " SELECT " , DestRemote ) ;
ReadyForQuery ( DestRemote ) ;
/* ReadyForQuery did pq_flush for us */
}
else if ( sscanf ( query_string , " START_REPLICATION %X/%X " ,
& recptr . xlogid , & recptr . xrecoff ) = = 2 )
@ -365,12 +366,17 @@ CheckClosedConnection(void)
static int
WalSndLoop ( void )
{
StringInfoData output_message ;
char * output_message ;
bool caughtup = false ;
initStringInfo ( & output_message ) ;
/*
* Allocate buffer that will be used for each output message . We do this
* just once to reduce palloc overhead . The buffer must be made large
* enough for maximum - sized messages .
*/
output_message = palloc ( 1 + sizeof ( WalDataMessageHeader ) + MAX_SEND_SIZE ) ;
/* Loop forever */
/* Loop forever, unless we get an error */
for ( ; ; )
{
long remain ; /* remaining time (us) */
@ -381,6 +387,7 @@ WalSndLoop(void)
*/
if ( ! PostmasterIsAlive ( true ) )
exit ( 1 ) ;
/* Process any requests or signals received recently */
if ( got_SIGHUP )
{
@ -394,8 +401,8 @@ WalSndLoop(void)
*/
if ( ready_to_stop )
{
if ( ! XLogSend ( & output_message , & caughtup ) )
goto eof ;
if ( ! XLogSend ( output_message , & caughtup ) )
break ;
if ( caughtup )
shutdown_requested = true ;
}
@ -435,17 +442,15 @@ WalSndLoop(void)
remain - = NAPTIME_PER_CYCLE ;
}
}
/* Attempt to send the log once every loop */
if ( ! XLogSend ( & output_message , & caughtup ) )
goto eof ;
if ( ! XLogSend ( output_message , & caughtup ) )
break ;
}
/* can't get here because the above loop never exits */
return 1 ;
eof :
/*
* Get here on send failure . Clean up and exit .
*
* Reset whereToSendOutput to prevent ereport from attempting to send any
* more messages to the standby .
*/
@ -524,6 +529,9 @@ WalSndKill(int code, Datum arg)
/*
* Read ' nbytes ' bytes from WAL into ' buf ' , starting at location ' recptr '
*
* XXX probably this should be improved to suck data directly from the
* WAL buffers when possible .
*/
static void
XLogRead ( char * buf , XLogRecPtr recptr , Size nbytes )
@ -634,51 +642,46 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
/*
* Read up to MAX_SEND_SIZE bytes of WAL that ' s been written ( and flushed ) ,
* but not yet sent to the client , and send it . If there is no unsent WAL ,
* * caughtup is set to true and nothing is sent , otherwise * caughtup is set
* to false .
* but not yet sent to the client , and send it .
*
* msgbuf is a work area in which the output message is constructed . It ' s
* passed in just so we can avoid re - palloc ' ing the buffer on each cycle .
* It must be of size 1 + sizeof ( WalDataMessageHeader ) + MAX_SEND_SIZE .
*
* If there is no unsent WAL remaining , * caughtup is set to true , otherwise
* * caughtup is set to false .
*
* Returns true if OK , false if trouble .
*/
static bool
XLogSend ( StringInfo outMsg , bool * caughtup )
XLogSend ( char * msgbuf , bool * caughtup )
{
XLogRecPtr SendRqstPtr ;
XLogRecPtr startptr ;
XLogRecPtr endptr ;
Size nbytes ;
char activitymsg [ 50 ] ;
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd * walsnd = MyWalSnd ;
WalDataMessageHeader msghdr ;
/* Attempt to send all records flushed to the disk already */
SendRqstPtr = GetWriteRecPtr ( ) ;
/* Quick exit if nothing to do */
if ( ! XLByteLT ( sentPtr , SendRqs tPtr) )
if ( XLByteLE ( SendRqstPtr , sen tPtr) )
{
* caughtup = true ;
return true ;
}
/*
* Otherwise let the caller know that we ' re not fully caught up . Unless
* there ' s a huge backlog , we ' ll be caught up to the current WriteRecPtr
* after we ' ve sent everything below , but more WAL could accumulate while
* we ' re busy sending .
*/
* caughtup = false ;
/*
* Figure out how much to send in one message . If there ' s less than
* Figure out how much to send in one message . If there ' s no more than
* MAX_SEND_SIZE bytes to send , send everything . Otherwise send
* MAX_SEND_SIZE bytes , but round to page boundary .
* MAX_SEND_SIZE bytes , but round to logfile or page boundary .
*
* The rounding is not only for performance reasons . Walreceiver
* relies on the fact that we never split a WAL record across two
* messages . Since a long WAL record is split at page boundaries into
* continuation records , a page boundary is always a safe cut - off point .
* We also assume that SendRqstPtr never points in the middle of a WAL
* We also assume that SendRqstPtr never points to the middle of a WAL
* record .
*/
startptr = sentPtr ;
@ -694,59 +697,78 @@ XLogSend(StringInfo outMsg, bool *caughtup)
endptr = startptr ;
XLByteAdvance ( endptr , MAX_SEND_SIZE ) ;
/* round down to page boundary. */
endptr . xrecoff - = ( endptr . xrecoff % XLOG_BLCKSZ ) ;
/* if we went beyond SendRqstPtr, back off */
if ( XLByteLT ( SendRqstPtr , endptr ) )
endptr = SendRqstPtr ;
/*
* OK to read and send the slice .
*
* We don ' t need to convert the xlogid / xrecoff from host byte order to
* network byte order because both servers can be expected to have
* the same byte order . If they have different byte orders , we don ' t
* reach here .
*/
pq_sendbyte ( outMsg , ' w ' ) ;
pq_sendbytes ( outMsg , ( char * ) & startptr , sizeof ( startptr ) ) ;
if ( endptr . xlogid ! = startptr . xlogid )
{
/* Don't cross a logfile boundary within one message */
Assert ( endptr . xlogid = = startptr . xlogid + 1 ) ;
nbytes = endptr . xrecoff + XLogFileSize - startptr . xrecoff ;
endptr . xlogid = startptr . xlogid ;
endptr . xrecoff = XLogFileSize ;
}
/* if we went beyond SendRqstPtr, back off */
if ( XLByteLE ( SendRqstPtr , endptr ) )
{
endptr = SendRqstPtr ;
* caughtup = true ;
}
else
nbytes = endptr . xrecoff - startptr . xrecoff ;
{
/* round down to page boundary. */
endptr . xrecoff - = ( endptr . xrecoff % XLOG_BLCKSZ ) ;
* caughtup = false ;
}
sentPtr = endptr ;
nbytes = endptr . xrecoff - startptr . xrecoff ;
Assert ( nbytes < = MAX_SEND_SIZE ) ;
/*
* Read the log directly into the output buffer to prevent extra
* memcpy calls .
* OK to read and send the slice .
*/
enlargeStringInfo ( outMsg , nbytes ) ;
msgbuf [ 0 ] = ' w ' ;
XLogRead ( & outMsg - > data [ outMsg - > len ] , startptr , nbytes ) ;
outMsg - > len + = nbytes ;
outMsg - > data [ outMsg - > len ] = ' \0 ' ;
/*
* Read the log directly into the output buffer to avoid extra memcpy
* calls .
*/
XLogRead ( msgbuf + 1 + sizeof ( WalDataMessageHeader ) , startptr , nbytes ) ;
pq_putmessage ( ' d ' , outMsg - > data , outMsg - > len ) ;
resetStringInfo ( outMsg ) ;
/*
* We fill the message header last so that the send timestamp is taken
* as late as possible .
*/
msghdr . dataStart = startptr ;
msghdr . walEnd = SendRqstPtr ;
msghdr . sendTime = GetCurrentTimestamp ( ) ;
/* Update shared memory status */
SpinLockAcquire ( & walsnd - > mutex ) ;
walsnd - > sentPtr = sentPtr ;
SpinLockRelease ( & walsnd - > mutex ) ;
memcpy ( msgbuf + 1 , & msghdr , sizeof ( WalDataMessageHeader ) ) ;
pq_putmessage ( ' d ' , msgbuf , 1 + sizeof ( WalDataMessageHeader ) + nbytes ) ;
/* Flush pending output */
if ( pq_flush ( ) )
return false ;
sentPtr = endptr ;
/* Update shared memory status */
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd * walsnd = MyWalSnd ;
SpinLockAcquire ( & walsnd - > mutex ) ;
walsnd - > sentPtr = sentPtr ;
SpinLockRelease ( & walsnd - > mutex ) ;
}
/* Report progress of XLOG streaming in PS display */
snprintf ( activitymsg , sizeof ( activitymsg ) , " streaming %X/%X " ,
sentPtr . xlogid , sentPtr . xrecoff ) ;
set_ps_display ( activitymsg , false ) ;
if ( update_process_title )
{
char activitymsg [ 50 ] ;
snprintf ( activitymsg , sizeof ( activitymsg ) , " streaming %X/%X " ,
sentPtr . xlogid , sentPtr . xrecoff ) ;
set_ps_display ( activitymsg , false ) ;
}
return true ;
}