@@ -38,6 +38,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/logicalworker.h"
 #include "replication/slot.h"
+#include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "storage/ipc.h"
@@ -76,6 +77,7 @@ static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
+static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
 
 /* Flags set by signal handlers */
 volatile sig_atomic_t got_SIGHUP = false;
@@ -154,15 +156,19 @@ get_subscription_list(void)
 /*
  * Wait for a background worker to start up and attach to the shmem context.
  *
- * This is like WaitForBackgroundWorkerStartup(), except that we wait for
- * attaching, not just start and we also just exit if postmaster died.
+ * This is only needed for cleaning up the shared memory in case the worker
+ * fails to attach.
  */
-static bool
+static void
 WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
							   BackgroundWorkerHandle *handle)
 {
 	BgwHandleStatus status;
 	int			rc;
+	uint16		generation;
+
+	/* Remember generation for future identification. */
+	generation = worker->generation;
 
 	for (;;)
 	{
@@ -170,18 +176,29 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 
 		CHECK_FOR_INTERRUPTS();
 
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+		/* Worker either died or has started; no need to do anything. */
+		if (!worker->in_use || worker->proc)
+		{
+			LWLockRelease(LogicalRepWorkerLock);
+			return;
+		}
+
+		LWLockRelease(LogicalRepWorkerLock);
+
+		/* Check if worker has died before attaching, and clean up after it. */
 		status = GetBackgroundWorkerPid(handle, &pid);
 
-		/*
-		 * Worker started and attached to our shmem. This check is safe
-		 * because only launcher ever starts the workers, so nobody can steal
-		 * the worker slot.
-		 */
-		if (status == BGWH_STARTED && worker->proc)
-			return true;
-		/* Worker didn't start or died before attaching to our shmem. */
 		if (status == BGWH_STOPPED)
-			return false;
+		{
+			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
+			/* Ensure that this was indeed the worker we waited for. */
+			if (generation == worker->generation)
+				logicalrep_worker_cleanup(worker);
+			LWLockRelease(LogicalRepWorkerLock);
+			return;
+		}
 
 		/*
 		 * We need timeout because we generally don't get notified via latch
@@ -197,7 +214,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 		ResetLatch(MyLatch);
 	}
 
-	return false;
+	return;
 }
 
 /*
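The cleanup path above only touches the slot if its generation still matches the value recorded before the lock was released. Below is a minimal standalone sketch of that pattern; it is an illustration, not launcher.c code: it uses plain C with a pthread mutex instead of the server's LWLock API, and the WorkerSlot type, field names, and cleanup_if_same_generation() are invented for the example.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

typedef struct WorkerSlot
{
	bool		in_use;			/* slot reserved by a launcher */
	bool		attached;		/* stands in for worker->proc != NULL */
	unsigned int generation;	/* bumped each time the slot is reused */
} WorkerSlot;

static pthread_mutex_t slot_lock = PTHREAD_MUTEX_INITIALIZER;
static WorkerSlot slot = {true, false, 7};

/* Clean up a slot that never attached, but only if it is still "ours". */
static void
cleanup_if_same_generation(unsigned int my_generation)
{
	pthread_mutex_lock(&slot_lock);
	if (slot.in_use && !slot.attached && slot.generation == my_generation)
	{
		slot.in_use = false;
		printf("cleaned up stale slot (generation %u)\n", my_generation);
	}
	else
		printf("slot was reused or attached; leaving it alone\n");
	pthread_mutex_unlock(&slot_lock);
}

int
main(void)
{
	unsigned int my_generation;

	/* Remember the generation while holding the lock, as the patch does. */
	pthread_mutex_lock(&slot_lock);
	my_generation = slot.generation;
	pthread_mutex_unlock(&slot_lock);

	/* ... the worker fails to start; later we try to reclaim the slot ... */
	cleanup_if_same_generation(my_generation);
	return 0;
}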
@@ -216,8 +233,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
 	for (i = 0; i < max_logical_replication_workers; i++)
 	{
 		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
-		if (w->subid == subid && w->relid == relid &&
-			(!only_running || (w->proc && IsBackendPid(w->proc->pid))))
+		if (w->in_use && w->subid == subid && w->relid == relid &&
+			(!only_running || w->proc))
 		{
 			res = w;
 			break;
@@ -236,8 +253,11 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 {
 	BackgroundWorker bgw;
 	BackgroundWorkerHandle *bgw_handle;
+	int			i;
 	int			slot;
 	LogicalRepWorker *worker = NULL;
+	int			nsyncworkers;
+	TimestampTz now;
 
 	ereport(LOG,
 			(errmsg("starting logical replication worker for subscription \"%s\"",
@@ -255,17 +275,73 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 	 */
 	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
+retry:
 	/* Find unused worker slot. */
-	for (slot = 0; slot < max_logical_replication_workers; slot++)
+	for (i = 0; i < max_logical_replication_workers; i++)
 	{
-		if (!LogicalRepCtx->workers[slot].proc)
+		LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+		if (!w->in_use)
 		{
-			worker = &LogicalRepCtx->workers[slot];
+			worker = w;
+			slot = i;
 			break;
 		}
 	}
 
-	/* Bail if not found */
+	nsyncworkers = logicalrep_sync_worker_count(subid);
+
+	now = GetCurrentTimestamp();
+
+	/*
+	 * If we didn't find a free slot, try to do garbage collection.  The
+	 * reason we do this is because if some worker failed to start up and its
+	 * parent has crashed while waiting, the in_use state was never cleared.
+	 */
+	if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
+	{
+		bool		did_cleanup = false;
+
+		for (i = 0; i < max_logical_replication_workers; i++)
+		{
+			LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+			/*
+			 * If the worker was marked in use but didn't manage to attach in
+			 * time, clean it up.
+			 */
+			if (w->in_use && !w->proc &&
+				TimestampDifferenceExceeds(w->launch_time, now,
+										   wal_receiver_timeout))
+			{
+				elog(WARNING,
+					 "logical replication worker for subscription %u took too long to start; canceled",
+					 w->subid);
+
+				logicalrep_worker_cleanup(w);
+				did_cleanup = true;
+			}
+		}
+
+		if (did_cleanup)
+			goto retry;
+	}
+
+	/*
+	 * If we reached the sync worker limit per subscription, just exit
+	 * silently as we might get here because of an otherwise harmless race
+	 * condition.
+	 */
+	if (nsyncworkers >= max_sync_workers_per_subscription)
+	{
+		LWLockRelease(LogicalRepWorkerLock);
+		return;
+	}
+
+	/*
+	 * However if there are no more free worker slots, inform user about it
+	 * before exiting.
+	 */
 	if (worker == NULL)
 	{
 		LWLockRelease(LogicalRepWorkerLock);
@@ -276,7 +352,10 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
 		return;
 	}
 
-	/* Prepare the worker info. */
+	/* Prepare the worker slot. */
+	worker->launch_time = now;
+	worker->in_use = true;
+	worker->generation++;
 	worker->proc = NULL;
 	worker->dbid = dbid;
 	worker->userid = userid;
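The reservation protocol this hunk completes (mark the slot in_use, stamp launch_time, and bump generation before the worker process even exists) can be sketched in isolation. The following is a simplified standalone illustration, not launcher.c code: it ignores locking and the sync-worker limit, and the Slot type, ATTACH_TIMEOUT, and reserve_slot() are invented stand-ins (ATTACH_TIMEOUT plays the role wal_receiver_timeout plays in the patch).

#include <stdbool.h>
#include <stddef.h>
#include <time.h>

#define N_SLOTS			4
#define ATTACH_TIMEOUT	60		/* seconds a reservation may sit unattached */

typedef struct Slot
{
	bool		in_use;			/* reserved by a launcher */
	bool		attached;		/* stands in for worker->proc != NULL */
	unsigned int generation;	/* bumped on each reuse of the slot */
	time_t		launch_time;	/* when the reservation was made */
} Slot;

static Slot slots[N_SLOTS];

/* Reserve a free slot, reclaiming stale reservations if none is free. */
static Slot *
reserve_slot(time_t now)
{
	for (int pass = 0; pass < 2; pass++)
	{
		for (int i = 0; i < N_SLOTS; i++)
		{
			Slot	   *s = &slots[i];

			if (!s->in_use)
			{
				s->in_use = true;
				s->attached = false;
				s->generation++;
				s->launch_time = now;
				return s;
			}
		}

		/* No free slot: drop reservations that never attached in time. */
		for (int i = 0; i < N_SLOTS; i++)
		{
			Slot	   *s = &slots[i];

			if (s->in_use && !s->attached &&
				difftime(now, s->launch_time) > ATTACH_TIMEOUT)
				s->in_use = false;
		}
	}
	return NULL;				/* all slots genuinely busy */
}

int
main(void)
{
	return reserve_slot(time(NULL)) ? 0 : 1;
}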
@@ -331,6 +410,7 @@ void
 logicalrep_worker_stop(Oid subid, Oid relid)
 {
 	LogicalRepWorker *worker;
+	uint16		generation;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
@@ -343,11 +423,17 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		return;
 	}
 
+	/*
+	 * Remember which generation was our worker so we can check if what we see
+	 * is still the same one.
+	 */
+	generation = worker->generation;
+
 	/*
 	 * If we found worker but it does not have proc set it is starting up,
 	 * wait for it to finish and then kill it.
 	 */
-	while (worker && !worker->proc)
+	while (worker->in_use && !worker->proc)
 	{
 		int			rc;
 
@@ -370,10 +456,11 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
 		/*
-		 * Worker is no longer associated with subscription.  It must have
-		 * exited, nothing more for us to do.
+		 * Check whether the worker slot is no longer used, which would mean
+		 * that the worker has exited, or whether the worker generation is
+		 * different, meaning that a different worker has taken the slot.
 		 */
-		if (worker->subid == InvalidOid)
+		if (!worker->in_use || worker->generation != generation)
 		{
 			LWLockRelease(LogicalRepWorkerLock);
 			return;
@@ -394,7 +481,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 		int			rc;
 
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-		if (!worker->proc)
+		if (!worker->proc || worker->generation != generation)
 		{
 			LWLockRelease(LogicalRepWorkerLock);
 			break;
@@ -453,11 +540,23 @@ logicalrep_worker_attach(int slot)
 	Assert(slot >= 0 && slot < max_logical_replication_workers);
 	MyLogicalRepWorker = &LogicalRepCtx->workers[slot];
 
+	if (!MyLogicalRepWorker->in_use)
+	{
+		LWLockRelease(LogicalRepWorkerLock);
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("logical replication worker slot %d is empty, cannot attach",
+						slot)));
+	}
+
 	if (MyLogicalRepWorker->proc)
+	{
+		LWLockRelease(LogicalRepWorkerLock);
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("logical replication worker slot %d already used by "
-						"another worker", slot)));
+				 errmsg("logical replication worker slot %d is already used by "
+						"another worker, cannot attach", slot)));
+	}
 
 	MyLogicalRepWorker->proc = MyProc;
 	before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);
@@ -474,14 +573,27 @@ logicalrep_worker_detach(void)
 	/* Block concurrent access. */
 	LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
 
-	MyLogicalRepWorker->dbid = InvalidOid;
-	MyLogicalRepWorker->userid = InvalidOid;
-	MyLogicalRepWorker->subid = InvalidOid;
-	MyLogicalRepWorker->proc = NULL;
+	logicalrep_worker_cleanup(MyLogicalRepWorker);
 
 	LWLockRelease(LogicalRepWorkerLock);
 }
 
 /*
+ * Clean up worker info.
+ */
+static void
+logicalrep_worker_cleanup(LogicalRepWorker *worker)
+{
+	Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
+
+	worker->in_use = false;
+	worker->proc = NULL;
+	worker->dbid = InvalidOid;
+	worker->userid = InvalidOid;
+	worker->subid = InvalidOid;
+	worker->relid = InvalidOid;
+}
+
+/*
  * Cleanup function for logical replication launcher.
  *
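A compact sketch of the detach/cleanup split introduced here, under the assumption of a plain pthread mutex rather than LogicalRepWorkerLock: cleanup resets every slot field and is only valid while the lock is held, which the detach wrapper guarantees. The slot_lock_held flag, the Slot type, and the slot_* function names are invented for the illustration; the flag merely stands in for the LWLockHeldByMeInMode() assertion.

#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

typedef struct Slot
{
	bool		in_use;
	bool		attached;
	int			dbid;
	int			userid;
	int			subid;
	int			relid;
} Slot;

static pthread_mutex_t slot_lock = PTHREAD_MUTEX_INITIALIZER;
static bool slot_lock_held = false;	/* stand-in for LWLockHeldByMeInMode() */

/* Reset every field; the caller must hold the lock exclusively. */
static void
slot_cleanup(Slot *s)
{
	assert(slot_lock_held);
	s->in_use = false;
	s->attached = false;
	s->dbid = s->userid = s->subid = s->relid = 0;
}

/* Detach takes the lock around the cleanup call, as the patch does. */
static void
slot_detach(Slot *s)
{
	pthread_mutex_lock(&slot_lock);
	slot_lock_held = true;
	slot_cleanup(s);
	slot_lock_held = false;
	pthread_mutex_unlock(&slot_lock);
}

int
main(void)
{
	Slot		s = {true, true, 1, 1, 1, 0};

	slot_detach(&s);
	return s.in_use ? 1 : 0;
}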
@@ -732,12 +844,11 @@ ApplyLauncherMain(Datum main_arg)
 
 				if (sub->enabled && w == NULL)
 				{
-					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
-											 sub->owner, InvalidOid);
 					last_start_time = now;
 					wait_time = wal_retrieve_retry_interval;
-					/* Limit to one worker per mainloop cycle. */
-					break;
+
+					logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
											 sub->owner, InvalidOid);
 				}
 			}