@ -49,6 +49,34 @@
* GenerationContext for the variable - length transaction data ( allocated
* and freed in groups with similar lifespan ) .
*
* To limit the amount of memory used by decoded changes , we track memory
* used at the reorder buffer level ( i . e . total amount of memory ) , and for
* each transaction . When the total amount of used memory exceeds the
* limit , the transaction consuming the most memory is then serialized to
* disk .
*
* Only decoded changes are evicted from memory ( spilled to disk ) , not the
* transaction records . The number of toplevel transactions is limited ,
* but a transaction with many subtransactions may still consume significant
* amounts of memory . The transaction records are fairly small , though , and
* are not included in the memory limit .
*
* The current eviction algorithm is very simple - the transaction is
* picked merely by size , while it might be useful to also consider age
* ( LSN ) of the changes for example . With the new Generational memory
* allocator , evicting the oldest changes would make it more likely the
* memory gets actually freed .
*
* We still rely on max_changes_in_memory when loading serialized changes
* back into memory . At that point we can ' t use the memory limit directly
* as we load the subxacts independently . One option do deal with this
* would be to count the subxacts , and allow each to allocate 1 / N of the
* memory limit . That however does not seem very appealing , because with
* many subtransactions it may easily cause trashing ( short cycles of
* deserializing and applying very few changes ) . We probably should give
* a bit more memory to the oldest subtransactions , because it ' s likely
* the source for the next sequence of changes .
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
# include "postgres.h"
@ -154,7 +182,8 @@ typedef struct ReorderBufferDiskChange
* resource management here , but it ' s not entirely clear what that would look
* like .
*/
static const Size max_changes_in_memory = 4096 ;
int logical_decoding_work_mem ;
static const Size max_changes_in_memory = 4096 ; /* XXX for restore only */
/* ---------------------------------------
* primary reorderbuffer support routines
@ -189,7 +218,7 @@ static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTX
* Disk serialization support functions
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
static void ReorderBufferCheckSerializeTXN ( ReorderBuffer * rb , ReorderBufferTXN * txn ) ;
static void ReorderBufferCheckMemoryLimit ( ReorderBuffer * rb ) ;
static void ReorderBufferSerializeTXN ( ReorderBuffer * rb , ReorderBufferTXN * txn ) ;
static void ReorderBufferSerializeChange ( ReorderBuffer * rb , ReorderBufferTXN * txn ,
int fd , ReorderBufferChange * change ) ;
@ -217,6 +246,14 @@ static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
static void ReorderBufferToastAppendChunk ( ReorderBuffer * rb , ReorderBufferTXN * txn ,
Relation relation , ReorderBufferChange * change ) ;
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* memory accounting
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
static Size ReorderBufferChangeSize ( ReorderBufferChange * change ) ;
static void ReorderBufferChangeMemoryUpdate ( ReorderBuffer * rb ,
ReorderBufferChange * change , bool addition ) ;
/*
* Allocate a new ReorderBuffer and clean out any old serialized state from
@ -269,6 +306,7 @@ ReorderBufferAllocate(void)
buffer - > outbuf = NULL ;
buffer - > outbufsize = 0 ;
buffer - > size = 0 ;
buffer - > current_restart_decoding_lsn = InvalidXLogRecPtr ;
@ -374,6 +412,9 @@ ReorderBufferGetChange(ReorderBuffer *rb)
void
ReorderBufferReturnChange ( ReorderBuffer * rb , ReorderBufferChange * change )
{
/* update memory accounting info */
ReorderBufferChangeMemoryUpdate ( rb , change , false ) ;
/* free contained data */
switch ( change - > action )
{
@ -585,12 +626,18 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
txn = ReorderBufferTXNByXid ( rb , xid , true , NULL , lsn , true ) ;
change - > lsn = lsn ;
change - > txn = txn ;
Assert ( InvalidXLogRecPtr ! = lsn ) ;
dlist_push_tail ( & txn - > changes , & change - > node ) ;
txn - > nentries + + ;
txn - > nentries_mem + + ;
ReorderBufferCheckSerializeTXN ( rb , txn ) ;
/* update memory accounting information */
ReorderBufferChangeMemoryUpdate ( rb , change , true ) ;
/* check the memory limits and evict something if needed */
ReorderBufferCheckMemoryLimit ( rb ) ;
}
/*
@ -1217,6 +1264,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
change = dlist_container ( ReorderBufferChange , node , iter . cur ) ;
/* Check we're not mixing changes from different transactions. */
Assert ( change - > txn = = txn ) ;
ReorderBufferReturnChange ( rb , change ) ;
}
@ -1229,7 +1279,11 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferChange * change ;
change = dlist_container ( ReorderBufferChange , node , iter . cur ) ;
/* Check we're not mixing changes from different transactions. */
Assert ( change - > txn = = txn ) ;
Assert ( change - > action = = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID ) ;
ReorderBufferReturnChange ( rb , change ) ;
}
@ -2082,9 +2136,48 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
ReorderBufferQueueChange ( rb , xid , lsn , change ) ;
}
/*
* Update the memory accounting info . We track memory used by the whole
* reorder buffer and the transaction containing the change .
*/
static void
ReorderBufferChangeMemoryUpdate ( ReorderBuffer * rb ,
ReorderBufferChange * change ,
bool addition )
{
Size sz ;
Assert ( change - > txn ) ;
/*
* Ignore tuple CID changes , because those are not evicted when reaching
* memory limit . So we just don ' t count them , because it might easily
* trigger a pointless attempt to spill .
*/
if ( change - > action = = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID )
return ;
sz = ReorderBufferChangeSize ( change ) ;
if ( addition )
{
change - > txn - > size + = sz ;
rb - > size + = sz ;
}
else
{
Assert ( ( rb - > size > = sz ) & & ( change - > txn - > size > = sz ) ) ;
change - > txn - > size - = sz ;
rb - > size - = sz ;
}
}
/*
* Add new ( relfilenode , tid ) - > ( cmin , cmax ) mappings .
*
* We do not include this change type in memory accounting , because we
* keep CIDs in a separate list and do not evict them when reaching
* the memory limit .
*/
void
ReorderBufferAddNewTupleCids ( ReorderBuffer * rb , TransactionId xid ,
@ -2103,6 +2196,7 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
change - > data . tuplecid . cmax = cmax ;
change - > data . tuplecid . combocid = combocid ;
change - > lsn = lsn ;
change - > txn = txn ;
change - > action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID ;
dlist_push_tail ( & txn - > tuplecids , & change - > node ) ;
@ -2230,20 +2324,84 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
}
/*
* Check whether the transaction tx should spill its data to disk .
* Find the largest transaction ( toplevel or subxact ) to evict ( spill to disk ) .
*
* XXX With many subtransactions this might be quite slow , because we ' ll have
* to walk through all of them . There are some options how we could improve
* that : ( a ) maintain some secondary structure with transactions sorted by
* amount of changes , ( b ) not looking for the entirely largest transaction ,
* but e . g . for transaction using at least some fraction of the memory limit ,
* and ( c ) evicting multiple transactions at once , e . g . to free a given portion
* of the memory limit ( e . g . 50 % ) .
*/
static ReorderBufferTXN *
ReorderBufferLargestTXN ( ReorderBuffer * rb )
{
HASH_SEQ_STATUS hash_seq ;
ReorderBufferTXNByIdEnt * ent ;
ReorderBufferTXN * largest = NULL ;
hash_seq_init ( & hash_seq , rb - > by_txn ) ;
while ( ( ent = hash_seq_search ( & hash_seq ) ) ! = NULL )
{
ReorderBufferTXN * txn = ent - > txn ;
/* if the current transaction is larger, remember it */
if ( ( ! largest ) | | ( txn - > size > largest - > size ) )
largest = txn ;
}
Assert ( largest ) ;
Assert ( largest - > size > 0 ) ;
Assert ( largest - > size < = rb - > size ) ;
return largest ;
}
/*
* Check whether the logical_decoding_work_mem limit was reached , and if yes
* pick the transaction to evict and spill the changes to disk .
*
* XXX At this point we select just a single ( largest ) transaction , but
* we might also adapt a more elaborate eviction strategy - for example
* evicting enough transactions to free certain fraction ( e . g . 50 % ) of
* the memory limit .
*/
static void
ReorderBufferCheckSerializeTXN ( ReorderBuffer * rb , ReorderBufferTXN * txn )
ReorderBufferCheckMemoryLimit ( ReorderBuffer * rb )
{
ReorderBufferTXN * txn ;
/* bail out if we haven't exceeded the memory limit */
if ( rb - > size < logical_decoding_work_mem * 1024L )
return ;
/*
* TODO : improve accounting so we cheaply can take subtransactions into
* account here .
* Pick the largest transaction ( or subtransaction ) and evict it from
* memory by serializing it to disk .
*/
if ( txn - > nentries_mem > = max_changes_in_memory )
{
ReorderBufferSerializeTXN ( rb , txn ) ;
Assert ( txn - > nentries_mem = = 0 ) ;
}
txn = ReorderBufferLargestTXN ( rb ) ;
ReorderBufferSerializeTXN ( rb , txn ) ;
/*
* After eviction , the transaction should have no entries in memory , and
* should use 0 bytes for changes .
*/
Assert ( txn - > size = = 0 ) ;
Assert ( txn - > nentries_mem = = 0 ) ;
/*
* And furthermore , evicting the transaction should get us below the
* memory limit again - it is not possible that we ' re still exceeding the
* memory limit after evicting the transaction .
*
* This follows from the simple fact that the selected transaction is at
* least as large as the most recent change ( which caused us to go over
* the memory limit ) . So by evicting it we ' re definitely back below the
* memory limit .
*/
Assert ( rb - > size < logical_decoding_work_mem * 1024L ) ;
}
/*
@ -2512,6 +2670,84 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
Assert ( ondisk - > change . action = = change - > action ) ;
}
/*
* Size of a change in memory .
*/
static Size
ReorderBufferChangeSize ( ReorderBufferChange * change )
{
Size sz = sizeof ( ReorderBufferChange ) ;
switch ( change - > action )
{
/* fall through these, they're all similar enough */
case REORDER_BUFFER_CHANGE_INSERT :
case REORDER_BUFFER_CHANGE_UPDATE :
case REORDER_BUFFER_CHANGE_DELETE :
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT :
{
ReorderBufferTupleBuf * oldtup ,
* newtup ;
Size oldlen = 0 ;
Size newlen = 0 ;
oldtup = change - > data . tp . oldtuple ;
newtup = change - > data . tp . newtuple ;
if ( oldtup )
{
sz + = sizeof ( HeapTupleData ) ;
oldlen = oldtup - > tuple . t_len ;
sz + = oldlen ;
}
if ( newtup )
{
sz + = sizeof ( HeapTupleData ) ;
newlen = newtup - > tuple . t_len ;
sz + = newlen ;
}
break ;
}
case REORDER_BUFFER_CHANGE_MESSAGE :
{
Size prefix_size = strlen ( change - > data . msg . prefix ) + 1 ;
sz + = prefix_size + change - > data . msg . message_size +
sizeof ( Size ) + sizeof ( Size ) ;
break ;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT :
{
Snapshot snap ;
snap = change - > data . snapshot ;
sz + = sizeof ( SnapshotData ) +
sizeof ( TransactionId ) * snap - > xcnt +
sizeof ( TransactionId ) * snap - > subxcnt ;
break ;
}
case REORDER_BUFFER_CHANGE_TRUNCATE :
{
sz + = sizeof ( Oid ) * change - > data . truncate . nrelids ;
break ;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM :
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID :
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID :
/* ReorderBufferChange contains everything important */
break ;
}
return sz ;
}
/*
* Restore a number of changes spilled to disk back into memory .
*/
@ -2784,6 +3020,16 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
dlist_push_tail ( & txn - > changes , & change - > node ) ;
txn - > nentries_mem + + ;
/*
* Update memory accounting for the restored change . We need to do this
* although we don ' t check the memory limit when restoring the changes in
* this branch ( we only do that when initially queueing the changes after
* decoding ) , because we will release the changes later , and that will
* update the accounting too ( subtracting the size from the counters ) . And
* we don ' t want to underflow there .
*/
ReorderBufferChangeMemoryUpdate ( rb , change , true ) ;
}
/*
@ -3003,6 +3249,19 @@ ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
*
* We cannot replace unchanged toast tuples though , so those will still point
* to on - disk toast data .
*
* While updating the existing change with detoasted tuple data , we need to
* update the memory accounting info , because the change size will differ .
* Otherwise the accounting may get out of sync , triggering serialization
* at unexpected times .
*
* We simply subtract size of the change before rejiggering the tuple , and
* then adding the new size . This makes it look like the change was removed
* and then added back , except it only tweaks the accounting info .
*
* In particular it can ' t trigger serialization , which would be pointless
* anyway as it happens during commit processing right before handing
* the change to the output plugin .
*/
static void
ReorderBufferToastReplace ( ReorderBuffer * rb , ReorderBufferTXN * txn ,
@ -3023,6 +3282,13 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
if ( txn - > toast_hash = = NULL )
return ;
/*
* We ' re going to modify the size of the change , so to make sure the
* accounting is correct we ' ll make it look like we ' re removing the change
* now ( with the old size ) , and then re - add it at the end .
*/
ReorderBufferChangeMemoryUpdate ( rb , change , false ) ;
oldcontext = MemoryContextSwitchTo ( rb - > context ) ;
/* we should only have toast tuples in an INSERT or UPDATE */
@ -3172,6 +3438,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
pfree ( isnull ) ;
MemoryContextSwitchTo ( oldcontext ) ;
/* now add the change back, with the correct size */
ReorderBufferChangeMemoryUpdate ( rb , change , true ) ;
}
/*