pull/239/head
Mircea Cadariu 2 weeks ago
parent 1f2e51e3c7
commit 71ad29a431
  1. 502
      src/bin/pgbench/pgbench.c

@ -35,6 +35,7 @@
#include <ctype.h>
#include <float.h>
#include <unistd.h>
#include <limits.h>
#include <math.h>
#include <signal.h>
@ -68,6 +69,8 @@
#include "pgbench.h"
#include "port/pg_bitutils.h"
#include "portability/instr_time.h"
#include <pthread.h>
#include <semaphore.h>
/* X/Open (XSI) requires <math.h> to provide M_PI, but core POSIX does not */
#ifndef M_PI
@ -817,7 +820,6 @@ static const BuiltinScript builtin_script[] =
}
};
/* Function prototypes */
static void setNullValue(PgBenchValue *pv);
static void setBoolValue(PgBenchValue *pv, bool bval);
@ -848,6 +850,18 @@ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
static const PsqlScanCallbacks pgbench_callbacks = {
NULL, /* don't need get_variable functionality */
};
/* Structure to hold worker thread data */
typedef struct WorkerData
{
PGconn *con;
const char *table;
int64 start_row;
int64 end_row;
initRowMethod init_row;
int worker_id;
volatile bool *has_error;
char error_msg[256];
} WorkerData;
static char
get_table_relkind(PGconn *con, const char *table)
@ -1618,6 +1632,234 @@ doConnect(void)
return conn;
}
/*
* Truncate specified table(s)
* tableName can be a single table or comma-separated list of tables
*/
static void
truncateTable(PGconn *con, const char *tableName)
{
char *query;
size_t queryLen;
if (tableName == NULL || strlen(tableName) == 0)
{
fprintf(stderr, "Error: table name cannot be null or empty\n");
return;
}
queryLen = strlen("TRUNCATE TABLE ") + strlen(tableName) + 1;
query = pg_malloc(queryLen);
snprintf(query, queryLen, "TRUNCATE TABLE %s", tableName);
executeStatement(con, query);
free(query);
}
static void *
initWorkerThread(void *arg)
{
WorkerData *data = (WorkerData *) arg;
PGresult *res = NULL;
PQExpBufferData sql;
char copy_statement[256];
int64 row;
bool copy_started = false;
PGconn *conn;
fprintf(stderr, "[Worker %d] started \n", data->worker_id);
if(data->worker_id == 0) {
/*
We do not issue the "begin" statement here, we started it already such that the worker with index 1 can use copy (freeze on)
which requires to be run in the same transaction as table create or truncate.
*/
conn = data->con;
} else {
conn = doConnect();
executeStatement(conn, "begin");
}
if (PQstatus(conn) != CONNECTION_OK)
{
snprintf(data->error_msg, sizeof(data->error_msg),
"Worker %d: connection failed: %s",
data->worker_id, PQerrorMessage(conn));
*data->has_error = true;
return NULL;
}
initPQExpBuffer(&sql);
if (data -> worker_id == 0) {
snprintf(copy_statement, sizeof(copy_statement),
"COPY %s FROM STDIN (FREEZE ON)", data->table);
} else {
snprintf(copy_statement, sizeof(copy_statement),
"COPY %s FROM STDIN", data->table);
}
res = PQexec(conn, copy_statement);
if (PQresultStatus(res) != PGRES_COPY_IN)
{
snprintf(data->error_msg, sizeof(data->error_msg),
"Worker %d: COPY failed: %s",
data->worker_id, PQerrorMessage(data->con));
fprintf(stderr, "[Worker %d] COPY failed: %s\n", data->worker_id, PQerrorMessage(data->con));
*data->has_error = true;
PQclear(res);
termPQExpBuffer(&sql);
return NULL;
}
PQclear(res);
copy_started = true;
for (row = data->start_row; row < data->end_row; row++)
{
if (*data->has_error)
{
fprintf(stderr, "[Worker %d] Aborting row loop due to global error flag\n", data->worker_id);
break;
}
data->init_row(&sql, row);
if (PQputline(conn, sql.data) != 0)
{
snprintf(data->error_msg, sizeof(data->error_msg),
"Worker %d: PQputline failed: %s",
data->worker_id, PQerrorMessage(data->con));
fprintf(stderr, "[Worker %d] PQputline failed: %s\n", data->worker_id, PQerrorMessage(data->con));
*data->has_error = true;
break;
}
if (PQflush(conn) != 0)
{
snprintf(data->error_msg, sizeof(data->error_msg),
"Worker %d: PQflush failed: %s",
data->worker_id, PQerrorMessage(data->con));
fprintf(stderr, "[Worker %d] PQflush failed: %s\n", data->worker_id, PQerrorMessage(data->con));
*data->has_error = true;
break;
}
}
if (copy_started)
{
if (PQputline(conn, "\\.\n") != 0)
{
fprintf(stderr, "[Worker %d] Failed to send end marker\n", data->worker_id);
}
if (PQflush(conn) != 0)
{
fprintf(stderr, "[Worker %d] Failed to flush end marker\n", data->worker_id);
}
if (PQendcopy(conn) != 0)
{
snprintf(data->error_msg, sizeof(data->error_msg),
"Worker %d: PQendcopy failed: %s",
data->worker_id, PQerrorMessage(data->con));
fprintf(stderr, "[Worker %d] PQendcopy failed: %s\n", data->worker_id, PQerrorMessage(data->con));
*data->has_error = true;
}
else
{
fprintf(stderr, "[Worker %d] COPY completed successfully\n", data->worker_id);
executeStatement(conn, "commit");
}
}
termPQExpBuffer(&sql);
fprintf(stderr, "[Worker %d] Thread exiting\n", data->worker_id);
return NULL;
}
static void
initPopulateTableParallel(PGconn *connection, int num_workers,
const char *table, int64 total_rows,
initRowMethod init_row)
{
THREAD_T *worker_threads;
WorkerData *worker_data;
volatile bool has_error = false;
int64 rows_per_worker;
int i;
/* Allocate worker data and threads */
worker_threads = malloc(num_workers * sizeof(pthread_t));
worker_data = malloc(num_workers * sizeof(WorkerData));
/* Calculate work distribution */
rows_per_worker = total_rows / num_workers;
/* Create and start worker threads */
for (i = 0; i < num_workers; i++)
{
worker_data[i].con = connection;
worker_data[i].table = table;
worker_data[i].init_row = init_row;
worker_data[i].worker_id = i;
worker_data[i].has_error = &has_error;
worker_data[i].error_msg[0] = '\0';
/* Distribute rows among workers */
worker_data[i].start_row = i * rows_per_worker;
worker_data[i].end_row = (i + 1) * rows_per_worker;
/* Last worker gets any remaining rows */
if (i == num_workers - 1)
worker_data[i].end_row = total_rows;
if (THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]) != 0)
pg_fatal("Failed to create worker thread %d", i);
}
/* Wait for all worker threads to complete */
for (i = 0; i < num_workers; i++)
{
THREAD_JOIN(worker_threads[i]);
/* Check for errors */
if (worker_data[i].error_msg[0] != '\0')
{
fprintf(stderr, "Error: %s\n", worker_data[i].error_msg);
has_error = true;
}
}
/* Clean up */
free(worker_threads);
free(worker_data);
if (has_error)
pg_fatal("One or more worker threads encountered errors");
}
static void
initPopulateTable(PGconn *con, const char *table, int64 total_rows,
initRowMethod init_row)
{
executeStatement(con, "begin");
printf("Truncating table %s...\n", table);
truncateTable(con, table);
initPopulateTableParallel(con, nthreads, table, total_rows * scale, init_row);
}
/* qsort comparator for Variable array */
static int
compareVariableNames(const void *v1, const void *v2)
@ -3496,7 +3738,6 @@ static int
discardUntilSync(CState *st)
{
bool received_sync = false;
/* send a sync */
if (!PQpipelineSync(st->con))
{
@ -4934,16 +5175,12 @@ initCreateTables(PGconn *con)
}
/*
* Truncate away any old data, in one command in case there are foreign keys
* Truncate away any old data for pgbench tables, in one command in case there are foreign keys
*/
static void
initTruncateTables(PGconn *con)
{
executeStatement(con, "truncate table "
"pgbench_accounts, "
"pgbench_branches, "
"pgbench_history, "
"pgbench_tellers");
truncateTable(con, "pgbench_accounts, pgbench_branches, pgbench_history, pgbench_tellers");
}
static void
@ -4973,124 +5210,124 @@ initAccount(PQExpBufferData *sql, int64 curr)
curr + 1, curr / naccounts + 1);
}
static void
initPopulateTable(PGconn *con, const char *table, int64 base,
initRowMethod init_row)
{
int n;
int64 k;
int chars = 0;
int prev_chars = 0;
PGresult *res;
PQExpBufferData sql;
char copy_statement[256];
const char *copy_statement_fmt = "copy %s from stdin";
int64 total = base * scale;
/* used to track elapsed time and estimate of the remaining time */
pg_time_usec_t start;
int log_interval = 1;
/* Stay on the same line if reporting to a terminal */
char eol = isatty(fileno(stderr)) ? '\r' : '\n';
initPQExpBuffer(&sql);
/* Use COPY with FREEZE on v14 and later for all ordinary tables */
if ((PQserverVersion(con) >= 140000) &&
get_table_relkind(con, table) == RELKIND_RELATION)
copy_statement_fmt = "copy %s from stdin with (freeze on)";
n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table);
if (n >= sizeof(copy_statement))
pg_fatal("invalid buffer size: must be at least %d characters long", n);
else if (n == -1)
pg_fatal("invalid format string");
res = PQexec(con, copy_statement);
if (PQresultStatus(res) != PGRES_COPY_IN)
pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
PQclear(res);
start = pg_time_now();
for (k = 0; k < total; k++)
{
int64 j = k + 1;
init_row(&sql, k);
if (PQputline(con, sql.data))
pg_fatal("PQputline failed");
if (CancelRequested)
break;
/*
* If we want to stick with the original logging, print a message each
* 100k inserted rows.
*/
if ((!use_quiet) && (j % 100000 == 0))
{
double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
double remaining_sec = ((double) total - j) * elapsed_sec / j;
chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
j, total,
(int) ((j * 100) / total),
table, elapsed_sec, remaining_sec);
/*
* If the previous progress message is longer than the current
* one, add spaces to the current line to fully overwrite any
* remaining characters from the previous message.
*/
if (prev_chars > chars)
fprintf(stderr, "%*c", prev_chars - chars, ' ');
fputc(eol, stderr);
prev_chars = chars;
}
/* let's not call the timing for each row, but only each 100 rows */
else if (use_quiet && (j % 100 == 0))
{
double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
double remaining_sec = ((double) total - j) * elapsed_sec / j;
/* have we reached the next interval (or end)? */
if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
{
chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
j, total,
(int) ((j * 100) / total),
table, elapsed_sec, remaining_sec);
/*
* If the previous progress message is longer than the current
* one, add spaces to the current line to fully overwrite any
* remaining characters from the previous message.
*/
if (prev_chars > chars)
fprintf(stderr, "%*c", prev_chars - chars, ' ');
fputc(eol, stderr);
prev_chars = chars;
/* skip to the next interval */
log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
}
}
}
if (chars != 0 && eol != '\n')
fprintf(stderr, "%*c\r", chars, ' '); /* Clear the current line */
if (PQputline(con, "\\.\n"))
pg_fatal("very last PQputline failed");
if (PQendcopy(con))
pg_fatal("PQendcopy failed");
termPQExpBuffer(&sql);
}
// static void
// initPopulateTable(PGconn *con, const char *table, int64 base,
// initRowMethod init_row)
// {
// int n;
// int64 k;
// int chars = 0;
// int prev_chars = 0;
// PGresult *res;
// PQExpBufferData sql;
// char copy_statement[256];
// const char *copy_statement_fmt = "copy %s from stdin";
// int64 total = base * scale;
// /* used to track elapsed time and estimate of the remaining time */
// pg_time_usec_t start;
// int log_interval = 1;
// /* Stay on the same line if reporting to a terminal */
// char eol = isatty(fileno(stderr)) ? '\r' : '\n';
// initPQExpBuffer(&sql);
// /* Use COPY with FREEZE on v14 and later for all ordinary tables */
// if ((PQserverVersion(con) >= 140000) &&
// get_table_relkind(con, table) == RELKIND_RELATION)
// copy_statement_fmt = "copy %s from stdin with (freeze on)";
// n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table);
// if (n >= sizeof(copy_statement))
// pg_fatal("invalid buffer size: must be at least %d characters long", n);
// else if (n == -1)
// pg_fatal("invalid format string");
// res = PQexec(con, copy_statement);
// if (PQresultStatus(res) != PGRES_COPY_IN)
// pg_fatal("unexpected copy in result: %s", PQerrorMessage(con));
// PQclear(res);
// start = pg_time_now();
// for (k = 0; k < total; k++)
// {
// int64 j = k + 1;
// init_row(&sql, k);
// if (PQputline(con, sql.data))
// pg_fatal("PQputline failed");
// if (CancelRequested)
// break;
// /*
// * If we want to stick with the original logging, print a message each
// * 100k inserted rows.
// */
// if ((!use_quiet) && (j % 100000 == 0))
// {
// double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
// double remaining_sec = ((double) total - j) * elapsed_sec / j;
// chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
// j, total,
// (int) ((j * 100) / total),
// table, elapsed_sec, remaining_sec);
// /*
// * If the previous progress message is longer than the current
// * one, add spaces to the current line to fully overwrite any
// * remaining characters from the previous message.
// */
// if (prev_chars > chars)
// fprintf(stderr, "%*c", prev_chars - chars, ' ');
// fputc(eol, stderr);
// prev_chars = chars;
// }
// /* let's not call the timing for each row, but only each 100 rows */
// else if (use_quiet && (j % 100 == 0))
// {
// double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
// double remaining_sec = ((double) total - j) * elapsed_sec / j;
// /* have we reached the next interval (or end)? */
// if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
// {
// chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)",
// j, total,
// (int) ((j * 100) / total),
// table, elapsed_sec, remaining_sec);
// /*
// * If the previous progress message is longer than the current
// * one, add spaces to the current line to fully overwrite any
// * remaining characters from the previous message.
// */
// if (prev_chars > chars)
// fprintf(stderr, "%*c", prev_chars - chars, ' ');
// fputc(eol, stderr);
// prev_chars = chars;
// /* skip to the next interval */
// log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
// }
// }
// }
// if (chars != 0 && eol != '\n')
// fprintf(stderr, "%*c\r", chars, ' '); /* Clear the current line */
// if (PQputline(con, "\\.\n"))
// pg_fatal("very last PQputline failed");
// if (PQendcopy(con))
// pg_fatal("PQendcopy failed");
// termPQExpBuffer(&sql);
// }
/*
* Fill the standard tables with some data generated and sent from the client.
@ -5107,10 +5344,10 @@ initGenerateDataClientSide(PGconn *con)
* we do all of this in one transaction to enable the backend's
* data-loading optimizations
*/
executeStatement(con, "begin");
// executeStatement(con, "begin");
/* truncate away any old data */
initTruncateTables(con);
// initTruncateTables(con);
/*
* fill branches, tellers, accounts in that order in case foreign keys
@ -5120,7 +5357,7 @@ initGenerateDataClientSide(PGconn *con)
initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
executeStatement(con, "commit");
// executeStatement(con, "commit");
}
/*
@ -6889,6 +7126,7 @@ main(int argc, char **argv)
{
exit(1);
}
break;
case 'l':
benchmarking_option_set = true;
@ -7111,7 +7349,7 @@ main(int argc, char **argv)
* optimization; throttle_delay is calculated incorrectly below if some
* threads have no clients assigned to them.)
*/
if (nthreads > nclients)
if (nthreads > nclients && !is_init_mode)
nthreads = nclients;
/*
@ -7146,8 +7384,8 @@ main(int argc, char **argv)
if (is_init_mode)
{
if (benchmarking_option_set)
pg_fatal("some of the specified options cannot be used in initialization (-i) mode");
// if (benchmarking_option_set)
// pg_fatal("some of the specified options cannot be used in initialization (-i) mode");
if (partitions == 0 && partition_method != PART_NONE)
pg_fatal("--partition-method requires greater than zero --partitions");

Loading…
Cancel
Save