@ -187,6 +187,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
TransactionIdPrecedes ( remote_slot - > catalog_xmin ,
TransactionIdPrecedes ( remote_slot - > catalog_xmin ,
slot - > data . catalog_xmin ) )
slot - > data . catalog_xmin ) )
{
{
/* Update slot sync skip stats */
pgstat_report_replslotsync ( slot ) ;
/*
/*
* This can happen in following situations :
* This can happen in following situations :
*
*
@ -277,6 +280,13 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
errdetail_internal ( " Remote slot has LSN %X/%08X but local slot has LSN %X/%08X. " ,
errdetail_internal ( " Remote slot has LSN %X/%08X but local slot has LSN %X/%08X. " ,
LSN_FORMAT_ARGS ( remote_slot - > confirmed_lsn ) ,
LSN_FORMAT_ARGS ( remote_slot - > confirmed_lsn ) ,
LSN_FORMAT_ARGS ( slot - > data . confirmed_flush ) ) ) ;
LSN_FORMAT_ARGS ( slot - > data . confirmed_flush ) ) ) ;
/*
* If we can ' t reach a consistent snapshot , the slot won ' t be
* persisted . See update_and_persist_local_synced_slot ( ) .
*/
if ( found_consistent_snapshot & & ! ( * found_consistent_snapshot ) )
pgstat_report_replslotsync ( slot ) ;
}
}
updated_xmin_or_lsn = true ;
updated_xmin_or_lsn = true ;
@ -563,6 +573,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
bool found_consistent_snapshot = false ;
bool found_consistent_snapshot = false ;
bool remote_slot_precedes = false ;
bool remote_slot_precedes = false ;
/* Slotsync skip stats are handled in function update_local_synced_slot() */
( void ) update_local_synced_slot ( remote_slot , remote_dbid ,
( void ) update_local_synced_slot ( remote_slot , remote_dbid ,
& found_consistent_snapshot ,
& found_consistent_snapshot ,
& remote_slot_precedes ) ;
& remote_slot_precedes ) ;
@ -624,31 +635,9 @@ static bool
synchronize_one_slot ( RemoteSlot * remote_slot , Oid remote_dbid )
synchronize_one_slot ( RemoteSlot * remote_slot , Oid remote_dbid )
{
{
ReplicationSlot * slot ;
ReplicationSlot * slot ;
XLogRecPtr latestFlushPtr ;
XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr ( NULL ) ;
bool slot_updated = false ;
bool slot_updated = false ;
/*
* Make sure that concerned WAL is received and flushed before syncing
* slot to target lsn received from the primary server .
*/
latestFlushPtr = GetStandbyFlushRecPtr ( NULL ) ;
if ( remote_slot - > confirmed_lsn > latestFlushPtr )
{
/*
* Can get here only if GUC ' synchronized_standby_slots ' on the
* primary server was not configured correctly .
*/
ereport ( AmLogicalSlotSyncWorkerProcess ( ) ? LOG : ERROR ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " skipping slot synchronization because the received slot sync "
" LSN %X/%08X for slot \" %s \" is ahead of the standby position %X/%08X " ,
LSN_FORMAT_ARGS ( remote_slot - > confirmed_lsn ) ,
remote_slot - > name ,
LSN_FORMAT_ARGS ( latestFlushPtr ) ) ) ;
return false ;
}
/* Search for the named slot */
/* Search for the named slot */
if ( ( slot = SearchNamedReplicationSlot ( remote_slot - > name , true ) ) )
if ( ( slot = SearchNamedReplicationSlot ( remote_slot - > name , true ) ) )
{
{
@ -707,10 +696,38 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
/* Skip the sync of an invalidated slot */
/* Skip the sync of an invalidated slot */
if ( slot - > data . invalidated ! = RS_INVAL_NONE )
if ( slot - > data . invalidated ! = RS_INVAL_NONE )
{
{
pgstat_report_replslotsync ( slot ) ;
ReplicationSlotRelease ( ) ;
ReplicationSlotRelease ( ) ;
return slot_updated ;
return slot_updated ;
}
}
/*
* Make sure that concerned WAL is received and flushed before syncing
* slot to target lsn received from the primary server .
*
* Report statistics only after the slot has been acquired , ensuring
* it cannot be dropped during the reporting process .
*/
if ( remote_slot - > confirmed_lsn > latestFlushPtr )
{
pgstat_report_replslotsync ( slot ) ;
/*
* Can get here only if GUC ' synchronized_standby_slots ' on the
* primary server was not configured correctly .
*/
ereport ( AmLogicalSlotSyncWorkerProcess ( ) ? LOG : ERROR ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " skipping slot synchronization because the received slot sync "
" LSN %X/%08X for slot \" %s \" is ahead of the standby position %X/%08X " ,
LSN_FORMAT_ARGS ( remote_slot - > confirmed_lsn ) ,
remote_slot - > name ,
LSN_FORMAT_ARGS ( latestFlushPtr ) ) ) ;
return slot_updated ;
}
/* Slot not ready yet, let's attempt to make it sync-ready now. */
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if ( slot - > data . persistency = = RS_TEMPORARY )
if ( slot - > data . persistency = = RS_TEMPORARY )
{
{
@ -784,6 +801,32 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ReplicationSlotsComputeRequiredXmin ( true ) ;
ReplicationSlotsComputeRequiredXmin ( true ) ;
LWLockRelease ( ProcArrayLock ) ;
LWLockRelease ( ProcArrayLock ) ;
/*
* Make sure that concerned WAL is received and flushed before syncing
* slot to target lsn received from the primary server .
*
* Report statistics only after the slot has been acquired , ensuring
* it cannot be dropped during the reporting process .
*/
if ( remote_slot - > confirmed_lsn > latestFlushPtr )
{
pgstat_report_replslotsync ( slot ) ;
/*
* Can get here only if GUC ' synchronized_standby_slots ' on the
* primary server was not configured correctly .
*/
ereport ( AmLogicalSlotSyncWorkerProcess ( ) ? LOG : ERROR ,
errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " skipping slot synchronization because the received slot sync "
" LSN %X/%08X for slot \" %s \" is ahead of the standby position %X/%08X " ,
LSN_FORMAT_ARGS ( remote_slot - > confirmed_lsn ) ,
remote_slot - > name ,
LSN_FORMAT_ARGS ( latestFlushPtr ) ) ) ;
return false ;
}
update_and_persist_local_synced_slot ( remote_slot , remote_dbid ) ;
update_and_persist_local_synced_slot ( remote_slot , remote_dbid ) ;
slot_updated = true ;
slot_updated = true ;