@ -3,6 +3,21 @@
* method_worker . c
* AIO - perform AIO using worker processes
*
* IO workers consume IOs from a shared memory submission queue , run
* traditional synchronous system calls , and perform the shared completion
* handling immediately . Client code submits most requests by pushing IOs
* into the submission queue , and waits ( if necessary ) using condition
 * variables.  Some IOs cannot be performed in another process due to lack of
 * infrastructure for reopening the file, and must be processed synchronously
 * by the client code when submitted.
*
* So that the submitter can make just one system call when submitting a batch
* of IOs , wakeups " fan out " ; each woken IO worker can wake two more . XXX This
* could be improved by using futexes instead of latches to wake N waiters .
*
* This method of AIO is available in all builds on all operating systems , and
* is the default .
*
* Portions Copyright ( c ) 1996 - 2025 , PostgreSQL Global Development Group
* Portions Copyright ( c ) 1994 , Regents of the University of California
*
@ -16,25 +31,339 @@
# include "libpq/pqsignal.h"
# include "miscadmin.h"
# include "port/pg_bitutils.h"
# include "postmaster/auxprocess.h"
# include "postmaster/interrupt.h"
# include "storage/aio.h"
# include "storage/aio_internal.h"
# include "storage/aio_subsys.h"
# include "storage/io_worker.h"
# include "storage/ipc.h"
# include "storage/latch.h"
# include "storage/proc.h"
# include "tcop/tcopprot.h"
# include "utils/ps_status.h"
# include "utils/wait_event.h"
/* How many workers should each worker wake up if needed? */
# define IO_WORKER_WAKEUP_FANOUT 2
/*
 * Ring buffer in shared memory holding the IDs of IOs that are waiting for a
 * worker to execute them.  Indexes wrap using (size - 1) as a mask, so size
 * is always a power of two.  Protected by AioWorkerSubmissionQueueLock.
 */
typedef struct AioWorkerSubmissionQueue
{
	uint32		size;			/* number of slots; power of two */
	uint32		mask;			/* NOTE(review): never initialized or read in
								 * this file (code uses size - 1 directly) —
								 * consider initializing or removing */
	uint32		head;			/* next slot to insert into */
	uint32		tail;			/* next slot to consume from */
	uint32		ios[FLEXIBLE_ARRAY_MEMBER]; /* queued IO handle IDs */
} AioWorkerSubmissionQueue;
/*
 * Per-worker entry in AioWorkerControl.  The latch is used to wake the worker
 * when new IOs are pushed onto the submission queue.
 */
typedef struct AioWorkerSlot
{
	Latch	   *latch;			/* worker's process latch; NULL if slot free */
	bool		in_use;			/* has a live worker claimed this slot? */
} AioWorkerSlot;
/*
 * Shared state describing all IO workers.  Protected by
 * AioWorkerSubmissionQueueLock (the same lock as the submission queue).
 */
typedef struct AioWorkerControl
{
	uint64		idle_worker_mask;	/* bit i set => workers[i] is idle */
	AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];	/* MAX_IO_WORKERS entries */
} AioWorkerControl;
/* Forward declarations for the IoMethodOps callbacks below. */
static size_t pgaio_worker_shmem_size(void);
static void pgaio_worker_shmem_init(bool first_time);

static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
static int	pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);

/* Callback table implementing the "worker" IO method. */
const IoMethodOps pgaio_worker_ops = {
	.shmem_size = pgaio_worker_shmem_size,
	.shmem_init = pgaio_worker_shmem_init,
	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
	.submit = pgaio_worker_submit,
};
/* GUCs */
int			io_workers = 3;		/* number of IO worker processes */

/* Requested queue capacity; rounded up to a power of two at init time. */
static int	io_worker_queue_size = 64;

/* This worker's index into io_worker_control->workers; set at registration. */
static int	MyIoWorkerId;

/* Shared-memory state, attached/initialized by pgaio_worker_shmem_init(). */
static AioWorkerSubmissionQueue *io_worker_submission_queue;
static AioWorkerControl *io_worker_control;
/*
 * Compute the shared-memory size of the submission queue, reporting the
 * rounded-up entry count through *queue_size.
 */
static size_t
pgaio_worker_queue_shmem_size(int *queue_size)
{
	int			entries;

	/* Round size up to next power of two so we can make a mask. */
	entries = pg_nextpower2_32(io_worker_queue_size);
	*queue_size = entries;

	return offsetof(AioWorkerSubmissionQueue, ios) +
		sizeof(uint32) * entries;
}
/*
 * Compute the shared-memory size of the worker control structure, sized for
 * the maximum possible number of workers.
 */
static size_t
pgaio_worker_control_shmem_size(void)
{
	size_t		sz;

	sz = offsetof(AioWorkerControl, workers);
	sz += sizeof(AioWorkerSlot) * MAX_IO_WORKERS;

	return sz;
}
/*
 * IoMethodOps.shmem_size callback: total shared memory needed by this IO
 * method (submission queue plus worker control).
 */
static size_t
pgaio_worker_shmem_size(void)
{
	int			discarded_queue_size;
	size_t		total;

	total = pgaio_worker_queue_shmem_size(&discarded_queue_size);
	total = add_size(total, pgaio_worker_control_shmem_size());

	return total;
}
/*
 * IoMethodOps.shmem_init callback: attach to (and, on first call, initialize)
 * the submission queue and worker control structures.
 */
static void
pgaio_worker_shmem_init(bool first_time)
{
	int			queue_size;
	bool		already_initialized;

	io_worker_submission_queue =
		ShmemInitStruct("AioWorkerSubmissionQueue",
						pgaio_worker_queue_shmem_size(&queue_size),
						&already_initialized);
	if (!already_initialized)
	{
		AioWorkerSubmissionQueue *queue = io_worker_submission_queue;

		queue->size = queue_size;
		queue->head = 0;
		queue->tail = 0;
	}

	io_worker_control =
		ShmemInitStruct("AioWorkerControl",
						pgaio_worker_control_shmem_size(),
						&already_initialized);
	if (!already_initialized)
	{
		io_worker_control->idle_worker_mask = 0;

		/* No workers have started yet; mark every slot free. */
		for (int slot = 0; slot < MAX_IO_WORKERS; slot++)
		{
			io_worker_control->workers[slot].latch = NULL;
			io_worker_control->workers[slot].in_use = false;
		}
	}
}
static int
pgaio_choose_idle_worker ( void )
{
int worker ;
if ( io_worker_control - > idle_worker_mask = = 0 )
return - 1 ;
/* Find the lowest bit position, and clear it. */
worker = pg_rightmost_one_pos64 ( io_worker_control - > idle_worker_mask ) ;
io_worker_control - > idle_worker_mask & = ~ ( UINT64_C ( 1 ) < < worker ) ;
return worker ;
}
/*
 * Append an IO to the submission queue.  Returns false without queueing if
 * the ring buffer is full.  Caller must hold AioWorkerSubmissionQueueLock.
 */
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
	AioWorkerSubmissionQueue *queue = io_worker_submission_queue;
	uint32		next_head;

	/* size is a power of two, so (size - 1) works as a wrap mask. */
	next_head = (queue->head + 1) & (queue->size - 1);
	if (next_head == queue->tail)
	{
		/* One slot is kept unused to distinguish full from empty. */
		pgaio_debug(DEBUG3, "io queue is full, at %u elements",
					queue->size);
		return false;
	}

	queue->ios[queue->head] = pgaio_io_get_id(ioh);
	queue->head = next_head;

	return true;
}
static uint32
pgaio_worker_submission_queue_consume ( void )
{
AioWorkerSubmissionQueue * queue ;
uint32 result ;
queue = io_worker_submission_queue ;
if ( queue - > tail = = queue - > head )
return UINT32_MAX ; /* empty */
result = queue - > ios [ queue - > tail ] ;
queue - > tail = ( queue - > tail + 1 ) & ( queue - > size - 1 ) ;
return result ;
}
/*
 * Number of IOs currently waiting in the submission queue.  Caller must hold
 * AioWorkerSubmissionQueueLock.
 */
static uint32
pgaio_worker_submission_queue_depth(void)
{
	uint32		unwrapped_head = io_worker_submission_queue->head;
	uint32		tail = io_worker_submission_queue->tail;

	/* Undo index wraparound so the subtraction below cannot underflow. */
	if (tail > unwrapped_head)
		unwrapped_head += io_worker_submission_queue->size;

	Assert(unwrapped_head >= tail);

	return unwrapped_head - tail;
}
/*
 * IoMethodOps.needs_synchronous_execution callback: report whether the IO
 * must be executed synchronously by the submitter itself instead of being
 * handed to a worker process.
 */
static bool
pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
{
	return
	/* single-process mode: there are no worker processes to hand off to */
		!IsUnderPostmaster
	/* IO references backend-local state a worker process cannot access */
		|| ioh->flags & PGAIO_HF_REFERENCES_LOCAL
	/* a worker could not reopen the target file (see file header comment) */
		|| !pgaio_io_can_reopen(ioh);
}
/*
 * Push a batch of IOs onto the shared submission queue under one lock
 * acquisition, waking at most one idle worker (further wakeups fan out from
 * worker to worker; see file header).  IOs that do not fit in the queue are
 * executed synchronously here, after queueing as many as possible.
 */
static void
pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
{
	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
	int			nsync = 0;
	Latch	   *wakeup = NULL;
	int			worker;

	Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);

	/* The submission queue and the idle-worker mask share one lock. */
	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
	for (int i = 0; i < nios; ++i)
	{
		Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
		if (!pgaio_worker_submission_queue_insert(ios[i]))
		{
			/*
			 * We'll do it synchronously, but only after we've sent as many as
			 * we can to workers, to maximize concurrency.
			 */
			synchronous_ios[nsync++] = ios[i];
			continue;
		}

		if (wakeup == NULL)
		{
			/* Choose an idle worker to wake up if we haven't already. */
			worker = pgaio_choose_idle_worker();
			if (worker >= 0)
				wakeup = io_worker_control->workers[worker].latch;

			pgaio_debug_io(DEBUG4, ios[i],
						   "choosing worker %d",
						   worker);
		}
	}
	LWLockRelease(AioWorkerSubmissionQueueLock);

	/* Set the latch only after releasing the lock. */
	if (wakeup)
		SetLatch(wakeup);

	/* Run whatever is left synchronously. */
	if (nsync > 0)
	{
		for (int i = 0; i < nsync; ++i)
		{
			pgaio_io_perform_synchronously(synchronous_ios[i]);
		}
	}
}
/*
 * IoMethodOps.submit callback: mark each staged IO as submitted, then hand
 * the whole batch to the submission queue.  Returns the number of IOs
 * submitted, which is always the full batch for this method.
 */
static int
pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
	for (int i = 0; i < num_staged_ios; i++)
		pgaio_io_prepare_submit(staged_ios[i]);

	pgaio_worker_submit_internal(num_staged_ios, staged_ios);

	return num_staged_ios;
}
/*
 * on_shmem_exit() callback that releases the worker's slot in
 * io_worker_control.
 */
static void
pgaio_worker_die(int code, Datum arg)
{
	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);

	/*
	 * NOTE(review): this worker's bit in idle_worker_mask is not cleared
	 * here, so pgaio_choose_idle_worker() may still return this slot with a
	 * NULL latch after the worker exits — confirm all wakeup paths tolerate
	 * a NULL latch (pgaio_worker_submit_internal checks; verify others).
	 */
	io_worker_control->workers[MyIoWorkerId].in_use = false;
	io_worker_control->workers[MyIoWorkerId].latch = NULL;
	LWLockRelease(AioWorkerSubmissionQueueLock);
}
/*
 * Register the worker in shared memory, assign MyIoWorkerId and register a
 * shutdown callback to release the registration.
 */
static void
pgaio_worker_register(void)
{
	MyIoWorkerId = -1;

	/*
	 * XXX: This could do with more fine-grained locking. But it's also not
	 * very common for the number of workers to change at the moment...
	 */
	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);

	/* Claim the first free slot. */
	for (int i = 0; i < MAX_IO_WORKERS; ++i)
	{
		if (!io_worker_control->workers[i].in_use)
		{
			Assert(io_worker_control->workers[i].latch == NULL);
			io_worker_control->workers[i].in_use = true;
			MyIoWorkerId = i;
			break;
		}
		else
			Assert(io_worker_control->workers[i].latch != NULL);
	}

	if (MyIoWorkerId == -1)
		elog(ERROR, "couldn't find a free worker slot");

	/* Advertise ourselves as idle and reachable via our latch. */
	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
	LWLockRelease(AioWorkerSubmissionQueueLock);

	on_shmem_exit(pgaio_worker_die, 0);
}
void
IoWorkerMain ( const void * startup_data , size_t startup_data_len )
{
sigjmp_buf local_sigjmp_buf ;
PgAioHandle * volatile error_ioh = NULL ;
volatile int error_errno = 0 ;
char cmd [ 128 ] ;
MyBackendType = B_IO_WORKER ;
AuxiliaryProcessMainCommon ( ) ;
@ -53,6 +382,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
pqsignal ( SIGUSR1 , procsignal_sigusr1_handler ) ;
pqsignal ( SIGUSR2 , SignalHandlerForShutdownRequest ) ;
/* also registers a shutdown callback to unregister */
pgaio_worker_register ( ) ;
sprintf ( cmd , " io worker: %d " , MyIoWorkerId ) ;
set_ps_display ( cmd ) ;
/* see PostgresMain() */
if ( sigsetjmp ( local_sigjmp_buf , 1 ) ! = 0 )
{
@ -61,6 +396,27 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
EmitErrorReport ( ) ;
/*
* In the - very unlikely - case that the IO failed in a way that
* raises an error we need to mark the IO as failed .
*
* Need to do just enough error recovery so that we can mark the IO as
* failed and then exit ( postmaster will start a new worker ) .
*/
LWLockReleaseAll ( ) ;
if ( error_ioh ! = NULL )
{
/* should never fail without setting error_errno */
Assert ( error_errno ! = 0 ) ;
errno = error_errno ;
START_CRIT_SECTION ( ) ;
pgaio_io_process_completion ( error_ioh , - error_errno ) ;
END_CRIT_SECTION ( ) ;
}
proc_exit ( 1 ) ;
}
@ -71,9 +427,89 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
while ( ! ShutdownRequestPending )
{
WaitLatch ( MyLatch , WL_LATCH_SET | WL_EXIT_ON_PM_DEATH , - 1 ,
WAIT_EVENT_IO_WORKER_MAIN ) ;
ResetLatch ( MyLatch ) ;
uint32 io_index ;
Latch * latches [ IO_WORKER_WAKEUP_FANOUT ] ;
int nlatches = 0 ;
int nwakeups = 0 ;
int worker ;
/* Try to get a job to do. */
LWLockAcquire ( AioWorkerSubmissionQueueLock , LW_EXCLUSIVE ) ;
if ( ( io_index = pgaio_worker_submission_queue_consume ( ) ) = = UINT32_MAX )
{
/*
* Nothing to do . Mark self idle .
*
* XXX : Invent some kind of back pressure to reduce useless
* wakeups ?
*/
io_worker_control - > idle_worker_mask | = ( UINT64_C ( 1 ) < < MyIoWorkerId ) ;
}
else
{
/* Got one. Clear idle flag. */
io_worker_control - > idle_worker_mask & = ~ ( UINT64_C ( 1 ) < < MyIoWorkerId ) ;
/* See if we can wake up some peers. */
nwakeups = Min ( pgaio_worker_submission_queue_depth ( ) ,
IO_WORKER_WAKEUP_FANOUT ) ;
for ( int i = 0 ; i < nwakeups ; + + i )
{
if ( ( worker = pgaio_choose_idle_worker ( ) ) < 0 )
break ;
latches [ nlatches + + ] = io_worker_control - > workers [ worker ] . latch ;
}
}
LWLockRelease ( AioWorkerSubmissionQueueLock ) ;
for ( int i = 0 ; i < nlatches ; + + i )
SetLatch ( latches [ i ] ) ;
if ( io_index ! = UINT32_MAX )
{
PgAioHandle * ioh = NULL ;
ioh = & pgaio_ctl - > io_handles [ io_index ] ;
error_ioh = ioh ;
pgaio_debug_io ( DEBUG4 , ioh ,
" worker %d processing IO " ,
MyIoWorkerId ) ;
/*
* It ' s very unlikely , but possible , that reopen fails . E . g . due
* to memory allocations failing or file permissions changing or
* such . In that case we need to fail the IO .
*
* There ' s not really a good errno we can report here .
*/
error_errno = ENOENT ;
pgaio_io_reopen ( ioh ) ;
/*
* To be able to exercise the reopen - fails path , allow injection
* points to trigger a failure at this point .
*/
pgaio_io_call_inj ( ioh , " AIO_WORKER_AFTER_REOPEN " ) ;
error_errno = 0 ;
error_ioh = NULL ;
/*
* We don ' t expect this to ever fail with ERROR or FATAL , no need
* to keep error_ioh set to the IO .
* pgaio_io_perform_synchronously ( ) contains a critical section to
* ensure we don ' t accidentally fail .
*/
pgaio_io_perform_synchronously ( ioh ) ;
}
else
{
WaitLatch ( MyLatch , WL_LATCH_SET | WL_EXIT_ON_PM_DEATH , - 1 ,
WAIT_EVENT_IO_WORKER_MAIN ) ;
ResetLatch ( MyLatch ) ;
}
CHECK_FOR_INTERRUPTS ( ) ;
}
@ -83,6 +519,5 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
/*
 * Is the "worker" IO method the one currently selected by the io_method GUC?
 *
 * The old placeholder body ("return false;") made the real return statement
 * unreachable dead code; resolve to the intended implementation.
 */
bool
pgaio_workers_enabled(void)
{
	return io_method == IOMETHOD_WORKER;
}