@ -415,6 +415,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
static HTAB * last_start_times = NULL ;
ListCell * lc ;
bool started_tx = false ;
bool should_exit = false ;
Assert ( ! IsTransactionState ( ) ) ;
@ -446,28 +447,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
last_start_times = NULL ;
}
/*
* Even when the two_phase mode is requested by the user , it remains as
* ' pending ' until all tablesyncs have reached READY state .
*
* When this happens , we restart the apply worker and ( if the conditions
* are still ok ) then the two_phase tri - state will become ' enabled ' at
* that time .
*
* Note : If the subscription has no tables then leave the state as
* PENDING , which allows ALTER SUBSCRIPTION . . . REFRESH PUBLICATION to
* work .
*/
if ( MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_PENDING & &
AllTablesyncsReady ( ) )
{
ereport ( LOG ,
( errmsg ( " logical replication apply worker for subscription \" %s \" will restart so that two_phase can be enabled " ,
MySubscription - > name ) ) ) ;
proc_exit ( 0 ) ;
}
/*
* Process all tables that are being synchronized .
*/
@ -619,9 +598,36 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
if ( started_tx )
{
/*
* Even when the two_phase mode is requested by the user , it remains
* as ' pending ' until all tablesyncs have reached READY state .
*
* When this happens , we restart the apply worker and ( if the
* conditions are still ok ) then the two_phase tri - state will become
* ' enabled ' at that time .
*
* Note : If the subscription has no tables then leave the state as
* PENDING , which allows ALTER SUBSCRIPTION . . . REFRESH PUBLICATION to
* work .
*/
if ( MySubscription - > twophasestate = = LOGICALREP_TWOPHASE_STATE_PENDING )
{
CommandCounterIncrement ( ) ; /* make updates visible */
if ( AllTablesyncsReady ( ) )
{
ereport ( LOG ,
( errmsg ( " logical replication apply worker for subscription \" %s \" will restart so that two_phase can be enabled " ,
MySubscription - > name ) ) ) ;
should_exit = true ;
}
}
CommitTransactionCommand ( ) ;
pgstat_report_stat ( true ) ;
}
if ( should_exit )
proc_exit ( 0 ) ;
}
/*
@ -802,6 +808,7 @@ fetch_remote_table_info(char *nspname, char *relname,
TupleTableSlot * tslot ;
Oid attrsRow [ ] = { INT2VECTOROID } ;
StringInfoData pub_names ;
initStringInfo ( & pub_names ) ;
foreach ( lc , MySubscription - > publications )
{