@ -47,17 +47,40 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
ReorderBufferChange * change ) ;
static bool pgoutput_origin_filter ( LogicalDecodingContext * ctx ,
RepOriginId origin_id ) ;
static void pgoutput_stream_start ( struct LogicalDecodingContext * ctx ,
ReorderBufferTXN * txn ) ;
static void pgoutput_stream_stop ( struct LogicalDecodingContext * ctx ,
ReorderBufferTXN * txn ) ;
static void pgoutput_stream_abort ( struct LogicalDecodingContext * ctx ,
ReorderBufferTXN * txn ,
XLogRecPtr abort_lsn ) ;
static void pgoutput_stream_commit ( struct LogicalDecodingContext * ctx ,
ReorderBufferTXN * txn ,
XLogRecPtr commit_lsn ) ;
static bool publications_valid ;
static bool in_streaming ;
static List * LoadPublications ( List * pubnames ) ;
static void publication_invalidation_cb ( Datum arg , int cacheid ,
uint32 hashvalue ) ;
static void send_relation_and_attrs ( Relation relation , LogicalDecodingContext * ctx ) ;
static void send_relation_and_attrs ( Relation relation , TransactionId xid ,
LogicalDecodingContext * ctx ) ;
/*
* Entry in the map used to remember which relation schemas we sent .
*
* The schema_sent flag determines if the current schema record was already
* sent to the subscriber ( in which case we don ' t need to send it again ) .
*
* The schema cache on downstream is however updated only at commit time ,
* and with streamed transactions the commit order may be different from
* the order the transactions are sent in . Also , the ( sub ) transactions
* might get aborted so we need to send the schema for each ( sub ) transaction
* so that we don ' t lose the schema information on abort . For handling this ,
* we maintain the list of xids ( streamed_txns ) for those we have already sent
* the schema .
*
* For partitions , ' pubactions ' considers not only the table ' s own
* publications , but also those of all of its ancestors .
*/
@ -70,6 +93,8 @@ typedef struct RelationSyncEntry
* have been sent for this to be true .
*/
bool schema_sent ;
List * streamed_txns ; /* streamed toplevel transactions with this
* schema */
bool replicate_valid ;
PublicationActions pubactions ;
@ -95,10 +120,15 @@ typedef struct RelationSyncEntry
static HTAB * RelationSyncCache = NULL ;
static void init_rel_sync_cache ( MemoryContext decoding_context ) ;
static void cleanup_rel_sync_cache ( TransactionId xid , bool is_commit ) ;
static RelationSyncEntry * get_rel_sync_entry ( PGOutputData * data , Oid relid ) ;
static void rel_sync_cache_relation_cb ( Datum arg , Oid relid ) ;
static void rel_sync_cache_publication_cb ( Datum arg , int cacheid ,
uint32 hashvalue ) ;
static void set_schema_sent_in_streamed_txn ( RelationSyncEntry * entry ,
TransactionId xid ) ;
static bool get_schema_sent_in_streamed_txn ( RelationSyncEntry * entry ,
TransactionId xid ) ;
/*
* Specify output plugin callbacks
@ -115,16 +145,26 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb - > commit_cb = pgoutput_commit_txn ;
cb - > filter_by_origin_cb = pgoutput_origin_filter ;
cb - > shutdown_cb = pgoutput_shutdown ;
/* transaction streaming */
cb - > stream_start_cb = pgoutput_stream_start ;
cb - > stream_stop_cb = pgoutput_stream_stop ;
cb - > stream_abort_cb = pgoutput_stream_abort ;
cb - > stream_commit_cb = pgoutput_stream_commit ;
cb - > stream_change_cb = pgoutput_change ;
cb - > stream_truncate_cb = pgoutput_truncate ;
}
static void
parse_output_parameters ( List * options , uint32 * protocol_version ,
List * * publication_names , bool * binary )
List * * publication_names , bool * binary ,
bool * enable_streaming )
{
ListCell * lc ;
bool protocol_version_given = false ;
bool publication_names_given = false ;
bool binary_option_given = false ;
bool streaming_given = false ;
* binary = false ;
@ -182,6 +222,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
* binary = defGetBoolean ( defel ) ;
}
else if ( strcmp ( defel - > defname , " streaming " ) = = 0 )
{
if ( streaming_given )
ereport ( ERROR ,
( errcode ( ERRCODE_SYNTAX_ERROR ) ,
errmsg ( " conflicting or redundant options " ) ) ) ;
streaming_given = true ;
* enable_streaming = defGetBoolean ( defel ) ;
}
else
elog ( ERROR , " unrecognized pgoutput option: %s " , defel - > defname ) ;
}
@ -194,6 +244,7 @@ static void
pgoutput_startup ( LogicalDecodingContext * ctx , OutputPluginOptions * opt ,
bool is_init )
{
bool enable_streaming = false ;
PGOutputData * data = palloc0 ( sizeof ( PGOutputData ) ) ;
/* Create our memory context for private allocations. */
@ -217,7 +268,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
parse_output_parameters ( ctx - > output_plugin_options ,
& data - > protocol_version ,
& data - > publication_names ,
& data - > binary ) ;
& data - > binary ,
& enable_streaming ) ;
/* Check if we support requested protocol */
if ( data - > protocol_version > LOGICALREP_PROTO_VERSION_NUM )
@ -237,6 +289,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " publication_names parameter missing " ) ) ) ;
/*
* Decide whether to enable streaming . It is disabled by default , in
* which case we just update the flag in decoding context . Otherwise
* we only allow it with sufficient version of the protocol , and when
* the output plugin supports it .
*/
if ( ! enable_streaming )
ctx - > streaming = false ;
else if ( data - > protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " requested proto_version=%d does not support streaming, need %d or higher " ,
data - > protocol_version , LOGICALREP_PROTO_STREAM_VERSION_NUM ) ) ) ;
else if ( ! ctx - > streaming )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " streaming requested, but not supported by output plugin " ) ) ) ;
/* Also remember we're currently not streaming any transaction. */
in_streaming = false ;
/* Init publication state. */
data - > publications = NIL ;
publications_valid = false ;
@ -247,6 +320,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
/* Initialize relation schema cache. */
init_rel_sync_cache ( CacheMemoryContext ) ;
}
else
{
/* Disable the streaming during the slot initialization mode. */
ctx - > streaming = false ;
}
}
/*
@ -305,9 +383,47 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
*/
static void
maybe_send_schema ( LogicalDecodingContext * ctx ,
ReorderBufferTXN * txn , ReorderBufferChange * change ,
Relation relation , RelationSyncEntry * relentry )
{
if ( relentry - > schema_sent )
bool schema_sent ;
TransactionId xid = InvalidTransactionId ;
TransactionId topxid = InvalidTransactionId ;
/*
* Remember XID of the ( sub ) transaction for the change . We don ' t care if
* it ' s top - level transaction or not ( we have already sent that XID in
* start of the current streaming block ) .
*
* If we ' re not in a streaming block , just use InvalidTransactionId and
* the write methods will not include it .
*/
if ( in_streaming )
xid = change - > txn - > xid ;
if ( change - > txn - > toptxn )
topxid = change - > txn - > toptxn - > xid ;
else
topxid = xid ;
/*
* Do we need to send the schema ? We do track streamed transactions
* separately , because those may be applied later ( and the regular
* transactions won ' t see their effects until then ) and in an order that
* we don ' t know at this point .
*
* XXX There is a scope of optimization here . Currently , we always send
* the schema first time in a streaming transaction but we can probably
* avoid that by checking ' relentry - > schema_sent ' flag . However , before
* doing that we need to study its impact on the case where we have a mix
* of streaming and non - streaming transactions .
*/
if ( in_streaming )
schema_sent = get_schema_sent_in_streamed_txn ( relentry , topxid ) ;
else
schema_sent = relentry - > schema_sent ;
if ( schema_sent )
return ;
/* If needed, send the ancestor's schema first. */
@ -323,19 +439,24 @@ maybe_send_schema(LogicalDecodingContext *ctx,
relentry - > map = convert_tuples_by_name ( CreateTupleDescCopy ( indesc ) ,
CreateTupleDescCopy ( outdesc ) ) ;
MemoryContextSwitchTo ( oldctx ) ;
send_relation_and_attrs ( ancestor , ctx ) ;
send_relation_and_attrs ( ancestor , xid , ctx ) ;
RelationClose ( ancestor ) ;
}
send_relation_and_attrs ( relation , ctx ) ;
relentry - > schema_sent = true ;
send_relation_and_attrs ( relation , xid , ctx ) ;
if ( in_streaming )
set_schema_sent_in_streamed_txn ( relentry , topxid ) ;
else
relentry - > schema_sent = true ;
}
/*
* Sends a relation
*/
static void
send_relation_and_attrs ( Relation relation , LogicalDecodingContext * ctx )
send_relation_and_attrs ( Relation relation , TransactionId xid ,
LogicalDecodingContext * ctx )
{
TupleDesc desc = RelationGetDescr ( relation ) ;
int i ;
@ -359,17 +480,19 @@ send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
continue ;
OutputPluginPrepareWrite ( ctx , false ) ;
logicalrep_write_typ ( ctx - > out , att - > atttypid ) ;
logicalrep_write_typ ( ctx - > out , xid , att - > atttypid ) ;
OutputPluginWrite ( ctx , false ) ;
}
OutputPluginPrepareWrite ( ctx , false ) ;
logicalrep_write_rel ( ctx - > out , relation ) ;
logicalrep_write_rel ( ctx - > out , xid , relation ) ;
OutputPluginWrite ( ctx , false ) ;
}
/*
* Sends the decoded DML over wire .
*
* This is called both in streaming and non - streaming modes .
*/
static void
pgoutput_change ( LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
@ -378,10 +501,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData * data = ( PGOutputData * ) ctx - > output_plugin_private ;
MemoryContext old ;
RelationSyncEntry * relentry ;
TransactionId xid = InvalidTransactionId ;
if ( ! is_publishable_relation ( relation ) )
return ;
/*
* Remember the xid for the change in streaming mode . We need to send the
* xid with each change in streaming mode so that the subscriber can
* associate the change with its ( sub ) transaction and , on abort , discard
* the corresponding changes .
*/
if ( in_streaming )
xid = change - > txn - > xid ;
relentry = get_rel_sync_entry ( data , RelationGetRelid ( relation ) ) ;
/* First check the table filter */
@ -406,7 +539,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo ( data - > context ) ;
maybe_send_schema ( ctx , relation , relentry ) ;
maybe_send_schema ( ctx , txn , change , relation , relentry ) ;
/* Send the data */
switch ( change - > action )
@ -426,7 +559,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite ( ctx , true ) ;
logicalrep_write_insert ( ctx - > out , relation , tuple ,
logicalrep_write_insert ( ctx - > out , xid , relation , tuple ,
data - > binary ) ;
OutputPluginWrite ( ctx , true ) ;
break ;
@ -451,8 +584,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite ( ctx , true ) ;
logicalrep_write_update ( ctx - > out , relation , oldtuple , new tuple ,
data - > binary ) ;
logicalrep_write_update ( ctx - > out , xid , relation , oldtuple ,
newtuple , data - > binary ) ;
OutputPluginWrite ( ctx , true ) ;
break ;
}
@ -472,7 +605,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
OutputPluginPrepareWrite ( ctx , true ) ;
logicalrep_write_delete ( ctx - > out , relation , oldtuple ,
logicalrep_write_delete ( ctx - > out , xid , relation , oldtuple ,
data - > binary ) ;
OutputPluginWrite ( ctx , true ) ;
}
@ -498,6 +631,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int i ;
int nrelids ;
Oid * relids ;
TransactionId xid = InvalidTransactionId ;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if ( in_streaming )
xid = change - > txn - > xid ;
old = MemoryContextSwitchTo ( data - > context ) ;
@ -526,13 +664,14 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
continue ;
relids [ nrelids + + ] = relid ;
maybe_send_schema ( ctx , relation , relentry ) ;
maybe_send_schema ( ctx , txn , change , relation , relentry ) ;
}
if ( nrelids > 0 )
{
OutputPluginPrepareWrite ( ctx , true ) ;
logicalrep_write_truncate ( ctx - > out ,
xid ,
nrelids ,
relids ,
change - > data . truncate . cascade ,
@ -605,6 +744,118 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
rel_sync_cache_publication_cb ( arg , cacheid , hashvalue ) ;
}
/*
 * START STREAM callback
 *
 * Opens a streaming block for 'txn': writes a stream-start message carrying
 * the transaction's xid and a flag telling whether this is the first stream
 * sent for the transaction, then marks the plugin as being inside a
 * streaming block (in_streaming = true).
 *
 * On the first stream only, and only if the transaction has a valid
 * replication origin whose name can still be resolved, a separate origin
 * message is written right after the stream-start message.
 *
 * ctx - logical decoding context whose output buffer receives the messages
 * txn - transaction for which a chunk of changes is about to be streamed
 */
static void
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn)
{
	bool		first_stream = !rbtxn_is_streamed(txn);
	bool		send_replication_origin;
	char	   *origin = NULL;

	/* we can't nest streaming of transactions */
	Assert(!in_streaming);

	/*
	 * The origin is sent with the first stream only; subsequent streams of
	 * the same transaction never repeat it.
	 */
	send_replication_origin = first_stream &&
		txn->origin_id != InvalidRepOriginId;

	/*
	 * Resolve the origin name before starting any message.  If the name
	 * cannot be found we skip the origin message altogether; opening a new
	 * message first and then writing nothing into it would put an empty
	 * message on the wire.
	 */
	if (send_replication_origin &&
		!replorigin_by_oid(txn->origin_id, true, &origin))
		send_replication_origin = false;

	OutputPluginPrepareWrite(ctx, !send_replication_origin);
	logicalrep_write_stream_start(ctx->out, txn->xid, first_stream);

	if (send_replication_origin)
	{
		/* Message boundary */
		OutputPluginWrite(ctx, false);
		OutputPluginPrepareWrite(ctx, true);

		logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
	}

	OutputPluginWrite(ctx, true);

	/* we're streaming a chunk of transaction now */
	in_streaming = true;
}
/*
 * STOP STREAM callback
 *
 * Writes a stream-stop message to the output buffer and records that we are
 * no longer inside a streaming block.
 */
static void
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
					 ReorderBufferTXN *txn)
{
	/* a stop is only valid while a streaming block is open */
	Assert(in_streaming);

	/* emit the stop marker as a single, final message */
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_stop(ctx->out);
	OutputPluginWrite(ctx, true);

	/* the streaming block is closed now */
	in_streaming = false;
}
/*
 * STREAM ABORT callback
 *
 * Tell the downstream to discard the streamed transaction (along with all
 * its subtransactions, if it's a toplevel transaction), then drop the
 * toplevel xid from the per-relation lists of streamed transactions.
 */
static void
pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
					  ReorderBufferTXN *txn,
					  XLogRecPtr abort_lsn)
{
	ReorderBufferTXN *top;

	/*
	 * Aborts are reported outside of any streaming block, even when the
	 * transaction itself was streamed.
	 */
	Assert(!in_streaming);

	/* find the toplevel transaction; txn is its own toplevel if it has none */
	if (txn->toptxn)
		top = txn->toptxn;
	else
		top = txn;

	/* the toplevel transaction must have been streamed already */
	Assert(rbtxn_is_streamed(top));

	/* the message carries both the toplevel xid and the aborted xid */
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_abort(ctx->out, top->xid, txn->xid);
	OutputPluginWrite(ctx, true);

	/* not a commit: only forget the xid, leave schema_sent untouched */
	cleanup_rel_sync_cache(top->xid, false);
}
/*
 * STREAM COMMIT callback
 *
 * Tell the downstream to apply the streamed transaction (along with all its
 * subtransactions), then update the relation schema cache for the
 * committed xid.
 */
static void
pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
					   ReorderBufferTXN *txn,
					   XLogRecPtr commit_lsn)
{
	/*
	 * Commits are reported outside of any streaming block, and only for
	 * transactions that have actually been streamed.
	 */
	Assert(!in_streaming);
	Assert(rbtxn_is_streamed(txn));

	OutputPluginUpdateProgress(ctx);

	/* emit the commit as a single, final message */
	OutputPluginPrepareWrite(ctx, true);
	logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
	OutputPluginWrite(ctx, true);

	/* committed: entries that streamed this xid may set schema_sent */
	cleanup_rel_sync_cache(txn->xid, true);
}
/*
* Initialize the relation schema sync cache for a decoding session .
*
@ -641,6 +892,39 @@ init_rel_sync_cache(MemoryContext cachectx)
( Datum ) 0 ) ;
}
/*
 * Report whether 'xid' is present in the entry's list of streamed
 * transactions, i.e. whether the relation schema was already sent for it.
 *
 * A linear scan is used because we expect a relatively small number of
 * streamed transactions.
 */
static bool
get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
	ListCell   *cell;
	bool		found = false;

	foreach(cell, entry->streamed_txns)
	{
		/* xids are stored as ints in the list; compare as unsigned */
		if ((uint32) lfirst_int(cell) == xid)
		{
			found = true;
			break;
		}
	}

	return found;
}
/*
 * Record 'xid' in the rel sync entry as a transaction for which the schema
 * of the relation has already been sent.
 */
static void
set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
{
	/* grow the list while CacheMemoryContext is the current context */
	MemoryContext saved = MemoryContextSwitchTo(CacheMemoryContext);

	entry->streamed_txns = lappend_int(entry->streamed_txns, xid);

	/* restore the caller's memory context */
	MemoryContextSwitchTo(saved);
}
/*
* Find or create entry in the relation schema cache .
*
@ -771,11 +1055,58 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
}
if ( ! found )
{
entry - > schema_sent = false ;
entry - > streamed_txns = NULL ;
}
return entry ;
}
/*
 * Remove a finished streamed transaction from the relation schema cache.
 *
 * Once a streamed transaction commits or aborts, its toplevel XID is
 * dropped from the streamed_txns list of every cache entry that has it.
 *
 * On abort nothing else is needed: the subscriber simply throws away the
 * schema records we streamed.  On commit the subscriber updates its
 * relation cache, so schema_sent is set on the affected entries.
 *
 * xid       - toplevel XID of the finished transaction
 * is_commit - true if the transaction committed, false if it aborted
 */
static void
cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
{
	HASH_SEQ_STATUS scan;
	RelationSyncEntry *relentry;
	ListCell   *cell;

	Assert(RelationSyncCache != NULL);

	hash_seq_init(&scan, RelationSyncCache);
	while ((relentry = hash_seq_search(&scan)) != NULL)
	{
		foreach(cell, relentry->streamed_txns)
		{
			/* skip list members that belong to other transactions */
			if ((uint32) lfirst_int(cell) != xid)
				continue;

			/*
			 * A committed xid in the list means the subscriber now has the
			 * corresponding schema, so it need not be sent again unless the
			 * relation gets invalidated.
			 */
			if (is_commit)
				relentry->schema_sent = true;

			relentry->streamed_txns =
				foreach_delete_current(relentry->streamed_txns, cell);

			/* stop at the first match for this entry */
			break;
		}
	}
}
/*
* Relcache invalidation callback
*/
@ -811,7 +1142,11 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
* Reset schema sent status as the relation definition may have changed .
*/
if ( entry ! = NULL )
{
entry - > schema_sent = false ;
list_free ( entry - > streamed_txns ) ;
entry - > streamed_txns = NULL ;
}
}
/*