@ -3,21 +3,20 @@
* latch . c
* latch . c
* Routines for inter - process latches
* Routines for inter - process latches
*
*
* The Unix implementation uses the so - called self - pipe trick to overcome the
* The poll ( ) and kqueue ( ) implementations use the so - called self - pipe trick to overcome the
* race condition involved with poll ( ) ( or epoll_wait ( ) on linux ) and setting
* race condition involved with poll ( ) and setting a global flag in the signal
* a global flag in the signal handler . When a latch is set and the current
* handler . When a latch is set and the current process is waiting for it , the
* process is waiting for it , the signal handler wakes up the poll ( ) in
* signal handler wakes up the poll ( ) in WaitLatch by writing a byte to a pipe .
* WaitLatch by writing a byte to a pipe . A signal by itself doesn ' t interrupt
* A signal by itself doesn ' t interrupt poll ( ) on all platforms , and even on
* poll ( ) on all platforms , and even on platforms where it does , a signal that
* platforms where it does , a signal that arrives just before the poll ( ) call
* arrives just before the poll ( ) call does not prevent poll ( ) from entering
* does not prevent poll ( ) from entering sleep . An incoming byte on a pipe
* sleep . An incoming byte on a pipe however reliably interrupts the sleep ,
* however reliably interrupts the sleep , and causes poll ( ) to return
* and causes poll ( ) to return immediately even if the signal arrives before
* immediately even if the signal arrives before poll ( ) begins .
* poll ( ) begins .
*
*
* When SetLatch is called from the same process that owns the latch ,
* The epoll ( ) implementation overcomes the race with a different technique : it
* SetLatch writes the byte directly to the pipe . If it ' s owned by another
* keeps SIGURG blocked and consumes from a signalfd ( ) descriptor instead . We
* process , SIGURG is sent and the signal handler in the waiting process
* don ' t need to register a signal handler or create our own self - pipe . We
* writes the byte to the pipe on behalf of the signaling process .
* assume that any system that has Linux epoll ( ) also has Linux signalfd ( ) .
*
*
* The Windows implementation uses Windows events that are inherited by all
* The Windows implementation uses Windows events that are inherited by all
* postmaster child processes . There ' s no need for the self - pipe trick there .
* postmaster child processes . There ' s no need for the self - pipe trick there .
@ -46,6 +45,7 @@
# include <poll.h>
# include <poll.h>
# endif
# endif
# include "libpq/pqsignal.h"
# include "miscadmin.h"
# include "miscadmin.h"
# include "pgstat.h"
# include "pgstat.h"
# include "port/atomics.h"
# include "port/atomics.h"
@ -79,6 +79,10 @@
# error "no wait set implementation available"
# error "no wait set implementation available"
# endif
# endif
# ifdef WAIT_USE_EPOLL
# include <sys/signalfd.h>
# endif
/* typedef in latch.h */
/* typedef in latch.h */
struct WaitEventSet
struct WaitEventSet
{
{
@ -139,7 +143,14 @@ static WaitEventSet *LatchWaitSet;
# ifndef WIN32
# ifndef WIN32
/* Are we currently in WaitLatch? The signal handler would like to know. */
/* Are we currently in WaitLatch? The signal handler would like to know. */
static volatile sig_atomic_t waiting = false ;
static volatile sig_atomic_t waiting = false ;
# endif
# ifdef WAIT_USE_EPOLL
/* On Linux, we'll receive SIGURG via a signalfd file descriptor. */
static int signal_fd = - 1 ;
# endif
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
/* Read and write ends of the self-pipe */
/* Read and write ends of the self-pipe */
static int selfpipe_readfd = - 1 ;
static int selfpipe_readfd = - 1 ;
static int selfpipe_writefd = - 1 ;
static int selfpipe_writefd = - 1 ;
@ -150,8 +161,11 @@ static int selfpipe_owner_pid = 0;
/* Private function prototypes */
/* Private function prototypes */
static void latch_sigurg_handler ( SIGNAL_ARGS ) ;
static void latch_sigurg_handler ( SIGNAL_ARGS ) ;
static void sendSelfPipeByte ( void ) ;
static void sendSelfPipeByte ( void ) ;
static void drainSelfPipe ( void ) ;
# endif
# endif /* WIN32 */
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL)
static void drain ( void ) ;
# endif
# if defined(WAIT_USE_EPOLL)
# if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll ( WaitEventSet * set , WaitEvent * event , int action ) ;
static void WaitEventAdjustEpoll ( WaitEventSet * set , WaitEvent * event , int action ) ;
@ -175,7 +189,7 @@ static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
void
void
InitializeLatchSupport ( void )
InitializeLatchSupport ( void )
{
{
# ifndef WIN32
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
int pipefd [ 2 ] ;
int pipefd [ 2 ] ;
if ( IsUnderPostmaster )
if ( IsUnderPostmaster )
@ -247,8 +261,21 @@ InitializeLatchSupport(void)
ReserveExternalFD ( ) ;
ReserveExternalFD ( ) ;
pqsignal ( SIGURG , latch_sigurg_handler ) ;
pqsignal ( SIGURG , latch_sigurg_handler ) ;
# else
# endif
/* currently, nothing to do here for Windows */
# ifdef WAIT_USE_EPOLL
sigset_t signalfd_mask ;
/* Block SIGURG, because we'll receive it through a signalfd. */
sigaddset ( & UnBlockSig , SIGURG ) ;
/* Set up the signalfd to receive SIGURG notifications. */
sigemptyset ( & signalfd_mask ) ;
sigaddset ( & signalfd_mask , SIGURG ) ;
signal_fd = signalfd ( - 1 , & signalfd_mask , SFD_NONBLOCK | SFD_CLOEXEC ) ;
if ( signal_fd < 0 )
elog ( FATAL , " signalfd() failed " ) ;
ReserveExternalFD ( ) ;
# endif
# endif
}
}
@ -273,7 +300,9 @@ InitializeLatchWaitSet(void)
void
void
ShutdownLatchSupport ( void )
ShutdownLatchSupport ( void )
{
{
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
pqsignal ( SIGURG , SIG_IGN ) ;
pqsignal ( SIGURG , SIG_IGN ) ;
# endif
if ( LatchWaitSet )
if ( LatchWaitSet )
{
{
@ -281,11 +310,18 @@ ShutdownLatchSupport(void)
LatchWaitSet = NULL ;
LatchWaitSet = NULL ;
}
}
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
close ( selfpipe_readfd ) ;
close ( selfpipe_readfd ) ;
close ( selfpipe_writefd ) ;
close ( selfpipe_writefd ) ;
selfpipe_readfd = - 1 ;
selfpipe_readfd = - 1 ;
selfpipe_writefd = - 1 ;
selfpipe_writefd = - 1 ;
selfpipe_owner_pid = InvalidPid ;
selfpipe_owner_pid = InvalidPid ;
# endif
# if defined(WAIT_USE_EPOLL)
close ( signal_fd ) ;
signal_fd = - 1 ;
# endif
}
}
/*
/*
@ -299,10 +335,10 @@ InitLatch(Latch *latch)
latch - > owner_pid = MyProcPid ;
latch - > owner_pid = MyProcPid ;
latch - > is_shared = false ;
latch - > is_shared = false ;
# ifndef WIN32
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
/* Assert InitializeLatchSupport has been called in this process */
/* Assert InitializeLatchSupport has been called in this process */
Assert ( selfpipe_readfd > = 0 & & selfpipe_owner_pid = = MyProcPid ) ;
Assert ( selfpipe_readfd > = 0 & & selfpipe_owner_pid = = MyProcPid ) ;
# else
# elif defined(WAIT_USE_WIN32)
latch - > event = CreateEvent ( NULL , TRUE , FALSE , NULL ) ;
latch - > event = CreateEvent ( NULL , TRUE , FALSE , NULL ) ;
if ( latch - > event = = NULL )
if ( latch - > event = = NULL )
elog ( ERROR , " CreateEvent failed: error code %lu " , GetLastError ( ) ) ;
elog ( ERROR , " CreateEvent failed: error code %lu " , GetLastError ( ) ) ;
@ -363,7 +399,7 @@ OwnLatch(Latch *latch)
/* Sanity checks */
/* Sanity checks */
Assert ( latch - > is_shared ) ;
Assert ( latch - > is_shared ) ;
# ifndef WIN32
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
/* Assert InitializeLatchSupport has been called in this process */
/* Assert InitializeLatchSupport has been called in this process */
Assert ( selfpipe_readfd > = 0 & & selfpipe_owner_pid = = MyProcPid ) ;
Assert ( selfpipe_readfd > = 0 & & selfpipe_owner_pid = = MyProcPid ) ;
# endif
# endif
@ -550,9 +586,9 @@ SetLatch(Latch *latch)
/*
/*
* See if anyone ' s waiting for the latch . It can be the current process if
* See if anyone ' s waiting for the latch . It can be the current process if
* we ' re in a signal handler . We use the self - pipe to wake up the
* we ' re in a signal handler . We use the self - pipe , or send SIGURG to ourselves ,
* poll ( ) / epoll_wait ( ) in that case . If it ' s another process , send a
* to wake up WaitEventSetWaitBlock ( ) without races in that case . If it ' s
* signal .
* another process , send a signal .
*
*
* Fetch owner_pid only once , in case the latch is concurrently getting
* Fetch owner_pid only once , in case the latch is concurrently getting
* owned or disowned . XXX : This assumes that pid_t is atomic , which isn ' t
* owned or disowned . XXX : This assumes that pid_t is atomic , which isn ' t
@ -575,11 +611,17 @@ SetLatch(Latch *latch)
return ;
return ;
else if ( owner_pid = = MyProcPid )
else if ( owner_pid = = MyProcPid )
{
{
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
if ( waiting )
if ( waiting )
sendSelfPipeByte ( ) ;
sendSelfPipeByte ( ) ;
# else
if ( waiting )
kill ( MyProcPid , SIGURG ) ;
# endif
}
}
else
else
kill ( owner_pid , SIGURG ) ;
kill ( owner_pid , SIGURG ) ;
# else
# else
/*
/*
@ -856,8 +898,13 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
{
{
set - > latch = latch ;
set - > latch = latch ;
set - > latch_pos = event - > pos ;
set - > latch_pos = event - > pos ;
# ifndef WIN32
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
event - > fd = selfpipe_readfd ;
event - > fd = selfpipe_readfd ;
# elif defined(WAIT_USE_EPOLL)
event - > fd = signal_fd ;
# else
event - > fd = PGINVALID_SOCKET ;
return event - > pos ;
# endif
# endif
}
}
else if ( events = = WL_POSTMASTER_DEATH )
else if ( events = = WL_POSTMASTER_DEATH )
@ -932,12 +979,13 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
if ( latch & & latch - > owner_pid ! = MyProcPid )
if ( latch & & latch - > owner_pid ! = MyProcPid )
elog ( ERROR , " cannot wait on a latch owned by another process " ) ;
elog ( ERROR , " cannot wait on a latch owned by another process " ) ;
set - > latch = latch ;
set - > latch = latch ;
/*
/*
* On Unix , we don ' t need to modify the kernel object because the
* On Unix , we don ' t need to modify the kernel object because the
* underlying pipe is the same for all latches so we can return
* underlying pipe ( if there is one ) is the same for all latches so we
* immediately . On Windows , we need to update our array of handles ,
* can return immediately . On Windows , we need to update our array of
* but we leave the old one in place and tolerate spurious wakeups if
* handles , but we leave the old one in place and tolerate spurious
* the latch is disabled .
* wakeups if the latch is disabled .
*/
*/
# if defined(WAIT_USE_WIN32)
# if defined(WAIT_USE_WIN32)
if ( ! latch )
if ( ! latch )
@ -1421,8 +1469,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
if ( cur_event - > events = = WL_LATCH_SET & &
if ( cur_event - > events = = WL_LATCH_SET & &
cur_epoll_event - > events & ( EPOLLIN | EPOLLERR | EPOLLHUP ) )
cur_epoll_event - > events & ( EPOLLIN | EPOLLERR | EPOLLHUP ) )
{
{
/* There's data in the self-pipe, clear it . */
/* Drain the signalfd . */
drainSelfPipe ( ) ;
drain ( ) ;
if ( set - > latch & & set - > latch - > is_set )
if ( set - > latch & & set - > latch - > is_set )
{
{
@ -1575,7 +1623,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
cur_kqueue_event - > filter = = EVFILT_READ )
cur_kqueue_event - > filter = = EVFILT_READ )
{
{
/* There's data in the self-pipe, clear it. */
/* There's data in the self-pipe, clear it. */
drainSelfPipe ( ) ;
drain ( ) ;
if ( set - > latch & & set - > latch - > is_set )
if ( set - > latch & & set - > latch - > is_set )
{
{
@ -1691,7 +1739,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
( cur_pollfd - > revents & ( POLLIN | POLLHUP | POLLERR | POLLNVAL ) ) )
( cur_pollfd - > revents & ( POLLIN | POLLHUP | POLLERR | POLLNVAL ) ) )
{
{
/* There's data in the self-pipe, clear it. */
/* There's data in the self-pipe, clear it. */
drainSelfPipe ( ) ;
drain ( ) ;
if ( set - > latch & & set - > latch - > is_set )
if ( set - > latch & & set - > latch - > is_set )
{
{
@ -1951,7 +1999,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
}
}
# endif
# endif
# ifndef WIN32
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
/*
/*
* SetLatch uses SIGURG to wake up the process waiting on the latch .
* SetLatch uses SIGURG to wake up the process waiting on the latch .
*
*
@ -1967,10 +2016,8 @@ latch_sigurg_handler(SIGNAL_ARGS)
errno = save_errno ;
errno = save_errno ;
}
}
# endif /* !WIN32 */
/* Send one byte to the self-pipe, to wake up WaitLatch */
/* Send one byte to the self-pipe, to wake up WaitLatch */
# ifndef WIN32
static void
static void
sendSelfPipeByte ( void )
sendSelfPipeByte ( void )
{
{
@ -2000,45 +2047,58 @@ retry:
return ;
return ;
}
}
}
}
# endif /* !WIN32 */
# endif
# if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL)
/*
/*
* Read all available data from the self - pipe
* Read all available data from the self - pipe or signalfd .
*
*
* Note : this is only called when waiting = true . If it fails and doesn ' t
* Note : this is only called when waiting = true . If it fails and doesn ' t
* return , it must reset that flag first ( though ideally , this will never
* return , it must reset that flag first ( though ideally , this will never
* happen ) .
* happen ) .
*/
*/
# ifndef WIN32
static void
static void
drainSelfPipe ( void )
drain ( void )
{
{
/*
char buf [ 1024 ] ;
* There shouldn ' t normally be more than one byte in the pipe , or maybe a
* few bytes if multiple processes run SetLatch at the same instant .
*/
char buf [ 16 ] ;
int rc ;
int rc ;
int fd ;
# ifdef WAIT_USE_POLL
fd = selfpipe_readfd ;
# else
fd = signal_fd ;
# endif
for ( ; ; )
for ( ; ; )
{
{
rc = read ( selfpipe_readfd , buf , sizeof ( buf ) ) ;
rc = read ( fd , buf , sizeof ( buf ) ) ;
if ( rc < 0 )
if ( rc < 0 )
{
{
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK )
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK )
break ; /* the pipe is empty */
break ; /* the descriptor is empty */
else if ( errno = = EINTR )
else if ( errno = = EINTR )
continue ; /* retry */
continue ; /* retry */
else
else
{
{
waiting = false ;
waiting = false ;
# ifdef WAIT_USE_POLL
elog ( ERROR , " read() on self-pipe failed: %m " ) ;
elog ( ERROR , " read() on self-pipe failed: %m " ) ;
# else
elog ( ERROR , " read() on signalfd failed: %m " ) ;
# endif
}
}
}
}
else if ( rc = = 0 )
else if ( rc = = 0 )
{
{
waiting = false ;
waiting = false ;
# ifdef WAIT_USE_POLL
elog ( ERROR , " unexpected EOF on self-pipe " ) ;
elog ( ERROR , " unexpected EOF on self-pipe " ) ;
# else
elog ( ERROR , " unexpected EOF on signalfd " ) ;
# endif
}
}
else if ( rc < sizeof ( buf ) )
else if ( rc < sizeof ( buf ) )
{
{
@ -2048,4 +2108,5 @@ drainSelfPipe(void)
/* else buffer wasn't big enough, so read again */
/* else buffer wasn't big enough, so read again */
}
}
}
}
# endif /* !WIN32 */
# endif