@ -181,6 +181,15 @@
* pg_subscription . subretentionactive is updated to false within a new
* transaction , and oldest_nonremovable_xid is set to InvalidTransactionId .
*
* - RDT_RESUME_CONFLICT_INFO_RETENTION :
* This phase is required only when max_retention_duration is defined . We
* enter this phase if the retention was previously stopped , and the time
* required to advance the non - removable transaction ID in the
* RDT_WAIT_FOR_LOCAL_FLUSH phase has decreased to within acceptable limits
* ( or if max_retention_duration is set to 0 ) . During this phase ,
* pg_subscription . subretentionactive is updated to true within a new
* transaction , and the worker will be restarted .
*
* The overall state progression is : GET_CANDIDATE_XID - >
* REQUEST_PUBLISHER_STATUS - > WAIT_FOR_PUBLISHER_STATUS - > ( loop to
* REQUEST_PUBLISHER_STATUS till concurrent remote transactions end ) - >
@ -381,7 +390,8 @@ typedef enum
RDT_REQUEST_PUBLISHER_STATUS ,
RDT_WAIT_FOR_PUBLISHER_STATUS ,
RDT_WAIT_FOR_LOCAL_FLUSH ,
RDT_STOP_CONFLICT_INFO_RETENTION
RDT_STOP_CONFLICT_INFO_RETENTION ,
RDT_RESUME_CONFLICT_INFO_RETENTION ,
} RetainDeadTuplesPhase ;
/*
@ -568,10 +578,14 @@ static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
static void wait_for_local_flush ( RetainDeadTuplesData * rdt_data ) ;
static bool should_stop_conflict_info_retention ( RetainDeadTuplesData * rdt_data ) ;
static void stop_conflict_info_retention ( RetainDeadTuplesData * rdt_data ) ;
static void resume_conflict_info_retention ( RetainDeadTuplesData * rdt_data ) ;
static bool update_retention_status ( bool active ) ;
static void reset_retention_data_fields ( RetainDeadTuplesData * rdt_data ) ;
static void adjust_xid_advance_interval ( RetainDeadTuplesData * rdt_data ,
bool new_xid_found ) ;
static void apply_worker_exit ( void ) ;
static void apply_handle_commit_internal ( LogicalRepCommitData * commit_data ) ;
static void apply_handle_insert_internal ( ApplyExecutionData * edata ,
ResultRelInfo * relinfo ,
@ -4367,10 +4381,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
if ( ! MySubscription - > retaindeadtuples )
return false ;
/* No need to advance if we have already stopped retaining */
if ( ! MySubscription - > retentionactive )
return false ;
return true ;
}
@ -4399,6 +4409,9 @@ process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
case RDT_STOP_CONFLICT_INFO_RETENTION :
stop_conflict_info_retention ( rdt_data ) ;
break ;
case RDT_RESUME_CONFLICT_INFO_RETENTION :
resume_conflict_info_retention ( rdt_data ) ;
break ;
}
}
@ -4522,7 +4535,10 @@ wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
* retaining conflict information for this worker .
*/
if ( should_stop_conflict_info_retention ( rdt_data ) )
{
rdt_data - > phase = RDT_STOP_CONFLICT_INFO_RETENTION ;
return ;
}
if ( ! FullTransactionIdIsValid ( rdt_data - > remote_wait_for ) )
rdt_data - > remote_wait_for = rdt_data - > remote_nextxid ;
@ -4643,7 +4659,10 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* retaining conflict information for this worker .
*/
if ( should_stop_conflict_info_retention ( rdt_data ) )
{
rdt_data - > phase = RDT_STOP_CONFLICT_INFO_RETENTION ;
return ;
}
/*
* Update and check the remote flush position if we are applying changes
@ -4672,6 +4691,21 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
if ( last_flushpos < rdt_data - > remote_lsn )
return ;
/*
* Reaching this point implies should_stop_conflict_info_retention ( )
* returned false earlier , meaning that the most recent duration for
* advancing the non - removable transaction ID is within the
* max_retention_duration or max_retention_duration is set to 0.
*
* Therefore , if conflict info retention was previously stopped due to a
* timeout , it is now safe to resume retention .
*/
if ( ! MySubscription - > retentionactive )
{
rdt_data - > phase = RDT_RESUME_CONFLICT_INFO_RETENTION ;
return ;
}
/*
* Reaching here means the remote WAL position has been received , and all
* transactions up to that position on the publisher have been applied and
@ -4698,13 +4732,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
* Check whether conflict information retention should be stopped due to
* exceeding the maximum wait time ( max_retention_duration ) .
*
* If retention should be stopped , transition to the
* RDT_STOP_CONFLICT_INFO_RETENTION phase and return true . Otherwise , return
* false .
*
* Note : Retention won ' t be resumed automatically . The user must manually
* disable retain_dead_tuples and re - enable it after confirming that the
* replication slot maintained by the launcher has been dropped .
* If retention should be stopped , return true . Otherwise , return false .
*/
static bool
should_stop_conflict_info_retention ( RetainDeadTuplesData * rdt_data )
@ -4735,11 +4763,6 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
rdt_data - > table_sync_wait_time ) )
return false ;
rdt_data - > phase = RDT_STOP_CONFLICT_INFO_RETENTION ;
/* process the next phase */
process_rdt_phase_transition ( rdt_data , false ) ;
return true ;
}
@ -4748,6 +4771,86 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
*/
static void
stop_conflict_info_retention ( RetainDeadTuplesData * rdt_data )
{
/* Stop retention if not yet */
if ( MySubscription - > retentionactive )
{
/*
* If the retention status cannot be updated ( e . g . , due to active
* transaction ) , skip further processing to avoid inconsistent
* retention behavior .
*/
if ( ! update_retention_status ( false ) )
return ;
SpinLockAcquire ( & MyLogicalRepWorker - > relmutex ) ;
MyLogicalRepWorker - > oldest_nonremovable_xid = InvalidTransactionId ;
SpinLockRelease ( & MyLogicalRepWorker - > relmutex ) ;
ereport ( LOG ,
errmsg ( " logical replication worker for subscription \" %s \" has stopped retaining the information for detecting conflicts " ,
MySubscription - > name ) ,
errdetail ( " Retention is stopped as the apply process is not advancing its xmin within the configured max_retention_duration of %u ms. " ,
MySubscription - > maxretention ) ) ;
}
Assert ( ! TransactionIdIsValid ( MyLogicalRepWorker - > oldest_nonremovable_xid ) ) ;
/*
* If retention has been stopped , reset to the initial phase to retry
* resuming retention . This reset is required to recalculate the current
* wait time and resume retention if the time falls within
* max_retention_duration .
*/
reset_retention_data_fields ( rdt_data ) ;
}
/*
* Workhorse for the RDT_RESUME_CONFLICT_INFO_RETENTION phase .
*/
static void
resume_conflict_info_retention ( RetainDeadTuplesData * rdt_data )
{
/* We can't resume retention without updating retention status. */
if ( ! update_retention_status ( true ) )
return ;
ereport ( LOG ,
errmsg ( " logical replication worker for subscription \" %s \" will resume retaining the information for detecting conflicts " ,
MySubscription - > name ) ,
MySubscription - > maxretention
? errdetail ( " Retention is re-enabled as the apply process is advancing its xmin within the configured max_retention_duration of %u ms. " ,
MySubscription - > maxretention )
: errdetail ( " Retention is re-enabled as max_retention_duration is set to unlimited. " ) ) ;
/*
* Restart the worker to let the launcher initialize
* oldest_nonremovable_xid at startup .
*
* While it ' s technically possible to derive this value on - the - fly using
* the conflict detection slot ' s xmin , doing so risks a race condition :
* the launcher might clean slot . xmin just after retention resumes . This
* would make oldest_nonremovable_xid unreliable , especially during xid
* wraparound .
*
* Although this can be prevented by introducing heavy weight locking , the
* complexity it will bring doesn ' t seem worthwhile given how rarely
* retention is resumed .
*/
apply_worker_exit ( ) ;
}
/*
* Updates pg_subscription . subretentionactive to the given value within a
* new transaction .
*
* If already inside an active transaction , skips the update and returns
* false .
*
* Returns true if the update is successfully performed .
*/
static bool
update_retention_status ( bool active )
{
/*
* Do not update the catalog during an active transaction . The transaction
@ -4755,7 +4858,7 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
* rollback of catalog updates if the application fails subsequently .
*/
if ( IsTransactionState ( ) )
return ;
return false ;
StartTransactionCommand ( ) ;
@ -4765,26 +4868,18 @@ stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
*/
PushActiveSnapshot ( GetTransactionSnapshot ( ) ) ;
/* Set pg_subscription.subretentionactive to fals e */
UpdateDeadTupleRetentionStatus ( MySubscription - > oid , fals e) ;
/* Update pg_subscription.subretentionactiv e */
UpdateDeadTupleRetentionStatus ( MySubscription - > oid , activ e) ;
PopActiveSnapshot ( ) ;
CommitTransactionCommand ( ) ;
SpinLockAcquire ( & MyLogicalRepWorker - > relmutex ) ;
MyLogicalRepWorker - > oldest_nonremovable_xid = InvalidTransactionId ;
SpinLockRelease ( & MyLogicalRepWorker - > relmutex ) ;
ereport ( LOG ,
errmsg ( " logical replication worker for subscription \" %s \" has stopped retaining the information for detecting conflicts " ,
MySubscription - > name ) ,
errdetail ( " Retention of information used for conflict detection has exceeded max_retention_duration of %u ms. " ,
MySubscription - > maxretention ) ) ;
/* Notify launcher to update the conflict slot */
ApplyLauncherWakeup ( ) ;
reset_retention_data_fields ( rdt_data ) ;
MySubscription - > retentionactive = active ;
return true ;
}
/*
@ -4809,19 +4904,20 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
/*
* Adjust the interval for advancing non - removable transaction IDs .
*
* If there is no activity on the node , we progressively double the interval
* used to advance non - removable transaction ID . This helps conserve CPU
* and network resources when there ' s little benefit to frequent updates .
* If there is no activity on the node or retention has been stopped , we
* progressively double the interval used to advance non - removable transaction
* ID . This helps conserve CPU and network resources when there ' s little benefit
* to frequent updates .
*
* The interval is capped by the lowest of the following :
* - wal_receiver_status_interval ( if set ) ,
* - wal_receiver_status_interval ( if set and retention is active ) ,
* - a default maximum of 3 minutes ,
* - max_retention_duration .
* - max_retention_duration ( if retention is active ) .
*
* This ensures the interval never exceeds the retention boundary , even if
* other limits are higher . Once activity resumes on the node , the interval
* is reset to lesser of 100 ms and max_retention_duration , allowing timely
* advancement of non - removable transaction ID .
* This ensures the interval never exceeds the retention boundary , even if other
* limits are higher . Once activity resumes on the node and the retention is
* active , the interval is reset to lesser of 100 ms and max_retention_duration ,
* allowing timely a dvancement of non - removable transaction ID .
*
* XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
* consider the other interval or a separate GUC if the need arises .
@ -4829,7 +4925,7 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
static void
adjust_xid_advance_interval ( RetainDeadTuplesData * rdt_data , bool new_xid_found )
{
if ( ! new_xid_found & & rdt_data - > xid_advance_interval )
if ( rdt_data - > xid_advance_interval & & ! new_xid_found )
{
int max_interval = wal_receiver_status_interval
? wal_receiver_status_interval * 1000
@ -4842,6 +4938,18 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
rdt_data - > xid_advance_interval = Min ( rdt_data - > xid_advance_interval * 2 ,
max_interval ) ;
}
else if ( rdt_data - > xid_advance_interval & &
! MySubscription - > retentionactive )
{
/*
* Retention has been stopped , so double the interval - capped at a
* maximum of 3 minutes . The wal_receiver_status_interval is
* intentionally not used as a upper bound , since the likelihood of
* retention resuming is lower than that of general activity resuming .
*/
rdt_data - > xid_advance_interval = Min ( rdt_data - > xid_advance_interval * 2 ,
MAX_XID_ADVANCE_INTERVAL ) ;
}
else
{
/*
@ -4851,7 +4959,11 @@ adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
rdt_data - > xid_advance_interval = MIN_XID_ADVANCE_INTERVAL ;
}
/* Ensure the wait time remains within the maximum limit */
/*
* Ensure the wait time remains within the maximum retention time limit
* when retention is active .
*/
if ( MySubscription - > retentionactive )
rdt_data - > xid_advance_interval = Min ( rdt_data - > xid_advance_interval ,
MySubscription - > maxretention ) ;
}