@ -55,6 +55,7 @@
/* GUC variables */
/* GUC variables */
int max_logical_replication_workers = 4 ;
int max_logical_replication_workers = 4 ;
int max_sync_workers_per_subscription = 2 ;
int max_sync_workers_per_subscription = 2 ;
int max_parallel_apply_workers_per_subscription = 2 ;
LogicalRepWorker * MyLogicalRepWorker = NULL ;
LogicalRepWorker * MyLogicalRepWorker = NULL ;
@ -74,6 +75,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit ( int code , Datum arg ) ;
static void logicalrep_worker_onexit ( int code , Datum arg ) ;
static void logicalrep_worker_detach ( void ) ;
static void logicalrep_worker_detach ( void ) ;
static void logicalrep_worker_cleanup ( LogicalRepWorker * worker ) ;
static void logicalrep_worker_cleanup ( LogicalRepWorker * worker ) ;
static int logicalrep_pa_worker_count ( Oid subid ) ;
static bool on_commit_launcher_wakeup = false ;
static bool on_commit_launcher_wakeup = false ;
@ -152,8 +154,10 @@ get_subscription_list(void)
*
*
* This is only needed for cleaning up the shared memory in case the worker
* This is only needed for cleaning up the shared memory in case the worker
* fails to attach .
* fails to attach .
*
* Returns whether the attach was successful .
*/
*/
static void
static bool
WaitForReplicationWorkerAttach ( LogicalRepWorker * worker ,
WaitForReplicationWorkerAttach ( LogicalRepWorker * worker ,
uint16 generation ,
uint16 generation ,
BackgroundWorkerHandle * handle )
BackgroundWorkerHandle * handle )
@ -169,11 +173,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
/* Worker either died or has started; no need to do anything . */
/* Worker either died or has started. Return false if died . */
if ( ! worker - > in_use | | worker - > proc )
if ( ! worker - > in_use | | worker - > proc )
{
{
LWLockRelease ( LogicalRepWorkerLock ) ;
LWLockRelease ( LogicalRepWorkerLock ) ;
return ;
return worker - > in_use ;
}
}
LWLockRelease ( LogicalRepWorkerLock ) ;
LWLockRelease ( LogicalRepWorkerLock ) ;
@ -188,7 +192,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
if ( generation = = worker - > generation )
if ( generation = = worker - > generation )
logicalrep_worker_cleanup ( worker ) ;
logicalrep_worker_cleanup ( worker ) ;
LWLockRelease ( LogicalRepWorkerLock ) ;
LWLockRelease ( LogicalRepWorkerLock ) ;
return ;
return false ;
}
}
/*
/*
@ -210,6 +214,8 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/*
/*
* Walks the workers array and searches for one that matches given
* Walks the workers array and searches for one that matches given
* subscription id and relid .
* subscription id and relid .
*
* We are only interested in the leader apply worker or table sync worker .
*/
*/
LogicalRepWorker *
LogicalRepWorker *
logicalrep_worker_find ( Oid subid , Oid relid , bool only_running )
logicalrep_worker_find ( Oid subid , Oid relid , bool only_running )
@ -224,6 +230,10 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
{
LogicalRepWorker * w = & LogicalRepCtx - > workers [ i ] ;
LogicalRepWorker * w = & LogicalRepCtx - > workers [ i ] ;
/* Skip parallel apply workers. */
if ( isParallelApplyWorker ( w ) )
continue ;
if ( w - > in_use & & w - > subid = = subid & & w - > relid = = relid & &
if ( w - > in_use & & w - > subid = = subid & & w - > relid = = relid & &
( ! only_running | | w - > proc ) )
( ! only_running | | w - > proc ) )
{
{
@ -260,11 +270,13 @@ logicalrep_workers_find(Oid subid, bool only_running)
}
}
/*
/*
* Start new apply background worker , if possible .
* Start new logical replication background worker , if possible .
*
* Returns true on success , false on failure .
*/
*/
void
bool
logicalrep_worker_launch ( Oid dbid , Oid subid , const char * subname , Oid userid ,
logicalrep_worker_launch ( Oid dbid , Oid subid , const char * subname , Oid userid ,
Oid relid )
Oid relid , dsm_handle subworker_dsm )
{
{
BackgroundWorker bgw ;
BackgroundWorker bgw ;
BackgroundWorkerHandle * bgw_handle ;
BackgroundWorkerHandle * bgw_handle ;
@ -273,7 +285,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int slot = 0 ;
int slot = 0 ;
LogicalRepWorker * worker = NULL ;
LogicalRepWorker * worker = NULL ;
int nsyncworkers ;
int nsyncworkers ;
int nparallelapplyworkers ;
TimestampTz now ;
TimestampTz now ;
bool is_parallel_apply_worker = ( subworker_dsm ! = DSM_HANDLE_INVALID ) ;
/* Sanity check - tablesync worker cannot be a subworker */
Assert ( ! ( is_parallel_apply_worker & & OidIsValid ( relid ) ) ) ;
ereport ( DEBUG1 ,
ereport ( DEBUG1 ,
( errmsg_internal ( " starting logical replication worker for subscription \" %s \" " ,
( errmsg_internal ( " starting logical replication worker for subscription \" %s \" " ,
@ -351,7 +368,20 @@ retry:
if ( OidIsValid ( relid ) & & nsyncworkers > = max_sync_workers_per_subscription )
if ( OidIsValid ( relid ) & & nsyncworkers > = max_sync_workers_per_subscription )
{
{
LWLockRelease ( LogicalRepWorkerLock ) ;
LWLockRelease ( LogicalRepWorkerLock ) ;
return ;
return false ;
}
nparallelapplyworkers = logicalrep_pa_worker_count ( subid ) ;
/*
* Return false if the number of parallel apply workers reached the limit
* per subscription .
*/
if ( is_parallel_apply_worker & &
nparallelapplyworkers > = max_parallel_apply_workers_per_subscription )
{
LWLockRelease ( LogicalRepWorkerLock ) ;
return false ;
}
}
/*
/*
@ -365,7 +395,7 @@ retry:
( errcode ( ERRCODE_CONFIGURATION_LIMIT_EXCEEDED ) ,
( errcode ( ERRCODE_CONFIGURATION_LIMIT_EXCEEDED ) ,
errmsg ( " out of logical replication worker slots " ) ,
errmsg ( " out of logical replication worker slots " ) ,
errhint ( " You might need to increase max_logical_replication_workers. " ) ) ) ;
errhint ( " You might need to increase max_logical_replication_workers. " ) ) ) ;
return ;
return false ;
}
}
/* Prepare the worker slot. */
/* Prepare the worker slot. */
@ -380,6 +410,8 @@ retry:
worker - > relstate = SUBREL_STATE_UNKNOWN ;
worker - > relstate = SUBREL_STATE_UNKNOWN ;
worker - > relstate_lsn = InvalidXLogRecPtr ;
worker - > relstate_lsn = InvalidXLogRecPtr ;
worker - > stream_fileset = NULL ;
worker - > stream_fileset = NULL ;
worker - > apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid ;
worker - > parallel_apply = is_parallel_apply_worker ;
worker - > last_lsn = InvalidXLogRecPtr ;
worker - > last_lsn = InvalidXLogRecPtr ;
TIMESTAMP_NOBEGIN ( worker - > last_send_time ) ;
TIMESTAMP_NOBEGIN ( worker - > last_send_time ) ;
TIMESTAMP_NOBEGIN ( worker - > last_recv_time ) ;
TIMESTAMP_NOBEGIN ( worker - > last_recv_time ) ;
@ -397,19 +429,34 @@ retry:
BGWORKER_BACKEND_DATABASE_CONNECTION ;
BGWORKER_BACKEND_DATABASE_CONNECTION ;
bgw . bgw_start_time = BgWorkerStart_RecoveryFinished ;
bgw . bgw_start_time = BgWorkerStart_RecoveryFinished ;
snprintf ( bgw . bgw_library_name , BGW_MAXLEN , " postgres " ) ;
snprintf ( bgw . bgw_library_name , BGW_MAXLEN , " postgres " ) ;
if ( is_parallel_apply_worker )
snprintf ( bgw . bgw_function_name , BGW_MAXLEN , " ParallelApplyWorkerMain " ) ;
else
snprintf ( bgw . bgw_function_name , BGW_MAXLEN , " ApplyWorkerMain " ) ;
snprintf ( bgw . bgw_function_name , BGW_MAXLEN , " ApplyWorkerMain " ) ;
if ( OidIsValid ( relid ) )
if ( OidIsValid ( relid ) )
snprintf ( bgw . bgw_name , BGW_MAXLEN ,
snprintf ( bgw . bgw_name , BGW_MAXLEN ,
" logical replication worker for subscription %u sync %u " , subid , relid ) ;
" logical replication worker for subscription %u sync %u " , subid , relid ) ;
else if ( is_parallel_apply_worker )
snprintf ( bgw . bgw_name , BGW_MAXLEN ,
" logical replication parallel apply worker for subscription %u " , subid ) ;
else
else
snprintf ( bgw . bgw_name , BGW_MAXLEN ,
snprintf ( bgw . bgw_name , BGW_MAXLEN ,
" logical replication worker for subscription %u " , subid ) ;
" logical replication apply worker for subscription %u " , subid ) ;
if ( is_parallel_apply_worker )
snprintf ( bgw . bgw_type , BGW_MAXLEN , " logical replication parallel worker " ) ;
else
snprintf ( bgw . bgw_type , BGW_MAXLEN , " logical replication worker " ) ;
snprintf ( bgw . bgw_type , BGW_MAXLEN , " logical replication worker " ) ;
bgw . bgw_restart_time = BGW_NEVER_RESTART ;
bgw . bgw_restart_time = BGW_NEVER_RESTART ;
bgw . bgw_notify_pid = MyProcPid ;
bgw . bgw_notify_pid = MyProcPid ;
bgw . bgw_main_arg = Int32GetDatum ( slot ) ;
bgw . bgw_main_arg = Int32GetDatum ( slot ) ;
if ( is_parallel_apply_worker )
memcpy ( bgw . bgw_extra , & subworker_dsm , sizeof ( dsm_handle ) ) ;
if ( ! RegisterDynamicBackgroundWorker ( & bgw , & bgw_handle ) )
if ( ! RegisterDynamicBackgroundWorker ( & bgw , & bgw_handle ) )
{
{
/* Failed to start worker, so clean up the worker slot. */
/* Failed to start worker, so clean up the worker slot. */
@ -422,33 +469,23 @@ retry:
( errcode ( ERRCODE_CONFIGURATION_LIMIT_EXCEEDED ) ,
( errcode ( ERRCODE_CONFIGURATION_LIMIT_EXCEEDED ) ,
errmsg ( " out of background worker slots " ) ,
errmsg ( " out of background worker slots " ) ,
errhint ( " You might need to increase max_worker_processes. " ) ) ) ;
errhint ( " You might need to increase max_worker_processes. " ) ) ) ;
return ;
return false ;
}
}
/* Now wait until it attaches. */
/* Now wait until it attaches. */
WaitForReplicationWorkerAttach ( worker , generation , bgw_handle ) ;
return WaitForReplicationWorkerAttach ( worker , generation , bgw_handle ) ;
}
}
/*
/*
* Stop the logical replication worker for subid / relid , if any , and wait until
* Internal function to stop the worker and wait until it detaches from the
* it detaches from the slot .
* slot .
*/
*/
void
static void
logicalrep_worker_stop ( Oid subid , Oid relid )
logicalrep_worker_stop_internal ( LogicalRepWorker * worker , int signo )
{
{
LogicalRepWorker * worker ;
uint16 generation ;
uint16 generation ;
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
Assert ( LWLockHeldByMeInMode ( LogicalRepWorkerLock , LW_SHARED ) ) ;
worker = logicalrep_worker_find ( subid , relid , false ) ;
/* No worker, nothing to do. */
if ( ! worker )
{
LWLockRelease ( LogicalRepWorkerLock ) ;
return ;
}
/*
/*
* Remember which generation was our worker so we can check if what we see
* Remember which generation was our worker so we can check if what we see
@ -486,10 +523,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
* different , meaning that a different worker has taken the slot .
* different , meaning that a different worker has taken the slot .
*/
*/
if ( ! worker - > in_use | | worker - > generation ! = generation )
if ( ! worker - > in_use | | worker - > generation ! = generation )
{
LWLockRelease ( LogicalRepWorkerLock ) ;
return ;
return ;
}
/* Worker has assigned proc, so it has started. */
/* Worker has assigned proc, so it has started. */
if ( worker - > proc )
if ( worker - > proc )
@ -497,7 +531,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
}
/* Now terminate the worker ... */
/* Now terminate the worker ... */
kill ( worker - > proc - > pid , SIGTERM ) ;
kill ( worker - > proc - > pid , signo ) ;
/* ... and wait for it to die. */
/* ... and wait for it to die. */
for ( ; ; )
for ( ; ; )
@ -523,6 +557,53 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
}
}
}
/*
* Stop the logical replication worker for subid / relid , if any .
*/
void
logicalrep_worker_stop ( Oid subid , Oid relid )
{
LogicalRepWorker * worker ;
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
worker = logicalrep_worker_find ( subid , relid , false ) ;
if ( worker )
{
Assert ( ! isParallelApplyWorker ( worker ) ) ;
logicalrep_worker_stop_internal ( worker , SIGTERM ) ;
}
LWLockRelease ( LogicalRepWorkerLock ) ;
}
/*
* Stop the logical replication parallel apply worker corresponding to the
* input slot number .
*
* Node that the function sends SIGINT instead of SIGTERM to the parallel apply
* worker so that the worker exits cleanly .
*/
void
logicalrep_pa_worker_stop ( int slot_no , uint16 generation )
{
LogicalRepWorker * worker ;
Assert ( slot_no > = 0 & & slot_no < max_logical_replication_workers ) ;
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
worker = & LogicalRepCtx - > workers [ slot_no ] ;
Assert ( isParallelApplyWorker ( worker ) ) ;
/*
* Only stop the worker if the generation matches and the worker is alive .
*/
if ( worker - > generation = = generation & & worker - > proc )
logicalrep_worker_stop_internal ( worker , SIGINT ) ;
LWLockRelease ( LogicalRepWorkerLock ) ;
LWLockRelease ( LogicalRepWorkerLock ) ;
}
}
@ -595,11 +676,40 @@ logicalrep_worker_attach(int slot)
}
}
/*
/*
* Detach the worker ( cleans up the worker info ) .
* Stop the parallel apply workers if any , and detach the leader apply worker
* ( cleans up the worker info ) .
*/
*/
static void
static void
logicalrep_worker_detach ( void )
logicalrep_worker_detach ( void )
{
{
/* Stop the parallel apply workers. */
if ( am_leader_apply_worker ( ) )
{
List * workers ;
ListCell * lc ;
/*
* Detach from the error_mq_handle for all parallel apply workers
* before terminating them . This prevents the leader apply worker from
* receiving the worker termination message and sending it to logs
* when the same is already done by the parallel worker .
*/
pa_detach_all_error_mq ( ) ;
LWLockAcquire ( LogicalRepWorkerLock , LW_SHARED ) ;
workers = logicalrep_workers_find ( MyLogicalRepWorker - > subid , true ) ;
foreach ( lc , workers )
{
LogicalRepWorker * w = ( LogicalRepWorker * ) lfirst ( lc ) ;
if ( isParallelApplyWorker ( w ) )
logicalrep_worker_stop_internal ( w , SIGTERM ) ;
}
LWLockRelease ( LogicalRepWorkerLock ) ;
}
/* Block concurrent access. */
/* Block concurrent access. */
LWLockAcquire ( LogicalRepWorkerLock , LW_EXCLUSIVE ) ;
LWLockAcquire ( LogicalRepWorkerLock , LW_EXCLUSIVE ) ;
@ -622,6 +732,8 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
worker - > userid = InvalidOid ;
worker - > userid = InvalidOid ;
worker - > subid = InvalidOid ;
worker - > subid = InvalidOid ;
worker - > relid = InvalidOid ;
worker - > relid = InvalidOid ;
worker - > apply_leader_pid = InvalidPid ;
worker - > parallel_apply = false ;
}
}
/*
/*
@ -653,6 +765,13 @@ logicalrep_worker_onexit(int code, Datum arg)
if ( MyLogicalRepWorker - > stream_fileset ! = NULL )
if ( MyLogicalRepWorker - > stream_fileset ! = NULL )
FileSetDeleteAll ( MyLogicalRepWorker - > stream_fileset ) ;
FileSetDeleteAll ( MyLogicalRepWorker - > stream_fileset ) ;
/*
* Session level locks may be acquired outside of a transaction in
* parallel apply mode and will not be released when the worker
* terminates , so manually release all locks before the worker exits .
*/
LockReleaseAll ( DEFAULT_LOCKMETHOD , true ) ;
ApplyLauncherWakeup ( ) ;
ApplyLauncherWakeup ( ) ;
}
}
@ -680,6 +799,33 @@ logicalrep_sync_worker_count(Oid subid)
return res ;
return res ;
}
}
/*
* Count the number of registered ( but not necessarily running ) parallel apply
* workers for a subscription .
*/
static int
logicalrep_pa_worker_count ( Oid subid )
{
int i ;
int res = 0 ;
Assert ( LWLockHeldByMe ( LogicalRepWorkerLock ) ) ;
/*
* Scan all attached parallel apply workers , only counting those which
* have the given subscription id .
*/
for ( i = 0 ; i < max_logical_replication_workers ; i + + )
{
LogicalRepWorker * w = & LogicalRepCtx - > workers [ i ] ;
if ( w - > subid = = subid & & isParallelApplyWorker ( w ) )
res + + ;
}
return res ;
}
/*
/*
* ApplyLauncherShmemSize
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
* Compute space needed for replication launcher shared memory
@ -869,7 +1015,7 @@ ApplyLauncherMain(Datum main_arg)
wait_time = wal_retrieve_retry_interval ;
wait_time = wal_retrieve_retry_interval ;
logicalrep_worker_launch ( sub - > dbid , sub - > oid , sub - > name ,
logicalrep_worker_launch ( sub - > dbid , sub - > oid , sub - > name ,
sub - > owner , InvalidOid ) ;
sub - > owner , InvalidOid , DSM_HANDLE_INVALID ) ;
}
}
}
}
@ -952,6 +1098,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
if ( OidIsValid ( subid ) & & worker . subid ! = subid )
if ( OidIsValid ( subid ) & & worker . subid ! = subid )
continue ;
continue ;
/* Skip if this is a parallel apply worker */
if ( isParallelApplyWorker ( & worker ) )
continue ;
worker_pid = worker . proc - > pid ;
worker_pid = worker . proc - > pid ;
values [ 0 ] = ObjectIdGetDatum ( worker . subid ) ;
values [ 0 ] = ObjectIdGetDatum ( worker . subid ) ;