diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 125f3c7bbbe..c221a25f699 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -35,6 +35,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -68,6 +69,8 @@
 #include "pgbench.h"
 #include "port/pg_bitutils.h"
 #include "portability/instr_time.h"
+#include
+#include
 
 /* X/Open (XSI) requires <math.h> to provide M_PI, but core POSIX does not */
 #ifndef M_PI
@@ -817,7 +820,6 @@ static const BuiltinScript builtin_script[] =
 	}
 };
 
-
 /* Function prototypes */
 static void setNullValue(PgBenchValue *pv);
 static void setBoolValue(PgBenchValue *pv, bool bval);
@@ -848,6 +850,18 @@ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
 static const PsqlScanCallbacks pgbench_callbacks = {
 	NULL,						/* don't need get_variable functionality */
 };
+/* Structure to hold per-worker state for parallel data loading */
+typedef struct WorkerData
+{
+	PGconn	   *con;
+	const char *table;
+	int64		start_row;
+	int64		end_row;
+	initRowMethod init_row;
+	int			worker_id;
+	volatile bool *has_error;
+	char		error_msg[256];
+} WorkerData;
 
 static char
 get_table_relkind(PGconn *con, const char *table)
@@ -1618,6 +1632,234 @@ doConnect(void)
 	return conn;
 }
 
+/*
+ * Truncate the specified table(s).
+ * tableName can be a single table or a comma-separated list of tables.
+ */
+static void
+truncateTable(PGconn *con, const char *tableName)
+{
+	char	   *query;
+	size_t		queryLen;
+
+	if (tableName == NULL || strlen(tableName) == 0)
+	{
+		pg_log_error("table name cannot be null or empty");
+		return;
+	}
+
+	queryLen = strlen("TRUNCATE TABLE ") + strlen(tableName) + 1;
+	query = pg_malloc(queryLen);
+
+	snprintf(query, queryLen, "TRUNCATE TABLE %s", tableName);
+	executeStatement(con, query);
+	free(query);
+}
+
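+/*
+ * Worker thread body for parallel client-side data generation: each worker
+ * streams its assigned row range [start_row, end_row) into "table" with a
+ * single COPY.  Worker 0 reuses the caller's connection and its already-open
+ * transaction so it can use COPY (FREEZE ON); the other workers open their
+ * own connections and run a plain COPY in a transaction of their own.
+ */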
+static void *
+initWorkerThread(void *arg)
+{
+	WorkerData *data = (WorkerData *) arg;
+	PGresult   *res = NULL;
+	PQExpBufferData sql;
+	char		copy_statement[256];
+	int64		row;
+	bool		copy_started = false;
+	PGconn	   *conn;
+
+	fprintf(stderr, "[Worker %d] started\n", data->worker_id);
+
+	if (data->worker_id == 0)
+	{
+		/*
+		 * Do not issue "begin" here: the caller already started the
+		 * transaction, so that worker 0 can use COPY (FREEZE ON), which
+		 * must run in the same transaction as the table creation or
+		 * truncation.
+		 */
+		conn = data->con;
+	}
+	else
+	{
+		conn = doConnect();
+		if (conn == NULL)
+		{
+			snprintf(data->error_msg, sizeof(data->error_msg),
+					 "Worker %d: connection failed", data->worker_id);
+			*data->has_error = true;
+			return NULL;
+		}
+		executeStatement(conn, "begin");
+	}
+
+	if (PQstatus(conn) != CONNECTION_OK)
+	{
+		snprintf(data->error_msg, sizeof(data->error_msg),
+				 "Worker %d: connection failed: %s",
+				 data->worker_id, PQerrorMessage(conn));
+		*data->has_error = true;
+		return NULL;
+	}
+
+	initPQExpBuffer(&sql);
+
+	if (data->worker_id == 0)
+	{
+		snprintf(copy_statement, sizeof(copy_statement),
+				 "COPY %s FROM STDIN (FREEZE ON)", data->table);
+	}
+	else
+	{
+		snprintf(copy_statement, sizeof(copy_statement),
+				 "COPY %s FROM STDIN", data->table);
+	}
+
+	res = PQexec(conn, copy_statement);
+
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+	{
+		snprintf(data->error_msg, sizeof(data->error_msg),
+				 "Worker %d: COPY failed: %s",
+				 data->worker_id, PQerrorMessage(conn));
+		fprintf(stderr, "[Worker %d] COPY failed: %s\n",
+				data->worker_id, PQerrorMessage(conn));
+
+		*data->has_error = true;
+		PQclear(res);
+		termPQExpBuffer(&sql);
+		return NULL;
+	}
+
+	PQclear(res);
+	copy_started = true;
+
+	for (row = data->start_row; row < data->end_row; row++)
+	{
+		if (*data->has_error)
+		{
+			fprintf(stderr, "[Worker %d] Aborting row loop due to global error flag\n",
+					data->worker_id);
+			break;
+		}
+
+		data->init_row(&sql, row);
+
+		if (PQputline(conn, sql.data) != 0)
+		{
+			snprintf(data->error_msg, sizeof(data->error_msg),
+					 "Worker %d: PQputline failed: %s",
+					 data->worker_id, PQerrorMessage(conn));
+			fprintf(stderr, "[Worker %d] PQputline failed: %s\n",
+					data->worker_id, PQerrorMessage(conn));
+
+			*data->has_error = true;
+			break;
+		}
+
+		if (PQflush(conn) != 0)
+		{
+			snprintf(data->error_msg, sizeof(data->error_msg),
+					 "Worker %d: PQflush failed: %s",
+					 data->worker_id, PQerrorMessage(conn));
+			fprintf(stderr, "[Worker %d] PQflush failed: %s\n",
+					data->worker_id, PQerrorMessage(conn));
+
+			*data->has_error = true;
+			break;
+		}
+	}
+
+	if (copy_started)
+	{
+		if (PQputline(conn, "\\.\n") != 0)
+		{
+			fprintf(stderr, "[Worker %d] Failed to send end marker\n", data->worker_id);
+		}
+
+		if (PQflush(conn) != 0)
+		{
+			fprintf(stderr, "[Worker %d] Failed to flush end marker\n", data->worker_id);
+		}
+
+		if (PQendcopy(conn) != 0)
+		{
+			snprintf(data->error_msg, sizeof(data->error_msg),
+					 "Worker %d: PQendcopy failed: %s",
+					 data->worker_id, PQerrorMessage(conn));
+			fprintf(stderr, "[Worker %d] PQendcopy failed: %s\n",
+					data->worker_id, PQerrorMessage(conn));
+
+			*data->has_error = true;
+		}
+		else
+		{
+			fprintf(stderr, "[Worker %d] COPY completed successfully\n", data->worker_id);
+			executeStatement(conn, "commit");
+		}
+	}
+
+	termPQExpBuffer(&sql);
+	fprintf(stderr, "[Worker %d] Thread exiting\n", data->worker_id);
+	return NULL;
+}
+
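+/*
+ * Populate "table" with total_rows rows using num_workers threads.  The rows
+ * are split into contiguous ranges, one per worker; the caller's connection
+ * is handed to worker 0, while the remaining workers connect on their own.
+ * Worker errors are collected and reported after all threads have been
+ * joined.
+ */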
+static void
+initPopulateTableParallel(PGconn *connection, int num_workers,
+						  const char *table, int64 total_rows,
+						  initRowMethod init_row)
+{
+	THREAD_T   *worker_threads;
+	WorkerData *worker_data;
+	volatile bool has_error = false;
+	int64		rows_per_worker;
+	int			i;
+
+	/* Allocate worker data and thread handles */
+	worker_threads = pg_malloc(num_workers * sizeof(THREAD_T));
+	worker_data = pg_malloc(num_workers * sizeof(WorkerData));
+
+	/* Calculate work distribution */
+	rows_per_worker = total_rows / num_workers;
+
+	/* Create and start worker threads */
+	for (i = 0; i < num_workers; i++)
+	{
+		worker_data[i].con = connection;
+		worker_data[i].table = table;
+		worker_data[i].init_row = init_row;
+		worker_data[i].worker_id = i;
+		worker_data[i].has_error = &has_error;
+		worker_data[i].error_msg[0] = '\0';
+
+		/* Distribute rows among workers */
+		worker_data[i].start_row = i * rows_per_worker;
+		worker_data[i].end_row = (i + 1) * rows_per_worker;
+
+		/* Last worker gets any remaining rows */
+		if (i == num_workers - 1)
+			worker_data[i].end_row = total_rows;
+
+		if (THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]) != 0)
+			pg_fatal("failed to create worker thread %d", i);
+	}
+
+	/* Wait for all worker threads to complete */
+	for (i = 0; i < num_workers; i++)
+	{
+		THREAD_JOIN(worker_threads[i]);
+
+		/* Check for errors */
+		if (worker_data[i].error_msg[0] != '\0')
+		{
+			fprintf(stderr, "Error: %s\n", worker_data[i].error_msg);
+			has_error = true;
+		}
+	}
+
+	/* Clean up */
+	free(worker_threads);
+	free(worker_data);
+
+	if (has_error)
+		pg_fatal("one or more worker threads encountered errors");
+}
+
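+/*
+ * Fill the given table: start a transaction on the primary connection,
+ * truncate away any old data, then load base * scale rows in parallel.
+ * The transaction is committed by worker 0 once its COPY FREEZE completes.
+ */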
+static void
+initPopulateTable(PGconn *con, const char *table, int64 base,
+				  initRowMethod init_row)
+{
+	executeStatement(con, "begin");
+	fprintf(stderr, "truncating table %s...\n", table);
+	truncateTable(con, table);
+
+	initPopulateTableParallel(con, nthreads, table, base * scale, init_row);
+}
+
 /* qsort comparator for Variable array */
 static int
 compareVariableNames(const void *v1, const void *v2)
@@ -3496,7 +3738,6 @@ static int
 discardUntilSync(CState *st)
 {
 	bool		received_sync = false;
-
 	/* send a sync */
 	if (!PQpipelineSync(st->con))
 	{
@@ -4934,16 +5175,12 @@ initCreateTables(PGconn *con)
 }
 
 /*
- * Truncate away any old data, in one command in case there are foreign keys
+ * Truncate away any old data from the pgbench tables, in one command in case there are foreign keys
  */
 static void
 initTruncateTables(PGconn *con)
 {
-	executeStatement(con, "truncate table "
-					 "pgbench_accounts, "
-					 "pgbench_branches, "
-					 "pgbench_history, "
-					 "pgbench_tellers");
+	truncateTable(con, "pgbench_accounts, pgbench_branches, pgbench_history, pgbench_tellers");
 }
 
 static void
@@ -4973,124 +5210,124 @@ initAccount(PQExpBufferData *sql, int64 curr)
 					  curr + 1, curr / naccounts + 1);
 }
 
-static void
-initPopulateTable(PGconn *con, const char *table, int64 base,
-				  initRowMethod init_row)
-{
-	int			n;
-	int64		k;
-	int			chars = 0;
-	int			prev_chars = 0;
-	PGresult   *res;
-	PQExpBufferData sql;
-	char		copy_statement[256];
-	const char *copy_statement_fmt = "copy %s from stdin";
-	int64		total = base * scale;
-
-	/* used to track elapsed time and estimate of the remaining time */
-	pg_time_usec_t start;
-	int			log_interval = 1;
-
-	/* Stay on the same line if reporting to a terminal */
-	char		eol = isatty(fileno(stderr)) ? '\r' : '\n';
-
-	initPQExpBuffer(&sql);
-
-	/* Use COPY with FREEZE on v14 and later for all ordinary tables */
-	if ((PQserverVersion(con) >= 140000) &&
-		get_table_relkind(con, table) == RELKIND_RELATION)
-		copy_statement_fmt = "copy %s from stdin with (freeze on)";
-
-
-	n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table);
-	if (n >= sizeof(copy_statement))
-		pg_fatal("invalid buffer size: must be at least %d characters long", n);
-	else if (n == -1)
-		pg_fatal("invalid format string");
-
-	res = PQexec(con, copy_statement);
-
-	if (PQresultStatus(res) != PGRES_COPY_IN)
-		pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
-	PQclear(res);
-
-	start = pg_time_now();
-
-	for (k = 0; k < total; k++)
-	{
-		int64		j = k + 1;
-
-		init_row(&sql, k);
-		if (PQputline(con, sql.data))
-			pg_fatal("PQputline failed");
-
-		if (CancelRequested)
-			break;
-
-		/*
-		 * If we want to stick with the original logging, print a message each
-		 * 100k inserted rows.
-		 */
-		if ((!use_quiet) && (j % 100000 == 0))
-		{
-			double		elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
-			double		remaining_sec = ((double) total - j) * elapsed_sec / j;
-
-			chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
-							j, total,
-							(int) ((j * 100) / total),
-							table, elapsed_sec, remaining_sec);
-
-			/*
-			 * If the previous progress message is longer than the current
-			 * one, add spaces to the current line to fully overwrite any
-			 * remaining characters from the previous message.
-			 */
-			if (prev_chars > chars)
-				fprintf(stderr, "%*c", prev_chars - chars, ' ');
-			fputc(eol, stderr);
-			prev_chars = chars;
-		}
-		/* let's not call the timing for each row, but only each 100 rows */
-		else if (use_quiet && (j % 100 == 0))
-		{
-			double		elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
-			double		remaining_sec = ((double) total - j) * elapsed_sec / j;
-
-			/* have we reached the next interval (or end)? */
-			if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
-			{
-				chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
-								j, total,
-								(int) ((j * 100) / total),
-								table, elapsed_sec, remaining_sec);
-
-				/*
-				 * If the previous progress message is longer than the current
-				 * one, add spaces to the current line to fully overwrite any
-				 * remaining characters from the previous message.
-				 */
-				if (prev_chars > chars)
-					fprintf(stderr, "%*c", prev_chars - chars, ' ');
-				fputc(eol, stderr);
-				prev_chars = chars;
-
-				/* skip to the next interval */
-				log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
-			}
-		}
-	}
-
-	if (chars != 0 && eol != '\n')
-		fprintf(stderr, "%*c\r", chars, ' ');	/* Clear the current line */
-
-	if (PQputline(con, "\\.\n"))
-		pg_fatal("very last PQputline failed");
-	if (PQendcopy(con))
-		pg_fatal("PQendcopy failed");
-
-	termPQExpBuffer(&sql);
-}
+// static void
+// initPopulateTable(PGconn *con, const char *table, int64 base,
+// 				  initRowMethod init_row)
+// {
+// 	int			n;
+// 	int64		k;
+// 	int			chars = 0;
+// 	int			prev_chars = 0;
+// 	PGresult   *res;
+// 	PQExpBufferData sql;
+// 	char		copy_statement[256];
+// 	const char *copy_statement_fmt = "copy %s from stdin";
+// 	int64		total = base * scale;
+
+// 	/* used to track elapsed time and estimate of the remaining time */
+// 	pg_time_usec_t start;
+// 	int			log_interval = 1;
+
+// 	/* Stay on the same line if reporting to a terminal */
+// 	char		eol = isatty(fileno(stderr)) ? '\r' : '\n';
+
+// 	initPQExpBuffer(&sql);
+
+// 	/* Use COPY with FREEZE on v14 and later for all ordinary tables */
+// 	if ((PQserverVersion(con) >= 140000) &&
+// 		get_table_relkind(con, table) == RELKIND_RELATION)
+// 		copy_statement_fmt = "copy %s from stdin with (freeze on)";
+
+
+// 	n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table);
+// 	if (n >= sizeof(copy_statement))
+// 		pg_fatal("invalid buffer size: must be at least %d characters long", n);
+// 	else if (n == -1)
+// 		pg_fatal("invalid format string");
+
+// 	res = PQexec(con, copy_statement);
+
+// 	if (PQresultStatus(res) != PGRES_COPY_IN)
+// 		pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
+// 	PQclear(res);
+
+// 	start = pg_time_now();
+
+// 	for (k = 0; k < total; k++)
+// 	{
+// 		int64		j = k + 1;
+
+// 		init_row(&sql, k);
+// 		if (PQputline(con, sql.data))
+// 			pg_fatal("PQputline failed");
+
+// 		if (CancelRequested)
+// 			break;
+
+// 		/*
+// 		 * If we want to stick with the original logging, print a message each
+// 		 * 100k inserted rows.
+// 		 */
+// 		if ((!use_quiet) && (j % 100000 == 0))
+// 		{
+// 			double		elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
+// 			double		remaining_sec = ((double) total - j) * elapsed_sec / j;
+
+// 			chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
+// 							j, total,
+// 							(int) ((j * 100) / total),
+// 							table, elapsed_sec, remaining_sec);
+
+// 			/*
+// 			 * If the previous progress message is longer than the current
+// 			 * one, add spaces to the current line to fully overwrite any
+// 			 * remaining characters from the previous message.
+// 			 */
+// 			if (prev_chars > chars)
+// 				fprintf(stderr, "%*c", prev_chars - chars, ' ');
+// 			fputc(eol, stderr);
+// 			prev_chars = chars;
+// 		}
+// 		/* let's not call the timing for each row, but only each 100 rows */
+// 		else if (use_quiet && (j % 100 == 0))
+// 		{
+// 			double		elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
+// 			double		remaining_sec = ((double) total - j) * elapsed_sec / j;
+
+// 			/* have we reached the next interval (or end)? */
+// 			if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
+// 			{
+// 				chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
+// 								j, total,
+// 								(int) ((j * 100) / total),
+// 								table, elapsed_sec, remaining_sec);
+
+// 				/*
+// 				 * If the previous progress message is longer than the current
+// 				 * one, add spaces to the current line to fully overwrite any
+// 				 * remaining characters from the previous message.
+// 				 */
+// 				if (prev_chars > chars)
+// 					fprintf(stderr, "%*c", prev_chars - chars, ' ');
+// 				fputc(eol, stderr);
+// 				prev_chars = chars;
+
+// 				/* skip to the next interval */
+// 				log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
+// 			}
+// 		}
+// 	}
+
+// 	if (chars != 0 && eol != '\n')
+// 		fprintf(stderr, "%*c\r", chars, ' ');	/* Clear the current line */
+
+// 	if (PQputline(con, "\\.\n"))
+// 		pg_fatal("very last PQputline failed");
+// 	if (PQendcopy(con))
+// 		pg_fatal("PQendcopy failed");
+
+// 	termPQExpBuffer(&sql);
+// }
 
 /*
  * Fill the standard tables with some data generated and sent from the client.
@@ -5107,10 +5344,10 @@ initGenerateDataClientSide(PGconn *con)
 	 * we do all of this in one transaction to enable the backend's
 	 * data-loading optimizations
 	 */
-	executeStatement(con, "begin");
+	// executeStatement(con, "begin");
 
 	/* truncate away any old data */
-	initTruncateTables(con);
+	// initTruncateTables(con);
 
 	/*
 	 * fill branches, tellers, accounts in that order in case foreign keys
@@ -5120,7 +5357,7 @@ initGenerateDataClientSide(PGconn *con)
 	initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
 	initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
 
-	executeStatement(con, "commit");
+	// executeStatement(con, "commit");
 }
 
 /*
@@ -6889,6 +7126,7 @@ main(int argc, char **argv)
 				{
 					exit(1);
 				}
+				break;
 			case 'l':
 				benchmarking_option_set = true;
@@ -7111,7 +7349,7 @@
 	 * optimization; throttle_delay is calculated incorrectly below if some
 	 * threads have no clients assigned to them.)
 	 */
-	if (nthreads > nclients)
+	if (nthreads > nclients && !is_init_mode)
 		nthreads = nclients;
 
 	/*
@@ -7146,8 +7384,8 @@
 
 	if (is_init_mode)
 	{
-		if (benchmarking_option_set)
-			pg_fatal("some of the specified options cannot be used in initialization (-i) mode");
+		// if (benchmarking_option_set)
+		// 	pg_fatal("some of the specified options cannot be used in initialization (-i) mode");
 
 		if (partitions == 0 && partition_method != PART_NONE)
 			pg_fatal("--partition-method requires greater than zero --partitions");