@ -415,6 +415,166 @@ $node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B ADD TABLE tab");
$ node_A - > safe_psql ( 'postgres' ,
"ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION WITH (copy_data = false)" ) ;
###############################################################################
# Test that publisher's transactions marked with DELAY_CHKPT_IN_COMMIT prevent
# concurrently deleted tuples on the subscriber from being removed. This test
# also acts as a safeguard to prevent developers from moving the commit
# timestamp acquisition before marking DELAY_CHKPT_IN_COMMIT in
# RecordTransactionCommitPrepared.
###############################################################################
my $ injection_points_supported = $ node_B - > check_extension ( 'injection_points' ) ;
# This test depends on an injection point to block the prepared transaction
# commit after marking DELAY_CHKPT_IN_COMMIT flag.
if ( $ injection_points_supported != 0 )
{
$ node_B - > append_conf ( 'postgresql.conf' ,
" shared_preload_libraries = 'injection_points'
max_prepared_transactions = 1 " ) ;
$ node_B - > restart ;
# Disable the subscription on Node B for testing only one-way
# replication.
$ node_B - > psql ( 'postgres' , "ALTER SUBSCRIPTION $subname_BA DISABLE;" ) ;
# Wait for the apply worker to stop
$ node_B - > poll_query_until ( 'postgres' ,
"SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
) ;
# Truncate the table to cleanup existing dead rows in the table. Then insert
# a new row.
$ node_B - > safe_psql (
'postgres' , qq(
TRUNCATE tab ;
INSERT INTO tab VALUES ( 1 , 1 ) ;
) ) ;
$ node_B - > wait_for_catchup ( $ subname_AB ) ;
# Create the injection_points extension on the publisher node and attach to the
# commit-after-delay-checkpoint injection point.
$ node_B - > safe_psql (
'postgres' ,
" CREATE EXTENSION injection_points ;
SELECT injection_points_attach ( 'commit-after-delay-checkpoint' , 'wait' ) ; "
) ;
# Start a background session on the publisher node to perform an update and
# pause at the injection point.
my $ pub_session = $ node_B - > background_psql ( 'postgres' ) ;
$ pub_session - > query_until (
qr/starting_bg_psql/ ,
q{
\ echo starting_bg_psql
BEGIN ;
UPDATE tab SET b = 2 WHERE a = 1 ;
PREPARE TRANSACTION 'txn_with_later_commit_ts' ;
COMMIT PREPARED 'txn_with_later_commit_ts' ;
}
) ;
# Confirm the update is suspended
$ result =
$ node_B - > safe_psql ( 'postgres' , 'SELECT * FROM tab WHERE a = 1' ) ;
is ( $ result , qq( 1|1 ) , 'publisher sees the old row' ) ;
# Delete the row on the subscriber. The deleted row should be retained due to a
# transaction on the publisher, which is currently marked with the
# DELAY_CHKPT_IN_COMMIT flag.
$ node_A - > safe_psql ( 'postgres' , "DELETE FROM tab WHERE a = 1;" ) ;
# Get the commit timestamp for the delete
my $ sub_ts = $ node_A - > safe_psql ( 'postgres' ,
"SELECT timestamp FROM pg_last_committed_xact();" ) ;
$ log_location = - s $ node_A - > logfile ;
# Confirm that the apply worker keeps requesting publisher status, while
# awaiting the prepared transaction to commit. Thus, the request log should
# appear more than once.
$ node_A - > wait_for_log (
qr/sending publisher status request message/ ,
$ log_location ) ;
$ log_location = - s $ node_A - > logfile ;
$ node_A - > wait_for_log (
qr/sending publisher status request message/ ,
$ log_location ) ;
# Confirm that the dead tuple cannot be removed
( $ cmdret , $ stdout , $ stderr ) =
$ node_A - > psql ( 'postgres' , qq( VACUUM ( verbose ) public.tab; ) ) ;
ok ( $ stderr =~ qr/1 are dead but not yet removable/ ,
'the deleted column is non-removable' ) ;
$ log_location = - s $ node_A - > logfile ;
# Wakeup and detach the injection point on the publisher node. The prepared
# transaction should now commit.
$ node_B - > safe_psql (
'postgres' ,
" SELECT injection_points_wakeup ( 'commit-after-delay-checkpoint' ) ;
SELECT injection_points_detach ( 'commit-after-delay-checkpoint' ) ; "
) ;
# Close the background session on the publisher node
ok ( $ pub_session - > quit , "close publisher session" ) ;
# Confirm that the transaction committed
$ result =
$ node_B - > safe_psql ( 'postgres' , 'SELECT * FROM tab WHERE a = 1' ) ;
is ( $ result , qq( 1|2 ) , 'publisher sees the new row' ) ;
# Ensure the UPDATE is replayed on subscriber
$ node_B - > wait_for_catchup ( $ subname_AB ) ;
$ logfile = slurp_file ( $ node_A - > logfile ( ) , $ log_location ) ;
ok ( $ logfile =~
qr / conflict detected on relation "public.tab" : conflict = update_deleted . *
. * DETAIL: . * The row to be updated was deleted locally in transaction [ 0 - 9 ] + at . *
. * Remote row \ ( 1 , 2 \ ) ; replica identity full \ ( 1 , 1 \ ) / ,
'update target row was deleted in tab' ) ;
# Remember the next transaction ID to be assigned
$ next_xid =
$ node_A - > safe_psql ( 'postgres' , "SELECT txid_current() + 1;" ) ;
# Confirm that the xmin value is advanced to the latest nextXid after the
# prepared transaction on the publisher has been committed.
ok ( $ node_A - > poll_query_until (
'postgres' ,
"SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
) ,
"the xmin value of slot 'pg_conflict_detection' is updated on subscriber"
) ;
# Confirm that the dead tuple can be removed now
( $ cmdret , $ stdout , $ stderr ) =
$ node_A - > psql ( 'postgres' , qq( VACUUM ( verbose ) public.tab; ) ) ;
ok ( $ stderr =~ qr/1 removed, 0 remain, 0 are dead but not yet removable/ ,
'the deleted column is removed' ) ;
# Get the commit timestamp for the publisher's update
my $ pub_ts = $ node_B - > safe_psql ( 'postgres' ,
"SELECT pg_xact_commit_timestamp(xmin) from tab where a=1;" ) ;
# Check that the commit timestamp for the update on the publisher is later than
# or equal to the timestamp of the local deletion, as the commit timestamp
# should be assigned after marking the DELAY_CHKPT_IN_COMMIT flag.
$ result = $ node_B - > safe_psql ( 'postgres' ,
"SELECT '$pub_ts'::timestamp >= '$sub_ts'::timestamp" ) ;
is ( $ result , qq( t ) ,
"pub UPDATE's timestamp is later than that of sub's DELETE" ) ;
# Re-enable the subscription for further tests
$ node_B - > psql ( 'postgres' , "ALTER SUBSCRIPTION $subname_BA ENABLE;" ) ;
}
###############################################################################
# Check that dead tuple retention stops due to the wait time surpassing
# max_retention_duration.