@ -42,6 +42,7 @@
# include <limits.h>
# include <limits.h>
# include <math.h>
# include <math.h>
# include <signal.h>
# include <signal.h>
# include <time.h>
# include <sys/time.h>
# include <sys/time.h>
# ifdef HAVE_SYS_SELECT_H
# ifdef HAVE_SYS_SELECT_H
# include <sys/select.h>
# include <sys/select.h>
@ -227,7 +228,7 @@ typedef struct SimpleStats
*/
*/
typedef struct StatsData
typedef struct StatsData
{
{
long start_time ; /* interval start time, for aggregates */
time_t start_time ; /* interval start time, for aggregates */
int64 cnt ; /* number of transactions */
int64 cnt ; /* number of transactions */
int64 skipped ; /* number of transactions skipped under --rate
int64 skipped ; /* number of transactions skipped under --rate
* and - - latency - limit */
* and - - latency - limit */
@ -449,7 +450,7 @@ static const BuiltinScript builtin_script[] =
static void setIntValue ( PgBenchValue * pv , int64 ival ) ;
static void setIntValue ( PgBenchValue * pv , int64 ival ) ;
static void setDoubleValue ( PgBenchValue * pv , double dval ) ;
static void setDoubleValue ( PgBenchValue * pv , double dval ) ;
static bool evaluateExpr ( TState * , CState * , PgBenchExpr * , PgBenchValue * ) ;
static bool evaluateExpr ( TState * , CState * , PgBenchExpr * , PgBenchValue * ) ;
static void doLog ( TState * thread , CState * st , instr_time * now ,
static void doLog ( TState * thread , CState * st ,
StatsData * agg , bool skipped , double latency , double lag ) ;
StatsData * agg , bool skipped , double latency , double lag ) ;
static void processXactStats ( TState * thread , CState * st , instr_time * now ,
static void processXactStats ( TState * thread , CState * st , instr_time * now ,
bool skipped , StatsData * agg ) ;
bool skipped , StatsData * agg ) ;
@ -780,7 +781,7 @@ mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
* the given value .
* the given value .
*/
*/
static void
static void
initStats ( StatsData * sd , double start_time )
initStats ( StatsData * sd , time_t start_time )
{
{
sd - > start_time = start_time ;
sd - > start_time = start_time ;
sd - > cnt = 0 ;
sd - > cnt = 0 ;
@ -2425,10 +2426,15 @@ doCustom(TState *thread, CState *st, StatsData *agg)
}
}
/*
/*
* print log entry after completing one transaction .
* Print log entry after completing one transaction .
*
* We print Unix - epoch timestamps in the log , so that entries can be
* correlated against other logs . On some platforms this could be obtained
* from the instr_time reading the caller has , but rather than get entangled
* with that , we just eat the cost of an extra syscall in all cases .
*/
*/
static void
static void
doLog ( TState * thread , CState * st , instr_time * now ,
doLog ( TState * thread , CState * st ,
StatsData * agg , bool skipped , double latency , double lag )
StatsData * agg , bool skipped , double latency , double lag )
{
{
FILE * logfile = thread - > logfile ;
FILE * logfile = thread - > logfile ;
@ -2447,15 +2453,17 @@ doLog(TState *thread, CState *st, instr_time *now,
if ( agg_interval > 0 )
if ( agg_interval > 0 )
{
{
/*
/*
* Loop until we reach the interval of the current transaction , and
* Loop until we reach the interval of the current moment , and print
* print all the empty intervals in between ( this may happen with very
* any empty intervals in between ( this may happen with very low tps ,
* low tps , e . g . - - rate = 0.1 ) .
* e . g . - - rate = 0.1 ) .
*/
*/
while ( agg - > start_time + agg_interval < INSTR_TIME_GET_DOUBLE ( * now ) )
time_t now = time ( NULL ) ;
while ( agg - > start_time + agg_interval < = now )
{
{
/* print aggregated report to logfile */
/* print aggregated report to logfile */
fprintf ( logfile , " %ld " INT64_FORMAT " %.0f %.0f %.0f %.0f " ,
fprintf ( logfile , " %ld " INT64_FORMAT " %.0f %.0f %.0f %.0f " ,
agg - > start_time ,
( long ) agg - > start_time ,
agg - > cnt ,
agg - > cnt ,
agg - > latency . sum ,
agg - > latency . sum ,
agg - > latency . sum2 ,
agg - > latency . sum2 ,
@ -2485,12 +2493,6 @@ doLog(TState *thread, CState *st, instr_time *now,
/* no, print raw transactions */
/* no, print raw transactions */
struct timeval tv ;
struct timeval tv ;
/*
* We print the current system timestamp in the log , so that entries
* can be correlated against other logs . On some platforms this is
* available in * now , but rather than get entangled with that , we just
* eat the cost of an extra syscall in all cases .
*/
gettimeofday ( & tv , NULL ) ;
gettimeofday ( & tv , NULL ) ;
if ( skipped )
if ( skipped )
fprintf ( logfile , " %d " INT64_FORMAT " skipped %d %ld %ld " ,
fprintf ( logfile , " %d " INT64_FORMAT " skipped %d %ld %ld " ,
@ -2518,7 +2520,7 @@ processXactStats(TState *thread, CState *st, instr_time *now,
double latency = 0.0 ,
double latency = 0.0 ,
lag = 0.0 ;
lag = 0.0 ;
if ( ( ! skipped | | agg_interval ) & & INSTR_TIME_IS_ZERO ( * now ) )
if ( ( ! skipped ) & & INSTR_TIME_IS_ZERO ( * now ) )
INSTR_TIME_SET_CURRENT ( * now ) ;
INSTR_TIME_SET_CURRENT ( * now ) ;
if ( ! skipped )
if ( ! skipped )
@ -2540,7 +2542,7 @@ processXactStats(TState *thread, CState *st, instr_time *now,
thread - > stats . cnt + + ;
thread - > stats . cnt + + ;
if ( use_log )
if ( use_log )
doLog ( thread , st , now , agg , skipped , latency , lag ) ;
doLog ( thread , st , agg , skipped , latency , lag ) ;
/* XXX could use a mutex here, but we choose not to */
/* XXX could use a mutex here, but we choose not to */
if ( per_script_stats )
if ( per_script_stats )
@ -3202,7 +3204,7 @@ ParseScript(const char *script, const char *desc, int weight)
ps . desc = desc ;
ps . desc = desc ;
ps . weight = weight ;
ps . weight = weight ;
ps . commands = ( Command * * ) pg_malloc ( sizeof ( Command * ) * alloc_num ) ;
ps . commands = ( Command * * ) pg_malloc ( sizeof ( Command * ) * alloc_num ) ;
initStats ( & ps . stats , 0. 0) ;
initStats ( & ps . stats , 0 ) ;
/* Prepare to parse script */
/* Prepare to parse script */
sstate = psql_scan_create ( & pgbench_callbacks ) ;
sstate = psql_scan_create ( & pgbench_callbacks ) ;
@ -3972,10 +3974,6 @@ main(int argc, char **argv)
}
}
break ;
break ;
case 5 :
case 5 :
# ifdef WIN32
fprintf ( stderr , " --aggregate-interval is not currently supported on Windows \n " ) ;
exit ( 1 ) ;
# else
benchmarking_option_set = true ;
benchmarking_option_set = true ;
agg_interval = atoi ( optarg ) ;
agg_interval = atoi ( optarg ) ;
if ( agg_interval < = 0 )
if ( agg_interval < = 0 )
@ -3984,7 +3982,6 @@ main(int argc, char **argv)
optarg ) ;
optarg ) ;
exit ( 1 ) ;
exit ( 1 ) ;
}
}
# endif
break ;
break ;
case 6 :
case 6 :
progress_timestamp = true ;
progress_timestamp = true ;
@ -4267,7 +4264,7 @@ main(int argc, char **argv)
thread - > random_state [ 2 ] = random ( ) ;
thread - > random_state [ 2 ] = random ( ) ;
thread - > logfile = NULL ; /* filled in later */
thread - > logfile = NULL ; /* filled in later */
thread - > latency_late = 0 ;
thread - > latency_late = 0 ;
initStats ( & thread - > stats , 0. 0) ;
initStats ( & thread - > stats , 0 ) ;
nclients_dealt + = thread - > nstate ;
nclients_dealt + = thread - > nstate ;
}
}
@ -4321,7 +4318,7 @@ main(int argc, char **argv)
# endif /* ENABLE_THREAD_SAFETY */
# endif /* ENABLE_THREAD_SAFETY */
/* wait for threads and accumulate results */
/* wait for threads and accumulate results */
initStats ( & stats , 0. 0) ;
initStats ( & stats , 0 ) ;
INSTR_TIME_SET_ZERO ( conn_total_time ) ;
INSTR_TIME_SET_ZERO ( conn_total_time ) ;
for ( i = 0 ; i < nthreads ; i + + )
for ( i = 0 ; i < nthreads ; i + + )
{
{
@ -4394,6 +4391,9 @@ threadRun(void *arg)
INSTR_TIME_SET_ZERO ( thread - > conn_time ) ;
INSTR_TIME_SET_ZERO ( thread - > conn_time ) ;
initStats ( & aggs , time ( NULL ) ) ;
last = aggs ;
/* open log file if requested */
/* open log file if requested */
if ( use_log )
if ( use_log )
{
{
@ -4429,9 +4429,6 @@ threadRun(void *arg)
INSTR_TIME_SET_CURRENT ( thread - > conn_time ) ;
INSTR_TIME_SET_CURRENT ( thread - > conn_time ) ;
INSTR_TIME_SUBTRACT ( thread - > conn_time , thread - > start_time ) ;
INSTR_TIME_SUBTRACT ( thread - > conn_time , thread - > start_time ) ;
initStats ( & aggs , INSTR_TIME_GET_DOUBLE ( thread - > start_time ) ) ;
last = aggs ;
/* explicitly initialize the state machines */
/* explicitly initialize the state machines */
for ( i = 0 ; i < nstate ; i + + )
for ( i = 0 ; i < nstate ; i + + )
{
{
@ -4635,7 +4632,7 @@ threadRun(void *arg)
* ( If a read from a 64 - bit integer is not atomic , you might
* ( If a read from a 64 - bit integer is not atomic , you might
* get a " torn " read and completely bogus latencies though ! )
* get a " torn " read and completely bogus latencies though ! )
*/
*/
initStats ( & cur , 0. 0) ;
initStats ( & cur , 0 ) ;
for ( i = 0 ; i < nthreads ; i + + )
for ( i = 0 ; i < nthreads ; i + + )
{
{
mergeSimpleStats ( & cur . latency , & thread [ i ] . stats . latency ) ;
mergeSimpleStats ( & cur . latency , & thread [ i ] . stats . latency ) ;
@ -4695,12 +4692,13 @@ done:
INSTR_TIME_ACCUM_DIFF ( thread - > conn_time , end , start ) ;
INSTR_TIME_ACCUM_DIFF ( thread - > conn_time , end , start ) ;
if ( thread - > logfile )
if ( thread - > logfile )
{
{
if ( agg_interval )
if ( agg_interval > 0 )
{
{
/* log aggregated but not yet reported transactions */
/* log aggregated but not yet reported transactions */
doLog ( thread , state , & end , & aggs , false , 0 , 0 ) ;
doLog ( thread , state , & aggs , false , 0 , 0 ) ;
}
}
fclose ( thread - > logfile ) ;
fclose ( thread - > logfile ) ;
thread - > logfile = NULL ;
}
}
return NULL ;
return NULL ;
}
}