@ -70,20 +70,8 @@ static int pthread_join(pthread_t th, void **thread_return);
/* Use platform-dependent pthread capability */
# include <pthread.h>
# else
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
# define PTHREAD_FORK_EMULATION
# include <sys/wait.h>
# define pthread_t pg_pthread_t
# define pthread_attr_t pg_pthread_attr_t
# define pthread_create pg_pthread_create
# define pthread_join pg_pthread_join
typedef struct fork_pthread * pthread_t ;
typedef int pthread_attr_t ;
static int pthread_create ( pthread_t * thread , pthread_attr_t * attr , void * ( * start_routine ) ( void * ) , void * arg ) ;
static int pthread_join ( pthread_t th , void * * thread_return ) ;
/* No threads implementation, use none (-j 1) */
# define pthread_t void *
# endif
@ -210,8 +198,6 @@ typedef struct
PGconn * con ; /* connection handle to DB */
int id ; /* client No. */
int state ; /* state No. */
int cnt ; /* xacts count */
int ecnt ; /* error count */
int listen ; /* 0 indicates that an async query has been
* sent */
int sleeping ; /* 1 indicates that the client is napping */
@ -221,15 +207,19 @@ typedef struct
int64 txn_scheduled ; /* scheduled start time of transaction (usec) */
instr_time txn_begin ; /* used for measuring schedule lag times */
instr_time stmt_begin ; /* used for measuring statement latencies */
int64 txn_latencies ; /* cumulated latencies */
int64 txn_sqlats ; /* cumulated square latencies */
bool is_throttled ; /* whether transaction throttling is done */
int use_file ; /* index in sql_files for this client */
bool prepared [ MAX_FILES ] ;
/* per client collected stats */
int cnt ; /* xacts count */
int ecnt ; /* error count */
int64 txn_latencies ; /* cumulated latencies */
int64 txn_sqlats ; /* cumulated square latencies */
} CState ;
/*
* Thread state and result
* Thread state
*/
typedef struct
{
@ -242,6 +232,9 @@ typedef struct
int * exec_count ; /* number of cmd executions (per Command) */
unsigned short random_state [ 3 ] ; /* separate randomness for each thread */
int64 throttle_trigger ; /* previous/next throttling (us) */
/* per thread collected stats */
instr_time conn_time ;
int64 throttle_lag ; /* total transaction lag behind throttling */
int64 throttle_lag_max ; /* max transaction lag */
int64 throttle_latency_skipped ; /* lagging transactions
@ -251,18 +244,6 @@ typedef struct
# define INVALID_THREAD ((pthread_t) 0)
typedef struct
{
instr_time conn_time ;
int64 xacts ;
int64 latencies ;
int64 sqlats ;
int64 throttle_lag ;
int64 throttle_lag_max ;
int64 throttle_latency_skipped ;
int64 latency_late ;
} TResult ;
/*
* queries read from files
*/
@ -2926,6 +2907,13 @@ main(int argc, char **argv)
fprintf ( stderr , " invalid number of threads: %d \n " , nthreads ) ;
exit ( 1 ) ;
}
# ifndef ENABLE_THREAD_SAFETY
if ( nthreads ! = 1 )
{
fprintf ( stderr , " threads are not supported on this platform, use -j1 \n " ) ;
exit ( 1 ) ;
}
# endif /* !ENABLE_THREAD_SAFETY */
break ;
case ' C ' :
benchmarking_option_set = true ;
@ -3194,22 +3182,6 @@ main(int argc, char **argv)
exit ( 1 ) ;
}
/*
* is_latencies only works with multiple threads in thread - based
* implementations , not fork - based ones , because it supposes that the
* parent can see changes made to the per - thread execution stats by child
* threads . It seems useful enough to accept despite this limitation , but
* perhaps we should FIXME someday ( by passing the stats data back up
* through the parent - to - child pipes ) .
*/
# ifndef ENABLE_THREAD_SAFETY
if ( is_latencies & & nthreads > 1 )
{
fprintf ( stderr , " -r does not work with -j larger than 1 on this platform. \n " ) ;
exit ( 1 ) ;
}
# endif
/*
* save main process id in the global variable because process id will be
* changed after fork .
@ -3414,6 +3386,7 @@ main(int argc, char **argv)
setalarm ( duration ) ;
/* start threads */
# ifdef ENABLE_THREAD_SAFETY
for ( i = 0 ; i < nthreads ; i + + )
{
TState * thread = & threads [ i ] ;
@ -3436,32 +3409,43 @@ main(int argc, char **argv)
thread - > thread = INVALID_THREAD ;
}
}
# else
INSTR_TIME_SET_CURRENT ( threads [ 0 ] . start_time ) ;
threads [ 0 ] . thread = INVALID_THREAD ;
# endif /* ENABLE_THREAD_SAFETY */
/* wait for threads and accumulate results */
INSTR_TIME_SET_ZERO ( conn_total_time ) ;
for ( i = 0 ; i < nthreads ; i + + )
{
void * ret = NULL ;
TState * thread = & threads [ i ] ;
int j ;
# ifdef ENABLE_THREAD_SAFETY
if ( threads [ i ] . thread = = INVALID_THREAD )
ret = threadRun ( & threads [ i ] ) ;
/* actually run this thread directly in the main thread */
( void ) threadRun ( thread ) ;
else
pthread_join ( threads [ i ] . thread , & ret ) ;
/* wait of other threads. should check that 0 is returned? */
pthread_join ( thread - > thread , NULL ) ;
# else
( void ) threadRun ( thread ) ;
# endif /* ENABLE_THREAD_SAFETY */
if ( ret ! = NULL )
{
TResult * r = ( TResult * ) ret ;
/* thread level stats */
throttle_lag + = thread - > throttle_lag ;
throttle_latency_skipped = threads - > throttle_latency_skipped ;
latency_late = thread - > latency_late ;
if ( throttle_lag_max > thread - > throttle_lag_max )
throttle_lag_max = thread - > throttle_lag_max ;
INSTR_TIME_ADD ( conn_total_time , thread - > conn_time ) ;
total_xacts + = r - > xacts ;
total_latencies + = r - > latencies ;
total_sqlats + = r - > sqlats ;
throttle_lag + = r - > throttle_lag ;
throttle_latency_skipped + = r - > throttle_latency_skipped ;
latency_late + = r - > latency_late ;
if ( r - > throttle_lag_max > throttle_lag_max )
throttle_lag_max = r - > throttle_lag_max ;
INSTR_TIME_ADD ( conn_total_time , r - > conn_time ) ;
free ( ret ) ;
/* client-level stats */
for ( j = 0 ; j < thread - > nstate ; j + + )
{
total_xacts + = thread - > state [ j ] . cnt ;
total_latencies + = thread - > state [ i ] . txn_latencies ;
total_sqlats + = thread - > state [ i ] . txn_sqlats ;
}
}
disconnect_all ( state , nclients ) ;
@ -3491,7 +3475,6 @@ threadRun(void *arg)
{
TState * thread = ( TState * ) arg ;
CState * state = thread - > state ;
TResult * result ;
FILE * logfile = NULL ; /* per-thread log file */
instr_time start ,
end ;
@ -3522,9 +3505,7 @@ threadRun(void *arg)
thread - > throttle_lag = 0 ;
thread - > throttle_lag_max = 0 ;
result = pg_malloc ( sizeof ( TResult ) ) ;
INSTR_TIME_SET_ZERO ( result - > conn_time ) ;
INSTR_TIME_SET_ZERO ( thread - > conn_time ) ;
/* open log file if requested */
if ( use_log )
@ -3555,8 +3536,8 @@ threadRun(void *arg)
}
/* time after thread and connections set up */
INSTR_TIME_SET_CURRENT ( result - > conn_time ) ;
INSTR_TIME_SUBTRACT ( result - > conn_time , thread - > start_time ) ;
INSTR_TIME_SET_CURRENT ( thread - > conn_time ) ;
INSTR_TIME_SUBTRACT ( thread - > conn_time , thread - > start_time ) ;
agg_vals_init ( & aggs , thread - > start_time ) ;
@ -3568,7 +3549,7 @@ threadRun(void *arg)
int prev_ecnt = st - > ecnt ;
st - > use_file = getrand ( thread , 0 , num_files - 1 ) ;
if ( ! doCustom ( thread , st , & result - > conn_time , logfile , & aggs ) )
if ( ! doCustom ( thread , st , & thread - > conn_time , logfile , & aggs ) )
remains - - ; /* I've aborted */
if ( st - > ecnt > prev_ecnt & & commands [ st - > state ] - > type = = META_COMMAND )
@ -3650,11 +3631,7 @@ threadRun(void *arg)
}
/* also wake up to print the next progress report on time */
if ( progress & & min_usec > 0
# if !defined(PTHREAD_FORK_EMULATION)
& & thread - > tid = = 0
# endif /* !PTHREAD_FORK_EMULATION */
)
if ( progress & & min_usec > 0 )
{
/* get current time if needed */
if ( now_usec = = 0 )
@ -3710,7 +3687,7 @@ threadRun(void *arg)
if ( st - > con & & ( FD_ISSET ( PQsocket ( st - > con ) , & input_mask )
| | commands [ st - > state ] - > type = = META_COMMAND ) )
{
if ( ! doCustom ( thread , st , & result - > conn_time , logfile , & aggs ) )
if ( ! doCustom ( thread , st , & thread - > conn_time , logfile , & aggs ) )
remains - - ; /* I've aborted */
}
@ -3723,76 +3700,6 @@ threadRun(void *arg)
}
}
# ifdef PTHREAD_FORK_EMULATION
/* each process reports its own progression */
if ( progress )
{
instr_time now_time ;
int64 now ;
INSTR_TIME_SET_CURRENT ( now_time ) ;
now = INSTR_TIME_GET_MICROSEC ( now_time ) ;
if ( now > = next_report )
{
/* generate and show report */
int64 count = 0 ,
lats = 0 ,
sqlats = 0 ,
skipped = 0 ;
int64 lags = thread - > throttle_lag ;
int64 run = now - last_report ;
double tps ,
total_run ,
latency ,
sqlat ,
stdev ,
lag ;
for ( i = 0 ; i < nstate ; i + + )
{
count + = state [ i ] . cnt ;
lats + = state [ i ] . txn_latencies ;
sqlats + = state [ i ] . txn_sqlats ;
}
total_run = ( now - thread_start ) / 1000000.0 ;
tps = 1000000.0 * ( count - last_count ) / run ;
latency = 0.001 * ( lats - last_lats ) / ( count - last_count ) ;
sqlat = 1.0 * ( sqlats - last_sqlats ) / ( count - last_count ) ;
stdev = 0.001 * sqrt ( sqlat - 1000000.0 * latency * latency ) ;
lag = 0.001 * ( lags - last_lags ) / ( count - last_count ) ;
skipped = thread - > throttle_latency_skipped - last_skipped ;
fprintf ( stderr ,
" progress %d: %.1f s, %.1f tps, "
" lat %.3f ms stddev %.3f " ,
thread - > tid , total_run , tps , latency , stdev ) ;
if ( throttle_delay )
{
fprintf ( stderr , " , lag %.3f ms " , lag ) ;
if ( latency_limit )
fprintf ( stderr , " , skipped " INT64_FORMAT , skipped ) ;
}
fprintf ( stderr , " \n " ) ;
last_count = count ;
last_lats = lats ;
last_sqlats = sqlats ;
last_lags = lags ;
last_report = now ;
last_skipped = thread - > throttle_latency_skipped ;
/*
* Ensure that the next report is in the future , in case
* pgbench / postgres got stuck somewhere .
*/
do
{
next_report + = ( int64 ) progress * 1000000 ;
} while ( now > = next_report ) ;
}
}
# else
/* progress report by thread 0 for all threads */
if ( progress & & thread - > tid = = 0 )
{
@ -3817,6 +3724,17 @@ threadRun(void *arg)
lag ,
stdev ;
/*
* Add up the statistics of all threads .
*
* XXX : No locking . There is no guarantee that we get an
* atomic snapshot of the transaction count and latencies , so
* these figures can well be off by a small amount . The
* progress is report ' s purpose is to give a quick overview of
* how the test is going , so that shouldn ' t matter too much .
* ( If a read from a 64 - bit integer is not atomic , you might
* get a " torn " read and completely bogus latencies though ! )
*/
for ( i = 0 ; i < progress_nclients ; i + + )
{
count + = state [ i ] . cnt ;
@ -3864,31 +3782,16 @@ threadRun(void *arg)
} while ( now > = next_report ) ;
}
}
# endif /* PTHREAD_FORK_EMULATION */
}
done :
INSTR_TIME_SET_CURRENT ( start ) ;
disconnect_all ( state , nstate ) ;
result - > xacts = 0 ;
result - > latencies = 0 ;
result - > sqlats = 0 ;
for ( i = 0 ; i < nstate ; i + + )
{
result - > xacts + = state [ i ] . cnt ;
result - > latencies + = state [ i ] . txn_latencies ;
result - > sqlats + = state [ i ] . txn_sqlats ;
}
result - > throttle_lag = thread - > throttle_lag ;
result - > throttle_lag_max = thread - > throttle_lag_max ;
result - > throttle_latency_skipped = thread - > throttle_latency_skipped ;
result - > latency_late = thread - > latency_late ;
INSTR_TIME_SET_CURRENT ( end ) ;
INSTR_TIME_ACCUM_DIFF ( result - > conn_time , end , start ) ;
INSTR_TIME_ACCUM_DIFF ( thread - > conn_time , end , start ) ;
if ( logfile )
fclose ( logfile ) ;
return result ;
return NULL ;
}
/*
@ -3910,90 +3813,6 @@ setalarm(int seconds)
alarm ( seconds ) ;
}
# ifndef ENABLE_THREAD_SAFETY
/*
* implements pthread using fork .
*/
typedef struct fork_pthread
{
pid_t pid ;
int pipes [ 2 ] ;
} fork_pthread ;
static int
pthread_create ( pthread_t * thread ,
pthread_attr_t * attr ,
void * ( * start_routine ) ( void * ) ,
void * arg )
{
fork_pthread * th ;
void * ret ;
int rc ;
th = ( fork_pthread * ) pg_malloc ( sizeof ( fork_pthread ) ) ;
if ( pipe ( th - > pipes ) < 0 )
{
free ( th ) ;
return errno ;
}
th - > pid = fork ( ) ;
if ( th - > pid = = - 1 ) /* error */
{
free ( th ) ;
return errno ;
}
if ( th - > pid ! = 0 ) /* in parent process */
{
close ( th - > pipes [ 1 ] ) ;
* thread = th ;
return 0 ;
}
/* in child process */
close ( th - > pipes [ 0 ] ) ;
/* set alarm again because the child does not inherit timers */
if ( duration > 0 )
setalarm ( duration ) ;
ret = start_routine ( arg ) ;
rc = write ( th - > pipes [ 1 ] , ret , sizeof ( TResult ) ) ;
( void ) rc ;
close ( th - > pipes [ 1 ] ) ;
free ( th ) ;
exit ( 0 ) ;
}
static int
pthread_join ( pthread_t th , void * * thread_return )
{
int status ;
while ( waitpid ( th - > pid , & status , 0 ) ! = th - > pid )
{
if ( errno ! = EINTR )
return errno ;
}
if ( thread_return ! = NULL )
{
/* assume result is TResult */
* thread_return = pg_malloc ( sizeof ( TResult ) ) ;
if ( read ( th - > pipes [ 0 ] , * thread_return , sizeof ( TResult ) ) ! = sizeof ( TResult ) )
{
free ( * thread_return ) ;
* thread_return = NULL ;
}
}
close ( th - > pipes [ 0 ] ) ;
free ( th ) ;
return 0 ;
}
# endif
# else /* WIN32 */
static VOID CALLBACK