@ -39,6 +39,13 @@
* the last cycle . Refer to the comments above wait_for_slot_activity ( ) for
* more details .
*
* If the SQL function pg_sync_replication_slots ( ) is used to sync the slots ,
* and if the slots are not ready to be synced and are marked as RS_TEMPORARY
* because of any of the reasons mentioned above , then the SQL function also
* waits and retries until the slots are marked as RS_PERSISTENT ( which means
* sync - ready ) . Refer to the comments in SyncReplicationSlots ( ) for more
* details .
*
* Any standby synchronized slots will be dropped if they no longer need
* to be synchronized . See comment atop drop_local_obsolete_slots ( ) for more
* details .
@ -64,6 +71,7 @@
# include "storage/procarray.h"
# include "tcop/tcopprot.h"
# include "utils/builtins.h"
# include "utils/memutils.h"
# include "utils/pg_lsn.h"
# include "utils/ps_status.h"
# include "utils/timeout.h"
@ -599,11 +607,15 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
* local ones , then update the LSNs and persist the local synced slot for
* future synchronization ; otherwise , do nothing .
*
* If slot_persistence_pending is not NULL , * slot_persistence_pending is
* set to true when the slot could not be persisted .
*
* Return true if the slot is marked as RS_PERSISTENT ( sync - ready ) , otherwise
* false .
*/
static bool
update_and_persist_local_synced_slot ( RemoteSlot * remote_slot , Oid remote_dbid )
update_and_persist_local_synced_slot ( RemoteSlot * remote_slot , Oid remote_dbid ,
bool * slot_persistence_pending )
{
ReplicationSlot * slot = MyReplicationSlot ;
bool found_consistent_snapshot = false ;
@ -627,7 +639,13 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* current location when recreating the slot in the next cycle . It may
* take more time to create such a slot . Therefore , we keep this slot
* and attempt the synchronization in the next cycle .
*
* We also update the slot_persistence_pending parameter , so the SQL
* function can retry .
*/
if ( slot_persistence_pending )
* slot_persistence_pending = true ;
return false ;
}
@ -642,6 +660,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
errdetail ( " Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X. " ,
LSN_FORMAT_ARGS ( slot - > data . restart_lsn ) ) ) ;
/* Set this, so that the SQL function can retry */
if ( slot_persistence_pending )
* slot_persistence_pending = true ;
return false ;
}
@ -665,10 +687,14 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* updated . The slot is then persisted and is considered as sync - ready for
* periodic syncs .
*
* If slot_persistence_pending is not NULL , * slot_persistence_pending is
* set to true when the slot could not be persisted .
*
* Returns TRUE if the local slot is updated .
*/
static bool
synchronize_one_slot ( RemoteSlot * remote_slot , Oid remote_dbid )
synchronize_one_slot ( RemoteSlot * remote_slot , Oid remote_dbid ,
bool * slot_persistence_pending )
{
ReplicationSlot * slot ;
XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr ( NULL ) ;
@ -770,7 +796,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
if ( slot - > data . persistency = = RS_TEMPORARY )
{
slot_updated = update_and_persist_local_synced_slot ( remote_slot ,
remote_dbid ) ;
remote_dbid ,
slot_persistence_pending ) ;
}
/* Slot ready for sync, so sync it. */
@ -867,7 +894,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
return false ;
}
update_and_persist_local_synced_slot ( remote_slot , remote_dbid ) ;
update_and_persist_local_synced_slot ( remote_slot , remote_dbid ,
slot_persistence_pending ) ;
slot_updated = true ;
}
@ -878,15 +906,16 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
}
/*
* Synchronize slots .
* Fetch remote slots .
*
* Gets the failover logical slots info from the primary server and updates
* the slots locally . Creates the slots if not present on the standby .
* If slot_names is NIL , fetches all failover logical slots from the
* primary server , otherwise fetches only the ones with names in slot_names .
*
* Returns TRUE if any of the slots gets updated in this sync - cycle .
* Returns a list of remote slot information structures , or NIL if none
* are found .
*/
static bool
synchroniz e_slots( WalReceiverConn * wrconn )
static List *
fetch_remot e_slots( WalReceiverConn * wrconn , List * slot_names )
{
# define SLOTSYNC_COLUMN_COUNT 10
Oid slotRow [ SLOTSYNC_COLUMN_COUNT ] = { TEXTOID , TEXTOID , LSNOID ,
@ -895,29 +924,45 @@ synchronize_slots(WalReceiverConn *wrconn)
WalRcvExecResult * res ;
TupleTableSlot * tupslot ;
List * remote_slot_list = NIL ;
bool some_slot_updated = false ;
bool started_tx = false ;
const char * query = " SELECT slot_name, plugin, confirmed_flush_lsn, "
" restart_lsn, catalog_xmin, two_phase, two_phase_at, failover, "
" database, invalidation_reason "
" FROM pg_catalog.pg_replication_slots "
" WHERE failover and NOT temporary " ;
/* The syscache access in walrcv_exec() needs a transaction env. */
if ( ! IsTransactionState ( ) )
StringInfoData query ;
initStringInfo ( & query ) ;
appendStringInfoString ( & query ,
" SELECT slot_name, plugin, confirmed_flush_lsn, "
" restart_lsn, catalog_xmin, two_phase, "
" two_phase_at, failover, "
" database, invalidation_reason "
" FROM pg_catalog.pg_replication_slots "
" WHERE failover and NOT temporary " ) ;
if ( slot_names ! = NIL )
{
StartTransactionCommand ( ) ;
started_tx = true ;
bool first_slot = true ;
/*
* Construct the query to fetch only the specified slots
*/
appendStringInfoString ( & query , " AND slot_name IN ( " ) ;
foreach_ptr ( char , slot_name , slot_names )
{
if ( ! first_slot )
appendStringInfoString ( & query , " , " ) ;
appendStringInfo ( & query , " %s " , quote_literal_cstr ( slot_name ) ) ;
first_slot = false ;
}
appendStringInfoChar ( & query , ' ) ' ) ;
}
/* Execute the query */
res = walrcv_exec ( wrconn , query , SLOTSYNC_COLUMN_COUNT , slotRow ) ;
res = walrcv_exec ( wrconn , query . data , SLOTSYNC_COLUMN_COUNT , slotRow ) ;
pfree ( query . data ) ;
if ( res - > status ! = WALRCV_OK_TUPLES )
ereport ( ERROR ,
errmsg ( " could not fetch failover logical slots info from the primary server: %s " ,
res - > err ) ) ;
/* Construct the remote_slot entry for each tuple returned by the primary */
tupslot = MakeSingleTupleTableSlot ( res - > tupledesc , & TTSOpsMinimalTuple ) ;
while ( tuplestore_gettupleslot ( res - > tuplestore , true , false , tupslot ) )
{
@ -994,6 +1039,29 @@ synchronize_slots(WalReceiverConn *wrconn)
ExecClearTuple ( tupslot ) ;
}
walrcv_clear_result ( res ) ;
return remote_slot_list ;
}
/*
* Synchronize slots .
*
* This function takes a list of remote slots and synchronizes them locally . It
* creates the slots if not present on the standby and updates existing ones .
*
* If slot_persistence_pending is not NULL , it will be set to true if one or
* more slots could not be persisted . This allows callers such as
* SyncReplicationSlots ( ) to retry those slots .
*
* Returns TRUE if any of the slots gets updated in this sync - cycle .
*/
static bool
synchronize_slots ( WalReceiverConn * wrconn , List * remote_slot_list ,
bool * slot_persistence_pending )
{
bool some_slot_updated = false ;
/* Drop local slots that no longer need to be synced. */
drop_local_obsolete_slots ( remote_slot_list ) ;
@ -1009,19 +1077,12 @@ synchronize_slots(WalReceiverConn *wrconn)
*/
LockSharedObject ( DatabaseRelationId , remote_dbid , 0 , AccessShareLock ) ;
some_slot_updated | = synchronize_one_slot ( remote_slot , remote_dbid ) ;
some_slot_updated | = synchronize_one_slot ( remote_slot , remote_dbid ,
slot_persistence_pending ) ;
UnlockSharedObject ( DatabaseRelationId , remote_dbid , 0 , AccessShareLock ) ;
}
/* We are done, free remote_slot_list elements */
list_free_deep ( remote_slot_list ) ;
walrcv_clear_result ( res ) ;
if ( started_tx )
CommitTransactionCommand ( ) ;
return some_slot_updated ;
}
@ -1460,6 +1521,9 @@ reset_syncing_flag(void)
*
* It connects to the primary server , fetches logical failover slots
* information periodically in order to create and sync the slots .
*
* Note : If any changes are made here , check if the corresponding SQL
* function logic in SyncReplicationSlots ( ) also needs to be changed .
*/
void
ReplSlotSyncWorkerMain ( const void * startup_data , size_t startup_data_len )
@ -1621,10 +1685,27 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
for ( ; ; )
{
bool some_slot_updated = false ;
bool started_tx = false ;
List * remote_slots ;
ProcessSlotSyncInterrupts ( ) ;
some_slot_updated = synchronize_slots ( wrconn ) ;
/*
* The syscache access in fetch_remote_slots ( ) needs a transaction
* env .
*/
if ( ! IsTransactionState ( ) )
{
StartTransactionCommand ( ) ;
started_tx = true ;
}
remote_slots = fetch_remote_slots ( wrconn , NIL ) ;
some_slot_updated = synchronize_slots ( wrconn , remote_slots , NULL ) ;
list_free_deep ( remote_slots ) ;
if ( started_tx )
CommitTransactionCommand ( ) ;
wait_for_slot_activity ( some_slot_updated ) ;
}
@ -1864,15 +1945,43 @@ slotsync_failure_callback(int code, Datum arg)
walrcv_disconnect ( wrconn ) ;
}
/*
 * Collect the names of the given remote slots.
 *
 * Walks remote_slots and returns a new list containing a pstrdup'd copy of
 * each slot's name, in the same order.  Returns NIL if remote_slots is
 * empty.
 */
static List *
extract_slot_names(List *remote_slots)
{
	List	   *names = NIL;

	/*
	 * Duplicate each name so that the returned list does not share string
	 * pointers with the entries of remote_slots.
	 */
	foreach_ptr(RemoteSlot, rslot, remote_slots)
		names = lappend(names, pstrdup(rslot->name));

	return names;
}
/*
* Synchronize the failover enabled replication slots using the specified
* primary server connection .
*
* Repeatedly fetches and updates replication slot information from the
* primary until all slots are at least " sync ready " .
*
* Exits early if promotion is triggered or certain critical
* configuration parameters have changed .
*/
void
SyncReplicationSlots ( WalReceiverConn * wrconn )
{
PG_ENSURE_ERROR_CLEANUP ( slotsync_failure_callback , PointerGetDatum ( wrconn ) ) ;
{
List * remote_slots = NIL ;
List * slot_names = NIL ; /* List of slot names to track */
check_and_set_sync_info ( MyProcPid ) ;
/* Check for interrupts and config changes */
@ -1880,7 +1989,54 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
validate_remote_info ( wrconn ) ;
synchronize_slots ( wrconn ) ;
/* Retry until all the slots are sync-ready */
for ( ; ; )
{
bool slot_persistence_pending = false ;
bool some_slot_updated = false ;
/* Check for interrupts and config changes */
ProcessSlotSyncInterrupts ( ) ;
/* We must be in a valid transaction state */
Assert ( IsTransactionState ( ) ) ;
/*
* Fetch remote slot info for the given slot_names . If slot_names
* is NIL , fetch all failover - enabled slots . Note that we reuse
* slot_names from the first iteration ; re - fetching all failover
* slots each time could cause an endless loop . Instead of
* reprocessing only the pending slots in each iteration , it ' s
* better to process all the slots received in the first
* iteration . This ensures that by the time we ' re done , all slots
* reflect the latest values .
*/
remote_slots = fetch_remote_slots ( wrconn , slot_names ) ;
/* Attempt to synchronize slots */
some_slot_updated = synchronize_slots ( wrconn , remote_slots ,
& slot_persistence_pending ) ;
/*
* If slot_persistence_pending is true , extract slot names for
* future iterations ( only needed if we haven ' t done it yet )
*/
if ( slot_names = = NIL & & slot_persistence_pending )
slot_names = extract_slot_names ( remote_slots ) ;
/* Free the current remote_slots list */
list_free_deep ( remote_slots ) ;
/* Done if all slots are persisted, i.e., are sync-ready */
if ( ! slot_persistence_pending )
break ;
/* Wait before retrying */
wait_for_slot_activity ( some_slot_updated ) ;
}
if ( slot_names )
list_free_deep ( slot_names ) ;
/* Cleanup the synced temporary slots */
ReplicationSlotCleanup ( true ) ;