@ -226,7 +226,6 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum ; /* -1 if invalid */
TransactionId remote_xid ;
TimestampTz ts ; /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg ;
static ApplyErrorCallbackArg apply_error_callback_arg =
@ -235,7 +234,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
. rel = NULL ,
. remote_attnum = - 1 ,
. remote_xid = InvalidTransactionId ,
. ts = 0 ,
} ;
static MemoryContext ApplyMessageContext = NULL ;
@ -334,7 +332,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback ( void * arg ) ;
static inline void set_apply_error_context_xact ( TransactionId xid , TimestampTz ts ) ;
static inline void set_apply_error_context_xact ( TransactionId xid ) ;
static inline void reset_apply_error_context_info ( void ) ;
/*
@ -787,7 +785,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data ;
logicalrep_read_begin ( s , & begin_data ) ;
set_apply_error_context_xact ( begin_data . xid , begin_data . committime ) ;
set_apply_error_context_xact ( begin_data . xid ) ;
remote_final_lsn = begin_data . final_lsn ;
@ -839,7 +837,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal ( " tablesync worker received a BEGIN PREPARE message " ) ) ) ;
logicalrep_read_begin_prepare ( s , & begin_data ) ;
set_apply_error_context_xact ( begin_data . xid , begin_data . prepare_time ) ;
set_apply_error_context_xact ( begin_data . xid ) ;
remote_final_lsn = begin_data . prepare_lsn ;
@ -938,7 +936,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid [ GIDSIZE ] ;
logicalrep_read_commit_prepared ( s , & prepare_data ) ;
set_apply_error_context_xact ( prepare_data . xid , prepare_data . commit_time ) ;
set_apply_error_context_xact ( prepare_data . xid ) ;
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid ( MySubscription - > oid , prepare_data . xid ,
@ -979,7 +977,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid [ GIDSIZE ] ;
logicalrep_read_rollback_prepared ( s , & rollback_data ) ;
set_apply_error_context_xact ( rollback_data . xid , rollback_data . rollback_time ) ;
set_apply_error_context_xact ( rollback_data . xid ) ;
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid ( MySubscription - > oid , rollback_data . xid ,
@ -1044,7 +1042,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal ( " tablesync worker received a STREAM PREPARE message " ) ) ) ;
logicalrep_read_stream_prepare ( s , & prepare_data ) ;
set_apply_error_context_xact ( prepare_data . xid , prepare_data . prepare_time ) ;
set_apply_error_context_xact ( prepare_data . xid ) ;
elog ( DEBUG1 , " received prepare for streamed transaction %u " , prepare_data . xid ) ;
@ -1126,7 +1124,7 @@ apply_handle_stream_start(StringInfo s)
( errcode ( ERRCODE_PROTOCOL_VIOLATION ) ,
errmsg_internal ( " invalid transaction ID in streamed replication transaction " ) ) ) ;
set_apply_error_context_xact ( stream_xid , 0 ) ;
set_apply_error_context_xact ( stream_xid ) ;
/*
* Initialize the worker ' s stream_fileset if we haven ' t yet . This will be
@ -1215,7 +1213,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if ( xid = = subxid )
{
set_apply_error_context_xact ( xid , 0 ) ;
set_apply_error_context_xact ( xid ) ;
stream_cleanup_files ( MyLogicalRepWorker - > subid , xid ) ;
}
else
@ -1241,7 +1239,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false ;
char path [ MAXPGPATH ] ;
set_apply_error_context_xact ( subxid , 0 ) ;
set_apply_error_context_xact ( subxid ) ;
subidx = - 1 ;
begin_replication_step ( ) ;
@ -1426,7 +1424,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal ( " STREAM COMMIT message without STREAM STOP " ) ) ) ;
xid = logicalrep_read_stream_commit ( s , & commit_data ) ;
set_apply_error_context_xact ( xid , commit_data . committime ) ;
set_apply_error_context_xact ( xid ) ;
elog ( DEBUG1 , " received commit for streamed transaction %u " , xid ) ;
@ -3648,46 +3646,41 @@ IsLogicalWorker(void)
static void
apply_error_callback ( void * arg )
{
StringInfoData buf ;
ApplyErrorCallbackArg * errarg = & apply_error_callback_arg ;
if ( apply_error_callback_arg . command = = 0 )
return ;
initStringInfo ( & buf ) ;
appendStringInfo ( & buf , _ ( " processing remote data during \" %s \" " ) ,
logicalrep_message_type ( errarg - > command ) ) ;
/* append relation information */
if ( errarg - > rel )
{
appendStringInfo ( & buf , _ ( " for replication target relation \" %s.%s \" " ) ,
errarg - > rel - > remoterel . nspname ,
errarg - > rel - > remoterel . relname ) ;
if ( errarg - > remote_attnum > = 0 )
appendStringInfo ( & buf , _ ( " column \" %s \" " ) ,
errarg - > rel - > remoterel . attnames [ errarg - > remote_attnum ] ) ;
}
/* append transaction information */
if ( TransactionIdIsNormal ( errarg - > remote_xid ) )
if ( errarg - > rel = = NULL )
{
appendStringInfo ( & buf , _ ( " in transaction %u " ) , errarg - > remote_xid ) ;
if ( errarg - > ts ! = 0 )
appendStringInfo ( & buf , _ ( " at %s " ) ,
timestamptz_to_str ( errarg - > ts ) ) ;
if ( ! TransactionIdIsValid ( errarg - > remote_xid ) )
errcontext ( " processing remote data during \" %s \" " ,
logicalrep_message_type ( errarg - > command ) ) ;
else
errcontext ( " processing remote data during \" %s \" in transaction %u " ,
logicalrep_message_type ( errarg - > command ) ,
errarg - > remote_xid ) ;
}
errcontext ( " %s " , buf . data ) ;
pfree ( buf . data ) ;
else if ( errarg - > remote_attnum < 0 )
errcontext ( " processing remote data during \" %s \" for replication target relation \" %s.%s \" in transaction %u " ,
logicalrep_message_type ( errarg - > command ) ,
errarg - > rel - > remoterel . nspname ,
errarg - > rel - > remoterel . relname ,
errarg - > remote_xid ) ;
else
errcontext ( " processing remote data during \" %s \" for replication target relation \" %s.%s \" column \" %s \" in transaction %u " ,
logicalrep_message_type ( errarg - > command ) ,
errarg - > rel - > remoterel . nspname ,
errarg - > rel - > remoterel . relname ,
errarg - > rel - > remoterel . attnames [ errarg - > remote_attnum ] ,
errarg - > remote_xid ) ;
}
/* Set transaction information of apply error callback */
static inline void
set_apply_error_context_xact ( TransactionId xid , TimestampTz ts )
set_apply_error_context_xact ( TransactionId xid )
{
apply_error_callback_arg . remote_xid = xid ;
apply_error_callback_arg . ts = ts ;
}
/* Reset all information of apply error callback */
@ -3697,5 +3690,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg . command = 0 ;
apply_error_callback_arg . rel = NULL ;
apply_error_callback_arg . remote_attnum = - 1 ;
set_apply_error_context_xact ( InvalidTransactionId , 0 ) ;
set_apply_error_context_xact ( InvalidTransactionId ) ;
}