@@ -39,6 +39,9 @@
#ifdef HAVE_SYS_EPOLL_H
#include <sys/epoll.h>
#endif
#ifdef HAVE_SYS_EVENT_H
#include <sys/event.h>
#endif
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
@@ -60,10 +63,12 @@
 * define somewhere before this block.
 */
#if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || \
	defined(WAIT_USE_WIN32)
	defined(WAIT_USE_KQUEUE) || defined(WAIT_USE_WIN32)
/* don't overwrite manual choice */
#elif defined(HAVE_SYS_EPOLL_H)
#define WAIT_USE_EPOLL
#elif defined(HAVE_KQUEUE)
#define WAIT_USE_KQUEUE
#elif defined(HAVE_POLL)
#define WAIT_USE_POLL
#elif WIN32
@@ -104,6 +109,11 @@ struct WaitEventSet
	int			epoll_fd;
	/* epoll_wait returns events in a user provided array, allocate once */
	struct epoll_event *epoll_ret_events;
#elif defined(WAIT_USE_KQUEUE)
	int			kqueue_fd;
	/* kevent returns events in a user provided array, allocate once */
	struct kevent *kqueue_ret_events;
	bool		report_postmaster_not_running;
#elif defined(WAIT_USE_POLL)
	/* poll expects events to be waited on every poll() call, prepare once */
	struct pollfd *pollfds;
@@ -136,6 +146,8 @@ static void drainSelfPipe(void);
#if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
#elif defined(WAIT_USE_KQUEUE)
static void WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events);
#elif defined(WAIT_USE_POLL)
static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
#elif defined(WAIT_USE_WIN32)
@@ -556,6 +568,8 @@ CreateWaitEventSet(MemoryContext context, int nevents)
#if defined(WAIT_USE_EPOLL)
	sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
#elif defined(WAIT_USE_KQUEUE)
	sz += MAXALIGN(sizeof(struct kevent) * nevents);
#elif defined(WAIT_USE_POLL)
	sz += MAXALIGN(sizeof(struct pollfd) * nevents);
#elif defined(WAIT_USE_WIN32)
@@ -574,6 +588,9 @@ CreateWaitEventSet(MemoryContext context, int nevents)
#if defined(WAIT_USE_EPOLL)
	set->epoll_ret_events = (struct epoll_event *) data;
	data += MAXALIGN(sizeof(struct epoll_event) * nevents);
#elif defined(WAIT_USE_KQUEUE)
	set->kqueue_ret_events = (struct kevent *) data;
	data += MAXALIGN(sizeof(struct kevent) * nevents);
#elif defined(WAIT_USE_POLL)
	set->pollfds = (struct pollfd *) data;
	data += MAXALIGN(sizeof(struct pollfd) * nevents);
@@ -599,6 +616,13 @@ CreateWaitEventSet(MemoryContext context, int nevents)
	if (fcntl(set->epoll_fd, F_SETFD, FD_CLOEXEC) == -1)
		elog(ERROR, "fcntl(F_SETFD) failed on epoll descriptor: %m");
#endif							/* EPOLL_CLOEXEC */
#elif defined(WAIT_USE_KQUEUE)
	set->kqueue_fd = kqueue();
	if (set->kqueue_fd < 0)
		elog(ERROR, "kqueue failed: %m");
	if (fcntl(set->kqueue_fd, F_SETFD, FD_CLOEXEC) == -1)
		elog(ERROR, "fcntl(F_SETFD) failed on kqueue descriptor: %m");
	set->report_postmaster_not_running = false;
#elif defined(WAIT_USE_WIN32)
	/*
@@ -631,6 +655,8 @@ FreeWaitEventSet(WaitEventSet *set)
{
#if defined(WAIT_USE_EPOLL)
	close(set->epoll_fd);
#elif defined(WAIT_USE_KQUEUE)
	close(set->kqueue_fd);
#elif defined(WAIT_USE_WIN32)
	WaitEvent  *cur_event;
@@ -747,6 +773,8 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
	/* perform wait primitive specific initialization, if needed */
#if defined(WAIT_USE_EPOLL)
	WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
#elif defined(WAIT_USE_KQUEUE)
	WaitEventAdjustKqueue(set, event, 0);
#elif defined(WAIT_USE_POLL)
	WaitEventAdjustPoll(set, event);
#elif defined(WAIT_USE_WIN32)
@@ -766,10 +794,16 @@ void
ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
{
	WaitEvent  *event;
#if defined(WAIT_USE_KQUEUE)
	int			old_events;
#endif

	Assert(pos < set->nevents);

	event = &set->events[pos];
#if defined(WAIT_USE_KQUEUE)
	old_events = event->events;
#endif

	/*
	 * If neither the event mask nor the associated latch changes, return
@@ -803,6 +837,8 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
#if defined(WAIT_USE_EPOLL)
	WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
#elif defined(WAIT_USE_KQUEUE)
	WaitEventAdjustKqueue(set, event, old_events);
#elif defined(WAIT_USE_POLL)
	WaitEventAdjustPoll(set, event);
#elif defined(WAIT_USE_WIN32)
@@ -895,6 +931,131 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
}
#endif

#if defined(WAIT_USE_KQUEUE)

/*
 * On most BSD family systems, the udata member of struct kevent is of type
 * void *, so we could directly convert to/from WaitEvent *.  Unfortunately,
 * NetBSD has it as intptr_t, so here we wallpaper over that difference with
 * an lvalue cast.
 */
#define AccessWaitEvent(k_ev) (*((WaitEvent **)(&(k_ev)->udata)))
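
/*
 * Illustrative example: because the macro above expands to an lvalue, it can
 * be used both to store and to read back the pointer, regardless of whether
 * udata is declared as "void *" or "intptr_t":
 *
 *		AccessWaitEvent(&kev) = event;		(store a WaitEvent pointer)
 *		event = AccessWaitEvent(&kev);		(read it back)
 *
 * Here "kev" is a hypothetical local of type struct kevent and "event" is a
 * WaitEvent pointer; neither name appears in the code above.
 */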
static inline void
WaitEventAdjustKqueueAdd(struct kevent *k_ev, int filter, int action,
						 WaitEvent *event)
{
	k_ev->ident = event->fd;
	k_ev->filter = filter;
	k_ev->flags = action | EV_CLEAR;
	k_ev->fflags = 0;
	k_ev->data = 0;
	AccessWaitEvent(k_ev) = event;
}

static inline void
WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event)
{
	/* For now postmaster death can only be added, not removed. */
	k_ev->ident = PostmasterPid;
	k_ev->filter = EVFILT_PROC;
	k_ev->flags = EV_ADD | EV_CLEAR;
	k_ev->fflags = NOTE_EXIT;
	k_ev->data = 0;
	AccessWaitEvent(k_ev) = event;
}
/*
 * old_events is the previous event mask, used to compute what has changed.
 */
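/*
 * For example (illustrative only): changing a socket from
 * WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE to just WL_SOCKET_READABLE
 * results in a single EV_DELETE for EVFILT_WRITE, while a brand-new
 * WL_SOCKET_READABLE registration (old_events == 0) results in a single
 * EV_ADD for EVFILT_READ.
 */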
static void
WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
{
	int			rc;
	struct kevent k_ev[2];
	int			count = 0;
	bool		new_filt_read = false;
	bool		old_filt_read = false;
	bool		new_filt_write = false;
	bool		old_filt_write = false;

	if (old_events == event->events)
		return;

	Assert(event->events != WL_LATCH_SET || set->latch != NULL);
	Assert(event->events == WL_LATCH_SET ||
		   event->events == WL_POSTMASTER_DEATH ||
		   (event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)));

	if (event->events == WL_POSTMASTER_DEATH)
	{
		/*
		 * Unlike all the other implementations, we detect postmaster death
		 * using process notification instead of waiting on the postmaster
		 * alive pipe.
		 */
		WaitEventAdjustKqueueAddPostmaster(&k_ev[count++], event);
	}
	else
	{
		/*
		 * We need to compute the adds and deletes required to get from the
		 * old event mask to the new event mask, since kevent treats readable
		 * and writable as separate events.
		 */
		if (old_events == WL_LATCH_SET ||
			(old_events & WL_SOCKET_READABLE))
			old_filt_read = true;
		if (event->events == WL_LATCH_SET ||
			(event->events & WL_SOCKET_READABLE))
			new_filt_read = true;
		if (old_events & WL_SOCKET_WRITEABLE)
			old_filt_write = true;
		if (event->events & WL_SOCKET_WRITEABLE)
			new_filt_write = true;

		if (old_filt_read && !new_filt_read)
			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_DELETE,
									 event);
		else if (!old_filt_read && new_filt_read)
			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_READ, EV_ADD,
									 event);
		if (old_filt_write && !new_filt_write)
			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_DELETE,
									 event);
		else if (!old_filt_write && new_filt_write)
			WaitEventAdjustKqueueAdd(&k_ev[count++], EVFILT_WRITE, EV_ADD,
									 event);
	}

	Assert(count > 0);
	Assert(count <= 2);

	rc = kevent(set->kqueue_fd, &k_ev[0], count, NULL, 0, NULL);

	/*
	 * When adding the postmaster's pid, we have to consider that it might
	 * already have exited and perhaps even been replaced by another process
	 * with the same pid.  If so, we have to defer reporting this as an event
	 * until the next call to WaitEventSetWaitBlock().
	 */
	if (rc < 0)
	{
		if (event->events == WL_POSTMASTER_DEATH && errno == ESRCH)
			set->report_postmaster_not_running = true;
		else
			ereport(ERROR,
					(errcode_for_socket_access(),
			/* translator: %s is a syscall name, such as "poll()" */
					 errmsg("%s failed: %m",
							"kevent()")));
	}
	else if (event->events == WL_POSTMASTER_DEATH && PostmasterPid != getppid())
		set->report_postmaster_not_running = true;
}
#endif
#if defined(WAIT_USE_WIN32)
static void
WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
@@ -1186,6 +1347,143 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
	return returned_events;
}

#elif defined(WAIT_USE_KQUEUE)

/*
 * Wait using kevent(2) on BSD-family systems and macOS.
 *
 * For now this mirrors the epoll code, but in future it could modify the fd
 * set in the same call to kevent as it uses for waiting instead of doing that
 * with separate system calls.
 */
static int
WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
					  WaitEvent *occurred_events, int nevents)
{
	int			returned_events = 0;
	int			rc;
	WaitEvent  *cur_event;
	struct kevent *cur_kqueue_event;
	struct timespec timeout;
	struct timespec *timeout_p;

	if (cur_timeout < 0)
		timeout_p = NULL;
	else
	{
		timeout.tv_sec = cur_timeout / 1000;
		timeout.tv_nsec = (cur_timeout % 1000) * 1000000;
		timeout_p = &timeout;
	}
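
	/*
	 * Illustration: cur_timeout is in milliseconds, so the conversion above
	 * turns e.g. 1500 ms into tv_sec = 1, tv_nsec = 500000000.
	 */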
	/* Report events discovered by WaitEventAdjustKqueue(). */
	if (unlikely(set->report_postmaster_not_running))
	{
		if (set->exit_on_postmaster_death)
			proc_exit(1);
		occurred_events->fd = PGINVALID_SOCKET;
		occurred_events->events = WL_POSTMASTER_DEATH;
		return 1;
	}

	/* Sleep */
	rc = kevent(set->kqueue_fd, NULL, 0,
				set->kqueue_ret_events, nevents,
				timeout_p);

	/* Check return code */
	if (rc < 0)
	{
		/* EINTR is okay, otherwise complain */
		if (errno != EINTR)
		{
			waiting = false;
			ereport(ERROR,
					(errcode_for_socket_access(),
			/* translator: %s is a syscall name, such as "poll()" */
					 errmsg("%s failed: %m",
							"kevent()")));
		}
		return 0;
	}
	else if (rc == 0)
	{
		/* timeout exceeded */
		return -1;
	}

	/*
	 * At least one event occurred, iterate over the returned kqueue events
	 * until they're either all processed, or we've returned all the events
	 * the caller desired.
	 */
	for (cur_kqueue_event = set->kqueue_ret_events;
		 cur_kqueue_event < (set->kqueue_ret_events + rc) &&
		 returned_events < nevents;
		 cur_kqueue_event++)
	{
		/* kevent's udata points to the associated WaitEvent */
		cur_event = AccessWaitEvent(cur_kqueue_event);

		occurred_events->pos = cur_event->pos;
		occurred_events->user_data = cur_event->user_data;
		occurred_events->events = 0;

		if (cur_event->events == WL_LATCH_SET &&
			cur_kqueue_event->filter == EVFILT_READ)
		{
			/* There's data in the self-pipe, clear it. */
			drainSelfPipe();

			if (set->latch->is_set)
			{
				occurred_events->fd = PGINVALID_SOCKET;
				occurred_events->events = WL_LATCH_SET;
				occurred_events++;
				returned_events++;
			}
		}
		else if (cur_event->events == WL_POSTMASTER_DEATH &&
				 cur_kqueue_event->filter == EVFILT_PROC &&
				 (cur_kqueue_event->fflags & NOTE_EXIT) != 0)
		{
			if (set->exit_on_postmaster_death)
				proc_exit(1);
			occurred_events->fd = PGINVALID_SOCKET;
			occurred_events->events = WL_POSTMASTER_DEATH;
			occurred_events++;
			returned_events++;
		}
		else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
		{
			Assert(cur_event->fd >= 0);

			if ((cur_event->events & WL_SOCKET_READABLE) &&
				(cur_kqueue_event->filter == EVFILT_READ))
			{
				/* readable, or EOF */
				occurred_events->events |= WL_SOCKET_READABLE;
			}

			if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
				(cur_kqueue_event->filter == EVFILT_WRITE))
			{
				/* writable, or EOF */
				occurred_events->events |= WL_SOCKET_WRITEABLE;
			}

			if (occurred_events->events != 0)
			{
				occurred_events->fd = cur_event->fd;
				occurred_events++;
				returned_events++;
			}
		}
	}

	return returned_events;
}
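
/*
 * A minimal usage sketch of the WaitEventSet API that this kqueue code
 * implements (illustrative only; "sock" and "timeout_ms" are hypothetical
 * variables, and error handling is omitted):
 *
 *		WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
 *		WaitEvent	occurred;
 *
 *		AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
 *		AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
 *		AddWaitEventToSet(set, WL_SOCKET_READABLE, sock, NULL, NULL);
 *
 *		if (WaitEventSetWait(set, timeout_ms, &occurred, 1, 0) > 0 &&
 *			(occurred.events & WL_LATCH_SET))
 *			ResetLatch(MyLatch);
 *
 *		FreeWaitEventSet(set);
 */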
#elif defined(WAIT_USE_POLL)
/*