@ -224,6 +224,8 @@ static void maybe_reread_subscription(void);
/* prototype needed because of stream_commit */
static void apply_dispatch ( StringInfo s ) ;
static void apply_handle_commit_internal ( StringInfo s ,
LogicalRepCommitData * commit_data ) ;
static void apply_handle_insert_internal ( ResultRelInfo * relinfo ,
EState * estate , TupleTableSlot * remoteslot ) ;
static void apply_handle_update_internal ( ResultRelInfo * relinfo ,
@ -709,29 +711,7 @@ apply_handle_commit(StringInfo s)
Assert ( commit_data . commit_lsn = = remote_final_lsn ) ;
/* The synchronization worker runs in single transaction. */
if ( IsTransactionState ( ) & & ! am_tablesync_worker ( ) )
{
/*
* Update origin state so we can restart streaming from correct
* position in case of crash .
*/
replorigin_session_origin_lsn = commit_data . end_lsn ;
replorigin_session_origin_timestamp = commit_data . committime ;
CommitTransactionCommand ( ) ;
pgstat_report_stat ( false ) ;
store_flush_position ( commit_data . end_lsn ) ;
}
else
{
/* Process any invalidation messages that might have accumulated. */
AcceptInvalidationMessages ( ) ;
maybe_reread_subscription ( ) ;
}
in_remote_transaction = false ;
apply_handle_commit_internal ( s , & commit_data ) ;
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables ( commit_data . end_lsn ) ;
@ -772,8 +752,10 @@ apply_handle_stream_start(StringInfo s)
/*
* Start a transaction on stream start , this transaction will be committed
* on the stream stop . We need the transaction for handling the buffile ,
* used for serializing the streaming data and subxact info .
* on the stream stop unless it is a tablesync worker in which case it will
* be committed after processing all the messages . We need the transaction
* for handling the buffile , used for serializing the streaming data and
* subxact info .
*/
ensure_transaction ( ) ;
@ -825,8 +807,12 @@ apply_handle_stream_stop(StringInfo s)
/* We must be in a valid transaction state */
Assert ( IsTransactionState ( ) ) ;
/* Commit the per-stream transaction */
CommitTransactionCommand ( ) ;
/* The synchronization worker runs in single transaction. */
if ( ! am_tablesync_worker ( ) )
{
/* Commit the per-stream transaction */
CommitTransactionCommand ( ) ;
}
in_streamed_transaction = false ;
@ -902,7 +888,10 @@ apply_handle_stream_abort(StringInfo s)
{
/* Cleanup the subxact info */
cleanup_subxact_info ( ) ;
CommitTransactionCommand ( ) ;
/* The synchronization worker runs in single transaction */
if ( ! am_tablesync_worker ( ) )
CommitTransactionCommand ( ) ;
return ;
}
@ -928,7 +917,9 @@ apply_handle_stream_abort(StringInfo s)
/* write the updated subxact list */
subxact_info_write ( MyLogicalRepWorker - > subid , xid ) ;
CommitTransactionCommand ( ) ;
if ( ! am_tablesync_worker ( ) )
CommitTransactionCommand ( ) ;
}
}
@ -1048,35 +1039,54 @@ apply_handle_stream_commit(StringInfo s)
BufFileClose ( fd ) ;
/*
* Update origin state so we can restart streaming from correct position
* in case of crash .
*/
replorigin_session_origin_lsn = commit_data . end_lsn ;
replorigin_session_origin_timestamp = commit_data . committime ;
pfree ( buffer ) ;
pfree ( s2 . data ) ;
CommitTransactionCommand ( ) ;
pgstat_report_stat ( false ) ;
store_flush_position ( commit_data . end_lsn ) ;
elog ( DEBUG1 , " replayed %d (all) changes from file \" %s \" " ,
nchanges , path ) ;
in_remote_transaction = false ;
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables ( commit_data . end_lsn ) ;
apply_handle_commit_internal ( s , & commit_data ) ;
/* unlink the files with serialized changes and subxact info */
stream_cleanup_files ( MyLogicalRepWorker - > subid , xid ) ;
/* Process any tables that are being synchronized in parallel. */
process_syncing_tables ( commit_data . end_lsn ) ;
pgstat_report_activity ( STATE_IDLE , NULL ) ;
}
/*
* Helper function for apply_handle_commit and apply_handle_stream_commit .
*/
static void
apply_handle_commit_internal ( StringInfo s , LogicalRepCommitData * commit_data )
{
/* The synchronization worker runs in single transaction. */
if ( IsTransactionState ( ) & & ! am_tablesync_worker ( ) )
{
/*
* Update origin state so we can restart streaming from correct
* position in case of crash .
*/
replorigin_session_origin_lsn = commit_data - > end_lsn ;
replorigin_session_origin_timestamp = commit_data - > committime ;
CommitTransactionCommand ( ) ;
pgstat_report_stat ( false ) ;
store_flush_position ( commit_data - > end_lsn ) ;
}
else
{
/* Process any invalidation messages that might have accumulated. */
AcceptInvalidationMessages ( ) ;
maybe_reread_subscription ( ) ;
}
in_remote_transaction = false ;
}
/*
* Handle RELATION message .
*