@ -7,7 +7,7 @@
* Portions Copyright ( c ) 1994 , Regents of the University of California
*
* IDENTIFICATION
* $ PostgreSQL : pgsql / src / backend / commands / async . c , v 1.127 .2 .1 2005 / 11 / 22 18 : 23 : 06 momjian Exp $
* $ PostgreSQL : pgsql / src / backend / commands / async . c , v 1.127 .2 .2 2008 / 03 / 12 20 : 12 : 14 tgl Exp $
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
@ -52,6 +52,16 @@
* transaction , since by assumption it is only called from outside any
* transaction .
*
* Like NOTIFY , LISTEN and UNLISTEN just add the desired action to a list
* of pending actions . If we reach transaction commit , the changes are
* applied to pg_listener just before executing any pending NOTIFYs . This
* method is necessary because to avoid race conditions , we must hold lock
* on pg_listener from when we insert a new listener tuple until we commit .
* To do that and not create undue hazard of deadlock , we don ' t want to
* touch pg_listener until we are otherwise done with the transaction ;
* in particular it ' d be uncool to still be taking user - commanded locks
* while holding the pg_listener lock .
*
* Although we grab ExclusiveLock on pg_listener for any operation ,
* the lock is never held very long , so it shouldn ' t cause too much of
* a performance problem . ( Previously we used AccessExclusiveLock , but
@ -75,7 +85,6 @@
# include <unistd.h>
# include <signal.h>
# include <netinet/in.h>
# include "access/heapam.h"
# include "access/twophase_rmgr.h"
@ -87,12 +96,39 @@
# include "storage/ipc.h"
# include "storage/sinval.h"
# include "tcop/tcopprot.h"
# include "utils/builtins.h"
# include "utils/fmgroids.h"
# include "utils/memutils.h"
# include "utils/ps_status.h"
# include "utils/syscache.h"
/*
* State for pending LISTEN / UNLISTEN actions consists of an ordered list of
* all actions requested in the current transaction . As explained above ,
* we don ' t actually modify pg_listener until we reach transaction commit .
*
* The list is kept in CurTransactionContext . In subtransactions , each
* subtransaction has its own list in its own CurTransactionContext , but
* successful subtransactions attach their lists to their parent ' s list .
* Failed subtransactions simply discard their lists .
*/
typedef enum
{
LISTEN_LISTEN ,
LISTEN_UNLISTEN ,
LISTEN_UNLISTEN_ALL
} ListenActionKind ;
typedef struct
{
ListenActionKind action ;
char condname [ 1 ] ; /* actually, as long as needed */
} ListenAction ;
static List * pendingActions = NIL ; /* list of ListenAction */
static List * upperPendingActions = NIL ; /* list of upper-xact lists */
/*
* State for outbound notifies consists of a list of all relnames NOTIFYed
* in the current transaction . We do not actually perform a NOTIFY until
@ -103,8 +139,13 @@
* subtransaction has its own list in its own CurTransactionContext , but
* successful subtransactions attach their lists to their parent ' s list .
* Failed subtransactions simply discard their lists .
*
* Note : the action and notify lists do not interact within a transaction .
* In particular , if a transaction does NOTIFY and then LISTEN on the same
* condition name , it will get a self - notify at commit . This is a bit odd
* but is consistent with our historical behavior .
*/
static List * pendingNotifies = NIL ;
static List * pendingNotifies = NIL ; /* list of C strings */
static List * upperPendingNotifies = NIL ; /* list of upper-xact lists */
@ -118,8 +159,8 @@ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
* does not grok " volatile " , you ' d be best advised to compile this file
* with all optimization turned off .
*/
static volatile in t notifyInterruptEnabled = 0 ;
static volatile in t notifyInterruptOccurred = 0 ;
static volatile sig_atomic_ t notifyInterruptEnabled = 0 ;
static volatile sig_atomic_ t notifyInterruptOccurred = 0 ;
/* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false ;
@ -127,16 +168,20 @@ static bool unlistenExitRegistered = false;
bool Trace_notify = false ;
static void queue_listen ( ListenActionKind action , const char * condname ) ;
static void Async_UnlistenAll ( void ) ;
static void Async_UnlistenOnExit ( int code , Datum arg ) ;
static void Exec_Listen ( Relation lRel , const char * relname ) ;
static void Exec_Unlisten ( Relation lRel , const char * relname ) ;
static void Exec_UnlistenAll ( Relation lRel ) ;
static void Send_Notify ( Relation lRel ) ;
static void ProcessIncomingNotify ( void ) ;
static void NotifyMyFrontEnd ( char * relname , int32 listenerPID ) ;
static bool AsyncExistsPendingNotify ( const char * relname ) ;
static void ClearPendingNotifies ( void ) ;
static void ClearPendingActionsAnd Notifies ( void ) ;
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* Async_Notify
*
* This is executed by the SQL notify command .
@ -144,8 +189,6 @@ static void ClearPendingNotifies(void);
* Adds the relation to the list of pending notifies .
* Actual notification happens during transaction commit .
* ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
Async_Notify ( const char * relname )
@ -164,6 +207,12 @@ Async_Notify(const char *relname)
oldcontext = MemoryContextSwitchTo ( CurTransactionContext ) ;
/*
* Ordering of the list isn ' t important . We choose to put new
* entries on the front , as this might make duplicate - elimination
* a tad faster when the same condition is signaled many times in
* a row .
*/
pendingNotifies = lcons ( pstrdup ( relname ) , pendingNotifies ) ;
MemoryContextSwitchTo ( oldcontext ) ;
@ -171,34 +220,245 @@ Async_Notify(const char *relname)
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* queue_listen
* Common code for listen , unlisten , unlisten all commands .
*
* Adds the request to the list of pending actions .
* Actual update of pg_listener happens during transaction commit .
* ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
*/
static void
queue_listen ( ListenActionKind action , const char * condname )
{
MemoryContext oldcontext ;
ListenAction * actrec ;
/*
* Unlike Async_Notify , we don ' t try to collapse out duplicates .
* It would be too complicated to ensure we get the right interactions
* of conflicting LISTEN / UNLISTEN / UNLISTEN_ALL , and it ' s unlikely that
* there would be any performance benefit anyway in sane applications .
*/
oldcontext = MemoryContextSwitchTo ( CurTransactionContext ) ;
/* space for terminating null is included in sizeof(ListenAction) */
actrec = ( ListenAction * ) palloc ( sizeof ( ListenAction ) + strlen ( condname ) ) ;
actrec - > action = action ;
strcpy ( actrec - > condname , condname ) ;
pendingActions = lappend ( pendingActions , actrec ) ;
MemoryContextSwitchTo ( oldcontext ) ;
}
/*
* Async_Listen
*
* This is executed by the SQL listen command .
*/
void
Async_Listen ( const char * relname )
{
if ( Trace_notify )
elog ( DEBUG1 , " Async_Listen(%s,%d) " , relname , MyProcPid ) ;
queue_listen ( LISTEN_LISTEN , relname ) ;
}
/*
* Async_Unlisten
*
* This is executed by the SQL unlisten command .
*/
void
Async_Unlisten ( const char * relname )
{
/* Handle specially the `unlisten "*"' command */
if ( ( ! relname ) | | ( * relname = = ' \0 ' ) | | ( strcmp ( relname , " * " ) = = 0 ) )
{
Async_UnlistenAll ( ) ;
}
else
{
if ( Trace_notify )
elog ( DEBUG1 , " Async_Unlisten(%s,%d) " , relname , MyProcPid ) ;
queue_listen ( LISTEN_UNLISTEN , relname ) ;
}
}
/*
* Async_UnlistenAll
*
* Register the current backend as listening on the specified
* relation .
* This is invoked by UNLISTEN " * " command , and also at backend exit .
*/
static void
Async_UnlistenAll ( void )
{
if ( Trace_notify )
elog ( DEBUG1 , " Async_UnlistenAll(%d) " , MyProcPid ) ;
queue_listen ( LISTEN_UNLISTEN_ALL , " " ) ;
}
/*
* Async_UnlistenOnExit
*
* Side effects :
* pg_listener is updated .
* Clean up the pg_listener table at backend exit .
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* This is executed if we have done any LISTENs in this backend .
* It might not be necessary anymore , if the user UNLISTENed everything ,
* but we don ' t try to detect that case .
*/
static void
Async_UnlistenOnExit ( int code , Datum arg )
{
/*
* We need to start / commit a transaction for the unlisten , but if there is
* already an active transaction we had better abort that one first .
* Otherwise we ' d end up committing changes that probably ought to be
* discarded .
*/
AbortOutOfAnyTransaction ( ) ;
/* Now we can do the unlisten */
StartTransactionCommand ( ) ;
Async_UnlistenAll ( ) ;
CommitTransactionCommand ( ) ;
}
/*
* AtPrepare_Notify
*
* This is called at the prepare phase of a two - phase
* transaction . Save the state for possible commit later .
*/
void
Async_Listen ( const char * relname )
AtPrepare_Notify ( void )
{
ListCell * p ;
/* It's not sensible to have any pending LISTEN/UNLISTEN actions */
if ( pendingActions )
ereport ( ERROR ,
( errcode ( ERRCODE_FEATURE_NOT_SUPPORTED ) ,
errmsg ( " cannot PREPARE a transaction that has executed LISTEN or UNLISTEN " ) ) ) ;
/* We can deal with pending NOTIFY though */
foreach ( p , pendingNotifies )
{
const char * relname = ( const char * ) lfirst ( p ) ;
RegisterTwoPhaseRecord ( TWOPHASE_RM_NOTIFY_ID , 0 ,
relname , strlen ( relname ) + 1 ) ;
}
/*
* We can clear the state immediately , rather than needing a separate
* PostPrepare call , because if the transaction fails we ' d just discard
* the state anyway .
*/
ClearPendingActionsAndNotifies ( ) ;
}
/*
* AtCommit_Notify
*
* This is called at transaction commit .
*
* If there are pending LISTEN / UNLISTEN actions , insert or delete
* tuples in pg_listener accordingly .
*
* If there are outbound notify requests in the pendingNotifies list ,
* scan pg_listener for matching tuples , and either signal the other
* backend or send a message to our own frontend .
*
* NOTE : we are still inside the current transaction , therefore can
* piggyback on its committing of changes .
*/
void
AtCommit_Notify ( void )
{
Relation lRel ;
ListCell * p ;
if ( pendingActions = = NIL & & pendingNotifies = = NIL )
return ; /* no relevant statements in this xact */
/*
* NOTIFY is disabled if not normal processing mode . This test used to be
* in xact . c , but it seems cleaner to do it here .
*/
if ( ! IsNormalProcessingMode ( ) )
{
ClearPendingActionsAndNotifies ( ) ;
return ;
}
if ( Trace_notify )
elog ( DEBUG1 , " AtCommit_Notify " ) ;
/* Acquire ExclusiveLock on pg_listener */
lRel = heap_open ( ListenerRelationId , ExclusiveLock ) ;
/* Perform any pending listen/unlisten actions */
foreach ( p , pendingActions )
{
ListenAction * actrec = ( ListenAction * ) lfirst ( p ) ;
switch ( actrec - > action )
{
case LISTEN_LISTEN :
Exec_Listen ( lRel , actrec - > condname ) ;
break ;
case LISTEN_UNLISTEN :
Exec_Unlisten ( lRel , actrec - > condname ) ;
break ;
case LISTEN_UNLISTEN_ALL :
Exec_UnlistenAll ( lRel ) ;
break ;
}
/* We must CCI after each action in case of conflicting actions */
CommandCounterIncrement ( ) ;
}
/* Perform any pending notifies */
if ( pendingNotifies )
Send_Notify ( lRel ) ;
/*
* We do NOT release the lock on pg_listener here ; we need to hold it
* until end of transaction ( which is about to happen , anyway ) to ensure
* that notified backends see our tuple updates when they look . Else they
* might disregard the signal , which would make the application programmer
* very unhappy . Also , this prevents race conditions when we have just
* inserted a listening tuple .
*/
heap_close ( lRel , NoLock ) ;
ClearPendingActionsAndNotifies ( ) ;
if ( Trace_notify )
elog ( DEBUG1 , " AtCommit_Notify: done " ) ;
}
/*
* Exec_Listen - - - subroutine for AtCommit_Notify
*
* Register the current backend as listening on the specified relation .
*/
static void
Exec_Listen ( Relation lRel , const char * relname )
{
HeapScanDesc scan ;
HeapTuple tuple ;
Datum values [ Natts_pg_listener ] ;
char nulls [ Natts_pg_listener ] ;
int i ;
NameData condname ;
bool alreadyListener = false ;
if ( Trace_notify )
elog ( DEBUG1 , " Async_Listen(%s,%d) " , relname , MyProcPid ) ;
lRel = heap_open ( ListenerRelationId , ExclusiveLock ) ;
elog ( DEBUG1 , " Exec_Listen(%s,%d) " , relname , MyProcPid ) ;
/* Detect whether we are already listening on this relname */
scan = heap_beginscan ( lRel , SnapshotNow , 0 , NULL ) ;
@ -217,27 +477,20 @@ Async_Listen(const char *relname)
heap_endscan ( scan ) ;
if ( alreadyListener )
{
heap_close ( lRel , ExclusiveLock ) ;
return ;
}
/*
* OK to insert a new tuple
*/
memset ( nulls , ' ' , sizeof ( nulls ) ) ;
for ( i = 0 ; i < Natts_pg_listener ; i + + )
{
nulls [ i ] = ' ' ;
values [ i ] = PointerGetDatum ( NULL ) ;
}
i = 0 ;
values [ i + + ] = ( Datum ) relname ;
values [ i + + ] = ( Datum ) MyProcPid ;
values [ i + + ] = ( Datum ) 0 ; /* no notifies pending */
namestrcpy ( & condname , relname ) ;
values [ Anum_pg_listener_relname - 1 ] = NameGetDatum ( & condname ) ;
values [ Anum_pg_listener_pid - 1 ] = Int32GetDatum ( MyProcPid ) ;
values [ Anum_pg_listener_notify - 1 ] = Int32GetDatum ( 0 ) ; /* no notifies pending */
tuple = heap_formtuple ( RelationGetDescr ( lRel ) , values , nulls ) ;
simple_heap_insert ( lRel , tuple ) ;
# ifdef NOT_USED /* currently there are no indexes */
@ -246,8 +499,6 @@ Async_Listen(const char *relname)
heap_freetuple ( tuple ) ;
heap_close ( lRel , ExclusiveLock ) ;
/*
* now that we are listening , make sure we will unlisten before dying .
*/
@ -259,37 +510,19 @@ Async_Listen(const char *relname)
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* Async_Unlisten
*
* This is executed by the SQL unlisten command .
* Exec_Unlisten - - - subroutine for AtCommit_Notify
*
* Remove the current backend from the list of listening backends
* for the specified relation .
*
* Side effects :
* pg_listener is updated .
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
Async_Unlisten ( const char * relname )
static void
Exec_Unlisten ( Relation lRel , const char * relname )
{
Relation lRel ;
HeapScanDesc scan ;
HeapTuple tuple ;
/* Handle specially the `unlisten "*"' command */
if ( ( ! relname ) | | ( * relname = = ' \0 ' ) | | ( strcmp ( relname , " * " ) = = 0 ) )
{
Async_UnlistenAll ( ) ;
return ;
}
if ( Trace_notify )
elog ( DEBUG1 , " Async_Unlisten(%s,%d) " , relname , MyProcPid ) ;
lRel = heap_open ( ListenerRelationId , ExclusiveLock ) ;
elog ( DEBUG1 , " Exec_Unlisten(%s,%d) " , relname , MyProcPid ) ;
scan = heap_beginscan ( lRel , SnapshotNow , 0 , NULL ) ;
while ( ( tuple = heap_getnext ( scan , ForwardScanDirection ) ) ! = NULL )
@ -311,8 +544,6 @@ Async_Unlisten(const char *relname)
}
heap_endscan ( scan ) ;
heap_close ( lRel , ExclusiveLock ) ;
/*
* We do not complain about unlistening something not being listened ;
* should we ?
@ -320,35 +551,19 @@ Async_Unlisten(const char *relname)
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* Async_UnlistenAll
*
* Unlisten all relations for this backend .
*
* This is invoked by UNLISTEN " * " command , and also at backend exit .
*
* Results :
* XXX
* Exec_UnlistenAll - - - subroutine for AtCommit_Notify
*
* Side effects :
* pg_listener is updated .
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* Update pg_listener to unlisten all relations for this backend .
*/
static void
Async_UnlistenAll ( void )
Exec_UnlistenAll ( Relation lRel )
{
Relation lRel ;
TupleDesc tdesc ;
HeapScanDesc scan ;
HeapTuple lTuple ;
ScanKeyData key [ 1 ] ;
if ( Trace_notify )
elog ( DEBUG1 , " Async_UnlistenAll " ) ;
lRel = heap_open ( ListenerRelationId , ExclusiveLock ) ;
tdesc = RelationGetDescr ( lRel ) ;
elog ( DEBUG1 , " Exec_UnlistenAll " ) ;
/* Find and delete all entries with my listenerPID */
ScanKeyInit ( & key [ 0 ] ,
@ -361,100 +576,18 @@ Async_UnlistenAll(void)
simple_heap_delete ( lRel , & lTuple - > t_self ) ;
heap_endscan ( scan ) ;
heap_close ( lRel , ExclusiveLock ) ;
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* Async_UnlistenOnExit
*
* Clean up the pg_listener table at backend exit .
*
* This is executed if we have done any LISTENs in this backend .
* It might not be necessary anymore , if the user UNLISTENed everything ,
* but we don ' t try to detect that case .
* Send_Notify - - - subroutine for AtCommit_Notify
*
* Results :
* XXX
*
* Side effects :
* pg_listener is updated if necessary .
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* Scan pg_listener for tuples matching our pending notifies , and
* either signal the other backend or send a message to our own frontend .
*/
static void
Async_UnlistenOnExit ( int code , Datum arg )
Send_Notify ( Relation lRel )
{
/*
* We need to start / commit a transaction for the unlisten , but if there is
* already an active transaction we had better abort that one first .
* Otherwise we ' d end up committing changes that probably ought to be
* discarded .
*/
AbortOutOfAnyTransaction ( ) ;
/* Now we can do the unlisten */
StartTransactionCommand ( ) ;
Async_UnlistenAll ( ) ;
CommitTransactionCommand ( ) ;
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* AtPrepare_Notify
*
* This is called at the prepare phase of a two - phase
* transaction . Save the state for possible commit later .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
AtPrepare_Notify ( void )
{
ListCell * p ;
foreach ( p , pendingNotifies )
{
const char * relname = ( const char * ) lfirst ( p ) ;
RegisterTwoPhaseRecord ( TWOPHASE_RM_NOTIFY_ID , 0 ,
relname , strlen ( relname ) + 1 ) ;
}
/*
* We can clear the state immediately , rather than needing a separate
* PostPrepare call , because if the transaction fails we ' d just discard
* the state anyway .
*/
ClearPendingNotifies ( ) ;
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* AtCommit_Notify
*
* This is called at transaction commit .
*
* If there are outbound notify requests in the pendingNotifies list ,
* scan pg_listener for matching tuples , and either signal the other
* backend or send a message to our own frontend .
*
* NOTE : we are still inside the current transaction , therefore can
* piggyback on its committing of changes .
*
* Results :
* XXX
*
* Side effects :
* Tuples in pg_listener that have matching relnames and other peoples '
* listenerPIDs are updated with a nonzero notification field .
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
AtCommit_Notify ( void )
{
Relation lRel ;
TupleDesc tdesc ;
TupleDesc tdesc = RelationGetDescr ( lRel ) ;
HeapScanDesc scan ;
HeapTuple lTuple ,
rTuple ;
@ -462,22 +595,6 @@ AtCommit_Notify(void)
char repl [ Natts_pg_listener ] ,
nulls [ Natts_pg_listener ] ;
if ( pendingNotifies = = NIL )
return ; /* no NOTIFY statements in this transaction */
/*
* NOTIFY is disabled if not normal processing mode . This test used to be
* in xact . c , but it seems cleaner to do it here .
*/
if ( ! IsNormalProcessingMode ( ) )
{
ClearPendingNotifies ( ) ;
return ;
}
if ( Trace_notify )
elog ( DEBUG1 , " AtCommit_Notify " ) ;
/* preset data to update notify column to MyProcPid */
nulls [ 0 ] = nulls [ 1 ] = nulls [ 2 ] = ' ' ;
repl [ 0 ] = repl [ 1 ] = repl [ 2 ] = ' ' ;
@ -485,8 +602,6 @@ AtCommit_Notify(void)
value [ 0 ] = value [ 1 ] = value [ 2 ] = ( Datum ) 0 ;
value [ Anum_pg_listener_notify - 1 ] = Int32GetDatum ( MyProcPid ) ;
lRel = heap_open ( ListenerRelationId , ExclusiveLock ) ;
tdesc = RelationGetDescr ( lRel ) ;
scan = heap_beginscan ( lRel , SnapshotNow , 0 , NULL ) ;
while ( ( lTuple = heap_getnext ( scan , ForwardScanDirection ) ) ! = NULL )
@ -506,7 +621,6 @@ AtCommit_Notify(void)
* could lose an outside notify , which ' d be bad for applications
* that ignore self - notify messages .
*/
if ( Trace_notify )
elog ( DEBUG1 , " AtCommit_Notify: notifying self " ) ;
@ -539,98 +653,32 @@ AtCommit_Notify(void)
}
else if ( listener - > notification = = 0 )
{
HTSU_Result result ;
ItemPointerData update_ctid ;
TransactionId update_xmax ;
rTuple = heap_modifytuple ( lTuple , tdesc ,
value , nulls , repl ) ;
/* Rewrite the tuple with my PID in notification column */
rTuple = heap_modifytuple ( lTuple , tdesc , value , nulls , repl ) ;
simple_heap_update ( lRel , & lTuple - > t_self , rTuple ) ;
/*
* We cannot use simple_heap_update here because the tuple
* could have been modified by an uncommitted transaction ;
* specifically , since UNLISTEN releases exclusive lock on the
* table before commit , the other guy could already have tried
* to unlisten . There are no other cases where we should be
* able to see an uncommitted update or delete . Therefore , our
* response to a HeapTupleBeingUpdated result is just to
* ignore it . We do * not * wait for the other guy to commit
* - - - that would risk deadlock , and we don ' t want to block
* while holding the table lock anyway for performance
* reasons . We also ignore HeapTupleUpdated , which could occur
* if the other guy commits between our heap_getnext and
* heap_update calls .
*/
result = heap_update ( lRel , & lTuple - > t_self , rTuple ,
& update_ctid , & update_xmax ,
GetCurrentCommandId ( ) , InvalidSnapshot ,
false /* no wait for commit */ ) ;
switch ( result )
{
case HeapTupleSelfUpdated :
/* Tuple was already updated in current command? */
elog ( ERROR , " tuple already updated by self " ) ;
break ;
case HeapTupleMayBeUpdated :
/* done successfully */
# ifdef NOT_USED /* currently there are no indexes */
CatalogUpdateIndexes ( lRel , rTuple ) ;
CatalogUpdateIndexes ( lRel , rTuple ) ;
# endif
break ;
case HeapTupleBeingUpdated :
/* ignore uncommitted tuples */
break ;
case HeapTupleUpdated :
/* ignore just-committed tuples */
break ;
default :
elog ( ERROR , " unrecognized heap_update status: %u " ,
result ) ;
break ;
}
}
}
}
heap_endscan ( scan ) ;
/*
* We do NOT release the lock on pg_listener here ; we need to hold it
* until end of transaction ( which is about to happen , anyway ) to ensure
* that notified backends see our tuple updates when they look . Else they
* might disregard the signal , which would make the application programmer
* very unhappy .
*/
heap_close ( lRel , NoLock ) ;
ClearPendingNotifies ( ) ;
if ( Trace_notify )
elog ( DEBUG1 , " AtCommit_Notify: done " ) ;
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* AtAbort_Notify
*
* This is called at transaction abort .
*
* Gets rid of pending outbound notifies that we would have executed
* if the transaction got committed .
*
* Results :
* XXX
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* Gets rid of pending actions and outbound notifies that we would have
* executed if the transaction got committed .
*/
void
AtAbort_Notify ( void )
{
ClearPendingNotifies ( ) ;
ClearPendingActionsAndNotifies ( ) ;
}
/*
@ -646,6 +694,13 @@ AtSubStart_Notify(void)
/* Keep the list-of-lists in TopTransactionContext for simplicity */
old_cxt = MemoryContextSwitchTo ( TopTransactionContext ) ;
upperPendingActions = lcons ( pendingActions , upperPendingActions ) ;
Assert ( list_length ( upperPendingActions ) = =
GetCurrentTransactionNestLevel ( ) - 1 ) ;
pendingActions = NIL ;
upperPendingNotifies = lcons ( pendingNotifies , upperPendingNotifies ) ;
Assert ( list_length ( upperPendingNotifies ) = =
@ -659,13 +714,25 @@ AtSubStart_Notify(void)
/*
* AtSubCommit_Notify ( ) - - - Take care of subtransaction commit .
*
* Reassign all items in the pending notifies list to the parent transaction .
* Reassign all items in the pending lists to the parent transaction .
*/
void
AtSubCommit_Notify ( void )
{
List * parentPendingActions ;
List * parentPendingNotifies ;
parentPendingActions = ( List * ) linitial ( upperPendingActions ) ;
upperPendingActions = list_delete_first ( upperPendingActions ) ;
Assert ( list_length ( upperPendingActions ) = =
GetCurrentTransactionNestLevel ( ) - 2 ) ;
/*
* Mustn ' t try to eliminate duplicates here - - - see queue_listen ( )
*/
pendingActions = list_concat ( parentPendingActions , pendingActions ) ;
parentPendingNotifies = ( List * ) linitial ( upperPendingNotifies ) ;
upperPendingNotifies = list_delete_first ( upperPendingNotifies ) ;
@ -687,7 +754,7 @@ AtSubAbort_Notify(void)
int my_level = GetCurrentTransactionNestLevel ( ) ;
/*
* All we have to do is pop the stack - - - the notifies made in this
* All we have to do is pop the stack - - - the actions / notifies made in this
* subxact are no longer interesting , and the space will be freed when
* CurTransactionContext is recycled .
*
@ -696,6 +763,12 @@ AtSubAbort_Notify(void)
* GetCurrentTransactionNestLevel as the indicator of how far we need to
* prune the list .
*/
while ( list_length ( upperPendingActions ) > my_level - 2 )
{
pendingActions = ( List * ) linitial ( upperPendingActions ) ;
upperPendingActions = list_delete_first ( upperPendingActions ) ;
}
while ( list_length ( upperPendingNotifies ) > my_level - 2 )
{
pendingNotifies = ( List * ) linitial ( upperPendingNotifies ) ;
@ -704,7 +777,6 @@ AtSubAbort_Notify(void)
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* NotifyInterruptHandler
*
* This is the signal handler for SIGUSR2 .
@ -712,13 +784,6 @@ AtSubAbort_Notify(void)
* If we are idle ( notifyInterruptEnabled is set ) , we can safely invoke
* ProcessIncomingNotify directly . Otherwise , just set a flag
* to do it later .
*
* Results :
* none
*
* Side effects :
* per above
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
NotifyInterruptHandler ( SIGNAL_ARGS )
@ -794,7 +859,6 @@ NotifyInterruptHandler(SIGNAL_ARGS)
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* EnableNotifyInterrupt
*
* This is called by the PostgresMain main loop just before waiting
@ -804,7 +868,6 @@ NotifyInterruptHandler(SIGNAL_ARGS)
*
* NOTE : the signal handler starts out disabled , and stays so until
* PostgresMain calls this the first time .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
EnableNotifyInterrupt ( void )
@ -853,7 +916,6 @@ EnableNotifyInterrupt(void)
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* DisableNotifyInterrupt
*
* This is called by the PostgresMain main loop just after receiving
@ -863,7 +925,6 @@ EnableNotifyInterrupt(void)
* The SIGUSR1 signal handler also needs to call this , so as to
* prevent conflicts if one signal interrupts the other . So we
* must return the previous state of the flag .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
bool
DisableNotifyInterrupt ( void )
@ -876,7 +937,6 @@ DisableNotifyInterrupt(void)
}
/*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
* ProcessIncomingNotify
*
* Deal with arriving NOTIFYs from other backends .
@ -886,7 +946,6 @@ DisableNotifyInterrupt(void)
* and clear the notification field in pg_listener until next time .
*
* NOTE : since we are outside any transaction , we must create our own .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
static void
ProcessIncomingNotify ( void )
@ -949,9 +1008,6 @@ ProcessIncomingNotify(void)
/*
* Rewrite the tuple with 0 in notification column .
*
* simple_heap_update is safe here because no one else would have
* tried to UNLISTEN us , so there can be no uncommitted changes .
*/
rTuple = heap_modifytuple ( lTuple , tdesc , value , nulls , repl ) ;
simple_heap_update ( lRel , & lTuple - > t_self , rTuple ) ;
@ -1035,17 +1091,18 @@ AsyncExistsPendingNotify(const char *relname)
return false ;
}
/* Clear the pendingNotifies list. */
/* Clear the pendingActions and pending Notifies lists . */
static void
ClearPendingNotifies ( void )
ClearPendingActionsAnd Notifies ( void )
{
/*
* We used to have to explicitly deallocate the list members and nodes ,
* because they were malloc ' d . Now , since we know they are palloc ' d in
* CurTransactionContext , we need not do that - - - they ' ll go away
* automatically at transaction exit . We need only reset the list head
* pointer .
* pointers .
*/
pendingActions = NIL ;
pendingNotifies = NIL ;
}