@ -8,80 +8,41 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils ;
use PostgreSQL::Test::Utils ;
use Test::More ;
use Test::More ;
###############################
# Check that the parallel apply worker has finished applying the streaming
# Setup
# transaction.
###############################
sub check_parallel_log
{
# Initialize publisher node
my ( $ node_subscriber , $ offset , $ is_parallel , $ type ) = @ _ ;
my $ node_publisher = PostgreSQL::Test::Cluster - > new ( 'publisher' ) ;
$ node_publisher - > init ( allows_streaming = > 'logical' ) ;
if ( $ is_parallel )
$ node_publisher - > append_conf (
{
'postgresql.conf' , qq(
$ node_subscriber - > wait_for_log (
max_prepared_transactions = 10
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/ ,
logical_decoding_mode = immediate
$ offset ) ;
) ) ;
}
$ node_publisher - > start ;
}
# Create subscriber node
# Common test steps for both the streaming=on and streaming=parallel cases.
my $ node_subscriber = PostgreSQL::Test::Cluster - > new ( 'subscriber' ) ;
sub test_streaming
$ node_subscriber - > init ( allows_streaming = > 'logical' ) ;
{
$ node_subscriber - > append_conf (
my ( $ node_publisher , $ node_subscriber , $ appname , $ is_parallel ) = @ _ ;
'postgresql.conf' , qq(
max_prepared_transactions = 10
my $ offset = 0 ;
) ) ;
$ node_subscriber - > start ;
###############################
# Test 2PC PREPARE / COMMIT PREPARED.
# Create some pre-existing content on publisher
# 1. Data is streamed as a 2PC transaction.
$ node_publisher - > safe_psql ( 'postgres' ,
# 2. Then do commit prepared.
"CREATE TABLE test_tab (a int primary key, b varchar)" ) ;
#
$ node_publisher - > safe_psql ( 'postgres' ,
# Expect all data is replicated on subscriber side after the commit.
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')" ) ;
###############################
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
# Check the subscriber log from now on.
$ node_subscriber - > safe_psql ( 'postgres' ,
$ offset = - s $ node_subscriber - > logfile ;
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
) ;
# check that 2PC gets replicated to subscriber
# Insert, update and delete some rows.
# Setup logical replication (streaming = on)
$ node_publisher - > safe_psql (
my $ publisher_connstr = $ node_publisher - > connstr . ' dbname=postgres' ;
$ node_publisher - > safe_psql ( 'postgres' ,
"CREATE PUBLICATION tap_pub FOR TABLE test_tab" ) ;
my $ appname = 'tap_sub' ;
$ node_subscriber - > safe_psql (
'postgres' , "
CREATE SUBSCRIPTION tap_sub
CONNECTION '$publisher_connstr application_name=$appname'
PUBLICATION tap_pub
WITH ( streaming = on , two_phase = on ) " ) ;
# Wait for initial table sync to finish
$ node_subscriber - > wait_for_subscription_sync ( $ node_publisher , $ appname ) ;
# Also wait for two-phase to be enabled
my $ twophase_query =
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');" ;
$ node_subscriber - > poll_query_until ( 'postgres' , $ twophase_query )
or die "Timed out while waiting for subscriber to enable twophase" ;
###############################
# Check initial data was copied to subscriber
###############################
my $ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 2|2|2 ) , 'check initial data was copied to subscriber' ) ;
###############################
# Test 2PC PREPARE / COMMIT PREPARED.
# 1. Data is streamed as a 2PC transaction.
# 2. Then do commit prepared.
#
# Expect all data is replicated on subscriber side after the commit.
###############################
# check that 2PC gets replicated to subscriber
# Insert, update and delete some rows.
$ node_publisher - > safe_psql (
'postgres' , q{
'postgres' , q{
BEGIN ;
BEGIN ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
@ -89,43 +50,49 @@ $node_publisher->safe_psql(
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
# check that transaction is in prepared state on subscriber
check_parallel_log ( $ node_subscriber , $ offset , $ is_parallel , 'PREPARE' ) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
# check that transaction is in prepared state on subscriber
my $ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 1 ) , 'transaction is prepared on subscriber' ) ;
is ( $ result , qq( 1 ) , 'transaction is prepared on subscriber' ) ;
# 2PC transaction gets committed
# 2PC transaction gets committed
$ node_publisher - > safe_psql ( 'postgres' ,
$ node_publisher - > safe_psql ( 'postgres' ,
"COMMIT PREPARED 'test_prepared_tab';" ) ;
"COMMIT PREPARED 'test_prepared_tab';" ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
# check that transaction is committed on subscriber
# check that transaction is committed on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 4|4|4 ) ,
is ( $ result , qq( 4|4|4 ) ,
'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
) ;
) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 0 ) , 'transaction is committed on subscriber' ) ;
is ( $ result , qq( 0 ) , 'transaction is committed on subscriber' ) ;
###############################
###############################
# Test 2PC PREPARE / ROLLBACK PREPARED.
# Test 2PC PREPARE / ROLLBACK PREPARED.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 2. Data is streamed using 2PC.
# 2. Data is streamed using 2PC.
# 3. Do rollback prepared.
# 3. Do rollback prepared.
#
#
# Expect data rolls back leaving only the original 2 rows.
# Expect data rolls back leaving only the original 2 rows.
###############################
###############################
# First, delete the data except for 2 rows (will be replicated)
# First, delete the data except for 2 rows (will be replicated)
$ node_publisher - > safe_psql ( 'postgres' , "DELETE FROM test_tab WHERE a > 2;" ) ;
$ node_publisher - > safe_psql ( 'postgres' ,
"DELETE FROM test_tab WHERE a > 2;" ) ;
# Then insert, update and delete some rows.
$ node_publisher - > safe_psql (
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
# Then insert, update and delete some rows.
$ node_publisher - > safe_psql (
'postgres' , q{
'postgres' , q{
BEGIN ;
BEGIN ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
@ -133,40 +100,46 @@ $node_publisher->safe_psql(
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
# check that transaction is in prepared state on subscriber
check_parallel_log ( $ node_subscriber , $ offset , $ is_parallel , 'PREPARE' ) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
# check that transaction is in prepared state on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 1 ) , 'transaction is prepared on subscriber' ) ;
is ( $ result , qq( 1 ) , 'transaction is prepared on subscriber' ) ;
# 2PC transaction gets aborted
# 2PC transaction gets aborted
$ node_publisher - > safe_psql ( 'postgres' ,
$ node_publisher - > safe_psql ( 'postgres' ,
"ROLLBACK PREPARED 'test_prepared_tab';" ) ;
"ROLLBACK PREPARED 'test_prepared_tab';" ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
# check that transaction is aborted on subscriber
# check that transaction is aborted on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 2|2|2 ) ,
is ( $ result , qq( 2|2|2 ) ,
'Rows inserted by 2PC are rolled back, leaving only the original 2 rows' ) ;
'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'
) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 0 ) , 'transaction is aborted on subscriber' ) ;
is ( $ result , qq( 0 ) , 'transaction is aborted on subscriber' ) ;
###############################
###############################
# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
# Check that 2PC COMMIT PREPARED is decoded properly on crash restart.
# 1. insert, update and delete some rows.
# 1. insert, update and delete some rows.
# 2. Then server crashes before the 2PC transaction is committed.
# 2. Then server crashes before the 2PC transaction is committed.
# 3. After servers are restarted the pending transaction is committed.
# 3. After servers are restarted the pending transaction is committed.
#
#
# Expect all data is replicated on subscriber side after the commit.
# Expect all data is replicated on subscriber side after the commit.
# Note: both publisher and subscriber do crash/restart.
# Note: both publisher and subscriber do crash/restart.
###############################
###############################
$ node_publisher - > safe_psql (
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
$ node_publisher - > safe_psql (
'postgres' , q{
'postgres' , q{
BEGIN ;
BEGIN ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
@ -174,40 +147,48 @@ $node_publisher->safe_psql(
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
$ node_subscriber - > stop ( 'immediate' ) ;
$ node_subscriber - > stop ( 'immediate' ) ;
$ node_publisher - > stop ( 'immediate' ) ;
$ node_publisher - > stop ( 'immediate' ) ;
$ node_publisher - > start ;
$ node_publisher - > start ;
$ node_subscriber - > start ;
$ node_subscriber - > start ;
# commit post the restart
# We don't try to check the log for parallel option here as the subscriber
$ node_publisher - > safe_psql ( 'postgres' ,
# may have stopped after finishing the prepare and before logging the
# appropriate message.
# commit post the restart
$ node_publisher - > safe_psql ( 'postgres' ,
"COMMIT PREPARED 'test_prepared_tab';" ) ;
"COMMIT PREPARED 'test_prepared_tab';" ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
# check inserts are visible
# check inserts are visible
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 4|4|4 ) ,
is ( $ result , qq( 4|4|4 ) ,
'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults'
) ;
) ;
###############################
###############################
# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
# Do INSERT after the PREPARE but before ROLLBACK PREPARED.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 2. Data is streamed using 2PC.
# 2. Data is streamed using 2PC.
# 3. A single row INSERT is done which is after the PREPARE.
# 3. A single row INSERT is done which is after the PREPARE.
# 4. Then do a ROLLBACK PREPARED.
# 4. Then do a ROLLBACK PREPARED.
#
#
# Expect the 2PC data rolls back leaving only 3 rows on the subscriber
# Expect the 2PC data rolls back leaving only 3 rows on the subscriber
# (the original 2 + inserted 1).
# (the original 2 + inserted 1).
###############################
###############################
# First, delete the data except for 2 rows (will be replicated)
# First, delete the data except for 2 rows (will be replicated)
$ node_publisher - > safe_psql ( 'postgres' , "DELETE FROM test_tab WHERE a > 2;" ) ;
$ node_publisher - > safe_psql ( 'postgres' ,
"DELETE FROM test_tab WHERE a > 2;" ) ;
# Then insert, update and delete some rows.
$ node_publisher - > safe_psql (
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
# Then insert, update and delete some rows.
$ node_publisher - > safe_psql (
'postgres' , q{
'postgres' , q{
BEGIN ;
BEGIN ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
@ -215,50 +196,57 @@ $node_publisher->safe_psql(
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
# check that transaction is in prepared state on subscriber
check_parallel_log ( $ node_subscriber , $ offset , $ is_parallel , 'PREPARE' ) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
# check that transaction is in prepared state on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 1 ) , 'transaction is prepared on subscriber' ) ;
is ( $ result , qq( 1 ) , 'transaction is prepared on subscriber' ) ;
# Insert a different record (now we are outside of the 2PC transaction)
# Insert a different record (now we are outside of the 2PC transaction)
# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
$ node_publisher - > safe_psql ( 'postgres' ,
$ node_publisher - > safe_psql ( 'postgres' ,
"INSERT INTO test_tab VALUES (99999, 'foobar')" ) ;
"INSERT INTO test_tab VALUES (99999, 'foobar')" ) ;
# 2PC transaction gets aborted
# 2PC transaction gets aborted
$ node_publisher - > safe_psql ( 'postgres' ,
$ node_publisher - > safe_psql ( 'postgres' ,
"ROLLBACK PREPARED 'test_prepared_tab';" ) ;
"ROLLBACK PREPARED 'test_prepared_tab';" ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
# check that transaction is aborted on subscriber,
# check that transaction is aborted on subscriber,
# but the extra INSERT outside of the 2PC still was replicated
# but the extra INSERT outside of the 2PC still was replicated
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 3|3|3 ) , 'check the outside insert was copied to subscriber' ) ;
is ( $ result , qq( 3|3|3 ) ,
'check the outside insert was copied to subscriber' ) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 0 ) , 'transaction is aborted on subscriber' ) ;
is ( $ result , qq( 0 ) , 'transaction is aborted on subscriber' ) ;
###############################
###############################
# Do INSERT after the PREPARE but before COMMIT PREPARED.
# Do INSERT after the PREPARE but before COMMIT PREPARED.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 1. Table is deleted back to 2 rows which are replicated on subscriber.
# 2. Data is streamed using 2PC.
# 2. Data is streamed using 2PC.
# 3. A single row INSERT is done which is after the PREPARE.
# 3. A single row INSERT is done which is after the PREPARE.
# 4. Then do a COMMIT PREPARED.
# 4. Then do a COMMIT PREPARED.
#
#
# Expect 2PC data + the extra row are on the subscriber
# Expect 2PC data + the extra row are on the subscriber
# (the 3334 + inserted 1 = 3335).
# (the 3334 + inserted 1 = 3335).
###############################
###############################
# First, delete the data except for 2 rows (will be replicated)
# First, delete the data except for 2 rows (will be replicated)
$ node_publisher - > safe_psql ( 'postgres' , "DELETE FROM test_tab WHERE a > 2;" ) ;
$ node_publisher - > safe_psql ( 'postgres' ,
"DELETE FROM test_tab WHERE a > 2;" ) ;
# Then insert, update and delete some rows.
$ node_publisher - > safe_psql (
# Check the subscriber log from now on.
$ offset = - s $ node_subscriber - > logfile ;
# Then insert, update and delete some rows.
$ node_publisher - > safe_psql (
'postgres' , q{
'postgres' , q{
BEGIN ;
BEGIN ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
INSERT INTO test_tab SELECT i , md5 ( i:: text ) FROM generate_series ( 3 , 5 ) s(i) ;
@ -266,34 +254,135 @@ $node_publisher->safe_psql(
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
DELETE FROM test_tab WHERE mod ( a , 3 ) = 0 ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
PREPARE TRANSACTION 'test_prepared_tab' ; } ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
# check that transaction is in prepared state on subscriber
check_parallel_log ( $ node_subscriber , $ offset , $ is_parallel , 'PREPARE' ) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
# check that transaction is in prepared state on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 1 ) , 'transaction is prepared on subscriber' ) ;
is ( $ result , qq( 1 ) , 'transaction is prepared on subscriber' ) ;
# Insert a different record (now we are outside of the 2PC transaction)
# Insert a different record (now we are outside of the 2PC transaction)
# Note: the 2PC transaction still holds row locks so make sure this insert is for a separare primary key
# Note: the 2PC transaction still holds row locks so make sure this insert is for a separare primary key
$ node_publisher - > safe_psql ( 'postgres' ,
$ node_publisher - > safe_psql ( 'postgres' ,
"INSERT INTO test_tab VALUES (99999, 'foobar')" ) ;
"INSERT INTO test_tab VALUES (99999, 'foobar')" ) ;
# 2PC transaction gets committed
# 2PC transaction gets committed
$ node_publisher - > safe_psql ( 'postgres' ,
$ node_publisher - > safe_psql ( 'postgres' ,
"COMMIT PREPARED 'test_prepared_tab';" ) ;
"COMMIT PREPARED 'test_prepared_tab';" ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
# check that transaction is committed on subscriber
# check that transaction is committed on subscriber
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 5|5|5 ) ,
is ( $ result , qq( 5|5|5 ) ,
'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'
'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults'
) ;
) ;
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
$ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
"SELECT count(*) FROM pg_prepared_xacts;" ) ;
is ( $ result , qq( 0 ) , 'transaction is committed on subscriber' ) ;
is ( $ result , qq( 0 ) , 'transaction is committed on subscriber' ) ;
# Cleanup the test data
$ node_publisher - > safe_psql ( 'postgres' ,
"DELETE FROM test_tab WHERE a > 2;" ) ;
$ node_publisher - > wait_for_catchup ( $ appname ) ;
}
###############################
# Setup
###############################
# Initialize publisher node
my $ node_publisher = PostgreSQL::Test::Cluster - > new ( 'publisher' ) ;
$ node_publisher - > init ( allows_streaming = > 'logical' ) ;
$ node_publisher - > append_conf (
'postgresql.conf' , qq(
max_prepared_transactions = 10
logical_decoding_mode = immediate
) ) ;
$ node_publisher - > start ;
# Create subscriber node
my $ node_subscriber = PostgreSQL::Test::Cluster - > new ( 'subscriber' ) ;
$ node_subscriber - > init ( allows_streaming = > 'logical' ) ;
$ node_subscriber - > append_conf (
'postgresql.conf' , qq(
max_prepared_transactions = 10
) ) ;
$ node_subscriber - > start ;
# Create some pre-existing content on publisher
$ node_publisher - > safe_psql ( 'postgres' ,
"CREATE TABLE test_tab (a int primary key, b varchar)" ) ;
$ node_publisher - > safe_psql ( 'postgres' ,
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')" ) ;
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
$ node_subscriber - > safe_psql ( 'postgres' ,
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
) ;
# Setup logical replication (streaming = on)
my $ publisher_connstr = $ node_publisher - > connstr . ' dbname=postgres' ;
$ node_publisher - > safe_psql ( 'postgres' ,
"CREATE PUBLICATION tap_pub FOR TABLE test_tab" ) ;
my $ appname = 'tap_sub' ;
################################
# Test using streaming mode 'on'
################################
$ node_subscriber - > safe_psql (
'postgres' , "
CREATE SUBSCRIPTION tap_sub
CONNECTION '$publisher_connstr application_name=$appname'
PUBLICATION tap_pub
WITH ( streaming = on , two_phase = on ) " ) ;
# Wait for initial table sync to finish
$ node_subscriber - > wait_for_subscription_sync ( $ node_publisher , $ appname ) ;
# Also wait for two-phase to be enabled
my $ twophase_query =
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');" ;
$ node_subscriber - > poll_query_until ( 'postgres' , $ twophase_query )
or die "Timed out while waiting for subscriber to enable twophase" ;
# Check initial data was copied to subscriber
my $ result = $ node_subscriber - > safe_psql ( 'postgres' ,
"SELECT count(*), count(c), count(d = 999) FROM test_tab" ) ;
is ( $ result , qq( 2|2|2 ) , 'check initial data was copied to subscriber' ) ;
test_streaming ( $ node_publisher , $ node_subscriber , $ appname , 0 ) ;
######################################
# Test using streaming mode 'parallel'
######################################
my $ oldpid = $ node_publisher - > safe_psql ( 'postgres' ,
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
) ;
$ node_subscriber - > safe_psql ( 'postgres' ,
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)" ) ;
$ node_publisher - > poll_query_until ( 'postgres' ,
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION" ;
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$ node_subscriber - > append_conf ( 'postgresql.conf' , "log_min_messages = debug1" ) ;
$ node_subscriber - > reload ;
# Run a query to make sure that the reload has taken effect.
$ node_subscriber - > safe_psql ( 'postgres' , q{ SELECT 1 } ) ;
test_streaming ( $ node_publisher , $ node_subscriber , $ appname , 1 ) ;
###############################
###############################
# check all the cleanup
# check all the cleanup