@ -31,7 +31,6 @@
# include "settings.h"
static bool DescribeQuery ( const char * query , double * elapsed_msec ) ;
static bool ExecQueryUsingCursor ( const char * query , double * elapsed_msec ) ;
static int ExecQueryAndProcessResults ( const char * query ,
double * elapsed_msec ,
bool * svpt_gone_p ,
@ -40,7 +39,6 @@ static int ExecQueryAndProcessResults(const char *query,
const printQueryOpt * opt ,
FILE * printQueryFout ) ;
static bool command_no_begin ( const char * query ) ;
static bool is_select_command ( const char * query ) ;
/*
@ -83,6 +81,46 @@ openQueryOutputFile(const char *fname, FILE **fout, bool *is_pipe)
return true ;
}
/*
* Check if an output stream for \ g needs to be opened , and if yes ,
* open it and update the caller ' s gfile_fout and is_pipe state variables .
* Return true if OK , false if an error occurred .
*/
static bool
SetupGOutput ( FILE * * gfile_fout , bool * is_pipe )
{
/* If there is a \g file or program, and it's not already open, open it */
if ( pset . gfname ! = NULL & & * gfile_fout = = NULL )
{
if ( openQueryOutputFile ( pset . gfname , gfile_fout , is_pipe ) )
{
if ( * is_pipe )
disable_sigpipe_trap ( ) ;
}
else
return false ;
}
return true ;
}
/*
* Close the output stream for \ g , if we opened it .
*/
static void
CloseGOutput ( FILE * gfile_fout , bool is_pipe )
{
if ( gfile_fout )
{
if ( is_pipe )
{
SetShellResultVariables ( pclose ( gfile_fout ) ) ;
restore_sigpipe_trap ( ) ;
}
else
fclose ( gfile_fout ) ;
}
}
/*
* setQFout
* - - handler for - o command line option and \ o command
@ -373,6 +411,7 @@ AcceptResult(const PGresult *result, bool show_error)
{
case PGRES_COMMAND_OK :
case PGRES_TUPLES_OK :
case PGRES_TUPLES_CHUNK :
case PGRES_EMPTY_QUERY :
case PGRES_COPY_IN :
case PGRES_COPY_OUT :
@ -1135,16 +1174,10 @@ SendQuery(const char *query)
/* Describe query's result columns, without executing it */
OK = DescribeQuery ( query , & elapsed_msec ) ;
}
else if ( pset . fetch_count < = 0 | | pset . gexec_flag | |
pset . crosstab_flag | | ! is_select_command ( query ) )
{
/* Default fetch-it-all-and-print mode */
OK = ( ExecQueryAndProcessResults ( query , & elapsed_msec , & svpt_gone , false , 0 , NULL , NULL ) > 0 ) ;
}
else
{
/* Fetch-in-segments mode */
OK = ExecQueryUsingCursor ( query , & elapsed_msec ) ;
/* Default fetch-and-print mode */
OK = ( ExecQueryAndProcessResults ( query , & elapsed_msec , & svpt_gone , false , 0 , NULL , NULL ) > 0 ) ;
}
if ( ! OK & & pset . echo = = PSQL_ECHO_ERRORS )
@ -1454,6 +1487,21 @@ ExecQueryAndProcessResults(const char *query,
return - 1 ;
}
/*
* Fetch the result in chunks if FETCH_COUNT is set . But we don ' t enable
* chunking if SHOW_ALL_RESULTS is false , since that requires us to
* accumulate all rows before we can tell what should be displayed , which
* would counter the idea of FETCH_COUNT . Chunking is also disabled when
* \ crosstab , \ gexec , \ gset or \ watch is used .
*/
if ( pset . fetch_count > 0 & & pset . show_all_results & &
! pset . crosstab_flag & & ! pset . gexec_flag & &
! pset . gset_prefix & & ! is_watch )
{
if ( ! PQsetChunkedRowsMode ( pset . db , pset . fetch_count ) )
pg_log_warning ( " fetching results in chunked mode failed " ) ;
}
/*
* If SIGINT is sent while the query is processing , the interrupt will be
* consumed . The user ' s intention , though , is to cancel the entire watch
@ -1475,6 +1523,7 @@ ExecQueryAndProcessResults(const char *query,
while ( result ! = NULL )
{
ExecStatusType result_status ;
bool is_chunked_result = false ;
PGresult * next_result ;
bool last ;
@ -1572,20 +1621,9 @@ ExecQueryAndProcessResults(const char *query,
}
else if ( pset . gfname )
{
/* send to \g file, which we may have opened already */
if ( gfile_fout = = NULL )
{
if ( openQueryOutputFile ( pset . gfname ,
& gfile_fout , & gfile_is_pipe ) )
{
if ( gfile_is_pipe )
disable_sigpipe_trap ( ) ;
copy_stream = gfile_fout ;
}
else
success = false ;
}
else
/* COPY followed by \g filename or \g |program */
success & = SetupGOutput ( & gfile_fout , & gfile_is_pipe ) ;
if ( gfile_fout )
copy_stream = gfile_fout ;
}
else
@ -1603,6 +1641,101 @@ ExecQueryAndProcessResults(const char *query,
success & = HandleCopyResult ( & result , copy_stream ) ;
}
/* If we have a chunked result, collect and print all chunks */
if ( result_status = = PGRES_TUPLES_CHUNK )
{
FILE * tuples_fout = printQueryFout ? printQueryFout : stdout ;
printQueryOpt my_popt = pset . popt ;
int64 total_tuples = 0 ;
bool is_pager = false ;
int flush_error = 0 ;
/* initialize print options for partial table output */
my_popt . topt . start_table = true ;
my_popt . topt . stop_table = false ;
my_popt . topt . prior_records = 0 ;
/* open \g file if needed */
success & = SetupGOutput ( & gfile_fout , & gfile_is_pipe ) ;
if ( gfile_fout )
tuples_fout = gfile_fout ;
/* force use of pager for any chunked resultset going to stdout */
if ( success & & tuples_fout = = stdout )
{
tuples_fout = PageOutput ( INT_MAX , & ( my_popt . topt ) ) ;
is_pager = true ;
}
do
{
/*
* display the current chunk of results , unless the output
* stream stopped working or we got cancelled
*/
if ( success & & ! flush_error & & ! cancel_pressed )
{
printQuery ( result , & my_popt , tuples_fout , is_pager , pset . logfile ) ;
flush_error = fflush ( tuples_fout ) ;
}
/* after the first result set, disallow header decoration */
my_popt . topt . start_table = false ;
/* count tuples before dropping the result */
my_popt . topt . prior_records + = PQntuples ( result ) ;
total_tuples + = PQntuples ( result ) ;
ClearOrSaveResult ( result ) ;
/* get the next result, loop if it's PGRES_TUPLES_CHUNK */
result = PQgetResult ( pset . db ) ;
} while ( PQresultStatus ( result ) = = PGRES_TUPLES_CHUNK ) ;
/* We expect an empty PGRES_TUPLES_OK, else there's a problem */
if ( PQresultStatus ( result ) = = PGRES_TUPLES_OK )
{
char buf [ 32 ] ;
Assert ( PQntuples ( result ) = = 0 ) ;
/* Display the footer using the empty result */
if ( success & & ! flush_error & & ! cancel_pressed )
{
my_popt . topt . stop_table = true ;
printQuery ( result , & my_popt , tuples_fout , is_pager , pset . logfile ) ;
fflush ( tuples_fout ) ;
}
if ( is_pager )
ClosePager ( tuples_fout ) ;
/*
* We must do a fake SetResultVariables ( ) , since we don ' t have
* a PGresult corresponding to the whole query .
*/
SetVariable ( pset . vars , " ERROR " , " false " ) ;
SetVariable ( pset . vars , " SQLSTATE " , " 00000 " ) ;
snprintf ( buf , sizeof ( buf ) , INT64_FORMAT , total_tuples ) ;
SetVariable ( pset . vars , " ROW_COUNT " , buf ) ;
/* Prevent SetResultVariables call below */
is_chunked_result = true ;
/* Clear the empty result so it isn't printed below */
ClearOrSaveResult ( result ) ;
result = NULL ;
}
else
{
/* Probably an error report, so close the pager and print it */
if ( is_pager )
ClosePager ( tuples_fout ) ;
success & = AcceptResult ( result , true ) ;
/* SetResultVariables and ClearOrSaveResult happen below */
}
}
/*
* Check PQgetResult ( ) again . In the typical case of a single - command
* string , it will return NULL . Otherwise , we ' ll have other results
@ -1640,31 +1773,18 @@ ExecQueryAndProcessResults(const char *query,
* tuple output , but it ' s still used for status output .
*/
FILE * tuples_fout = printQueryFout ;
bool do_print = true ;
if ( PQresultStatus ( result ) = = PGRES_TUPLES_OK & &
pset . gfname )
{
if ( gfile_fout = = NULL )
{
if ( openQueryOutputFile ( pset . gfname ,
& gfile_fout , & gfile_is_pipe ) )
{
if ( gfile_is_pipe )
disable_sigpipe_trap ( ) ;
}
else
success = do_print = false ;
}
if ( PQresultStatus ( result ) = = PGRES_TUPLES_OK )
success & = SetupGOutput ( & gfile_fout , & gfile_is_pipe ) ;
if ( gfile_fout )
tuples_fout = gfile_fout ;
}
if ( do_print )
if ( success )
success & = PrintQueryResult ( result , last , opt ,
tuples_fout , printQueryFout ) ;
}
/* set variables from last result */
if ( ! is_watch & & las t)
/* set variables from last result, unless dealt with elsewhere */
if ( last & & ! is_watch & & ! is_chunked_result )
SetResultVariables ( result , success ) ;
ClearOrSaveResult ( result ) ;
@ -1678,16 +1798,7 @@ ExecQueryAndProcessResults(const char *query,
}
/* close \g file if we opened it */
if ( gfile_fout )
{
if ( gfile_is_pipe )
{
SetShellResultVariables ( pclose ( gfile_fout ) ) ;
restore_sigpipe_trap ( ) ;
}
else
fclose ( gfile_fout ) ;
}
CloseGOutput ( gfile_fout , gfile_is_pipe ) ;
/* may need this to recover from conn loss during COPY */
if ( ! CheckConnection ( ) )
@ -1700,274 +1811,6 @@ ExecQueryAndProcessResults(const char *query,
}
/*
* ExecQueryUsingCursor : run a SELECT - like query using a cursor
*
* This feature allows result sets larger than RAM to be dealt with .
*
* Returns true if the query executed successfully , false otherwise .
*
* If pset . timing is on , total query time ( exclusive of result - printing ) is
* stored into * elapsed_msec .
*/
static bool
ExecQueryUsingCursor ( const char * query , double * elapsed_msec )
{
bool OK = true ;
PGresult * result ;
PQExpBufferData buf ;
printQueryOpt my_popt = pset . popt ;
bool timing = pset . timing ;
FILE * fout ;
bool is_pipe ;
bool is_pager = false ;
bool started_txn = false ;
int64 total_tuples = 0 ;
int ntuples ;
int fetch_count ;
char fetch_cmd [ 64 ] ;
instr_time before ,
after ;
int flush_error ;
* elapsed_msec = 0 ;
/* initialize print options for partial table output */
my_popt . topt . start_table = true ;
my_popt . topt . stop_table = false ;
my_popt . topt . prior_records = 0 ;
if ( timing )
INSTR_TIME_SET_CURRENT ( before ) ;
else
INSTR_TIME_SET_ZERO ( before ) ;
/* if we're not in a transaction, start one */
if ( PQtransactionStatus ( pset . db ) = = PQTRANS_IDLE )
{
result = PQexec ( pset . db , " BEGIN " ) ;
OK = AcceptResult ( result , true ) & &
( PQresultStatus ( result ) = = PGRES_COMMAND_OK ) ;
ClearOrSaveResult ( result ) ;
if ( ! OK )
return false ;
started_txn = true ;
}
/* Send DECLARE CURSOR */
initPQExpBuffer ( & buf ) ;
appendPQExpBuffer ( & buf , " DECLARE _psql_cursor NO SCROLL CURSOR FOR \n %s " ,
query ) ;
result = PQexec ( pset . db , buf . data ) ;
OK = AcceptResult ( result , true ) & &
( PQresultStatus ( result ) = = PGRES_COMMAND_OK ) ;
if ( ! OK )
SetResultVariables ( result , OK ) ;
ClearOrSaveResult ( result ) ;
termPQExpBuffer ( & buf ) ;
if ( ! OK )
goto cleanup ;
if ( timing )
{
INSTR_TIME_SET_CURRENT ( after ) ;
INSTR_TIME_SUBTRACT ( after , before ) ;
* elapsed_msec + = INSTR_TIME_GET_MILLISEC ( after ) ;
}
/*
* In \ gset mode , we force the fetch count to be 2 , so that we will throw
* the appropriate error if the query returns more than one row .
*/
if ( pset . gset_prefix )
fetch_count = 2 ;
else
fetch_count = pset . fetch_count ;
snprintf ( fetch_cmd , sizeof ( fetch_cmd ) ,
" FETCH FORWARD %d FROM _psql_cursor " ,
fetch_count ) ;
/* prepare to write output to \g argument, if any */
if ( pset . gfname )
{
if ( ! openQueryOutputFile ( pset . gfname , & fout , & is_pipe ) )
{
OK = false ;
goto cleanup ;
}
if ( is_pipe )
disable_sigpipe_trap ( ) ;
}
else
{
fout = pset . queryFout ;
is_pipe = false ; /* doesn't matter */
}
/* clear any pre-existing error indication on the output stream */
clearerr ( fout ) ;
for ( ; ; )
{
if ( timing )
INSTR_TIME_SET_CURRENT ( before ) ;
/* get fetch_count tuples at a time */
result = PQexec ( pset . db , fetch_cmd ) ;
if ( timing )
{
INSTR_TIME_SET_CURRENT ( after ) ;
INSTR_TIME_SUBTRACT ( after , before ) ;
* elapsed_msec + = INSTR_TIME_GET_MILLISEC ( after ) ;
}
if ( PQresultStatus ( result ) ! = PGRES_TUPLES_OK )
{
/* shut down pager before printing error message */
if ( is_pager )
{
ClosePager ( fout ) ;
is_pager = false ;
}
OK = AcceptResult ( result , true ) ;
Assert ( ! OK ) ;
SetResultVariables ( result , OK ) ;
ClearOrSaveResult ( result ) ;
break ;
}
if ( pset . gset_prefix )
{
/* StoreQueryTuple will complain if not exactly one row */
OK = StoreQueryTuple ( result ) ;
ClearOrSaveResult ( result ) ;
break ;
}
/*
* Note we do not deal with \ gdesc , \ gexec or \ crosstabview modes here
*/
ntuples = PQntuples ( result ) ;
total_tuples + = ntuples ;
if ( ntuples < fetch_count )
{
/* this is the last result set, so allow footer decoration */
my_popt . topt . stop_table = true ;
}
else if ( fout = = stdout & & ! is_pager )
{
/*
* If query requires multiple result sets , hack to ensure that
* only one pager instance is used for the whole mess
*/
fout = PageOutput ( INT_MAX , & ( my_popt . topt ) ) ;
is_pager = true ;
}
printQuery ( result , & my_popt , fout , is_pager , pset . logfile ) ;
ClearOrSaveResult ( result ) ;
/* after the first result set, disallow header decoration */
my_popt . topt . start_table = false ;
my_popt . topt . prior_records + = ntuples ;
/*
* Make sure to flush the output stream , so intermediate results are
* visible to the client immediately . We check the results because if
* the pager dies / exits / etc , there ' s no sense throwing more data at
* it .
*/
flush_error = fflush ( fout ) ;
/*
* Check if we are at the end , if a cancel was pressed , or if there
* were any errors either trying to flush out the results , or more
* generally on the output stream at all . If we hit any errors
* writing things to the stream , we presume $ PAGER has disappeared and
* stop bothering to pull down more data .
*/
if ( ntuples < fetch_count | | cancel_pressed | | flush_error | |
ferror ( fout ) )
break ;
}
if ( pset . gfname )
{
/* close \g argument file/pipe */
if ( is_pipe )
{
SetShellResultVariables ( pclose ( fout ) ) ;
restore_sigpipe_trap ( ) ;
}
else
fclose ( fout ) ;
}
else if ( is_pager )
{
/* close transient pager */
ClosePager ( fout ) ;
}
if ( OK )
{
/*
* We don ' t have a PGresult here , and even if we did it wouldn ' t have
* the right row count , so fake SetResultVariables ( ) . In error cases ,
* we already set the result variables above .
*/
char buf [ 32 ] ;
SetVariable ( pset . vars , " ERROR " , " false " ) ;
SetVariable ( pset . vars , " SQLSTATE " , " 00000 " ) ;
snprintf ( buf , sizeof ( buf ) , INT64_FORMAT , total_tuples ) ;
SetVariable ( pset . vars , " ROW_COUNT " , buf ) ;
}
cleanup :
if ( timing )
INSTR_TIME_SET_CURRENT ( before ) ;
/*
* We try to close the cursor on either success or failure , but on failure
* ignore the result ( it ' s probably just a bleat about being in an aborted
* transaction )
*/
result = PQexec ( pset . db , " CLOSE _psql_cursor " ) ;
if ( OK )
{
OK = AcceptResult ( result , true ) & &
( PQresultStatus ( result ) = = PGRES_COMMAND_OK ) ;
ClearOrSaveResult ( result ) ;
}
else
PQclear ( result ) ;
if ( started_txn )
{
result = PQexec ( pset . db , OK ? " COMMIT " : " ROLLBACK " ) ;
OK & = AcceptResult ( result , true ) & &
( PQresultStatus ( result ) = = PGRES_COMMAND_OK ) ;
ClearOrSaveResult ( result ) ;
}
if ( timing )
{
INSTR_TIME_SET_CURRENT ( after ) ;
INSTR_TIME_SUBTRACT ( after , before ) ;
* elapsed_msec + = INSTR_TIME_GET_MILLISEC ( after ) ;
}
return OK ;
}
/*
* Advance the given char pointer over white space and SQL comments .
*/
@ -2247,43 +2090,6 @@ command_no_begin(const char *query)
}
/*
* Check whether the specified command is a SELECT ( or VALUES ) .
*/
static bool
is_select_command ( const char * query )
{
int wordlen ;
/*
* First advance over any whitespace , comments and left parentheses .
*/
for ( ; ; )
{
query = skip_white_space ( query ) ;
if ( query [ 0 ] = = ' ( ' )
query + + ;
else
break ;
}
/*
* Check word length ( since " selectx " is not " select " ) .
*/
wordlen = 0 ;
while ( isalpha ( ( unsigned char ) query [ wordlen ] ) )
wordlen + = PQmblenBounded ( & query [ wordlen ] , pset . encoding ) ;
if ( wordlen = = 6 & & pg_strncasecmp ( query , " select " , 6 ) = = 0 )
return true ;
if ( wordlen = = 6 & & pg_strncasecmp ( query , " values " , 6 ) = = 0 )
return true ;
return false ;
}
/*
* Test if the current user is a database superuser .
*/