mirror of https://github.com/postgres/postgres
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
308 lines
9.4 KiB
308 lines
9.4 KiB
/*-------------------------------------------------------------------------
|
|
*
|
|
* aio_callback.c
|
|
* AIO - Functionality related to callbacks that can be registered on IO
|
|
* Handles
|
|
*
|
|
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
|
|
* Portions Copyright (c) 1994, Regents of the University of California
|
|
*
|
|
* IDENTIFICATION
|
|
* src/backend/storage/aio/aio_callback.c
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
|
|
#include "postgres.h"
|
|
|
|
#include "miscadmin.h"
|
|
#include "storage/aio.h"
|
|
#include "storage/aio_internal.h"
|
|
|
|
|
|
/* just to have something to put into aio_handle_cbs */
|
|
static const PgAioHandleCallbacks aio_invalid_cb = {0};
|
|
|
|
typedef struct PgAioHandleCallbacksEntry
|
|
{
|
|
const PgAioHandleCallbacks *const cb;
|
|
const char *const name;
|
|
} PgAioHandleCallbacksEntry;
|
|
|
|
/*
|
|
* Callback definition for the callbacks that can be registered on an IO
|
|
* handle. See PgAioHandleCallbackID's definition for an explanation for why
|
|
* callbacks are not identified by a pointer.
|
|
*/
|
|
static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
|
|
#define CALLBACK_ENTRY(id, callback) [id] = {.cb = &callback, .name = #callback}
|
|
CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
|
|
#undef CALLBACK_ENTRY
|
|
};
|
|
|
|
|
|
|
|
/* --------------------------------------------------------------------------------
|
|
* Public callback related functions operating on IO Handles
|
|
* --------------------------------------------------------------------------------
|
|
*/
|
|
|
|
/*
|
|
* Register callback for the IO handle.
|
|
*
|
|
* Only a limited number (PGAIO_HANDLE_MAX_CALLBACKS) of callbacks can be
|
|
* registered for each IO.
|
|
*
|
|
* Callbacks need to be registered before [indirectly] calling
|
|
* pgaio_io_prep_*(), as the IO may be executed immediately.
|
|
*
|
|
* A callback can be passed a small bit of data, e.g. to indicate whether to
|
|
* zero a buffer if it is invalid.
|
|
*
|
|
*
|
|
* Note that callbacks are executed in critical sections. This is necessary
|
|
* to be able to execute IO in critical sections (consider e.g. WAL
|
|
* logging). To perform AIO we first need to acquire a handle, which, if there
|
|
* are no free handles, requires waiting for IOs to complete and to execute
|
|
* their completion callbacks.
|
|
*
|
|
* Callbacks may be executed in the issuing backend but also in another
|
|
* backend (because that backend is waiting for the IO) or in IO workers (if
|
|
* io_method=worker is used).
|
|
*
|
|
*
|
|
* See PgAioHandleCallbackID's definition for an explanation for why
|
|
* callbacks are not identified by a pointer.
|
|
*/
|
|
void
|
|
pgaio_io_register_callbacks(PgAioHandle *ioh, PgAioHandleCallbackID cb_id,
|
|
uint8 cb_data)
|
|
{
|
|
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
|
|
|
|
if (cb_id >= lengthof(aio_handle_cbs))
|
|
elog(ERROR, "callback %d is out of range", cb_id);
|
|
if (aio_handle_cbs[cb_id].cb->complete_shared == NULL &&
|
|
aio_handle_cbs[cb_id].cb->complete_local == NULL)
|
|
elog(ERROR, "callback %d does not have a completion callback", cb_id);
|
|
if (ioh->num_callbacks >= PGAIO_HANDLE_MAX_CALLBACKS)
|
|
elog(PANIC, "too many callbacks, the max is %d",
|
|
PGAIO_HANDLE_MAX_CALLBACKS);
|
|
ioh->callbacks[ioh->num_callbacks] = cb_id;
|
|
ioh->callbacks_data[ioh->num_callbacks] = cb_data;
|
|
|
|
pgaio_debug_io(DEBUG3, ioh,
|
|
"adding cb #%d, id %d/%s",
|
|
ioh->num_callbacks + 1,
|
|
cb_id, ce->name);
|
|
|
|
ioh->num_callbacks++;
|
|
}
|
|
|
|
/*
|
|
* Associate an array of data with the Handle. This is e.g. useful to the
|
|
* transport knowledge about which buffers a multi-block IO affects to
|
|
* completion callbacks.
|
|
*
|
|
* Right now this can be done only once for each IO, even though multiple
|
|
* callbacks can be registered. There aren't any known usecases requiring more
|
|
* and the required amount of shared memory does add up, so it doesn't seem
|
|
* worth multiplying memory usage by PGAIO_HANDLE_MAX_CALLBACKS.
|
|
*/
|
|
void
|
|
pgaio_io_set_handle_data_64(PgAioHandle *ioh, uint64 *data, uint8 len)
|
|
{
|
|
Assert(ioh->state == PGAIO_HS_HANDED_OUT);
|
|
Assert(ioh->handle_data_len == 0);
|
|
Assert(len <= PG_IOV_MAX);
|
|
|
|
for (int i = 0; i < len; i++)
|
|
pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
|
|
ioh->handle_data_len = len;
|
|
}
|
|
|
|
/*
|
|
* Convenience version of pgaio_io_set_handle_data_64() that converts a 32bit
|
|
* array to a 64bit array. Without it callers would end up needing to
|
|
* open-code equivalent code.
|
|
*/
|
|
void
|
|
pgaio_io_set_handle_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
|
|
{
|
|
Assert(ioh->state == PGAIO_HS_HANDED_OUT);
|
|
Assert(ioh->handle_data_len == 0);
|
|
Assert(len <= PG_IOV_MAX);
|
|
|
|
for (int i = 0; i < len; i++)
|
|
pgaio_ctl->handle_data[ioh->iovec_off + i] = data[i];
|
|
ioh->handle_data_len = len;
|
|
}
|
|
|
|
/*
|
|
* Return data set with pgaio_io_set_handle_data_*().
|
|
*/
|
|
uint64 *
|
|
pgaio_io_get_handle_data(PgAioHandle *ioh, uint8 *len)
|
|
{
|
|
Assert(ioh->handle_data_len > 0);
|
|
|
|
*len = ioh->handle_data_len;
|
|
|
|
return &pgaio_ctl->handle_data[ioh->iovec_off];
|
|
}
|
|
|
|
|
|
|
|
/* --------------------------------------------------------------------------------
|
|
* Public IO Result related functions
|
|
* --------------------------------------------------------------------------------
|
|
*/
|
|
|
|
void
|
|
pgaio_result_report(PgAioResult result, const PgAioTargetData *target_data, int elevel)
|
|
{
|
|
PgAioHandleCallbackID cb_id = result.id;
|
|
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
|
|
|
|
Assert(result.status != PGAIO_RS_UNKNOWN);
|
|
Assert(result.status != PGAIO_RS_OK);
|
|
|
|
if (ce->cb->report == NULL)
|
|
elog(ERROR, "callback %d/%s does not have report callback",
|
|
result.id, ce->name);
|
|
|
|
ce->cb->report(result, target_data, elevel);
|
|
}
|
|
|
|
|
|
|
|
/* --------------------------------------------------------------------------------
|
|
* Internal callback related functions operating on IO Handles
|
|
* --------------------------------------------------------------------------------
|
|
*/
|
|
|
|
/*
|
|
* Internal function which invokes ->stage for all the registered callbacks.
|
|
*/
|
|
void
|
|
pgaio_io_call_stage(PgAioHandle *ioh)
|
|
{
|
|
Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
|
|
Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
|
|
|
|
for (int i = ioh->num_callbacks; i > 0; i--)
|
|
{
|
|
PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
|
|
uint8 cb_data = ioh->callbacks_data[i - 1];
|
|
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
|
|
|
|
if (!ce->cb->stage)
|
|
continue;
|
|
|
|
pgaio_debug_io(DEBUG3, ioh,
|
|
"calling cb #%d %d/%s->stage(%u)",
|
|
i, cb_id, ce->name, cb_data);
|
|
ce->cb->stage(ioh, cb_data);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Internal function which invokes ->complete_shared for all the registered
|
|
* callbacks.
|
|
*/
|
|
void
|
|
pgaio_io_call_complete_shared(PgAioHandle *ioh)
|
|
{
|
|
PgAioResult result;
|
|
|
|
START_CRIT_SECTION();
|
|
|
|
Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
|
|
Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
|
|
|
|
result.status = PGAIO_RS_OK; /* low level IO is always considered OK */
|
|
result.result = ioh->result;
|
|
result.id = PGAIO_HCB_INVALID;
|
|
result.error_data = 0;
|
|
|
|
/*
|
|
* Call callbacks with the last registered (innermost) callback first.
|
|
* Each callback can modify the result forwarded to the next callback.
|
|
*/
|
|
for (int i = ioh->num_callbacks; i > 0; i--)
|
|
{
|
|
PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
|
|
uint8 cb_data = ioh->callbacks_data[i - 1];
|
|
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
|
|
|
|
if (!ce->cb->complete_shared)
|
|
continue;
|
|
|
|
pgaio_debug_io(DEBUG4, ioh,
|
|
"calling cb #%d, id %d/%s->complete_shared(%u) with distilled result: (status %s, id %u, error_data %d, result %d)",
|
|
i, cb_id, ce->name,
|
|
cb_data,
|
|
pgaio_result_status_string(result.status),
|
|
result.id, result.error_data, result.result);
|
|
result = ce->cb->complete_shared(ioh, result, cb_data);
|
|
}
|
|
|
|
ioh->distilled_result = result;
|
|
|
|
pgaio_debug_io(DEBUG3, ioh,
|
|
"after shared completion: distilled result: (status %s, id %u, error_data: %d, result %d), raw_result: %d",
|
|
pgaio_result_status_string(result.status),
|
|
result.id, result.error_data, result.result,
|
|
ioh->result);
|
|
|
|
END_CRIT_SECTION();
|
|
}
|
|
|
|
/*
|
|
* Internal function which invokes ->complete_local for all the registered
|
|
* callbacks.
|
|
*
|
|
* XXX: It'd be nice to deduplicate with pgaio_io_call_complete_shared().
|
|
*/
|
|
void
|
|
pgaio_io_call_complete_local(PgAioHandle *ioh)
|
|
{
|
|
PgAioResult result;
|
|
|
|
START_CRIT_SECTION();
|
|
|
|
Assert(ioh->target > PGAIO_TID_INVALID && ioh->target < PGAIO_TID_COUNT);
|
|
Assert(ioh->op > PGAIO_OP_INVALID && ioh->op < PGAIO_OP_COUNT);
|
|
|
|
/* start with distilled result from shared callback */
|
|
result = ioh->distilled_result;
|
|
|
|
for (int i = ioh->num_callbacks; i > 0; i--)
|
|
{
|
|
PgAioHandleCallbackID cb_id = ioh->callbacks[i - 1];
|
|
uint8 cb_data = ioh->callbacks_data[i - 1];
|
|
const PgAioHandleCallbacksEntry *ce = &aio_handle_cbs[cb_id];
|
|
|
|
if (!ce->cb->complete_local)
|
|
continue;
|
|
|
|
pgaio_debug_io(DEBUG4, ioh,
|
|
"calling cb #%d, id %d/%s->complete_local(%u) with distilled result: status %s, id %u, error_data %d, result %d",
|
|
i, cb_id, ce->name, cb_data,
|
|
pgaio_result_status_string(result.status),
|
|
result.id, result.error_data, result.result);
|
|
result = ce->cb->complete_local(ioh, result, cb_data);
|
|
}
|
|
|
|
/*
|
|
* Note that we don't save the result in ioh->distilled_result, the local
|
|
* callback's result should not ever matter to other waiters.
|
|
*/
|
|
pgaio_debug_io(DEBUG3, ioh,
|
|
"after local completion: distilled result: (status %s, id %u, error_data %d, result %d), raw_result: %d",
|
|
pgaio_result_status_string(result.status),
|
|
result.id, result.error_data, result.result,
|
|
ioh->result);
|
|
|
|
END_CRIT_SECTION();
|
|
}
|
|
|