@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
* Returns true on success , false on failure .
*/
bool
logicalrep_worker_launch ( Oid dbid , Oid subid , const char * subname , Oid userid ,
logicalrep_worker_launch ( LogicalRepWorkerType wtype ,
Oid dbid , Oid subid , const char * subname , Oid userid ,
Oid relid , dsm_handle subworker_dsm )
{
BackgroundWorker bgw ;
@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int nsyncworkers ;
int nparallelapplyworkers ;
TimestampTz now ;
bool is_parallel_apply_worker = ( subworker_dsm ! = DSM_HANDLE_INVALID ) ;
/* Sanity check - tablesync worker cannot be a subworker */
Assert ( ! ( is_parallel_apply_worker & & OidIsValid ( relid ) ) ) ;
bool is_tablesync_worker = ( wtype = = WORKERTYPE_TABLESYNC ) ;
bool is_parallel_apply_worker = ( wtype = = WORKERTYPE_PARALLEL_APPLY ) ;
/*----------
* Sanity checks :
* - must be valid worker type
* - tablesync workers are only ones to have relid
* - parallel apply worker is the only kind of subworker
*/
Assert ( wtype ! = WORKERTYPE_UNKNOWN ) ;
Assert ( is_tablesync_worker = = OidIsValid ( relid ) ) ;
Assert ( is_parallel_apply_worker = = ( subworker_dsm ! = DSM_HANDLE_INVALID ) ) ;
ereport ( DEBUG1 ,
( errmsg_internal ( " starting logical replication worker for subscription \" %s \" " ,
@ -393,7 +402,7 @@ retry:
* sync worker limit per subscription . So , just return silently as we
* might get here because of an otherwise harmless race condition .
*/
if ( OidIsValid ( relid ) & & nsyncworkers > = max_sync_workers_per_subscription )
if ( is_tablesync_worker & & nsyncworkers > = max_sync_workers_per_subscription )
{
LWLockRelease ( LogicalRepWorkerLock ) ;
return false ;
@ -427,6 +436,7 @@ retry:
}
/* Prepare the worker slot. */
worker - > type = wtype ;
worker - > launch_time = now ;
worker - > in_use = true ;
worker - > generation + + ;
@ -466,7 +476,7 @@ retry:
subid ) ;
snprintf ( bgw . bgw_type , BGW_MAXLEN , " logical replication parallel worker " ) ;
}
else if ( OidIsValid ( relid ) )
else if ( is_tablesync_worker )
{
snprintf ( bgw . bgw_function_name , BGW_MAXLEN , " TablesyncWorkerMain " ) ;
snprintf ( bgw . bgw_name , BGW_MAXLEN ,
@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid)
{
LogicalRepWorker * w = & LogicalRepCtx - > workers [ i ] ;
if ( w - > subid = = subid & & OidIsValid ( w - > relid ) )
if ( w - > subid = = subid & & isTablesyncWorker ( w ) )
res + + ;
}
@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
( elapsed = TimestampDifferenceMilliseconds ( last_start , now ) ) > = wal_retrieve_retry_interval )
{
ApplyLauncherSetWorkerStartTime ( sub - > oid , now ) ;
logicalrep_worker_launch ( sub - > dbid , sub - > oid , sub - > name ,
logicalrep_worker_launch ( WORKERTYPE_APPLY ,
sub - > dbid , sub - > oid , sub - > name ,
sub - > owner , InvalidOid ,
DSM_HANDLE_INVALID ) ;
}
@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
worker_pid = worker . proc - > pid ;
values [ 0 ] = ObjectIdGetDatum ( worker . subid ) ;
if ( OidIsValid ( worker . relid ) )
if ( isTablesyncWorker ( & worker ) )
values [ 1 ] = ObjectIdGetDatum ( worker . relid ) ;
else
nulls [ 1 ] = true ;