mirror of https://github.com/postgres/postgres
pull/159/head
This commit reverts 06c418e163, e37662f221, bf1e650806, 25f42429e2, ee79928441, and 74eaf66f98
per review by Heikki Linnakangas. Discussion: https://postgr.es/m/b155606b-e744-4218-bda5-29379779da1a%40iki.fi
parent
922c4c461d
commit
772faafca1
@ -1,337 +0,0 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* waitlsn.c |
||||
* Implements waiting for the given replay LSN, which is used in |
||||
* CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout int8). |
||||
* |
||||
* Copyright (c) 2024, PostgreSQL Global Development Group |
||||
* |
||||
* IDENTIFICATION |
||||
* src/backend/commands/waitlsn.c |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
|
||||
#include "postgres.h" |
||||
|
||||
#include <float.h> |
||||
#include <math.h> |
||||
|
||||
#include "pgstat.h" |
||||
#include "access/xlog.h" |
||||
#include "access/xlogrecovery.h" |
||||
#include "commands/waitlsn.h" |
||||
#include "funcapi.h" |
||||
#include "miscadmin.h" |
||||
#include "storage/latch.h" |
||||
#include "storage/proc.h" |
||||
#include "storage/shmem.h" |
||||
#include "utils/fmgrprotos.h" |
||||
#include "utils/pg_lsn.h" |
||||
#include "utils/snapmgr.h" |
||||
#include "utils/wait_event_types.h" |
||||
|
||||
/* Heap ordering callback for waitLSN->waitersHeap; defined further below. */
static int	lsn_cmp(const pairingheap_node *a,
					const pairingheap_node *b,
					void *arg);

/*
 * Pointer to the shared WaitLSNState.  It stays NULL until
 * WaitLSNShmemInit() assigns it.
 */
struct WaitLSNState *waitLSN = NULL;
||||
|
||||
/* Report the amount of shared memory space needed for WaitLSNState. */ |
||||
Size |
||||
WaitLSNShmemSize(void) |
||||
{ |
||||
Size size; |
||||
|
||||
size = offsetof(WaitLSNState, procInfos); |
||||
size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo))); |
||||
return size; |
||||
} |
||||
|
||||
/* Initialize the WaitLSNState in the shared memory. */ |
||||
void |
||||
WaitLSNShmemInit(void) |
||||
{ |
||||
bool found; |
||||
|
||||
waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState", |
||||
WaitLSNShmemSize(), |
||||
&found); |
||||
if (!found) |
||||
{ |
||||
pg_atomic_init_u64(&waitLSN->minWaitedLSN, PG_UINT64_MAX); |
||||
pairingheap_initialize(&waitLSN->waitersHeap, lsn_cmp, NULL); |
||||
memset(&waitLSN->procInfos, 0, MaxBackends * sizeof(WaitLSNProcInfo)); |
||||
} |
||||
} |
||||
|
||||
/*
|
||||
* Comparison function for waitLSN->waitersHeap heap. Waiting processes are |
||||
* ordered by lsn, so that the waiter with smallest lsn is at the top. |
||||
*/ |
||||
static int |
||||
lsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg) |
||||
{ |
||||
const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, phNode, a); |
||||
const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, phNode, b); |
||||
|
||||
if (aproc->waitLSN < bproc->waitLSN) |
||||
return 1; |
||||
else if (aproc->waitLSN > bproc->waitLSN) |
||||
return -1; |
||||
else |
||||
return 0; |
||||
} |
||||
|
||||
/*
|
||||
* Update waitLSN->minWaitedLSN according to the current state of |
||||
* waitLSN->waitersHeap. |
||||
*/ |
||||
static void |
||||
updateMinWaitedLSN(void) |
||||
{ |
||||
XLogRecPtr minWaitedLSN = PG_UINT64_MAX; |
||||
|
||||
if (!pairingheap_is_empty(&waitLSN->waitersHeap)) |
||||
{ |
||||
pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); |
||||
|
||||
minWaitedLSN = pairingheap_container(WaitLSNProcInfo, phNode, node)->waitLSN; |
||||
} |
||||
|
||||
pg_atomic_write_u64(&waitLSN->minWaitedLSN, minWaitedLSN); |
||||
} |
||||
|
||||
/*
|
||||
* Put the current process into the heap of LSN waiters. |
||||
*/ |
||||
static void |
||||
addLSNWaiter(XLogRecPtr lsn) |
||||
{ |
||||
WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; |
||||
|
||||
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); |
||||
|
||||
Assert(!procInfo->inHeap); |
||||
|
||||
procInfo->procnum = MyProcNumber; |
||||
procInfo->waitLSN = lsn; |
||||
|
||||
pairingheap_add(&waitLSN->waitersHeap, &procInfo->phNode); |
||||
procInfo->inHeap = true; |
||||
updateMinWaitedLSN(); |
||||
|
||||
LWLockRelease(WaitLSNLock); |
||||
} |
||||
|
||||
/*
|
||||
* Remove the current process from the heap of LSN waiters if it's there. |
||||
*/ |
||||
static void |
||||
deleteLSNWaiter(void) |
||||
{ |
||||
WaitLSNProcInfo *procInfo = &waitLSN->procInfos[MyProcNumber]; |
||||
|
||||
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); |
||||
|
||||
if (!procInfo->inHeap) |
||||
{ |
||||
LWLockRelease(WaitLSNLock); |
||||
return; |
||||
} |
||||
|
||||
pairingheap_remove(&waitLSN->waitersHeap, &procInfo->phNode); |
||||
procInfo->inHeap = false; |
||||
updateMinWaitedLSN(); |
||||
|
||||
LWLockRelease(WaitLSNLock); |
||||
} |
||||
|
||||
/*
|
||||
* Set latches of LSN waiters whose LSN has been replayed. Set latches of all |
||||
* LSN waiters when InvalidXLogRecPtr is given. |
||||
*/ |
||||
void |
||||
WaitLSNSetLatches(XLogRecPtr currentLSN) |
||||
{ |
||||
int i; |
||||
int *wakeUpProcNums; |
||||
int numWakeUpProcs = 0; |
||||
|
||||
wakeUpProcNums = palloc(sizeof(int) * MaxBackends); |
||||
|
||||
LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE); |
||||
|
||||
/*
|
||||
* Iterate the pairing heap of waiting processes till we find LSN not yet |
||||
* replayed. Record the process numbers to set their latches later. |
||||
*/ |
||||
while (!pairingheap_is_empty(&waitLSN->waitersHeap)) |
||||
{ |
||||
pairingheap_node *node = pairingheap_first(&waitLSN->waitersHeap); |
||||
WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, phNode, node); |
||||
|
||||
if (!XLogRecPtrIsInvalid(currentLSN) && |
||||
procInfo->waitLSN > currentLSN) |
||||
break; |
||||
|
||||
wakeUpProcNums[numWakeUpProcs++] = procInfo->procnum; |
||||
(void) pairingheap_remove_first(&waitLSN->waitersHeap); |
||||
procInfo->inHeap = false; |
||||
} |
||||
|
||||
updateMinWaitedLSN(); |
||||
|
||||
LWLockRelease(WaitLSNLock); |
||||
|
||||
/*
|
||||
* Set latches for processes, whose waited LSNs are already replayed. This |
||||
* involves spinlocks. So, we shouldn't do this under a spinlock. |
||||
*/ |
||||
for (i = 0; i < numWakeUpProcs; i++) |
||||
{ |
||||
PGPROC *backend; |
||||
|
||||
backend = GetPGProcByNumber(wakeUpProcNums[i]); |
||||
SetLatch(&backend->procLatch); |
||||
} |
||||
pfree(wakeUpProcNums); |
||||
} |
||||
|
||||
/*
|
||||
* Delete our item from shmem array if any. |
||||
*/ |
||||
void |
||||
WaitLSNCleanup(void) |
||||
{ |
||||
/*
|
||||
* We do a fast-path check of the 'inHeap' flag without the lock. This |
||||
* flag is set to true only by the process itself. So, it's only possible |
||||
* to get a false positive. But that will be eliminated by a recheck |
||||
* inside deleteLSNWaiter(). |
||||
*/ |
||||
if (waitLSN->procInfos[MyProcNumber].inHeap) |
||||
deleteLSNWaiter(); |
||||
} |
||||
|
||||
/*
|
||||
* Wait using MyLatch till the given LSN is replayed, the postmaster dies or |
||||
* timeout happens. |
||||
*/ |
||||
void |
||||
WaitForLSN(XLogRecPtr targetLSN, int64 timeout) |
||||
{ |
||||
XLogRecPtr currentLSN; |
||||
TimestampTz endtime = 0; |
||||
|
||||
/* Shouldn't be called when shmem isn't initialized */ |
||||
Assert(waitLSN); |
||||
|
||||
/* Should be only called by a backend */ |
||||
Assert(MyBackendType == B_BACKEND && MyProcNumber <= MaxBackends); |
||||
|
||||
if (!RecoveryInProgress()) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
||||
errmsg("recovery is not in progress"), |
||||
errhint("Waiting for LSN can only be executed during recovery."))); |
||||
|
||||
/* If target LSN is already replayed, exit immediately */ |
||||
if (targetLSN <= GetXLogReplayRecPtr(NULL)) |
||||
return; |
||||
|
||||
if (timeout > 0) |
||||
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); |
||||
|
||||
addLSNWaiter(targetLSN); |
||||
|
||||
for (;;) |
||||
{ |
||||
int rc; |
||||
int latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH; |
||||
long delay_ms = 0; |
||||
|
||||
/* Check if the waited LSN has been replayed */ |
||||
currentLSN = GetXLogReplayRecPtr(NULL); |
||||
if (targetLSN <= currentLSN) |
||||
break; |
||||
|
||||
/* Recheck that recovery is still in-progress */ |
||||
if (!RecoveryInProgress()) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
||||
errmsg("recovery is not in progress"), |
||||
errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.", |
||||
LSN_FORMAT_ARGS(targetLSN), |
||||
LSN_FORMAT_ARGS(currentLSN)))); |
||||
|
||||
if (timeout > 0) |
||||
{ |
||||
delay_ms = (endtime - GetCurrentTimestamp()) / 1000; |
||||
latch_events |= WL_TIMEOUT; |
||||
if (delay_ms <= 0) |
||||
break; |
||||
} |
||||
|
||||
CHECK_FOR_INTERRUPTS(); |
||||
|
||||
rc = WaitLatch(MyLatch, latch_events, delay_ms, |
||||
WAIT_EVENT_WAIT_FOR_WAL_REPLAY); |
||||
|
||||
if (rc & WL_LATCH_SET) |
||||
ResetLatch(MyLatch); |
||||
} |
||||
|
||||
if (targetLSN > currentLSN) |
||||
{ |
||||
deleteLSNWaiter(); |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_QUERY_CANCELED), |
||||
errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", |
||||
LSN_FORMAT_ARGS(targetLSN), |
||||
LSN_FORMAT_ARGS(currentLSN)))); |
||||
} |
||||
} |
||||
|
||||
Datum |
||||
pg_wal_replay_wait(PG_FUNCTION_ARGS) |
||||
{ |
||||
XLogRecPtr target_lsn = PG_GETARG_LSN(0); |
||||
int64 timeout = PG_GETARG_INT64(1); |
||||
CallContext *context = (CallContext *) fcinfo->context; |
||||
|
||||
if (timeout < 0) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), |
||||
errmsg("\"timeout\" must not be negative"))); |
||||
|
||||
/*
|
||||
* We are going to wait for the LSN replay. We should first care that we |
||||
* don't hold a snapshot and correspondingly our MyProc->xmin is invalid. |
||||
* Otherwise, our snapshot could prevent the replay of WAL records |
||||
* implying a kind of self-deadlock. This is the reason why |
||||
* pg_wal_replay_wait() is a procedure, not a function. |
||||
* |
||||
* At first, we check that pg_wal_replay_wait() is called in a non-atomic |
||||
* context. That is, a procedure call isn't wrapped into a transaction, |
||||
* another procedure call, or a function call. |
||||
* |
||||
* Secondly, according to PlannedStmtRequiresSnapshot(), even in an atomic |
||||
* context, CallStmt is processed with a snapshot. Thankfully, we can pop |
||||
* this snapshot, because PortalRunUtility() can tolerate this. |
||||
*/ |
||||
if (context->atomic) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
||||
errmsg("pg_wal_replay_wait() must be only called in non-atomic context"), |
||||
errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function."))); |
||||
|
||||
if (ActiveSnapshotSet()) |
||||
PopActiveSnapshot(); |
||||
Assert(!ActiveSnapshotSet()); |
||||
InvalidateCatalogSnapshot(); |
||||
Assert(MyProc->xmin == InvalidTransactionId); |
||||
|
||||
(void) WaitForLSN(target_lsn, timeout); |
||||
|
||||
PG_RETURN_VOID(); |
||||
} |
@ -1,77 +0,0 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* waitlsn.h |
||||
* Declarations for LSN replay waiting routines. |
||||
* |
||||
* Copyright (c) 2024, PostgreSQL Global Development Group |
||||
* |
||||
* src/include/commands/waitlsn.h |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
#ifndef WAIT_LSN_H |
||||
#define WAIT_LSN_H |
||||
|
||||
#include "lib/pairingheap.h" |
||||
#include "postgres.h" |
||||
#include "port/atomics.h" |
||||
#include "storage/spin.h" |
||||
#include "tcop/dest.h" |
||||
|
||||
/*
|
||||
* WaitLSNProcInfo – the shared memory structure representing information |
||||
* about the single process, which may wait for LSN replay. An item of |
||||
* waitLSN->procInfos array. |
||||
*/ |
||||
typedef struct WaitLSNProcInfo |
||||
{ |
||||
/*
|
||||
* A process number, same as the index of this item in waitLSN->procInfos. |
||||
* Stored for convenience. |
||||
*/ |
||||
int procnum; |
||||
|
||||
/* LSN, which this process is waiting for */ |
||||
XLogRecPtr waitLSN; |
||||
|
||||
/* A pairing heap node for participation in waitLSN->waitersHeap */ |
||||
pairingheap_node phNode; |
||||
|
||||
/* A flag indicating that this item is added to waitLSN->waitersHeap */ |
||||
bool inHeap; |
||||
} WaitLSNProcInfo; |
||||
|
||||
/*
|
||||
* WaitLSNState - the shared memory state for the replay LSN waiting facility. |
||||
*/ |
||||
typedef struct WaitLSNState |
||||
{ |
||||
/*
|
||||
* The minimum LSN value some process is waiting for. Used for the |
||||
* fast-path checking if we need to wake up any waiters after replaying a |
||||
* WAL record. Could be read lock-less. Update protected by WaitLSNLock. |
||||
*/ |
||||
pg_atomic_uint64 minWaitedLSN; |
||||
|
||||
/*
|
||||
* A pairing heap of waiting processes order by LSN values (least LSN is |
||||
* on top). Protected by WaitLSNLock. |
||||
*/ |
||||
pairingheap waitersHeap; |
||||
|
||||
/*
|
||||
* An array with per-process information, indexed by the process number. |
||||
* Protected by WaitLSNLock. |
||||
*/ |
||||
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]; |
||||
} WaitLSNState; |
||||
|
||||
extern PGDLLIMPORT struct WaitLSNState *waitLSN; |
||||
|
||||
extern void WaitForLSN(XLogRecPtr targetLSN, int64 timeout); |
||||
extern Size WaitLSNShmemSize(void); |
||||
extern void WaitLSNShmemInit(void); |
||||
extern void WaitLSNSetLatches(XLogRecPtr currentLSN); |
||||
extern void WaitLSNCleanup(void); |
||||
|
||||
#endif /* WAIT_LSN_H */ |
@ -1,97 +0,0 @@ |
||||
# Checks waiting for the lsn replay on standby using
# pg_wal_replay_wait() procedure.
use strict;
use warnings;

use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;

# Initialize primary node
my $node_primary = PostgreSQL::Test::Cluster->new('primary');
$node_primary->init(allows_streaming => 1);
$node_primary->start;

# Add some content and take a backup
$node_primary->safe_psql('postgres',
	"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
my $backup_name = 'my_backup';
$node_primary->backup($backup_name);

# Create a streaming standby with a 1 second delay from the backup
my $node_standby = PostgreSQL::Test::Cluster->new('standby');
my $delay = 1;
$node_standby->init_from_backup($node_primary, $backup_name,
	has_streaming => 1);
$node_standby->append_conf(
	'postgresql.conf', qq[
	recovery_min_apply_delay = '${delay}s'
]);
$node_standby->start;


# Make sure that pg_wal_replay_wait() works: add new content to
# primary and memorize primary's insert LSN, then wait for that LSN to be
# replayed on standby.
$node_primary->safe_psql('postgres',
	"INSERT INTO wait_test VALUES (generate_series(11, 20))");
my $lsn1 =
  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
my $output = $node_standby->safe_psql(
	'postgres', qq[
	CALL pg_wal_replay_wait('${lsn1}', 1000000);
	SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
]);

# Make sure the current LSN on standby is at least as big as the LSN we
# observed on primary before.  cmp_ok() reports the actual value on failure.
cmp_ok($output, '>=', 0,
	"standby reached the same LSN as primary after pg_wal_replay_wait()");

# Check that new data is visible after calling pg_wal_replay_wait()
$node_primary->safe_psql('postgres',
	"INSERT INTO wait_test VALUES (generate_series(21, 30))");
my $lsn2 =
  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
$output = $node_standby->safe_psql(
	'postgres', qq[
	CALL pg_wal_replay_wait('${lsn2}');
	SELECT count(*) FROM wait_test;
]);

# Make sure all 30 rows inserted on primary are visible on standby once the
# wait has returned.
is($output, 30, "standby reached the same LSN as primary");

# Check that waiting for unreachable LSN triggers the timeout.  The
# unreachable LSN must be well in advance.  So WAL records issued by
# the concurrent autovacuum could not affect that.
my $lsn3 =
  $node_primary->safe_psql('postgres',
	"SELECT pg_current_wal_insert_lsn() + 10000000000");
my $stderr;

# Waiting with a timeout for the LSN we have already waited for above must
# succeed (safe_psql fails the test on error).
$node_standby->safe_psql('postgres',
	"CALL pg_wal_replay_wait('${lsn2}', 10);");
$node_standby->psql(
	'postgres',
	"CALL pg_wal_replay_wait('${lsn3}', 1000);",
	stderr => \$stderr);
like(
	$stderr,
	qr/timed out while waiting for target LSN/,
	"get timeout on waiting for unreachable LSN");

# Check that the standby promotion terminates the wait on LSN.  Start
# waiting for unreachable LSN then promote.  Check the log for the relevant
# error message.
my $psql_session = $node_standby->background_psql('postgres');
$psql_session->query_until(
	qr/start/, qq[
	\\echo start
	CALL pg_wal_replay_wait('${lsn3}');
]);

my $log_offset = -s $node_standby->logfile;
$node_standby->promote;
$node_standby->wait_for_log('recovery is not in progress', $log_offset);

$node_standby->stop;
$node_primary->stop;
done_testing();
Loading…
Reference in new issue