@ -8,68 +8,26 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils ;
use Test::More ;
###############################
# Setup
###############################
# Initialize publisher node
my $ node_publisher = PostgreSQL::Test::Cluster - > new ( 'publisher' ) ;
$ node_publisher - > init ( allows_streaming = > 'logical' ) ;
$ node_publisher - > append_conf (
'postgresql.conf' , qq(
max_prepared_transactions = 10
logical_decoding_mode = immediate
) ) ;
$ node_publisher - > start ;
# Create subscriber node
my $ node_subscriber = PostgreSQL::Test::Cluster - > new ( 'subscriber' ) ;
$ node_subscriber - > init ( allows_streaming = > 'logical' ) ;
$ node_subscriber - > append_conf (
'postgresql.conf' , qq(
max_prepared_transactions = 10
) ) ;
$ node_subscriber - > start ;
# Create some pre-existing content on publisher
$ node_publisher - > safe_psql ( 'postgres' ,
"CREATE TABLE test_tab (a int primary key, b varchar)" ) ;
$ node_publisher - > safe_psql ( 'postgres' ,
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')" ) ;
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
$ node_subscriber - > safe_psql ( 'postgres' ,
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
) ;
# Setup logical replication (streaming = on)
my $ publisher_connstr = $ node_publisher - > connstr . ' dbname=postgres' ;
$ node_publisher - > safe_psql ( 'postgres' ,
"CREATE PUBLICATION tap_pub FOR TABLE test_tab" ) ;
my $ appname = 'tap_sub' ;
$ node_subscriber - > safe_psql (
'postgres' , "
CREATE SUBSCRIPTION tap_sub
CONNECTION '$publisher_connstr application_name=$appname'
PUBLICATION tap_pub
WITH ( streaming = on , two_phase = on ) " ) ;
# Wait for initial table sync to finish
$ node_subscriber - > wait_for_subscription_sync ( $ node_publisher , $ appname ) ;
# Also wait for two-phase to be enabled
my $ twophase_query =
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');" ;
$ node_subscriber - > poll_query_until ( 'postgres' , $ twophase_query )
or die "Timed out while waiting for subscriber to enable twophase" ;
###############################
# Check initial data was copied to subscriber
###############################
my $ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 2|2|2 ) , 'check initial data was copied to subscriber' ) ;
# Check that the parallel apply worker has finished applying the streaming
# transaction.
sub check_parallel_log
{
my ( $ node_subscriber , $ offset , $ is_parallel , $ type ) = @ _ ;
if ( $ is_parallel )
{
$ node_subscriber - > wait_for_log (
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/ ,
$ offset ) ;
}
}
# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
my ( $ node_publisher , $ node_subscriber , $ appname , $ is_parallel ) = @ _ ;
my $ offset = 0 ;
###############################
# Test 2PC PREPARE / COMMIT PREPARED.
@ -79,6 +37,9 @@ is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# Expect all data is replicated on subscriber side after the commit.
###############################
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
# check that 2PC gets replicated to subscriber
# Insert, update and delete some rows.
$ node_publisher - > safe_psql (
@ -91,8 +52,10 @@ $node_publisher->safe_psql(
$ node_publisher - > wait_for_catchup ( $ appname ) ;
check_parallel_log ( $ node_subscriber , $ offset , $ is_parallel , 'PREPARE' ) ;
# check that transaction is in prepared state on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
my $ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 1 ) , 'transaction is prepared on subscriber' ) ;
@ -122,7 +85,11 @@ is($result, qq(0), 'transaction is committed on subscriber');
###############################
# First, delete the data except for 2 rows (will be replicated)
$ node_publisher - > safe_psql ( 'postgres' , "DELETE FROM test_tab WHERE a > 2;" ) ;
$ node_publisher - > safe_psql ( 'postgres' ,
"DELETE FROM test_tab WHERE a > 2;" ) ;
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
# Then insert, update and delete some rows.
$ node_publisher - > safe_psql (
@ -135,6 +102,8 @@ $node_publisher->safe_psql(
$ node_publisher - > wait_for_catchup ( $ appname ) ;
check_parallel_log ( $ node_subscriber , $ offset , $ is_parallel , 'PREPARE' ) ;
# check that transaction is in prepared state on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
@ -150,7 +119,8 @@ $node_publisher->wait_for_catchup($appname);
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 2|2|2 ) ,
'Rows inserted by 2PC are rolled back, leaving only the original 2 rows' ) ;
'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'
) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
@ -166,6 +136,9 @@ is($result, qq(0), 'transaction is aborted on subscriber');
# Note: both publisher and subscriber do crash/restart.
###############################
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
$ node_publisher - > safe_psql (
'postgres' , q{
BEGIN ;
@ -180,6 +153,10 @@ $node_publisher->stop('immediate');
$ node_publisher - > start ;
$ node_subscriber - > start ;
# We don't try to check the log for parallel option here as the subscriber
# may have stopped after finishing the prepare and before logging the
# appropriate message.
# commit post the restart
$ node_publisher - > safe_psql ( 'postgres' ,
"COMMIT PREPARED 'test_prepared_tab';" ) ;
@ -204,7 +181,11 @@ is($result, qq(4|4|4),
###############################
# First, delete the data except for 2 rows (will be replicated)
$ node_publisher - > safe_psql ( 'postgres' , "DELETE FROM test_tab WHERE a > 2;" ) ;
$ node_publisher - > safe_psql ( 'postgres' ,
"DELETE FROM test_tab WHERE a > 2;" ) ;
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
# Then insert, update and delete some rows.
$ node_publisher - > safe_psql (
@ -217,6 +198,8 @@ $node_publisher->safe_psql(
$ node_publisher - > wait_for_catchup ( $ appname ) ;
check_parallel_log ( $ node_subscriber , $ offset , $ is_parallel , 'PREPARE' ) ;
# check that transaction is in prepared state on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
@ -237,7 +220,8 @@ $node_publisher->wait_for_catchup($appname);
# but the extra INSERT outside of the 2PC still was replicated
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 3|3|3 ) , 'check the outside insert was copied to subscriber' ) ;
is ( $ result , qq( 3|3|3 ) ,
'check the outside insert was copied to subscriber' ) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
@ -255,7 +239,11 @@ is($result, qq(0), 'transaction is aborted on subscriber');
###############################
# First, delete the data except for 2 rows (will be replicated)
$ node_publisher - > safe_psql ( 'postgres' , "DELETE FROM test_tab WHERE a > 2;" ) ;
$ node_publisher - > safe_psql ( 'postgres' ,
"DELETE FROM test_tab WHERE a > 2;" ) ;
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
# Then insert, update and delete some rows.
$ node_publisher - > safe_psql (
@ -268,6 +256,8 @@ $node_publisher->safe_psql(
$ node_publisher - > wait_for_catchup ( $ appname ) ;
check_parallel_log ( $ node_subscriber , $ offset , $ is_parallel , 'PREPARE' ) ;
# check that transaction is in prepared state on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
@ -295,6 +285,105 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 0 ) , 'transaction is committed on subscriber' ) ;
# Cleanup the test data
$ node_publisher - > safe_psql ( 'postgres' ,
"DELETE FROM test_tab WHERE a > 2;" ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
}
###############################
# Setup
###############################
# Initialize publisher node
my $ node_publisher = PostgreSQL::Test::Cluster - > new ( 'publisher' ) ;
$ node_publisher - > init ( allows_streaming = > 'logical' ) ;
$ node_publisher - > append_conf (
'postgresql.conf' , qq(
max_prepared_transactions = 10
logical_decoding_mode = immediate
) ) ;
$ node_publisher - > start ;
# Create subscriber node
my $ node_subscriber = PostgreSQL::Test::Cluster - > new ( 'subscriber' ) ;
$ node_subscriber - > init ( allows_streaming = > 'logical' ) ;
$ node_subscriber - > append_conf (
'postgresql.conf' , qq(
max_prepared_transactions = 10
) ) ;
$ node_subscriber - > start ;
# Create some pre-existing content on publisher
$ node_publisher - > safe_psql ( 'postgres' ,
"CREATE TABLE test_tab (a int primary key, b varchar)" ) ;
$ node_publisher - > safe_psql ( 'postgres' ,
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')" ) ;
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
$ node_subscriber - > safe_psql ( 'postgres' ,
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
) ;
# Setup logical replication (streaming = on)
my $ publisher_connstr = $ node_publisher - > connstr . ' dbname=postgres' ;
$ node_publisher - > safe_psql ( 'postgres' ,
"CREATE PUBLICATION tap_pub FOR TABLE test_tab" ) ;
my $ appname = 'tap_sub' ;
################################
# Test using streaming mode 'on'
################################
$ node_subscriber - > safe_psql (
'postgres' , "
CREATE SUBSCRIPTION tap_sub
CONNECTION '$publisher_connstr application_name=$appname'
PUBLICATION tap_pub
WITH ( streaming = on , two_phase = on ) " ) ;
# Wait for initial table sync to finish
$ node_subscriber - > wait_for_subscription_sync ( $ node_publisher , $ appname ) ;
# Also wait for two-phase to be enabled
my $ twophase_query =
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');" ;
$ node_subscriber - > poll_query_until ( 'postgres' , $ twophase_query )
or die "Timed out while waiting for subscriber to enable twophase" ;
# Check initial data was copied to subscriber
my $ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 2|2|2 ) , 'check initial data was copied to subscriber' ) ;
test_streaming ( $ node_publisher , $ node_subscriber , $ appname , 0 ) ;
######################################
# Test using streaming mode 'parallel'
######################################
my $ oldpid = $ node_publisher - > safe_psql ( 'postgres' ,
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
) ;
$ node_subscriber - > safe_psql ( 'postgres' ,
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)" ) ;
$ node_publisher - > poll_query_until ( 'postgres' ,
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION" ;
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$ node_subscriber - > append_conf ( 'postgresql.conf' , "log_min_messages = debug1" ) ;
$ node_subscriber - > reload ;
# Run a query to make sure that the reload has taken effect.
$ node_subscriber - > safe_psql ( 'postgres' , q{ SELECT 1 } ) ;
test_streaming ( $ node_publisher , $ node_subscriber , $ appname , 1 ) ;
###############################
# check all the cleanup
###############################