@ -129,8 +129,14 @@ bool log_replication_commands = false;
*/
bool wake_wal_senders = false ;
static WALOpenSegment * sendSeg = NULL ;
static WALSegmentContext * sendCxt = NULL ;
/*
* Physical walsender does not use xlogreader to read WAL , but it does use a
* fake one to keep state . Logical walsender uses a proper xlogreader . Both
* keep the ' xlogreader ' pointer to the right one , for the sake of common
* routines .
*/
static XLogReaderState fake_xlogreader ;
static XLogReaderState * xlogreader ;
/*
* These variables keep track of the state of the timeline we ' re currently
@ -248,8 +254,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead ( int head , XLogRecPtr lsn , TimestampTz now ) ;
static bool TransactionIdInRecentPast ( TransactionId xid , uint32 epoch ) ;
static int WalSndSegmentOpen ( XLogReaderState * state , XLogSegNo nextSegNo ,
WALSegmentContext * segcxt , TimeLineID * tli_p ) ;
static void WalSndSegmentOpen ( XLogReaderState * state , XLogSegNo nextSegNo ,
TimeLineID * tli_p ) ;
static void UpdateSpillStats ( LogicalDecodingContext * ctx ) ;
@ -280,12 +286,19 @@ InitWalSender(void)
/* Initialize empty timestamp buffer for lag tracking. */
lag_tracker = MemoryContextAllocZero ( TopMemoryContext , sizeof ( LagTracker ) ) ;
/* Make sure we can remember the current read position in XLOG. */
sendSeg = ( WALOpenSegment * )
MemoryContextAlloc ( TopMemoryContext , sizeof ( WALOpenSegment ) ) ;
sendCxt = ( WALSegmentContext * )
MemoryContextAlloc ( TopMemoryContext , sizeof ( WALSegmentContext ) ) ;
WALOpenSegmentInit ( sendSeg , sendCxt , wal_segment_size , NULL ) ;
/*
* Prepare physical walsender ' s fake xlogreader struct . Logical walsender
* does this later .
*/
if ( ! am_db_walsender )
{
xlogreader = & fake_xlogreader ;
xlogreader - > routine =
* XL_ROUTINE ( . segment_open = WalSndSegmentOpen ,
. segment_close = wal_segment_close ) ;
WALOpenSegmentInit ( & xlogreader - > seg , & xlogreader - > segcxt ,
wal_segment_size , NULL ) ;
}
}
/*
@ -302,11 +315,8 @@ WalSndErrorCleanup(void)
ConditionVariableCancelSleep ( ) ;
pgstat_report_wait_end ( ) ;
if ( sendSeg - > ws_file > = 0 )
{
close ( sendSeg - > ws_file ) ;
sendSeg - > ws_file = - 1 ;
}
if ( xlogreader - > seg . ws_file > = 0 )
wal_segment_close ( xlogreader ) ;
if ( MyReplicationSlot ! = NULL )
ReplicationSlotRelease ( ) ;
@ -837,11 +847,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
cur_page ,
targetPagePtr ,
XLOG_BLCKSZ ,
sendSeg - > ws_tli , /* Pass the current TLI because only
state - > seg . ws_tli , /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new
* TLI is needed . */
sendSeg ,
sendCxt ,
& errinfo ) )
WALReadRaiseError ( & errinfo ) ;
@ -852,8 +860,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
* read ( ) succeeds in that case , but the data we tried to read might
* already have been overwritten with new WAL records .
*/
XLByteToSeg ( targetPagePtr , segno , sendCx t - > ws_segsize ) ;
CheckXLogRemoved ( segno , sendSeg - > ws_tli ) ;
XLByteToSeg ( targetPagePtr , segno , state - > segcxt . ws_segsize ) ;
CheckXLogRemoved ( segno , state - > seg . ws_tli ) ;
return count ;
}
@ -1176,6 +1184,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
. segment_close = wal_segment_close ) ,
WalSndPrepareWrite , WalSndWriteData ,
WalSndUpdateProgress ) ;
xlogreader = logical_decoding_ctx - > reader ;
WalSndSetState ( WALSNDSTATE_CATCHUP ) ;
@ -2447,13 +2456,11 @@ WalSndKill(int code, Datum arg)
}
/* XLogReaderRoutine->segment_open callback */
static int
WalSndSegmentOpen ( XLogReaderState * state ,
XLogSegNo nextSegNo , WALSegmentContext * segcxt ,
static void
WalSndSegmentOpen ( XLogReaderState * state , XLogSegNo nextSegNo ,
TimeLineID * tli_p )
{
char path [ MAXPGPATH ] ;
int fd ;
/*-------
* When reading from a historic timeline , and there is a timeline switch
@ -2484,15 +2491,15 @@ WalSndSegmentOpen(XLogReaderState *state,
{
XLogSegNo endSegNo ;
XLByteToSeg ( sendTimeLineValidUpto , endSegNo , segcxt - > ws_segsize ) ;
if ( sendSeg - > ws_segno = = endSegNo )
XLByteToSeg ( sendTimeLineValidUpto , endSegNo , stat e - > segcxt . ws_segsize ) ;
if ( state - > seg . ws_segno = = endSegNo )
* tli_p = sendTimeLineNextTLI ;
}
XLogFilePath ( path , * tli_p , nextSegNo , segcxt - > ws_segsize ) ;
fd = BasicOpenFile ( path , O_RDONLY | PG_BINARY ) ;
if ( fd > = 0 )
return fd ;
XLogFilePath ( path , * tli_p , nextSegNo , stat e - > segcxt . ws_segsize ) ;
state - > seg . ws_file = BasicOpenFile ( path , O_RDONLY | PG_BINARY ) ;
if ( state - > seg . ws_file > = 0 )
return ;
/*
* If the file is not found , assume it ' s because the standby asked for a
@ -2515,7 +2522,6 @@ WalSndSegmentOpen(XLogReaderState *state,
( errcode_for_file_access ( ) ,
errmsg ( " could not open file \" %s \" : %m " ,
path ) ) ) ;
return - 1 ; /* keep compiler quiet */
}
/*
@ -2537,12 +2543,6 @@ XLogSendPhysical(void)
Size nbytes ;
XLogSegNo segno ;
WALReadError errinfo ;
static XLogReaderState fake_xlogreader =
{
/* Fake xlogreader state for WALRead */
. routine . segment_open = WalSndSegmentOpen ,
. routine . segment_close = wal_segment_close
} ;
/* If requested switch the WAL sender to the stopping state. */
if ( got_STOPPING )
@ -2685,9 +2685,8 @@ XLogSendPhysical(void)
if ( sendTimeLineIsHistoric & & sendTimeLineValidUpto < = sentPtr )
{
/* close the current file. */
if ( sendSeg - > ws_file > = 0 )
close ( sendSeg - > ws_file ) ;
sendSeg - > ws_file = - 1 ;
if ( xlogreader - > seg . ws_file > = 0 )
wal_segment_close ( xlogreader ) ;
/* Send CopyDone */
pq_putmessage_noblock ( ' c ' , NULL , 0 ) ;
@ -2760,21 +2759,19 @@ XLogSendPhysical(void)
enlargeStringInfo ( & output_message , nbytes ) ;
retry :
if ( ! WALRead ( & fake_ xlogreader,
if ( ! WALRead ( xlogreader ,
& output_message . data [ output_message . len ] ,
startptr ,
nbytes ,
sendSeg - > ws_tli , /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new
* TLI is needed . */
sendSeg ,
sendCxt ,
xlogreader - > seg . ws_tli , /* Pass the current TLI because
* only WalSndSegmentOpen controls
* whether new TLI is needed . */
& errinfo ) )
WALReadRaiseError ( & errinfo ) ;
/* See logical_read_xlog_page(). */
XLByteToSeg ( startptr , segno , sendCxt - > ws_segsize ) ;
CheckXLogRemoved ( segno , sendSeg - > ws_tli ) ;
XLByteToSeg ( startptr , segno , xlogreader - > segcxt . ws_segsize ) ;
CheckXLogRemoved ( segno , xlogreader - > seg . ws_tli ) ;
/*
* During recovery , the currently - open WAL file might be replaced with the
@ -2792,10 +2789,9 @@ retry:
walsnd - > needreload = false ;
SpinLockRelease ( & walsnd - > mutex ) ;
if ( reload & & sendSeg - > ws_file > = 0 )
if ( reload & & xlogreader - > seg . ws_file > = 0 )
{
close ( sendSeg - > ws_file ) ;
sendSeg - > ws_file = - 1 ;
wal_segment_close ( xlogreader ) ;
goto retry ;
}