mirror of https://github.com/postgres/postgres
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
377 lines
14 KiB
377 lines
14 KiB
|
|
# Copyright (c) 2021-2025, PostgreSQL Global Development Group
|
|
|
|
# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with
|
|
# 'copy_data' parameter.
|
|
use strict;
|
|
use warnings FATAL => 'all';
|
|
use PostgreSQL::Test::Cluster;
|
|
use PostgreSQL::Test::Utils;
|
|
use Test::More;
|
|
|
|
my $subname_AB = 'tap_sub_A_B';
|
|
my $subname_AB2 = 'tap_sub_A_B_2';
|
|
my $subname_BA = 'tap_sub_B_A';
|
|
my $subname_BC = 'tap_sub_B_C';
|
|
|
|
my $result;
|
|
my $stdout;
|
|
my $stderr;
|
|
|
|
###############################################################################
|
|
# Setup a bidirectional logical replication between node_A & node_B
|
|
###############################################################################
|
|
|
|
# Initialize nodes
|
|
# node_A
|
|
my $node_A = PostgreSQL::Test::Cluster->new('node_A');
|
|
$node_A->init(allows_streaming => 'logical');
|
|
$node_A->start;
|
|
|
|
# node_B
|
|
my $node_B = PostgreSQL::Test::Cluster->new('node_B');
|
|
$node_B->init(allows_streaming => 'logical');
|
|
|
|
# Enable the track_commit_timestamp to detect the conflict when attempting to
|
|
# update a row that was previously modified by a different origin.
|
|
$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = on');
|
|
$node_B->start;
|
|
|
|
# Create table on node_A
|
|
$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
|
|
|
|
# Create the same table on node_B
|
|
$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
|
|
|
|
# Setup logical replication
|
|
# node_A (pub) -> node_B (sub)
|
|
my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
|
|
$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
|
|
$node_B->safe_psql(
|
|
'postgres', "
|
|
CREATE SUBSCRIPTION $subname_BA
|
|
CONNECTION '$node_A_connstr application_name=$subname_BA'
|
|
PUBLICATION tap_pub_A
|
|
WITH (origin = none)");
|
|
|
|
# node_B (pub) -> node_A (sub)
|
|
my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
|
|
$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
|
|
$node_A->safe_psql(
|
|
'postgres', "
|
|
CREATE SUBSCRIPTION $subname_AB
|
|
CONNECTION '$node_B_connstr application_name=$subname_AB'
|
|
PUBLICATION tap_pub_B
|
|
WITH (origin = none, copy_data = off)");
|
|
|
|
# Wait for initial table sync to finish
|
|
$node_A->wait_for_subscription_sync($node_B, $subname_AB);
|
|
$node_B->wait_for_subscription_sync($node_A, $subname_BA);
|
|
|
|
is(1, 1, 'Bidirectional replication setup is complete');
|
|
|
|
###############################################################################
|
|
# Check that bidirectional logical replication setup does not cause infinite
|
|
# recursive insertion.
|
|
###############################################################################
|
|
|
|
# insert a record
|
|
$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
|
|
$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");
|
|
|
|
$node_A->wait_for_catchup($subname_BA);
|
|
$node_B->wait_for_catchup($subname_AB);
|
|
|
|
# check that transaction was committed on subscriber(s)
|
|
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
|
|
is( $result, qq(11
|
|
21),
|
|
'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
|
|
);
|
|
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
|
|
is( $result, qq(11
|
|
21),
|
|
'Inserted successfully without leading to infinite recursion in bidirectional replication setup'
|
|
);
|
|
|
|
$node_A->safe_psql('postgres', "DELETE FROM tab;");
|
|
|
|
$node_A->wait_for_catchup($subname_BA);
|
|
$node_B->wait_for_catchup($subname_AB);
|
|
|
|
###############################################################################
|
|
# Check that remote data of node_B (that originated from node_C) is not
|
|
# published to node_A.
|
|
###############################################################################
|
|
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
|
|
is($result, qq(), 'Check existing data');
|
|
|
|
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
|
|
is($result, qq(), 'Check existing data');
|
|
|
|
# Initialize node node_C
|
|
my $node_C = PostgreSQL::Test::Cluster->new('node_C');
|
|
$node_C->init(allows_streaming => 'logical');
|
|
$node_C->start;
|
|
|
|
$node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
|
|
|
|
# Setup logical replication
|
|
# node_C (pub) -> node_B (sub)
|
|
my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
|
|
$node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");
|
|
$node_B->safe_psql(
|
|
'postgres', "
|
|
CREATE SUBSCRIPTION $subname_BC
|
|
CONNECTION '$node_C_connstr application_name=$subname_BC'
|
|
PUBLICATION tap_pub_C
|
|
WITH (origin = none)");
|
|
$node_B->wait_for_subscription_sync($node_C, $subname_BC);
|
|
|
|
# insert a record
|
|
$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
|
|
|
|
$node_C->wait_for_catchup($subname_BC);
|
|
$node_B->wait_for_catchup($subname_AB);
|
|
$node_A->wait_for_catchup($subname_BA);
|
|
|
|
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
|
|
is($result, qq(32), 'The node_C data replicated to node_B');
|
|
|
|
# check that the data published from node_C to node_B is not sent to node_A
|
|
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
|
|
is($result, qq(),
|
|
'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
|
|
);
|
|
|
|
###############################################################################
|
|
# Check that the conflict can be detected when attempting to update or
|
|
# delete a row that was previously modified by a different source.
|
|
###############################################################################
|
|
|
|
$node_B->safe_psql('postgres', "DELETE FROM tab;");
|
|
|
|
$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
|
|
|
|
$node_A->wait_for_catchup($subname_BA);
|
|
$node_B->wait_for_catchup($subname_AB);
|
|
|
|
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
|
|
is($result, qq(32), 'The node_A data replicated to node_B');
|
|
|
|
# The update should update the row on node B that was inserted by node A.
|
|
$node_C->safe_psql('postgres', "UPDATE tab SET a = 33 WHERE a = 32;");
|
|
|
|
$node_B->wait_for_log(
|
|
qr/conflict detected on relation "public.tab": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*\n.*Existing local row \(32\); remote row \(33\); replica identity \(a\)=\(32\)/
|
|
);
|
|
|
|
$node_B->safe_psql('postgres', "DELETE FROM tab;");
|
|
$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (33);");
|
|
|
|
$node_A->wait_for_catchup($subname_BA);
|
|
$node_B->wait_for_catchup($subname_AB);
|
|
|
|
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
|
|
is($result, qq(33), 'The node_A data replicated to node_B');
|
|
|
|
# The delete should remove the row on node B that was inserted by node A.
|
|
$node_C->safe_psql('postgres', "DELETE FROM tab WHERE a = 33;");
|
|
|
|
$node_B->wait_for_log(
|
|
qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*\n.*Existing local row \(33\); replica identity \(a\)=\(33\)/
|
|
);
|
|
|
|
# The remaining tests no longer test conflict detection.
|
|
$node_B->append_conf('postgresql.conf', 'track_commit_timestamp = off');
|
|
$node_B->restart;
|
|
|
|
###############################################################################
|
|
# Specifying origin = NONE indicates that the publisher should only replicate the
|
|
# changes that are generated locally from node_B, but in this case since the
|
|
# node_B is also subscribing data from node_A, node_B can have remotely
|
|
# originated data from node_A. We log a warning, in this case, to draw
|
|
# attention to there being possible remote data.
|
|
###############################################################################
|
|
($result, $stdout, $stderr) = $node_A->psql(
|
|
'postgres', "
|
|
CREATE SUBSCRIPTION $subname_AB2
|
|
CONNECTION '$node_B_connstr application_name=$subname_AB2'
|
|
PUBLICATION tap_pub_B
|
|
WITH (origin = none, copy_data = on)");
|
|
like(
|
|
$stderr,
|
|
qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/,
|
|
"Create subscription with origin = none and copy_data when the publisher has subscribed same table"
|
|
);
|
|
|
|
$node_A->wait_for_subscription_sync($node_B, $subname_AB2);
|
|
|
|
# Alter subscription ... refresh publication should be successful when no new
|
|
# table is added
|
|
$node_A->safe_psql(
|
|
'postgres', "
|
|
ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");
|
|
|
|
# Check Alter subscription ... refresh publication when there is a new
|
|
# table that is subscribing data from a different publication
|
|
$node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
|
|
$node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
|
|
|
|
# add a new table to the publication
|
|
$node_A->safe_psql('postgres',
|
|
"ALTER PUBLICATION tap_pub_A ADD TABLE tab_new");
|
|
$node_B->safe_psql(
|
|
'postgres', "
|
|
ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION");
|
|
|
|
$node_B->wait_for_subscription_sync($node_A, $subname_BA);
|
|
|
|
# add a new table to the publication
|
|
$node_B->safe_psql('postgres',
|
|
"ALTER PUBLICATION tap_pub_B ADD TABLE tab_new");
|
|
|
|
# Alter subscription ... refresh publication should log a warning when a new
|
|
# table on the publisher is subscribing data from a different publication
|
|
($result, $stdout, $stderr) = $node_A->psql(
|
|
'postgres', "
|
|
ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");
|
|
like(
|
|
$stderr,
|
|
qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/,
|
|
"Refresh publication when the publisher has subscribed for the new table, but the subscriber-side wants origin = none"
|
|
);
|
|
|
|
# Ensure that relation has reached 'ready' state before we try to drop it
|
|
my $synced_query =
|
|
"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');";
|
|
$node_A->poll_query_until('postgres', $synced_query)
|
|
or die "Timed out while waiting for subscriber to synchronize data";
|
|
|
|
$node_B->wait_for_catchup($subname_AB2);
|
|
|
|
# clear the operations done by this test
|
|
$node_A->safe_psql(
|
|
'postgres', qq(
|
|
DROP TABLE tab_new;
|
|
DROP SUBSCRIPTION $subname_AB2;
|
|
DROP SUBSCRIPTION $subname_AB;
|
|
DROP PUBLICATION tap_pub_A;
|
|
));
|
|
$node_B->safe_psql(
|
|
'postgres', qq(
|
|
DROP TABLE tab_new;
|
|
DROP SUBSCRIPTION $subname_BA;
|
|
DROP PUBLICATION tap_pub_B;
|
|
));
|
|
|
|
###############################################################################
|
|
# Specifying origin = NONE and copy_data = on must raise WARNING if we subscribe
|
|
# to a partitioned table and this table contains any remotely originated data.
|
|
#
|
|
# node_B
|
|
# __________________________
|
|
# | tab_main | --------------> node_C (tab_main)
|
|
# |__________________________|
|
|
# | tab_part1 | tab_part2 | <-------------- node_A (tab_part2)
|
|
# |____________|_____________|
|
|
# | tab_part2_1 |
|
|
# |_____________|
|
|
#
|
|
# node_B
|
|
# __________________________
|
|
# | tab_main |
|
|
# |__________________________|
|
|
# | tab_part1 | tab_part2 | <-------------- node_A (tab_part2)
|
|
# |____________|_____________|
|
|
# | tab_part2_1 | --------------> node_C (tab_part2_1)
|
|
# |_____________|
|
|
###############################################################################
|
|
|
|
# create a table on node A which will act as a source for a partition on node B
|
|
$node_A->safe_psql(
|
|
'postgres', qq(
|
|
CREATE TABLE tab_part2(a int);
|
|
CREATE PUBLICATION tap_pub_A FOR TABLE tab_part2;
|
|
));
|
|
|
|
# create a partition table on node B
|
|
$node_B->safe_psql(
|
|
'postgres', qq(
|
|
CREATE TABLE tab_main(a int) PARTITION BY RANGE(a);
|
|
CREATE TABLE tab_part1 PARTITION OF tab_main FOR VALUES FROM (0) TO (5);
|
|
CREATE TABLE tab_part2(a int) PARTITION BY RANGE(a);
|
|
CREATE TABLE tab_part2_1 PARTITION OF tab_part2 FOR VALUES FROM (5) TO (10);
|
|
ALTER TABLE tab_main ATTACH PARTITION tab_part2 FOR VALUES FROM (5) to (10);
|
|
CREATE SUBSCRIPTION tap_sub_A_B CONNECTION '$node_A_connstr' PUBLICATION tap_pub_A;
|
|
));
|
|
|
|
# create a table on node C
|
|
$node_C->safe_psql(
|
|
'postgres', qq(
|
|
CREATE TABLE tab_main(a int);
|
|
CREATE TABLE tab_part2_1(a int);
|
|
));
|
|
|
|
# create a logical replication setup between node B and node C with
|
|
# subscription on node C having origin = NONE and copy_data = on
|
|
$node_B->safe_psql(
|
|
'postgres', qq(
|
|
CREATE PUBLICATION tap_pub_B FOR TABLE tab_main WITH (publish_via_partition_root);
|
|
CREATE PUBLICATION tap_pub_B_2 FOR TABLE tab_part2_1;
|
|
));
|
|
|
|
($result, $stdout, $stderr) = $node_C->psql(
|
|
'postgres', "
|
|
CREATE SUBSCRIPTION tap_sub_B_C CONNECTION '$node_B_connstr' PUBLICATION tap_pub_B WITH (origin = none, copy_data = on);
|
|
");
|
|
|
|
# A warning must be logged as a partition 'tab_part2' in node B is subscribed to
|
|
# node A so partition 'tab_part2' can have remotely originated data
|
|
like(
|
|
$stderr,
|
|
qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_b_c" requested copy_data with origin = NONE but might copy data that had a different origin/,
|
|
"Create subscription with origin = none and copy_data when the publisher's partition is subscribing from different origin"
|
|
);
|
|
$node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B_C");
|
|
|
|
($result, $stdout, $stderr) = $node_C->psql(
|
|
'postgres', "
|
|
CREATE SUBSCRIPTION tap_sub_B_C CONNECTION '$node_B_connstr' PUBLICATION tap_pub_B_2 WITH (origin = none, copy_data = on);
|
|
");
|
|
|
|
# A warning must be logged as ancestor of table 'tab_part2_1' in node B is
|
|
# subscribed to node A so table 'tab_part2_1' can have remotely originated
|
|
# data
|
|
like(
|
|
$stderr,
|
|
qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_b_c" requested copy_data with origin = NONE but might copy data that had a different origin/,
|
|
"Create subscription with origin = none and copy_data when the publisher's ancestor is subscribing from different origin"
|
|
);
|
|
|
|
# clear the operations done by this test
|
|
$node_C->safe_psql(
|
|
'postgres', qq(
|
|
DROP SUBSCRIPTION tap_sub_B_C;
|
|
DROP TABLE tab_main;
|
|
DROP TABLE tab_part2_1;
|
|
));
|
|
$node_B->safe_psql(
|
|
'postgres', qq(
|
|
DROP SUBSCRIPTION tap_sub_A_B;
|
|
DROP PUBLICATION tap_pub_B;
|
|
DROP PUBLICATION tap_pub_B_2;
|
|
DROP TABLE tab_main;
|
|
));
|
|
$node_A->safe_psql(
|
|
'postgres', qq(
|
|
DROP PUBLICATION tap_pub_A;
|
|
DROP TABLE tab_part2;
|
|
));
|
|
|
|
# shutdown
|
|
$node_B->stop('fast');
|
|
$node_A->stop('fast');
|
|
$node_C->stop('fast');
|
|
|
|
done_testing();
|
|
|