@ -628,7 +628,8 @@ typedef struct
pg_time_usec_t txn_begin ; /* used for measuring schedule lag times */
pg_time_usec_t txn_begin ; /* used for measuring schedule lag times */
pg_time_usec_t stmt_begin ; /* used for measuring statement latencies */
pg_time_usec_t stmt_begin ; /* used for measuring statement latencies */
bool prepared [ MAX_SCRIPTS ] ; /* whether client prepared the script */
/* whether client prepared each command of each script */
bool * * prepared ;
/*
/*
* For processing failures and repeating transactions with serialization
* For processing failures and repeating transactions with serialization
@ -733,6 +734,7 @@ static const char *QUERYMODE[] = {"simple", "extended", "prepared"};
* argv Command arguments , the first of which is the command or SQL
* argv Command arguments , the first of which is the command or SQL
* string itself . For SQL commands , after post - processing
* string itself . For SQL commands , after post - processing
* argv [ 0 ] is the same as ' lines ' with variables substituted .
* argv [ 0 ] is the same as ' lines ' with variables substituted .
* prepname The name that this command is prepared under , in prepare mode
* varprefix SQL commands terminated with \ gset or \ aset have this set
* varprefix SQL commands terminated with \ gset or \ aset have this set
* to a non NULL value . If nonempty , it ' s used to prefix the
* to a non NULL value . If nonempty , it ' s used to prefix the
* variable name that receives the value .
* variable name that receives the value .
@ -751,6 +753,7 @@ typedef struct Command
MetaCommand meta ;
MetaCommand meta ;
int argc ;
int argc ;
char * argv [ MAX_ARGS ] ;
char * argv [ MAX_ARGS ] ;
char * prepname ;
char * varprefix ;
char * varprefix ;
PgBenchExpr * expr ;
PgBenchExpr * expr ;
SimpleStats stats ;
SimpleStats stats ;
@ -3006,13 +3009,6 @@ runShellCommand(Variables *variables, char *variable, char **argv, int argc)
return true ;
return true ;
}
}
# define MAX_PREPARE_NAME 32
static void
preparedStatementName ( char * buffer , int file , int state )
{
sprintf ( buffer , " P%d_%d " , file , state ) ;
}
/*
/*
* Report the abortion of the client when processing SQL commands .
* Report the abortion of the client when processing SQL commands .
*/
*/
@ -3053,6 +3049,87 @@ chooseScript(TState *thread)
return i - 1 ;
return i - 1 ;
}
}
/*
* Prepare the SQL command from st - > use_file at command_num .
*/
static void
prepareCommand ( CState * st , int command_num )
{
Command * command = sql_script [ st - > use_file ] . commands [ command_num ] ;
/* No prepare for non-SQL commands */
if ( command - > type ! = SQL_COMMAND )
return ;
/*
* If not already done , allocate space for ' prepared ' flags : one boolean
* for each command of each script .
*/
if ( ! st - > prepared )
{
st - > prepared = pg_malloc ( sizeof ( bool * ) * num_scripts ) ;
for ( int i = 0 ; i < num_scripts ; i + + )
{
ParsedScript * script = & sql_script [ i ] ;
int numcmds ;
for ( numcmds = 0 ; script - > commands [ numcmds ] ! = NULL ; numcmds + + )
;
st - > prepared [ i ] = pg_malloc0 ( sizeof ( bool ) * numcmds ) ;
}
}
if ( ! st - > prepared [ st - > use_file ] [ command_num ] )
{
PGresult * res ;
pg_log_debug ( " client %d preparing %s " , st - > id , command - > prepname ) ;
res = PQprepare ( st - > con , command - > prepname ,
command - > argv [ 0 ] , command - > argc - 1 , NULL ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pg_log_error ( " %s " , PQerrorMessage ( st - > con ) ) ;
PQclear ( res ) ;
st - > prepared [ st - > use_file ] [ command_num ] = true ;
}
}
/*
* Prepare all the commands in the script that come after the \ startpipeline
* that ' s at position st - > command , and the first \ endpipeline we find .
*
* This sets the - > prepared flag for each relevant command as well as the
* \ startpipeline itself , but doesn ' t move the st - > command counter .
*/
static void
prepareCommandsInPipeline ( CState * st )
{
int j ;
Command * * commands = sql_script [ st - > use_file ] . commands ;
Assert ( commands [ st - > command ] - > type = = META_COMMAND & &
commands [ st - > command ] - > meta = = META_STARTPIPELINE ) ;
/*
* We set the ' prepared ' flag on the \ startpipeline itself to flag that we
* don ' t need to do this next time without calling prepareCommand ( ) , even
* though we don ' t actually prepare this command .
*/
if ( st - > prepared & &
st - > prepared [ st - > use_file ] [ st - > command ] )
return ;
for ( j = st - > command + 1 ; commands [ j ] ! = NULL ; j + + )
{
if ( commands [ j ] - > type = = META_COMMAND & &
commands [ j ] - > meta = = META_ENDPIPELINE )
break ;
prepareCommand ( st , j ) ;
}
st - > prepared [ st - > use_file ] [ st - > command ] = true ;
}
/* Send a SQL command, using the chosen querymode */
/* Send a SQL command, using the chosen querymode */
static bool
static bool
sendCommand ( CState * st , Command * command )
sendCommand ( CState * st , Command * command )
@ -3083,49 +3160,13 @@ sendCommand(CState *st, Command *command)
}
}
else if ( querymode = = QUERY_PREPARED )
else if ( querymode = = QUERY_PREPARED )
{
{
char name [ MAX_PREPARE_NAME ] ;
const char * params [ MAX_ARGS ] ;
const char * params [ MAX_ARGS ] ;
if ( ! st - > prepared [ st - > use_file ] )
prepareCommand ( st , st - > command ) ;
{
int j ;
Command * * commands = sql_script [ st - > use_file ] . commands ;
for ( j = 0 ; commands [ j ] ! = NULL ; j + + )
{
PGresult * res ;
if ( commands [ j ] - > type ! = SQL_COMMAND )
continue ;
preparedStatementName ( name , st - > use_file , j ) ;
if ( PQpipelineStatus ( st - > con ) = = PQ_PIPELINE_OFF )
{
res = PQprepare ( st - > con , name ,
commands [ j ] - > argv [ 0 ] , commands [ j ] - > argc - 1 , NULL ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
pg_log_error ( " %s " , PQerrorMessage ( st - > con ) ) ;
PQclear ( res ) ;
}
else
{
/*
* In pipeline mode , we use asynchronous functions . If a
* server - side error occurs , it will be processed later
* among the other results .
*/
if ( ! PQsendPrepare ( st - > con , name ,
commands [ j ] - > argv [ 0 ] , commands [ j ] - > argc - 1 , NULL ) )
pg_log_error ( " %s " , PQerrorMessage ( st - > con ) ) ;
}
}
st - > prepared [ st - > use_file ] = true ;
}
getQueryParams ( & st - > variables , command , params ) ;
getQueryParams ( & st - > variables , command , params ) ;
preparedStatementName ( name , st - > use_file , st - > command ) ;
pg_log_debug ( " client %d sending %s " , st - > id , name ) ;
pg_log_debug ( " client %d sending %s " , st - > id , command - > prepname ) ;
r = PQsendQueryPrepared ( st - > con , name , command - > argc - 1 ,
r = PQsendQueryPrepared ( st - > con , command - > prepname , command - > argc - 1 ,
params , NULL , NULL , 0 ) ;
params , NULL , NULL , 0 ) ;
}
}
else /* unknown sql mode */
else /* unknown sql mode */
@ -3597,7 +3638,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
thread - > conn_duration + = now - start ;
thread - > conn_duration + = now - start ;
/* Reset session-local state */
/* Reset session-local state */
memset ( st - > prepared , 0 , sizeof ( st - > prepared ) ) ;
pg_free ( st - > prepared ) ;
st - > prepared = NULL ;
}
}
/*
/*
@ -4360,6 +4402,16 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
return CSTATE_ABORTED ;
return CSTATE_ABORTED ;
}
}
/*
* If we ' re in prepared - query mode , we need to prepare all the
* commands that are inside the pipeline before we actually start the
* pipeline itself . This solves the problem that running BEGIN
* ISOLATION LEVEL SERIALIZABLE in a pipeline would fail due to a
* snapshot having been acquired by the prepare within the pipeline .
*/
if ( querymode = = QUERY_PREPARED )
prepareCommandsInPipeline ( st ) ;
if ( PQpipelineStatus ( st - > con ) ! = PQ_PIPELINE_OFF )
if ( PQpipelineStatus ( st - > con ) ! = PQ_PIPELINE_OFF )
{
{
commandFailed ( st , " startpipeline " , " already in pipeline mode " ) ;
commandFailed ( st , " startpipeline " , " already in pipeline mode " ) ;
@ -5439,6 +5491,7 @@ create_sql_command(PQExpBuffer buf, const char *source)
my_command - > varprefix = NULL ; /* allocated later, if needed */
my_command - > varprefix = NULL ; /* allocated later, if needed */
my_command - > expr = NULL ;
my_command - > expr = NULL ;
initSimpleStats ( & my_command - > stats ) ;
initSimpleStats ( & my_command - > stats ) ;
my_command - > prepname = NULL ; /* set later, if needed */
return my_command ;
return my_command ;
}
}
@ -5468,6 +5521,7 @@ static void
postprocess_sql_command ( Command * my_command )
postprocess_sql_command ( Command * my_command )
{
{
char buffer [ 128 ] ;
char buffer [ 128 ] ;
static int prepnum = 0 ;
Assert ( my_command - > type = = SQL_COMMAND ) ;
Assert ( my_command - > type = = SQL_COMMAND ) ;
@ -5476,15 +5530,17 @@ postprocess_sql_command(Command *my_command)
buffer [ strcspn ( buffer , " \n \r " ) ] = ' \0 ' ;
buffer [ strcspn ( buffer , " \n \r " ) ] = ' \0 ' ;
my_command - > first_line = pg_strdup ( buffer ) ;
my_command - > first_line = pg_strdup ( buffer ) ;
/* parse query if necessary */
/* Parse query and generate prepared statement name, if necessary */
switch ( querymode )
switch ( querymode )
{
{
case QUERY_SIMPLE :
case QUERY_SIMPLE :
my_command - > argv [ 0 ] = my_command - > lines . data ;
my_command - > argv [ 0 ] = my_command - > lines . data ;
my_command - > argc + + ;
my_command - > argc + + ;
break ;
break ;
case QUERY_EXTENDED :
case QUERY_PREPARED :
case QUERY_PREPARED :
my_command - > prepname = psprintf ( " P_%d " , prepnum + + ) ;
/* fall through */
case QUERY_EXTENDED :
if ( ! parseQuery ( my_command ) )
if ( ! parseQuery ( my_command ) )
exit ( 1 ) ;
exit ( 1 ) ;
break ;
break ;