@ -52,26 +52,26 @@
# define IO_WORKER_WAKEUP_FANOUT 2
typedef struct AioWorkerSubmissionQueue
typedef struct Pg AioWorkerSubmissionQueue
{
uint32 size ;
uint32 mask ;
uint32 head ;
uint32 tail ;
uint32 io s[ FLEXIBLE_ARRAY_MEMBER ] ;
} AioWorkerSubmissionQueue ;
uint32 sqe s[ FLEXIBLE_ARRAY_MEMBER ] ;
} Pg AioWorkerSubmissionQueue;
typedef struct AioWorkerSlot
typedef struct Pg AioWorkerSlot
{
Latch * latch ;
bool in_use ;
} AioWorkerSlot ;
} Pg AioWorkerSlot;
typedef struct AioWorkerControl
typedef struct Pg AioWorkerControl
{
uint64 idle_worker_mask ;
AioWorkerSlot workers [ FLEXIBLE_ARRAY_MEMBER ] ;
} AioWorkerControl ;
Pg AioWorkerSlot workers [ FLEXIBLE_ARRAY_MEMBER ] ;
} Pg AioWorkerControl;
static size_t pgaio_worker_shmem_size ( void ) ;
@ -96,8 +96,8 @@ int io_workers = 3;
static int io_worker_queue_size = 64 ;
static int MyIoWorkerId ;
static AioWorkerSubmissionQueue * io_worker_submission_queue ;
static AioWorkerControl * io_worker_control ;
static Pg AioWorkerSubmissionQueue * io_worker_submission_queue ;
static Pg AioWorkerControl * io_worker_control ;
static size_t
@ -106,15 +106,15 @@ pgaio_worker_queue_shmem_size(int *queue_size)
/* Round size up to next power of two so we can make a mask. */
* queue_size = pg_nextpower2_32 ( io_worker_queue_size ) ;
return offsetof ( AioWorkerSubmissionQueue , io s) +
return offsetof ( Pg AioWorkerSubmissionQueue, sqe s) +
sizeof ( uint32 ) * * queue_size ;
}
static size_t
pgaio_worker_control_shmem_size ( void )
{
return offsetof ( AioWorkerControl , workers ) +
sizeof ( AioWorkerSlot ) * MAX_IO_WORKERS ;
return offsetof ( Pg AioWorkerControl, workers ) +
sizeof ( Pg AioWorkerSlot) * MAX_IO_WORKERS ;
}
static size_t
@ -162,7 +162,7 @@ pgaio_worker_shmem_init(bool first_time)
}
static int
pgaio_choose_idle_worker ( void )
pgaio_worker_ choose_idle ( void )
{
int worker ;
@ -180,7 +180,7 @@ pgaio_choose_idle_worker(void)
static bool
pgaio_worker_submission_queue_insert ( PgAioHandle * ioh )
{
AioWorkerSubmissionQueue * queue ;
Pg AioWorkerSubmissionQueue * queue ;
uint32 new_head ;
queue = io_worker_submission_queue ;
@ -192,7 +192,7 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
return false ; /* full */
}
queue - > io s[ queue - > head ] = pgaio_io_get_id ( ioh ) ;
queue - > sqe s[ queue - > head ] = pgaio_io_get_id ( ioh ) ;
queue - > head = new_head ;
return true ;
@ -201,14 +201,14 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
static uint32
pgaio_worker_submission_queue_consume ( void )
{
AioWorkerSubmissionQueue * queue ;
Pg AioWorkerSubmissionQueue * queue ;
uint32 result ;
queue = io_worker_submission_queue ;
if ( queue - > tail = = queue - > head )
return UINT32_MAX ; /* empty */
result = queue - > io s[ queue - > tail ] ;
result = queue - > sqe s[ queue - > tail ] ;
queue - > tail = ( queue - > tail + 1 ) & ( queue - > size - 1 ) ;
return result ;
@ -241,37 +241,37 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
}
static void
pgaio_worker_submit_internal ( int nios , PgAioHandle * ios [ ] )
pgaio_worker_submit_internal ( int num_staged_ ios , PgAioHandle * * staged_ios )
{
PgAioHandle * synchronous_ios [ PGAIO_SUBMIT_BATCH_SIZE ] ;
int nsync = 0 ;
Latch * wakeup = NULL ;
int worker ;
Assert ( nios < = PGAIO_SUBMIT_BATCH_SIZE ) ;
Assert ( num_staged_ ios < = PGAIO_SUBMIT_BATCH_SIZE ) ;
LWLockAcquire ( AioWorkerSubmissionQueueLock , LW_EXCLUSIVE ) ;
for ( int i = 0 ; i < nios ; + + i )
for ( int i = 0 ; i < num_staged_ ios ; + + i )
{
Assert ( ! pgaio_worker_needs_synchronous_execution ( ios [ i ] ) ) ;
if ( ! pgaio_worker_submission_queue_insert ( ios [ i ] ) )
Assert ( ! pgaio_worker_needs_synchronous_execution ( staged_ ios[ i ] ) ) ;
if ( ! pgaio_worker_submission_queue_insert ( staged_ ios[ i ] ) )
{
/*
* We ' ll do it synchronously , but only after we ' ve sent as many as
* we can to workers , to maximize concurrency .
*/
synchronous_ios [ nsync + + ] = ios [ i ] ;
synchronous_ios [ nsync + + ] = staged_ ios[ i ] ;
continue ;
}
if ( wakeup = = NULL )
{
/* Choose an idle worker to wake up if we haven't already. */
worker = pgaio_choose_idle_worker ( ) ;
worker = pgaio_worker_ choose_idle ( ) ;
if ( worker > = 0 )
wakeup = io_worker_control - > workers [ worker ] . latch ;
pgaio_debug_io ( DEBUG4 , ios [ i ] ,
pgaio_debug_io ( DEBUG4 , staged_ ios[ i ] ,
" choosing worker %d " ,
worker ) ;
}
@ -490,7 +490,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
IO_WORKER_WAKEUP_FANOUT ) ;
for ( int i = 0 ; i < nwakeups ; + + i )
{
if ( ( worker = pgaio_choose_idle_worker ( ) ) < 0 )
if ( ( worker = pgaio_worker_ choose_idle ( ) ) < 0 )
break ;
latches [ nlatches + + ] = io_worker_control - > workers [ worker ] . latch ;
}