@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg
* The action to be taken for the changes in the transaction .
* The action to be taken for the changes in the transaction .
*
*
* TRANS_LEADER_APPLY :
* TRANS_LEADER_APPLY :
* This action means that we are in the leader apply worker and changes of the
* This action means that we are in the leader apply worker or table sync
* transaction are applied directly by the worker .
* worker . The changes of the transaction are either directly applied or
* are read from temporary files ( for streaming transactions ) and then
* applied by the worker .
*
*
* TRANS_LEADER_SERIALIZE :
* TRANS_LEADER_SERIALIZE :
* This action means that we are in the leader apply worker or table sync
* This action means that we are in the leader apply worker or table sync
@ -1004,6 +1006,9 @@ apply_handle_begin(StringInfo s)
{
{
LogicalRepBeginData begin_data ;
LogicalRepBeginData begin_data ;
/* There must not be an active streaming transaction. */
Assert ( ! TransactionIdIsValid ( stream_xid ) ) ;
logicalrep_read_begin ( s , & begin_data ) ;
logicalrep_read_begin ( s , & begin_data ) ;
set_apply_error_context_xact ( begin_data . xid , begin_data . final_lsn ) ;
set_apply_error_context_xact ( begin_data . xid , begin_data . final_lsn ) ;
@ -1058,6 +1063,9 @@ apply_handle_begin_prepare(StringInfo s)
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
errmsg_internal ( " tablesync worker received a BEGIN PREPARE message " ) ) ) ;
errmsg_internal ( " tablesync worker received a BEGIN PREPARE message " ) ) ) ;
/* There must not be an active streaming transaction. */
Assert ( ! TransactionIdIsValid ( stream_xid ) ) ;
logicalrep_read_begin_prepare ( s , & begin_data ) ;
logicalrep_read_begin_prepare ( s , & begin_data ) ;
set_apply_error_context_xact ( begin_data . xid , begin_data . prepare_lsn ) ;
set_apply_error_context_xact ( begin_data . xid , begin_data . prepare_lsn ) ;
@ -1301,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s)
switch ( apply_action )
switch ( apply_action )
{
{
case TRANS_LEADER_SERIALIZE :
case TRANS_LEADER_APPLY :
/*
/*
* The transaction has been serialized to file , so replay all the
* The transaction has been serialized to file , so replay all the
@ -1384,7 +1392,7 @@ apply_handle_stream_prepare(StringInfo s)
break ;
break ;
default :
default :
Assert ( false ) ;
elog ( ERROR , " unexpected apply action: %d " , ( int ) apply_action ) ;
break ;
break ;
}
}
@ -1484,6 +1492,9 @@ apply_handle_stream_start(StringInfo s)
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
errmsg_internal ( " duplicate STREAM START message " ) ) ) ;
errmsg_internal ( " duplicate STREAM START message " ) ) ) ;
/* There must not be an active streaming transaction. */
Assert ( ! TransactionIdIsValid ( stream_xid ) ) ;
/* notify handle methods we're processing a remote transaction */
/* notify handle methods we're processing a remote transaction */
in_streamed_transaction = true ;
in_streamed_transaction = true ;
@ -1589,7 +1600,7 @@ apply_handle_stream_start(StringInfo s)
break ;
break ;
default :
default :
Assert ( false ) ;
elog ( ERROR , " unexpected apply action: %d " , ( int ) apply_action ) ;
break ;
break ;
}
}
@ -1705,11 +1716,12 @@ apply_handle_stream_stop(StringInfo s)
break ;
break ;
default :
default :
Assert ( false ) ;
elog ( ERROR , " unexpected apply action: %d " , ( int ) apply_action ) ;
break ;
break ;
}
}
in_streamed_transaction = false ;
in_streamed_transaction = false ;
stream_xid = InvalidTransactionId ;
/*
/*
* The parallel apply worker could be in a transaction in which case we
* The parallel apply worker could be in a transaction in which case we
@ -1842,7 +1854,7 @@ apply_handle_stream_abort(StringInfo s)
switch ( apply_action )
switch ( apply_action )
{
{
case TRANS_LEADER_SERIALIZE :
case TRANS_LEADER_APPLY :
/*
/*
* We are in the leader apply worker and the transaction has been
* We are in the leader apply worker and the transaction has been
@ -1957,7 +1969,7 @@ apply_handle_stream_abort(StringInfo s)
break ;
break ;
default :
default :
Assert ( false ) ;
elog ( ERROR , " unexpected apply action: %d " , ( int ) apply_action ) ;
break ;
break ;
}
}
@ -2154,7 +2166,7 @@ apply_handle_stream_commit(StringInfo s)
switch ( apply_action )
switch ( apply_action )
{
{
case TRANS_LEADER_SERIALIZE :
case TRANS_LEADER_APPLY :
/*
/*
* The transaction has been serialized to file , so replay all the
* The transaction has been serialized to file , so replay all the
@ -2226,7 +2238,7 @@ apply_handle_stream_commit(StringInfo s)
break ;
break ;
default :
default :
Assert ( false ) ;
elog ( ERROR , " unexpected apply action: %d " , ( int ) apply_action ) ;
break ;
break ;
}
}
@ -4204,7 +4216,6 @@ stream_close_file(void)
BufFileClose ( stream_fd ) ;
BufFileClose ( stream_fd ) ;
stream_xid = InvalidTransactionId ;
stream_fd = NULL ;
stream_fd = NULL ;
}
}
@ -4977,10 +4988,12 @@ set_apply_error_context_origin(char *originname)
}
}
/*
/*
* Return the action to be taken for the given transaction . * winfo is
* Return the action to be taken for the given transaction . See
* assigned to the destination parallel worker info when the leader apply
* TransApplyAction for information on each of the actions .
* worker has to pass all the transaction ' s changes to the parallel apply
*
* worker .
* * winfo is assigned to the destination parallel worker info when the leader
* apply worker has to pass all the transaction ' s changes to the parallel
* apply worker .
*/
*/
static TransApplyAction
static TransApplyAction
get_transaction_apply_action ( TransactionId xid , ParallelApplyWorkerInfo * * winfo )
get_transaction_apply_action ( TransactionId xid , ParallelApplyWorkerInfo * * winfo )
@ -4991,27 +5004,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
{
{
return TRANS_PARALLEL_APPLY ;
return TRANS_PARALLEL_APPLY ;
}
}
else if ( in_remote_transaction )
{
return TRANS_LEADER_APPLY ;
}
/*
/*
* Check if we are processing this transaction using a parallel apply
* If we are processing this transaction using a parallel apply worker then
* worker .
* either we send the changes to the parallel worker or if the worker is busy
* then serialize the changes to the file which will later be processed by
* the parallel worker .
*/
*/
* winfo = pa_find_worker ( xid ) ;
* winfo = pa_find_worker ( xid ) ;
if ( ! * winfo )
if ( * winfo & & ( * winfo ) - > serialize_changes )
{
{
return TRANS_LEADER_SERIALIZE ;
return TRANS_LEADER_PARTIAL_ SERIALIZE ;
}
}
else if ( ( * winfo ) - > serialize_changes )
else if ( * winfo )
{
{
return TRANS_LEADER_PARTIAL_SERIALIZE ;
return TRANS_LEADER_SEND_TO_PARALLEL ;
}
/*
* If there is no parallel worker involved to process this transaction then
* we either directly apply the change or serialize it to a file which will
* later be applied when the transaction finish message is processed .
*/
else if ( in_streamed_transaction )
{
return TRANS_LEADER_SERIALIZE ;
}
}
else
else
{
{
return TRANS_LEADER_SEND_TO_PARALLEL ;
return TRANS_LEADER_APPLY ;
}
}
}
}