@ -724,6 +724,18 @@ typedef struct XLogCtlData
XLogRecPtr lastFpwDisableRecPtr ;
slock_t info_lck ; /* locks shared variables shown above */
/*
* Variables used to track segment - boundary - crossing WAL records . See
* RegisterSegmentBoundary . Protected by segtrack_lck .
*/
XLogSegNo lastNotifiedSeg ;
XLogSegNo earliestSegBoundary ;
XLogRecPtr earliestSegBoundaryEndPtr ;
XLogSegNo latestSegBoundary ;
XLogRecPtr latestSegBoundaryEndPtr ;
slock_t segtrack_lck ; /* locks shared variables shown above */
} XLogCtlData ;
static XLogCtlData * XLogCtl = NULL ;
@ -920,6 +932,7 @@ static void RemoveXlogFile(const char *segname, XLogSegNo recycleSegNo,
XLogSegNo * endlogSegNo ) ;
static void UpdateLastRemovedPtr ( char * filename ) ;
static void ValidateXLOGDirectoryStructure ( void ) ;
static void RegisterSegmentBoundary ( XLogSegNo seg , XLogRecPtr pos ) ;
static void CleanupBackupHistory ( void ) ;
static void UpdateMinRecoveryPoint ( XLogRecPtr lsn , bool force ) ;
static XLogRecord * ReadRecord ( XLogReaderState * xlogreader ,
@ -1154,23 +1167,56 @@ XLogInsertRecord(XLogRecData *rdata,
END_CRIT_SECTION ( ) ;
/*
* Update shared LogwrtRqst . Write , if we crossed page boundary .
* If we crossed page boundary , update LogwrtRqst . Write ; if we crossed
* segment boundary , register that and wake up walwriter .
*/
if ( StartPos / XLOG_BLCKSZ ! = EndPos / XLOG_BLCKSZ )
{
XLogSegNo StartSeg ;
XLogSegNo EndSeg ;
XLByteToSeg ( StartPos , StartSeg , wal_segment_size ) ;
XLByteToSeg ( EndPos , EndSeg , wal_segment_size ) ;
/*
* Register our crossing the segment boundary if that occurred .
*
* Note that we did not use XLByteToPrevSeg ( ) for determining the
* ending segment . This is so that a record that fits perfectly into
* the end of the segment causes the latter to get marked ready for
* archival immediately .
*/
if ( StartSeg ! = EndSeg & & XLogArchivingActive ( ) )
RegisterSegmentBoundary ( EndSeg , EndPos ) ;
/*
* Advance LogwrtRqst . Write so that it includes new block ( s ) .
*
* We do this after registering the segment boundary so that the
* comparison with the flushed pointer below can use the latest value
* known globally .
*/
SpinLockAcquire ( & XLogCtl - > info_lck ) ;
/* advance global request to include new block(s) */
if ( XLogCtl - > LogwrtRqst . Write < EndPos )
XLogCtl - > LogwrtRqst . Write = EndPos ;
/* update local result copy while I have the chance */
LogwrtResult = XLogCtl - > LogwrtResult ;
SpinLockRelease ( & XLogCtl - > info_lck ) ;
/*
* There ' s a chance that the record was already flushed to disk and we
* missed marking segments as ready for archive . If this happens , we
* nudge the WALWriter , which will take care of notifying segments as
* needed .
*/
if ( StartSeg ! = EndSeg & & XLogArchivingActive ( ) & &
LogwrtResult . Flush > = EndPos & & ProcGlobal - > walwriterLatch )
SetLatch ( ProcGlobal - > walwriterLatch ) ;
}
/*
* If this was an XLOG_SWITCH record , flush the record and the empty
* padding space that fills the rest of the segment , and perform
* end - of - segment actions ( eg , notifying archiver ) .
* padding space that fills the rest of the segment .
*/
if ( isLogSwitch )
{
@ -2421,6 +2467,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
/* We should always be inside a critical section here */
Assert ( CritSectionCount > 0 ) ;
Assert ( LWLockHeldByMe ( WALWriteLock ) ) ;
/*
* Update local LogwrtResult ( caller probably did this already , but . . . )
@ -2586,11 +2633,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
* later . Doing it here ensures that one and only one backend will
* perform this fsync .
*
* This is also the right place to notify the Archiver that the
* segment is ready to copy to archival storage , and to update the
* timer for archive_timeout , and to signal for a checkpoint if
* too many logfile segments have been used since the last
* checkpoint .
* If WAL archiving is active , we attempt to notify the archiver
* of any segments that are now ready for archival .
*
* This is also the right place to update the timer for
* archive_timeout and to signal for a checkpoint if too many
* logfile segments have been used since the last checkpoint .
*/
if ( finishing_seg )
{
@ -2602,7 +2650,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
LogwrtResult . Flush = LogwrtResult . Write ; /* end of page */
if ( XLogArchivingActive ( ) )
XLogArchiveNotifySeg ( openLogSegNo ) ;
NotifySegmentsReadyForArchive ( LogwrtResult . Flush ) ;
XLogCtl - > lastSegSwitchTime = ( pg_time_t ) time ( NULL ) ;
XLogCtl - > lastSegSwitchLSN = LogwrtResult . Flush ;
@ -2690,6 +2738,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
XLogCtl - > LogwrtRqst . Flush = LogwrtResult . Flush ;
SpinLockRelease ( & XLogCtl - > info_lck ) ;
}
if ( XLogArchivingActive ( ) )
NotifySegmentsReadyForArchive ( LogwrtResult . Flush ) ;
}
/*
@ -4328,6 +4379,131 @@ ValidateXLOGDirectoryStructure(void)
}
}
/*
* RegisterSegmentBoundary
*
* WAL records that are split across a segment boundary require special
* treatment for archiving : the initial segment must not be archived until
* the end segment has been flushed , in case we crash before we have
* the chance to flush the end segment ( because after recovery we would
* overwrite that WAL record with a different one , and so the file we
* archived no longer represents truth . ) This also applies to streaming
* physical replication .
*
* To handle this , we keep track of the LSN of WAL records that cross
* segment boundaries . Two such are sufficient : the ones with the
* earliest and the latest end pointers we know about , since the flush
* position advances monotonically . WAL record writers register
* boundary - crossing records here , which is used by . ready file creation
* to delay until the end segment is known flushed .
*/
static void
RegisterSegmentBoundary ( XLogSegNo seg , XLogRecPtr endpos )
{
XLogSegNo segno PG_USED_FOR_ASSERTS_ONLY ;
/* verify caller computed segment number correctly */
AssertArg ( ( XLByteToSeg ( endpos , segno , wal_segment_size ) , segno = = seg ) ) ;
SpinLockAcquire ( & XLogCtl - > segtrack_lck ) ;
/*
* If no segment boundaries are registered , store the new segment boundary
* in earliestSegBoundary . Otherwise , store the greater segment
* boundaries in latestSegBoundary .
*/
if ( XLogCtl - > earliestSegBoundary = = MaxXLogSegNo )
{
XLogCtl - > earliestSegBoundary = seg ;
XLogCtl - > earliestSegBoundaryEndPtr = endpos ;
}
else if ( seg > XLogCtl - > earliestSegBoundary & &
( XLogCtl - > latestSegBoundary = = MaxXLogSegNo | |
seg > XLogCtl - > latestSegBoundary ) )
{
XLogCtl - > latestSegBoundary = seg ;
XLogCtl - > latestSegBoundaryEndPtr = endpos ;
}
SpinLockRelease ( & XLogCtl - > segtrack_lck ) ;
}
/*
* NotifySegmentsReadyForArchive
*
* Mark segments as ready for archival , given that it is safe to do so .
* This function is idempotent .
*/
void
NotifySegmentsReadyForArchive ( XLogRecPtr flushRecPtr )
{
XLogSegNo latest_boundary_seg ;
XLogSegNo last_notified ;
XLogSegNo flushed_seg ;
XLogSegNo seg ;
bool keep_latest ;
XLByteToSeg ( flushRecPtr , flushed_seg , wal_segment_size ) ;
SpinLockAcquire ( & XLogCtl - > segtrack_lck ) ;
if ( XLogCtl - > latestSegBoundary < = flushed_seg & &
XLogCtl - > latestSegBoundaryEndPtr < = flushRecPtr )
{
latest_boundary_seg = XLogCtl - > latestSegBoundary ;
keep_latest = false ;
}
else if ( XLogCtl - > earliestSegBoundary < = flushed_seg & &
XLogCtl - > earliestSegBoundaryEndPtr < = flushRecPtr )
{
latest_boundary_seg = XLogCtl - > earliestSegBoundary ;
keep_latest = true ;
}
else
{
SpinLockRelease ( & XLogCtl - > segtrack_lck ) ;
return ;
}
last_notified = XLogCtl - > lastNotifiedSeg ;
/*
* Update shared memory and discard segment boundaries that are no longer
* needed .
*
* It is safe to update shared memory before we attempt to create the
* . ready files . If our calls to XLogArchiveNotifySeg ( ) fail ,
* RemoveOldXlogFiles ( ) will retry it as needed .
*/
if ( last_notified < latest_boundary_seg - 1 )
XLogCtl - > lastNotifiedSeg = latest_boundary_seg - 1 ;
if ( keep_latest )
{
XLogCtl - > earliestSegBoundary = XLogCtl - > latestSegBoundary ;
XLogCtl - > earliestSegBoundaryEndPtr = XLogCtl - > latestSegBoundaryEndPtr ;
}
else
{
XLogCtl - > earliestSegBoundary = MaxXLogSegNo ;
XLogCtl - > earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
}
XLogCtl - > latestSegBoundary = MaxXLogSegNo ;
XLogCtl - > latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
SpinLockRelease ( & XLogCtl - > segtrack_lck ) ;
/*
* Notify archiver about segments that are ready for archival ( by creating
* the corresponding . ready files ) .
*/
for ( seg = last_notified + 1 ; seg < latest_boundary_seg ; seg + + )
XLogArchiveNotifySeg ( seg , false ) ;
PgArchWakeup ( ) ;
}
/*
* Remove previous backup history files . This also retries creation of
* . ready files for any backup history files for which XLogArchiveNotify
@ -5230,9 +5406,17 @@ XLOGShmemInit(void)
SpinLockInit ( & XLogCtl - > Insert . insertpos_lck ) ;
SpinLockInit ( & XLogCtl - > info_lck ) ;
SpinLockInit ( & XLogCtl - > segtrack_lck ) ;
SpinLockInit ( & XLogCtl - > ulsn_lck ) ;
InitSharedLatch ( & XLogCtl - > recoveryWakeupLatch ) ;
ConditionVariableInit ( & XLogCtl - > recoveryNotPausedCV ) ;
/* Initialize stuff for marking segments as ready for archival. */
XLogCtl - > lastNotifiedSeg = MaxXLogSegNo ;
XLogCtl - > earliestSegBoundary = MaxXLogSegNo ;
XLogCtl - > earliestSegBoundaryEndPtr = InvalidXLogRecPtr ;
XLogCtl - > latestSegBoundary = MaxXLogSegNo ;
XLogCtl - > latestSegBoundaryEndPtr = InvalidXLogRecPtr ;
}
/*
@ -7873,6 +8057,20 @@ StartupXLOG(void)
XLogCtl - > LogwrtRqst . Write = EndOfLog ;
XLogCtl - > LogwrtRqst . Flush = EndOfLog ;
/*
* Initialize XLogCtl - > lastNotifiedSeg to the previous WAL file .
*/
if ( XLogArchivingActive ( ) )
{
XLogSegNo EndOfLogSeg ;
XLByteToSeg ( EndOfLog , EndOfLogSeg , wal_segment_size ) ;
SpinLockAcquire ( & XLogCtl - > segtrack_lck ) ;
XLogCtl - > lastNotifiedSeg = EndOfLogSeg - 1 ;
SpinLockRelease ( & XLogCtl - > segtrack_lck ) ;
}
/*
* Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
* record before resource manager writes cleanup WAL records or checkpoint
@ -8000,7 +8198,7 @@ StartupXLOG(void)
XLogArchiveCleanup ( partialfname ) ;
durable_rename ( origpath , partialpath , ERROR ) ;
XLogArchiveNotify ( partialfname ) ;
XLogArchiveNotify ( partialfname , true ) ;
}
}
}