@ -11,18 +11,39 @@
# include <windows.h>
# endif
# ifndef WIN32
# include <sys/time.h>
# include <unistd.h>
# endif /* ! WIN32 */
# ifdef HAVE_SYS_SELECT_H
# include <sys/select.h>
# endif
# include "libpq-fe.h"
# include "isolationtester.h"
# define PREP_WAITING "isolationtester_waiting"
/*
* conns [ 0 ] is the global setup , teardown , and watchdog connection . Additional
* connections represent spec - defined sessions .
*/
static PGconn * * conns = NULL ;
static const char * * backend_ids = NULL ;
static int nconns = 0 ;
static void run_all_permutations ( TestSpec * testspec ) ;
static void run_all_permutations_recurse ( TestSpec * testspec , int nsteps , Step * * steps ) ;
static void run_all_permutations_recurse ( TestSpec * testspec , int nsteps ,
Step * * steps ) ;
static void run_named_permutations ( TestSpec * testspec ) ;
static void run_permutation ( TestSpec * testspec , int nsteps , Step * * steps ) ;
# define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */
# define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
static bool try_complete_step ( Step * step , int flags ) ;
static int step_qsort_cmp ( const void * a , const void * b ) ;
static int step_bsearch_cmp ( const void * a , const void * b ) ;
@ -45,6 +66,7 @@ main(int argc, char **argv)
const char * conninfo ;
TestSpec * testspec ;
int i ;
PGresult * res ;
/*
* If the user supplies a parameter on the command line , use it as the
@ -61,13 +83,15 @@ main(int argc, char **argv)
testspec = & parseresult ;
printf ( " Parsed test spec with %d sessions \n " , testspec - > nsessions ) ;
/* Establish connections to the database, one for each session */
nconns = testspec - > nsessions ;
/*
* Establish connections to the database , one for each session and an extra
* for lock wait detection and global work .
*/
nconns = 1 + testspec - > nsessions ;
conns = calloc ( nconns , sizeof ( PGconn * ) ) ;
for ( i = 0 ; i < testspec - > nsessions ; i + + )
backend_ids = calloc ( nconns , sizeof ( * backend_ids ) ) ;
for ( i = 0 ; i < nconns ; i + + )
{
PGresult * res ;
conns [ i ] = PQconnectdb ( conninfo ) ;
if ( PQstatus ( conns [ i ] ) ! = CONNECTION_OK )
{
@ -87,6 +111,28 @@ main(int argc, char **argv)
exit_nicely ( ) ;
}
PQclear ( res ) ;
/* Get the backend ID for lock wait checking. */
res = PQexec ( conns [ i ] , " SELECT i FROM pg_stat_get_backend_idset() t(i) "
" WHERE pg_stat_get_backend_pid(i) = pg_backend_pid() " ) ;
if ( PQresultStatus ( res ) = = PGRES_TUPLES_OK )
{
if ( PQntuples ( res ) = = 1 & & PQnfields ( res ) = = 1 )
backend_ids [ i ] = strdup ( PQgetvalue ( res , 0 , 0 ) ) ;
else
{
fprintf ( stderr , " backend id query returned %d rows and %d columns, expected 1 row and 1 column " ,
PQntuples ( res ) , PQnfields ( res ) ) ;
exit_nicely ( ) ;
}
}
else
{
fprintf ( stderr , " backend id query failed: %s " ,
PQerrorMessage ( conns [ i ] ) ) ;
exit_nicely ( ) ;
}
PQclear ( res ) ;
}
/* Set the session index fields in steps. */
@ -99,6 +145,16 @@ main(int argc, char **argv)
session - > steps [ stepindex ] - > session = i ;
}
res = PQprepare ( conns [ 0 ] , PREP_WAITING ,
" SELECT 1 WHERE pg_stat_get_backend_waiting($1) " , 0 , NULL ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , " prepare of lock wait query failed: %s " ,
PQerrorMessage ( conns [ 0 ] ) ) ;
exit_nicely ( ) ;
}
PQclear ( res ) ;
/*
* Run the permutations specified in the spec , or all if none were
* explicitly specified .
@ -254,6 +310,7 @@ run_permutation(TestSpec * testspec, int nsteps, Step ** steps)
{
PGresult * res ;
int i ;
Step * waiting = NULL ;
printf ( " \n starting permutation: " ) ;
for ( i = 0 ; i < nsteps ; i + + )
@ -277,12 +334,12 @@ run_permutation(TestSpec * testspec, int nsteps, Step ** steps)
{
if ( testspec - > sessions [ i ] - > setupsql )
{
res = PQexec ( conns [ i ] , testspec - > sessions [ i ] - > setupsql ) ;
res = PQexec ( conns [ i + 1 ] , testspec - > sessions [ i ] - > setupsql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , " setup of session %s failed: %s " ,
testspec - > sessions [ i ] - > name ,
PQerrorMessage ( conns [ 0 ] ) ) ;
PQerrorMessage ( conns [ i + 1 ] ) ) ;
exit_nicely ( ) ;
}
PQclear ( res ) ;
@ -292,44 +349,43 @@ run_permutation(TestSpec * testspec, int nsteps, Step ** steps)
/* Perform steps */
for ( i = 0 ; i < nsteps ; i + + )
{
Step * step = steps [ i ] ;
Step * step = steps [ i ] ;
printf ( " step %s: %s \n " , step - > name , step - > sql ) ;
res = PQexec ( conns [ step - > session ] , step - > sql ) ;
switch ( PQresultStatus ( res ) )
if ( ! PQsendQuery ( conns [ 1 + step - > session ] , step - > sql ) )
{
case PGRES_COMMAND_OK :
break ;
case PGRES_TUPLES_OK :
printResultSet ( res ) ;
break ;
fprintf ( stdout , " failed to send query: %s \n " ,
PQerrorMessage ( conns [ 1 + step - > session ] ) ) ;
exit_nicely ( ) ;
}
case PGRES_FATAL_ERROR :
/* Detail may contain xid values, so just show primary. */
printf ( " %s: %s \n " , PQresultErrorField ( res , PG_DIAG_SEVERITY ) ,
PQresultErrorField ( res , PG_DIAG_MESSAGE_PRIMARY ) ) ;
break ;
if ( waiting ! = NULL )
{
/* Some other step is already waiting: just block. */
try_complete_step ( step , 0 ) ;
default :
printf ( " unexpected result status: %s \n " ,
PQresStatus ( PQresultStatus ( res ) ) ) ;
/* See if this step unblocked the waiting step. */
if ( ! try_complete_step ( waiting , STEP_NONBLOCK | STEP_RETRY ) )
waiting = NULL ;
}
PQclear ( res ) ;
else if ( try_complete_step ( step , STEP_NONBLOCK ) )
waiting = step ;
}
/* Finish any waiting query. */
if ( waiting ! = NULL )
try_complete_step ( waiting , STEP_RETRY ) ;
/* Perform per-session teardown */
for ( i = 0 ; i < testspec - > nsessions ; i + + )
{
if ( testspec - > sessions [ i ] - > teardownsql )
{
res = PQexec ( conns [ i ] , testspec - > sessions [ i ] - > teardownsql ) ;
res = PQexec ( conns [ i + 1 ] , testspec - > sessions [ i ] - > teardownsql ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
fprintf ( stderr , " teardown of session %s failed: %s " ,
testspec - > sessions [ i ] - > name ,
PQerrorMessage ( conns [ 0 ] ) ) ;
PQerrorMessage ( conns [ i + 1 ] ) ) ;
/* don't exit on teardown failure */
}
PQclear ( res ) ;
@ -351,6 +407,105 @@ run_permutation(TestSpec * testspec, int nsteps, Step ** steps)
}
}
/*
* Our caller already sent the query associated with this step . Wait for it
* to either complete or ( if given the STEP_NONBLOCK flag ) to block while
* waiting for a lock . We assume that any lock wait will persist until we
* have executed additional steps in the permutation . This is not fully
* robust - - a concurrent autovacuum could briefly take a lock with which we
* conflict . The risk may be low enough to discount .
*
* When calling this function on behalf of a given step for a second or later
* time , pass the STEP_RETRY flag . This only affects the messages printed .
*
* If the STEP_NONBLOCK flag was specified and the query is waiting to acquire
* a lock , returns true . Otherwise , returns false .
*/
static bool
try_complete_step ( Step * step , int flags )
{
PGconn * conn = conns [ 1 + step - > session ] ;
fd_set read_set ;
struct timeval timeout ;
int sock = PQsocket ( conn ) ;
int ret ;
PGresult * res ;
FD_ZERO ( & read_set ) ;
while ( flags & STEP_NONBLOCK & & PQisBusy ( conn ) )
{
FD_SET ( sock , & read_set ) ;
timeout . tv_sec = 0 ;
timeout . tv_usec = 10000 ; /* Check for lock waits every 10ms. */
ret = select ( sock + 1 , & read_set , NULL , NULL , & timeout ) ;
if ( ret < 0 ) /* error in select() */
{
fprintf ( stderr , " select failed: %s \n " , strerror ( errno ) ) ;
exit_nicely ( ) ;
}
else if ( ret = = 0 ) /* select() timeout: check for lock wait */
{
int ntuples ;
res = PQexecPrepared ( conns [ 0 ] , PREP_WAITING , 1 ,
& backend_ids [ step - > session + 1 ] ,
NULL , NULL , 0 ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
{
fprintf ( stderr , " lock wait query failed: %s " ,
PQerrorMessage ( conn ) ) ;
exit_nicely ( ) ;
}
ntuples = PQntuples ( res ) ;
PQclear ( res ) ;
if ( ntuples > = 1 ) /* waiting to acquire a lock */
{
if ( ! ( flags & STEP_RETRY ) )
printf ( " step %s: %s <waiting ...> \n " ,
step - > name , step - > sql ) ;
return true ;
}
/* else, not waiting: give it more time */
}
else if ( ! PQconsumeInput ( conn ) ) /* select(): data available */
{
fprintf ( stderr , " PQconsumeInput failed: %s " , PQerrorMessage ( conn ) ) ;
exit_nicely ( ) ;
}
}
if ( flags & STEP_RETRY )
printf ( " step %s: <... completed> \n " , step - > name ) ;
else
printf ( " step %s: %s \n " , step - > name , step - > sql ) ;
while ( ( res = PQgetResult ( conn ) ) )
{
switch ( PQresultStatus ( res ) )
{
case PGRES_COMMAND_OK :
break ;
case PGRES_TUPLES_OK :
printResultSet ( res ) ;
break ;
case PGRES_FATAL_ERROR :
/* Detail may contain xid values, so just show primary. */
printf ( " %s: %s \n " , PQresultErrorField ( res , PG_DIAG_SEVERITY ) ,
PQresultErrorField ( res , PG_DIAG_MESSAGE_PRIMARY ) ) ;
break ;
default :
printf ( " unexpected result status: %s \n " ,
PQresStatus ( PQresultStatus ( res ) ) ) ;
}
PQclear ( res ) ;
}
return false ;
}
static void
printResultSet ( PGresult * res )
{