@@ -302,6 +302,11 @@ static bool doPageWrites;
  * so it's a plain spinlock. The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them. These locks are:
  *
+ * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
+ * It is only held while initializing and changing the mapping. If the
+ * contents of the buffer being replaced haven't been written yet, the mapping
+ * lock is released while the write is done, and reacquired afterwards.
+ *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
  *
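
As a concrete reference for the distinction drawn above, the spinlock is only ever wrapped around a few plain assignments. A representative sketch of a pattern that appears later in this file (XLogCtl->LogwrtRqst and OldPageRqstPtr are assumed from the surrounding code, not shown in this excerpt):

    /* Sketch: info_lck guards only short, no-fail sections like this. */
    SpinLockAcquire(&XLogCtl->info_lck);
    if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
        XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;   /* plain stores only */
    SpinLockRelease(&XLogCtl->info_lck);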

@@ -468,32 +473,21 @@ typedef struct XLogCtlData
 	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
-	 * Latest page reserved for initialization in the cache (last byte
-	 * position + 1).
+	 * Latest initialized page in the cache (last byte position + 1).
 	 *
-	 * To change the identity of a buffer, you need to advance
-	 * InitializeReserved first. To change the identity of a buffer that's
+	 * To change the identity of a buffer (and InitializedUpTo), you need to
+	 * hold WALBufMappingLock. To change the identity of a buffer that's
 	 * still dirty, the old page needs to be written out first, and for that
 	 * you need WALWriteLock, and you need to ensure that there are no
 	 * in-progress insertions to the page by calling
 	 * WaitXLogInsertionsToFinish().
 	 */
-	pg_atomic_uint64 InitializeReserved;
-
-	/*
-	 * Latest initialized page in the cache (last byte position + 1).
-	 *
-	 * InitializedUpTo is updated after the buffer initialization. After the
-	 * update, waiters are notified using InitializedUpToCondVar.
-	 */
-	pg_atomic_uint64 InitializedUpTo;
-	ConditionVariable InitializedUpToCondVar;
+	XLogRecPtr	InitializedUpTo;
 
 	/*
 	 * These values do not change after startup, although the pointed-to pages
-	 * and xlblocks values certainly do. xlblocks values are changed
-	 * lock-free according to the check for the xlog write position and are
-	 * accompanied by changes of InitializeReserved and InitializedUpTo.
+	 * and xlblocks values certainly do. xlblocks values are protected by
+	 * WALBufMappingLock.
 	 */
 	char	   *pages;			/* buffers for unwritten XLOG pages */
 	pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
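
Both variants index pages[] and xlblocks[] the same way: an LSN maps to a buffer slot by modular arithmetic. A sketch of that mapping, consistent with how nextidx is computed in the hunks below (XLogCtl->XLogCacheBlck, the highest valid buffer index, is not shown in this excerpt and is assumed here):

    /* Sketch: LSN -> WAL buffer slot, as used by both struct variants. */
    #define XLogRecPtrToBufIdx(recptr) \
        (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))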

@@ -816,9 +810,9 @@ XLogInsertRecord(XLogRecData *rdata,
  * fullPageWrites from changing until the insertion is finished.
  *
  * Step 2 can usually be done completely in parallel. If the required WAL
- * page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
- * which will ensure it is initialized. But the WAL writer tries to do that
- * ahead of insertions to avoid that happening in the critical path.
+ * page is not initialized yet, you have to grab WALBufMappingLock to
+ * initialize it, but the WAL writer tries to do that ahead of insertions
+ * to avoid that happening in the critical path.
  *
  *----------
  */
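
For orientation, AdvanceXLInsertBuffer() is reached on two paths, condensed below (a sketch of the believed entry points, not the literal call sites; GetXLogBuffer is not shown in this excerpt):

    /* An inserter, via GetXLogBuffer(), needs one specific page and waits: */
    AdvanceXLInsertBuffer(ptr, tli, false);

    /* The WAL writer pre-initializes ahead, but never flushes to do so: */
    AdvanceXLInsertBuffer(InvalidXLogRecPtr, tli, true);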

@@ -1997,70 +1991,32 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
 	XLogRecPtr	NewPageBeginPtr;
 	XLogPageHeader NewPage;
-	XLogRecPtr	ReservedPtr;
 	int			npages pg_attribute_unused() = 0;
 
-	/*
-	 * We must run the loop below inside the critical section as we expect
-	 * XLogCtl->InitializedUpTo to eventually keep up. Most of the callers
-	 * already run inside a critical section, except for the WAL writer,
-	 * which passes 'opportunistic == true'; in that case we don't perform
-	 * operations that could error out.
-	 *
-	 * Start an explicit critical section anyway though.
-	 */
-	Assert(CritSectionCount > 0 || opportunistic);
-	START_CRIT_SECTION();
+	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
 
-	/*--
-	 * Loop till we get all the pages in the WAL buffer before 'upto'
-	 * reserved for initialization. Multiple processes can initialize
-	 * different buffers with this loop in parallel, as follows:
-	 *
-	 * 1. Reserve a page for initialization using XLogCtl->InitializeReserved.
-	 * 2. Initialize the reserved page.
-	 * 3. Attempt to advance XLogCtl->InitializedUpTo.
+	/*
+	 * Now that we have the lock, check if someone initialized the page
+	 * already.
 	 */
-	ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
-	while (upto >= ReservedPtr || opportunistic)
+	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
 	{
-		Assert(ReservedPtr % XLOG_BLCKSZ == 0);
+		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
 
 		/*
-		 * Get ending-offset of the buffer page we need to replace.
-		 *
-		 * We don't look it up in xlblocks, but rather calculate the position
-		 * we must wait to be written. Once it has been written, xlblocks
-		 * will hold this position (or still be uninitialized).
+		 * Get ending-offset of the buffer page we need to replace (this may
+		 * be zero if the buffer hasn't been used yet). Fall through if it's
+		 * already written out.
 		 */
-		if (ReservedPtr + XLOG_BLCKSZ > XLOG_BLCKSZ * XLOGbuffers)
-			OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - XLOG_BLCKSZ * XLOGbuffers;
-		else
-			OldPageRqstPtr = InvalidXLogRecPtr;
-
-		if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
+		OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
+		if (LogwrtResult.Write < OldPageRqstPtr)
 		{
 			/*
-			 * If we just want to pre-initialize as much as we can without
-			 * flushing, give up now.
+			 * Nope, got work to do. If we just want to pre-initialize as much
+			 * as we can without flushing, give up now.
 			 */
-			upto = ReservedPtr - 1;
-			break;
-		}
-
-		/*
-		 * Attempt to reserve the page for initialization. Failure means that
-		 * this page got reserved by another process.
-		 */
-		if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
-											&ReservedPtr,
-											ReservedPtr + XLOG_BLCKSZ))
-			continue;
-
-		/* Fall through if it's already written out. */
-		if (LogwrtResult.Write < OldPageRqstPtr)
-		{
-			/* Nope, got work to do. */
+			if (opportunistic)
+				break;
 
 			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
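
The removed branch computes, without reading xlblocks, the LSN that must already be written out before the reserved page may reuse its slot. A worked example of the arithmetic, assuming illustrative values XLOG_BLCKSZ = 8192 and XLOGbuffers = 4:

    /*
     * ReservedPtr = 65536, the start of page 8.  With 4 buffer slots,
     * page 8 reuses the slot last held by page 4, whose end position is:
     *
     *   OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - XLOG_BLCKSZ * XLOGbuffers
     *                  = 65536 + 8192 - 32768
     *                  = 40960          (= end of page 4)
     *
     * For the first XLOGbuffers pages the condition fails, so
     * OldPageRqstPtr is InvalidXLogRecPtr and there is nothing to wait for.
     */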

@@ -2075,6 +2031,14 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			RefreshXLogWriteResult(LogwrtResult);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
+				/*
+				 * Must acquire write lock. Release WALBufMappingLock first,
+				 * to make sure that all insertions that we need to wait for
+				 * can finish (up to this same position). Otherwise we risk
+				 * deadlock.
+				 */
+				LWLockRelease(WALBufMappingLock);
+
 				WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
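
The ordering in the restored comment matters because an in-flight inserter may itself need WALBufMappingLock (to get its next page initialized) before its insertion can finish. Schematically, the cycle being avoided (a hypothetical two-backend trace, not real code):

    /*
     *   Backend A (here)                  Backend B (inserter)
     *   ----------------                  --------------------
     *   holds WALBufMappingLock           holds a WAL insertion lock
     *   WaitXLogInsertionsToFinish()  --> needs WALBufMappingLock for its
     *     waits on B's insertion            next page
     *
     * A waits for B, B waits for A's lock.  Releasing WALBufMappingLock
     * before waiting breaks the cycle.
     */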

@@ -2096,6 +2060,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 					pgWalUsage.wal_buffers_full++;
 					TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
 				}
+				/* Re-acquire WALBufMappingLock and retry */
+				LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+				continue;
 			}
 		}
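
Taken together with the previous two hunks, the restored slow path has this overall shape (a condensed sketch, not the literal code):

    /*
     * while (page before 'upto' not initialized):
     *     holding WALBufMappingLock, check the old occupant of the slot;
     *     if it is already written out, reuse the slot;
     *     otherwise: release WALBufMappingLock,
     *                WaitXLogInsertionsToFinish(OldPageRqstPtr),
     *                take WALWriteLock, write (or piggyback on) the old page,
     *                re-acquire WALBufMappingLock and retry.
     */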

@@ -2103,17 +2070,10 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 * Now the next buffer slot is free and we can set it up to be the
 		 * next output page.
 		 */
-		NewPageBeginPtr = ReservedPtr;
+		NewPageBeginPtr = XLogCtl->InitializedUpTo;
 		NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
-		nextidx = XLogRecPtrToBufIdx(ReservedPtr);
 
-#ifdef USE_ASSERT_CHECKING
-		{
-			XLogRecPtr	storedBound = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
-
-			Assert(storedBound == OldPageRqstPtr || storedBound == InvalidXLogRecPtr);
-		}
-#endif
+		Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
 
 		NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);

@@ -2179,50 +2139,11 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		pg_write_barrier();
 
 		pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
-
-		/*
-		 * Try to advance XLogCtl->InitializedUpTo.
-		 *
-		 * If the CAS operation failed, then some of the previous pages are
-		 * not initialized yet, and this backend gives up.
-		 *
-		 * Since the initializer of the next page might give up on advancing
-		 * InitializedUpTo, this backend has to keep attempting the advance
-		 * until it finds a page "in the past" or a concurrent backend
-		 * succeeds at advancing. When we finish advancing
-		 * XLogCtl->InitializedUpTo, we notify all the waiters with
-		 * XLogCtl->InitializedUpToCondVar.
-		 */
-		while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
-		{
-			NewPageBeginPtr = NewPageEndPtr;
-			NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
-			nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
-
-			if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
-			{
-				/*
-				 * The page at nextidx wasn't initialized yet, so we can't
-				 * move InitializedUpTo further. It will be moved by the
-				 * backend that initializes nextidx.
-				 */
-				ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
-				break;
-			}
-		}
+		XLogCtl->InitializedUpTo = NewPageEndPtr;
 
 		npages++;
 	}
-
-	END_CRIT_SECTION();
-
-	/*
-	 * All the pages in the WAL buffer before 'upto' were reserved for
-	 * initialization. However, some pages might be reserved by concurrent
-	 * processes. Wait till they finish initialization.
-	 */
-	while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
-		ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
-	ConditionVariableCancelSleep();
+	LWLockRelease(WALBufMappingLock);
 
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG && npages > 0)
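
The removed loop leans on the port/atomics CAS contract: pg_atomic_compare_exchange_u64(ptr, &expected, newval) returns true on success and, on failure, stores the value it actually found into *expected. A hand-off between two backends then looks like this (an illustrative trace under the removed scheme, not real code):

    /*
     * Pages P1 and P2, InitializedUpTo == start(P1):
     *
     *   Backend A initializes P2 and publishes xlblocks[idx(P2)] = end(P2),
     *   then tries CAS(InitializedUpTo: start(P2) -> end(P2)).  The CAS
     *   fails because P1 isn't done, so A gives up; P2's advancement is
     *   left for whoever finishes P1.
     *
     *   Backend B initializes P1; CAS(start(P1) -> end(P1)) succeeds.  B
     *   stays in the loop: xlblocks[idx(P2)] == end(P2) already, so the
     *   next CAS(start(P2) -> end(P2)) succeeds too.  The slot after P2
     *   doesn't match, so B broadcasts InitializedUpToCondVar and stops.
     */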

@@ -5123,10 +5044,6 @@ XLOGShmemInit(void)
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
-
-	pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
-	pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
-	ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
 }

@@ -6146,7 +6063,7 @@ StartupXLOG(void)
 		memset(page + len, 0, XLOG_BLCKSZ - len);
 
 		pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
+		XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
 	}
 	else
 	{

@@ -6155,9 +6072,8 @@ StartupXLOG(void)
 		 * let the first attempt to insert a log record initialize the next
 		 * buffer.
 		 */
-		pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
+		XLogCtl->InitializedUpTo = EndOfLog;
 	}
-	pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
 
 	/*
	 * Update local and shared status. This is OK to do without any locks