@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
static void send_repl_origin ( LogicalDecodingContext * ctx ,
RepOriginId origin_id , XLogRecPtr origin_lsn ,
bool send_origin ) ;
static void update_replication_progress ( LogicalDecodingContext * ctx ,
bool skipped_xact ) ;
/*
* Only 3 publication actions are used for row filtering ( " insert " , " update " ,
@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* from this transaction has been sent to the downstream .
*/
sent_begin_txn = txndata - > sent_begin_txn ;
update_replication_p rogress( ctx , ! sent_begin_txn ) ;
OutputPluginUpdateP rogress( ctx , ! sent_begin_txn ) ;
pfree ( txndata ) ;
txn - > output_plugin_private = NULL ;
@ -625,7 +623,7 @@ static void
pgoutput_prepare_txn ( LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
XLogRecPtr prepare_lsn )
{
update_replication_p rogress( ctx , false ) ;
OutputPluginUpdateP rogress( ctx , false ) ;
OutputPluginPrepareWrite ( ctx , true ) ;
logicalrep_write_prepare ( ctx - > out , txn , prepare_lsn ) ;
@ -639,7 +637,7 @@ static void
pgoutput_commit_prepared_txn ( LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
XLogRecPtr commit_lsn )
{
update_replication_p rogress( ctx , false ) ;
OutputPluginUpdateP rogress( ctx , false ) ;
OutputPluginPrepareWrite ( ctx , true ) ;
logicalrep_write_commit_prepared ( ctx - > out , txn , commit_lsn ) ;
@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
XLogRecPtr prepare_end_lsn ,
TimestampTz prepare_time )
{
update_replication_p rogress( ctx , false ) ;
OutputPluginUpdateP rogress( ctx , false ) ;
OutputPluginPrepareWrite ( ctx , true ) ;
logicalrep_write_rollback_prepared ( ctx - > out , txn , prepare_end_lsn ,
@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleTableSlot * old_slot = NULL ;
TupleTableSlot * new_slot = NULL ;
update_replication_progress ( ctx , false ) ;
if ( ! is_publishable_relation ( relation ) )
return ;
@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Oid * relids ;
TransactionId xid = InvalidTransactionId ;
update_replication_progress ( ctx , false ) ;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if ( in_streaming )
xid = change - > txn - > xid ;
@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData * data = ( PGOutputData * ) ctx - > output_plugin_private ;
TransactionId xid = InvalidTransactionId ;
update_replication_progress ( ctx , false ) ;
if ( ! data - > messages )
return ;
@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
Assert ( ! in_streaming ) ;
Assert ( rbtxn_is_streamed ( txn ) ) ;
update_replication_p rogress( ctx , false ) ;
OutputPluginUpdateP rogress( ctx , false ) ;
OutputPluginPrepareWrite ( ctx , true ) ;
logicalrep_write_stream_commit ( ctx - > out , txn , commit_lsn ) ;
@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
{
Assert ( rbtxn_is_streamed ( txn ) ) ;
update_replication_p rogress( ctx , false ) ;
OutputPluginUpdateP rogress( ctx , false ) ;
OutputPluginPrepareWrite ( ctx , true ) ;
logicalrep_write_stream_prepare ( ctx - > out , txn , prepare_lsn ) ;
OutputPluginWrite ( ctx , true ) ;
@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
}
}
}
/*
* Try to update progress and send a keepalive message if too many changes were
* processed .
*
* For a large transaction , if we don ' t send any change to the downstream for a
* long time ( exceeds the wal_receiver_timeout of standby ) then it can timeout .
* This can happen when all or most of the changes are either not published or
* got filtered out .
*/
static void
update_replication_progress ( LogicalDecodingContext * ctx , bool skipped_xact )
{
static int changes_count = 0 ;
/*
* We don ' t want to try sending a keepalive message after processing each
* change as that can have overhead . Tests revealed that there is no
* noticeable overhead in doing it after continuously processing 100 or so
* changes .
*/
# define CHANGES_THRESHOLD 100
/*
* If we are at the end of transaction LSN , update progress tracking .
* Otherwise , after continuously processing CHANGES_THRESHOLD changes , we
* try to send a keepalive message if required .
*/
if ( ctx - > end_xact | | + + changes_count > = CHANGES_THRESHOLD )
{
OutputPluginUpdateProgress ( ctx , skipped_xact ) ;
changes_count = 0 ;
}
}