@ -59,6 +59,13 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
static void begin_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ) ;
static void commit_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr commit_lsn ) ;
static void begin_prepare_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ) ;
static void prepare_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr prepare_lsn ) ;
static void commit_prepared_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr commit_lsn ) ;
static void rollback_prepared_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr prepare_end_lsn , TimestampTz prepare_time ) ;
static void change_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
Relation relation , ReorderBufferChange * change ) ;
static void truncate_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
@ -74,6 +81,8 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr last_lsn ) ;
static void stream_abort_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr abort_lsn ) ;
static void stream_prepare_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr prepare_lsn ) ;
static void stream_commit_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr commit_lsn ) ;
static void stream_change_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
@ -237,11 +246,37 @@ StartupDecodingContext(List *output_plugin_options,
ctx - > reorder - > stream_start = stream_start_cb_wrapper ;
ctx - > reorder - > stream_stop = stream_stop_cb_wrapper ;
ctx - > reorder - > stream_abort = stream_abort_cb_wrapper ;
ctx - > reorder - > stream_prepare = stream_prepare_cb_wrapper ;
ctx - > reorder - > stream_commit = stream_commit_cb_wrapper ;
ctx - > reorder - > stream_change = stream_change_cb_wrapper ;
ctx - > reorder - > stream_message = stream_message_cb_wrapper ;
ctx - > reorder - > stream_truncate = stream_truncate_cb_wrapper ;
/*
* To support two - phase logical decoding , we require
* begin_prepare / prepare / commit - prepare / abort - prepare callbacks . The
* filter_prepare callback is optional . We however enable two - phase
* logical decoding when at least one of the methods is enabled so that we
* can easily identify missing methods .
*
* We decide it here , but only check it later in the wrappers .
*/
ctx - > twophase = ( ctx - > callbacks . begin_prepare_cb ! = NULL ) | |
( ctx - > callbacks . prepare_cb ! = NULL ) | |
( ctx - > callbacks . commit_prepared_cb ! = NULL ) | |
( ctx - > callbacks . rollback_prepared_cb ! = NULL ) | |
( ctx - > callbacks . stream_prepare_cb ! = NULL ) | |
( ctx - > callbacks . filter_prepare_cb ! = NULL ) ;
/*
* Callback to support decoding at prepare time .
*/
ctx - > reorder - > begin_prepare = begin_prepare_cb_wrapper ;
ctx - > reorder - > prepare = prepare_cb_wrapper ;
ctx - > reorder - > commit_prepared = commit_prepared_cb_wrapper ;
ctx - > reorder - > rollback_prepared = rollback_prepared_cb_wrapper ;
ctx - > out = makeStringInfo ( ) ;
ctx - > prepare_write = prepare_write ;
ctx - > write = do_write ;
@ -782,6 +817,186 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback . previous ;
}
/*
* The functionality of begin_prepare is quite similar to begin with the
* exception that this will have gid ( global transaction id ) information which
* can be used by plugin . Now , we thought about extending the existing begin
* but that would break the replication protocol and additionally this looks
* cleaner .
*/
static void
begin_prepare_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn )
{
LogicalDecodingContext * ctx = cache - > private_data ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/* We're only supposed to call this when two-phase commits are supported */
Assert ( ctx - > twophase ) ;
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . callback_name = " begin_prepare " ;
state . report_location = txn - > first_lsn ;
errcallback . callback = output_plugin_error_callback ;
errcallback . arg = ( void * ) & state ;
errcallback . previous = error_context_stack ;
error_context_stack = & errcallback ;
/* set output state */
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > first_lsn ;
/*
* If the plugin supports two - phase commits then begin prepare callback is
* mandatory
*/
if ( ctx - > callbacks . begin_prepare_cb = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " logical replication at prepare time requires begin_prepare_cb callback " ) ) ) ;
/* do the actual work: call callback */
ctx - > callbacks . begin_prepare_cb ( ctx , txn ) ;
/* Pop the error context stack */
error_context_stack = errcallback . previous ;
}
static void
prepare_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr prepare_lsn )
{
LogicalDecodingContext * ctx = cache - > private_data ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/* We're only supposed to call this when two-phase commits are supported */
Assert ( ctx - > twophase ) ;
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . callback_name = " prepare " ;
state . report_location = txn - > final_lsn ; /* beginning of prepare record */
errcallback . callback = output_plugin_error_callback ;
errcallback . arg = ( void * ) & state ;
errcallback . previous = error_context_stack ;
error_context_stack = & errcallback ;
/* set output state */
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ; /* points to the end of the record */
/*
* If the plugin supports two - phase commits then prepare callback is
* mandatory
*/
if ( ctx - > callbacks . prepare_cb = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " logical replication at prepare time requires prepare_cb callback " ) ) ) ;
/* do the actual work: call callback */
ctx - > callbacks . prepare_cb ( ctx , txn , prepare_lsn ) ;
/* Pop the error context stack */
error_context_stack = errcallback . previous ;
}
static void
commit_prepared_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr commit_lsn )
{
LogicalDecodingContext * ctx = cache - > private_data ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/* We're only supposed to call this when two-phase commits are supported */
Assert ( ctx - > twophase ) ;
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . callback_name = " commit_prepared " ;
state . report_location = txn - > final_lsn ; /* beginning of commit record */
errcallback . callback = output_plugin_error_callback ;
errcallback . arg = ( void * ) & state ;
errcallback . previous = error_context_stack ;
error_context_stack = & errcallback ;
/* set output state */
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ; /* points to the end of the record */
/*
* If the plugin support two - phase commits then commit prepared callback
* is mandatory
*/
if ( ctx - > callbacks . commit_prepared_cb = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " logical replication at prepare time requires commit_prepared_cb callback " ) ) ) ;
/* do the actual work: call callback */
ctx - > callbacks . commit_prepared_cb ( ctx , txn , commit_lsn ) ;
/* Pop the error context stack */
error_context_stack = errcallback . previous ;
}
static void
rollback_prepared_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr prepare_end_lsn ,
TimestampTz prepare_time )
{
LogicalDecodingContext * ctx = cache - > private_data ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/* We're only supposed to call this when two-phase commits are supported */
Assert ( ctx - > twophase ) ;
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . callback_name = " rollback_prepared " ;
state . report_location = txn - > final_lsn ; /* beginning of commit record */
errcallback . callback = output_plugin_error_callback ;
errcallback . arg = ( void * ) & state ;
errcallback . previous = error_context_stack ;
error_context_stack = & errcallback ;
/* set output state */
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ; /* points to the end of the record */
/*
* If the plugin support two - phase commits then rollback prepared callback
* is mandatory
*/
if ( ctx - > callbacks . rollback_prepared_cb = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " logical replication at prepare time requires rollback_prepared_cb callback " ) ) ) ;
/* do the actual work: call callback */
ctx - > callbacks . rollback_prepared_cb ( ctx , txn , prepare_end_lsn ,
prepare_time ) ;
/* Pop the error context stack */
error_context_stack = errcallback . previous ;
}
static void
change_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
Relation relation , ReorderBufferChange * change )
@ -859,6 +1074,45 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback . previous ;
}
bool
filter_prepare_cb_wrapper ( LogicalDecodingContext * ctx , const char * gid )
{
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
bool ret ;
Assert ( ! ctx - > fast_forward ) ;
/*
* Skip if decoding of two - phase transactions at PREPARE time is not
* enabled . In that case , all two - phase transactions are considered
* filtered out and will be applied as regular transactions at COMMIT
* PREPARED .
*/
if ( ! ctx - > twophase )
return true ;
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . callback_name = " filter_prepare " ;
state . report_location = InvalidXLogRecPtr ;
errcallback . callback = output_plugin_error_callback ;
errcallback . arg = ( void * ) & state ;
errcallback . previous = error_context_stack ;
error_context_stack = & errcallback ;
/* set output state */
ctx - > accept_writes = false ;
/* do the actual work: call callback */
ret = ctx - > callbacks . filter_prepare_cb ( ctx , gid ) ;
/* Pop the error context stack */
error_context_stack = errcallback . previous ;
return ret ;
}
bool
filter_by_origin_cb_wrapper ( LogicalDecodingContext * ctx , RepOriginId origin_id )
{
@ -1056,6 +1310,49 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback . previous ;
}
static void
stream_prepare_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr prepare_lsn )
{
LogicalDecodingContext * ctx = cache - > private_data ;
LogicalErrorCallbackState state ;
ErrorContextCallback errcallback ;
Assert ( ! ctx - > fast_forward ) ;
/*
* We ' re only supposed to call this when streaming and two - phase commits
* are supported .
*/
Assert ( ctx - > streaming ) ;
Assert ( ctx - > twophase ) ;
/* Push callback + info on the error context stack */
state . ctx = ctx ;
state . callback_name = " stream_prepare " ;
state . report_location = txn - > final_lsn ;
errcallback . callback = output_plugin_error_callback ;
errcallback . arg = ( void * ) & state ;
errcallback . previous = error_context_stack ;
error_context_stack = & errcallback ;
/* set output state */
ctx - > accept_writes = true ;
ctx - > write_xid = txn - > xid ;
ctx - > write_location = txn - > end_lsn ;
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
if ( ctx - > callbacks . stream_prepare_cb = = NULL )
ereport ( ERROR ,
( errcode ( ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ) ,
errmsg ( " logical streaming at prepare time requires a stream_prepare_cb callback " ) ) ) ;
ctx - > callbacks . stream_prepare_cb ( ctx , txn , prepare_lsn ) ;
/* Pop the error context stack */
error_context_stack = errcallback . previous ;
}
static void
stream_commit_cb_wrapper ( ReorderBuffer * cache , ReorderBufferTXN * txn ,
XLogRecPtr commit_lsn )