@ -424,6 +424,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot - > candidate_restart_valid = InvalidXLogRecPtr ;
slot - > candidate_restart_lsn = InvalidXLogRecPtr ;
slot - > last_saved_confirmed_flush = InvalidXLogRecPtr ;
slot - > last_saved_restart_lsn = InvalidXLogRecPtr ;
slot - > inactive_since = 0 ;
/*
@ -1165,20 +1166,41 @@ ReplicationSlotsComputeRequiredLSN(void)
{
ReplicationSlot * s = & ReplicationSlotCtl - > replication_slots [ i ] ;
XLogRecPtr restart_lsn ;
XLogRecPtr last_saved_restart_lsn ;
bool invalidated ;
ReplicationSlotPersistency persistency ;
if ( ! s - > in_use )
continue ;
SpinLockAcquire ( & s - > mutex ) ;
persistency = s - > data . persistency ;
restart_lsn = s - > data . restart_lsn ;
invalidated = s - > data . invalidated ! = RS_INVAL_NONE ;
last_saved_restart_lsn = s - > last_saved_restart_lsn ;
SpinLockRelease ( & s - > mutex ) ;
/* invalidated slots need not apply */
if ( invalidated )
continue ;
/*
* For persistent slot use last_saved_restart_lsn to compute the
* oldest LSN for removal of WAL segments . The segments between
* last_saved_restart_lsn and restart_lsn might be needed by a
* persistent slot in the case of database crash . Non - persistent
* slots can ' t survive the database crash , so we don ' t care about
* last_saved_restart_lsn for them .
*/
if ( persistency = = RS_PERSISTENT )
{
if ( last_saved_restart_lsn ! = InvalidXLogRecPtr & &
restart_lsn > last_saved_restart_lsn )
{
restart_lsn = last_saved_restart_lsn ;
}
}
if ( restart_lsn ! = InvalidXLogRecPtr & &
( min_required = = InvalidXLogRecPtr | |
restart_lsn < min_required ) )
@ -1216,7 +1238,9 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
{
ReplicationSlot * s ;
XLogRecPtr restart_lsn ;
XLogRecPtr last_saved_restart_lsn ;
bool invalidated ;
ReplicationSlotPersistency persistency ;
s = & ReplicationSlotCtl - > replication_slots [ i ] ;
@ -1230,14 +1254,33 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
/* read once, it's ok if it increases while we're checking */
SpinLockAcquire ( & s - > mutex ) ;
persistency = s - > data . persistency ;
restart_lsn = s - > data . restart_lsn ;
invalidated = s - > data . invalidated ! = RS_INVAL_NONE ;
last_saved_restart_lsn = s - > last_saved_restart_lsn ;
SpinLockRelease ( & s - > mutex ) ;
/* invalidated slots need not apply */
if ( invalidated )
continue ;
/*
* For persistent slot use last_saved_restart_lsn to compute the
* oldest LSN for removal of WAL segments . The segments between
* last_saved_restart_lsn and restart_lsn might be needed by a
* persistent slot in the case of database crash . Non - persistent
* slots can ' t survive the database crash , so we don ' t care about
* last_saved_restart_lsn for them .
*/
if ( persistency = = RS_PERSISTENT )
{
if ( last_saved_restart_lsn ! = InvalidXLogRecPtr & &
restart_lsn > last_saved_restart_lsn )
{
restart_lsn = last_saved_restart_lsn ;
}
}
if ( restart_lsn = = InvalidXLogRecPtr )
continue ;
@ -1455,6 +1498,7 @@ ReplicationSlotReserveWal(void)
Assert ( slot ! = NULL ) ;
Assert ( slot - > data . restart_lsn = = InvalidXLogRecPtr ) ;
Assert ( slot - > last_saved_restart_lsn = = InvalidXLogRecPtr ) ;
/*
* The replication slot mechanism is used to prevent removal of required
@ -1766,6 +1810,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
*/
SpinLockAcquire ( & s - > mutex ) ;
Assert ( s - > data . restart_lsn > = s - > last_saved_restart_lsn ) ;
restart_lsn = s - > data . restart_lsn ;
/* we do nothing if the slot is already invalid */
@ -1835,7 +1881,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
* just rely on . invalidated .
*/
if ( invalidation_cause = = RS_INVAL_WAL_REMOVED )
{
s - > data . restart_lsn = InvalidXLogRecPtr ;
s - > last_saved_restart_lsn = InvalidXLogRecPtr ;
}
/* Let caller know */
* invalidated = true ;
@ -2079,6 +2128,12 @@ CheckPointReplicationSlots(bool is_shutdown)
SaveSlotToPath ( s , path , LOG ) ;
}
LWLockRelease ( ReplicationSlotAllocationLock ) ;
/*
* Recompute the required LSN as SaveSlotToPath ( ) updated
* last_saved_restart_lsn for slots .
*/
ReplicationSlotsComputeRequiredLSN ( ) ;
}
/*
@ -2354,6 +2409,7 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
if ( ! slot - > just_dirtied )
slot - > dirty = false ;
slot - > last_saved_confirmed_flush = cp . slotdata . confirmed_flush ;
slot - > last_saved_restart_lsn = cp . slotdata . restart_lsn ;
SpinLockRelease ( & slot - > mutex ) ;
LWLockRelease ( & slot - > io_in_progress_lock ) ;
@ -2569,6 +2625,7 @@ RestoreSlotFromDisk(const char *name)
slot - > effective_xmin = cp . slotdata . xmin ;
slot - > effective_catalog_xmin = cp . slotdata . catalog_xmin ;
slot - > last_saved_confirmed_flush = cp . slotdata . confirmed_flush ;
slot - > last_saved_restart_lsn = cp . slotdata . restart_lsn ;
slot - > candidate_catalog_xmin = InvalidTransactionId ;
slot - > candidate_xmin_lsn = InvalidXLogRecPtr ;