@ -135,6 +135,7 @@
# include "storage/sinval.h"
# include "tcop/tcopprot.h"
# include "utils/builtins.h"
# include "utils/hashutils.h"
# include "utils/memutils.h"
# include "utils/ps_status.h"
# include "utils/snapmgr.h"
@ -323,14 +324,25 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
/*
* State for outbound notifies consists of a list of all channels + payloads
* NOTIFYed in the current transaction . We do not actually perform a NOTIFY
* until and unless the transaction commits . pendingNotifies is NIL if no
* NOTIFYs have been done in the current transaction .
* NOTIFYed in the current transaction . We do not actually perform a NOTIFY
* until and unless the transaction commits . pendingNotifies is NULL if no
* NOTIFYs have been done in the current ( sub ) transaction .
*
* We discard duplicate notify events issued in the same transaction .
* Hence , in addition to the list proper ( which we need to track the order
* of the events , since we guarantee to deliver them in order ) , we build a
* hash table which we can probe to detect duplicates . Since building the
* hash table is somewhat expensive , we do so only once we have at least
* MIN_HASHABLE_NOTIFIES events queued in the current ( sub ) transaction ;
* before that we just scan the events linearly .
*
* The list is kept in CurTransactionContext . In subtransactions , each
* subtransaction has its own list in its own CurTransactionContext , but
* successful subtransactions attach their lists to their parent ' s list .
* Failed subtransactions simply discard their lists .
* successful subtransactions add their entries to their parent ' s list .
* Failed subtransactions simply discard their lists . Since these lists
* are independent , there may be notify events in a subtransaction ' s list
* that duplicate events in some ancestor ( sub ) transaction ; we get rid of
* the dups when merging the subtransaction ' s list into its parent ' s .
*
* Note : the action and notify lists do not interact within a transaction .
* In particular , if a transaction does NOTIFY and then LISTEN on the same
@ -339,11 +351,26 @@ static List *upperPendingActions = NIL; /* list of upper-xact lists */
*/
/*
 * One pending outbound notification event.
 *
 * The channel name and the payload are stored inline in the trailing data[]
 * array, back to back: first the null-terminated channel name, then the
 * null-terminated payload.  The payload therefore begins at
 * data + channel_len + 1, and both strings together (including their
 * terminators) occupy channel_len + payload_len + 2 bytes.
 *
 * NOTE(review): the channel/payload pointer members below look like the
 * older, pointer-based representation of the same two strings, while the
 * length members plus data[] hold them inline.  Confirm whether both
 * representations are really meant to coexist in this struct.
 */
typedef struct Notification
{
char * channel ; /* channel name (separately allocated string) */
char * payload ; /* payload string (can be empty) */
uint16 channel_len ; /* length of channel-name string, excluding terminator */
uint16 payload_len ; /* length of payload string, excluding terminator */
/* null-terminated channel name, then null-terminated payload follow */
char data [ FLEXIBLE_ARRAY_MEMBER ] ;
} Notification ;
static List * pendingNotifies = NIL ; /* list of Notifications */
/*
 * The set of notifications queued by one (sub)transaction.
 *
 * "events" keeps the Notification structs in the order they were queued.
 * "hashtab" is an optional lookup structure over the same events, used only
 * to detect duplicates; it stays NULL until the list has grown to
 * MIN_HASHABLE_NOTIFIES entries.
 */
typedef struct NotificationList
{
	List	   *events;			/* list of Notification structs */
	HTAB	   *hashtab;		/* hash of NotificationHash structs, or NULL */
} NotificationList;

/* List length at which we start maintaining the duplicate-detection hashtab */
#define MIN_HASHABLE_NOTIFIES 16
/*
 * Entry type of the duplicate-detection hash table.
 *
 * Each entry consists solely of a pointer to the Notification it represents;
 * the hash and match callbacks dereference that pointer to reach the actual
 * channel/payload bytes.
 */
typedef struct NotificationHash
{
	Notification *event;		/* => the actual Notification struct */
} NotificationHash;
/*
 * Notifications queued by the current (sub)transaction; NULL until the first
 * event is queued.
 */
static NotificationList * pendingNotifies = NULL ; /* current list, if any */
/*
 * Saved pendingNotifies values of the enclosing transaction levels; entries
 * are popped from the front again at subtransaction commit or abort.
 */
static List * upperPendingNotifies = NIL ; /* list of upper-xact lists */
@ -392,7 +419,10 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
Snapshot snapshot ) ;
static void asyncQueueAdvanceTail ( void ) ;
static void ProcessIncomingNotify ( void ) ;
static bool AsyncExistsPendingNotify ( const char * channel , const char * payload ) ;
static bool AsyncExistsPendingNotify ( Notification * n ) ;
static void AddEventToPendingNotifies ( Notification * n ) ;
static uint32 notification_hash ( const void * key , Size keysize ) ;
static int notification_match ( const void * key1 , const void * key2 , Size keysize ) ;
static void ClearPendingActionsAndNotifies ( void ) ;
/*
@ -541,6 +571,8 @@ pg_notify(PG_FUNCTION_ARGS)
void
Async_Notify ( const char * channel , const char * payload )
{
size_t channel_len ;
size_t payload_len ;
Notification * n ;
MemoryContext oldcontext ;
@ -550,47 +582,67 @@ Async_Notify(const char *channel, const char *payload)
if ( Trace_notify )
elog ( DEBUG1 , " Async_Notify(%s) " , channel ) ;
channel_len = channel ? strlen ( channel ) : 0 ;
payload_len = payload ? strlen ( payload ) : 0 ;
/* a channel name must be specified */
if ( ! channel | | ! strlen ( channel ) )
if ( channel_len = = 0 )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " channel name cannot be empty " ) ) ) ;
if ( strlen ( channel ) > = NAMEDATALEN )
/* enforce length limits */
if ( channel_len > = NAMEDATALEN )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " channel name too long " ) ) ) ;
if ( payload )
{
if ( strlen ( payload ) > = NOTIFY_PAYLOAD_MAX_LENGTH )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " payload string too long " ) ) ) ;
}
/* no point in making duplicate entries in the list ... */
if ( AsyncExistsPendingNotify ( channel , payload ) )
return ;
if ( payload_len > = NOTIFY_PAYLOAD_MAX_LENGTH )
ereport ( ERROR ,
( errcode ( ERRCODE_INVALID_PARAMETER_VALUE ) ,
errmsg ( " payload string too long " ) ) ) ;
/*
* We must construct the Notification entry , even if we end up not using
* it , in order to compare it cheaply to existing list entries .
*
* The notification list needs to live until end of transaction , so store
* it in the transaction context .
*/
oldcontext = MemoryContextSwitchTo ( CurTransactionContext ) ;
n = ( Notification * ) palloc ( sizeof ( Notification ) ) ;
n - > channel = pstrdup ( channel ) ;
n = ( Notification * ) palloc ( offsetof ( Notification , data ) +
channel_len + payload_len + 2 ) ;
n - > channel_len = channel_len ;
n - > payload_len = payload_len ;
strcpy ( n - > data , channel ) ;
if ( payload )
n - > payload = pstrdup ( payload ) ;
strcpy ( n - > data + channel_len + 1 , payload ) ;
else
n - > payload = " " ;
n - > data [ channel_len + 1 ] = ' \0 ' ;
/*
* We want to preserve the order so we need to append every notification .
* See comments at AsyncExistsPendingNotify ( ) .
*/
pendingNotifies = lappend ( pendingNotifies , n ) ;
/* Now check for duplicates */
if ( AsyncExistsPendingNotify ( n ) )
{
/* It's a dup, so forget it */
pfree ( n ) ;
MemoryContextSwitchTo ( oldcontext ) ;
return ;
}
if ( pendingNotifies = = NULL )
{
/* First notify event in current (sub)xact */
pendingNotifies = ( NotificationList * ) palloc ( sizeof ( NotificationList ) ) ;
pendingNotifies - > events = list_make1 ( n ) ;
/* We certainly don't need a hashtable yet */
pendingNotifies - > hashtab = NULL ;
}
else
{
/* Append more events to existing list */
AddEventToPendingNotifies ( n ) ;
}
MemoryContextSwitchTo ( oldcontext ) ;
}
@ -761,7 +813,7 @@ PreCommit_Notify(void)
{
ListCell * p ;
if ( pendingActions = = NIL & & pendingNotifies = = NIL )
if ( ! pendingActions & & ! pendingNotifies )
return ; /* no relevant statements in this xact */
if ( Trace_notify )
@ -821,7 +873,7 @@ PreCommit_Notify(void)
/* Now push the notifications into the queue */
backendHasSentNotifications = true ;
nextNotify = list_head ( pendingNotifies ) ;
nextNotify = list_head ( pendingNotifies - > events ) ;
while ( nextNotify ! = NULL )
{
/*
@ -1267,8 +1319,8 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength)
static void
asyncQueueNotificationToEntry ( Notification * n , AsyncQueueEntry * qe )
{
size_t channellen = strlen ( n - > channel ) ;
size_t payloadlen = strlen ( n - > payload ) ;
size_t channellen = n - > channel_len ;
size_t payloadlen = n - > payload_len ;
int entryLength ;
Assert ( channellen < NAMEDATALEN ) ;
@ -1281,8 +1333,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
qe - > dboid = MyDatabaseId ;
qe - > xid = GetCurrentTransactionId ( ) ;
qe - > srcPid = MyProcPid ;
memcpy ( qe - > data , n - > channel , channellen + 1 ) ;
memcpy ( qe - > data + channellen + 1 , n - > payload , payloadlen + 1 ) ;
memcpy ( qe - > data , n - > data , channellen + payloadlen + 2 ) ;
}
/*
@ -1294,7 +1345,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
* database OID in order to fill the page . So every page is always used up to
* the last byte which simplifies reading the page later .
*
* We are passed the list cell ( in pendingNotifies ) containing the next
* We are passed the list cell ( in pendingNotifies - > events ) containing the next
* notification to write and return the first still - unwritten cell back .
* Eventually we will return NULL indicating all is done .
*
@ -1345,7 +1396,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
if ( offset + qe . length < = QUEUE_PAGESIZE )
{
/* OK, so advance nextNotify past this item */
nextNotify = lnext ( pendingNotifies , nextNotify ) ;
nextNotify = lnext ( pendingNotifies - > events , nextNotify ) ;
}
else
{
@ -1607,7 +1658,7 @@ AtSubStart_Notify(void)
Assert ( list_length ( upperPendingNotifies ) = =
GetCurrentTransactionNestLevel ( ) - 1 ) ;
pendingNotifies = NIL ;
pendingNotifies = NULL ;
MemoryContextSwitchTo ( old_cxt ) ;
}
@ -1621,7 +1672,7 @@ void
AtSubCommit_Notify ( void )
{
List * parentPendingActions ;
List * parentPendingNotifies ;
NotificationList * parentPendingNotifies ;
parentPendingActions = linitial_node ( List , upperPendingActions ) ;
upperPendingActions = list_delete_first ( upperPendingActions ) ;
@ -1634,16 +1685,41 @@ AtSubCommit_Notify(void)
*/
pendingActions = list_concat ( parentPendingActions , pendingActions ) ;
parentPendingNotifies = linitial_node ( List , upperPendingNotifies ) ;
parentPendingNotifies = ( NotificationList * ) linitial ( upperPendingNotifies ) ;
upperPendingNotifies = list_delete_first ( upperPendingNotifies ) ;
Assert ( list_length ( upperPendingNotifies ) = =
GetCurrentTransactionNestLevel ( ) - 2 ) ;
/*
* We could try to eliminate duplicates here , but it seems not worthwhile .
*/
pendingNotifies = list_concat ( parentPendingNotifies , pendingNotifies ) ;
if ( pendingNotifies = = NULL )
{
/* easy, no notify events happened in current subxact */
pendingNotifies = parentPendingNotifies ;
}
else if ( parentPendingNotifies = = NULL )
{
/* easy, subxact's list becomes parent's */
}
else
{
/*
* Formerly , we didn ' t bother to eliminate duplicates here , but now we
* must , else we fall foul of " Assert(!found) " , either here or during
* a later attempt to build the parent - level hashtable .
*/
NotificationList * childPendingNotifies = pendingNotifies ;
ListCell * l ;
pendingNotifies = parentPendingNotifies ;
/* Insert all the subxact's events into parent, except for dups */
foreach ( l , childPendingNotifies - > events )
{
Notification * childn = ( Notification * ) lfirst ( l ) ;
if ( ! AsyncExistsPendingNotify ( childn ) )
AddEventToPendingNotifies ( childn ) ;
}
}
}
/*
@ -1672,7 +1748,7 @@ AtSubAbort_Notify(void)
while ( list_length ( upperPendingNotifies ) > my_level - 2 )
{
pendingNotifies = linitial_node ( List , upperPendingNotifies ) ;
pendingNotifies = ( NotificationList * ) linitial ( upperPendingNotifies ) ;
upperPendingNotifies = list_delete_first ( upperPendingNotifies ) ;
}
}
@ -2098,52 +2174,139 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
elog ( INFO , " NOTIFY for \" %s \" payload \" %s \" " , channel , payload ) ;
}
/* Does pendingNotifies include the given channel/payload ? */
/* Does pendingNotifies include a match for the given event ? */
static bool
AsyncExistsPendingNotify ( const char * channel , const char * payload )
AsyncExistsPendingNotify ( Notification * n )
{
ListCell * p ;
Notification * n ;
if ( pendingNotifies = = NIL )
if ( pendingNotifies = = NULL )
return false ;
if ( payload = = NULL )
payload = " " ;
if ( pendingNotifies - > hashtab ! = NULL )
{
/* Use the hash table to probe for a match */
if ( hash_search ( pendingNotifies - > hashtab ,
& n ,
HASH_FIND ,
NULL ) )
return true ;
}
else
{
/* Must scan the event list */
ListCell * l ;
/*----------
* We need to append new elements to the end of the list in order to keep
* the order . However , on the other hand we ' d like to check the list
* backwards in order to make duplicate - elimination a tad faster when the
* same condition is signaled many times in a row . So as a compromise we
* check the tail element first which we can access directly . If this
* doesn ' t match , we check the whole list .
*
* As we are not checking our parents ' lists , we can still get duplicates
* in combination with subtransactions , like in :
*
* begin ;
* notify foo ' 1 ' ;
* savepoint foo ;
* notify foo ' 1 ' ;
* commit ;
* - - - - - - - - - -
*/
n = ( Notification * ) llast ( pendingNotifies ) ;
if ( strcmp ( n - > channel , channel ) = = 0 & &
strcmp ( n - > payload , payload ) = = 0 )
return true ;
foreach ( l , pendingNotifies - > events )
{
Notification * oldn = ( Notification * ) lfirst ( l ) ;
if ( n - > channel_len = = oldn - > channel_len & &
n - > payload_len = = oldn - > payload_len & &
memcmp ( n - > data , oldn - > data ,
n - > channel_len + n - > payload_len + 2 ) = = 0 )
return true ;
}
}
return false ;
}
/*
 * AddEventToPendingNotifies
 *		Add a notification event to a pre-existing pendingNotifies list.
 *
 * The caller is expected to have checked already that the event does not
 * duplicate one in the list; the Assert(!found) checks below rely on that.
 *
 * Because pendingNotifies->events is already nonempty, this works
 * correctly no matter what CurrentMemoryContext is.  (The hash table, when
 * we build it, is explicitly placed in CurTransactionContext.)
 */
static void
AddEventToPendingNotifies(Notification *n)
{
	Assert(pendingNotifies->events != NIL);

	/* Create the hash table if it's time to */
	if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
		pendingNotifies->hashtab == NULL)
	{
		HASHCTL		hash_ctl;
		ListCell   *l;

		/* Create the hash table; keys are pointers to Notification structs */
		MemSet(&hash_ctl, 0, sizeof(hash_ctl));
		hash_ctl.keysize = sizeof(Notification *);
		hash_ctl.entrysize = sizeof(NotificationHash);
		hash_ctl.hash = notification_hash;
		hash_ctl.match = notification_match;
		hash_ctl.hcxt = CurTransactionContext;
		pendingNotifies->hashtab =
			hash_create("Pending Notifies",
						256L,
						&hash_ctl,
						HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);

		/* Insert all the already-existing events */
		foreach(l, pendingNotifies->events)
		{
			Notification *oldn = (Notification *) lfirst(l);
			NotificationHash *hentry;
			bool		found;

			hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
													  &oldn,
													  HASH_ENTER,
													  &found);
			Assert(!found);
			hentry->event = oldn;
		}
	}

	/* Add new event to the list, in order */
	pendingNotifies->events = lappend(pendingNotifies->events, n);

	/* Add event to the hash table if needed */
	if (pendingNotifies->hashtab != NULL)
	{
		NotificationHash *hentry;
		bool		found;

		hentry = (NotificationHash *) hash_search(pendingNotifies->hashtab,
												  &n,
												  HASH_ENTER,
												  &found);
		Assert(!found);
		hentry->event = n;
	}
}
/*
 * notification_hash: hash function for the pending-notifies hash table
 *
 * The "key" handed to us is the address of a pointer to a Notification
 * struct, so it must be dereferenced once to reach the event itself.
 */
static uint32
notification_hash(const void *key, Size keysize)
{
	const Notification *const *keyptr = (const Notification *const *) key;
	const Notification *event = *keyptr;
	int			nbytes;

	Assert(keysize == sizeof(Notification *));

	/*
	 * Hash the channel name, its terminator, and the payload.  We don't
	 * bother to include the payload's trailing null in the hash.
	 */
	nbytes = event->channel_len + event->payload_len + 1;

	return DatumGetUInt32(hash_any((const unsigned char *) event->data,
								   nbytes));
}
/*
 * notification_match: match function to use with notification_hash
 *
 * Both keys are addresses of pointers to Notification structs.  Returns 0
 * when the two events have identical channel names and payloads, and 1
 * otherwise.
 */
static int
notification_match(const void *key1, const void *key2, Size keysize)
{
	const Notification *lhs = *(const Notification *const *) key1;
	const Notification *rhs = *(const Notification *const *) key2;

	Assert(keysize == sizeof(Notification *));

	/* Differing lengths mean the events cannot be equal */
	if (lhs->channel_len != rhs->channel_len)
		return 1;
	if (lhs->payload_len != rhs->payload_len)
		return 1;

	/* Same lengths: compare both strings, terminators included */
	if (memcmp(lhs->data, rhs->data,
			   lhs->channel_len + lhs->payload_len + 2) != 0)
		return 1;				/* not equal */

	return 0;					/* equal */
}
/* Clear the pendingActions and pendingNotifies lists. */
@ -2158,5 +2321,5 @@ ClearPendingActionsAndNotifies(void)
* pointers .
*/
pendingActions = NIL ;
pendingNotifies = NIL ;
pendingNotifies = NULL ;
}