@ -236,20 +236,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
. ts = 0 ,
} ;
/*
* Stream xid hash entry . Whenever we see a new xid we create this entry in the
* xidhash and along with it create the streaming file and store the fileset handle .
* The subxact file is created iff there is any subxact info under this xid . This
* entry is used on the subsequent streams for the xid to get the corresponding
* fileset handles , so storing them in hash makes the search faster .
*/
typedef struct StreamXidHash
{
TransactionId xid ; /* xid is the hash key and must be first */
FileSet * stream_fileset ; /* file set for stream data */
FileSet * subxact_fileset ; /* file set for subxact info */
} StreamXidHash ;
static MemoryContext ApplyMessageContext = NULL ;
MemoryContext ApplyContext = NULL ;
@ -269,12 +255,6 @@ static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId ;
/*
* Hash table for storing the streaming xid information along with filesets
* for streaming and subxact files .
*/
static HTAB * xidhash = NULL ;
/* BufFile handle of the current streaming file */
static BufFile * stream_fd = NULL ;
@ -1118,7 +1098,6 @@ static void
apply_handle_stream_start ( StringInfo s )
{
bool first_segment ;
HASHCTL hash_ctl ;
if ( in_streamed_transaction )
ereport ( ERROR ,
@ -1148,17 +1127,23 @@ apply_handle_stream_start(StringInfo s)
set_apply_error_context_xact ( stream_xid , 0 ) ;
/*
* Initialize the xidhash table if we haven ' t yet . This will be used for
* the entire duration of the apply worker so create it in permanent
* context .
* Initialize the worker ' s stream_fileset if we haven ' t yet . This will be
* used for the entire duration of the worker so create it in a permanent
* context . We create this on the very first streaming message from any
* transaction and then use it for this and other streaming transactions .
* Now , we could create a fileset at the start of the worker as well but
* then we won ' t be sure that it will ever be used .
*/
if ( xidhash = = NULL )
if ( MyLogicalRepWorker - > stream_fileset = = NULL )
{
hash_ctl . keysize = sizeof ( TransactionId ) ;
hash_ctl . entrysize = sizeof ( StreamXidHash ) ;
hash_ctl . hcxt = ApplyContext ;
xidhash = hash_create ( " StreamXidHash " , 1024 , & hash_ctl ,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT ) ;
MemoryContext oldctx ;
oldctx = MemoryContextSwitchTo ( ApplyContext ) ;
MyLogicalRepWorker - > stream_fileset = palloc ( sizeof ( FileSet ) ) ;
FileSetInit ( MyLogicalRepWorker - > stream_fileset ) ;
MemoryContextSwitchTo ( oldctx ) ;
}
/* open the spool file for this transaction */
@ -1253,7 +1238,6 @@ apply_handle_stream_abort(StringInfo s)
BufFile * fd ;
bool found = false ;
char path [ MAXPGPATH ] ;
StreamXidHash * ent ;
set_apply_error_context_xact ( subxid , 0 ) ;
@ -1285,19 +1269,10 @@ apply_handle_stream_abort(StringInfo s)
return ;
}
ent = ( StreamXidHash * ) hash_search ( xidhash ,
( void * ) & xid ,
HASH_FIND ,
NULL ) ;
if ( ! ent )
ereport ( ERROR ,
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
errmsg_internal ( " transaction %u not found in stream XID hash table " ,
xid ) ) ) ;
/* open the changes file */
changes_filename ( path , MyLogicalRepWorker - > subid , xid ) ;
fd = BufFileOpenFileSet ( ent - > stream_fileset , path , O_RDWR ) ;
fd = BufFileOpenFileSet ( MyLogicalRepWorker - > stream_fileset , path ,
O_RDWR , false ) ;
/* OK, truncate the file at the right offset */
BufFileTruncateFileSet ( fd , subxact_data . subxacts [ subidx ] . fileno ,
@ -1327,7 +1302,6 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
int nchanges ;
char path [ MAXPGPATH ] ;
char * buffer = NULL ;
StreamXidHash * ent ;
MemoryContext oldcxt ;
BufFile * fd ;
@ -1345,17 +1319,8 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
changes_filename ( path , MyLogicalRepWorker - > subid , xid ) ;
elog ( DEBUG1 , " replaying changes from file \" %s \" " , path ) ;
ent = ( StreamXidHash * ) hash_search ( xidhash ,
( void * ) & xid ,
HASH_FIND ,
NULL ) ;
if ( ! ent )
ereport ( ERROR ,
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
errmsg_internal ( " transaction %u not found in stream XID hash table " ,
xid ) ) ) ;
fd = BufFileOpenFileSet ( ent - > stream_fileset , path , O_RDONLY ) ;
fd = BufFileOpenFileSet ( MyLogicalRepWorker - > stream_fileset , path , O_RDONLY ,
false ) ;
buffer = palloc ( BLCKSZ ) ;
initStringInfo ( & s2 ) ;
@ -2541,30 +2506,6 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
}
}
/*
* Cleanup filesets .
*/
void
logicalrep_worker_cleanupfileset ( void )
{
HASH_SEQ_STATUS status ;
StreamXidHash * hentry ;
/* Remove all the pending stream and subxact filesets. */
if ( xidhash )
{
hash_seq_init ( & status , xidhash ) ;
while ( ( hentry = ( StreamXidHash * ) hash_seq_search ( & status ) ) ! = NULL )
{
FileSetDeleteAll ( hentry - > stream_fileset ) ;
/* Delete the subxact fileset iff it is created. */
if ( hentry - > subxact_fileset )
FileSetDeleteAll ( hentry - > subxact_fileset ) ;
}
}
}
/*
* Apply main loop .
*/
@ -3026,58 +2967,30 @@ subxact_info_write(Oid subid, TransactionId xid)
{
char path [ MAXPGPATH ] ;
Size len ;
StreamXidHash * ent ;
BufFile * fd ;
Assert ( TransactionIdIsValid ( xid ) ) ;
/* Find the xid entry in the xidhash */
ent = ( StreamXidHash * ) hash_search ( xidhash ,
( void * ) & xid ,
HASH_FIND ,
NULL ) ;
/* By this time we must have created the transaction entry */
Assert ( ent ) ;
/* construct the subxact filename */
subxact_filename ( path , subid , xid ) ;
/*
* If there is no subtransaction then nothing to do , but if already have
* subxact file then delete that .
*/
/* Delete the subxacts file, if exists. */
if ( subxact_data . nsubxacts = = 0 )
{
if ( ent - > subxact_fileset )
{
cleanup_subxact_info ( ) ;
FileSetDeleteAll ( ent - > subxact_fileset ) ;
pfree ( ent - > subxact_fileset ) ;
ent - > subxact_fileset = NULL ;
}
cleanup_subxact_info ( ) ;
BufFileDeleteFileSet ( MyLogicalRepWorker - > stream_fileset , path , true ) ;
return ;
}
subxact_filename ( path , subid , xid ) ;
/*
* Create the subxact file if it not already created , otherwise open the
* existing file .
*/
if ( ent - > subxact_fileset = = NULL )
{
MemoryContext oldctx ;
/*
* We need to maintain fileset across multiple stream start / stop
* calls . So , need to allocate it in a persistent context .
*/
oldctx = MemoryContextSwitchTo ( ApplyContext ) ;
ent - > subxact_fileset = palloc ( sizeof ( FileSet ) ) ;
FileSetInit ( ent - > subxact_fileset ) ;
MemoryContextSwitchTo ( oldctx ) ;
fd = BufFileCreateFileSet ( ent - > subxact_fileset , path ) ;
}
else
fd = BufFileOpenFileSet ( ent - > subxact_fileset , path , O_RDWR ) ;
fd = BufFileOpenFileSet ( MyLogicalRepWorker - > stream_fileset , path , O_RDWR ,
true ) ;
if ( fd = = NULL )
fd = BufFileCreateFileSet ( MyLogicalRepWorker - > stream_fileset , path ) ;
len = sizeof ( SubXactInfo ) * subxact_data . nsubxacts ;
@ -3104,34 +3017,21 @@ subxact_info_read(Oid subid, TransactionId xid)
char path [ MAXPGPATH ] ;
Size len ;
BufFile * fd ;
StreamXidHash * ent ;
MemoryContext oldctx ;
Assert ( ! subxact_data . subxacts ) ;
Assert ( subxact_data . nsubxacts = = 0 ) ;
Assert ( subxact_data . nsubxacts_max = = 0 ) ;
/* Find the stream xid entry in the xidhash */
ent = ( StreamXidHash * ) hash_search ( xidhash ,
( void * ) & xid ,
HASH_FIND ,
NULL ) ;
if ( ! ent )
ereport ( ERROR ,
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
errmsg_internal ( " transaction %u not found in stream XID hash table " ,
xid ) ) ) ;
/*
* If subxact_fileset is not valid that mean we don ' t have any subxact
* info
* If the subxact file doesn ' t exist that means we don ' t have any subxact
* info .
*/
if ( ent - > subxact_fileset = = NULL )
return ;
subxact_filename ( path , subid , xid ) ;
fd = BufFileOpenFileSet ( ent - > subxact_fileset , path , O_RDONLY ) ;
fd = BufFileOpenFileSet ( MyLogicalRepWorker - > stream_fileset , path , O_RDONLY ,
true ) ;
if ( fd = = NULL )
return ;
/* read number of subxact items */
if ( BufFileRead ( fd , & subxact_data . nsubxacts ,
@ -3267,42 +3167,21 @@ changes_filename(char *path, Oid subid, TransactionId xid)
* Cleanup files for a subscription / toplevel transaction .
*
* Remove files with serialized changes and subxact info for a particular
* toplevel transaction . Each subscription has a separate set of files .
* toplevel transaction . Each subscription has a separate set of files
* for any toplevel transaction .
*/
static void
stream_cleanup_files ( Oid subid , TransactionId xid )
{
char path [ MAXPGPATH ] ;
StreamXidHash * ent ;
/* Find the xid entry in the xidhash */
ent = ( StreamXidHash * ) hash_search ( xidhash ,
( void * ) & xid ,
HASH_FIND ,
NULL ) ;
if ( ! ent )
ereport ( ERROR ,
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
errmsg_internal ( " transaction %u not found in stream XID hash table " ,
xid ) ) ) ;
/* Delete the change file and release the stream fileset memory */
/* Delete the changes file. */
changes_filename ( path , subid , xid ) ;
FileSetDeleteAll ( ent - > stream_fileset ) ;
pfree ( ent - > stream_fileset ) ;
ent - > stream_fileset = NULL ;
BufFileDeleteFileSet ( MyLogicalRepWorker - > stream_fileset , path , false ) ;
/* Delete the subxact file and release the memory, if it exist */
if ( ent - > subxact_fileset )
{
subxact_filename ( path , subid , xid ) ;
FileSetDeleteAll ( ent - > subxact_fileset ) ;
pfree ( ent - > subxact_fileset ) ;
ent - > subxact_fileset = NULL ;
}
/* Remove the xid entry from the stream xid hash */
hash_search ( xidhash , ( void * ) & xid , HASH_REMOVE , NULL ) ;
/* Delete the subxact file, if it exists. */
subxact_filename ( path , subid , xid ) ;
BufFileDeleteFileSet ( MyLogicalRepWorker - > stream_fileset , path , true ) ;
}
/*
@ -3312,8 +3191,8 @@ stream_cleanup_files(Oid subid, TransactionId xid)
*
* Open a file for streamed changes from a toplevel transaction identified
* by stream_xid ( global variable ) . If it ' s the first chunk of streamed
* changes for this transaction , initialize the fileset and create the buffile ,
* otherwise open the previously created file .
* changes for this transaction , create the buffile , otherwise open the
* previously created file .
*
* This can only be called at the beginning of a " streaming " block , i . e .
* between stream_start / stream_stop messages from the upstream .
@ -3322,20 +3201,13 @@ static void
stream_open_file ( Oid subid , TransactionId xid , bool first_segment )
{
char path [ MAXPGPATH ] ;
bool found ;
MemoryContext oldcxt ;
StreamXidHash * ent ;
Assert ( in_streamed_transaction ) ;
Assert ( OidIsValid ( subid ) ) ;
Assert ( TransactionIdIsValid ( xid ) ) ;
Assert ( stream_fd = = NULL ) ;
/* create or find the xid entry in the xidhash */
ent = ( StreamXidHash * ) hash_search ( xidhash ,
( void * ) & xid ,
HASH_ENTER ,
& found ) ;
changes_filename ( path , subid , xid ) ;
elog ( DEBUG1 , " opening file \" %s \" for streamed changes " , path ) ;
@ -3347,49 +3219,20 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
oldcxt = MemoryContextSwitchTo ( LogicalStreamingContext ) ;
/*
* If this is the first streamed segment , the file must not exist , so make
* sure we ' re the ones creating it . Otherwise just open the file for
* writing , in append mode .
* If this is the first streamed segment , create the changes file .
* Otherwise , just open the file for writing , in append mode .
*/
if ( first_segment )
{
MemoryContext savectx ;
FileSet * fileset ;
if ( found )
ereport ( ERROR ,
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
errmsg_internal ( " incorrect first-segment flag for streamed replication transaction " ) ) ) ;
/*
* We need to maintain fileset across multiple stream start / stop
* calls . So , need to allocate it in a persistent context .
*/
savectx = MemoryContextSwitchTo ( ApplyContext ) ;
fileset = palloc ( sizeof ( FileSet ) ) ;
FileSetInit ( fileset ) ;
MemoryContextSwitchTo ( savectx ) ;
stream_fd = BufFileCreateFileSet ( fileset , path ) ;
/* Remember the fileset for the next stream of the same transaction */
ent - > xid = xid ;
ent - > stream_fileset = fileset ;
ent - > subxact_fileset = NULL ;
}
stream_fd = BufFileCreateFileSet ( MyLogicalRepWorker - > stream_fileset ,
path ) ;
else
{
if ( ! found )
ereport ( ERROR ,
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
errmsg_internal ( " incorrect first-segment flag for streamed replication transaction " ) ) ) ;
/*
* Open the file and seek to the end of the file because we always
* append the changes file .
*/
stream_fd = BufFileOpenFileSet ( ent - > stream_fileset , path , O_RDWR ) ;
stream_fd = BufFileOpenFileSet ( MyLogicalRepWorker - > stream_fileset ,
path , O_RDWR , false ) ;
BufFileSeek ( stream_fd , 0 , 0 , SEEK_END ) ;
}