mirror of https://github.com/postgres/postgres
pg_wal_replay_wait() is to be used on standby and specifies waiting for the specific WAL location to be replayed before starting the transaction. This option is useful when the user makes some data changes on primary and needs a guarantee to see these changes on standby. The queue of waiters is stored in the shared memory array sorted by LSN. During replay of WAL waiters whose LSNs are already replayed are deleted from the shared memory array and woken up by setting of their latches. pg_wal_replay_wait() needs to wait without any snapshot held. Otherwise, the snapshot could prevent the replay of WAL records implying a kind of self-deadlock. This is why it is only possible to implement pg_wal_replay_wait() as a procedure working in a non-atomic context, not a function. Catversion is bumped. Discussion: https://postgr.es/m/eb12f9b03851bb2583adab5df9579b4b%40postgrespro.ru Author: Kartyshov Ivan, Alexander Korotkov Reviewed-by: Michael Paquier, Peter Eisentraut, Dilip Kumar, Amit Kapila Reviewed-by: Alexander Lakhin, Bharath Rupireddy, Euler Taveirapull/159/head
parent
6faca9ae28
commit
06c418e163
@ -0,0 +1,348 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* waitlsn.c |
||||
* Implements waiting for the given LSN, which is used in |
||||
* CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout float8). |
||||
* |
||||
* Copyright (c) 2024, PostgreSQL Global Development Group |
||||
* |
||||
* IDENTIFICATION |
||||
* src/backend/commands/waitlsn.c |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
|
||||
#include "postgres.h" |
||||
|
||||
#include <float.h> |
||||
#include <math.h> |
||||
|
||||
#include "pgstat.h" |
||||
#include "fmgr.h" |
||||
#include "access/transam.h" |
||||
#include "access/xact.h" |
||||
#include "access/xlog.h" |
||||
#include "access/xlogdefs.h" |
||||
#include "access/xlogrecovery.h" |
||||
#include "catalog/pg_type.h" |
||||
#include "commands/waitlsn.h" |
||||
#include "executor/spi.h" |
||||
#include "funcapi.h" |
||||
#include "miscadmin.h" |
||||
#include "storage/ipc.h" |
||||
#include "storage/latch.h" |
||||
#include "storage/pmsignal.h" |
||||
#include "storage/proc.h" |
||||
#include "storage/shmem.h" |
||||
#include "storage/sinvaladt.h" |
||||
#include "utils/builtins.h" |
||||
#include "utils/pg_lsn.h" |
||||
#include "utils/snapmgr.h" |
||||
#include "utils/timestamp.h" |
||||
#include "utils/fmgrprotos.h" |
||||
|
||||
/* Add to / delete from shared memory array */ |
||||
static void addLSNWaiter(XLogRecPtr lsn); |
||||
static void deleteLSNWaiter(void); |
||||
|
||||
struct WaitLSNState *waitLSN = NULL; |
||||
static volatile sig_atomic_t haveShmemItem = false; |
||||
|
||||
/*
|
||||
* Report the amount of shared memory space needed for WaitLSNState |
||||
*/ |
||||
Size |
||||
WaitLSNShmemSize(void) |
||||
{ |
||||
Size size; |
||||
|
||||
size = offsetof(WaitLSNState, procInfos); |
||||
size = add_size(size, mul_size(MaxBackends, sizeof(WaitLSNProcInfo))); |
||||
return size; |
||||
} |
||||
|
||||
/* Initialize the WaitLSNState in the shared memory */ |
||||
void |
||||
WaitLSNShmemInit(void) |
||||
{ |
||||
bool found; |
||||
|
||||
waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState", |
||||
WaitLSNShmemSize(), |
||||
&found); |
||||
if (!found) |
||||
{ |
||||
SpinLockInit(&waitLSN->mutex); |
||||
waitLSN->numWaitedProcs = 0; |
||||
pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX); |
||||
} |
||||
} |
||||
|
||||
/*
|
||||
* Add the information about the LSN waiter backend to the shared memory |
||||
* array. |
||||
*/ |
||||
static void |
||||
addLSNWaiter(XLogRecPtr lsn) |
||||
{ |
||||
WaitLSNProcInfo cur; |
||||
int i; |
||||
|
||||
SpinLockAcquire(&waitLSN->mutex); |
||||
|
||||
cur.procnum = MyProcNumber; |
||||
cur.waitLSN = lsn; |
||||
|
||||
for (i = 0; i < waitLSN->numWaitedProcs; i++) |
||||
{ |
||||
if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN) |
||||
{ |
||||
WaitLSNProcInfo tmp; |
||||
|
||||
tmp = waitLSN->procInfos[i]; |
||||
waitLSN->procInfos[i] = cur; |
||||
cur = tmp; |
||||
} |
||||
} |
||||
waitLSN->procInfos[i] = cur; |
||||
waitLSN->numWaitedProcs++; |
||||
|
||||
pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); |
||||
SpinLockRelease(&waitLSN->mutex); |
||||
} |
||||
|
||||
/*
|
||||
* Delete the information about the LSN waiter backend from the shared memory |
||||
* array. |
||||
*/ |
||||
static void |
||||
deleteLSNWaiter(void) |
||||
{ |
||||
int i; |
||||
bool found = false; |
||||
|
||||
SpinLockAcquire(&waitLSN->mutex); |
||||
|
||||
for (i = 0; i < waitLSN->numWaitedProcs; i++) |
||||
{ |
||||
if (waitLSN->procInfos[i].procnum == MyProcNumber) |
||||
found = true; |
||||
|
||||
if (found && i < waitLSN->numWaitedProcs - 1) |
||||
{ |
||||
waitLSN->procInfos[i] = waitLSN->procInfos[i + 1]; |
||||
} |
||||
} |
||||
|
||||
if (!found) |
||||
{ |
||||
SpinLockRelease(&waitLSN->mutex); |
||||
return; |
||||
} |
||||
waitLSN->numWaitedProcs--; |
||||
|
||||
if (waitLSN->numWaitedProcs != 0) |
||||
pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); |
||||
else |
||||
pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); |
||||
|
||||
SpinLockRelease(&waitLSN->mutex); |
||||
} |
||||
|
||||
/*
|
||||
* Set latches of LSN waiters whose LSN has been replayed. Set latches of all |
||||
* LSN waiters when InvalidXLogRecPtr is given. |
||||
*/ |
||||
void |
||||
WaitLSNSetLatches(XLogRecPtr currentLSN) |
||||
{ |
||||
int i; |
||||
int *wakeUpProcNums; |
||||
int numWakeUpProcs; |
||||
|
||||
wakeUpProcNums = palloc(sizeof(int) * MaxBackends); |
||||
|
||||
SpinLockAcquire(&waitLSN->mutex); |
||||
|
||||
/*
|
||||
* Remember processes, whose waited LSNs are already replayed. We should |
||||
* set their latches later after spinlock release. |
||||
*/ |
||||
for (i = 0; i < waitLSN->numWaitedProcs; i++) |
||||
{ |
||||
if (!XLogRecPtrIsInvalid(currentLSN) && |
||||
waitLSN->procInfos[i].waitLSN > currentLSN) |
||||
break; |
||||
|
||||
wakeUpProcNums[i] = waitLSN->procInfos[i].procnum; |
||||
} |
||||
|
||||
/*
|
||||
* Immediately remove those processes from the shmem array. Otherwise, |
||||
* shmem array items will be here till corresponding processes wake up and |
||||
* delete themselves. |
||||
*/ |
||||
numWakeUpProcs = i; |
||||
for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++) |
||||
waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs]; |
||||
waitLSN->numWaitedProcs -= numWakeUpProcs; |
||||
|
||||
if (waitLSN->numWaitedProcs != 0) |
||||
pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[i].waitLSN); |
||||
else |
||||
pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX); |
||||
|
||||
SpinLockRelease(&waitLSN->mutex); |
||||
|
||||
/*
|
||||
* Set latches for processes, whose waited LSNs are already replayed. This |
||||
* involves spinlocks. So, we shouldn't do this under a spinlock. |
||||
*/ |
||||
for (i = 0; i < numWakeUpProcs; i++) |
||||
{ |
||||
PGPROC *backend; |
||||
|
||||
backend = GetPGProcByNumber(wakeUpProcNums[i]); |
||||
SetLatch(&backend->procLatch); |
||||
} |
||||
pfree(wakeUpProcNums); |
||||
} |
||||
|
||||
/*
|
||||
* Delete our item from shmem array if any. |
||||
*/ |
||||
void |
||||
WaitLSNCleanup(void) |
||||
{ |
||||
if (haveShmemItem) |
||||
deleteLSNWaiter(); |
||||
} |
||||
|
||||
/*
|
||||
* Wait using MyLatch till the given LSN is replayed, the postmaster dies or |
||||
* timeout happens. |
||||
*/ |
||||
void |
||||
WaitForLSN(XLogRecPtr targetLSN, int64 timeout) |
||||
{ |
||||
XLogRecPtr currentLSN; |
||||
TimestampTz endtime; |
||||
|
||||
/* Shouldn't be called when shmem isn't initialized */ |
||||
Assert(waitLSN); |
||||
|
||||
/* Should be only called by a backend */ |
||||
Assert(MyBackendType == B_BACKEND); |
||||
|
||||
if (!RecoveryInProgress()) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
||||
errmsg("recovery is not in progress"), |
||||
errhint("Waiting for LSN can only be executed during recovery."))); |
||||
|
||||
/* If target LSN is already replayed, exit immediately */ |
||||
if (targetLSN <= GetXLogReplayRecPtr(NULL)) |
||||
return; |
||||
|
||||
if (timeout > 0) |
||||
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout); |
||||
|
||||
addLSNWaiter(targetLSN); |
||||
haveShmemItem = true; |
||||
|
||||
for (;;) |
||||
{ |
||||
int rc; |
||||
int latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH; |
||||
long delay_ms = 0; |
||||
|
||||
/* Check if the waited LSN has been replayed */ |
||||
currentLSN = GetXLogReplayRecPtr(NULL); |
||||
if (targetLSN <= currentLSN) |
||||
break; |
||||
|
||||
/* Recheck that recovery is still in-progress */ |
||||
if (!RecoveryInProgress()) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
||||
errmsg("recovery is not in progress"), |
||||
errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.", |
||||
LSN_FORMAT_ARGS(targetLSN), |
||||
LSN_FORMAT_ARGS(currentLSN)))); |
||||
|
||||
if (timeout > 0) |
||||
{ |
||||
delay_ms = (endtime - GetCurrentTimestamp()) / 1000; |
||||
latch_events |= WL_TIMEOUT; |
||||
if (delay_ms <= 0) |
||||
break; |
||||
} |
||||
|
||||
CHECK_FOR_INTERRUPTS(); |
||||
|
||||
rc = WaitLatch(MyLatch, latch_events, delay_ms, |
||||
WAIT_EVENT_WAIT_FOR_WAL_REPLAY); |
||||
|
||||
if (rc & WL_LATCH_SET) |
||||
ResetLatch(MyLatch); |
||||
} |
||||
|
||||
if (targetLSN > currentLSN) |
||||
{ |
||||
deleteLSNWaiter(); |
||||
haveShmemItem = false; |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_QUERY_CANCELED), |
||||
errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X", |
||||
LSN_FORMAT_ARGS(targetLSN), |
||||
LSN_FORMAT_ARGS(currentLSN)))); |
||||
} |
||||
else |
||||
{ |
||||
haveShmemItem = false; |
||||
} |
||||
} |
||||
|
||||
Datum |
||||
pg_wal_replay_wait(PG_FUNCTION_ARGS) |
||||
{ |
||||
XLogRecPtr target_lsn = PG_GETARG_LSN(0); |
||||
int64 timeout = PG_GETARG_INT64(1); |
||||
CallContext *context = (CallContext *) fcinfo->context; |
||||
|
||||
if (timeout < 0) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), |
||||
errmsg("\"timeout\" must not be negative"))); |
||||
|
||||
/*
|
||||
* We are going to wait for the LSN replay. We should first care that we |
||||
* don't hold a snapshot and correspondingly our MyProc->xmin is invalid. |
||||
* Otherwise, our snapshot could prevent the replay of WAL records |
||||
* implying a kind of self-deadlock. This is the reason why |
||||
* pg_wal_replay_wait() is a procedure, not a function. |
||||
* |
||||
* At first, we check that pg_wal_replay_wait() is called in a non-atomic |
||||
* context. That is, a procedure call isn't wrapped into a transaction, |
||||
* another procedure call, or a function call. |
||||
* |
||||
* Secondly, according to PlannedStmtRequiresSnapshot(), even in an atomic |
||||
* context, CallStmt is processed with a snapshot. Thankfully, we can pop |
||||
* this snapshot, because PortalRunUtility() can tolerate this. |
||||
*/ |
||||
if (context->atomic) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
||||
errmsg("pg_wal_replay_wait() must be only called in non-atomic context"), |
||||
errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function."))); |
||||
|
||||
if (ActiveSnapshotSet()) |
||||
PopActiveSnapshot(); |
||||
Assert(!ActiveSnapshotSet()); |
||||
InvalidateCatalogSnapshot(); |
||||
Assert(MyProc->xmin == InvalidTransactionId); |
||||
|
||||
(void) WaitForLSN(target_lsn, timeout); |
||||
|
||||
PG_RETURN_VOID(); |
||||
} |
@ -0,0 +1,43 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* waitlsn.h |
||||
* Declarations for LSN waiting routines. |
||||
* |
||||
* Copyright (c) 2024, PostgreSQL Global Development Group |
||||
* |
||||
* src/include/commands/waitlsn.h |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
#ifndef WAIT_LSN_H |
||||
#define WAIT_LSN_H |
||||
|
||||
#include "postgres.h" |
||||
#include "port/atomics.h" |
||||
#include "storage/spin.h" |
||||
#include "tcop/dest.h" |
||||
|
||||
/* Shared memory structures */ |
||||
typedef struct WaitLSNProcInfo |
||||
{ |
||||
int procnum; |
||||
XLogRecPtr waitLSN; |
||||
} WaitLSNProcInfo; |
||||
|
||||
typedef struct WaitLSNState |
||||
{ |
||||
pg_atomic_uint64 minLSN; |
||||
slock_t mutex; |
||||
int numWaitedProcs; |
||||
WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER]; |
||||
} WaitLSNState; |
||||
|
||||
extern PGDLLIMPORT struct WaitLSNState *waitLSN; |
||||
|
||||
extern void WaitForLSN(XLogRecPtr targetLSN, int64 timeout); |
||||
extern Size WaitLSNShmemSize(void); |
||||
extern void WaitLSNShmemInit(void); |
||||
extern void WaitLSNSetLatches(XLogRecPtr currentLSN); |
||||
extern void WaitLSNCleanup(void); |
||||
|
||||
#endif /* WAIT_LSN_H */ |
@ -0,0 +1,97 @@ |
||||
# Checks waiting for the lsn replay on standby using |
||||
# pg_wal_replay_wait() procedure. |
||||
use strict; |
||||
use warnings; |
||||
|
||||
use PostgreSQL::Test::Cluster; |
||||
use PostgreSQL::Test::Utils; |
||||
use Test::More; |
||||
|
||||
# Initialize primary node |
||||
my $node_primary = PostgreSQL::Test::Cluster->new('primary'); |
||||
$node_primary->init(allows_streaming => 1); |
||||
$node_primary->start; |
||||
|
||||
# And some content and take a backup |
||||
$node_primary->safe_psql('postgres', |
||||
"CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a"); |
||||
my $backup_name = 'my_backup'; |
||||
$node_primary->backup($backup_name); |
||||
|
||||
# Create a streaming standby with a 1 second delay from the backup |
||||
my $node_standby = PostgreSQL::Test::Cluster->new('standby'); |
||||
my $delay = 1; |
||||
$node_standby->init_from_backup($node_primary, $backup_name, |
||||
has_streaming => 1); |
||||
$node_standby->append_conf( |
||||
'postgresql.conf', qq[ |
||||
recovery_min_apply_delay = '${delay}s' |
||||
]); |
||||
$node_standby->start; |
||||
|
||||
|
||||
# Make sure that pg_wal_replay_wait() works: add new content to |
||||
# primary and memorize primary's insert LSN, then wait for that LSN to be |
||||
# replayed on standby. |
||||
$node_primary->safe_psql('postgres', |
||||
"INSERT INTO wait_test VALUES (generate_series(11, 20))"); |
||||
my $lsn1 = |
||||
$node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); |
||||
my $output = $node_standby->safe_psql( |
||||
'postgres', qq[ |
||||
CALL pg_wal_replay_wait('${lsn1}', 1000000); |
||||
SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn); |
||||
]); |
||||
|
||||
# Make sure the current LSN on standby is at least as big as the LSN we |
||||
# observed on primary's before. |
||||
ok($output >= 0, |
||||
"standby reached the same LSN as primary after pg_wal_replay_wait()"); |
||||
|
||||
# Check that new data is visible after calling pg_wal_replay_wait() |
||||
$node_primary->safe_psql('postgres', |
||||
"INSERT INTO wait_test VALUES (generate_series(21, 30))"); |
||||
my $lsn2 = |
||||
$node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); |
||||
$output = $node_standby->safe_psql( |
||||
'postgres', qq[ |
||||
CALL pg_wal_replay_wait('${lsn2}'); |
||||
SELECT count(*) FROM wait_test; |
||||
]); |
||||
|
||||
# Make sure the current LSN on standby and is the same as primary's LSN |
||||
ok($output eq 30, "standby reached the same LSN as primary"); |
||||
|
||||
# Check that waiting for unreachable LSN triggers the timeout. The |
||||
# unreachable LSN must be well in advance. So WAL records issued by |
||||
# the concurrent autovacuum could not affect that. |
||||
my $lsn3 = |
||||
$node_primary->safe_psql('postgres', |
||||
"SELECT pg_current_wal_insert_lsn() + 10000000000"); |
||||
my $stderr; |
||||
$node_standby->safe_psql('postgres', |
||||
"CALL pg_wal_replay_wait('${lsn2}', 10);"); |
||||
$node_standby->psql( |
||||
'postgres', |
||||
"CALL pg_wal_replay_wait('${lsn3}', 1000);", |
||||
stderr => \$stderr); |
||||
ok( $stderr =~ /timed out while waiting for target LSN/, |
||||
"get timeout on waiting for unreachable LSN"); |
||||
|
||||
# Check that the standby promotion terminates the wait on LSN. Start |
||||
# waiting for unreachable LSN then promote. Check the log for the relevant |
||||
# error message. |
||||
my $psql_session = $node_standby->background_psql('postgres'); |
||||
$psql_session->query_until( |
||||
qr/start/, qq[ |
||||
\\echo start |
||||
CALL pg_wal_replay_wait('${lsn3}'); |
||||
]); |
||||
|
||||
my $log_offset = -s $node_standby->logfile; |
||||
$node_standby->promote; |
||||
$node_standby->wait_for_log('recovery is not in progress', $log_offset); |
||||
|
||||
$node_standby->stop; |
||||
$node_primary->stop; |
||||
done_testing(); |
Loading…
Reference in new issue