@ -108,11 +108,13 @@ typedef struct RelationSyncEntry
{
Oid relid ; /* relation oid */
bool replicate_valid ; /* overall validity flag for entry */
bool schema_sent ;
List * streamed_txns ; /* streamed toplevel transactions with this
* schema */
bool replicate_valid ;
/* are we publishing this rel? */
PublicationActions pubactions ;
/*
@ -903,7 +905,9 @@ LoadPublications(List *pubnames)
}
/*
* Publication cache invalidation callback .
* Publication syscache invalidation callback .
*
* Called for invalidations on pg_publication .
*/
static void
publication_invalidation_cb ( Datum arg , int cacheid , uint32 hashvalue )
@ -1130,13 +1134,12 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
HASH_ENTER , & found ) ;
Assert ( entry ! = NULL ) ;
/* Not found means schema wasn't sent */
/* initialize entry, if it's new */
if ( ! found )
{
/* immediately make a new entry valid enough to satisfy callbacks */
entry - > replicate_valid = false ;
entry - > schema_sent = false ;
entry - > streamed_txns = NIL ;
entry - > replicate_valid = false ;
entry - > pubactions . pubinsert = entry - > pubactions . pubupdate =
entry - > pubactions . pubdelete = entry - > pubactions . pubtruncate = false ;
entry - > publish_as_relid = InvalidOid ;
@ -1166,13 +1169,40 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
{
oldctx = MemoryContextSwitchTo ( CacheMemoryContext ) ;
if ( data - > publications )
{
list_free_deep ( data - > publications ) ;
data - > publications = NIL ;
}
data - > publications = LoadPublications ( data - > publication_names ) ;
MemoryContextSwitchTo ( oldctx ) ;
publications_valid = true ;
}
/*
* Reset schema_sent status as the relation definition may have
* changed . Also reset pubactions to empty in case rel was dropped
* from a publication . Also free any objects that depended on the
* earlier definition .
*/
entry - > schema_sent = false ;
list_free ( entry - > streamed_txns ) ;
entry - > streamed_txns = NIL ;
entry - > pubactions . pubinsert = false ;
entry - > pubactions . pubupdate = false ;
entry - > pubactions . pubdelete = false ;
entry - > pubactions . pubtruncate = false ;
if ( entry - > map )
{
/*
* Must free the TupleDescs contained in the map explicitly ,
* because free_conversion_map ( ) doesn ' t .
*/
FreeTupleDesc ( entry - > map - > indesc ) ;
FreeTupleDesc ( entry - > map - > outdesc ) ;
free_conversion_map ( entry - > map ) ;
}
entry - > map = NULL ;
/*
* Build publication cache . We can ' t use one provided by relcache as
* relcache considers all publications given relation is in , but here
@ -1212,16 +1242,18 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
foreach ( lc2 , ancestors )
{
Oid ancestor = lfirst_oid ( lc2 ) ;
List * apubids = GetRelationPublications ( ancestor ) ;
List * aschemaPubids = GetSchemaPublications ( get_rel_namespace ( ancestor ) ) ;
if ( list_member_oid ( GetRelationPublications ( ancestor ) ,
pub - > oid ) | |
list_member_oid ( GetSchemaPublications ( get_rel_namespace ( ancestor ) ) ,
pub - > oid ) )
if ( list_member_oid ( apubids , pub - > oid ) | |
list_member_oid ( aschemaPubids , pub - > oid ) )
{
ancestor_published = true ;
if ( pub - > pubviaroot )
publish_as_relid = ancestor ;
}
list_free ( apubids ) ;
list_free ( aschemaPubids ) ;
}
}
@ -1251,6 +1283,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
}
list_free ( pubids ) ;
list_free ( schemaPubids ) ;
entry - > publish_as_relid = publish_as_relid ;
entry - > replicate_valid = true ;
@ -1322,43 +1355,40 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
/*
* Nobody keeps pointers to entries in this hash table around outside
* logical decoding callback calls - but invalidation events can come in
* * during * a callback if we access the relcache in the callback . Because
* of that we must mark the cache entry as invalid but not remove it from
* the hash while it could still be referenced , then prune it at a later
* safe point .
*
* Getting invalidations for relations that aren ' t in the table is
* entirely normal , since there ' s no way to unregister for an invalidation
* event . So we don ' t care if it ' s found or not .
* * during * a callback if we do any syscache access in the callback .
* Because of that we must mark the cache entry as invalid but not damage
* any of its substructure here . The next get_rel_sync_entry ( ) call will
* rebuild it all .
*/
entry = ( RelationSyncEntry * ) hash_search ( RelationSyncCache , & relid ,
HASH_FIND , NULL ) ;
/*
* Reset schema sent status as the relation definition may have changed .
* Also free any objects that depended on the earlier definition .
*/
if ( entry ! = NULL )
if ( OidIsValid ( relid ) )
{
entry - > schema_sent = false ;
list_free ( entry - > streamed_txns ) ;
entry - > streamed_txns = NIL ;
if ( entry - > map )
/*
* Getting invalidations for relations that aren ' t in the table is
* entirely normal . So we don ' t care if it ' s found or not .
*/
entry = ( RelationSyncEntry * ) hash_search ( RelationSyncCache , & relid ,
HASH_FIND , NULL ) ;
if ( entry ! = NULL )
entry - > replicate_valid = false ;
}
else
{
/* Whole cache must be flushed. */
HASH_SEQ_STATUS status ;
hash_seq_init ( & status , RelationSyncCache ) ;
while ( ( entry = ( RelationSyncEntry * ) hash_seq_search ( & status ) ) ! = NULL )
{
/*
* Must free the TupleDescs contained in the map explicitly ,
* because free_conversion_map ( ) doesn ' t .
*/
FreeTupleDesc ( entry - > map - > indesc ) ;
FreeTupleDesc ( entry - > map - > outdesc ) ;
free_conversion_map ( entry - > map ) ;
entry - > replicate_valid = false ;
}
entry - > map = NULL ;
}
}
/*
* Publication relation / schema map syscache invalidation callback
*
* Called for invalidations on pg_publication , pg_publication_rel , and
* pg_publication_namespace .
*/
static void
rel_sync_cache_publication_cb ( Datum arg , int cacheid , uint32 hashvalue )
@ -1382,15 +1412,6 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
while ( ( entry = ( RelationSyncEntry * ) hash_seq_search ( & status ) ) ! = NULL )
{
entry - > replicate_valid = false ;
/*
* There might be some relations dropped from the publication so we
* don ' t need to publish the changes for them .
*/
entry - > pubactions . pubinsert = false ;
entry - > pubactions . pubupdate = false ;
entry - > pubactions . pubdelete = false ;
entry - > pubactions . pubtruncate = false ;
}
}