@ -77,6 +77,40 @@
* a bit more memory to the oldest subtransactions , because it ' s likely
* they are the source for the next sequence of changes .
*
* When decoding sequences, we differentiate between sequences created
* in a (running) transaction and sequences created in other (already
* committed) transactions. Changes for sequences created in the same
* top-level transaction are treated as "transactional", i.e. just like
* any other change from that transaction (and discarded in case of a
* rollback). Changes for sequences created earlier are treated as
* non-transactional - they are processed immediately, as if performed
* outside any transaction (and thus are not rolled back).
*
* This mixed behavior is necessary - sequences are non - transactional
* ( e . g . ROLLBACK does not undo the sequence increments ) . But for new
* sequences , we need to handle them in a transactional way , because if
* we ever get some DDL support , the sequence won ' t exist until the
* transaction gets applied . So we need to ensure the increments don ' t
* happen until the sequence gets created .
*
* To differentiate which sequences are " old " and which were created
* in a still - running transaction , we track sequences created in running
* transactions in a hash table . Sequences are identified by relfilenode ,
* and we track XID of the ( sub ) transaction that created it . This means
* that if a transaction does something that changes the relfilenode
* ( like an alter / reset of a sequence ) , the new relfilenode will be
* treated as if created in the transaction . The list of sequences gets
* discarded when the transaction completes ( commit / rollback ) .
*
* We don ' t use the XID to check if it ' s the same top - level transaction .
* It ' s enough to know it was created in an in - progress transaction ,
* and we know it must be the current one because otherwise it wouldn ' t
* see the sequence object .
*
* The XID may be valid even for non - transactional sequences - we simply
* keep the XID logged to WAL , it ' s up to the reorderbuffer to decide if
* the increment is transactional .
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
# include "postgres.h"
@ -91,6 +125,7 @@
# include "access/xact.h"
# include "access/xlog_internal.h"
# include "catalog/catalog.h"
# include "commands/sequence.h"
# include "lib/binaryheap.h"
# include "miscadmin.h"
# include "pgstat.h"
@ -116,6 +151,13 @@ typedef struct ReorderBufferTXNByIdEnt
ReorderBufferTXN * txn ;
} ReorderBufferTXNByIdEnt ;
/*
 * Entry of the hash table used to track sequences created in running
 * transactions.
 *
 * The table is keyed by relfilenode (its keysize is sizeof(RelFileNode)),
 * and each entry remembers the XID of the (sub)transaction that created the
 * sequence, so that later increments can be queued into that same
 * (sub)transaction.
 */
typedef struct ReorderBufferSequenceEnt
{
/* hash key: relfilenode of the sequence (dynahash expects the key first) */
RelFileNode rnode ;
/* XID of the (sub)transaction that created this relfilenode */
TransactionId xid ;
} ReorderBufferSequenceEnt ;
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
@ -339,6 +381,14 @@ ReorderBufferAllocate(void)
buffer - > by_txn = hash_create ( " ReorderBufferByXid " , 1000 , & hash_ctl ,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT ) ;
/* hash table of sequences, mapping relfilenode to XID of transaction */
hash_ctl . keysize = sizeof ( RelFileNode ) ;
hash_ctl . entrysize = sizeof ( ReorderBufferSequenceEnt ) ;
hash_ctl . hcxt = buffer - > context ;
buffer - > sequences = hash_create ( " ReorderBufferSequenceHash " , 1000 , & hash_ctl ,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT ) ;
buffer - > by_txn_last_xid = InvalidTransactionId ;
buffer - > by_txn_last_txn = NULL ;
@ -525,6 +575,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
change - > data . truncate . relids = NULL ;
}
break ;
case REORDER_BUFFER_CHANGE_SEQUENCE :
if ( change - > data . sequence . tuple )
{
ReorderBufferReturnTupleBuf ( rb , change - > data . sequence . tuple ) ;
change - > data . sequence . tuple = NULL ;
}
break ;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM :
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT :
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID :
@ -859,6 +916,230 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
}
/*
 * Decide whether a sequence increment has to be handled as transactional.
 *
 * rb->sequences holds one entry per sequence (identified by relfilenode)
 * created in a transaction that is still in progress.  The increment is
 * transactional if it is the one that creates the relfilenode, or if the
 * relfilenode is already present in that hash table.
 */
bool
ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
									 RelFileNode rnode, bool created)
{
	bool		tracked = false;

	if (!created)
	{
		/* only the "found" flag matters here, the entry itself is unused */
		(void) hash_search(rb->sequences,
						   (void *) &rnode,
						   HASH_FIND,
						   &tracked);
	}
	else
	{
		/* the increment creating the relfilenode is always transactional */
		tracked = true;
	}

	return tracked;
}
/*
 * Forget all sequences that were created by the given transaction.
 *
 * The hash table is keyed by relfilenode and cannot be searched by XID, so
 * every entry is visited and those recorded for "xid" are removed.  The
 * table is expected to stay small, as creating many new sequences over and
 * over is unusual.
 */
static void
ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
{
	HASH_SEQ_STATUS seq_scan;
	ReorderBufferSequenceEnt *entry;

	hash_seq_init(&seq_scan, rb->sequences);

	for (entry = (ReorderBufferSequenceEnt *) hash_seq_search(&seq_scan);
		 entry != NULL;
		 entry = (ReorderBufferSequenceEnt *) hash_seq_search(&seq_scan))
	{
		/* drop only the sequences created by this transaction */
		if (entry->xid == xid)
			(void) hash_search(rb->sequences,
							   (void *) &(entry->rnode),
							   HASH_REMOVE, NULL);
	}
}
/*
 * Queue a transactional sequence increment, or immediately pass a
 * non-transactional one to the output plugin.
 *
 * A sequence update may be either transactional or non-transactional.  When
 * the sequence was created in a running transaction, the increment is
 * treated as transactional and queued into that transaction, to be processed
 * upon commit.  Otherwise it is treated as non-transactional and sent to the
 * output plugin right away, so that the increment is not forgotten in case
 * of a rollback.
 *
 * rb			 reorder buffer
 * xid			 XID logged with the increment (may be invalid for
 *				 non-transactional increments)
 * snapshot		 historic snapshot used for catalog access in the
 *				 non-transactional path
 * lsn			 LSN of the WAL record
 * origin_id	 replication origin, stored in the queued change
 * rnode		 relfilenode identifying the sequence
 * transactional whether the increment has to be queued
 * created		 whether this increment creates the relfilenode
 * tuplebuf		 sequence tuple carrying last_value / log_cnt / is_called
 */
void
ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
						   Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
						   RelFileNode rnode, bool transactional, bool created,
						   ReorderBufferTupleBuf *tuplebuf)
{
	/*
	 * Change needs to be handled as transactional, because the sequence was
	 * created in a transaction that is still running.  In that case all the
	 * changes need to be queued in that transaction, we must not send them
	 * to the downstream until the transaction commits.
	 *
	 * There's a bit of a trouble with subtransactions - we can't queue it
	 * into the subxact, because it might be rolled back and we'd lose the
	 * increment.  We need to queue it into the same (sub)xact that created
	 * the sequence, which is why we track the XID in the hash table.
	 */
	if (transactional)
	{
		MemoryContext oldcontext;
		ReorderBufferChange *change;
		ReorderBufferSequenceEnt *ent;
		bool		found;

		/* transactional changes require a transaction */
		Assert(xid != InvalidTransactionId);

		/*
		 * Look up the sequence by relfilenode, creating the entry if this is
		 * the increment that creates the sequence.  The entry is needed
		 * below to learn which (sub)transaction the change belongs to.
		 */
		ent = hash_search(rb->sequences,
						  (void *) &rnode,
						  created ? HASH_ENTER : HASH_FIND,
						  &found);

		/*
		 * If this is the "create" increment, we must not have found any
		 * pre-existing entry in the hash table (i.e. there must not be any
		 * conflicting sequence).
		 */
		Assert(!(created && found));

		/*
		 * We must have either created or found an entry.  HASH_FIND returns
		 * NULL when there is no match; check that at run time too, because
		 * the entry is dereferenced below and assertions are compiled out
		 * of production builds.
		 */
		Assert(created || found);
		if (ent == NULL)
			elog(ERROR, "sequence with relfilenode %u is not tracked by any in-progress transaction",
				 rnode.relNode);

		/*
		 * When creating the sequence, remember the XID of the transaction
		 * that created it.
		 */
		if (created)
			ent->xid = xid;

		/* XXX Maybe check that we're still in the same top-level xact? */

		/* OK, allocate and queue the change */
		oldcontext = MemoryContextSwitchTo(rb->context);

		change = ReorderBufferGetChange(rb);

		change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
		change->origin_id = origin_id;

		memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));

		change->data.sequence.tuple = tuplebuf;

		/* add it to the same subxact that created the sequence */
		ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);

		MemoryContextSwitchTo(oldcontext);
	}
	else
	{
		/*
		 * This increment is for a sequence that was not created in any
		 * running transaction, so we treat it as non-transactional and just
		 * send it to the output plugin directly.
		 */
		ReorderBufferTXN *txn = NULL;
		volatile Snapshot snapshot_now = snapshot;
		bool		using_subtxn;

#ifdef USE_ASSERT_CHECKING
		/* All "creates" have to be handled as transactional. */
		Assert(!created);

		/* Make sure the sequence is not in the hash table. */
		{
			bool		found;

			hash_search(rb->sequences,
						(void *) &rnode,
						HASH_FIND, &found);
			Assert(!found);
		}
#endif

		if (xid != InvalidTransactionId)
			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

		/* setup snapshot to allow catalog access */
		SetupHistoricSnapshot(snapshot_now, NULL);

		/*
		 * Decoding needs access to syscaches et al., which in turn use
		 * heavyweight locks and such.  Thus we need to have enough state
		 * around to keep track of those.  The easiest way is to simply use a
		 * transaction internally.  That also allows us to easily enforce
		 * that nothing writes to the database by checking for xid
		 * assignments.
		 *
		 * When we're called via the SQL SRF there's already a transaction
		 * started, so start an explicit subtransaction there.
		 */
		using_subtxn = IsTransactionOrTransactionBlock();

		PG_TRY();
		{
			Relation	relation;
			HeapTuple	tuple;
			Form_pg_sequence_data seq;
			Oid			reloid;

			if (using_subtxn)
				BeginInternalSubTransaction("sequence");
			else
				StartTransactionCommand();

			reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);

			if (reloid == InvalidOid)
				elog(ERROR, "could not map filenode \"%s\" to relation OID",
					 relpathperm(rnode,
								 MAIN_FORKNUM));

			relation = RelationIdGetRelation(reloid);

			/*
			 * Fail cleanly instead of handing an invalid relation to the
			 * output plugin; this mirrors the check done when replaying
			 * queued (transactional) sequence changes.
			 */
			if (!RelationIsValid(relation))
				elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
					 reloid,
					 relpathperm(rnode,
								 MAIN_FORKNUM));

			tuple = &tuplebuf->tuple;
			seq = (Form_pg_sequence_data) GETSTRUCT(tuple);

			rb->sequence(rb, txn, lsn, relation, transactional,
						 seq->last_value, seq->log_cnt, seq->is_called);

			RelationClose(relation);

			TeardownHistoricSnapshot(false);

			AbortCurrentTransaction();

			if (using_subtxn)
				RollbackAndReleaseCurrentSubTransaction();
		}
		PG_CATCH();
		{
			TeardownHistoricSnapshot(true);

			AbortCurrentTransaction();

			if (using_subtxn)
				RollbackAndReleaseCurrentSubTransaction();

			PG_RE_THROW();
		}
		PG_END_TRY();
	}
}
/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
@ -1535,6 +1816,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
& found ) ;
Assert ( found ) ;
/* Remove sequences created in this transaction (if any). */
ReorderBufferSequenceCleanup ( rb , txn - > xid ) ;
/* remove entries spilled to disk */
if ( rbtxn_is_serialized ( txn ) )
ReorderBufferRestoreCleanup ( rb , txn ) ;
@ -1950,6 +2234,29 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
change - > data . msg . message ) ;
}
/*
 * Helper function for ReorderBufferProcessTXN for applying sequences.
 *
 * Extracts the sequence state from the tuple stored in the change and hands
 * it to the streaming or the regular sequence callback, depending on
 * "streaming".
 */
static inline void
ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
						   Relation relation, ReorderBufferChange *change,
						   bool streaming)
{
	HeapTuple	seqtuple = &change->data.sequence.tuple->tuple;
	Form_pg_sequence_data seqdata = (Form_pg_sequence_data) GETSTRUCT(seqtuple);

	/*
	 * This helper is used for changes that were queued in a transaction, so
	 * the increment is always reported as transactional.
	 */
	if (!streaming)
	{
		rb->sequence(rb, txn, change->lsn, relation, true,
					 seqdata->last_value, seqdata->log_cnt, seqdata->is_called);
		return;
	}

	rb->stream_sequence(rb, txn, change->lsn, relation, true,
						seqdata->last_value, seqdata->log_cnt, seqdata->is_called);
}
/*
* Function to store the command id and snapshot at the end of the current
* stream so that we can reuse the same while sending the next stream .
@ -2392,6 +2699,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID :
elog ( ERROR , " tuplecid value in changequeue " ) ;
break ;
case REORDER_BUFFER_CHANGE_SEQUENCE :
Assert ( snapshot_now ) ;
reloid = RelidByRelfilenode ( change - > data . sequence . relnode . spcNode ,
change - > data . sequence . relnode . relNode ) ;
if ( reloid = = InvalidOid )
elog ( ERROR , " could not map filenode \" %s \" to relation OID " ,
relpathperm ( change - > data . sequence . relnode ,
MAIN_FORKNUM ) ) ;
relation = RelationIdGetRelation ( reloid ) ;
if ( ! RelationIsValid ( relation ) )
elog ( ERROR , " could not open relation with OID %u (for filenode \" %s \" ) " ,
reloid ,
relpathperm ( change - > data . sequence . relnode ,
MAIN_FORKNUM ) ) ;
if ( RelationIsLogicallyLogged ( relation ) )
ReorderBufferApplySequence ( rb , txn , relation , change , streaming ) ;
RelationClose ( relation ) ;
break ;
}
}
@ -3776,6 +4108,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
memcpy ( data , change - > data . truncate . relids , size ) ;
data + = size ;
break ;
}
case REORDER_BUFFER_CHANGE_SEQUENCE :
{
char * data ;
ReorderBufferTupleBuf * tup ;
Size len = 0 ;
tup = change - > data . sequence . tuple ;
if ( tup )
{
sz + = sizeof ( HeapTupleData ) ;
len = tup - > tuple . t_len ;
sz + = len ;
}
/* make sure we have enough space */
ReorderBufferSerializeReserve ( rb , sz ) ;
data = ( ( char * ) rb - > outbuf ) + sizeof ( ReorderBufferDiskChange ) ;
/* might have been reallocated above */
ondisk = ( ReorderBufferDiskChange * ) rb - > outbuf ;
if ( len )
{
memcpy ( data , & tup - > tuple , sizeof ( HeapTupleData ) ) ;
data + = sizeof ( HeapTupleData ) ;
memcpy ( data , tup - > tuple . t_data , len ) ;
data + = len ;
}
break ;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM :
@ -4040,6 +4405,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
{
sz + = sizeof ( Oid ) * change - > data . truncate . nrelids ;
break ;
}
case REORDER_BUFFER_CHANGE_SEQUENCE :
{
ReorderBufferTupleBuf * tup ;
Size len = 0 ;
tup = change - > data . sequence . tuple ;
if ( tup )
{
sz + = sizeof ( HeapTupleData ) ;
len = tup - > tuple . t_len ;
sz + = len ;
}
break ;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM :
@ -4341,6 +4722,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break ;
}
case REORDER_BUFFER_CHANGE_SEQUENCE :
if ( change - > data . sequence . tuple )
{
uint32 tuplelen = ( ( HeapTuple ) data ) - > t_len ;
change - > data . sequence . tuple =
ReorderBufferGetTupleBuf ( rb , tuplelen - SizeofHeapTupleHeader ) ;
/* restore ->tuple */
memcpy ( & change - > data . sequence . tuple - > tuple , data ,
sizeof ( HeapTupleData ) ) ;
data + = sizeof ( HeapTupleData ) ;
/* reset t_data pointer into the new tuplebuf */
change - > data . sequence . tuple - > tuple . t_data =
ReorderBufferTupleBufData ( change - > data . sequence . tuple ) ;
/* restore tuple data itself */
memcpy ( change - > data . sequence . tuple - > tuple . t_data , data , tuplelen ) ;
data + = tuplelen ;
}
break ;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM :
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT :
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID :