@ -67,6 +67,21 @@
* allocator , evicting the oldest changes would make it more likely the
* memory gets actually freed .
*
* We use a max - heap with transaction size as the key to efficiently find
* the largest transaction . While the max - heap is empty , we don ' t update
* the max - heap when updating the memory counter . Therefore , we can get
* the largest transaction in O ( N ) time , where N is the number of
* transactions including top - level transactions and subtransactions .
*
* We build the max - heap just before selecting the largest transactions
* if the number of transactions being decoded is higher than the threshold ,
* MAX_HEAP_TXN_COUNT_THRESHOLD . After building the max - heap , we also
* update the max-heap when updating the memory counter. The intention is
* to find the largest transaction in O(1) time, at the cost of making
* each memory counter update O(log N). Once the number of transactions
* drops below the threshold, we reset the max-heap (refer to
* ReorderBufferMaybeResetMaxHeap() for details).
*
* We still rely on max_changes_in_memory when loading serialized changes
* back into memory . At that point we can ' t use the memory limit directly
* as we load the subxacts independently . One option to deal with this
@ -107,6 +122,22 @@
# include "utils/rel.h"
# include "utils/relfilenumbermap.h"
/*
 * Threshold on the total number of top-level and sub transactions that
 * controls whether we use the max-heap for tracking their sizes. Although
 * using the max-heap to select the largest transaction is effective when
 * there are many transactions being decoded, maintaining the max-heap while
 * updating the memory statistics can be costly. Therefore, we use
 * MaxConnections as the threshold so that the max-heap is used only when
 * subtransactions are in use.
 *
 * NOTE(review): this presumably relies on the number of concurrent top-level
 * transactions being bounded by MaxConnections -- confirm.
 */
# define MAX_HEAP_TXN_COUNT_THRESHOLD MaxConnections
/*
 * A macro to check if the max-heap is ready to use and needs to be updated
 * accordingly. The heap counts as ready exactly when it is non-empty.
 *
 * The expansion is fully parenthesized so that it behaves as a single
 * expression regardless of the operators surrounding it at the call site.
 */
#define ReorderBufferMaxHeapIsReady(rb) \
	(!binaryheap_empty((rb)->txn_heap))
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
@ -259,6 +290,9 @@ static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferCleanupSerializedTXNs ( const char * slotname ) ;
static void ReorderBufferSerializedPath ( char * path , ReplicationSlot * slot ,
TransactionId xid , XLogSegNo segno ) ;
static void ReorderBufferBuildMaxHeap ( ReorderBuffer * rb ) ;
static void ReorderBufferMaybeResetMaxHeap ( ReorderBuffer * rb ) ;
static int ReorderBufferTXNSizeCompare ( Datum a , Datum b , void * arg ) ;
static void ReorderBufferFreeSnap ( ReorderBuffer * rb , Snapshot snap ) ;
static Snapshot ReorderBufferCopySnap ( ReorderBuffer * rb , Snapshot orig_snap ,
@ -293,6 +327,7 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
static Size ReorderBufferChangeSize ( ReorderBufferChange * change ) ;
static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer * rb ,
ReorderBufferChange * change ,
ReorderBufferTXN * txn ,
bool addition , Size sz ) ;
/*
@ -355,6 +390,17 @@ ReorderBufferAllocate(void)
buffer - > outbufsize = 0 ;
buffer - > size = 0 ;
/*
* The binaryheap is indexed for faster manipulations .
*
* We allocate the initial heap size greater than
* MAX_HEAP_TXN_COUNT_THRESHOLD because the txn_heap will not be used
* until the threshold is exceeded .
*/
buffer - > txn_heap = binaryheap_allocate ( MAX_HEAP_TXN_COUNT_THRESHOLD * 2 ,
ReorderBufferTXNSizeCompare ,
true , NULL ) ;
buffer - > spillTxns = 0 ;
buffer - > spillCount = 0 ;
buffer - > spillBytes = 0 ;
@ -485,7 +531,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
{
/* update memory accounting info */
if ( upd_mem )
ReorderBufferChangeMemoryUpdate ( rb , change , false ,
ReorderBufferChangeMemoryUpdate ( rb , change , NULL , false ,
ReorderBufferChangeSize ( change ) ) ;
/* free contained data */
@ -816,7 +862,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn - > nentries_mem + + ;
/* update memory accounting information */
ReorderBufferChangeMemoryUpdate ( rb , change , true ,
ReorderBufferChangeMemoryUpdate ( rb , change , NULL , true ,
ReorderBufferChangeSize ( change ) ) ;
/* process partial change */
@ -1527,7 +1573,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* Check we're not mixing changes from different transactions. */
Assert ( change - > txn = = txn ) ;
ReorderBufferReturnChange ( rb , change , tru e) ;
ReorderBufferReturnChange ( rb , change , fals e) ;
}
/*
@ -1586,8 +1632,17 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
if ( rbtxn_is_serialized ( txn ) )
ReorderBufferRestoreCleanup ( rb , txn ) ;
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate ( rb , NULL , txn , false , txn - > size ) ;
/* deallocate */
ReorderBufferReturnTXN ( rb , txn ) ;
/*
* After cleaning up one transaction , the number of transactions might get
* lower than the threshold for the max - heap .
*/
ReorderBufferMaybeResetMaxHeap ( rb ) ;
}
/*
@ -1637,9 +1692,12 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
/* remove the change from it's containing list */
dlist_delete ( & change - > node ) ;
ReorderBufferReturnChange ( rb , change , tru e) ;
ReorderBufferReturnChange ( rb , change , fals e) ;
}
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate ( rb , NULL , txn , false , txn - > size ) ;
/*
* Mark the transaction as streamed .
*
@ -3166,6 +3224,9 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
* decide if we reached the memory limit , the transaction counter allows
* us to quickly pick the largest transaction for eviction .
*
* At least one of txn and change must be non-NULL. We update the memory
* counter of txn if it's non-NULL, otherwise that of change->txn.
*
* When streaming is enabled , we need to update the toplevel transaction
* counters instead - we don ' t really care about subtransactions as we
* can ' t stream them individually anyway , and we only pick toplevel
@ -3174,22 +3235,27 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
static void
ReorderBufferChangeMemoryUpdate ( ReorderBuffer * rb ,
ReorderBufferChange * change ,
ReorderBufferTXN * txn ,
bool addition , Size sz )
{
ReorderBufferTXN * txn ;
ReorderBufferTXN * toptxn ;
Assert ( change - > txn ) ;
Assert ( txn | | change ) ;
/*
* Ignore tuple CID changes , because those are not evicted when reaching
* memory limit . So we just don ' t count them , because it might easily
* trigger a pointless attempt to spill .
*/
if ( change - > action = = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID )
if ( change & & change - > action = = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID )
return ;
txn = change - > txn ;
if ( sz = = 0 )
return ;
if ( txn = = NULL )
txn = change - > txn ;
Assert ( txn ! = NULL ) ;
/*
* Update the total size in top level as well . This is later used to
@ -3204,6 +3270,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
/* Update the total size in the top transaction. */
toptxn - > total_size + = sz ;
/* Update the max-heap as well if necessary */
if ( ReorderBufferMaxHeapIsReady ( rb ) )
{
if ( ( txn - > size - sz ) = = 0 )
binaryheap_add ( rb - > txn_heap , PointerGetDatum ( txn ) ) ;
else
binaryheap_update_up ( rb - > txn_heap , PointerGetDatum ( txn ) ) ;
}
}
else
{
@ -3213,6 +3288,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
/* Update the total size in the top transaction. */
toptxn - > total_size - = sz ;
/* Update the max-heap as well if necessary */
if ( ReorderBufferMaxHeapIsReady ( rb ) )
{
if ( txn - > size = = 0 )
binaryheap_remove_node_ptr ( rb - > txn_heap , PointerGetDatum ( txn ) ) ;
else
binaryheap_update_down ( rb - > txn_heap , PointerGetDatum ( txn ) ) ;
}
}
Assert ( txn - > size < = rb - > size ) ;
@ -3468,34 +3552,123 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
}
}
/*
 * Comparator for the transaction heap: orders two transactions by size.
 *
 * Both datums are pointers to ReorderBufferTXN. Returns -1 if a is smaller
 * than b, 1 if a is larger than b, and 0 if both have the same size. The
 * extra argument is not used.
 */
static int
ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
{
	ReorderBufferTXN *lhs = (ReorderBufferTXN *) DatumGetPointer(a);
	ReorderBufferTXN *rhs = (ReorderBufferTXN *) DatumGetPointer(b);

	if (lhs->size == rhs->size)
		return 0;

	return (lhs->size < rhs->size) ? -1 : 1;
}
/*
 * Build the max-heap from the transactions currently tracked in rb->by_txn.
 *
 * The heap must be empty on entry. Transactions with a size of zero are
 * skipped: the heap only tracks transactions with a non-zero size, which is
 * also how the memory accounting code adds and removes entries once the
 * heap is in use. Entries are appended unordered and the heap assembly step
 * is deferred until the end, for efficiency.
 */
static void
ReorderBufferBuildMaxHeap(ReorderBuffer *rb)
{
	HASH_SEQ_STATUS hash_seq;
	ReorderBufferTXNByIdEnt *ent;

	Assert(binaryheap_empty(rb->txn_heap));

	hash_seq_init(&hash_seq, rb->by_txn);
	while ((ent = hash_seq_search(&hash_seq)) != NULL)
	{
		ReorderBufferTXN *txn = ent->txn;

		/* zero-sized transactions are not tracked in the heap */
		if (txn->size == 0)
			continue;

		binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
	}

	/* establish the heap ordering once, after all entries are in place */
	binaryheap_build(rb->txn_heap);
}
/*
 * Reset the max-heap if the number of transactions it tracks has dropped
 * sufficiently below the threshold.
 *
 * Does nothing while the max-heap is not in use.
 */
static void
ReorderBufferMaybeResetMaxHeap(ReorderBuffer *rb)
{
	/* The heap is not in use, so there is nothing to reset. */
	if (!ReorderBufferMaxHeapIsReady(rb))
		return;

	/*
	 * Adding and removing transactions right around the threshold could
	 * easily make us "thrash" between building and resetting the heap. To
	 * avoid that, leave a 10% margin: reset only once the heap holds fewer
	 * than 90% of the threshold.
	 */
	if (binaryheap_size(rb->txn_heap) < MAX_HEAP_TXN_COUNT_THRESHOLD * 0.9)
		binaryheap_reset(rb->txn_heap);
}
/*
* Find the largest transaction ( toplevel or subxact ) to evict ( spill to disk )
* by doing a linear search or using the max - heap depending on the number of
* transactions in ReorderBuffer . Refer to the comments atop this file for the
* algorithm details .
*/
static ReorderBufferTXN *
ReorderBufferLargestTXN ( ReorderBuffer * rb )
{
ReorderBufferTXN * largest = NULL ;
if ( ! ReorderBufferMaxHeapIsReady ( rb ) )
{
/*
* If the number of transactions is small, we scan all transactions
* being decoded to get the largest transaction. This saves the cost
* of building a max-heap with a small number of transactions.
*/
if ( hash_get_num_entries ( rb - > by_txn ) < MAX_HEAP_TXN_COUNT_THRESHOLD )
{
HASH_SEQ_STATUS hash_seq ;
ReorderBufferTXNByIdEnt * ent ;
hash_seq_init ( & hash_seq , rb - > by_txn ) ;
while ( ( ent = hash_seq_search ( & hash_seq ) ) ! = NULL )
{
ReorderBufferTXN * txn = ent - > txn ;
/* if the current transaction is larger, remember it */
if ( ( ! largest ) | | ( txn - > size > largest - > size ) )
largest = txn ;
}
Assert ( largest ) ;
}
else
{
/*
* There are a large number of transactions in ReorderBuffer . We
* build the max - heap for efficiently selecting the largest
* transactions .
*/
ReorderBufferBuildMaxHeap ( rb ) ;
/*
* The max-heap is ready now. We keep the max-heap at least
* until we free up enough transactions to bring the total memory
* usage below the limit. The largest transaction is selected
* below.
*/
Assert ( ReorderBufferMaxHeapIsReady ( rb ) ) ;
}
}
/* Get the largest transaction from the max-heap */
if ( ReorderBufferMaxHeapIsReady ( rb ) )
largest = ( ReorderBufferTXN * )
DatumGetPointer ( binaryheap_first ( rb - > txn_heap ) ) ;
Assert ( largest ) ;
Assert ( largest - > size > 0 ) ;
Assert ( largest - > size < = rb - > size ) ;
@ -3638,6 +3811,13 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
/* We must be under the memory limit now. */
Assert ( rb - > size < logical_decoding_work_mem * 1024L ) ;
/*
* After evicting some transactions , the number of transactions might get
* lower than the threshold for the max - heap .
*/
ReorderBufferMaybeResetMaxHeap ( rb ) ;
}
/*
@ -3705,11 +3885,14 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferSerializeChange ( rb , txn , fd , change ) ;
dlist_delete ( & change - > node ) ;
ReorderBufferReturnChange ( rb , change , tru e) ;
ReorderBufferReturnChange ( rb , change , fals e) ;
spilled + + ;
}
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate ( rb , NULL , txn , false , size ) ;
/* update the statistics iff we have spilled anything */
if ( spilled )
{
@ -4491,7 +4674,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
* update the accounting too ( subtracting the size from the counters ) . And
* we don ' t want to underflow there .
*/
ReorderBufferChangeMemoryUpdate ( rb , change , true ,
ReorderBufferChangeMemoryUpdate ( rb , change , NULL , true ,
ReorderBufferChangeSize ( change ) ) ;
}
@ -4903,9 +5086,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
MemoryContextSwitchTo ( oldcontext ) ;
/* subtract the old change size */
ReorderBufferChangeMemoryUpdate ( rb , change , false , old_size ) ;
ReorderBufferChangeMemoryUpdate ( rb , change , NULL , false , old_size ) ;
/* now add the change back, with the correct size */
ReorderBufferChangeMemoryUpdate ( rb , change , true ,
ReorderBufferChangeMemoryUpdate ( rb , change , NULL , true ,
ReorderBufferChangeSize ( change ) ) ;
}