@ -30,23 +30,34 @@
* searching the through all waiters each time we receive a reply .
*
* In 9.5 or before only a single standby could be considered as
* synchronous . In 9.6 we support multiple synchronous standbys .
* The number of synchronous standbys that transactions must wait for
* replies from is specified in synchronous_standby_names .
* This parameter also specifies a list of standby names ,
* which determines the priority of each standby for being chosen as
* a synchronous standby . The standbys whose names appear earlier
* in the list are given higher priority and will be considered as
* synchronous . Other standby servers appearing later in this list
* represent potential synchronous standbys . If any of the current
* synchronous standbys disconnects for whatever reason , it will be
* replaced immediately with the next - highest - priority standby .
* synchronous . In 9.6 we support a priority - based multiple synchronous
* standbys . In 10.0 a quorum - based multiple synchronous standbys is also
* supported . The number of synchronous standbys that transactions
* must wait for replies from is specified in synchronous_standby_names .
* This parameter also specifies a list of standby names and the method
* ( FIRST and ANY ) to choose synchronous standbys from the listed ones .
*
* The method FIRST specifies a priority - based synchronous replication
* and makes transaction commits wait until their WAL records are
* replicated to the requested number of synchronous standbys chosen based
* on their priorities . The standbys whose names appear earlier in the list
* are given higher priority and will be considered as synchronous .
* Other standby servers appearing later in this list represent potential
* synchronous standbys . If any of the current synchronous standbys
* disconnects for whatever reason , it will be replaced immediately with
* the next - highest - priority standby .
*
* The method ANY specifies a quorum - based synchronous replication
* and makes transaction commits wait until their WAL records are
* replicated to at least the requested number of synchronous standbys
* in the list . All the standbys appearing in the list are considered as
* candidates for quorum synchronous standbys .
*
* Before the standbys chosen from synchronous_standby_names can
* become the synchronous standbys they must have caught up with
* the primary ; that may take some time . Once caught up ,
* the current higher priority standbys which are considered as
* synchronous at that moment will release waiters from the queue .
* the standbys which are considered as synchronous at that moment
* will release waiters from the queue .
*
* Portions Copyright ( c ) 2010 - 2016 , PostgreSQL Global Development Group
*
@ -79,18 +90,29 @@ char *SyncRepStandbyNames;
static bool announce_next_takeover = true ;
static SyncRepConfigData * SyncRepConfig = NULL ;
SyncRepConfigData * SyncRepConfig = NULL ;
static int SyncRepWaitMode = SYNC_REP_NO_WAIT ;
static void SyncRepQueueInsert ( int mode ) ;
static void SyncRepCancelWait ( void ) ;
static int SyncRepWakeQueue ( bool all , int mode ) ;
static bool SyncRepGetOldestSyncRecPtr ( XLogRecPtr * writePtr ,
XLogRecPtr * flushPtr ,
XLogRecPtr * applyPtr ,
bool * am_sync ) ;
static bool SyncRepGetSyncRecPtr ( XLogRecPtr * writePtr ,
XLogRecPtr * flushPtr ,
XLogRecPtr * applyPtr ,
bool * am_sync ) ;
static void SyncRepGetOldestSyncRecPtr ( XLogRecPtr * writePtr ,
XLogRecPtr * flushPtr ,
XLogRecPtr * applyPtr ,
List * sync_standbys ) ;
static void SyncRepGetNthLatestSyncRecPtr ( XLogRecPtr * writePtr ,
XLogRecPtr * flushPtr ,
XLogRecPtr * applyPtr ,
List * sync_standbys , uint8 nth ) ;
static int SyncRepGetStandbyPriority ( void ) ;
static List * SyncRepGetSyncStandbysPriority ( bool * am_sync ) ;
static List * SyncRepGetSyncStandbysQuorum ( bool * am_sync ) ;
static int cmp_lsn ( const void * a , const void * b ) ;
# ifdef USE_ASSERT_CHECKING
static bool SyncRepQueueIsOrderedByLSN ( int mode ) ;
@ -386,7 +408,7 @@ SyncRepReleaseWaiters(void)
XLogRecPtr writePtr ;
XLogRecPtr flushPtr ;
XLogRecPtr applyPtr ;
bool got_oldest ;
bool got_recptr ;
bool am_sync ;
int numwrite = 0 ;
int numflush = 0 ;
@ -413,11 +435,10 @@ SyncRepReleaseWaiters(void)
LWLockAcquire ( SyncRepLock , LW_EXCLUSIVE ) ;
/*
* Check whether we are a sync standby or not , and calculate the oldest
* Check whether we are a sync standby or not , and calculate the synced
* positions among all sync standbys .
*/
got_oldest = SyncRepGetOldestSyncRecPtr ( & writePtr , & flushPtr ,
& applyPtr , & am_sync ) ;
got_recptr = SyncRepGetSyncRecPtr ( & writePtr , & flushPtr , & applyPtr , & am_sync ) ;
/*
* If we are managing a sync standby , though we weren ' t prior to this ,
@ -426,16 +447,22 @@ SyncRepReleaseWaiters(void)
if ( announce_next_takeover & & am_sync )
{
announce_next_takeover = false ;
ereport ( LOG ,
( errmsg ( " standby \" %s \" is now a synchronous standby with priority %u " ,
application_name , MyWalSnd - > sync_standby_priority ) ) ) ;
if ( SyncRepConfig - > syncrep_method = = SYNC_REP_PRIORITY )
ereport ( LOG ,
( errmsg ( " standby \" %s \" is now a synchronous standby with priority %u " ,
application_name , MyWalSnd - > sync_standby_priority ) ) ) ;
else
ereport ( LOG ,
( errmsg ( " standby \" %s \" is now a candidate for quorum synchronous standby " ,
application_name ) ) ) ;
}
/*
* If the number of sync standbys is less than requested or we aren ' t
* managing a sync standby then just leave .
*/
if ( ! got_oldest | | ! am_sync )
if ( ! got_recptr | | ! am_sync )
{
LWLockRelease ( SyncRepLock ) ;
announce_next_takeover = ! am_sync ;
@ -471,21 +498,20 @@ SyncRepReleaseWaiters(void)
}
/*
* Calculate the oldest Write , Flush and Apply positions among sync standbys .
* Calculate the synced Write , Flush and Apply positions among sync standbys .
*
* Return false if the number of sync standbys is less than
* synchronous_standby_names specifies . Otherwise return true and
* store the oldest positions into * writePtr , * flushPtr and * applyPtr .
* store the positions into * writePtr , * flushPtr and * applyPtr .
*
* On return , * am_sync is set to true if this walsender is connecting to
* sync standby . Otherwise it ' s set to false .
*/
static bool
SyncRepGetOldest SyncRecPtr ( XLogRecPtr * writePtr , XLogRecPtr * flushPtr ,
SyncRepGetSyncRecPtr ( XLogRecPtr * writePtr , XLogRecPtr * flushPtr ,
XLogRecPtr * applyPtr , bool * am_sync )
{
List * sync_standbys ;
ListCell * cell ;
* writePtr = InvalidXLogRecPtr ;
* flushPtr = InvalidXLogRecPtr ;
@ -508,12 +534,49 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
}
/*
* Scan through all sync standbys and calculate the oldest Write , Flush
* and Apply positions .
* In a priority - based sync replication , the synced positions are the
* oldest ones among sync standbys . In a quorum - based , they are the Nth
* latest ones .
*
* SyncRepGetNthLatestSyncRecPtr ( ) also can calculate the oldest positions .
* But we use SyncRepGetOldestSyncRecPtr ( ) for that calculation because
* it ' s a bit more efficient .
*
* XXX If the numbers of current and requested sync standbys are the same ,
* we can use SyncRepGetOldestSyncRecPtr ( ) to calculate the synced
* positions even in a quorum - based sync replication .
*/
if ( SyncRepConfig - > syncrep_method = = SYNC_REP_PRIORITY )
{
SyncRepGetOldestSyncRecPtr ( writePtr , flushPtr , applyPtr ,
sync_standbys ) ;
}
else
{
SyncRepGetNthLatestSyncRecPtr ( writePtr , flushPtr , applyPtr ,
sync_standbys , SyncRepConfig - > num_sync ) ;
}
list_free ( sync_standbys ) ;
return true ;
}
/*
* Calculate the oldest Write , Flush and Apply positions among sync standbys .
*/
static void
SyncRepGetOldestSyncRecPtr ( XLogRecPtr * writePtr , XLogRecPtr * flushPtr ,
XLogRecPtr * applyPtr , List * sync_standbys )
{
ListCell * cell ;
/*
* Scan through all sync standbys and calculate the oldest
* Write , Flush and Apply positions .
*/
foreach ( cell , sync_standbys )
foreach ( cell , sync_standbys )
{
WalSnd * walsnd = & WalSndCtl - > walsnds [ lfirst_int ( cell ) ] ;
WalSnd * walsnd = & WalSndCtl - > walsnds [ lfirst_int ( cell ) ] ;
XLogRecPtr write ;
XLogRecPtr flush ;
XLogRecPtr apply ;
@ -531,23 +594,163 @@ SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
if ( XLogRecPtrIsInvalid ( * applyPtr ) | | * applyPtr > apply )
* applyPtr = apply ;
}
}
list_free ( sync_standbys ) ;
return true ;
/*
* Calculate the Nth latest Write , Flush and Apply positions among sync
* standbys .
*/
static void
SyncRepGetNthLatestSyncRecPtr ( XLogRecPtr * writePtr , XLogRecPtr * flushPtr ,
XLogRecPtr * applyPtr , List * sync_standbys , uint8 nth )
{
ListCell * cell ;
XLogRecPtr * write_array ;
XLogRecPtr * flush_array ;
XLogRecPtr * apply_array ;
int len ;
int i = 0 ;
len = list_length ( sync_standbys ) ;
write_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * len ) ;
flush_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * len ) ;
apply_array = ( XLogRecPtr * ) palloc ( sizeof ( XLogRecPtr ) * len ) ;
foreach ( cell , sync_standbys )
{
WalSnd * walsnd = & WalSndCtl - > walsnds [ lfirst_int ( cell ) ] ;
SpinLockAcquire ( & walsnd - > mutex ) ;
write_array [ i ] = walsnd - > write ;
flush_array [ i ] = walsnd - > flush ;
apply_array [ i ] = walsnd - > apply ;
SpinLockRelease ( & walsnd - > mutex ) ;
i + + ;
}
qsort ( write_array , len , sizeof ( XLogRecPtr ) , cmp_lsn ) ;
qsort ( flush_array , len , sizeof ( XLogRecPtr ) , cmp_lsn ) ;
qsort ( apply_array , len , sizeof ( XLogRecPtr ) , cmp_lsn ) ;
/* Get Nth latest Write, Flush, Apply positions */
* writePtr = write_array [ nth - 1 ] ;
* flushPtr = flush_array [ nth - 1 ] ;
* applyPtr = apply_array [ nth - 1 ] ;
pfree ( write_array ) ;
pfree ( flush_array ) ;
pfree ( apply_array ) ;
}
/*
* Compare lsn in order to sort array in descending order .
*/
static int
cmp_lsn ( const void * a , const void * b )
{
XLogRecPtr lsn1 = * ( ( const XLogRecPtr * ) a ) ;
XLogRecPtr lsn2 = * ( ( const XLogRecPtr * ) b ) ;
if ( lsn1 > lsn2 )
return - 1 ;
else if ( lsn1 = = lsn2 )
return 0 ;
else
return 1 ;
}
/*
* Return the list of sync standbys , or NIL if no sync standby is connected .
*
* If there are multiple standbys with the same priority ,
* the first one found is selected preferentially .
* The caller must hold SyncRepLock .
*
* On return , * am_sync is set to true if this walsender is connecting to
* sync standby . Otherwise it ' s set to false .
*/
List *
SyncRepGetSyncStandbys ( bool * am_sync )
SyncRepGetSyncStandbys ( bool * am_sync )
{
/* Set default result */
if ( am_sync ! = NULL )
* am_sync = false ;
/* Quick exit if sync replication is not requested */
if ( SyncRepConfig = = NULL )
return NIL ;
return ( SyncRepConfig - > syncrep_method = = SYNC_REP_PRIORITY ) ?
SyncRepGetSyncStandbysPriority ( am_sync ) :
SyncRepGetSyncStandbysQuorum ( am_sync ) ;
}
/*
* Return the list of all the candidates for quorum sync standbys ,
* or NIL if no such standby is connected .
*
* The caller must hold SyncRepLock . This function must be called only in
* a quorum - based sync replication .
*
* On return , * am_sync is set to true if this walsender is connecting to
* sync standby . Otherwise it ' s set to false .
*/
static List *
SyncRepGetSyncStandbysQuorum ( bool * am_sync )
{
List * result = NIL ;
int i ;
volatile WalSnd * walsnd ; /* Use volatile pointer to prevent code
* rearrangement */
Assert ( SyncRepConfig - > syncrep_method = = SYNC_REP_QUORUM ) ;
for ( i = 0 ; i < max_wal_senders ; i + + )
{
walsnd = & WalSndCtl - > walsnds [ i ] ;
/* Must be active */
if ( walsnd - > pid = = 0 )
continue ;
/* Must be streaming */
if ( walsnd - > state ! = WALSNDSTATE_STREAMING )
continue ;
/* Must be synchronous */
if ( walsnd - > sync_standby_priority = = 0 )
continue ;
/* Must have a valid flush position */
if ( XLogRecPtrIsInvalid ( walsnd - > flush ) )
continue ;
/*
* Consider this standby as a candidate for quorum sync standbys
* and append it to the result .
*/
result = lappend_int ( result , i ) ;
if ( am_sync ! = NULL & & walsnd = = MyWalSnd )
* am_sync = true ;
}
return result ;
}
/*
* Return the list of sync standbys chosen based on their priorities ,
* or NIL if no sync standby is connected .
*
* If there are multiple standbys with the same priority ,
* the first one found is selected preferentially .
*
* The caller must hold SyncRepLock . This function must be called only in
* a priority - based sync replication .
*
* On return , * am_sync is set to true if this walsender is connecting to
* sync standby . Otherwise it ' s set to false .
*/
static List *
SyncRepGetSyncStandbysPriority ( bool * am_sync )
{
List * result = NIL ;
List * pending = NIL ;
@ -560,13 +763,7 @@ SyncRepGetSyncStandbys(bool *am_sync)
volatile WalSnd * walsnd ; /* Use volatile pointer to prevent code
* rearrangement */
/* Set default result */
if ( am_sync ! = NULL )
* am_sync = false ;
/* Quick exit if sync replication is not requested */
if ( SyncRepConfig = = NULL )
return NIL ;
Assert ( SyncRepConfig - > syncrep_method = = SYNC_REP_PRIORITY ) ;
lowest_priority = SyncRepConfig - > nmembers ;
next_highest_priority = lowest_priority + 1 ;