@ -102,16 +102,24 @@ typedef struct
/*
* Lookup table for slot invalidation causes .
*/
const char * const SlotInvalidationCauses [ ] = {
[ RS_INVAL_NONE ] = " none " ,
[ RS_INVAL_WAL_REMOVED ] = " wal_removed " ,
[ RS_INVAL_HORIZON ] = " rows_removed " ,
[ RS_INVAL_WAL_LEVEL ] = " wal_level_insufficient " ,
typedef struct SlotInvalidationCauseMap
{
ReplicationSlotInvalidationCause cause ;
const char * cause_name ;
} SlotInvalidationCauseMap ;
static const SlotInvalidationCauseMap SlotInvalidationCauses [ ] = {
{ RS_INVAL_NONE , " none " } ,
{ RS_INVAL_WAL_REMOVED , " wal_removed " } ,
{ RS_INVAL_HORIZON , " rows_removed " } ,
{ RS_INVAL_WAL_LEVEL , " wal_level_insufficient " } ,
{ RS_INVAL_IDLE_TIMEOUT , " idle_timeout " } ,
} ;
/* Maximum number of invalidation causes */
# define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
/*
* Ensure that the lookup table is up - to - date with the enums defined in
* ReplicationSlotInvalidationCause .
*/
StaticAssertDecl ( lengthof ( SlotInvalidationCauses ) = = ( RS_INVAL_MAX_CAUSES + 1 ) ,
" array length mismatch " ) ;
@ -141,6 +149,12 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 10 ; /* the maximum number of replication
* slots */
/*
* Invalidate replication slots that have remained idle longer than this
* duration ; ' 0 ' disables it .
*/
int idle_replication_slot_timeout_mins = 0 ;
/*
* This GUC lists streaming replication standby server slot names that
* logical WAL sender processes will wait for .
@ -575,7 +589,7 @@ retry:
errmsg ( " can no longer access replication slot \" %s \" " ,
NameStr ( s - > data . name ) ) ,
errdetail ( " This replication slot has been invalidated due to \" %s \" . " ,
SlotInvalidationCauses [ s - > data . invalidated ] ) ) ;
GetSlotInvalidationCauseName ( s - > data . invalidated ) ) ) ;
}
/*
@ -592,14 +606,23 @@ retry:
if ( ! nowait )
ConditionVariablePrepareToSleep ( & s - > active_cv ) ;
/*
* It is important to reset the inactive_since under spinlock here to
* avoid race conditions with slot invalidation . See comments related
* to inactive_since in InvalidatePossiblyObsoleteSlot .
*/
SpinLockAcquire ( & s - > mutex ) ;
if ( s - > active_pid = = 0 )
s - > active_pid = MyProcPid ;
active_pid = s - > active_pid ;
ReplicationSlotSetInactiveSince ( s , 0 , false ) ;
SpinLockRelease ( & s - > mutex ) ;
}
else
{
active_pid = MyProcPid ;
ReplicationSlotSetInactiveSince ( s , 0 , true ) ;
}
LWLockRelease ( ReplicationSlotControlLock ) ;
/*
@ -640,11 +663,6 @@ retry:
if ( SlotIsLogical ( s ) )
pgstat_acquire_replslot ( s ) ;
/*
* Reset the time since the slot has become inactive as the slot is active
* now .
*/
ReplicationSlotSetInactiveSince ( s , 0 , true ) ;
if ( am_walsender )
{
@ -1512,12 +1530,14 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
NameData slotname ,
XLogRecPtr restart_lsn ,
XLogRecPtr oldestLSN ,
TransactionId snapshotConflictHorizon )
TransactionId snapshotConflictHorizon ,
long slot_idle_seconds )
{
StringInfoData err_detail ;
bool hint = false ;
StringInfoData err_hint ;
initStringInfo ( & err_detail ) ;
initStringInfo ( & err_hint ) ;
switch ( cause )
{
@ -1525,13 +1545,15 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
{
unsigned long long ex = oldestLSN - restart_lsn ;
hint = true ;
appendStringInfo ( & err_detail ,
ngettext ( " The slot's restart_lsn %X/%X exceeds the limit by %llu byte. " ,
" The slot's restart_lsn %X/%X exceeds the limit by %llu bytes. " ,
ex ) ,
LSN_FORMAT_ARGS ( restart_lsn ) ,
ex ) ;
/* translator: %s is a GUC variable name */
appendStringInfo ( & err_hint , _ ( " You might need to increase \" %s \" . " ) ,
" max_slot_wal_keep_size " ) ;
break ;
}
case RS_INVAL_HORIZON :
@ -1542,6 +1564,21 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
case RS_INVAL_WAL_LEVEL :
appendStringInfoString ( & err_detail , _ ( " Logical decoding on standby requires \" wal_level \" >= \" logical \" on the primary server. " ) ) ;
break ;
case RS_INVAL_IDLE_TIMEOUT :
{
int minutes = slot_idle_seconds / SECS_PER_MINUTE ;
int secs = slot_idle_seconds % SECS_PER_MINUTE ;
/* translator: %s is a GUC variable name */
appendStringInfo ( & err_detail , _ ( " The slot's idle time of %dmin %02ds exceeds the configured \" %s \" duration of %dmin. " ) ,
minutes , secs , " idle_replication_slot_timeout " ,
idle_replication_slot_timeout_mins ) ;
/* translator: %s is a GUC variable name */
appendStringInfo ( & err_hint , _ ( " You might need to increase \" %s \" . " ) ,
" idle_replication_slot_timeout " ) ;
break ;
}
case RS_INVAL_NONE :
pg_unreachable ( ) ;
}
@ -1553,9 +1590,99 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
errmsg ( " invalidating obsolete replication slot \" %s \" " ,
NameStr ( slotname ) ) ,
errdetail_internal ( " %s " , err_detail . data ) ,
hint ? errhint ( " You might need to increase \" %s \" . " , " max_slot_wal_keep_size " ) : 0 ) ;
err_hint . len ? errhint ( " %s " , err_hint . data ) : 0 ) ;
pfree ( err_detail . data ) ;
pfree ( err_hint . data ) ;
}
/*
* Can we invalidate an idle replication slot ?
*
* Idle timeout invalidation is allowed only when :
*
* 1. Idle timeout is set
* 2. Slot has reserved WAL
* 3. Slot is inactive
* 4. The slot is not being synced from the primary while the server is in
* recovery . This is because synced slots are always considered to be
* inactive because they don ' t perform logical decoding to produce changes .
*/
static inline bool
CanInvalidateIdleSlot ( ReplicationSlot * s )
{
return ( idle_replication_slot_timeout_mins ! = 0 & &
! XLogRecPtrIsInvalid ( s - > data . restart_lsn ) & &
s - > inactive_since > 0 & &
! ( RecoveryInProgress ( ) & & s - > data . synced ) ) ;
}
/*
* DetermineSlotInvalidationCause - Determine the cause for which a slot
* becomes invalid among the given possible causes .
*
* This function sequentially checks all possible invalidation causes and
* returns the first one for which the slot is eligible for invalidation .
*/
static ReplicationSlotInvalidationCause
DetermineSlotInvalidationCause ( uint32 possible_causes , ReplicationSlot * s ,
XLogRecPtr oldestLSN , Oid dboid ,
TransactionId snapshotConflictHorizon ,
TransactionId initial_effective_xmin ,
TransactionId initial_catalog_effective_xmin ,
XLogRecPtr initial_restart_lsn ,
TimestampTz * inactive_since , TimestampTz now )
{
Assert ( possible_causes ! = RS_INVAL_NONE ) ;
if ( possible_causes & RS_INVAL_WAL_REMOVED )
{
if ( initial_restart_lsn ! = InvalidXLogRecPtr & &
initial_restart_lsn < oldestLSN )
return RS_INVAL_WAL_REMOVED ;
}
if ( possible_causes & RS_INVAL_HORIZON )
{
/* invalid DB oid signals a shared relation */
if ( SlotIsLogical ( s ) & &
( dboid = = InvalidOid | | dboid = = s - > data . database ) )
{
if ( TransactionIdIsValid ( initial_effective_xmin ) & &
TransactionIdPrecedesOrEquals ( initial_effective_xmin ,
snapshotConflictHorizon ) )
return RS_INVAL_HORIZON ;
else if ( TransactionIdIsValid ( initial_catalog_effective_xmin ) & &
TransactionIdPrecedesOrEquals ( initial_catalog_effective_xmin ,
snapshotConflictHorizon ) )
return RS_INVAL_HORIZON ;
}
}
if ( possible_causes & RS_INVAL_WAL_LEVEL )
{
if ( SlotIsLogical ( s ) )
return RS_INVAL_WAL_LEVEL ;
}
if ( possible_causes & RS_INVAL_IDLE_TIMEOUT )
{
Assert ( now > 0 ) ;
/*
* Check if the slot needs to be invalidated due to
* idle_replication_slot_timeout GUC .
*/
if ( CanInvalidateIdleSlot ( s ) & &
TimestampDifferenceExceedsSeconds ( s - > inactive_since , now ,
idle_replication_slot_timeout_mins * SECS_PER_MINUTE ) )
{
* inactive_since = s - > inactive_since ;
return RS_INVAL_IDLE_TIMEOUT ;
}
}
return RS_INVAL_NONE ;
}
/*
@ -1572,7 +1699,7 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
* for syscalls , so caller must restart if we return true .
*/
static bool
InvalidatePossiblyObsoleteSlot ( ReplicationSlotInvalidationCause cause ,
InvalidatePossiblyObsoleteSlot ( uint32 possible_causes ,
ReplicationSlot * s ,
XLogRecPtr oldestLSN ,
Oid dboid , TransactionId snapshotConflictHorizon ,
@ -1585,6 +1712,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
TransactionId initial_catalog_effective_xmin = InvalidTransactionId ;
XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr ;
ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE ;
TimestampTz inactive_since = 0 ;
for ( ; ; )
{
@ -1592,6 +1720,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
NameData slotname ;
int active_pid = 0 ;
ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE ;
TimestampTz now = 0 ;
long slot_idle_secs = 0 ;
Assert ( LWLockHeldByMeInMode ( ReplicationSlotControlLock , LW_SHARED ) ) ;
@ -1602,6 +1732,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
break ;
}
if ( possible_causes & RS_INVAL_IDLE_TIMEOUT )
{
/*
* Assign the current time here to avoid system call overhead
* while holding the spinlock in subsequent code .
*/
now = GetCurrentTimestamp ( ) ;
}
/*
* Check if the slot needs to be invalidated . If it needs to be
* invalidated , and is not currently acquired , acquire it and mark it
@ -1621,6 +1760,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
* those values change since the process holding the slot has been
* terminated ( if any ) , so record them here to ensure that we
* would report the correct invalidation cause .
*
* Unlike other slot attributes , slot ' s inactive_since can ' t be
* changed until the acquired slot is released or the owning
* process is terminated . So , the inactive slot can only be
* invalidated immediately without being terminated .
*/
if ( ! terminated )
{
@ -1629,35 +1773,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
initial_catalog_effective_xmin = s - > effective_catalog_xmin ;
}
switch ( cause )
{
case RS_INVAL_WAL_REMOVED :
if ( initial_restart_lsn ! = InvalidXLogRecPtr & &
initial_restart_lsn < oldestLSN )
invalidation_cause = cause ;
break ;
case RS_INVAL_HORIZON :
if ( ! SlotIsLogical ( s ) )
break ;
/* invalid DB oid signals a shared relation */
if ( dboid ! = InvalidOid & & dboid ! = s - > data . database )
break ;
if ( TransactionIdIsValid ( initial_effective_xmin ) & &
TransactionIdPrecedesOrEquals ( initial_effective_xmin ,
snapshotConflictHorizon ) )
invalidation_cause = cause ;
else if ( TransactionIdIsValid ( initial_catalog_effective_xmin ) & &
TransactionIdPrecedesOrEquals ( initial_catalog_effective_xmin ,
snapshotConflictHorizon ) )
invalidation_cause = cause ;
break ;
case RS_INVAL_WAL_LEVEL :
if ( SlotIsLogical ( s ) )
invalidation_cause = cause ;
break ;
case RS_INVAL_NONE :
pg_unreachable ( ) ;
}
invalidation_cause = DetermineSlotInvalidationCause ( possible_causes ,
s , oldestLSN ,
dboid ,
snapshotConflictHorizon ,
initial_effective_xmin ,
initial_catalog_effective_xmin ,
initial_restart_lsn ,
& inactive_since ,
now ) ;
}
/*
@ -1705,12 +1829,25 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
/*
* The logical replication slots shouldn ' t be invalidated as GUC
* max_slot_wal_keep_size is set to - 1 during the binary upgrade . See
* check_old_cluster_for_valid_slots ( ) where we ensure that no
* invalidated before the upgrade .
* max_slot_wal_keep_size is set to - 1 and
* idle_replication_slot_timeout is set to 0 during the binary
* upgrade . See check_old_cluster_for_valid_slots ( ) where we ensure
* that no invalidated before the upgrade .
*/
Assert ( ! ( * invalidated & & SlotIsLogical ( s ) & & IsBinaryUpgrade ) ) ;
/*
* Calculate the idle time duration of the slot if slot is marked
* invalidated with RS_INVAL_IDLE_TIMEOUT .
*/
if ( invalidation_cause = = RS_INVAL_IDLE_TIMEOUT )
{
int slot_idle_usecs ;
TimestampDifference ( inactive_since , now , & slot_idle_secs ,
& slot_idle_usecs ) ;
}
if ( active_pid ! = 0 )
{
/*
@ -1739,7 +1876,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
{
ReportSlotInvalidation ( invalidation_cause , true , active_pid ,
slotname , restart_lsn ,
oldestLSN , snapshotConflictHorizon ) ;
oldestLSN , snapshotConflictHorizon ,
slot_idle_secs ) ;
if ( MyBackendType = = B_STARTUP )
( void ) SendProcSignal ( active_pid ,
@ -1785,7 +1923,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
ReportSlotInvalidation ( invalidation_cause , false , active_pid ,
slotname , restart_lsn ,
oldestLSN , snapshotConflictHorizon ) ;
oldestLSN , snapshotConflictHorizon ,
slot_idle_secs ) ;
/* done with this slot for now */
break ;
@ -1802,26 +1941,32 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
*
* Returns true when any slot have got invalidated .
*
* Whether a slot needs to be invalidated depends on the cause . A slot is
* remov ed if it :
* Whether a slot needs to be invalidated depends on the invalidation cause .
* A slot is invalidat ed if it :
* - RS_INVAL_WAL_REMOVED : requires a LSN older than the given segment
* - RS_INVAL_HORIZON : requires a snapshot < = the given horizon in the given
* db ; dboid may be InvalidOid for shared relations
* - RS_INVAL_WAL_LEVEL : is logical
* - RS_INVAL_WAL_LEVEL : is logical and wal_level is insufficient
* - RS_INVAL_IDLE_TIMEOUT : has been idle longer than the configured
* " idle_replication_slot_timeout " duration .
*
* Note : This function attempts to invalidate the slot for multiple possible
* causes in a single pass , minimizing redundant iterations . The " cause "
* parameter can be a MASK representing one or more of the defined causes .
*
* NB - this runs as part of checkpoint , so avoid raising errors if possible .
*/
bool
InvalidateObsoleteReplicationSlots ( ReplicationSlotInvalidationCause cause ,
InvalidateObsoleteReplicationSlots ( uint32 possible_causes ,
XLogSegNo oldestSegno , Oid dboid ,
TransactionId snapshotConflictHorizon )
{
XLogRecPtr oldestLSN ;
bool invalidated = false ;
Assert ( cause ! = RS_INVAL_HORIZON | | TransactionIdIsValid ( snapshotConflictHorizon ) ) ;
Assert ( cause ! = RS_INVAL_WAL_REMOVED | | oldestSegno > 0 ) ;
Assert ( cause ! = RS_INVAL_NONE ) ;
Assert ( ! ( possible_causes & RS_INVAL_HORIZON ) | | TransactionIdIsValid ( snapshotConflictHorizon ) ) ;
Assert ( ! ( possible_causes & RS_INVAL_WAL_REMOVED ) | | oldestSegno > 0 ) ;
Assert ( possible_ causes ! = RS_INVAL_NONE ) ;
if ( max_replication_slots = = 0 )
return invalidated ;
@ -1837,7 +1982,7 @@ restart:
if ( ! s - > in_use )
continue ;
if ( InvalidatePossiblyObsoleteSlot ( cause , s , oldestLSN , dboid ,
if ( InvalidatePossiblyObsoleteSlot ( possible_ causes , s , oldestLSN , dboid ,
snapshotConflictHorizon ,
& invalidated ) )
{
@ -2426,26 +2571,37 @@ RestoreSlotFromDisk(const char *name)
* ReplicationSlotInvalidationCause .
*/
ReplicationSlotInvalidationCause
GetSlotInvalidationCause ( const char * invalidation_reason )
GetSlotInvalidationCause ( const char * cause_name )
{
ReplicationSlotInvalidationCause cause ;
ReplicationSlotInvalidationCause result = RS_INVAL_NONE ;
bool found PG_USED_FOR_ASSERTS_ONLY = false ;
Assert ( cause_name ) ;
Assert ( invalidation_reason ) ;
/* Search lookup table for the cause having this name */
for ( int i = 0 ; i < = RS_INVAL_MAX_CAUSES ; i + + )
{
if ( strcmp ( SlotInvalidationCauses [ i ] . cause_name , cause_name ) = = 0 )
return SlotInvalidationCauses [ i ] . cause ;
}
Assert ( false ) ;
return RS_INVAL_NONE ; /* to keep compiler quiet */
}
for ( cause = RS_INVAL_NONE ; cause < = RS_INVAL_MAX_CAUSES ; cause + + )
/*
* Maps an ReplicationSlotInvalidationCause to the invalidation
* reason for a replication slot .
*/
const char *
GetSlotInvalidationCauseName ( ReplicationSlotInvalidationCause cause )
{
/* Search lookup table for the name of this cause */
for ( int i = 0 ; i < = RS_INVAL_MAX_CAUSES ; i + + )
{
if ( strcmp ( SlotInvalidationCauses [ cause ] , invalidation_reason ) = = 0 )
{
found = true ;
result = cause ;
break ;
}
if ( SlotInvalidationCauses [ i ] . cause = = cause )
return SlotInvalidationCauses [ i ] . cause_name ;
}
Assert ( found ) ;
return result ;
Assert ( false ) ;
return " none " ; /* to keep compiler quiet */
}
/*
@ -2802,3 +2958,22 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
ConditionVariableCancelSleep ( ) ;
}
/*
* GUC check_hook for idle_replication_slot_timeout
*
* The value of idle_replication_slot_timeout must be set to 0 during
* a binary upgrade . See start_postmaster ( ) in pg_upgrade for more details .
*/
bool
check_idle_replication_slot_timeout ( int * newval , void * * extra , GucSource source )
{
if ( IsBinaryUpgrade & & * newval ! = 0 )
{
GUC_check_errdetail ( " The value of \" %s \" must be set to 0 during binary upgrade mode. " ,
" idle_replication_slot_timeout " ) ;
return false ;
}
return true ;
}