@ -4404,6 +4404,17 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
pfree ( syncslotname ) ;
pfree ( syncslotname ) ;
}
}
/*
* Reset the origin state .
*/
static void
replorigin_reset ( int code , Datum arg )
{
replorigin_session_origin = InvalidRepOriginId ;
replorigin_session_origin_lsn = InvalidXLogRecPtr ;
replorigin_session_origin_timestamp = 0 ;
}
/*
/*
* Run the apply loop with error handling . Disable the subscription ,
* Run the apply loop with error handling . Disable the subscription ,
* if necessary .
* if necessary .
@ -4553,6 +4564,19 @@ ApplyWorkerMain(Datum main_arg)
InitializeApplyWorker ( ) ;
InitializeApplyWorker ( ) ;
/*
* Register a callback to reset the origin state before aborting any
* pending transaction during shutdown ( see ShutdownPostgres ( ) ) . This will
* avoid origin advancement for an in - complete transaction which could
* otherwise lead to its loss as such a transaction won ' t be sent by the
* server again .
*
* Note that even a LOG or DEBUG statement placed after setting the origin
* state may process a shutdown signal before committing the current apply
* operation . So , it is important to register such a callback here .
*/
before_shmem_exit ( replorigin_reset , ( Datum ) 0 ) ;
InitializingApplyWorker = false ;
InitializingApplyWorker = false ;
/* Connect to the origin and start the replication. */
/* Connect to the origin and start the replication. */
@ -4916,12 +4940,23 @@ void
apply_error_callback ( void * arg )
apply_error_callback ( void * arg )
{
{
ApplyErrorCallbackArg * errarg = & apply_error_callback_arg ;
ApplyErrorCallbackArg * errarg = & apply_error_callback_arg ;
int elevel ;
if ( apply_error_callback_arg . command = = 0 )
if ( apply_error_callback_arg . command = = 0 )
return ;
return ;
Assert ( errarg - > origin_name ) ;
Assert ( errarg - > origin_name ) ;
elevel = geterrlevel ( ) ;
/*
* Reset the origin state to prevent the advancement of origin progress if
* we fail to apply . Otherwise , this will result in transaction loss as
* that transaction won ' t be sent again by the server .
*/
if ( elevel > = ERROR )
replorigin_reset ( 0 , ( Datum ) 0 ) ;
if ( errarg - > rel = = NULL )
if ( errarg - > rel = = NULL )
{
{
if ( ! TransactionIdIsValid ( errarg - > remote_xid ) )
if ( ! TransactionIdIsValid ( errarg - > remote_xid ) )