diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 29fc4218cd4..941817c8271 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -281,8 +281,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, * Update the state of a subscription table. */ void -UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) +UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state, + XLogRecPtr sublsn, bool already_locked) { Relation rel; HeapTuple tup; @@ -290,9 +290,24 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, Datum values[Natts_pg_subscription_rel]; bool replaces[Natts_pg_subscription_rel]; - LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + if (already_locked) + { +#ifdef USE_ASSERT_CHECKING + LOCKTAG tag; - rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId, + RowExclusiveLock, true)); + SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0); + Assert(LockHeldByMe(&tag, AccessShareLock)); +#endif + + rel = table_open(SubscriptionRelRelationId, NoLock); + } + else + { + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + } /* Try finding existing mapping. */ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, @@ -326,6 +341,16 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, table_close(rel, NoLock); } +/* + * Update the state of a subscription table. + */ +void +UpdateSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) +{ + UpdateSubscriptionRelStateEx(subid, relid, state, sublsn, false); +} + /* * Get state of subscription table. * diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 26b71dee672..3e9de50ff12 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -366,6 +366,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) static HTAB *last_start_times = NULL; ListCell *lc; bool started_tx = false; + Relation rel = NULL; Assert(!IsTransactionState()); @@ -463,7 +464,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * refresh for the subscription where we remove the table * state and its origin and by this time the origin might be * already removed. So passing missing_ok = true. + * + * Lock the subscription and origin in the same order as we + * are doing during DDL commands to avoid deadlocks. See + * AlterSubscription_refresh. */ + LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, + 0, AccessShareLock); + if (!rel) + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid, rstate->relid, originname, @@ -473,9 +483,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* * Update the state to READY only after the origin cleanup. */ - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - rstate->relid, rstate->state, - rstate->lsn); + UpdateSubscriptionRelStateEx(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn, true); } } else @@ -526,7 +536,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. + * + * Also close any tables prior to the commit. */ + if (rel) + { + table_close(rel, NoLock); + rel = NULL; + } CommitTransactionCommand(); pgstat_report_stat(false); } @@ -586,6 +603,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } + /* Close table if opened */ + if (rel) + table_close(rel, NoLock); + if (started_tx) { CommitTransactionCommand(); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index ed94f57baa1..e0a1a4b8b4a 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -85,6 +85,8 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); +extern void UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state, + XLogRecPtr sublsn, bool already_locked); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid);