@ -4713,6 +4713,17 @@ InitializeLogRepWorker(void)
CommitTransactionCommand ( ) ;
CommitTransactionCommand ( ) ;
}
}
/*
* Reset the origin state .
*/
static void
replorigin_reset ( int code , Datum arg )
{
replorigin_session_origin = InvalidRepOriginId ;
replorigin_session_origin_lsn = InvalidXLogRecPtr ;
replorigin_session_origin_timestamp = 0 ;
}
/* Common function to setup the leader apply or tablesync worker. */
/* Common function to setup the leader apply or tablesync worker. */
void
void
SetupApplyOrSyncWorker ( int worker_slot )
SetupApplyOrSyncWorker ( int worker_slot )
@ -4741,6 +4752,19 @@ SetupApplyOrSyncWorker(int worker_slot)
InitializeLogRepWorker ( ) ;
InitializeLogRepWorker ( ) ;
/*
* Register a callback to reset the origin state before aborting any
* pending transaction during shutdown ( see ShutdownPostgres ( ) ) . This will
* avoid origin advancement for an in - complete transaction which could
* otherwise lead to its loss as such a transaction won ' t be sent by the
* server again .
*
* Note that even a LOG or DEBUG statement placed after setting the origin
* state may process a shutdown signal before committing the current apply
* operation . So , it is important to register such a callback here .
*/
before_shmem_exit ( replorigin_reset , ( Datum ) 0 ) ;
/* Connect to the origin and start the replication. */
/* Connect to the origin and start the replication. */
elog ( DEBUG1 , " connecting to publisher using connection string \" %s \" " ,
elog ( DEBUG1 , " connecting to publisher using connection string \" %s \" " ,
MySubscription - > conninfo ) ;
MySubscription - > conninfo ) ;
@ -4967,12 +4991,23 @@ void
apply_error_callback ( void * arg )
apply_error_callback ( void * arg )
{
{
ApplyErrorCallbackArg * errarg = & apply_error_callback_arg ;
ApplyErrorCallbackArg * errarg = & apply_error_callback_arg ;
int elevel ;
if ( apply_error_callback_arg . command = = 0 )
if ( apply_error_callback_arg . command = = 0 )
return ;
return ;
Assert ( errarg - > origin_name ) ;
Assert ( errarg - > origin_name ) ;
elevel = geterrlevel ( ) ;
/*
* Reset the origin state to prevent the advancement of origin progress if
* we fail to apply . Otherwise , this will result in transaction loss as
* that transaction won ' t be sent again by the server .
*/
if ( elevel > = ERROR )
replorigin_reset ( 0 , ( Datum ) 0 ) ;
if ( errarg - > rel = = NULL )
if ( errarg - > rel = = NULL )
{
{
if ( ! TransactionIdIsValid ( errarg - > remote_xid ) )
if ( ! TransactionIdIsValid ( errarg - > remote_xid ) )