@ -68,19 +68,9 @@
* memory gets actually freed .
*
* We use a max - heap with transaction size as the key to efficiently find
* the largest transaction . While the max - heap is empty , we don ' t update
* the max - heap when updating the memory counter . Therefore , we can get
* the largest transaction in O ( N ) time , where N is the number of
* transactions including top - level transactions and subtransactions .
*
* We build the max - heap just before selecting the largest transactions
* if the number of transactions being decoded is higher than the threshold ,
* MAX_HEAP_TXN_COUNT_THRESHOLD . After building the max - heap , we also
* update the max - heap when updating the memory counter . The intention is
* to efficiently find the largest transaction in O ( 1 ) time instead of
* incurring the cost of memory counter updates ( O ( log N ) ) . Once the number
* of transactions got lower than the threshold , we reset the max - heap
* ( refer to ReorderBufferMaybeResetMaxHeap ( ) for details ) .
* the largest transaction . We update the max - heap whenever the memory
* counter is updated ; however transactions with size 0 are not stored in
* the heap , because they have no changes to evict .
*
* We still rely on max_changes_in_memory when loading serialized changes
* back into memory . At that point we can ' t use the memory limit directly
@ -122,23 +112,6 @@
# include "utils/rel.h"
# include "utils/relfilenumbermap.h"
/*
* Threshold of the total number of top - level and sub transactions that
* controls whether we use the max - heap for tracking their sizes . Although
* using the max - heap to select the largest transaction is effective when
* there are many transactions being decoded , maintaining the max - heap while
* updating the memory statistics can be costly . Therefore , we use
* MaxConnections as the threshold so that we use the max - heap only when
* using subtransactions .
*/
# define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections
/*
* A macro to check if the max - heap is ready to use and needs to be updated
* accordingly .
*/
# define ReorderBufferMaxHeapIsReady(rb) !binaryheap_empty((rb)->txn_heap)
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
@ -290,9 +263,7 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferCleanupSerializedTXNs ( const char * slotname ) ;
static void ReorderBufferSerializedPath ( char * path , ReplicationSlot * slot ,
TransactionId xid , XLogSegNo segno ) ;
static void ReorderBufferBuildMaxHeap ( ReorderBuffer * rb ) ;
static void ReorderBufferMaybeResetMaxHeap ( ReorderBuffer * rb ) ;
static int ReorderBufferTXNSizeCompare ( Datum a , Datum b , void * arg ) ;
static int ReorderBufferTXNSizeCompare ( const pairingheap_node * a , const pairingheap_node * b , void * arg ) ;
static void ReorderBufferFreeSnap ( ReorderBuffer * rb , Snapshot snap ) ;
static Snapshot ReorderBufferCopySnap ( ReorderBuffer * rb , Snapshot orig_snap ,
@ -390,16 +361,8 @@ ReorderBufferAllocate(void)
buffer - > outbufsize = 0 ;
buffer - > size = 0 ;
/*
* The binaryheap is indexed for faster manipulations .
*
* We allocate the initial heap size greater than
* MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
* until the threshold is exceeded .
*/
buffer - > txn_heap = binaryheap_allocate ( MAX_HEAP_TXN_COUNT_THRESHOLD * 2 ,
ReorderBufferTXNSizeCompare ,
true , NULL ) ;
/* txn_heap is ordered by transaction size */
buffer - > txn_heap = pairingheap_allocate ( ReorderBufferTXNSizeCompare , NULL ) ;
buffer - > spillTxns = 0 ;
buffer - > spillCount = 0 ;
@ -1637,12 +1600,6 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* deallocate */
ReorderBufferReturnTXN ( rb , txn ) ;
/*
* After cleaning up one transaction , the number of transactions might get
* lower than the threshold for the max - heap .
*/
ReorderBufferMaybeResetMaxHeap ( rb ) ;
}
/*
@ -3265,20 +3222,18 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
if ( addition )
{
Size oldsize = txn - > size ;
txn - > size + = sz ;
rb - > size + = sz ;
/* Update the total size in the top transaction. */
toptxn - > total_size + = sz ;
/* Update the max-heap as well if necessary */
if ( ReorderBufferMaxHeapIsReady ( rb ) )
{
if ( ( txn - > size - sz ) = = 0 )
binaryheap_add ( rb - > txn_heap , PointerGetDatum ( txn ) ) ;
else
binaryheap_update_up ( rb - > txn_heap , PointerGetDatum ( txn ) ) ;
}
/* Update the max-heap */
if ( oldsize ! = 0 )
pairingheap_remove ( rb - > txn_heap , & txn - > txn_node ) ;
pairingheap_add ( rb - > txn_heap , & txn - > txn_node ) ;
}
else
{
@ -3289,14 +3244,10 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
/* Update the total size in the top transaction. */
toptxn - > total_size - = sz ;
/* Update the max-heap as well if necessary */
if ( ReorderBufferMaxHeapIsReady ( rb ) )
{
if ( txn - > size = = 0 )
binaryheap_remove_node_ptr ( rb - > txn_heap , PointerGetDatum ( txn ) ) ;
else
binaryheap_update_down ( rb - > txn_heap , PointerGetDatum ( txn ) ) ;
}
/* Update the max-heap */
pairingheap_remove ( rb - > txn_heap , & txn - > txn_node ) ;
if ( txn - > size ! = 0 )
pairingheap_add ( rb - > txn_heap , & txn - > txn_node ) ;
}
Assert ( txn - > size < = rb - > size ) ;
@ -3555,10 +3506,10 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
/* Compare two transactions by size */
static int
ReorderBufferTXNSizeCompare ( Datum a , Datum b , void * arg )
ReorderBufferTXNSizeCompare ( const pairingheap_node * a , const pairingheap_node * b , void * arg )
{
ReorderBufferTXN * ta = ( ReorderBufferTXN * ) DatumGetPointer ( a ) ;
ReorderBufferTXN * tb = ( ReorderBufferTXN * ) DatumGetPointer ( b ) ;
const ReorderBufferTXN * ta = pairingheap_const_container ( ReorderBufferTXN , txn_node , a ) ;
const ReorderBufferTXN * tb = pairingheap_const_container ( ReorderBufferTXN , txn_node , b ) ;
if ( ta - > size < tb - > size )
return - 1 ;
@ -3568,106 +3519,16 @@ ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
}
/*
* Build the max - heap . The heap assembly step is deferred until the end , for
* efficiency .
*/
static void
ReorderBufferBuildMaxHeap ( ReorderBuffer * rb )
{
HASH_SEQ_STATUS hash_seq ;
ReorderBufferTXNByIdEnt * ent ;
Assert ( binaryheap_empty ( rb - > txn_heap ) ) ;
hash_seq_init ( & hash_seq , rb - > by_txn ) ;
while ( ( ent = hash_seq_search ( & hash_seq ) ) ! = NULL )
{
ReorderBufferTXN * txn = ent - > txn ;
if ( txn - > size = = 0 )
continue ;
binaryheap_add_unordered ( rb - > txn_heap , PointerGetDatum ( txn ) ) ;
}
binaryheap_build ( rb - > txn_heap ) ;
}
/*
* Reset the max - heap if the number of transactions got lower than the
* threshold .
*/
static void
ReorderBufferMaybeResetMaxHeap ( ReorderBuffer * rb )
{
/*
* If we add and remove transactions right around the threshold , we could
* easily end up " thrashing " . To avoid it , we adapt 10 % of transactions to
* reset the max - heap .
*/
if ( ReorderBufferMaxHeapIsReady ( rb ) & &
binaryheap_size ( rb - > txn_heap ) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9 )
binaryheap_reset ( rb - > txn_heap ) ;
}
/*
* Find the largest transaction ( toplevel or subxact ) to evict ( spill to disk )
* by doing a linear search or using the max - heap depending on the number of
* transactions in ReorderBuffer . Refer to the comments atop this file for the
* algorithm details .
* Find the largest transaction ( toplevel or subxact ) to evict ( spill to disk ) .
*/
static ReorderBufferTXN *
ReorderBufferLargestTXN ( ReorderBuffer * rb )
{
ReorderBufferTXN * largest = NULL ;
if ( ! ReorderBufferMaxHeapIsReady ( rb ) )
{
/*
* If the number of transactions are small , we scan all transactions
* being decoded to get the largest transaction . This saves the cost
* of building a max - heap with a small number of transactions .
*/
if ( hash_get_num_entries ( rb - > by_txn ) < MAX_HEAP_TXN_COUNT_THRESHOLD )
{
HASH_SEQ_STATUS hash_seq ;
ReorderBufferTXNByIdEnt * ent ;
hash_seq_init ( & hash_seq , rb - > by_txn ) ;
while ( ( ent = hash_seq_search ( & hash_seq ) ) ! = NULL )
{
ReorderBufferTXN * txn = ent - > txn ;
/* if the current transaction is larger, remember it */
if ( ( ! largest ) | | ( txn - > size > largest - > size ) )
largest = txn ;
}
Assert ( largest ) ;
}
else
{
/*
* There are a large number of transactions in ReorderBuffer . We
* build the max - heap for efficiently selecting the largest
* transactions .
*/
ReorderBufferBuildMaxHeap ( rb ) ;
/*
* The max - heap is ready now . We remain the max - heap at least
* until we free up enough transactions to bring the total memory
* usage below the limit . The largest transaction is selected
* below .
*/
Assert ( ReorderBufferMaxHeapIsReady ( rb ) ) ;
}
}
ReorderBufferTXN * largest ;
/* Get the largest transaction from the max-heap */
if ( ReorderBufferMaxHeapIsReady ( rb ) )
largest = ( ReorderBufferTXN * )
DatumGetPointer ( binaryheap_first ( rb - > txn_heap ) ) ;
largest = pairingheap_container ( ReorderBufferTXN , txn_node ,
pairingheap_first ( rb - > txn_heap ) ) ;
Assert ( largest ) ;
Assert ( largest - > size > 0 ) ;
@ -3812,12 +3673,6 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
/* We must be under the memory limit now. */
Assert ( rb - > size < logical_decoding_work_mem * 1024L ) ;
/*
* After evicting some transactions , the number of transactions might get
* lower than the threshold for the max - heap .
*/
ReorderBufferMaybeResetMaxHeap ( rb ) ;
}
/*