Revert "Get rid of WALBufMappingLock"

This reverts commit bc22dc0e0d.
It appears that conditional variables are not suitable for use inside
critical sections.  If WaitLatch()/WaitEventSetWaitBlock() face postmaster
death, they exit, releasing all locks instead of PANIC.  In certain
situations, this leads to data corruption.

Reported-by: Andrey Borodin <x4mmm@yandex-team.ru>
Discussion: https://postgr.es/m/B3C69B86-7F82-4111-B97F-0005497BB745%40yandex-team.ru
Reviewed-by: Andrey Borodin <x4mmm@yandex-team.ru>
Reviewed-by: Aleksander Alekseev <aleksander@tigerdata.com>
Reviewed-by: Kirill Reshke <reshkekirill@gmail.com>
Reviewed-by: Tom Lane <tgl@sss.pgh.pa.us>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Reviewed-by: Tomas Vondra <tomas@vondra.me>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Yura Sokolov <y.sokolov@postgrespro.ru>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Backpatch-through: 18
REL_18_STABLE
Alexander Korotkov 3 weeks ago
parent 8c0c868637
commit 2ce6abdf50
  1. 234
      src/backend/access/transam/xlog.c
  2. 2
      src/backend/utils/activity/wait_event_names.txt
  3. 2
      src/include/storage/lwlocklist.h

@ -303,6 +303,11 @@ static bool doPageWrites;
* so it's a plain spinlock. The other locks are held longer (potentially
* over I/O operations), so we use LWLocks for them. These locks are:
*
* WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
* It is only held while initializing and changing the mapping. If the
* contents of the buffer being replaced haven't been written yet, the mapping
* lock is released while the write is done, and reacquired afterwards.
*
* WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
* XLogFlush).
*
@ -469,37 +474,21 @@ typedef struct XLogCtlData
pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */
/*
* First initialized page in the cache (first byte position).
*/
XLogRecPtr InitializedFrom;
/*
* Latest reserved for initialization page in the cache (last byte
* position + 1).
* Latest initialized page in the cache (last byte position + 1).
*
* To change the identity of a buffer, you need to advance
* InitializeReserved first. To change the identity of a buffer that's
* To change the identity of a buffer (and InitializedUpTo), you need to
* hold WALBufMappingLock. To change the identity of a buffer that's
* still dirty, the old page needs to be written out first, and for that
* you need WALWriteLock, and you need to ensure that there are no
* in-progress insertions to the page by calling
* WaitXLogInsertionsToFinish().
*/
pg_atomic_uint64 InitializeReserved;
/*
* Latest initialized page in the cache (last byte position + 1).
*
* InitializedUpTo is updated after the buffer initialization. After
* update, waiters got notification using InitializedUpToCondVar.
*/
pg_atomic_uint64 InitializedUpTo;
ConditionVariable InitializedUpToCondVar;
XLogRecPtr InitializedUpTo;
/*
* These values do not change after startup, although the pointed-to pages
* and xlblocks values certainly do. xlblocks values are changed
* lock-free according to the check for the xlog write position and are
* accompanied by changes of InitializeReserved and InitializedUpTo.
* and xlblocks values certainly do. xlblocks values are protected by
* WALBufMappingLock.
*/
char *pages; /* buffers for unwritten XLOG pages */
pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
@ -822,9 +811,9 @@ XLogInsertRecord(XLogRecData *rdata,
* fullPageWrites from changing until the insertion is finished.
*
* Step 2 can usually be done completely in parallel. If the required WAL
* page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
* which will ensure it is initialized. But the WAL writer tries to do that
* ahead of insertions to avoid that from happening in the critical path.
* page is not initialized yet, you have to grab WALBufMappingLock to
* initialize it, but the WAL writer tries to do that ahead of insertions
* to avoid that from happening in the critical path.
*
*----------
*/
@ -2006,79 +1995,32 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr;
XLogRecPtr NewPageBeginPtr;
XLogPageHeader NewPage;
XLogRecPtr ReservedPtr;
int npages pg_attribute_unused() = 0;
/*
* We must run the loop below inside the critical section as we expect
* XLogCtl->InitializedUpTo to eventually keep up. The most of callers
* already run inside the critical section. Except for WAL writer, which
* passed 'opportunistic == true', and therefore we don't perform
* operations that could error out.
*
* Start an explicit critical section anyway though.
*/
Assert(CritSectionCount > 0 || opportunistic);
START_CRIT_SECTION();
LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
/*--
* Loop till we get all the pages in WAL buffer before 'upto' reserved for
* initialization. Multiple process can initialize different buffers with
* this loop in parallel as following.
*
* 1. Reserve page for initialization using XLogCtl->InitializeReserved.
* 2. Initialize the reserved page.
* 3. Attempt to advance XLogCtl->InitializedUpTo,
/*
* Now that we have the lock, check if someone initialized the page
* already.
*/
ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
while (upto >= ReservedPtr || opportunistic)
while (upto >= XLogCtl->InitializedUpTo || opportunistic)
{
Assert(ReservedPtr % XLOG_BLCKSZ == 0);
nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
/*
* Get ending-offset of the buffer page we need to replace.
*
* We don't lookup into xlblocks, but rather calculate position we
* must wait to be written. If it was written, xlblocks will have this
* position (or uninitialized)
* Get ending-offset of the buffer page we need to replace (this may
* be zero if the buffer hasn't been used yet). Fall through if it's
* already written out.
*/
if (ReservedPtr + XLOG_BLCKSZ > XLogCtl->InitializedFrom + XLOG_BLCKSZ * XLOGbuffers)
OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - (XLogRecPtr) XLOG_BLCKSZ * XLOGbuffers;
else
OldPageRqstPtr = InvalidXLogRecPtr;
if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
if (LogwrtResult.Write < OldPageRqstPtr)
{
/*
* If we just want to pre-initialize as much as we can without
* flushing, give up now.
* Nope, got work to do. If we just want to pre-initialize as much
* as we can without flushing, give up now.
*/
upto = ReservedPtr - 1;
break;
}
/*
* Attempt to reserve the page for initialization. Failure means that
* this page got reserved by another process.
*/
if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
&ReservedPtr,
ReservedPtr + XLOG_BLCKSZ))
continue;
/*
* Wait till page gets correctly initialized up to OldPageRqstPtr.
*/
nextidx = XLogRecPtrToBufIdx(ReservedPtr);
while (pg_atomic_read_u64(&XLogCtl->InitializedUpTo) < OldPageRqstPtr)
ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
ConditionVariableCancelSleep();
Assert(pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) == OldPageRqstPtr);
/* Fall through if it's already written out. */
if (LogwrtResult.Write < OldPageRqstPtr)
{
/* Nope, got work to do. */
if (opportunistic)
break;
/* Advance shared memory write request position */
SpinLockAcquire(&XLogCtl->info_lck);
@ -2093,6 +2035,14 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
RefreshXLogWriteResult(LogwrtResult);
if (LogwrtResult.Write < OldPageRqstPtr)
{
/*
* Must acquire write lock. Release WALBufMappingLock first,
* to make sure that all insertions that we need to wait for
* can finish (up to this same position). Otherwise we risk
* deadlock.
*/
LWLockRelease(WALBufMappingLock);
WaitXLogInsertionsToFinish(OldPageRqstPtr);
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@ -2120,6 +2070,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
*/
pgstat_report_fixed = true;
}
/* Re-acquire WALBufMappingLock and retry */
LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
continue;
}
}
@ -2127,9 +2080,11 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
* Now the next buffer slot is free and we can set it up to be the
* next output page.
*/
NewPageBeginPtr = ReservedPtr;
NewPageBeginPtr = XLogCtl->InitializedUpTo;
NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
/*
@ -2193,100 +2148,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
*/
pg_write_barrier();
/*-----
* Update the value of XLogCtl->xlblocks[nextidx] and try to advance
* XLogCtl->InitializedUpTo in a lock-less manner.
*
* First, let's provide a formal proof of the algorithm. Let it be 'n'
* process with the following variables in shared memory:
* f - an array of 'n' boolean flags,
* v - atomic integer variable.
*
* Also, let
* i - a number of a process,
* j - local integer variable,
* CAS(var, oldval, newval) - compare-and-swap atomic operation
* returning true on success,
* write_barrier()/read_barrier() - memory barriers.
*
* The pseudocode for each process is the following.
*
* j := i
* f[i] := true
* write_barrier()
* while CAS(v, j, j + 1):
* j := j + 1
* read_barrier()
* if not f[j]:
* break
*
* Let's prove that v eventually reaches the value of n.
* 1. Prove by contradiction. Assume v doesn't reach n and stucks
* on k, where k < n.
* 2. Process k attempts CAS(v, k, k + 1). 1). If, as we assumed, v
* gets stuck at k, then this CAS operation must fail. Therefore,
* v < k when process k attempts CAS(v, k, k + 1).
* 3. If, as we assumed, v gets stuck at k, then the value k of v
* must be achieved by some process m, where m < k. The process
* m must observe f[k] == false. Otherwise, it will later attempt
* CAS(v, k, k + 1) with success.
* 4. Therefore, corresponding read_barrier() (while j == k) on
* process m reached before write_barrier() of process k. But then
* process k attempts CAS(v, k, k + 1) after process m successfully
* incremented v to k, and that CAS operation must succeed.
* That leads to a contradiction. So, there is no such k (k < n)
* where v gets stuck. Q.E.D.
*
* To apply this proof to the code below, we assume
* XLogCtl->InitializedUpTo will play the role of v with XLOG_BLCKSZ
* granularity. We also assume setting XLogCtl->xlblocks[nextidx] to
* NewPageEndPtr to play the role of setting f[i] to true. Also, note
* that processes can't concurrently map different xlog locations to
* the same nextidx because we previously requested that
* XLogCtl->InitializedUpTo >= OldPageRqstPtr. So, a xlog buffer can
* be taken for initialization only once the previous initialization
* takes effect on XLogCtl->InitializedUpTo.
*/
pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
pg_write_barrier();
while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
{
NewPageBeginPtr = NewPageEndPtr;
NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
pg_read_barrier();
if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
{
/*
* Page at nextidx wasn't initialized yet, so we can't move
* InitializedUpto further. It will be moved by backend which
* will initialize nextidx.
*/
ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
break;
}
}
XLogCtl->InitializedUpTo = NewPageEndPtr;
npages++;
}
END_CRIT_SECTION();
/*
* All the pages in WAL buffer before 'upto' were reserved for
* initialization. However, some pages might be reserved by concurrent
* processes. Wait till they finish initialization.
*/
while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
ConditionVariableCancelSleep();
pg_read_barrier();
LWLockRelease(WALBufMappingLock);
#ifdef WAL_DEBUG
if (XLOG_DEBUG && npages > 0)
@ -5195,10 +5062,6 @@ XLOGShmemInit(void)
pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
}
/*
@ -6218,8 +6081,7 @@ StartupXLOG(void)
memset(page + len, 0, XLOG_BLCKSZ - len);
pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
XLogCtl->InitializedFrom = endOfRecoveryInfo->lastPageBeginPtr;
XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
}
else
{
@ -6228,10 +6090,8 @@ StartupXLOG(void)
* let the first attempt to insert a log record to initialize the next
* buffer.
*/
pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
XLogCtl->InitializedFrom = EndOfLog;
XLogCtl->InitializedUpTo = EndOfLog;
}
pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
/*
* Update local and shared status. This is OK to do without any locks

@ -156,7 +156,6 @@ REPLICATION_SLOT_DROP "Waiting for a replication slot to become inactive so it c
RESTORE_COMMAND "Waiting for <xref linkend="guc-restore-command"/> to complete."
SAFE_SNAPSHOT "Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
SYNC_REP "Waiting for confirmation from a remote server during synchronous replication."
WAL_BUFFER_INIT "Waiting on WAL buffer to be initialized."
WAL_RECEIVER_EXIT "Waiting for the WAL receiver to exit."
WAL_RECEIVER_WAIT_START "Waiting for startup process to send initial data for streaming replication."
WAL_SUMMARY_READY "Waiting for a new WAL summary to be generated."
@ -316,6 +315,7 @@ XidGen "Waiting to allocate a new transaction ID."
ProcArray "Waiting to access the shared per-process data structures (typically, to get a snapshot or report a session's transaction ID)."
SInvalRead "Waiting to retrieve messages from the shared catalog invalidation queue."
SInvalWrite "Waiting to add a message to the shared catalog invalidation queue."
WALBufMapping "Waiting to replace a page in WAL buffers."
WALWrite "Waiting for WAL buffers to be written to disk."
ControlFile "Waiting to read or update the <filename>pg_control</filename> file or create a new WAL file."
MultiXactGen "Waiting to read or update shared multixact state."

@ -37,7 +37,7 @@ PG_LWLOCK(3, XidGen)
PG_LWLOCK(4, ProcArray)
PG_LWLOCK(5, SInvalRead)
PG_LWLOCK(6, SInvalWrite)
/* 7 was WALBufMapping */
PG_LWLOCK(7, WALBufMapping)
PG_LWLOCK(8, WALWrite)
PG_LWLOCK(9, ControlFile)
/* 10 was CheckpointLock */

Loading…
Cancel
Save