PG-1604: Improve last key LSN calculation logic

Previously we simply set the LSN for the new key to the first write
location.

This is however not correct, as there are many corner cases around this:
* recovery / replication might write old LSNs
* we can't handle multiple keys with the same TLI/LSN, which can happen
  with quick restarts without writes

To handle these cases, this commit changes the following:
* We only activate new keys outside crash recovery, or immediately if
  encryption is turned off
* We also take the already existing last key into account (if it exists),
  and only activate a new key if we progressed past its start location

The remaining changes are just support infrastructure for this:

* Since we might rewrite old records, we use the already existing keys
  for those writes, not the active last keys
* We prefetch existing keys during initialization, so it doesn't
  accidentally happen in the critical section during a write

There is a remaining bug with stopping WAL encryption, which is also noted
in a TODO comment in the code. It will be addressed in a later PR, as this
fix has already taken too long.
pull/238/head
Zsolt Parragi 1 month ago
parent c7e7dc52a7
commit 9dfed22f84
  1. 2
      contrib/pg_tde/meson.build
  2. 103
      contrib/pg_tde/src/access/pg_tde_xlog_smgr.c
  3. 2
      contrib/pg_tde/src/include/access/pg_tde_xlog_smgr.h
  4. 637
      contrib/pg_tde/t/2pc_replication.pl
  5. 659
      contrib/pg_tde/t/stream_rep.pl
  6. 7
      src/bin/pg_basebackup/receivelog.c

@ -128,6 +128,8 @@ tap_tests = [
't/wal_archiving.pl', 't/wal_archiving.pl',
't/wal_encrypt.pl', 't/wal_encrypt.pl',
't/wal_key_tli.pl', 't/wal_key_tli.pl',
't/2pc_replication.pl',
't/stream_rep.pl',
] ]
tests += { tests += {

@ -226,6 +226,7 @@ void
TDEXLogSmgrInitWrite(bool encrypt_xlog) TDEXLogSmgrInitWrite(bool encrypt_xlog)
{ {
WalEncryptionKey *key = pg_tde_read_last_wal_key(); WalEncryptionKey *key = pg_tde_read_last_wal_key();
WALKeyCacheRec *keys;
/* /*
* Always generate a new key on starting PostgreSQL to protect against * Always generate a new key on starting PostgreSQL to protect against
@ -246,6 +247,16 @@ TDEXLogSmgrInitWrite(bool encrypt_xlog)
TDEXLogSetEncKeyLocation(EncryptionKey.wal_start); TDEXLogSetEncKeyLocation(EncryptionKey.wal_start);
} }
keys = pg_tde_get_wal_cache_keys();
if (keys == NULL)
{
WalLocation start = {.tli = 1,.lsn = 0};
/* cache is empty, prefetch keys from disk */
pg_tde_fetch_wal_keys(start);
}
if (key) if (key)
pfree(key); pfree(key);
} }
@ -263,6 +274,32 @@ TDEXLogSmgrInitWriteReuseKey()
} }
} }
/*
 * Encrypt XLog page(s) from buf using the cached WAL keys and write the
 * result to the segment file.
 *
 * tdeheap_xlog_seg_write() calls this instead of TDEXLogWriteEncryptedPages()
 * when WAL encryption is on but the last key has no usable start location
 * yet, or the write starts before that key's start location. Key selection
 * for the written range is delegated to TDEXLogCryptBuffer().
 *
 * The data is staged in EncryptionBuf; buf itself is not modified.
 *
 * Returns whatever pg_pwrite() returns for the staged buffer.
 */
static ssize_t
TDEXLogWriteEncryptedPagesOldKeys(int fd, const void *buf, size_t count, off_t offset,
TimeLineID tli, XLogSegNo segno, int segSize)
{
char *enc_buff = EncryptionBuf;
#ifndef FRONTEND
/* The staging buffer must be able to hold the whole write. */
Assert(count <= TDEXLogEncryptBuffSize());
#endif
/*
 * Copy the data as-is first, as parts of the range might be unencrypted.
 * TDEXLogCryptBuffer() writes its output per key range, so anything it
 * does not cover is presumably left as copied here -- TODO confirm.
 */
memcpy(enc_buff, buf, count);
/*
 * TDEXLogCryptBuffer() fetches the WAL keys from disk when the key cache is
 * empty, which potentially allocates. That should only happen very early in
 * execution and not during a write, where we are in a critical section:
 * TDEXLogSmgrInitWrite() prefetches the keys for that reason.
 */
TDEXLogCryptBuffer(buf, enc_buff, count, offset, tli, segno, segSize);
return pg_pwrite(fd, enc_buff, count, offset);
}
/* /*
* Encrypt XLog page(s) from the buf and write to the segment file. * Encrypt XLog page(s) from the buf and write to the segment file.
*/ */
@ -284,6 +321,7 @@ TDEXLogWriteEncryptedPages(int fd, const void *buf, size_t count, off_t offset,
#endif #endif
CalcXLogPageIVPrefix(tli, segno, key->base_iv, iv_prefix); CalcXLogPageIVPrefix(tli, segno, key->base_iv, iv_prefix);
pg_tde_stream_crypt(iv_prefix, pg_tde_stream_crypt(iv_prefix,
offset, offset,
(char *) buf, (char *) buf,
@ -299,26 +337,64 @@ static ssize_t
tdeheap_xlog_seg_write(int fd, const void *buf, size_t count, off_t offset, tdeheap_xlog_seg_write(int fd, const void *buf, size_t count, off_t offset,
TimeLineID tli, XLogSegNo segno, int segSize) TimeLineID tli, XLogSegNo segno, int segSize)
{ {
bool lastKeyUsable;
bool afterWriteKey;
#ifdef FRONTEND
bool crashRecovery = false;
#else
bool crashRecovery = GetRecoveryState() == RECOVERY_STATE_CRASH;
#endif
WalLocation loc = {.tli = tli};
WalLocation writeKeyLoc;
XLogSegNoOffsetToRecPtr(segno, offset, segSize, loc.lsn);
/* /*
* Set the last (most recent) key's start LSN if not set. * Set the last (most recent) key's start LSN if not set.
* *
* This func called with WALWriteLock held, so no need in any extra sync. * This func called with WALWriteLock held, so no need in any extra sync.
*/ */
if (EncryptionKey.type != WAL_KEY_TYPE_INVALID && TDEXLogGetEncKeyLsn() == 0)
{
WalLocation loc = {.tli = tli};
XLogSegNoOffsetToRecPtr(segno, offset, segSize, loc.lsn); writeKeyLoc.lsn = TDEXLogGetEncKeyLsn();
pg_read_barrier();
writeKeyLoc.tli = TDEXLogGetEncKeyTli();
pg_tde_wal_last_key_set_location(loc); lastKeyUsable = (writeKeyLoc.lsn != 0);
EncryptionKey.wal_start = loc; afterWriteKey = wal_location_cmp(writeKeyLoc, loc) <= 0;
TDEXLogSetEncKeyLocation(EncryptionKey.wal_start);
if (EncryptionKey.type != WAL_KEY_TYPE_INVALID && !lastKeyUsable)
{
WALKeyCacheRec *last_key = pg_tde_get_last_wal_key();
if (!crashRecovery || EncryptionKey.type == WAL_KEY_TYPE_UNENCRYPTED)
{
/*
* TODO: the unencrypted case is still not perfect, we need to
* report an error in some cornercases
*/
if (last_key == NULL || last_key->start.lsn < loc.lsn)
{
pg_tde_wal_last_key_set_location(loc);
EncryptionKey.wal_start = loc;
TDEXLogSetEncKeyLocation(EncryptionKey.wal_start);
lastKeyUsable = true;
}
}
} }
if (EncryptionKey.type == WAL_KEY_TYPE_ENCRYPTED) if ((!afterWriteKey || !lastKeyUsable) && EncryptionKey.type == WAL_KEY_TYPE_ENCRYPTED)
{
return TDEXLogWriteEncryptedPagesOldKeys(fd, buf, count, offset, tli, segno, segSize);
}
else if (EncryptionKey.type == WAL_KEY_TYPE_ENCRYPTED)
{
return TDEXLogWriteEncryptedPages(fd, buf, count, offset, tli, segno); return TDEXLogWriteEncryptedPages(fd, buf, count, offset, tli, segno);
}
else else
{
return pg_pwrite(fd, buf, count, offset); return pg_pwrite(fd, buf, count, offset);
}
} }
/* /*
@ -340,7 +416,7 @@ tdeheap_xlog_seg_read(int fd, void *buf, size_t count, off_t offset,
if (readsz <= 0) if (readsz <= 0)
return readsz; return readsz;
TDEXLogCryptBuffer(buf, count, offset, tli, segno, segSize); TDEXLogCryptBuffer(buf, buf, count, offset, tli, segno, segSize);
return readsz; return readsz;
} }
@ -349,7 +425,7 @@ tdeheap_xlog_seg_read(int fd, void *buf, size_t count, off_t offset,
* [De]Crypt buffer if needed based on provided segment offset, number and TLI * [De]Crypt buffer if needed based on provided segment offset, number and TLI
*/ */
void void
TDEXLogCryptBuffer(void *buf, size_t count, off_t offset, TDEXLogCryptBuffer(const void *buf, void *out_buf, size_t count, off_t offset,
TimeLineID tli, XLogSegNo segno, int segSize) TimeLineID tli, XLogSegNo segno, int segSize)
{ {
WALKeyCacheRec *keys = pg_tde_get_wal_cache_keys(); WALKeyCacheRec *keys = pg_tde_get_wal_cache_keys();
@ -357,7 +433,7 @@ TDEXLogCryptBuffer(void *buf, size_t count, off_t offset,
WalLocation data_end = {.tli = tli}; WalLocation data_end = {.tli = tli};
WalLocation data_start = {.tli = tli}; WalLocation data_start = {.tli = tli};
if (!keys) if (keys == NULL)
{ {
WalLocation start = {.tli = 1,.lsn = 0}; WalLocation start = {.tli = 1,.lsn = 0};
@ -454,6 +530,7 @@ TDEXLogCryptBuffer(void *buf, size_t count, off_t offset,
XLogSegmentOffset(end_lsn, segSize); XLogSegmentOffset(end_lsn, segSize);
size_t dec_sz; size_t dec_sz;
char *dec_buf = (char *) buf + (dec_off - offset); char *dec_buf = (char *) buf + (dec_off - offset);
char *o_buf = (char *) out_buf + (dec_off - offset);
Assert(dec_off >= offset); Assert(dec_off >= offset);
@ -468,17 +545,19 @@ TDEXLogCryptBuffer(void *buf, size_t count, off_t offset,
dec_end = offset + count; dec_end = offset + count;
} }
Assert(dec_end > dec_off);
dec_sz = dec_end - dec_off; dec_sz = dec_end - dec_off;
#ifdef TDE_XLOG_DEBUG #ifdef TDE_XLOG_DEBUG
elog(DEBUG1, "decrypt WAL, dec_off: %lu [buff_off %lu], sz: %lu | key %u_%X/%X", elog(DEBUG1, "decrypt WAL, dec_off: %lu [buff_off %lu], sz: %lu | key %u_%X/%X",
dec_off, dec_off - offset, dec_sz, curr_key->key.wal_start.tli, LSN_FORMAT_ARGS(curr_key->key.wal_start.lsn)); dec_off, dec_off - offset, dec_sz, curr_key->key.wal_start.tli, LSN_FORMAT_ARGS(curr_key->key.wal_start.lsn));
#endif #endif
pg_tde_stream_crypt(iv_prefix, pg_tde_stream_crypt(iv_prefix,
dec_off, dec_off,
dec_buf, dec_buf,
dec_sz, dec_sz,
dec_buf, o_buf,
curr_key->key.key, curr_key->key.key,
&curr_key->crypt_ctx); &curr_key->crypt_ctx);
} }

@ -13,7 +13,7 @@ extern void TDEXLogSmgrInit(void);
extern void TDEXLogSmgrInitWrite(bool encrypt_xlog); extern void TDEXLogSmgrInitWrite(bool encrypt_xlog);
extern void TDEXLogSmgrInitWriteReuseKey(void); extern void TDEXLogSmgrInitWriteReuseKey(void);
extern void TDEXLogCryptBuffer(void *buf, size_t count, off_t offset, extern void TDEXLogCryptBuffer(const void *buf, void *out_buf, size_t count, off_t offset,
TimeLineID tli, XLogSegNo segno, int segSize); TimeLineID tli, XLogSegNo segno, int segSize);
#endif /* PG_TDE_XLOGSMGR_H */ #endif /* PG_TDE_XLOGSMGR_H */

@ -0,0 +1,637 @@
# Copyright (c) 2021-2024, PostgreSQL Global Development Group
# Tests dedicated to two-phase commit in recovery
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
my $psql_out = '';
my $psql_rc = '';
# Append a single setting to a node's postgresql.conf, reload the server
# configuration and verify that the reload reported success.
#
#   $node      - cluster node object to reconfigure
#   $parameter - complete configuration line, e.g. "name = 'value'"
#
# Returns nothing; the outcome is reported as one test result.
sub configure_and_reload
{
	my ($node, $parameter) = @_;

	# Attribute a failing check to the caller's line, not to this helper.
	local $Test::Builder::Level = $Test::Builder::Level + 1;

	$node->append_conf('postgresql.conf', "\n$parameter\n");

	# The query output is captured in the file-level $psql_out and is
	# expected to be 't' when the reload succeeded.
	$node->psql('postgres', "SELECT pg_reload_conf()", stdout => \$psql_out);

	my $node_name = $node->name;
	is($psql_out, 't', "reload node $node_name with $parameter");

	return;
}
# Set up two nodes, which will alternately be primary and replication standby.
# Setup london node
my $node_london = PostgreSQL::Test::Cluster->new("london");
$node_london->init(allows_streaming => 1);
$node_london->append_conf(
'postgresql.conf', qq(
max_prepared_transactions = 10
log_checkpoints = true
));
$node_london->append_conf('postgresql.conf',
"shared_preload_libraries = 'pg_tde'");
$node_london->append_conf('postgresql.conf',
"default_table_access_method = 'tde_heap'");
$node_london->start;
# Remove keyring files left behind by an earlier run of this test. They live
# at fixed paths outside the test's data directory, so they survive between
# runs and would otherwise be picked up again by the file providers below.
# The return value is deliberately ignored: a missing file is the normal case.
unlink('/tmp/pg_global_keyring.file');
unlink('/tmp/pg_local_keyring.file');

# Create and enable tde extension
$node_london->safe_psql('postgres', 'CREATE EXTENSION IF NOT EXISTS pg_tde;');

# Global (server-wide) file key provider, plus the server key created in it
# and selected from it.
$node_london->safe_psql('postgres',
	"SELECT pg_tde_add_global_key_provider_file('global_key_provider', '/tmp/pg_global_keyring.file');"
);
$node_london->safe_psql('postgres',
	"SELECT pg_tde_create_key_using_global_key_provider('global_test_key', 'global_key_provider');"
);
$node_london->safe_psql('postgres',
	"SELECT pg_tde_set_server_key_using_global_key_provider('global_test_key', 'global_key_provider');"
);

# Database-level file key provider for the 'postgres' database, plus the key
# created in it and selected from it.
$node_london->safe_psql('postgres',
	"SELECT pg_tde_add_database_key_provider_file('local_key_provider', '/tmp/pg_local_keyring.file');"
);
$node_london->safe_psql('postgres',
	"SELECT pg_tde_create_key_using_database_key_provider('local_test_key', 'local_key_provider');"
);
$node_london->safe_psql('postgres',
	"SELECT pg_tde_set_key_using_database_key_provider('local_test_key', 'local_key_provider');"
);
# Enable WAL encryption and verbose (DEBUG2) server logging, then restart the
# node so the settings are in effect before the base backup is taken.
$node_london->append_conf(
'postgresql.conf', qq(
pg_tde.wal_encrypt = on
log_min_messages = DEBUG2
));
$node_london->restart;
# NOTE(review): the disabled code below refers to $WAL_ENCRYPTION and
# enable_wal_encryption(), neither of which is defined in this script.
# if ($WAL_ENCRYPTION eq 'on'){
# enable_wal_encryption($node_london);
# }
# $node_london->safe_psql('postgres', 'ALTER SYSTEM SET pg_tde.wal_encrypt = on;');
# NOTE(review): second restart in a row with nothing executed in between;
# presumably intentional, to exercise back-to-back restarts -- confirm
# before removing it as redundant.
$node_london->restart;
# Take the base backup of london that the paris node is initialized from.
# The backup directory is created by hand and the node's pg_tde directory is
# copied into it before the backup itself runs (presumably so that the key
# data ends up next to the backup -- TODO confirm).
my $backup_dir = $node_london->backup_dir . '/london_backup';
mkdir $backup_dir or die "mkdir($backup_dir) failed: $!";
PostgreSQL::Test::RecursiveCopy::copypath($node_london->data_dir . '/pg_tde',
$backup_dir . '/pg_tde');
$node_london->backup('london_backup');
# Setup paris node
my $node_paris = PostgreSQL::Test::Cluster->new('paris');
$node_paris->init_from_backup($node_london, 'london_backup',
has_streaming => 1);
$node_paris->append_conf(
'postgresql.conf', qq(
subtransaction_buffers = 32
pg_tde.wal_encrypt = on
));
$node_paris->start;
# Switch to synchronous replication in both directions
configure_and_reload($node_london, "synchronous_standby_names = 'paris'");
configure_and_reload($node_paris, "synchronous_standby_names = 'london'");
# Set up nonce names for current primary and standby nodes
note "Initially, london is primary and paris is standby";
my ($cur_primary, $cur_standby) = ($node_london, $node_paris);
my $cur_primary_name = $cur_primary->name;
# Create table we'll use in the test transactions
$cur_primary->psql('postgres', "CREATE TABLE t_009_tbl (id int, msg text)");
###############################################################################
# Check that we can commit and abort transaction after soft restart.
# Here checkpoint happens before shutdown and no WAL replay will occur at next
# startup. In this case postgres re-creates shared-memory state from twophase
# files.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
INSERT INTO t_009_tbl VALUES (1, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (2, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_1';
BEGIN;
INSERT INTO t_009_tbl VALUES (3, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (4, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_2';");
$cur_primary->stop;
$cur_primary->start;
$psql_rc = $cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_1'");
is($psql_rc, '0', 'Commit prepared transaction after restart');
$psql_rc = $cur_primary->psql('postgres', "ROLLBACK PREPARED 'xact_009_2'");
is($psql_rc, '0', 'Rollback prepared transaction after restart');
is( $cur_primary->safe_psql(
'postgres', "SELECT pg_tde_is_encrypted('t_009_tbl');"),
't',
"Table t_009_tbl is encrypted on primary");
###############################################################################
# Check that we can commit and abort after a hard restart.
# At next startup, WAL replay will re-create shared memory state for prepared
# transaction using dedicated WAL records.
###############################################################################
$cur_primary->psql(
'postgres', "
CHECKPOINT;
BEGIN;
INSERT INTO t_009_tbl VALUES (5, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (6, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_3';
BEGIN;
INSERT INTO t_009_tbl VALUES (7, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (8, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_4';");
$cur_primary->teardown_node;
$cur_primary->start;
$psql_rc = $cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_3'");
is($psql_rc, '0', 'Commit prepared transaction after teardown');
$psql_rc = $cur_primary->psql('postgres', "ROLLBACK PREPARED 'xact_009_4'");
is($psql_rc, '0', 'Rollback prepared transaction after teardown');
is( $cur_primary->safe_psql(
'postgres', "SELECT pg_tde_is_encrypted('t_009_tbl');"),
't',
"Table t_009_tbl is encrypted on primary");
###############################################################################
# Check that WAL replay can handle several transactions with same GID name.
###############################################################################
$cur_primary->psql(
'postgres', "
CHECKPOINT;
BEGIN;
INSERT INTO t_009_tbl VALUES (9, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (10, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_5';
COMMIT PREPARED 'xact_009_5';
BEGIN;
INSERT INTO t_009_tbl VALUES (11, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (12, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_5';");
$cur_primary->teardown_node;
$cur_primary->start;
$psql_rc = $cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_5'");
is($psql_rc, '0', 'Replay several transactions with same GID');
is( $cur_primary->safe_psql(
'postgres', "SELECT pg_tde_is_encrypted('t_009_tbl');"),
't',
"Table t_009_tbl is encrypted on primary");
###############################################################################
# Check that WAL replay cleans up its shared memory state and releases locks
# while replaying transaction commits.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
INSERT INTO t_009_tbl VALUES (13, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (14, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_6';
COMMIT PREPARED 'xact_009_6';");
$cur_primary->teardown_node;
$cur_primary->start;
$psql_rc = $cur_primary->psql(
'postgres', "
BEGIN;
INSERT INTO t_009_tbl VALUES (15, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (16, 'issued to ${cur_primary_name}');
-- This prepare can fail due to conflicting GID or locks conflicts if
-- replay did not fully cleanup its state on previous commit.
PREPARE TRANSACTION 'xact_009_7';");
is($psql_rc, '0', "Cleanup of shared memory state for 2PC commit");
$cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_7'");
###############################################################################
# Check that WAL replay will cleanup its shared memory state on running standby.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
INSERT INTO t_009_tbl VALUES (17, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (18, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_8';
COMMIT PREPARED 'xact_009_8';");
$cur_standby->psql(
'postgres',
"SELECT count(*) FROM pg_prepared_xacts",
stdout => \$psql_out);
is($psql_out, '0',
"Cleanup of shared memory state on running standby without checkpoint");
###############################################################################
# Same as in previous case, but let's force checkpoint on standby between
# prepare and commit to use on-disk twophase files.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
INSERT INTO t_009_tbl VALUES (19, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (20, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_9';");
$cur_standby->psql('postgres', "CHECKPOINT");
$cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_9'");
$cur_standby->psql(
'postgres',
"SELECT count(*) FROM pg_prepared_xacts",
stdout => \$psql_out);
is($psql_out, '0',
"Cleanup of shared memory state on running standby after checkpoint");
###############################################################################
# Check that prepared transactions can be committed on promoted standby.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
INSERT INTO t_009_tbl VALUES (21, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (22, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_10';");
$cur_primary->teardown_node;
$cur_standby->promote;
# change roles
note "Now paris is primary and london is standby";
($cur_primary, $cur_standby) = ($node_paris, $node_london);
$cur_primary_name = $cur_primary->name;
# because london is not running at this point, we can't use syncrep commit
# on this command
$psql_rc = $cur_primary->psql('postgres',
"SET synchronous_commit = off; COMMIT PREPARED 'xact_009_10'");
is($psql_rc, '0', "Restore of prepared transaction on promoted standby");
# restart old primary as new standby
$cur_standby->enable_streaming($cur_primary);
$cur_standby->start;
###############################################################################
# Check that prepared transactions are replayed after soft restart of standby
# while primary is down. Since standby knows that primary is down it uses a
# different code path on startup to ensure that the status of transactions is
# consistent.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
INSERT INTO t_009_tbl VALUES (23, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (24, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_11';");
$cur_primary->stop;
$cur_standby->restart;
$cur_standby->promote;
# change roles
note "Now london is primary and paris is standby";
($cur_primary, $cur_standby) = ($node_london, $node_paris);
$cur_primary_name = $cur_primary->name;
$cur_primary->psql(
'postgres',
"SELECT count(*) FROM pg_prepared_xacts",
stdout => \$psql_out);
is($psql_out, '1',
"Restore prepared transactions from files with primary down");
# restart old primary as new standby
$cur_standby->enable_streaming($cur_primary);
$cur_standby->start;
$cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_11'");
###############################################################################
# Check that prepared transactions are correctly replayed after standby hard
# restart while primary is down.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
INSERT INTO t_009_tbl VALUES (25, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl VALUES (26, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_12';
");
$cur_primary->stop;
$cur_standby->teardown_node;
$cur_standby->start;
$cur_standby->promote;
# change roles
note "Now paris is primary and london is standby";
($cur_primary, $cur_standby) = ($node_paris, $node_london);
$cur_primary_name = $cur_primary->name;
$cur_primary->psql(
'postgres',
"SELECT count(*) FROM pg_prepared_xacts",
stdout => \$psql_out);
is($psql_out, '1',
"Restore prepared transactions from records with primary down");
# restart old primary as new standby
$cur_standby->enable_streaming($cur_primary);
$cur_standby->start;
$cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_12'");
###############################################################################
# Check visibility of prepared transactions in standby after a restart while
# primary is down.
###############################################################################
$cur_primary->psql(
'postgres', "
SET synchronous_commit='remote_apply'; -- To ensure the standby is caught up
CREATE TABLE t_009_tbl_standby_mvcc (id int, msg text);
BEGIN;
INSERT INTO t_009_tbl_standby_mvcc VALUES (1, 'issued to ${cur_primary_name}');
SAVEPOINT s1;
INSERT INTO t_009_tbl_standby_mvcc VALUES (2, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_standby_mvcc';
");
$cur_primary->stop;
$cur_standby->restart;
# Acquire a snapshot in standby, before we commit the prepared transaction
my $standby_session =
$cur_standby->background_psql('postgres', on_error_die => 1);
$standby_session->query_safe("BEGIN ISOLATION LEVEL REPEATABLE READ");
$psql_out =
$standby_session->query_safe("SELECT count(*) FROM t_009_tbl_standby_mvcc");
is($psql_out, '0',
"Prepared transaction not visible in standby before commit");
# Commit the transaction in primary
$cur_primary->start;
$cur_primary->psql(
'postgres', "
SET synchronous_commit='remote_apply'; -- To ensure the standby is caught up
COMMIT PREPARED 'xact_009_standby_mvcc';
");
# Still not visible to the old snapshot
$psql_out =
$standby_session->query_safe("SELECT count(*) FROM t_009_tbl_standby_mvcc");
is($psql_out, '0',
"Committed prepared transaction not visible to old snapshot in standby");
# Is visible to a new snapshot
$standby_session->query_safe("COMMIT");
$psql_out =
$standby_session->query_safe("SELECT count(*) FROM t_009_tbl_standby_mvcc");
is($psql_out, '2',
"Committed prepared transaction is visible to new snapshot in standby");
$standby_session->quit;
###############################################################################
# Check for a lock conflict between prepared transaction with DDL inside and
# replay of XLOG_STANDBY_LOCK wal record.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
CREATE TABLE t_009_tbl2 (id int, msg text);
SAVEPOINT s1;
INSERT INTO t_009_tbl2 VALUES (27, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_13';
-- checkpoint will issue XLOG_STANDBY_LOCK that can conflict with lock
-- held by 'create table' statement
CHECKPOINT;
COMMIT PREPARED 'xact_009_13';");
# Ensure that last transaction is replayed on standby.
my $cur_primary_lsn =
$cur_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn()");
my $caughtup_query =
"SELECT '$cur_primary_lsn'::pg_lsn <= pg_last_wal_replay_lsn()";
$cur_standby->poll_query_until('postgres', $caughtup_query)
or die "Timed out while waiting for standby to catch up";
$cur_standby->psql(
'postgres',
"SELECT count(*) FROM t_009_tbl2",
stdout => \$psql_out);
is($psql_out, '1', "Replay prepared transaction with DDL");
###############################################################################
# Check recovery of prepared transaction with DDL inside after a hard restart
# of the primary.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
CREATE TABLE t_009_tbl3 (id int, msg text);
SAVEPOINT s1;
INSERT INTO t_009_tbl3 VALUES (28, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_14';
BEGIN;
CREATE TABLE t_009_tbl4 (id int, msg text);
SAVEPOINT s1;
INSERT INTO t_009_tbl4 VALUES (29, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_15';");
$cur_primary->teardown_node;
$cur_primary->start;
$psql_rc = $cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_14'");
is($psql_rc, '0', 'Commit prepared transaction after teardown');
$psql_rc = $cur_primary->psql('postgres', "ROLLBACK PREPARED 'xact_009_15'");
is($psql_rc, '0', 'Rollback prepared transaction after teardown');
###############################################################################
# Check recovery of prepared transaction with DDL inside after a soft restart
# of the primary.
###############################################################################
$cur_primary->psql(
'postgres', "
BEGIN;
CREATE TABLE t_009_tbl5 (id int, msg text);
SAVEPOINT s1;
INSERT INTO t_009_tbl5 VALUES (30, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_16';
BEGIN;
CREATE TABLE t_009_tbl6 (id int, msg text);
SAVEPOINT s1;
INSERT INTO t_009_tbl6 VALUES (31, 'issued to ${cur_primary_name}');
PREPARE TRANSACTION 'xact_009_17';");
$cur_primary->stop;
$cur_primary->start;
$psql_rc = $cur_primary->psql('postgres', "COMMIT PREPARED 'xact_009_16'");
is($psql_rc, '0', 'Commit prepared transaction after restart');
$psql_rc = $cur_primary->psql('postgres', "ROLLBACK PREPARED 'xact_009_17'");
is($psql_rc, '0', 'Rollback prepared transaction after restart');
###############################################################################
# Verify expected data appears on both servers.
###############################################################################
$cur_primary->psql(
'postgres',
"SELECT count(*) FROM pg_prepared_xacts",
stdout => \$psql_out);
is($psql_out, '0', "No uncommitted prepared transactions on primary");
$cur_primary->psql(
'postgres',
"SELECT * FROM t_009_tbl ORDER BY id",
stdout => \$psql_out);
is( $psql_out, qq{1|issued to london
2|issued to london
5|issued to london
6|issued to london
9|issued to london
10|issued to london
11|issued to london
12|issued to london
13|issued to london
14|issued to london
15|issued to london
16|issued to london
17|issued to london
18|issued to london
19|issued to london
20|issued to london
21|issued to london
22|issued to london
23|issued to paris
24|issued to paris
25|issued to london
26|issued to london},
"Check expected t_009_tbl data on primary");
$cur_primary->psql(
'postgres',
"SELECT * FROM t_009_tbl2",
stdout => \$psql_out);
is( $psql_out,
qq{27|issued to paris},
"Check expected t_009_tbl2 data on primary");
$cur_standby->psql(
'postgres',
"SELECT count(*) FROM pg_prepared_xacts",
stdout => \$psql_out);
is($psql_out, '0', "No uncommitted prepared transactions on standby");
$cur_standby->psql(
'postgres',
"SELECT * FROM t_009_tbl ORDER BY id",
stdout => \$psql_out);
is( $psql_out, qq{1|issued to london
2|issued to london
5|issued to london
6|issued to london
9|issued to london
10|issued to london
11|issued to london
12|issued to london
13|issued to london
14|issued to london
15|issued to london
16|issued to london
17|issued to london
18|issued to london
19|issued to london
20|issued to london
21|issued to london
22|issued to london
23|issued to paris
24|issued to paris
25|issued to london
26|issued to london},
"Check expected t_009_tbl data on standby");
$cur_standby->psql(
'postgres',
"SELECT * FROM t_009_tbl2",
stdout => \$psql_out);
is( $psql_out,
qq{27|issued to paris},
"Check expected t_009_tbl2 data on standby");
# Exercise the 2PC recovery code in StartupSUBTRANS, which is concerned with
# ensuring that enough pg_subtrans pages exist on disk to cover the range of
# prepared transactions at server start time. There's not much we can verify
# directly, but let's at least get the code to run.
$cur_standby->stop();
configure_and_reload($cur_primary, "synchronous_standby_names = ''");
$cur_primary->safe_psql('postgres', "CHECKPOINT");
my $start_lsn =
$cur_primary->safe_psql('postgres', 'select pg_current_wal_insert_lsn()');
$cur_primary->safe_psql('postgres',
"CREATE TABLE test(); BEGIN; CREATE TABLE test1(); PREPARE TRANSACTION 'foo';"
);
my $osubtrans = $cur_primary->safe_psql('postgres',
"select 'pg_subtrans/'||f, s.size from pg_ls_dir('pg_subtrans') f, pg_stat_file('pg_subtrans/'||f) s"
);
$cur_primary->pgbench(
'--no-vacuum --client=5 --transactions=1000',
0,
[],
[],
'pgbench run to cause pg_subtrans traffic',
{
'009_twophase.pgb' => 'insert into test default values'
});
# StartupSUBTRANS is exercised with a wide range of visible XIDs in this
# stop/start sequence, because we left a prepared transaction open above.
# Also, setting subtransaction_buffers to 32 above causes to switch SLRU
# bank, for additional code coverage.
$cur_primary->stop;
$cur_primary->start;
my $nsubtrans = $cur_primary->safe_psql('postgres',
"select 'pg_subtrans/'||f, s.size from pg_ls_dir('pg_subtrans') f, pg_stat_file('pg_subtrans/'||f) s"
);
isnt($osubtrans, $nsubtrans, "contents of pg_subtrans/ have changed");
done_testing();

@ -0,0 +1,659 @@
# Copyright (c) 2021-2024, PostgreSQL Global Development Group
# Minimal test testing streaming replication
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
use lib 't';
use pgtde;
# Bring up the primary node with pg_tde preloaded and tde_heap as the default
# table access method.
my $node_primary = PostgreSQL::Test::Cluster->new('primary');

# A specific role is created to perform some tests related to replication,
# and it needs proper authentication configuration.
$node_primary->init(
    allows_streaming => 1,
    auth_extra => [ '--create-role', 'repl_role' ]);
foreach my $setting (
    "shared_preload_libraries = 'pg_tde'",
    "default_table_access_method = 'tde_heap'")
{
    $node_primary->append_conf('postgresql.conf', $setting);
}
$node_primary->start;
my $backup_name = 'my_backup';

# Remove the keyring files so the providers registered below start clean.
unlink($_) for ('/tmp/global_keyring.file', '/tmp/local_keyring.file');

# Create and enable the tde extension, then register a global and a
# database-level file key provider and create/select one key on each.
# The statements are run one by one, in this order.
foreach my $setup_sql (
    'CREATE EXTENSION IF NOT EXISTS pg_tde;',
    "SELECT pg_tde_add_global_key_provider_file('global_key_provider', '/tmp/global_keyring.file');",
    "SELECT pg_tde_create_key_using_global_key_provider('global_test_key_stream', 'global_key_provider');",
    "SELECT pg_tde_set_server_key_using_global_key_provider('global_test_key_stream', 'global_key_provider');",
    "SELECT pg_tde_add_database_key_provider_file('local_key_provider', '/tmp/local_keyring.file');",
    "SELECT pg_tde_create_key_using_database_key_provider('local_test_key_stream', 'local_key_provider');",
    "SELECT pg_tde_set_key_using_database_key_provider('local_test_key_stream', 'local_key_provider');"
  )
{
    $node_primary->safe_psql('postgres', $setup_sql);
}

# WAL encryption is opt-in through the WAL_ENCRYPTION environment variable;
# anything other than 'on' (including unset) leaves the configuration alone.
my $WAL_ENCRYPTION = $ENV{WAL_ENCRYPTION} // 'off';
$node_primary->append_conf(
    'postgresql.conf', qq(
pg_tde.wal_encrypt = on
)) if $WAL_ENCRYPTION eq 'on';

# Restart so the settings appended above are in effect before the backup.
$node_primary->restart;
# Build the replication topology: primary -> standby_1 -> standby_2.
# Every backup here goes through PGTDE::backup (from the pgtde module loaded
# at the top of this file) rather than calling a node method directly.
# Take backup
PGTDE::backup($node_primary, $backup_name);
# Create streaming standby linking to primary
my $node_standby_1 = PostgreSQL::Test::Cluster->new('standby_1');
$node_standby_1->init_from_backup($node_primary, $backup_name,
has_streaming => 1);
$node_standby_1->start;
# Take backup of standby 1 (not mandatory, but useful to check if
# pg_basebackup works on a standby).
PGTDE::backup($node_standby_1, $backup_name);
# Take a second backup of the standby while the primary is offline.
# This backup ('my_backup_2') is only taken; no node in this file is
# initialized from it.
$node_primary->stop;
PGTDE::backup($node_standby_1, 'my_backup_2');
$node_primary->start;
# Create second standby node linking to standby 1
# It is seeded from the backup taken on standby 1 under $backup_name, so it
# cascades from standby 1 instead of streaming from the primary.
my $node_standby_2 = PostgreSQL::Test::Cluster->new('standby_2');
$node_standby_2->init_from_backup($node_standby_1, $backup_name,
has_streaming => 1);
$node_standby_2->start;
# Create some content on primary and check its presence in standby nodes
$node_primary->safe_psql('postgres',
    "CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");

# Install a login event trigger: outside recovery it records session_user in
# user_logins, and it rejects the role 'regress_hacker'. ENABLE ALWAYS is set
# on the trigger explicitly.
$node_primary->safe_psql(
    'postgres', q{
CREATE TABLE user_logins(id serial, who text);
CREATE FUNCTION on_login_proc() RETURNS EVENT_TRIGGER AS $$
BEGIN
IF NOT pg_is_in_recovery() THEN
INSERT INTO user_logins (who) VALUES (session_user);
END IF;
IF session_user = 'regress_hacker' THEN
RAISE EXCEPTION 'You are not welcome!';
END IF;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
CREATE EVENT TRIGGER on_login_trigger ON login EXECUTE FUNCTION on_login_proc();
ALTER EVENT TRIGGER on_login_trigger ENABLE ALWAYS;
});

# Wait for standbys to catch up
$node_primary->wait_for_replay_catchup($node_standby_1);
$node_standby_1->wait_for_replay_catchup($node_standby_2, $node_primary);

# Diagnostics go through note() so they are emitted as TAP comments instead
# of raw text on the test's STDOUT.
my $result =
  $node_standby_1->safe_psql('postgres', "SELECT count(*) FROM tab_int");
note "standby 1: $result";
is($result, qq(1002), 'check streamed content on standby 1');
$result =
  $node_standby_2->safe_psql('postgres', "SELECT count(*) FROM tab_int");
note "standby 2: $result";
is($result, qq(1002), 'check streamed content on standby 2');

# Likewise, but for a sequence
$node_primary->safe_psql('postgres',
    "CREATE SEQUENCE seq1; SELECT nextval('seq1')");

# Wait for standbys to catch up
$node_primary->wait_for_replay_catchup($node_standby_1);
$node_standby_1->wait_for_replay_catchup($node_standby_2, $node_primary);
$result = $node_standby_1->safe_psql('postgres', "SELECT * FROM seq1");
note "standby 1: $result";
is($result, qq(33|0|t), 'check streamed sequence content on standby 1');
$result = $node_standby_2->safe_psql('postgres', "SELECT * FROM seq1");
note "standby 2: $result";
is($result, qq(33|0|t), 'check streamed sequence content on standby 2');

# Check pg_sequence_last_value() returns NULL for unlogged sequence on standby
$node_primary->safe_psql('postgres',
    "CREATE UNLOGGED SEQUENCE ulseq; SELECT nextval('ulseq')");
$node_primary->wait_for_replay_catchup($node_standby_1);
is( $node_standby_1->safe_psql(
        'postgres',
        "SELECT pg_sequence_last_value('ulseq'::regclass) IS NULL"),
    't',
    'pg_sequence_last_value() on unlogged sequence on standby 1');

# Check that only READ-only queries can run on standbys; the INSERT is
# expected to make psql return 3 on both standbys.
is($node_standby_1->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
    3, 'read-only queries on standby 1');
is($node_standby_2->psql('postgres', 'INSERT INTO tab_int VALUES (1)'),
    3, 'read-only queries on standby 2');

# Tests for connection parameter target_session_attrs
note "testing connection parameter \"target_session_attrs\"";
# Attempt to connect to $node1, then $node2, using target_session_attrs=$mode.
# Expect to connect to $target_node (undef for failure) with given $status.
#
# Positional arguments:
#   $node1, $node2 - nodes whose host and port are listed, in that order, in
#                    the connection string
#   $target_node   - node expected to answer, or undef when the connection
#                    attempt is expected to fail
#   $mode          - value passed as target_session_attrs
#   $status        - expected psql exit status; 0 selects the success check
#
# Emits exactly one test result and returns nothing.
sub test_target_session_attrs
{
    # Report failures at the caller's line rather than inside this helper.
    local $Test::Builder::Level = $Test::Builder::Level + 1;

    my ($node1, $node2, $target_node, $mode, $status) = @_;

    my $node1_host = $node1->host;
    my $node1_port = $node1->port;
    my $node1_name = $node1->name;
    my $node2_host = $node2->host;
    my $node2_port = $node2->port;
    my $node2_name = $node2->name;

    # Both stay undef when no target node is expected.
    my $target_port = defined $target_node ? $target_node->port : undef;
    my $target_name = defined $target_node ? $target_node->name : undef;

    # Build connection string for connection attempt.
    my $connstr = "host=$node1_host,$node2_host ";
    $connstr .= "port=$node1_port,$node2_port ";
    $connstr .= "target_session_attrs=$mode";

    # Attempt to connect, and if successful, get the server port number
    # we connected to. Note we must pass the SQL command via the command
    # line not stdin, else Perl may spit up trying to write to stdin of
    # an already-failed psql process.
    my ($ret, $stdout, $stderr) =
      $node1->psql('postgres', undef,
        extra_params => [ '-d', $connstr, '-c', 'SHOW port;' ]);

    if ($status == 0)
    {
        # Success case: exit status must match and the reported port must be
        # the port of the expected target node.
        is( $status == $ret && $stdout eq $target_port,
            1,
            "connect to node $target_name with mode \"$mode\" and $node1_name,$node2_name listed"
        );
    }
    else
    {
        # Failure case: log what psql returned as TAP diagnostics, then
        # require the expected exit status and that no target was named.
        note "status = $status";
        note "ret = $ret";
        note "stdout = $stdout";
        note "stderr = $stderr";
        is( $status == $ret && !defined $target_node,
            1,
            "fail to connect with mode \"$mode\" and $node1_name,$node2_name listed"
        );
    }
    return;
}
# Run every target_session_attrs scenario through test_target_session_attrs.
# Each row is: first listed node, second listed node, node expected to answer
# (undef when the connection must fail), mode, expected psql status.
foreach my $case (
    # Connect to primary in "read-write" mode with primary,standby1 list.
    [ $node_primary, $node_standby_1, $node_primary, "read-write", 0 ],
    # Connect to primary in "read-write" mode with standby1,primary list.
    [ $node_standby_1, $node_primary, $node_primary, "read-write", 0 ],
    # Connect to primary in "any" mode with primary,standby1 list.
    [ $node_primary, $node_standby_1, $node_primary, "any", 0 ],
    # Connect to standby1 in "any" mode with standby1,primary list.
    [ $node_standby_1, $node_primary, $node_standby_1, "any", 0 ],
    # Connect to primary in "primary" mode with primary,standby1 list.
    [ $node_primary, $node_standby_1, $node_primary, "primary", 0 ],
    # Connect to primary in "primary" mode with standby1,primary list.
    [ $node_standby_1, $node_primary, $node_primary, "primary", 0 ],
    # Connect to standby1 in "read-only" mode with primary,standby1 list.
    [ $node_primary, $node_standby_1, $node_standby_1, "read-only", 0 ],
    # Connect to standby1 in "read-only" mode with standby1,primary list.
    [ $node_standby_1, $node_primary, $node_standby_1, "read-only", 0 ],
    # Connect to primary in "prefer-standby" mode with primary,primary list.
    [ $node_primary, $node_primary, $node_primary, "prefer-standby", 0 ],
    # Connect to standby1 in "prefer-standby" mode with primary,standby1 list.
    [ $node_primary, $node_standby_1, $node_standby_1, "prefer-standby", 0 ],
    # Connect to standby1 in "prefer-standby" mode with standby1,primary list.
    [ $node_standby_1, $node_primary, $node_standby_1, "prefer-standby", 0 ],
    # Connect to standby1 in "standby" mode with primary,standby1 list.
    [ $node_primary, $node_standby_1, $node_standby_1, "standby", 0 ],
    # Connect to standby1 in "standby" mode with standby1,primary list.
    [ $node_standby_1, $node_primary, $node_standby_1, "standby", 0 ],
    # Fail to connect in "read-write" mode with standby1,standby2 list.
    [ $node_standby_1, $node_standby_2, undef, "read-write", 2 ],
    # Fail to connect in "primary" mode with standby1,standby2 list.
    [ $node_standby_1, $node_standby_2, undef, "primary", 2 ],
    # Fail to connect in "read-only" mode with primary,primary list.
    [ $node_primary, $node_primary, undef, "read-only", 2 ],
    # Fail to connect in "standby" mode with primary,primary list.
    [ $node_primary, $node_primary, undef, "standby", 2 ])
{
    test_target_session_attrs(@$case);
}
# Test for SHOW commands using a WAL sender connection with a replication
# role.
note "testing SHOW commands for replication connection";
$node_primary->psql(
    'postgres', "
CREATE ROLE repl_role REPLICATION LOGIN;
GRANT pg_read_all_settings TO repl_role;");

# Two connection strings for repl_role: one with replication=1 and one with
# replication=database plus a database name. The test descriptions below call
# them "physical" and "logical" respectively.
my $primary_host = $node_primary->host;
my $primary_port = $node_primary->port;
my $connstr_common = "host=$primary_host port=$primary_port user=repl_role";
my $connstr_rep = "$connstr_common replication=1";
my $connstr_db = "$connstr_common replication=database dbname=postgres";

# Each row: SHOW statement, connection string, test description. Covers
# SHOW ALL, a user-settable parameter and a superuser-settable parameter,
# each over both connection kinds.
my ($ret, $stdout, $stderr);
foreach my $show_case (
    [
        'SHOW ALL;', $connstr_rep,
        "SHOW ALL with replication role and physical replication"
    ],
    [
        'SHOW ALL;', $connstr_db,
        "SHOW ALL with replication role and logical replication"
    ],
    [
        'SHOW work_mem;', $connstr_rep,
        "SHOW with user-settable parameter, replication role and physical replication"
    ],
    [
        'SHOW work_mem;', $connstr_db,
        "SHOW with user-settable parameter, replication role and logical replication"
    ],
    [
        'SHOW primary_conninfo;', $connstr_rep,
        "SHOW with superuser-settable parameter, replication role and physical replication"
    ],
    [
        'SHOW primary_conninfo;', $connstr_db,
        "SHOW with superuser-settable parameter, replication role and logical replication"
    ])
{
    my ($show_sql, $conninfo, $description) = @$show_case;
    ($ret, $stdout, $stderr) = $node_primary->psql(
        'postgres', $show_sql,
        on_error_die => 1,
        extra_params => [ '-d', $conninfo ]);
    ok($ret == 0, $description);
}
# Exercise the READ_REPLICATION_SLOT replication command over the
# replication=1 connection string ($connstr_rep): first against a slot that
# does not exist, then against a freshly created physical slot, which is
# dropped again at the end.
note "testing READ_REPLICATION_SLOT command for replication connection";
my $slotname = 'test_read_replication_slot_physical';
# A missing slot is not an error: psql exits 0 and the output is a single row
# of three empty columns ("||").
($ret, $stdout, $stderr) = $node_primary->psql(
'postgres',
'READ_REPLICATION_SLOT non_existent_slot;',
extra_params => [ '-d', $connstr_rep ]);
ok($ret == 0, "READ_REPLICATION_SLOT exit code 0 on success");
like($stdout, qr/^\|\|$/,
"READ_REPLICATION_SLOT returns NULL values if slot does not exist");
# Create the slot with RESERVE_WAL; the result of this call is not checked.
$node_primary->psql(
'postgres',
"CREATE_REPLICATION_SLOT $slotname PHYSICAL RESERVE_WAL;",
extra_params => [ '-d', $connstr_rep ]);
($ret, $stdout, $stderr) = $node_primary->psql(
'postgres',
"READ_REPLICATION_SLOT $slotname;",
extra_params => [ '-d', $connstr_rep ]);
ok($ret == 0, "READ_REPLICATION_SLOT success with existing slot");
# Expected row shape: "physical|<anything>|1".
# NOTE(review): the trailing 1 is presumably the timeline ID - confirm
# against the READ_REPLICATION_SLOT documentation.
like($stdout, qr/^physical\|[^|]*\|1$/,
"READ_REPLICATION_SLOT returns tuple with slot information");
# Clean up the slot; the result of this call is not checked either.
$node_primary->psql(
'postgres',
"DROP_REPLICATION_SLOT $slotname;",
extra_params => [ '-d', $connstr_rep ]);
note "switching to physical replication slot";

# Switch to using a physical replication slot. We can do this without a new
# backup since physical slots can go backwards if needed. Do so on both
# standbys. Since we're going to be testing things that affect the slot state,
# also increase the standby feedback interval to ensure timely updates.
my ($slotname_1, $slotname_2) = ('standby_1', 'standby_2');

# Primary: allow slots, restart, then create the slot standby_1 will use.
$node_primary->append_conf('postgresql.conf', "max_replication_slots = 4");
$node_primary->restart;
{
    my $create_rc = $node_primary->psql('postgres',
        qq[SELECT pg_create_physical_replication_slot('$slotname_1');]);
    is($create_rc, 0, 'physical slot created on primary');
}

# Standby 1: attach to its slot on the primary and allow slots of its own,
# restart, then create the slot that the cascaded standby_2 will use.
foreach my $setting (
    "primary_slot_name = $slotname_1",
    "wal_receiver_status_interval = 1",
    "max_replication_slots = 4")
{
    $node_standby_1->append_conf('postgresql.conf', $setting);
}
$node_standby_1->restart;
{
    my $create_rc = $node_standby_1->psql('postgres',
        qq[SELECT pg_create_physical_replication_slot('$slotname_2');]);
    is($create_rc, 0, 'physical slot created on intermediate replica');
}

# Standby 2: changing primary_slot_name should work with a reload only, no
# restart. The effect is waited for by the get_slot_xmins calls further below.
foreach my $setting (
    "primary_slot_name = $slotname_2",
    "wal_receiver_status_interval = 1")
{
    $node_standby_2->append_conf('postgresql.conf', $setting);
}
$node_standby_2->reload;
# Return (xmin, catalog_xmin) for the named replication slot on the given
# node. Before reading the slot, poll until the boolean SQL expression
# $check_expr holds for that slot's pg_replication_slots row, so the values
# are read from a settled state. Dies if the polling gives up.
sub get_slot_xmins
{
    my $node = shift;
    my $slotname = shift;
    my $check_expr = shift;

    my $settled = $node->poll_query_until(
        'postgres', qq[
SELECT $check_expr
FROM pg_catalog.pg_replication_slots
WHERE slot_name = '$slotname';
]);
    die "Timed out waiting for slot xmins to advance" unless $settled;

    my $slot_row = $node->slot($slotname);
    return @{$slot_row}{ 'xmin', 'catalog_xmin' };
}
# There's no hot standby feedback and there are no logical slots on either peer
# so xmin and catalog_xmin should be null on both slots. The slot on the
# primary is the non-cascaded one; the slot on standby_1 is the cascaded one.
my ($xmin, $catalog_xmin);
foreach my $slot_case (
    [ $node_primary, $slotname_1, 'non-cascaded' ],
    [ $node_standby_1, $slotname_2, 'cascaded' ])
{
    my ($slot_owner, $slot, $kind) = @$slot_case;
    ($xmin, $catalog_xmin) = get_slot_xmins($slot_owner, $slot,
        "xmin IS NULL AND catalog_xmin IS NULL");
    is($xmin, '', "xmin of $kind slot null with no hs_feedback");
    is($catalog_xmin, '',
        "catalog xmin of $kind slot null with no hs_feedback");
}

# Replication still works? Create the table that replay_check() inserts into.
$node_primary->safe_psql('postgres', 'CREATE TABLE replayed(val integer);');
# Insert a new, larger value into "replayed" on the primary, wait for both
# standbys to replay it, and die unless each standby can see the value.
# Takes no arguments and returns nothing.
sub replay_check
{
    my $inserted = $node_primary->safe_psql('postgres',
        'INSERT INTO replayed(val) SELECT coalesce(max(val),0) + 1 AS newval FROM replayed RETURNING val'
    );

    # standby_1 follows the primary; standby_2 cascades from standby_1, and
    # $node_primary is passed as the third argument of its catchup wait.
    $node_primary->wait_for_replay_catchup($node_standby_1);
    $node_standby_1->wait_for_replay_catchup($node_standby_2, $node_primary);

    # An empty result from the probe means the value is not visible there.
    my $probe = qq[SELECT 1 FROM replayed WHERE val = $inserted];
    die "standby_1 didn't replay primary value $inserted"
      unless $node_standby_1->safe_psql('postgres', $probe);
    die "standby_2 didn't replay standby_1 value $inserted"
      unless $node_standby_2->safe_psql('postgres', $probe);
    return;
}
replay_check();

# The login event trigger created on the primary must be present on both
# standbys.
my $evttrig;
foreach my $standby ($node_standby_1, $node_standby_2)
{
    $evttrig = $standby->safe_psql('postgres',
        "SELECT evtname FROM pg_event_trigger WHERE evtevent = 'login'");
    is($evttrig, 'on_login_trigger', 'Name of login trigger');
}

note "enabling hot_standby_feedback";

# Enable hs_feedback. The slot should gain an xmin. We set the status interval
# so we'll see the results promptly.
foreach my $standby ($node_standby_1, $node_standby_2)
{
    $standby->safe_psql('postgres',
        'ALTER SYSTEM SET hot_standby_feedback = on;');
    $standby->reload;
}
replay_check();

# Slot on the primary (non-cascaded): xmin is now set, catalog_xmin is not.
($xmin, $catalog_xmin) = get_slot_xmins($node_primary, $slotname_1,
    "xmin IS NOT NULL AND catalog_xmin IS NULL");
isnt($xmin, '', 'xmin of non-cascaded slot non-null with hs feedback');
is($catalog_xmin, '',
    'catalog xmin of non-cascaded slot still null with hs_feedback');

# Slot on standby_1 (cascaded): same expectation. These values are kept in
# their own variables because later checks compare against them.
my ($xmin1, $catalog_xmin1) = get_slot_xmins($node_standby_1, $slotname_2,
    "xmin IS NOT NULL AND catalog_xmin IS NULL");
isnt($xmin1, '', 'xmin of cascaded slot non-null with hs feedback');
is($catalog_xmin1, '',
    'catalog xmin of cascaded slot still null with hs_feedback');
# Consume transaction IDs on the primary and verify that, with
# hot_standby_feedback enabled, the xmin reported by both slots moves away
# from the values captured earlier ($xmin for the primary's slot, $xmin1 for
# standby_1's slot), while catalog_xmin stays empty.
note "doing some work to advance xmin";
$node_primary->safe_psql(
'postgres', q{
do $$
begin
for i in 10000..11000 loop
-- use an exception block so that each iteration eats an XID
begin
insert into tab_int values (i);
exception
when division_by_zero then null;
end;
end loop;
end$$;
});
$node_primary->safe_psql('postgres', 'VACUUM;');
$node_primary->safe_psql('postgres', 'CHECKPOINT;');
# get_slot_xmins polls until the slot's xmin differs from the old value, so
# the isnt() below only fails if that helper's contract is broken.
my ($xmin2, $catalog_xmin2) =
get_slot_xmins($node_primary, $slotname_1, "xmin <> '$xmin'");
note "primary slot's new xmin $xmin2, old xmin $xmin";
isnt($xmin2, $xmin, 'xmin of non-cascaded slot with hs feedback has changed');
is($catalog_xmin2, '',
'catalog xmin of non-cascaded slot still null with hs_feedback unchanged'
);
# Same check for the cascaded slot on standby_1, reusing the same variables.
($xmin2, $catalog_xmin2) =
get_slot_xmins($node_standby_1, $slotname_2, "xmin <> '$xmin1'");
note "standby_1 slot's new xmin $xmin2, old xmin $xmin1";
isnt($xmin2, $xmin1, 'xmin of cascaded slot with hs feedback has changed');
is($catalog_xmin2, '',
'catalog xmin of cascaded slot still null with hs_feedback unchanged');
note "disabling hot_standby_feedback";

# Disable hs_feedback. Xmin should be cleared.
foreach my $standby ($node_standby_1, $node_standby_2)
{
    $standby->safe_psql('postgres',
        'ALTER SYSTEM SET hot_standby_feedback = off;');
    $standby->reload;
}
replay_check();

# Both the slot on the primary (non-cascaded) and the one on standby_1
# (cascaded) must end up with empty xmin and catalog_xmin again.
foreach my $slot_case (
    [ $node_primary, $slotname_1, 'non-cascaded' ],
    [ $node_standby_1, $slotname_2, 'cascaded' ])
{
    my ($slot_owner, $slot, $kind) = @$slot_case;
    ($xmin, $catalog_xmin) = get_slot_xmins($slot_owner, $slot,
        "xmin IS NULL AND catalog_xmin IS NULL");
    is($xmin, '', "xmin of $kind slot null with hs feedback reset");
    is($catalog_xmin, '',
        "catalog xmin of $kind slot still null with hs_feedback reset");
}
# Re-point standby_2 directly at the primary using only a configuration
# reload, then prove it streams from the primary by stopping standby_1 first.
note "check change primary_conninfo without restart";
# Clear the slot name set earlier; enable_streaming() is then called with the
# primary as the new upstream.
$node_standby_2->append_conf('postgresql.conf', "primary_slot_name = ''");
$node_standby_2->enable_streaming($node_primary);
$node_standby_2->reload;
# be sure do not streaming from cascade
# (standby_1 is stopped so the new row can only arrive from the primary)
$node_standby_1->stop;
my $newval = $node_primary->safe_psql('postgres',
'INSERT INTO replayed(val) SELECT coalesce(max(val),0) + 1 AS newval FROM replayed RETURNING val'
);
$node_primary->wait_for_catchup($node_standby_2);
my $is_replayed = $node_standby_2->safe_psql('postgres',
qq[SELECT 1 FROM replayed WHERE val = $newval]);
# NOTE(review): the description below is worded like a failure message, yet
# it is printed when the check passes - consider rewording it.
is($is_replayed, qq(1), "standby_2 didn't replay primary value $newval");
# Drop any existing slots on the primary, for the follow-up tests.
$node_primary->safe_psql('postgres',
    "SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots;");

# Test physical slot advancing and its durability. Create a new slot on
# the primary, not used by any of the standbys. This reserves WAL at creation.
my $phys_slot = 'phys_slot';
$node_primary->safe_psql('postgres',
    "SELECT pg_create_physical_replication_slot('$phys_slot', true);");

# Generate some WAL, and switch to a new segment, used to check that
# the previous segment is correctly getting recycled as the slot advancing
# would recompute the minimum LSN calculated across all slots.
my $segment_removed = $node_primary->safe_psql('postgres',
    'SELECT pg_walfile_name(pg_current_wal_lsn())');
chomp($segment_removed);
$node_primary->advance_wal(1);
my $current_lsn =
  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_lsn();");
chomp($current_lsn);

# Advance the slot to the current LSN; psql is called in scalar context so
# only its exit status is captured.
my $psql_rc = $node_primary->psql('postgres',
    "SELECT pg_replication_slot_advance('$phys_slot', '$current_lsn'::pg_lsn);"
);
is($psql_rc, '0', 'slot advancing with physical slot');
my $phys_restart_lsn_pre = $node_primary->safe_psql('postgres',
    "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$phys_slot';"
);
chomp($phys_restart_lsn_pre);

# Slot advance should persist across clean restarts.
$node_primary->restart;
my $phys_restart_lsn_post = $node_primary->safe_psql('postgres',
    "SELECT restart_lsn from pg_replication_slots WHERE slot_name = '$phys_slot';"
);
chomp($phys_restart_lsn_post);

# String equality of the two LSNs; is() reports both values when they differ.
is($phys_restart_lsn_post, $phys_restart_lsn_pre,
    "physical slot advance persists across restarts");

# Check if the previous segment gets correctly recycled after the
# server stopped cleanly, causing a shutdown checkpoint to be generated.
my $primary_data = $node_primary->data_dir;
ok(!-f "$primary_data/pg_wal/$segment_removed",
    "WAL segment $segment_removed recycled after physical slot advancing");
# Two BASE_BACKUP scenarios driven through psql on a replication=database
# connection: (1) BASE_BACKUP must be refused in a session that already
# started a backup with pg_backup_start(); (2) a running BASE_BACKUP can be
# cancelled, after which pg_backup_stop() in the same psql run must fail.
note "testing pg_backup_start() followed by BASE_BACKUP";
my $connstr = $node_primary->connstr('postgres') . " replication=database";
# This test requires a replication connection with a database, as it mixes
# a replication command and a SQL command.
$node_primary->command_fails_like(
[
'psql', '-X', '-c', "SELECT pg_backup_start('backup', true)",
'-c', 'BASE_BACKUP', '-d', $connstr
],
qr/a backup is already in progress in this session/,
'BASE_BACKUP cannot run in session already running backup');
note "testing BASE_BACKUP cancellation";
# Timer bounding the background psql run started below.
my $sigchld_bb_timeout =
IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
# This test requires a replication connection with a database, as it mixes
# a replication command and a SQL command. The first BASE_BACKUP is throttled
# to give enough room for the cancellation running below. The second command
# for pg_backup_stop() should fail.
my ($sigchld_bb_stdin, $sigchld_bb_stdout, $sigchld_bb_stderr) = ('', '', '');
# Start psql in the background with its stdin, stdout and stderr bound to the
# three scalars above.
my $sigchld_bb = IPC::Run::start(
[
'psql', '-X', '-c', "BASE_BACKUP (CHECKPOINT 'fast', MAX_RATE 32);",
'-c', 'SELECT pg_backup_stop()',
'-d', $connstr
],
'<',
\$sigchld_bb_stdin,
'>',
\$sigchld_bb_stdout,
'2>',
\$sigchld_bb_stderr,
$sigchld_bb_timeout);
# The cancellation is issued once the database files are streamed and
# the checkpoint issued at backup start completes.
# The polled query itself calls pg_cancel_backend() on the backend running
# BASE_BACKUP once its progress phase is 'streaming database files'.
is( $node_primary->poll_query_until(
'postgres',
"SELECT pg_cancel_backend(a.pid) FROM "
. "pg_stat_activity a, pg_stat_progress_basebackup b WHERE "
. "a.pid = b.pid AND a.query ~ 'BASE_BACKUP' AND "
. "b.phase = 'streaming database files';"),
"1",
"WAL sender sending base backup killed");
# The psql command should fail on pg_backup_stop().
# pump_until waits for the expected error text to show up on psql's stderr.
ok( pump_until(
$sigchld_bb, $sigchld_bb_timeout,
\$sigchld_bb_stderr, qr/backup is not in progress/),
'base backup cleanly canceled');
$sigchld_bb->finish();
done_testing();

@ -1131,8 +1131,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
} }
#ifdef PERCONA_EXT #ifdef PERCONA_EXT
TDEXLogCryptBuffer(copybuf + hdr_len + bytes_written, bytes_to_write, {
xlogoff, stream->timeline, segno, WalSegSz); void* enc_buf = copybuf + hdr_len + bytes_written;
TDEXLogCryptBuffer(enc_buf, enc_buf, bytes_to_write,
xlogoff, stream->timeline, segno, WalSegSz);
}
#endif #endif
if (stream->walmethod->ops->write(walfile, if (stream->walmethod->ops->write(walfile,

Loading…
Cancel
Save