mirror of https://github.com/postgres/postgres
Performing AIO using io_uring can be considerably faster than
io_method=worker, particularly when lots of small IOs are issued, as

a) the context-switch overhead for worker based AIO becomes more significant
b) the number of IO workers can become limiting

io_uring, however, is linux specific and requires an additional compile-time
dependency (liburing).

This implementation is fairly simple and there are substantial optimization
opportunities.

The description of the existing AIO_IO_COMPLETION wait event is updated to
make the difference between it and the new AIO_IO_URING_EXECUTION clearer.

Reviewed-by: Noah Misch <noah@leadboat.com>
Reviewed-by: Jakub Wartak <jakub.wartak@enterprisedb.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
parent 8eadd5c73c
commit c325a7633f
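As a side note for readers unfamiliar with liburing (this is not part of the commit): the file added below is built around the usual liburing cycle of staging SQEs, calling io_uring_submit(), and reaping CQEs. The following is a minimal, self-contained sketch of that cycle; the file path, buffer size, and queue depth are arbitrary choices made purely for illustration.

```c
/* Illustrative sketch only, not PostgreSQL code. Build with: cc demo.c -luring */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <liburing.h>

int
main(void)
{
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    char        buf[8192];
    int         fd;
    int         ret;

    /* create a ring; queue depth 16 is an arbitrary choice for this sketch */
    if (io_uring_queue_init(16, &ring, 0) < 0)
    {
        perror("io_uring_queue_init");
        return 1;
    }

    fd = open("/etc/hostname", O_RDONLY);   /* any readable file will do */
    if (fd < 0)
    {
        perror("open");
        return 1;
    }

    /* stage one read SQE, analogous to pgaio_uring_sq_from_io() filling an SQE */
    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_read(sqe, fd, buf, sizeof(buf), 0);
    io_uring_sqe_set_data(sqe, buf);

    /* hand the staged SQE(s) to the kernel, as pgaio_uring_submit() does */
    ret = io_uring_submit(&ring);
    if (ret < 0)
    {
        fprintf(stderr, "submit failed: %s\n", strerror(-ret));
        return 1;
    }

    /* wait for and consume the completion, as the wait/drain paths below do */
    ret = io_uring_wait_cqe(&ring, &cqe);
    if (ret < 0)
    {
        fprintf(stderr, "wait failed: %s\n", strerror(-ret));
        return 1;
    }
    printf("read returned %d\n", cqe->res);
    io_uring_cqe_seen(&ring, cqe);

    io_uring_queue_exit(&ring);
    return 0;
}
```

Once PostgreSQL is built with liburing support, the new method is selected with io_method = io_uring; the per-backend rings created below are sized by io_max_concurrency.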
@@ -0,0 +1,484 @@
/*-------------------------------------------------------------------------
 *
 * method_io_uring.c
 *    AIO - perform AIO using Linux' io_uring
 *
 * For now we create one io_uring instance for each backend. These io_uring
 * instances have to be created in postmaster, during startup, to allow other
 * backends to process IO completions, if the issuing backend is currently
 * busy doing other things. Other backends may not use another backend's
 * io_uring instance to submit IO, that'd require additional locking that
 * would likely be harmful for performance.
 *
 * We likely will want to introduce a backend-local io_uring instance in the
 * future, e.g. for FE/BE network IO.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/aio/method_io_uring.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

/* included early, for IOMETHOD_IO_URING_ENABLED */
#include "storage/aio.h"

#ifdef IOMETHOD_IO_URING_ENABLED

#include <liburing.h>

#include "miscadmin.h"
#include "storage/aio_internal.h"
#include "storage/fd.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/lwlock.h"
#include "storage/procnumber.h"
#include "utils/wait_event.h"


/* number of completions processed at once */
#define PGAIO_MAX_LOCAL_COMPLETED_IO 32


/* Entry points for IoMethodOps. */
static size_t pgaio_uring_shmem_size(void);
static void pgaio_uring_shmem_init(bool first_time);
static void pgaio_uring_init_backend(void);
static int pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation);

/* helper functions */
static void pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe);


const IoMethodOps pgaio_uring_ops = {
    /*
     * While io_uring mostly is OK with FDs getting closed while the IO is in
     * flight, that is not true for IOs submitted with IOSQE_ASYNC.
     *
     * See
     * https://postgr.es/m/5ons2rtmwarqqhhexb3dnqulw5rjgwgoct57vpdau4rujlrffj%403fls6d2mkiwc
     */
    .wait_on_fd_before_close = true,

    .shmem_size = pgaio_uring_shmem_size,
    .shmem_init = pgaio_uring_shmem_init,
    .init_backend = pgaio_uring_init_backend,

    .submit = pgaio_uring_submit,
    .wait_one = pgaio_uring_wait_one,
};

/*
 * Per-backend state when using io_method=io_uring
 *
 * Align the whole struct to a cacheline boundary, to prevent false sharing
 * between completion_lock and prior backend's io_uring_ring.
 */
typedef struct pg_attribute_aligned (PG_CACHE_LINE_SIZE)
PgAioUringContext
{
    /*
     * Multiple backends can process completions for this backend's io_uring
     * instance (e.g. when the backend issuing IO is busy doing something
     * else). To make that safe we have to ensure that only a single backend
     * gets io completions from the io_uring instance at a time.
     */
    LWLock      completion_lock;

    struct io_uring io_uring_ring;
} PgAioUringContext;

/* PgAioUringContexts for all backends */
static PgAioUringContext *pgaio_uring_contexts;

/* the current backend's context */
static PgAioUringContext *pgaio_my_uring_context;


static uint32
pgaio_uring_procs(void)
{
    /*
     * We can subtract MAX_IO_WORKERS here as io workers are never used at
     * the same time as io_method=io_uring.
     */
    return MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
}

static Size
pgaio_uring_context_shmem_size(void)
{
    return mul_size(pgaio_uring_procs(), sizeof(PgAioUringContext));
}

static size_t
pgaio_uring_shmem_size(void)
{
    return pgaio_uring_context_shmem_size();
}

static void
pgaio_uring_shmem_init(bool first_time)
{
    int         TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
    bool        found;

    pgaio_uring_contexts = (PgAioUringContext *)
        ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found);

    if (found)
        return;

    for (int contextno = 0; contextno < TotalProcs; contextno++)
    {
        PgAioUringContext *context = &pgaio_uring_contexts[contextno];
        int         ret;

        /*
         * Right now a high TotalProcs will cause problems in two ways:
         *
         * - RLIMIT_NOFILE needs to be big enough to allow all
         *   io_uring_queue_init() calls to succeed.
         *
         * - RLIMIT_NOFILE needs to be big enough to still have enough file
         *   descriptors to satisfy set_max_safe_fds() left over. Or, even
         *   better, have max_files_per_process left over FDs.
         *
         * We probably should adjust the soft RLIMIT_NOFILE to ensure that.
         *
         *
         * XXX: Newer versions of io_uring support sharing the workers that
         * execute some asynchronous IOs between io_uring instances. It might
         * be worth using that - also need to evaluate if that causes
         * noticeable additional contention?
         */
        ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
        if (ret < 0)
        {
            char       *hint = NULL;
            int         err = ERRCODE_INTERNAL_ERROR;

            /* add hints for some failures that errno explains sufficiently */
            if (-ret == EPERM)
            {
                err = ERRCODE_INSUFFICIENT_PRIVILEGE;
                hint = _("Check if io_uring is disabled via /proc/sys/kernel/io_uring_disabled.");
            }
            else if (-ret == EMFILE)
            {
                err = ERRCODE_INSUFFICIENT_RESOURCES;
                hint = psprintf(_("Consider increasing \"ulimit -n\" to at least %d."),
                                TotalProcs + max_files_per_process);
            }
            else if (-ret == ENOSYS)
            {
                err = ERRCODE_FEATURE_NOT_SUPPORTED;
                hint = _("Kernel does not support io_uring.");
            }

            /* update errno to allow %m to work */
            errno = -ret;

            ereport(ERROR,
                    errcode(err),
                    errmsg("could not setup io_uring queue: %m"),
                    hint != NULL ? errhint("%s", hint) : 0);
        }

        LWLockInitialize(&context->completion_lock, LWTRANCHE_AIO_URING_COMPLETION);
    }
}

static void
pgaio_uring_init_backend(void)
{
    Assert(MyProcNumber < pgaio_uring_procs());

    pgaio_my_uring_context = &pgaio_uring_contexts[MyProcNumber];
}

static int
pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
    struct io_uring *uring_instance = &pgaio_my_uring_context->io_uring_ring;
    int         in_flight_before = dclist_count(&pgaio_my_backend->in_flight_ios);

    Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);

    for (int i = 0; i < num_staged_ios; i++)
    {
        PgAioHandle *ioh = staged_ios[i];
        struct io_uring_sqe *sqe;

        sqe = io_uring_get_sqe(uring_instance);

        if (!sqe)
            elog(ERROR, "io_uring submission queue is unexpectedly full");

        pgaio_io_prepare_submit(ioh);
        pgaio_uring_sq_from_io(ioh, sqe);

        /*
         * io_uring executes IO in process context if possible. That's
         * generally good, as it reduces context switching. When performing a
         * lot of buffered IO that means that copying between page cache and
         * userspace memory happens in the foreground, as it can't be
         * offloaded to DMA hardware as is possible when using direct IO. When
         * executing a lot of buffered IO this causes io_uring to be slower
         * than worker mode, as worker mode parallelizes the copying. io_uring
         * can be told to offload work to worker threads instead.
         *
         * If an IO is buffered IO and we already have IOs in flight or
         * multiple IOs are being submitted, we thus tell io_uring to execute
         * the IO in the background. We don't do so for the first few IOs
         * being submitted as executing in this process' context has lower
         * latency.
         */
        if (in_flight_before > 4 && (ioh->flags & PGAIO_HF_BUFFERED))
            io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);

        in_flight_before++;
    }

    while (true)
    {
        int         ret;

        pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_SUBMIT);
        ret = io_uring_submit(uring_instance);
        pgstat_report_wait_end();

        if (ret == -EINTR)
        {
            pgaio_debug(DEBUG3,
                        "aio method uring: submit EINTR, nios: %d",
                        num_staged_ios);
        }
        else if (ret < 0)
        {
            /*
             * The io_uring_enter() manpage suggests that the appropriate
             * reaction to EAGAIN is:
             *
             * "The application should wait for some completions and try
             * again"
             *
             * However, it seems unlikely that that would help in our case, as
             * we apply a low limit to the number of outstanding IOs and thus
             * also outstanding completions, making it unlikely that we'd get
             * EAGAIN while the OS is in good working order.
             *
             * Additionally, it would be problematic to just wait here, our
             * caller might hold critical locks. It'd possibly lead to
             * delaying the crash-restart that seems likely to occur when the
             * kernel is under such heavy memory pressure.
             *
             * Update errno to allow %m to work.
             */
            errno = -ret;
            elog(PANIC, "io_uring submit failed: %m");
        }
        else if (ret != num_staged_ios)
        {
            /* likely unreachable, but if it is, we would need to re-submit */
            elog(PANIC, "io_uring submit submitted only %d of %d",
                 ret, num_staged_ios);
        }
        else
        {
            pgaio_debug(DEBUG4,
                        "aio method uring: submitted %d IOs",
                        num_staged_ios);
            break;
        }
    }

    return num_staged_ios;
}

static void
pgaio_uring_drain_locked(PgAioUringContext *context)
{
    int         ready;
    int         orig_ready;

    Assert(LWLockHeldByMeInMode(&context->completion_lock, LW_EXCLUSIVE));

    /*
     * Don't drain more events than available right now. Otherwise it's
     * plausible that one backend could get stuck, for a while, receiving
     * CQEs without actually processing them.
     */
    orig_ready = ready = io_uring_cq_ready(&context->io_uring_ring);

    while (ready > 0)
    {
        struct io_uring_cqe *cqes[PGAIO_MAX_LOCAL_COMPLETED_IO];
        uint32      ncqes;

        START_CRIT_SECTION();
        ncqes =
            io_uring_peek_batch_cqe(&context->io_uring_ring,
                                    cqes,
                                    Min(PGAIO_MAX_LOCAL_COMPLETED_IO, ready));
        Assert(ncqes <= ready);

        ready -= ncqes;

        for (int i = 0; i < ncqes; i++)
        {
            struct io_uring_cqe *cqe = cqes[i];
            PgAioHandle *ioh;

            ioh = io_uring_cqe_get_data(cqe);
            io_uring_cqe_seen(&context->io_uring_ring, cqe);

            pgaio_io_process_completion(ioh, cqe->res);
        }

        END_CRIT_SECTION();

        pgaio_debug(DEBUG3,
                    "drained %d/%d, now expecting %d",
                    ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
    }
}

static void
pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation)
{
    PgAioHandleState state;
    ProcNumber  owner_procno = ioh->owner_procno;
    PgAioUringContext *owner_context = &pgaio_uring_contexts[owner_procno];
    bool        expect_cqe;
    int         waited = 0;

    /*
     * XXX: It would be nice to have a smarter locking scheme, nearly all the
     * time the backend owning the ring will consume the completions, making
     * the locking unnecessarily expensive.
     */
    LWLockAcquire(&owner_context->completion_lock, LW_EXCLUSIVE);

    while (true)
    {
        pgaio_debug_io(DEBUG3, ioh,
                       "wait_one io_gen: %llu, ref_gen: %llu, cycle %d",
                       (long long unsigned) ioh->generation,
                       (long long unsigned) ref_generation,
                       waited);

        if (pgaio_io_was_recycled(ioh, ref_generation, &state) ||
            state != PGAIO_HS_SUBMITTED)
        {
            /* the IO was completed by another backend */
            break;
        }
        else if (io_uring_cq_ready(&owner_context->io_uring_ring))
        {
            /* no need to wait in the kernel, io_uring has a completion */
            expect_cqe = true;
        }
        else
        {
            int         ret;
            struct io_uring_cqe *cqes;

            /* need to wait in the kernel */
            pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_EXECUTION);
            ret = io_uring_wait_cqes(&owner_context->io_uring_ring, &cqes, 1, NULL, NULL);
            pgstat_report_wait_end();

            if (ret == -EINTR)
            {
                continue;
            }
            else if (ret != 0)
            {
                /* see comment after io_uring_submit() */
                errno = -ret;
                elog(PANIC, "io_uring wait failed: %m");
            }
            else
            {
                Assert(cqes != NULL);
                expect_cqe = true;
                waited++;
            }
        }

        if (expect_cqe)
        {
            pgaio_uring_drain_locked(owner_context);
        }
    }

    LWLockRelease(&owner_context->completion_lock);

    pgaio_debug(DEBUG3,
                "wait_one with %d sleeps",
                waited);
}

static void
pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
{
    struct iovec *iov;

    switch (ioh->op)
    {
        case PGAIO_OP_READV:
            iov = &pgaio_ctl->iovecs[ioh->iovec_off];
            if (ioh->op_data.read.iov_length == 1)
            {
                io_uring_prep_read(sqe,
                                   ioh->op_data.read.fd,
                                   iov->iov_base,
                                   iov->iov_len,
                                   ioh->op_data.read.offset);
            }
            else
            {
                io_uring_prep_readv(sqe,
                                    ioh->op_data.read.fd,
                                    iov,
                                    ioh->op_data.read.iov_length,
                                    ioh->op_data.read.offset);

            }
            break;

        case PGAIO_OP_WRITEV:
            iov = &pgaio_ctl->iovecs[ioh->iovec_off];
            if (ioh->op_data.write.iov_length == 1)
            {
                io_uring_prep_write(sqe,
                                    ioh->op_data.write.fd,
                                    iov->iov_base,
                                    iov->iov_len,
                                    ioh->op_data.write.offset);
            }
            else
            {
                io_uring_prep_writev(sqe,
                                     ioh->op_data.write.fd,
                                     iov,
                                     ioh->op_data.write.iov_length,
                                     ioh->op_data.write.offset);
            }
            break;

        case PGAIO_OP_INVALID:
            elog(ERROR, "trying to prepare invalid IO operation for execution");
    }

    io_uring_sqe_set_data(sqe, ioh);
}

#endif                          /* IOMETHOD_IO_URING_ENABLED */