A number of pg_upgrade steps require connecting to every database in the cluster and running the same query in each one. When there are many databases, these steps are particularly time-consuming, especially since they are performed sequentially, i.e., we connect to a database, run the query, and process the results before moving on to the next database.

This commit introduces a new framework that makes it easy to parallelize most of these once-in-each-database tasks by processing multiple databases concurrently. This framework manages a set of slots that follow a simple state machine, and it uses libpq's asynchronous APIs to establish the connections and run the queries. The --jobs option is used to determine the number of slots to use. To use this new task framework, callers simply need to provide the query and a callback function to process its results, and the framework takes care of the rest. A more complete description is provided at the top of the new task.c file.

None of the eligible once-in-each-database tasks are converted to use this new framework in this commit. That will be done via several follow-up commits.

Reviewed-by: Jeff Davis, Robert Haas, Daniel Gustafsson, Ilya Gladyshev, Corey Huinker
Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13
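For illustration, a minimal hypothetical caller of the new API might look like the sketch below; it mirrors the example in the task.c header comment further down. The names count_rels_task and count_rels_cb and the query are placeholders, not code from this commit, and the sketch assumes the pg_upgrade frontend environment (postgres_fe.h, pg_upgrade.h, libpq).

/*
 * Hypothetical example (not part of this commit): count relations across the
 * cluster by running one query in every database.
 */
static void
count_rels_cb(DbInfo *dbinfo, PGresult *res, void *arg)
{
	int		   *total = (int *) arg;	/* task-specific state passed via "arg" */

	/* the query returns a single row; column 0 holds the count */
	*total += atoi(PQgetvalue(res, 0, 0));
}

static void
count_rels_task(ClusterInfo *cluster)
{
	int			total = 0;
	UpgradeTask *task = upgrade_task_create();

	upgrade_task_add_step(task,
						  "SELECT count(*) FROM pg_catalog.pg_class",
						  count_rels_cb,
						  true,		/* let the task free the PGresult */
						  &total);	/* "arg" pointer for the callback */
	upgrade_task_run(task, cluster);
	upgrade_task_free(task);
}

Although the framework opens up to --jobs connections at once, the callbacks are invoked one at a time from pg_upgrade's main loop, so the shared counter above needs no locking.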
parent d891c49286
commit 40e2e5e92b

src/bin/pg_upgrade/task.c
@@ -0,0 +1,443 @@
/*
 * task.c
 *		framework for parallelizing pg_upgrade's once-in-each-database tasks
 *
 * This framework provides an efficient way of running the various
 * once-in-each-database tasks required by pg_upgrade.  Specifically, it
 * parallelizes these tasks by managing a set of slots that follow a simple
 * state machine and by using libpq's asynchronous APIs to establish the
 * connections and run the queries.  Callers simply need to create a callback
 * function and build/execute an UpgradeTask.  A simple example follows:
 *
 *		static void
 *		my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
 *		{
 *			for (int i = 0; i < PQntuples(res); i++)
 *			{
 *				... process results ...
 *			}
 *		}
 *
 *		void
 *		my_task(ClusterInfo *cluster)
 *		{
 *			UpgradeTask *task = upgrade_task_create();
 *
 *			upgrade_task_add_step(task,
 *								  "... query text ...",
 *								  my_process_cb,
 *								  true,		// let the task free the PGresult
 *								  NULL);	// "arg" pointer for callback
 *			upgrade_task_run(task, cluster);
 *			upgrade_task_free(task);
 *		}
 *
 * Note that multiple steps can be added to a given task.  When there are
 * multiple steps, the task will run all of the steps consecutively in the same
 * database connection before freeing the connection and moving on.  In other
 * words, it only ever initiates one connection to each database in the
 * cluster for a given run.
 *
 * Copyright (c) 2024, PostgreSQL Global Development Group
 * src/bin/pg_upgrade/task.c
 */

#include "postgres_fe.h"

#include "common/connect.h"
#include "fe_utils/string_utils.h"
#include "pg_upgrade.h"

/*
 * dbs_complete stores the number of databases that we have completed
 * processing.  When this value equals the number of databases in the cluster,
 * the task is finished.
 */
static int	dbs_complete;

/*
 * dbs_processing stores the index of the next database in the cluster's array
 * of databases that will be picked up for processing.  It will always be
 * greater than or equal to dbs_complete.
 */
static int	dbs_processing;

/*
 * This struct stores the information for a single step of a task.  Note that
 * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
 * All steps in a task are run in a single connection before moving on to the
 * next database (which requires a new connection).
 */
typedef struct UpgradeTaskStep
{
	UpgradeTaskProcessCB process_cb;	/* processes the results of the query */
	bool		free_result;	/* should we free the result? */
	void	   *arg;			/* pointer passed to process_cb */
} UpgradeTaskStep;

/*
 * This struct is a thin wrapper around an array of steps, i.e.,
 * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
 */
typedef struct UpgradeTask
{
	UpgradeTaskStep *steps;
	int			num_steps;
	PQExpBuffer queries;
} UpgradeTask;

/*
 * The different states for a parallel slot.
 */
typedef enum
{
	FREE,						/* slot available for use in a new database */
	CONNECTING,					/* waiting for connection to be established */
	RUNNING_QUERIES,			/* running/processing queries in the task */
} UpgradeTaskSlotState;

/*
 * We maintain an array of user_opts.jobs slots to execute the task.
 */
typedef struct
{
	UpgradeTaskSlotState state; /* state of the slot */
	int			db_idx;			/* index of the database assigned to slot */
	int			step_idx;		/* index of the current step of task */
	PGconn	   *conn;			/* current connection managed by slot */
	bool		ready;			/* slot is ready for processing */
	bool		select_mode;	/* select() mode: true->read, false->write */
	int			sock;			/* file descriptor for connection's socket */
} UpgradeTaskSlot;

/*
 * Initializes an UpgradeTask.
 */
UpgradeTask *
upgrade_task_create(void)
{
	UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask));

	task->queries = createPQExpBuffer();

	/* All tasks must first set a secure search_path. */
	upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);

	return task;
}

/*
 * Frees all storage associated with an UpgradeTask.
 */
void
upgrade_task_free(UpgradeTask *task)
{
	destroyPQExpBuffer(task->queries);
	pg_free(task->steps);
	pg_free(task);
}

/*
 * Adds a step to an UpgradeTask.  The steps will be executed in each database
 * in the order in which they are added.
 *
 *	task: task object that must have been initialized via upgrade_task_create()
 *	query: the query text
 *	process_cb: function that processes the results of the query
 *	free_result: should we free the PGresult, or leave it to the caller?
 *	arg: pointer to task-specific data that is passed to each callback
 */
void
upgrade_task_add_step(UpgradeTask *task, const char *query,
					  UpgradeTaskProcessCB process_cb, bool free_result,
					  void *arg)
{
	UpgradeTaskStep *new_step;

	task->steps = pg_realloc(task->steps,
							 ++task->num_steps * sizeof(UpgradeTaskStep));

	new_step = &task->steps[task->num_steps - 1];
	new_step->process_cb = process_cb;
	new_step->free_result = free_result;
	new_step->arg = arg;

	appendPQExpBuffer(task->queries, "%s;", query);
}

/*
 * Build a connection string for the slot's current database and asynchronously
 * start a new connection, but do not wait for the connection to be
 * established.
 */
static void
start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
{
	PQExpBufferData conn_opts;
	DbInfo	   *dbinfo = &cluster->dbarr.dbs[slot->db_idx];

	/* Build connection string with proper quoting */
	initPQExpBuffer(&conn_opts);
	appendPQExpBufferStr(&conn_opts, "dbname=");
	appendConnStrVal(&conn_opts, dbinfo->db_name);
	appendPQExpBufferStr(&conn_opts, " user=");
	appendConnStrVal(&conn_opts, os_info.user);
	appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
	if (cluster->sockdir)
	{
		appendPQExpBufferStr(&conn_opts, " host=");
		appendConnStrVal(&conn_opts, cluster->sockdir);
	}

	slot->conn = PQconnectStart(conn_opts.data);

	if (!slot->conn)
		pg_fatal("failed to create connection with connection string: \"%s\"",
				 conn_opts.data);

	termPQExpBuffer(&conn_opts);
}

/*
 * Run the process_cb callback function to process the result of a query, and
 * free the result if the caller indicated we should do so.
 */
static void
process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
					 const UpgradeTask *task)
{
	UpgradeTaskStep *steps = &task->steps[slot->step_idx];
	UpgradeTaskProcessCB process_cb = steps->process_cb;
	DbInfo	   *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
	PGresult   *res = PQgetResult(slot->conn);

	if (PQstatus(slot->conn) == CONNECTION_BAD ||
		(PQresultStatus(res) != PGRES_TUPLES_OK &&
		 PQresultStatus(res) != PGRES_COMMAND_OK))
		pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));

	/*
	 * We assume that a NULL process_cb callback function means there's
	 * nothing to process.  This is primarily intended for the initial step in
	 * every task that sets a safe search_path.
	 */
	if (process_cb)
		(*process_cb) (dbinfo, res, steps->arg);

	if (steps->free_result)
		PQclear(res);
}

/*
 * Advances the state machine for a given slot as necessary.
 */
static void
process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
{
	PostgresPollingStatusType status;

	if (!slot->ready)
		return;

	switch (slot->state)
	{
		case FREE:

			/*
			 * If all of the databases in the cluster have been processed or
			 * are currently being processed by other slots, we are done.
			 */
			if (dbs_processing >= cluster->dbarr.ndbs)
				return;

			/*
			 * Claim the next database in the cluster's array and initiate a
			 * new connection.
			 */
			slot->db_idx = dbs_processing++;
			slot->state = CONNECTING;
			start_conn(cluster, slot);

			return;

		case CONNECTING:

			/* Check for connection failure. */
			status = PQconnectPoll(slot->conn);
			if (status == PGRES_POLLING_FAILED)
				pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));

			/* Check whether the connection is still establishing. */
			if (status != PGRES_POLLING_OK)
			{
				slot->select_mode = (status == PGRES_POLLING_READING);
				return;
			}

			/*
			 * Move on to running/processing the queries in the task.
			 */
			slot->state = RUNNING_QUERIES;
			slot->select_mode = true;	/* wait until ready for reading */
			if (!PQsendQuery(slot->conn, task->queries->data))
				pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));

			return;

		case RUNNING_QUERIES:

			/*
			 * Consume any available data and clear the read-ready indicator
			 * for the connection.
			 */
			if (!PQconsumeInput(slot->conn))
				pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));

			/*
			 * Process any results that are ready so that we can free up this
			 * slot for another database as soon as possible.
			 */
			for (; slot->step_idx < task->num_steps; slot->step_idx++)
			{
				/* If no more results are available yet, move on. */
				if (PQisBusy(slot->conn))
					return;

				process_query_result(cluster, slot, task);
			}

			/*
			 * If we just finished processing the result of the last step in
			 * the task, free the slot.  We recursively call this function on
			 * the newly-freed slot so that we can start initiating the next
			 * connection immediately instead of waiting for the next loop
			 * through the slots.
			 */
			dbs_complete++;
			PQfinish(slot->conn);
			memset(slot, 0, sizeof(UpgradeTaskSlot));
			slot->ready = true;

			process_slot(cluster, slot, task);

			return;
	}
}

/*
 * Returns -1 on error, else the number of ready descriptors.
 */
static int
select_loop(int maxFd, fd_set *input, fd_set *output)
{
	fd_set		save_input = *input;
	fd_set		save_output = *output;

	if (maxFd == 0)
		return 0;

	for (;;)
	{
		int			i;

		*input = save_input;
		*output = save_output;

		i = select(maxFd + 1, input, output, NULL, NULL);

#ifndef WIN32
		if (i < 0 && errno == EINTR)
			continue;
#else
		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
			continue;
#endif
		return i;
	}
}

/*
 * Wait on the slots to either finish connecting or to receive query results if
 * possible.  This avoids a tight loop in upgrade_task_run().
 */
static void
wait_on_slots(UpgradeTaskSlot *slots, int numslots)
{
	fd_set		input;
	fd_set		output;
	int			maxFd = 0;

	FD_ZERO(&input);
	FD_ZERO(&output);

	for (int i = 0; i < numslots; i++)
	{
		/*
		 * We assume the previous call to process_slot() handled everything
		 * that was marked ready in the previous call to wait_on_slots(), if
		 * any.
		 */
		slots[i].ready = false;

		/*
		 * This function should only ever see free slots as we are finishing
		 * processing the last few databases, at which point we don't have any
		 * databases left for them to process.  We'll never use these slots
		 * again, so we can safely ignore them.
		 */
		if (slots[i].state == FREE)
			continue;

		/*
		 * Add the socket to the set.
		 */
		slots[i].sock = PQsocket(slots[i].conn);
		if (slots[i].sock < 0)
			pg_fatal("invalid socket");
		FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
		maxFd = Max(maxFd, slots[i].sock);
	}

	/*
	 * If we found socket(s) to wait on, wait.
	 */
	if (select_loop(maxFd, &input, &output) == -1)
		pg_fatal("select() failed: %m");

	/*
	 * Mark which sockets appear to be ready.
	 */
	for (int i = 0; i < numslots; i++)
		slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
						   FD_ISSET(slots[i].sock, &output));
}

/*
 * Runs all the steps of the task in every database in the cluster using
 * user_opts.jobs parallel slots.
 */
void
upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
{
	int			jobs = Max(1, user_opts.jobs);
	UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs);

	dbs_complete = 0;
	dbs_processing = 0;

	/*
	 * Process every slot the first time round.
	 */
	for (int i = 0; i < jobs; i++)
		slots[i].ready = true;

	while (dbs_complete < cluster->dbarr.ndbs)
	{
		for (int i = 0; i < jobs; i++)
			process_slot(cluster, &slots[i], task);

		wait_on_slots(slots, jobs);
	}

	pg_free(slots);
}
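To illustrate the multi-step behavior documented in the header comment, a hypothetical two-step caller might be sketched as follows; my_two_step_task, first_step_cb, second_step_cb, and the query placeholders are not part of this commit. Because upgrade_task_add_step() appends each query to the task's single query buffer and process_slot() sends that buffer with one PQsendQuery(), each database sees exactly one connection and the steps run back-to-back on it.

/*
 * Hypothetical two-step task (not part of this commit): both steps are sent
 * together and run over a single connection per database.
 */
static void
my_two_step_task(ClusterInfo *cluster)
{
	UpgradeTask *task = upgrade_task_create();

	upgrade_task_add_step(task, "... first query ...",
						  first_step_cb, true, NULL);
	upgrade_task_add_step(task, "... second query ...",
						  second_step_cb, true, NULL);

	/* runs both steps in every database using up to --jobs connections */
	upgrade_task_run(task, cluster);
	upgrade_task_free(task);
}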