@ -316,7 +316,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
UpdateSubscriptionRelState ( MyLogicalRepWorker - > subid ,
UpdateSubscriptionRelState ( MyLogicalRepWorker - > subid ,
MyLogicalRepWorker - > relid ,
MyLogicalRepWorker - > relid ,
MyLogicalRepWorker - > relstate ,
MyLogicalRepWorker - > relstate ,
MyLogicalRepWorker - > relstate_lsn ) ;
MyLogicalRepWorker - > relstate_lsn ,
false ) ;
/*
/*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
@ -425,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
ListCell * lc ;
ListCell * lc ;
bool started_tx = false ;
bool started_tx = false ;
bool should_exit = false ;
bool should_exit = false ;
Relation rel = NULL ;
Assert ( ! IsTransactionState ( ) ) ;
Assert ( ! IsTransactionState ( ) ) ;
@ -492,7 +494,17 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* worker to remove the origin tracking as if there is any
* worker to remove the origin tracking as if there is any
* error while dropping we won ' t restart it to drop the
* error while dropping we won ' t restart it to drop the
* origin . So passing missing_ok = true .
* origin . So passing missing_ok = true .
*
* Lock the subscription and origin in the same order as we
* are doing during DDL commands to avoid deadlocks . See
* AlterSubscription_refresh .
*/
*/
LockSharedObject ( SubscriptionRelationId , MyLogicalRepWorker - > subid ,
0 , AccessShareLock ) ;
if ( ! rel )
rel = table_open ( SubscriptionRelRelationId , RowExclusiveLock ) ;
ReplicationOriginNameForLogicalRep ( MyLogicalRepWorker - > subid ,
ReplicationOriginNameForLogicalRep ( MyLogicalRepWorker - > subid ,
rstate - > relid ,
rstate - > relid ,
originname ,
originname ,
@ -504,7 +516,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
*/
*/
UpdateSubscriptionRelState ( MyLogicalRepWorker - > subid ,
UpdateSubscriptionRelState ( MyLogicalRepWorker - > subid ,
rstate - > relid , rstate - > state ,
rstate - > relid , rstate - > state ,
rstate - > lsn ) ;
rstate - > lsn , true ) ;
}
}
}
}
else
else
@ -555,7 +567,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
* This is required to avoid any undetected deadlocks
* This is required to avoid any undetected deadlocks
* due to any existing lock as deadlock detector won ' t
* due to any existing lock as deadlock detector won ' t
* be able to detect the waits on the latch .
* be able to detect the waits on the latch .
*
* Also close any tables prior to the commit .
*/
*/
if ( rel )
{
table_close ( rel , NoLock ) ;
rel = NULL ;
}
CommitTransactionCommand ( ) ;
CommitTransactionCommand ( ) ;
pgstat_report_stat ( false ) ;
pgstat_report_stat ( false ) ;
}
}
@ -622,6 +641,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
}
}
}
}
/* Close table if opened */
if ( rel )
table_close ( rel , NoLock ) ;
if ( started_tx )
if ( started_tx )
{
{
/*
/*
@ -1413,7 +1437,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UpdateSubscriptionRelState ( MyLogicalRepWorker - > subid ,
UpdateSubscriptionRelState ( MyLogicalRepWorker - > subid ,
MyLogicalRepWorker - > relid ,
MyLogicalRepWorker - > relid ,
MyLogicalRepWorker - > relstate ,
MyLogicalRepWorker - > relstate ,
MyLogicalRepWorker - > relstate_lsn ) ;
MyLogicalRepWorker - > relstate_lsn ,
false ) ;
CommitTransactionCommand ( ) ;
CommitTransactionCommand ( ) ;
pgstat_report_stat ( true ) ;
pgstat_report_stat ( true ) ;
@ -1546,7 +1571,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
UpdateSubscriptionRelState ( MyLogicalRepWorker - > subid ,
UpdateSubscriptionRelState ( MyLogicalRepWorker - > subid ,
MyLogicalRepWorker - > relid ,
MyLogicalRepWorker - > relid ,
SUBREL_STATE_FINISHEDCOPY ,
SUBREL_STATE_FINISHEDCOPY ,
MyLogicalRepWorker - > relstate_lsn ) ;
MyLogicalRepWorker - > relstate_lsn ,
false ) ;
CommitTransactionCommand ( ) ;
CommitTransactionCommand ( ) ;