@ -271,6 +271,7 @@ int debug_discard_caches = 0;
# define MAX_SYSCACHE_CALLBACKS 64
# define MAX_RELCACHE_CALLBACKS 10
# define MAX_RELSYNC_CALLBACKS 10
static struct SYSCACHECALLBACK
{
@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
static int relcache_callback_count = 0 ;
static struct RELSYNCCALLBACK
{
RelSyncCallbackFunction function ;
Datum arg ;
} relsync_callback_list [ MAX_RELSYNC_CALLBACKS ] ;
static int relsync_callback_count = 0 ;
/* ----------------------------------------------------------------
* Invalidation subgroup support functions
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@ -484,6 +494,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
AddInvalidationMessage ( group , RelCacheMsgs , & msg ) ;
}
/*
* Add a relsync inval entry
*
* We put these into the relcache subgroup for simplicity . This message is the
* same as AddRelcacheInvalidationMessage ( ) except that it is for
* RelationSyncCache maintained by decoding plugin pgoutput .
*/
static void
AddRelsyncInvalidationMessage ( InvalidationMsgsGroup * group ,
Oid dbId , Oid relId )
{
SharedInvalidationMessage msg ;
/* Don't add a duplicate item. */
ProcessMessageSubGroup ( group , RelCacheMsgs ,
if ( msg - > rc . id = = SHAREDINVALRELSYNC_ID & &
( msg - > rc . relId = = relId | |
msg - > rc . relId = = InvalidOid ) )
return ) ;
/* OK, add the item */
msg . rc . id = SHAREDINVALRELSYNC_ID ;
msg . rc . dbId = dbId ;
msg . rc . relId = relId ;
/* check AddCatcacheInvalidationMessage() for an explanation */
VALGRIND_MAKE_MEM_DEFINED ( & msg , sizeof ( msg ) ) ;
AddInvalidationMessage ( group , RelCacheMsgs , & msg ) ;
}
/*
* Add a snapshot inval entry
*
@ -611,6 +651,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
info - > RelcacheInitFileInval = true ;
}
/*
* RegisterRelsyncInvalidation
*
* As above , but register a relsynccache invalidation event .
*/
static void
RegisterRelsyncInvalidation ( InvalidationInfo * info , Oid dbId , Oid relId )
{
AddRelsyncInvalidationMessage ( & info - > CurrentCmdInvalidMsgs , dbId , relId ) ;
}
/*
* RegisterSnapshotInvalidation
*
@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
ccitem - > function ( ccitem - > arg , InvalidOid ) ;
}
for ( i = 0 ; i < relsync_callback_count ; i + + )
{
struct RELSYNCCALLBACK * ccitem = relsync_callback_list + i ;
ccitem - > function ( ccitem - > arg , InvalidOid ) ;
}
}
/*
@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
else if ( msg - > sn . dbId = = MyDatabaseId )
InvalidateCatalogSnapshot ( ) ;
}
else if ( msg - > id = = SHAREDINVALRELSYNC_ID )
{
/* We only care about our own database */
if ( msg - > rs . dbId = = MyDatabaseId )
CallRelSyncCallbacks ( msg - > rs . relid ) ;
}
else
elog ( FATAL , " unrecognized SI message ID: %d " , msg - > id ) ;
}
@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
ReleaseSysCache ( tup ) ;
}
/*
* CacheInvalidateRelSync
* Register invalidation of the cache in logical decoding output plugin
* for a database .
*
* This type of invalidation message is used for the specific purpose of output
* plugins . Processes which do not decode WALs would do nothing even when it
* receives the message .
*/
void
CacheInvalidateRelSync ( Oid relid )
{
RegisterRelsyncInvalidation ( PrepareInvalidationState ( ) ,
MyDatabaseId , relid ) ;
}
/*
* CacheInvalidateRelSyncAll
* Register invalidation of the whole cache in logical decoding output
* plugin .
*/
void
CacheInvalidateRelSyncAll ( void )
{
CacheInvalidateRelSync ( InvalidOid ) ;
}
/*
* CacheInvalidateSmgr
@ -1763,6 +1853,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
+ + relcache_callback_count ;
}
/*
* CacheRegisterRelSyncCallback
* Register the specified function to be called for all future
* relsynccache invalidation events .
*
* This function is intended to be call from the logical decoding output
* plugins .
*/
void
CacheRegisterRelSyncCallback ( RelSyncCallbackFunction func ,
Datum arg )
{
if ( relsync_callback_count > = MAX_RELSYNC_CALLBACKS )
elog ( FATAL , " out of relsync_callback_list slots " ) ;
relsync_callback_list [ relsync_callback_count ] . function = func ;
relsync_callback_list [ relsync_callback_count ] . arg = arg ;
+ + relsync_callback_count ;
}
/*
* CallSyscacheCallbacks
*
@ -1788,6 +1899,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
}
}
/*
* CallSyscacheCallbacks
*/
void
CallRelSyncCallbacks ( Oid relid )
{
for ( int i = 0 ; i < relsync_callback_count ; i + + )
{
struct RELSYNCCALLBACK * ccitem = relsync_callback_list + i ;
ccitem - > function ( ccitem - > arg , relid ) ;
}
}
/*
* LogLogicalInvalidations
*