Post-commit review fixes for 228c370868.

This commit fixes three issues:

1) When a disabled subscription is created with retain_dead_tuples set to true,
the launcher is not woken up immediately, which may lead to delays in creating
the conflict detection slot.

Creating the conflict detection slot is essential even when the subscription is
not enabled. This ensures that dead tuples are retained, which is necessary for
accurately identifying the type of conflict during replication.

2) Conflict-related data was unnecessarily retained when the subscription had
no tables.

3) Conflict-relevant data could be prematurely removed before applying
prepared transactions on the publisher that are in the commit critical section.

This issue occurred because the backend executing COMMIT PREPARED was not
accounted for during the computation of oldestXid in the commit phase on
the publisher. As a result, the subscriber could advance the conflict
slot's xmin without waiting for such COMMIT PREPARED transactions to
complete.

We fixed this issue by identifying prepared transactions that are in the
commit critical section during the computation of oldestXid in the commit
phase.

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/OS9PR01MB16913DACB64E5721872AA5C02943BA@OS9PR01MB16913.jpnprd01.prod.outlook.com
Discussion: https://postgr.es/m/OS9PR01MB16913F67856B0DA2A909788129400A@OS9PR01MB16913.jpnprd01.prod.outlook.com
master
Amit Kapila 3 days ago
parent 43eb2c5419
commit 1f7e9ba3ac
  1. 55
      src/backend/access/transam/twophase.c
  2. 12
      src/backend/commands/subscriptioncmds.c
  3. 26
      src/backend/replication/logical/tablesync.c
  4. 25
      src/backend/replication/logical/worker.c
  5. 12
      src/backend/replication/walsender.c
  6. 2
      src/include/access/twophase.h
  7. 1
      src/include/replication/worker_internal.h
  8. 29
      src/test/subscription/t/035_conflicts.pl

@ -2809,3 +2809,58 @@ LookupGXactBySubid(Oid subid)
return found;
}
/*
 * TwoPhaseGetOldestXidInCommit
 *		Find the oldest transaction ID among prepared transactions whose
 *		locking backend is currently inside the commit critical section.
 *
 * Only prepared transactions being handled by a backend connected to the
 * current database (MyDatabaseId) are considered.  Returns
 * InvalidTransactionId when no prepared transaction qualifies.
 */
TransactionId
TwoPhaseGetOldestXidInCommit(void)
{
	TransactionId result = InvalidTransactionId;
	int			idx;

	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);

	for (idx = 0; idx < TwoPhaseState->numPrepXacts; idx++)
	{
		GlobalTransaction entry = TwoPhaseState->prepXacts[idx];
		PGPROC	   *locker;
		TransactionId candidate;

		/* Ignore entries that are not valid or that no backend has locked. */
		if (!entry->valid || entry->locking_backend == INVALID_PROC_NUMBER)
			continue;

		/*
		 * Look up the backend working on this prepared transaction.  Reading
		 * its PGPROC under TwoPhaseStateLock is safe: that backend cannot go
		 * away without first removing or unlocking this global transaction,
		 * and both of those steps need TwoPhaseStateLock in exclusive mode.
		 */
		locker = GetPGProcByNumber(entry->locking_backend);

		/*
		 * Skip backends attached to another database, and backends that have
		 * not flagged themselves as being in the commit critical section.
		 */
		if (locker->databaseId != MyDatabaseId ||
			(locker->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
			continue;

		/* Remember this xid if it is the first, or older than the best so far. */
		candidate = XidFromFullTransactionId(entry->fxid);
		if (!TransactionIdIsValid(result) ||
			TransactionIdPrecedes(candidate, result))
			result = candidate;
	}

	LWLockRelease(TwoPhaseStateLock);

	return result;
}

@ -854,7 +854,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
pgstat_create_subscription(subid);
if (opts.enabled)
/*
* Notify the launcher to start the apply worker if the subscription is
* enabled, or to create the conflict detection slot if retain_dead_tuples
* is enabled.
*
* Creating the conflict detection slot is essential even when the
* subscription is not enabled. This ensures that dead tuples are
* retained, which is necessary for accurately identifying the type of
* conflict during replication.
*/
if (opts.enabled || opts.retaindeadtuples)
ApplyLauncherWakeupAtCommit();
ObjectAddressSet(myself, SubscriptionRelationId, subid);

@ -1788,6 +1788,32 @@ AllTablesyncsReady(void)
return has_subrels && (table_states_not_ready == NIL);
}
/*
 * HasSubscriptionRelationsCached
 *		Report whether the subscription currently has any relations.
 *
 * Note: in contrast to HasSubscriptionRelations(), the answer comes from the
 * cached subscription-relation information.  This must only be called from
 * an apply or tablesync worker, because MySubscription has to be initialized
 * beforehand.
 */
bool
HasSubscriptionRelationsCached(void)
{
	bool		opened_tx;
	bool		result;

	/* We need up-to-date subscription tables info here */
	result = FetchTableStates(&opened_tx);

	/* Nothing to clean up unless the fetch reported starting a transaction. */
	if (!opened_tx)
		return result;

	/* Close the transaction the fetch started, then report statistics. */
	CommitTransactionCommand();
	pgstat_report_stat(true);

	return result;
}
/*
* Update the two_phase state of the specified subscription in pg_subscription.
*/

@ -4595,11 +4595,28 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* workers is complex and not worth the effort, so we simply return if not
* all tables are in the READY state.
*
* It is safe to add new tables with initial states to the subscription
* after this check because any changes applied to these tables should
* have a WAL position greater than the rdt_data->remote_lsn.
* Advancing the transaction ID is necessary even when no tables are
* currently subscribed, to avoid retaining dead tuples unnecessarily.
* While it might seem safe to skip all phases and directly assign
* candidate_xid to oldest_nonremovable_xid during the
* RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
* concurrently add tables to the subscription, the apply worker may not
* process invalidations in time. Consequently,
* HasSubscriptionRelationsCached() might miss the new tables, leading to
* premature advancement of oldest_nonremovable_xid.
*
* Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
* invalidations are guaranteed to be processed before applying changes
* from newly added tables while waiting for the local flush to reach
* remote_lsn.
*
* Additionally, even if we check for subscription tables during
* RDT_GET_CANDIDATE_XID, they might be dropped before reaching
* RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
* subscription tables at this stage to prevent unnecessary tuple
* retention.
*/
if (!AllTablesyncsReady())
if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
{
TimestampTz now;

@ -51,6 +51,7 @@
#include "access/timeline.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
@ -2719,6 +2720,7 @@ ProcessStandbyPSRequestMessage(void)
{
XLogRecPtr lsn = InvalidXLogRecPtr;
TransactionId oldestXidInCommit;
TransactionId oldestGXidInCommit;
FullTransactionId nextFullXid;
FullTransactionId fullOldestXidInCommit;
WalSnd *walsnd = MyWalSnd;
@ -2746,6 +2748,16 @@ ProcessStandbyPSRequestMessage(void)
* ones replicated.
*/
oldestXidInCommit = GetOldestActiveTransactionId(true, false);
oldestGXidInCommit = TwoPhaseGetOldestXidInCommit();
/*
* Use the prepared transaction's xid as the oldest xid for standby
* transmission if that transaction is currently in the commit phase and
* is older than the oldest xid computed above.
*/
if (TransactionIdIsValid(oldestGXidInCommit) &&
TransactionIdPrecedes(oldestGXidInCommit, oldestXidInCommit))
oldestXidInCommit = oldestGXidInCommit;
nextFullXid = ReadNextFullTransactionId();
fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
oldestXidInCommit);

@ -68,4 +68,6 @@ extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res,
int szgid);
extern bool LookupGXactBySubid(Oid subid);
extern TransactionId TwoPhaseGetOldestXidInCommit(void);
#endif /* TWOPHASE_H */

@ -272,6 +272,7 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
char *originname, Size szoriginname);
extern bool AllTablesyncsReady(void);
extern bool HasSubscriptionRelationsCached(void);
extern void UpdateTwoPhaseState(Oid suboid, char new_state);
extern void process_syncing_tables(XLogRecPtr current_lsn);

@ -386,6 +386,35 @@ ok( $logfile =~
.*Remote row \(2, 4\); replica identity full \(2, 2\)/,
'update target row was deleted in tab');
###############################################################################
# Check that the xmin value of the conflict detection slot can be advanced when
# the subscription has no tables.
###############################################################################
# Remove the table from the publication
$node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B DROP TABLE tab");
$node_A->safe_psql('postgres',
"ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION");
# Remember the next transaction ID to be assigned
$next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
# Confirm that the xmin value is advanced to the latest nextXid. If no
# transactions are running, the apply worker selects nextXid as the candidate
# for the non-removable xid. See GetOldestActiveTransactionId().
ok( $node_A->poll_query_until(
'postgres',
"SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
),
"the xmin value of slot 'pg_conflict_detection' is updated on Node A");
# Re-add the table to the publication for further tests
$node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B ADD TABLE tab");
$node_A->safe_psql('postgres',
"ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION WITH (copy_data = false)");
###############################################################################
# Check that dead tuple retention stops due to the wait time surpassing
# max_retention_duration.

Loading…
Cancel
Save