@ -131,13 +131,21 @@ typedef struct ReorderBufferTupleCidEnt
CommandId combocid ; /* just for debugging */
} ReorderBufferTupleCidEnt ;
/* Virtual file descriptor with file offset tracking */
typedef struct TXNEntryFile
{
File vfd ; /* -1 when the file is closed */
off_t curOffset ; /* offset for next write or read. Reset to 0
* when vfd is opened . */
} TXNEntryFile ;
/* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
XLogRecPtr lsn ;
ReorderBufferChange * change ;
ReorderBufferTXN * txn ;
int fd ;
TXNEntryFile file ;
XLogSegNo segno ;
} ReorderBufferIterTXNEntry ;
@ -207,7 +215,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
* subtransactions
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
static ReorderBufferIterTXNState * ReorderBufferIterTXNInit ( ReorderBuffer * rb , ReorderBufferTXN * txn ) ;
static void ReorderBufferIterTXNInit ( ReorderBuffer * rb , ReorderBufferTXN * txn ,
ReorderBufferIterTXNState * volatile * iter_state ) ;
static ReorderBufferChange * ReorderBufferIterTXNNext ( ReorderBuffer * rb , ReorderBufferIterTXNState * state ) ;
static void ReorderBufferIterTXNFinish ( ReorderBuffer * rb ,
ReorderBufferIterTXNState * state ) ;
@ -223,7 +232,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferSerializeChange ( ReorderBuffer * rb , ReorderBufferTXN * txn ,
int fd , ReorderBufferChange * change ) ;
static Size ReorderBufferRestoreChanges ( ReorderBuffer * rb , ReorderBufferTXN * txn ,
int * fd , XLogSegNo * segno ) ;
TXNEntryFile * file , XLogSegNo * segno ) ;
static void ReorderBufferRestoreChange ( ReorderBuffer * rb , ReorderBufferTXN * txn ,
char * change ) ;
static void ReorderBufferRestoreCleanup ( ReorderBuffer * rb , ReorderBufferTXN * txn ) ;
@ -996,15 +1005,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
/*
* Allocate & initialize an iterator which iterates in lsn order over a
* transaction and all its subtransactions .
*
* Note : The iterator state is returned through iter_state parameter rather
* than the function ' s return value . This is because the state gets cleaned up
* in a PG_CATCH block in the caller , so we want to make sure the caller gets
* back the state even if this function throws an exception .
*/
static ReorderBufferIterTXNState *
ReorderBufferIterTXNInit ( ReorderBuffer * rb , ReorderBufferTXN * txn )
static void
ReorderBufferIterTXNInit ( ReorderBuffer * rb , ReorderBufferTXN * txn ,
ReorderBufferIterTXNState * volatile * iter_state )
{
Size nr_txns = 0 ;
ReorderBufferIterTXNState * state ;
dlist_iter cur_txn_i ;
int32 off ;
* iter_state = NULL ;
/*
* Calculate the size of our heap : one element for every transaction that
* contains changes . ( Besides the transactions already in the reorder
@ -1039,7 +1056,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
for ( off = 0 ; off < state - > nr_txns ; off + + )
{
state - > entries [ off ] . fd = - 1 ;
state - > entries [ off ] . file . vf d = - 1 ;
state - > entries [ off ] . segno = 0 ;
}
@ -1048,6 +1065,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferIterCompare ,
state ) ;
/* Now that the state fields are initialized, it is safe to return it. */
* iter_state = state ;
/*
* Now insert items into the binary heap , in an unordered fashion . ( We
* will run a heap assembly step at the end ; this is more efficient . )
@ -1064,7 +1084,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
/* serialize remaining changes */
ReorderBufferSerializeTXN ( rb , txn ) ;
ReorderBufferRestoreChanges ( rb , txn , & state - > entries [ off ] . fd ,
ReorderBufferRestoreChanges ( rb , txn , & state - > entries [ off ] . file ,
& state - > entries [ off ] . segno ) ;
}
@ -1094,7 +1114,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* serialize remaining changes */
ReorderBufferSerializeTXN ( rb , cur_txn ) ;
ReorderBufferRestoreChanges ( rb , cur_txn ,
& state - > entries [ off ] . fd ,
& state - > entries [ off ] . file ,
& state - > entries [ off ] . segno ) ;
}
cur_change = dlist_head_element ( ReorderBufferChange , node ,
@ -1110,8 +1130,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
/* assemble a valid binary heap */
binaryheap_build ( state - > heap ) ;
return state ;
}
/*
@ -1175,7 +1193,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
dlist_delete ( & change - > node ) ;
dlist_push_tail ( & state - > old_change , & change - > node ) ;
if ( ReorderBufferRestoreChanges ( rb , entry - > txn , & entry - > fd ,
if ( ReorderBufferRestoreChanges ( rb , entry - > txn , & entry - > file ,
& state - > entries [ off ] . segno ) )
{
/* successfully restored changes from disk */
@ -1214,8 +1232,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
for ( off = 0 ; off < state - > nr_txns ; off + + )
{
if ( state - > entries [ off ] . fd ! = - 1 )
CloseTransientFil e ( state - > entries [ off ] . fd ) ;
if ( state - > entries [ off ] . file . vf d ! = - 1 )
File Close( state - > entries [ off ] . file . v fd) ;
}
/* free memory we might have "leaked" in the last *Next call */
@ -1558,7 +1576,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
rb - > begin ( rb , txn ) ;
iterstate = ReorderBufferIterTXNInit ( rb , txn ) ;
ReorderBufferIterTXNInit ( rb , txn , & iterstate ) ;
while ( ( change = ReorderBufferIterTXNNext ( rb , iterstate ) ) ! = NULL )
{
Relation relation = NULL ;
@ -2765,11 +2783,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
*/
static Size
ReorderBufferRestoreChanges ( ReorderBuffer * rb , ReorderBufferTXN * txn ,
int * fd , XLogSegNo * segno )
TXNEntryFile * file , XLogSegNo * segno )
{
Size restored = 0 ;
XLogSegNo last_segno ;
dlist_mutable_iter cleanup_iter ;
File * fd = & file - > vfd ;
Assert ( txn - > first_lsn ! = InvalidXLogRecPtr ) ;
Assert ( txn - > final_lsn ! = InvalidXLogRecPtr ) ;
@ -2810,7 +2829,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
ReorderBufferSerializedPath ( path , MyReplicationSlot , txn - > xid ,
* segno ) ;
* fd = OpenTransientFile ( path , O_RDONLY | PG_BINARY ) ;
* fd = PathNameOpenFile ( path , O_RDONLY | PG_BINARY ) ;
/* No harm in resetting the offset even in case of failure */
file - > curOffset = 0 ;
if ( * fd < 0 & & errno = = ENOENT )
{
* fd = - 1 ;
@ -2830,14 +2853,14 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
* end of this file .
*/
ReorderBufferSerializeReserve ( rb , sizeof ( ReorderBufferDiskChange ) ) ;
pgstat_report_wait_start ( WAIT_EVENT_REORDER_BUFFER_READ ) ;
readBytes = read ( * fd , rb - > outbuf , sizeof ( ReorderBufferDiskChange ) ) ;
pgstat_report_wait_end ( ) ;
readBytes = FileRead ( file - > vfd , rb - > outbuf ,
sizeof ( ReorderBufferDiskChange ) ,
file - > curOffset , WAIT_EVENT_REORDER_BUFFER_READ ) ;
/* eof */
if ( readBytes = = 0 )
{
CloseTransientFil e ( * fd ) ;
File Close( * fd ) ;
* fd = - 1 ;
( * segno ) + + ;
continue ;
@ -2853,16 +2876,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
readBytes ,
( uint32 ) sizeof ( ReorderBufferDiskChange ) ) ) ) ;
file - > curOffset + = readBytes ;
ondisk = ( ReorderBufferDiskChange * ) rb - > outbuf ;
ReorderBufferSerializeReserve ( rb ,
sizeof ( ReorderBufferDiskChange ) + ondisk - > size ) ;
ondisk = ( ReorderBufferDiskChange * ) rb - > outbuf ;
pgstat_report_wait_start ( WAIT_EVENT_REORDER_BUFFER_READ ) ;
readBytes = read ( * fd , rb - > outbuf + sizeof ( ReorderBufferDiskChange ) ,
ondisk - > size - sizeof ( ReorderBufferDiskChange ) ) ;
pgstat_report_wait_end ( ) ;
readBytes = FileRead ( file - > vfd ,
rb - > outbuf + sizeof ( ReorderBufferDiskChange ) ,
ondisk - > size - sizeof ( ReorderBufferDiskChange ) ,
file - > curOffset ,
WAIT_EVENT_REORDER_BUFFER_READ ) ;
if ( readBytes < 0 )
ereport ( ERROR ,
@ -2875,6 +2901,8 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
readBytes ,
( uint32 ) ( ondisk - > size - sizeof ( ReorderBufferDiskChange ) ) ) ) ) ;
file - > curOffset + = readBytes ;
/*
* ok , read a full change from disk , now restore it into proper
* in - memory format