@ -746,6 +746,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
/* set output state */
ctx - > accept_writes = false ;
ctx - > end_xact = false ;
/* do the actual work: call callback */
ctx - > callbacks . startup_cb ( ctx , opt , is_init ) ;
@ -773,6 +774,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
/* set output state */
ctx - > accept_writes = false ;
ctx - > end_xact = false ;
/* do the actual work: call callback */
ctx - > callbacks . shutdown_cb ( ctx ) ;
@ -808,6 +810,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > first_lsn ;
ctx - > end_xact = false ;
/* do the actual work: call callback */
ctx - > callbacks . begin_cb ( ctx , txn ) ;
@ -839,6 +842,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ; /* points to the end of the record */
ctx - > end_xact = true ;
/* do the actual work: call callback */
ctx - > callbacks . commit_cb ( ctx , txn , commit_lsn ) ;
@ -879,6 +883,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > first_lsn ;
ctx - > end_xact = false ;
/*
* If the plugin supports two - phase commits then begin prepare callback is
@ -923,6 +928,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ; /* points to the end of the record */
ctx - > end_xact = true ;
/*
* If the plugin supports two - phase commits then prepare callback is
@ -967,6 +973,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ; /* points to the end of the record */
ctx - > end_xact = true ;
/*
* If the plugin support two - phase commits then commit prepared callback
@ -1012,6 +1019,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ; /* points to the end of the record */
ctx - > end_xact = true ;
/*
* If the plugin support two - phase commits then rollback prepared callback
@ -1062,6 +1070,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx - > write_location = change - > lsn ;
ctx - > end_xact = false ;
ctx - > callbacks . change_cb ( ctx , txn , relation , change ) ;
/* Pop the error context stack */
@ -1102,6 +1112,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx - > write_location = change - > lsn ;
ctx - > end_xact = false ;
ctx - > callbacks . truncate_cb ( ctx , txn , nrelations , relations , change ) ;
/* Pop the error context stack */
@ -1129,6 +1141,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
/* set output state */
ctx - > accept_writes = false ;
ctx - > end_xact = false ;
/* do the actual work: call callback */
ret = ctx - > callbacks . filter_prepare_cb ( ctx , xid , gid ) ;
@ -1159,6 +1172,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
/* set output state */
ctx - > accept_writes = false ;
ctx - > end_xact = false ;
/* do the actual work: call callback */
ret = ctx - > callbacks . filter_by_origin_cb ( ctx , origin_id ) ;
@ -1196,6 +1210,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx - > accept_writes = true ;
ctx - > write_xid = txn ! = NULL ? txn - > xid : InvalidTransactionId ;
ctx - > write_location = message_lsn ;
ctx - > end_xact = false ;
/* do the actual work: call callback */
ctx - > callbacks . message_cb ( ctx , txn , message_lsn , transactional , prefix ,
@ -1239,6 +1254,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx - > write_location = first_lsn ;
ctx - > end_xact = false ;
/* in streaming mode, stream_start_cb is required */
if ( ctx - > callbacks . stream_start_cb = = NULL )
ereport ( ERROR ,
@ -1286,6 +1303,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx - > write_location = last_lsn ;
ctx - > end_xact = false ;
/* in streaming mode, stream_stop_cb is required */
if ( ctx - > callbacks . stream_stop_cb = = NULL )
ereport ( ERROR ,
@ -1325,6 +1344,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = abort_lsn ;
ctx - > end_xact = true ;
/* in streaming mode, stream_abort_cb is required */
if ( ctx - > callbacks . stream_abort_cb = = NULL )
@ -1369,6 +1389,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ;
ctx - > end_xact = true ;
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
if ( ctx - > callbacks . stream_prepare_cb = = NULL )
@ -1409,6 +1430,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ;
ctx - > end_xact = true ;
/* in streaming mode, stream_commit_cb is required */
if ( ctx - > callbacks . stream_commit_cb = = NULL )
@ -1457,6 +1479,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx - > write_location = change - > lsn ;
ctx - > end_xact = false ;
/* in streaming mode, stream_change_cb is required */
if ( ctx - > callbacks . stream_change_cb = = NULL )
ereport ( ERROR ,
@ -1501,6 +1525,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx - > accept_writes = true ;
ctx - > write_xid = txn ! = NULL ? txn - > xid : InvalidTransactionId ;
ctx - > write_location = message_lsn ;
ctx - > end_xact = false ;
/* do the actual work: call callback */
ctx - > callbacks . stream_message_cb ( ctx , txn , message_lsn , transactional , prefix ,
@ -1549,6 +1574,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx - > write_location = change - > lsn ;
ctx - > end_xact = false ;
ctx - > callbacks . stream_truncate_cb ( ctx , txn , nrelations , relations , change ) ;
/* Pop the error context stack */