@ -115,6 +115,7 @@ StartupDecodingContext(List *output_plugin_options,
XLogRecPtr start_lsn ,
XLogRecPtr start_lsn ,
TransactionId xmin_horizon ,
TransactionId xmin_horizon ,
bool need_full_snapshot ,
bool need_full_snapshot ,
bool fast_forward ,
XLogPageReadCB read_page ,
XLogPageReadCB read_page ,
LogicalOutputPluginWriterPrepareWrite prepare_write ,
LogicalOutputPluginWriterPrepareWrite prepare_write ,
LogicalOutputPluginWriterWrite do_write ,
LogicalOutputPluginWriterWrite do_write ,
@ -140,6 +141,7 @@ StartupDecodingContext(List *output_plugin_options,
* ( re - ) load output plugins , so we detect a bad ( removed ) output plugin
* ( re - ) load output plugins , so we detect a bad ( removed ) output plugin
* now .
* now .
*/
*/
if ( ! fast_forward )
LoadOutputPlugin ( & ctx - > callbacks , NameStr ( slot - > data . plugin ) ) ;
LoadOutputPlugin ( & ctx - > callbacks , NameStr ( slot - > data . plugin ) ) ;
/*
/*
@ -191,6 +193,8 @@ StartupDecodingContext(List *output_plugin_options,
ctx - > output_plugin_options = output_plugin_options ;
ctx - > output_plugin_options = output_plugin_options ;
ctx - > fast_forward = fast_forward ;
MemoryContextSwitchTo ( old_context ) ;
MemoryContextSwitchTo ( old_context ) ;
return ctx ;
return ctx ;
@ -303,8 +307,9 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave ( ) ;
ReplicationSlotSave ( ) ;
ctx = StartupDecodingContext ( NIL , InvalidXLogRecPtr , xmin_horizon ,
ctx = StartupDecodingContext ( NIL , InvalidXLogRecPtr , xmin_horizon ,
need_full_snapshot , read_page , prepare_write ,
need_full_snapshot , true ,
do_write , update_progress ) ;
read_page , prepare_write , do_write ,
update_progress ) ;
/* call output plugin initialization callback */
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo ( ctx - > context ) ;
old_context = MemoryContextSwitchTo ( ctx - > context ) ;
@ -342,6 +347,7 @@ CreateInitDecodingContext(char *plugin,
LogicalDecodingContext *
LogicalDecodingContext *
CreateDecodingContext ( XLogRecPtr start_lsn ,
CreateDecodingContext ( XLogRecPtr start_lsn ,
List * output_plugin_options ,
List * output_plugin_options ,
bool fast_forward ,
XLogPageReadCB read_page ,
XLogPageReadCB read_page ,
LogicalOutputPluginWriterPrepareWrite prepare_write ,
LogicalOutputPluginWriterPrepareWrite prepare_write ,
LogicalOutputPluginWriterWrite do_write ,
LogicalOutputPluginWriterWrite do_write ,
@ -395,8 +401,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext ( output_plugin_options ,
ctx = StartupDecodingContext ( output_plugin_options ,
start_lsn , InvalidTransactionId , false ,
start_lsn , InvalidTransactionId , false ,
read_page , prepare_write , do _write ,
fast_forward , read_page , prepare_write ,
update_progress ) ;
do_write , update_progress ) ;
/* call output plugin initialization callback */
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo ( ctx - > context ) ;
old_context = MemoryContextSwitchTo ( ctx - > context ) ;
@ -573,6 +579,8 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
LogicalErrorCallbackState state ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/* Push callback + info on the error context stack */
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . ctx = ctx ;
state . callback_name = " startup " ;
state . callback_name = " startup " ;
@ -598,6 +606,8 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
LogicalErrorCallbackState state ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/* Push callback + info on the error context stack */
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . ctx = ctx ;
state . callback_name = " shutdown " ;
state . callback_name = " shutdown " ;
@ -629,6 +639,8 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
LogicalErrorCallbackState state ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/* Push callback + info on the error context stack */
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . ctx = ctx ;
state . callback_name = " begin " ;
state . callback_name = " begin " ;
@ -658,6 +670,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
LogicalErrorCallbackState state ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/* Push callback + info on the error context stack */
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . ctx = ctx ;
state . callback_name = " commit " ;
state . callback_name = " commit " ;
@ -687,6 +701,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
LogicalErrorCallbackState state ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/* Push callback + info on the error context stack */
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . ctx = ctx ;
state . callback_name = " change " ;
state . callback_name = " change " ;
@ -721,6 +737,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
ErrorContextCallback errcallback ;
ErrorContextCallback errcallback ;
bool ret ;
bool ret ;
Assert ( ! ctx - > fast_forward ) ;
/* Push callback + info on the error context stack */
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . ctx = ctx ;
state . callback_name = " filter_by_origin " ;
state . callback_name = " filter_by_origin " ;
@ -751,6 +769,8 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
LogicalErrorCallbackState state ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
if ( ctx - > callbacks . message_cb = = NULL )
if ( ctx - > callbacks . message_cb = = NULL )
return ;
return ;