@ -33,6 +33,7 @@ static bool DescribeQuery(const char *query, double *elapsed_msec);
static bool ExecQueryUsingCursor ( const char * query , double * elapsed_msec ) ;
static bool command_no_begin ( const char * query ) ;
static bool is_select_command ( const char * query ) ;
static int SendQueryAndProcessResults ( const char * query , double * pelapsed_msec , bool is_watch ) ;
/*
@ -353,7 +354,7 @@ CheckConnection(void)
* Returns true for valid result , false for error state .
*/
static bool
AcceptResult ( const PGresult * result )
AcceptResult ( const PGresult * result , bool show_error )
{
bool OK ;
@ -384,7 +385,7 @@ AcceptResult(const PGresult *result)
break ;
}
if ( ! OK )
if ( ! OK & & show_error )
{
const char * error = PQerrorMessage ( pset . db ) ;
@ -472,6 +473,18 @@ ClearOrSaveResult(PGresult *result)
}
}
/*
* Consume all results
*/
static void
ClearOrSaveAllResults ( )
{
PGresult * result ;
while ( ( result = PQgetResult ( pset . db ) ) ! = NULL )
ClearOrSaveResult ( result ) ;
}
/*
* Print microtiming output . Always print raw milliseconds ; if the interval
@ -572,7 +585,7 @@ PSQLexec(const char *query)
ResetCancelConn ( ) ;
if ( ! AcceptResult ( res ) )
if ( ! AcceptResult ( res , true ) )
{
ClearOrSaveResult ( res ) ;
res = NULL ;
@ -594,10 +607,8 @@ PSQLexec(const char *query)
int
PSQLexecWatch ( const char * query , const printQueryOpt * opt )
{
PGresult * res ;
double elapsed_msec = 0 ;
instr_time before ;
instr_time after ;
int res ;
if ( ! pset . db )
{
@ -606,75 +617,16 @@ PSQLexecWatch(const char *query, const printQueryOpt *opt)
}
SetCancelConn ( pset . db ) ;
if ( pset . timing )
INSTR_TIME_SET_CURRENT ( before ) ;
res = PQexec ( pset . db , query ) ;
res = SendQueryAndProcessResults ( query , & elapsed_msec , true ) ;
ResetCancelConn ( ) ;
if ( ! AcceptResult ( res ) )
{
ClearOrSaveResult ( res ) ;
return 0 ;
}
if ( pset . timing )
{
INSTR_TIME_SET_CURRENT ( after ) ;
INSTR_TIME_SUBTRACT ( after , before ) ;
elapsed_msec = INSTR_TIME_GET_MILLISEC ( after ) ;
}
/*
* If SIGINT is sent while the query is processing , the interrupt will be
* consumed . The user ' s intention , though , is to cancel the entire watch
* process , so detect a sent cancellation request and exit in this case .
*/
if ( cancel_pressed )
{
PQclear ( res ) ;
return 0 ;
}
switch ( PQresultStatus ( res ) )
{
case PGRES_TUPLES_OK :
printQuery ( res , opt , pset . queryFout , false , pset . logfile ) ;
break ;
case PGRES_COMMAND_OK :
fprintf ( pset . queryFout , " %s \n %s \n \n " , opt - > title , PQcmdStatus ( res ) ) ;
break ;
case PGRES_EMPTY_QUERY :
pg_log_error ( " \\ watch cannot be used with an empty query " ) ;
PQclear ( res ) ;
return - 1 ;
case PGRES_COPY_OUT :
case PGRES_COPY_IN :
case PGRES_COPY_BOTH :
pg_log_error ( " \\ watch cannot be used with COPY " ) ;
PQclear ( res ) ;
return - 1 ;
default :
pg_log_error ( " unexpected result status for \\ watch " ) ;
PQclear ( res ) ;
return - 1 ;
}
PQclear ( res ) ;
fflush ( pset . queryFout ) ;
/* Possible microtiming output */
if ( pset . timing )
PrintTiming ( elapsed_msec ) ;
return 1 ;
return res ;
}
@ -887,197 +839,114 @@ loop_exit:
/*
* ProcessResult : utility function for use by SendQuery ( ) only
*
* When our command string contained a COPY FROM STDIN or COPY TO STDOUT ,
* PQexec ( ) has stopped at the PGresult associated with the first such
* command . In that event , we ' ll marshal data for the COPY and then cycle
* through any subsequent PGresult objects .
* Marshal the COPY data . Either subroutine will get the
* connection out of its COPY state , then call PQresultStatus ( )
* once and report any error . Return whether all was ok .
*
* When the command string contained no such COPY command , this function
* degenerates to an AcceptResult ( ) call .
* For COPY OUT , direct the output to pset . copyStream if it ' s set ,
* otherwise to pset . gfname if it ' s set , otherwise to queryFout .
* For COPY IN , use pset . copyStream as data source if it ' s set ,
* otherwise cur_cmd_source .
*
* Changes its argument to point to the last PGresult of the command string ,
* or NULL if that result was for a COPY TO STDOUT . ( Returning NULL prevents
* the command status from being printed , which we want in that case so that
* the status line doesn ' t get taken as part of the COPY data . )
*
* Returns true on complete success , false otherwise . Possible failure modes
* include purely client - side problems ; check the transaction status for the
* server - side opinion .
* Update result if further processing is necessary , or NULL otherwise .
* Return a result when queryFout can safely output a result status :
* on COPY IN , or on COPY OUT if written to something other than pset . queryFout .
* Returning NULL prevents the command status from being printed , which
* we want if the status line doesn ' t get taken as part of the COPY data .
*/
static bool
Process Result( PGresult * * results )
HandleCopyResult ( PGresult * * result )
{
bool success = true ;
bool first_cycle = true ;
bool success ;
FILE * copystream ;
PGresult * copy_result ;
ExecStatusType result_status = PQresultStatus ( * result ) ;
for ( ; ; )
Assert ( result_status = = PGRES_COPY_OUT | |
result_status = = PGRES_COPY_IN ) ;
SetCancelConn ( pset . db ) ;
if ( result_status = = PGRES_COPY_OUT )
{
ExecStatusType result_status ;
bool is_copy ;
PGresult * next_result ;
bool need_close = false ;
bool is_pipe = false ;
if ( ! AcceptResult ( * results ) )
if ( pset . copyStream )
{
/*
* Failure at this point is always a server - side failure or a
* failure to submit the command string . Either way , we ' re
* finished with this command string .
*/
success = false ;
break ;
/* invoked by \copy */
copystream = pset . copyStream ;
}
result_status = PQresultStatus ( * results ) ;
switch ( result_status )
else if ( pset . gfname )
{
case PGRES_EMPTY_QUERY :
case PGRES_COMMAND_OK :
case PGRES_TUPLES_OK :
is_copy = false ;
break ;
/* invoked by \g */
if ( openQueryOutputFile ( pset . gfname ,
& copystream , & is_pipe ) )
{
need_close = true ;
if ( is_pipe )
disable_sigpipe_trap ( ) ;
}
else
copystream = NULL ; /* discard COPY data entirely */
}
else
{
/* fall back to the generic query output stream */
copystream = pset . queryFout ;
}
case PGRES_COPY_OUT :
case PGRES_COPY_IN :
is_copy = true ;
break ;
success = handleCopyOut ( pset . db ,
copystream ,
& copy_result )
& & ( copystream ! = NULL ) ;
default :
/* AcceptResult() should have caught anything else. */
is_copy = false ;
pg_log_error ( " unexpected PQresultStatus: %d " , result_status ) ;
break ;
/*
* Suppress status printing if the report would go to the same
* place as the COPY data just went . Note this doesn ' t
* prevent error reporting , since handleCopyOut did that .
*/
if ( copystream = = pset . queryFout )
{
PQclear ( copy_result ) ;
copy_result = NULL ;
}
if ( is_copy )
if ( need_close )
{
/*
* Marshal the COPY data . Either subroutine will get the
* connection out of its COPY state , then call PQresultStatus ( )
* once and report any error .
*
* For COPY OUT , direct the output to pset . copyStream if it ' s set ,
* otherwise to pset . gfname if it ' s set , otherwise to queryFout .
* For COPY IN , use pset . copyStream as data source if it ' s set ,
* otherwise cur_cmd_source .
*/
FILE * copystream ;
PGresult * copy_result ;
SetCancelConn ( pset . db ) ;
if ( result_status = = PGRES_COPY_OUT )
/* close \g argument file/pipe */
if ( is_pipe )
{
bool need_close = false ;
bool is_pipe = false ;
if ( pset . copyStream )
{
/* invoked by \copy */
copystream = pset . copyStream ;
}
else if ( pset . gfname )
{
/* invoked by \g */
if ( openQueryOutputFile ( pset . gfname ,
& copystream , & is_pipe ) )
{
need_close = true ;
if ( is_pipe )
disable_sigpipe_trap ( ) ;
}
else
copystream = NULL ; /* discard COPY data entirely */
}
else
{
/* fall back to the generic query output stream */
copystream = pset . queryFout ;
}
success = handleCopyOut ( pset . db ,
copystream ,
& copy_result )
& & success
& & ( copystream ! = NULL ) ;
/*
* Suppress status printing if the report would go to the same
* place as the COPY data just went . Note this doesn ' t
* prevent error reporting , since handleCopyOut did that .
*/
if ( copystream = = pset . queryFout )
{
PQclear ( copy_result ) ;
copy_result = NULL ;
}
if ( need_close )
{
/* close \g argument file/pipe */
if ( is_pipe )
{
pclose ( copystream ) ;
restore_sigpipe_trap ( ) ;
}
else
{
fclose ( copystream ) ;
}
}
pclose ( copystream ) ;
restore_sigpipe_trap ( ) ;
}
else
{
/* COPY IN */
copystream = pset . copyStream ? pset . copyStream : pset . cur_cmd_source ;
success = handleCopyIn ( pset . db ,
copystream ,
PQbinaryTuples ( * results ) ,
& copy_result ) & & success ;
fclose ( copystream ) ;
}
ResetCancelConn ( ) ;
/*
* Replace the PGRES_COPY_OUT / IN result with COPY command ' s exit
* status , or with NULL if we want to suppress printing anything .
*/
PQclear ( * results ) ;
* results = copy_result ;
}
else if ( first_cycle )
{
/* fast path: no COPY commands; PQexec visited all results */
break ;
}
/*
* Check PQgetResult ( ) again . In the typical case of a single - command
* string , it will return NULL . Otherwise , we ' ll have other results
* to process that may include other COPYs . We keep the last result .
*/
next_result = PQgetResult ( pset . db ) ;
if ( ! next_result )
break ;
PQclear ( * results ) ;
* results = next_result ;
first_cycle = false ;
}
else
{
/* COPY IN */
copystream = pset . copyStream ? pset . copyStream : pset . cur_cmd_source ;
success = handleCopyIn ( pset . db ,
copystream ,
PQbinaryTuples ( * result ) ,
& copy_result ) ;
}
SetResultVariables ( * results , success ) ;
/* may need this to recover from conn loss during COPY */
if ( ! first_cycle & & ! CheckConnection ( ) )
return false ;
ResetCancelConn ( ) ;
PQclear ( * result ) ;
* result = copy_result ;
return success ;
}
/*
* PrintQueryStatus : report command status as required
*
* Note : Utility function for use by PrintQueryResults ( ) only .
* Note : Utility function for use by HandleQueryResult ( ) only .
*/
static void
PrintQueryStatus ( PGresult * results )
@ -1105,43 +974,50 @@ PrintQueryStatus(PGresult *results)
/*
* PrintQueryResults : print out ( or store or execute ) query results as required
*
* Note : Utility function for use by SendQuery ( ) only .
* HandleQueryResult : print out , store or execute one query result
* as required .
*
* Returns true if the query executed successfully , false otherwise .
*/
static bool
PrintQueryResults ( PGresult * results )
HandleQueryResult ( PGresult * result , bool last )
{
bool success ;
const char * cmdstatus ;
if ( ! results )
if ( result = = NULL )
return false ;
switch ( PQresultStatus ( results ) )
switch ( PQresultStatus ( result ) )
{
case PGRES_TUPLES_OK :
/* store or execute or print the data ... */
if ( pset . gset_prefix )
success = StoreQueryTuple ( results ) ;
else if ( pset . gexec_flag )
success = ExecQueryTuples ( results ) ;
else if ( pset . crosstab_flag )
success = PrintResultsInCrosstab ( results ) ;
if ( last & & pset . gset_prefix )
success = StoreQueryTuple ( result ) ;
else if ( last & & pset . gexec_flag )
success = ExecQueryTuples ( result ) ;
else if ( last & & pset . crosstab_flag )
success = PrintResultsInCrosstab ( result ) ;
else if ( last | | pset . show_all_results )
success = PrintQueryTuples ( result ) ;
else
success = PrintQueryTuples ( results ) ;
success = true ;
/* if it's INSERT/UPDATE/DELETE RETURNING, also print status */
cmdstatus = PQcmdStatus ( results ) ;
if ( strncmp ( cmdstatus , " INSERT " , 6 ) = = 0 | |
strncmp ( cmdstatus , " UPDATE " , 6 ) = = 0 | |
strncmp ( cmdstatus , " DELETE " , 6 ) = = 0 )
PrintQueryStatus ( results ) ;
if ( last | | pset . show_all_results )
{
cmdstatus = PQcmdStatus ( result ) ;
if ( strncmp ( cmdstatus , " INSERT " , 6 ) = = 0 | |
strncmp ( cmdstatus , " UPDATE " , 6 ) = = 0 | |
strncmp ( cmdstatus , " DELETE " , 6 ) = = 0 )
PrintQueryStatus ( result ) ;
}
break ;
case PGRES_COMMAND_OK :
PrintQueryStatus ( results ) ;
if ( last | | pset . show_all_results )
PrintQueryStatus ( result ) ;
success = true ;
break ;
@ -1151,7 +1027,7 @@ PrintQueryResults(PGresult *results)
case PGRES_COPY_OUT :
case PGRES_COPY_IN :
/* nothing to do here */
/* nothing to do here: already processed */
success = true ;
break ;
@ -1164,7 +1040,7 @@ PrintQueryResults(PGresult *results)
default :
success = false ;
pg_log_error ( " unexpected PQresultStatus: %d " ,
PQresultStatus ( results ) ) ;
PQresultStatus ( result ) ) ;
break ;
}
@ -1173,6 +1049,217 @@ PrintQueryResults(PGresult *results)
return success ;
}
/*
* Data structure and functions to record notices while they are
* emitted , so that they can be shown later .
*
* We need to know which result is last , which requires to extract
* one result in advance , hence two buffers are needed .
*/
typedef struct {
bool in_flip ;
PQExpBufferData flip ;
PQExpBufferData flop ;
} t_notice_messages ;
/*
* Store notices in appropriate buffer , for later display .
*/
static void
AppendNoticeMessage ( void * arg , const char * msg )
{
t_notice_messages * notes = ( t_notice_messages * ) arg ;
appendPQExpBufferStr ( notes - > in_flip ? & notes - > flip : & notes - > flop , msg ) ;
}
/*
* Show notices stored in buffer , which is then reset .
*/
static void
ShowNoticeMessage ( t_notice_messages * notes )
{
PQExpBufferData * current = notes - > in_flip ? & notes - > flip : & notes - > flop ;
if ( current - > data ! = NULL & & * current - > data ! = ' \0 ' )
pg_log_info ( " %s " , current - > data ) ;
resetPQExpBuffer ( current ) ;
}
/*
* SendQueryAndProcessResults : utility function for use by SendQuery ( )
* and PSQLexecWatch ( ) .
*
* Sends query and cycles through PGresult objects .
*
* When not under \ watch and if our command string contained a COPY FROM STDIN
* or COPY TO STDOUT , the PGresult associated with these commands must be
* processed by providing an input or output stream . In that event , we ' ll
* marshal data for the COPY .
*
* For other commands , the results are processed normally , depending on their
* status .
*
* Returns 1 on complete success , 0 on interrupt and - 1 or errors . Possible
* failure modes include purely client - side problems ; check the transaction
* status for the server - side opinion .
*
* Note that on a combined query , failure does not mean that nothing was
* committed .
*/
static int
SendQueryAndProcessResults ( const char * query , double * pelapsed_msec , bool is_watch )
{
bool success ;
instr_time before ;
PGresult * result ;
t_notice_messages notes ;
if ( pset . timing )
INSTR_TIME_SET_CURRENT ( before ) ;
success = PQsendQuery ( pset . db , query ) ;
ResetCancelConn ( ) ;
if ( ! success )
{
const char * error = PQerrorMessage ( pset . db ) ;
if ( strlen ( error ) )
pg_log_info ( " %s " , error ) ;
CheckConnection ( ) ;
return - 1 ;
}
/*
* If SIGINT is sent while the query is processing , the interrupt will be
* consumed . The user ' s intention , though , is to cancel the entire watch
* process , so detect a sent cancellation request and exit in this case .
*/
if ( is_watch & & cancel_pressed )
{
ClearOrSaveAllResults ( ) ;
return 0 ;
}
/* intercept notices */
notes . in_flip = true ;
initPQExpBuffer ( & notes . flip ) ;
initPQExpBuffer ( & notes . flop ) ;
PQsetNoticeProcessor ( pset . db , AppendNoticeMessage , & notes ) ;
/* first result */
result = PQgetResult ( pset . db ) ;
while ( result ! = NULL )
{
ExecStatusType result_status ;
PGresult * next_result ;
bool last ;
if ( ! AcceptResult ( result , false ) )
{
/*
* Some error occured , either a server - side failure or
* a failure to submit the command string . Record that .
*/
const char * error = PQerrorMessage ( pset . db ) ;
ShowNoticeMessage ( & notes ) ;
if ( strlen ( error ) )
pg_log_info ( " %s " , error ) ;
CheckConnection ( ) ;
if ( ! is_watch )
SetResultVariables ( result , false ) ;
ClearOrSaveResult ( result ) ;
success = false ;
/* and switch to next result */
result = PQgetResult ( pset . db ) ;
continue ;
}
/* must handle COPY before changing the current result */
result_status = PQresultStatus ( result ) ;
Assert ( result_status ! = PGRES_COPY_BOTH ) ;
if ( result_status = = PGRES_COPY_IN | |
result_status = = PGRES_COPY_OUT )
{
ShowNoticeMessage ( & notes ) ;
if ( is_watch )
{
ClearOrSaveAllResults ( ) ;
pg_log_error ( " \\ watch cannot be used with COPY " ) ;
return - 1 ;
}
/* use normal notice processor during COPY */
PQsetNoticeProcessor ( pset . db , NoticeProcessor , NULL ) ;
success & = HandleCopyResult ( & result ) ;
PQsetNoticeProcessor ( pset . db , AppendNoticeMessage , & notes ) ;
}
/*
* Check PQgetResult ( ) again . In the typical case of a single - command
* string , it will return NULL . Otherwise , we ' ll have other results
* to process .
*/
notes . in_flip = ! notes . in_flip ;
next_result = PQgetResult ( pset . db ) ;
notes . in_flip = ! notes . in_flip ;
last = ( next_result = = NULL ) ;
/*
* Get timing measure before printing the last result .
*
* It will include the display of previous results , if any .
* This cannot be helped because the server goes on processing
* further queries anyway while the previous ones are being displayed .
* The parallel execution of the client display hides the server time
* when it is shorter .
*
* With combined queries , timing must be understood as an upper bound
* of the time spent processing them .
*/
if ( last & & pset . timing )
{
instr_time now ;
INSTR_TIME_SET_CURRENT ( now ) ;
INSTR_TIME_SUBTRACT ( now , before ) ;
* pelapsed_msec = INSTR_TIME_GET_MILLISEC ( now ) ;
}
/* notices already shown above for copy */
ShowNoticeMessage ( & notes ) ;
/* this may or may not print something depending on settings */
if ( result ! = NULL )
success & = HandleQueryResult ( result , last ) ;
/* set variables on last result if all went well */
if ( ! is_watch & & last & & success )
SetResultVariables ( result , true ) ;
ClearOrSaveResult ( result ) ;
notes . in_flip = ! notes . in_flip ;
result = next_result ;
}
/* reset notice hook */
PQsetNoticeProcessor ( pset . db , NoticeProcessor , NULL ) ;
termPQExpBuffer ( & notes . flip ) ;
termPQExpBuffer ( & notes . flop ) ;
/* may need this to recover from conn loss during COPY */
if ( ! CheckConnection ( ) )
return - 1 ;
return success ? 1 : - 1 ;
}
/*
* SendQuery : send the query string to the backend
@ -1294,28 +1381,9 @@ SendQuery(const char *query)
pset . crosstab_flag | | ! is_select_command ( query ) )
{
/* Default fetch-it-all-and-print mode */
instr_time before ,
after ;
if ( pset . timing )
INSTR_TIME_SET_CURRENT ( before ) ;
results = PQexec ( pset . db , query ) ;
/* these operations are included in the timing result: */
ResetCancelConn ( ) ;
OK = ProcessResult ( & results ) ;
if ( pset . timing )
{
INSTR_TIME_SET_CURRENT ( after ) ;
INSTR_TIME_SUBTRACT ( after , before ) ;
elapsed_msec = INSTR_TIME_GET_MILLISEC ( after ) ;
}
/* but printing results isn't: */
if ( OK & & results )
OK = PrintQueryResults ( results ) ;
int res = SendQueryAndProcessResults ( query , & elapsed_msec , false ) ;
OK = ( res > = 0 ) ;
results = NULL ;
}
else
{
@ -1497,7 +1565,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
PQclear ( results ) ;
results = PQdescribePrepared ( pset . db , " " ) ;
OK = AcceptResult ( results ) & &
OK = AcceptResult ( results , true ) & &
( PQresultStatus ( results ) = = PGRES_COMMAND_OK ) ;
if ( OK & & results )
{
@ -1545,7 +1613,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
PQclear ( results ) ;
results = PQexec ( pset . db , buf . data ) ;
OK = AcceptResult ( results ) ;
OK = AcceptResult ( results , true ) ;
if ( pset . timing )
{
@ -1555,7 +1623,7 @@ DescribeQuery(const char *query, double *elapsed_msec)
}
if ( OK & & results )
OK = PrintQueryResults ( results ) ;
OK = HandleQueryResult ( results , true ) ;
termPQExpBuffer ( & buf ) ;
}
@ -1614,7 +1682,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec)
if ( PQtransactionStatus ( pset . db ) = = PQTRANS_IDLE )
{
results = PQexec ( pset . db , " BEGIN " ) ;
OK = AcceptResult ( results ) & &
OK = AcceptResult ( results , true ) & &
( PQresultStatus ( results ) = = PGRES_COMMAND_OK ) ;
ClearOrSaveResult ( results ) ;
if ( ! OK )
@ -1628,7 +1696,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec)
query ) ;
results = PQexec ( pset . db , buf . data ) ;
OK = AcceptResult ( results ) & &
OK = AcceptResult ( results , true ) & &
( PQresultStatus ( results ) = = PGRES_COMMAND_OK ) ;
if ( ! OK )
SetResultVariables ( results , OK ) ;
@ -1701,7 +1769,7 @@ ExecQueryUsingCursor(const char *query, double *elapsed_msec)
is_pager = false ;
}
OK = AcceptResult ( results ) ;
OK = AcceptResult ( results , true ) ;
Assert ( ! OK ) ;
SetResultVariables ( results , OK ) ;
ClearOrSaveResult ( results ) ;
@ -1810,7 +1878,7 @@ cleanup:
results = PQexec ( pset . db , " CLOSE _psql_cursor " ) ;
if ( OK )
{
OK = AcceptResult ( results ) & &
OK = AcceptResult ( results , true ) & &
( PQresultStatus ( results ) = = PGRES_COMMAND_OK ) ;
ClearOrSaveResult ( results ) ;
}
@ -1820,7 +1888,7 @@ cleanup:
if ( started_txn )
{
results = PQexec ( pset . db , OK ? " COMMIT " : " ROLLBACK " ) ;
OK & = AcceptResult ( results ) & &
OK & = AcceptResult ( results , true ) & &
( PQresultStatus ( results ) = = PGRES_COMMAND_OK ) ;
ClearOrSaveResult ( results ) ;
}