@ -39,7 +39,9 @@ char *const pgresStatus[] = {
" PGRES_NONFATAL_ERROR " ,
" PGRES_FATAL_ERROR " ,
" PGRES_COPY_BOTH " ,
" PGRES_SINGLE_TUPLE "
" PGRES_SINGLE_TUPLE " ,
" PGRES_PIPELINE_SYNC " ,
" PGRES_PIPELINE_ABORTED "
} ;
/*
@ -71,6 +73,8 @@ static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe ( PGconn * conn , char desc_type ,
const char * desc_target ) ;
static int check_field_number ( const PGresult * res , int field_num ) ;
static void pqPipelineProcessQueue ( PGconn * conn ) ;
static int pqPipelineFlush ( PGconn * conn ) ;
/* ----------------
@ -1171,7 +1175,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn - > next_result = conn - > result ;
conn - > result = res ;
/* And mark the result ready to return */
conn - > asyncStatus = PGASYNC_READY ;
conn - > asyncStatus = PGASYNC_READY_MORE ;
}
return 1 ;
@ -1184,6 +1188,87 @@ fail:
}
/*
* pqAllocCmdQueueEntry
* Get a command queue entry for caller to fill .
*
* If the recycle queue has a free element , that is returned ; if not , a
* fresh one is allocated . Caller is responsible for adding it to the
* command queue ( pqAppendCmdQueueEntry ) once the struct is filled in , or
* releasing the memory ( pqRecycleCmdQueueEntry ) if an error occurs .
*
* If allocation fails , sets the error message and returns NULL .
*/
static PGcmdQueueEntry *
pqAllocCmdQueueEntry ( PGconn * conn )
{
PGcmdQueueEntry * entry ;
if ( conn - > cmd_queue_recycle = = NULL )
{
entry = ( PGcmdQueueEntry * ) malloc ( sizeof ( PGcmdQueueEntry ) ) ;
if ( entry = = NULL )
{
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " out of memory \n " ) ) ;
return NULL ;
}
}
else
{
entry = conn - > cmd_queue_recycle ;
conn - > cmd_queue_recycle = entry - > next ;
}
entry - > next = NULL ;
entry - > query = NULL ;
return entry ;
}
/*
* pqAppendCmdQueueEntry
* Append a caller - allocated command queue entry to the queue .
*
* The query itself must already have been put in the output buffer by the
* caller .
*/
static void
pqAppendCmdQueueEntry ( PGconn * conn , PGcmdQueueEntry * entry )
{
Assert ( entry - > next = = NULL ) ;
if ( conn - > cmd_queue_head = = NULL )
conn - > cmd_queue_head = entry ;
else
conn - > cmd_queue_tail - > next = entry ;
conn - > cmd_queue_tail = entry ;
}
/*
* pqRecycleCmdQueueEntry
* Push a command queue entry onto the freelist .
*/
static void
pqRecycleCmdQueueEntry ( PGconn * conn , PGcmdQueueEntry * entry )
{
if ( entry = = NULL )
return ;
/* recyclable entries should not have a follow-on command */
Assert ( entry - > next = = NULL ) ;
if ( entry - > query )
{
free ( entry - > query ) ;
entry - > query = NULL ;
}
entry - > next = conn - > cmd_queue_recycle ;
conn - > cmd_queue_recycle = entry ;
}
/*
* PQsendQuery
* Submit a query , but don ' t wait for it to finish
@ -1209,9 +1294,15 @@ PQsendQueryContinue(PGconn *conn, const char *query)
static int
PQsendQueryInternal ( PGconn * conn , const char * query , bool newQuery )
{
PGcmdQueueEntry * entry = NULL ;
if ( ! PQsendQueryStart ( conn , newQuery ) )
return 0 ;
entry = pqAllocCmdQueueEntry ( conn ) ;
if ( entry = = NULL )
return 0 ; /* error msg already set */
/* check the argument */
if ( ! query )
{
@ -1220,37 +1311,75 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
return 0 ;
}
/* construct the outgoing Query message */
if ( pqPutMsgStart ( ' Q ' , conn ) < 0 | |
pqPuts ( query , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
/* Send the query message(s) */
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
{
/* error message should be set up already */
return 0 ;
}
/* construct the outgoing Query message */
if ( pqPutMsgStart ( ' Q ' , conn ) < 0 | |
pqPuts ( query , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
{
/* error message should be set up already */
return 0 ;
}
/* remember we are using simple query protocol */
conn - > queryclass = PGQUERY_SIMPLE ;
/* remember we are using simple query protocol */
entry - > queryclass = PGQUERY_SIMPLE ;
/* and remember the query text too, if possible */
entry - > query = strdup ( query ) ;
}
else
{
/*
* In pipeline mode we cannot use the simple protocol , so we send
* Parse , Bind , Describe Portal , Execute .
*/
if ( pqPutMsgStart ( ' P ' , conn ) < 0 | |
pqPuts ( " " , conn ) < 0 | |
pqPuts ( query , conn ) < 0 | |
pqPutInt ( 0 , 2 , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
if ( pqPutMsgStart ( ' B ' , conn ) < 0 | |
pqPuts ( " " , conn ) < 0 | |
pqPuts ( " " , conn ) < 0 | |
pqPutInt ( 0 , 2 , conn ) < 0 | |
pqPutInt ( 0 , 2 , conn ) < 0 | |
pqPutInt ( 0 , 2 , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
if ( pqPutMsgStart ( ' D ' , conn ) < 0 | |
pqPutc ( ' P ' , conn ) < 0 | |
pqPuts ( " " , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
if ( pqPutMsgStart ( ' E ' , conn ) < 0 | |
pqPuts ( " " , conn ) < 0 | |
pqPutInt ( 0 , 4 , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
if ( conn - > last_query )
free ( conn - > last_query ) ;
conn - > last_query = strdup ( query ) ;
entry - > queryclass = PGQUERY_EXTENDED ;
entry - > query = strdup ( query ) ;
}
/*
* Give the data a push . In nonblock mode , don ' t complain if we ' re unable
* to send it all ; PQgetResult ( ) will do any additional flushing needed .
*/
if ( pqFlush ( conn ) < 0 )
{
/* error message should be set up already */
return 0 ;
}
if ( pqPipelineFlush ( conn ) < 0 )
goto sendFailed ;
/* OK, it's launched! */
conn - > asyncStatus = PGASYNC_BUSY ;
pqAppendCmdQueueEntry ( conn , entry ) ;
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
conn - > asyncStatus = PGASYNC_BUSY ;
return 1 ;
sendFailed :
pqRecycleCmdQueueEntry ( conn , entry ) ;
/* error message should be set up already */
return 0 ;
}
/*
@ -1307,6 +1436,8 @@ PQsendPrepare(PGconn *conn,
const char * stmtName , const char * query ,
int nParams , const Oid * paramTypes )
{
PGcmdQueueEntry * entry = NULL ;
if ( ! PQsendQueryStart ( conn , true ) )
return 0 ;
@ -1330,6 +1461,10 @@ PQsendPrepare(PGconn *conn,
return 0 ;
}
entry = pqAllocCmdQueueEntry ( conn ) ;
if ( entry = = NULL )
return 0 ; /* error msg already set */
/* construct the Parse message */
if ( pqPutMsgStart ( ' P ' , conn ) < 0 | |
pqPuts ( stmtName , conn ) < 0 | |
@ -1356,32 +1491,38 @@ PQsendPrepare(PGconn *conn,
if ( pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
/* construct the Sync message */
if ( pqPutMsgStart ( ' S ' , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
/* Add a Sync, unless in pipeline mode. */
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
{
if ( pqPutMsgStart ( ' S ' , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
}
/* remember we are doing just a Parse */
conn - > queryclass = PGQUERY_PREPARE ;
entry - > queryclass = PGQUERY_PREPARE ;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
if ( conn - > last_query )
free ( conn - > last_query ) ;
conn - > last_query = strdup ( query ) ;
/* if insufficient memory, query just winds up NULL */
entry - > query = strdup ( query ) ;
pqAppendCmdQueueEntry ( conn , entry ) ;
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
conn - > asyncStatus = PGASYNC_BUSY ;
/*
* Give the data a push . In nonblock mode , don ' t complain if we ' re unable
* to send it all ; PQgetResult ( ) will do any additional flushing needed .
* Give the data a push ( in pipeline mode , only if we ' re past the size
* threshold ) . In nonblock mode , don ' t complain if we ' re unable to send
* it all ; PQgetResult ( ) will do any additional flushing needed .
*/
if ( pqFlush ( conn ) < 0 )
if ( pqPipeline Flush ( conn ) < 0 )
goto sendFailed ;
/* OK, it's launched! */
conn - > asyncStatus = PGASYNC_BUSY ;
return 1 ;
sendFailed :
pqRecycleCmdQueueEntry ( conn , entry ) ;
/* error message should be set up already */
return 0 ;
}
@ -1429,7 +1570,8 @@ PQsendQueryPrepared(PGconn *conn,
}
/*
* Common startup code for PQsendQuery and sibling routines
* PQsendQueryStart
* Common startup code for PQsendQuery and sibling routines
*/
static bool
PQsendQueryStart ( PGconn * conn , bool newQuery )
@ -1450,20 +1592,57 @@ PQsendQueryStart(PGconn *conn, bool newQuery)
libpq_gettext ( " no connection to the server \n " ) ) ;
return false ;
}
/* Can't send while already busy, either. */
if ( conn - > asyncStatus ! = PGASYNC_IDLE )
/* Can't send while already busy, either, unless enqueuing for later */
if ( conn - > asyncStatus ! = PGASYNC_IDLE & &
conn - > pipelineStatus = = PQ_PIPELINE_OFF )
{
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " another command is already in progress \n " ) ) ;
return false ;
}
/* initialize async result-accumulation state */
pqClearAsyncResult ( conn ) ;
if ( conn - > pipelineStatus ! = PQ_PIPELINE_OFF )
{
/*
* When enqueuing commands we don ' t change much of the connection
* state since it ' s already in use for the current command . The
* connection state will get updated when pqPipelineProcessQueue ( )
* advances to start processing the queued message .
*
* Just make sure we can safely enqueue given the current connection
* state . We can enqueue behind another queue item , or behind a
* non - queue command ( one that sends its own sync ) , but we can ' t
* enqueue if the connection is in a copy state .
*/
switch ( conn - > asyncStatus )
{
case PGASYNC_IDLE :
case PGASYNC_READY :
case PGASYNC_READY_MORE :
case PGASYNC_BUSY :
/* ok to queue */
break ;
case PGASYNC_COPY_IN :
case PGASYNC_COPY_OUT :
case PGASYNC_COPY_BOTH :
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " cannot queue commands during COPY \n " ) ) ;
return false ;
}
}
else
{
/*
* This command ' s results will come in immediately . Initialize async
* result - accumulation state
*/
pqClearAsyncResult ( conn ) ;
/* reset single-row processing mode */
conn - > singleRowMode = false ;
/* reset single-row processing mode */
conn - > singleRowMode = false ;
}
/* ready to send command message */
return true ;
}
@ -1487,10 +1666,16 @@ PQsendQueryGuts(PGconn *conn,
int resultFormat )
{
int i ;
PGcmdQueueEntry * entry ;
entry = pqAllocCmdQueueEntry ( conn ) ;
if ( entry = = NULL )
return 0 ; /* error msg already set */
/*
* We will send Parse ( if needed ) , Bind , Describe Portal , Execute , Sync ,
* using specified statement name and the unnamed portal .
* We will send Parse ( if needed ) , Bind , Describe Portal , Execute , Sync
* ( if not in pipeline mode ) , using specified statement name and the
* unnamed portal .
*/
if ( command )
@ -1600,35 +1785,38 @@ PQsendQueryGuts(PGconn *conn,
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
/* construct the Sync message */
if ( pqPutMsgStart ( ' S ' , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
/* construct the Sync message if not in pipeline mode */
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
{
if ( pqPutMsgStart ( ' S ' , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
}
/* remember we are using extended query protocol */
conn - > queryclass = PGQUERY_EXTENDED ;
entry - > queryclass = PGQUERY_EXTENDED ;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
if ( conn - > last_query )
free ( conn - > last_query ) ;
/* if insufficient memory, query just winds up NULL */
if ( command )
conn - > last_query = strdup ( command ) ;
else
conn - > last_query = NULL ;
entry - > query = strdup ( command ) ;
/*
* Give the data a push . In nonblock mode , don ' t complain if we ' re unable
* to send it all ; PQgetResult ( ) will do any additional flushing needed .
* Give the data a push ( in pipeline mode , only if we ' re past the size
* threshold ) . In nonblock mode , don ' t complain if we ' re unable to send
* it all ; PQgetResult ( ) will do any additional flushing needed .
*/
if ( pqFlush ( conn ) < 0 )
if ( pqPipeline Flush ( conn ) < 0 )
goto sendFailed ;
/* OK, it's launched! */
conn - > asyncStatus = PGASYNC_BUSY ;
pqAppendCmdQueueEntry ( conn , entry ) ;
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
conn - > asyncStatus = PGASYNC_BUSY ;
return 1 ;
sendFailed :
pqRecycleCmdQueueEntry ( conn , entry ) ;
/* error message should be set up already */
return 0 ;
}
@ -1647,8 +1835,9 @@ PQsetSingleRowMode(PGconn *conn)
return 0 ;
if ( conn - > asyncStatus ! = PGASYNC_BUSY )
return 0 ;
if ( conn - > queryclass ! = PGQUERY_SIMPLE & &
conn - > queryclass ! = PGQUERY_EXTENDED )
if ( ! conn - > cmd_queue_head | |
( conn - > cmd_queue_head - > queryclass ! = PGQUERY_SIMPLE & &
conn - > cmd_queue_head - > queryclass ! = PGQUERY_EXTENDED ) )
return 0 ;
if ( conn - > result )
return 0 ;
@ -1726,14 +1915,17 @@ PQisBusy(PGconn *conn)
return conn - > asyncStatus = = PGASYNC_BUSY | | conn - > write_failed ;
}
/*
* PQgetResult
* Get the next PGresult produced by a query . Returns NULL if no
* query work remains or an error has occurred ( e . g . out of
* memory ) .
*
* In pipeline mode , once all the result of a query have been returned ,
* PQgetResult returns NULL to let the user know that the next
* query is being processed . At the end of the pipeline , returns a
* result with PQresultStatus ( result ) = = PGRES_PIPELINE_SYNC .
*/
PGresult *
PQgetResult ( PGconn * conn )
{
@ -1803,8 +1995,62 @@ PQgetResult(PGconn *conn)
{
case PGASYNC_IDLE :
res = NULL ; /* query is complete */
if ( conn - > pipelineStatus ! = PQ_PIPELINE_OFF )
{
/*
* We ' re about to return the NULL that terminates the round of
* results from the current query ; prepare to send the results
* of the next query when we ' re called next . Also , since this
* is the start of the results of the next query , clear any
* prior error message .
*/
resetPQExpBuffer ( & conn - > errorMessage ) ;
pqPipelineProcessQueue ( conn ) ;
}
break ;
case PGASYNC_READY :
/*
* For any query type other than simple query protocol , we advance
* the command queue here . This is because for simple query
* protocol we can get the READY state multiple times before the
* command is actually complete , since the command string can
* contain many queries . In simple query protocol , the queue
* advance is done by fe - protocol3 when it receives ReadyForQuery .
*/
if ( conn - > cmd_queue_head & &
conn - > cmd_queue_head - > queryclass ! = PGQUERY_SIMPLE )
pqCommandQueueAdvance ( conn ) ;
res = pqPrepareAsyncResult ( conn ) ;
if ( conn - > pipelineStatus ! = PQ_PIPELINE_OFF )
{
/*
* We ' re about to send the results of the current query . Set
* us idle now , and . . .
*/
conn - > asyncStatus = PGASYNC_IDLE ;
/*
* . . . in cases when we ' re sending a pipeline - sync result ,
* move queue processing forwards immediately , so that next
* time we ' re called , we ' re prepared to return the next result
* received from the server . In all other cases , leave the
* queue state change for next time , so that a terminating
* NULL result is sent .
*
* ( In other words : we don ' t return a NULL after a pipeline
* sync . )
*/
if ( res & & res - > resultStatus = = PGRES_PIPELINE_SYNC )
pqPipelineProcessQueue ( conn ) ;
}
else
{
/* Set the state back to BUSY, allowing parsing to proceed. */
conn - > asyncStatus = PGASYNC_BUSY ;
}
break ;
case PGASYNC_READY_MORE :
res = pqPrepareAsyncResult ( conn ) ;
/* Set the state back to BUSY, allowing parsing to proceed. */
conn - > asyncStatus = PGASYNC_BUSY ;
@ -1985,6 +2231,13 @@ PQexecStart(PGconn *conn)
if ( ! conn )
return false ;
if ( conn - > pipelineStatus ! = PQ_PIPELINE_OFF )
{
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " synchronous command execution functions are not allowed in pipeline mode \n " ) ) ;
return false ;
}
/*
* Since this is the beginning of a query cycle , reset the error buffer .
*/
@ -2148,6 +2401,8 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
static int
PQsendDescribe ( PGconn * conn , char desc_type , const char * desc_target )
{
PGcmdQueueEntry * entry = NULL ;
/* Treat null desc_target as empty string */
if ( ! desc_target )
desc_target = " " ;
@ -2155,6 +2410,10 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
if ( ! PQsendQueryStart ( conn , true ) )
return 0 ;
entry = pqAllocCmdQueueEntry ( conn ) ;
if ( entry = = NULL )
return 0 ; /* error msg already set */
/* construct the Describe message */
if ( pqPutMsgStart ( ' D ' , conn ) < 0 | |
pqPutc ( desc_type , conn ) < 0 | |
@ -2163,32 +2422,32 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
goto sendFailed ;
/* construct the Sync message */
if ( pqPutMsgStart ( ' S ' , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
/* remember we are doing a Describe */
conn - > queryclass = PGQUERY_DESCRIBE ;
/* reset last_query string (not relevant now) */
if ( conn - > last_query )
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
{
free ( conn - > last_query ) ;
conn - > last_query = NULL ;
if ( pqPutMsgStart ( ' S ' , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
}
/* remember we are doing a Describe */
entry - > queryclass = PGQUERY_DESCRIBE ;
/*
* Give the data a push . In nonblock mode , don ' t complain if we ' re unable
* to send it all ; PQgetResult ( ) will do any additional flushing needed .
* Give the data a push ( in pipeline mode , only if we ' re past the size
* threshold ) . In nonblock mode , don ' t complain if we ' re unable to send
* it all ; PQgetResult ( ) will do any additional flushing needed .
*/
if ( pqFlush ( conn ) < 0 )
if ( pqPipeline Flush ( conn ) < 0 )
goto sendFailed ;
/* OK, it's launched! */
conn - > asyncStatus = PGASYNC_BUSY ;
pqAppendCmdQueueEntry ( conn , entry ) ;
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
conn - > asyncStatus = PGASYNC_BUSY ;
return 1 ;
sendFailed :
pqRecycleCmdQueueEntry ( conn , entry ) ;
/* error message should be set up already */
return 0 ;
}
@ -2327,7 +2586,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
* If we sent the COPY command in extended - query mode , we must issue a
* Sync as well .
*/
if ( conn - > queryclass ! = PGQUERY_SIMPLE )
if ( conn - > cmd_queue_head & &
conn - > cmd_queue_head - > queryclass ! = PGQUERY_SIMPLE )
{
if ( pqPutMsgStart ( ' S ' , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
@ -2541,6 +2801,13 @@ PQfn(PGconn *conn,
*/
resetPQExpBuffer ( & conn - > errorMessage ) ;
if ( conn - > pipelineStatus ! = PQ_PIPELINE_OFF )
{
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " PQfn not allowed in pipeline mode \n " ) ) ;
return NULL ;
}
if ( conn - > sock = = PGINVALID_SOCKET | | conn - > asyncStatus ! = PGASYNC_IDLE | |
conn - > result ! = NULL )
{
@ -2555,6 +2822,277 @@ PQfn(PGconn *conn,
args , nargs ) ;
}
/* ====== Pipeline mode support ======== */
/*
* PQenterPipelineMode
* Put an idle connection in pipeline mode .
*
* Returns 1 on success . On failure , errorMessage is set and 0 is returned .
*
* Commands submitted after this can be pipelined on the connection ;
* there ' s no requirement to wait for one to finish before the next is
* dispatched .
*
* Queuing of a new query or syncing during COPY is not allowed .
*
* A set of commands is terminated by a PQpipelineSync . Multiple sync
* points can be established while in pipeline mode . Pipeline mode can
* be exited by calling PQexitPipelineMode ( ) once all results are processed .
*
* This doesn ' t actually send anything on the wire , it just puts libpq
* into a state where it can pipeline work .
*/
int
PQenterPipelineMode ( PGconn * conn )
{
if ( ! conn )
return 0 ;
/* succeed with no action if already in pipeline mode */
if ( conn - > pipelineStatus ! = PQ_PIPELINE_OFF )
return 1 ;
if ( conn - > asyncStatus ! = PGASYNC_IDLE )
{
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " cannot enter pipeline mode, connection not idle \n " ) ) ;
return 0 ;
}
conn - > pipelineStatus = PQ_PIPELINE_ON ;
return 1 ;
}
/*
* PQexitPipelineMode
* End pipeline mode and return to normal command mode .
*
* Returns 1 in success ( pipeline mode successfully ended , or not in pipeline
* mode ) .
*
* Returns 0 if in pipeline mode and cannot be ended yet . Error message will
* be set .
*/
int
PQexitPipelineMode ( PGconn * conn )
{
if ( ! conn )
return 0 ;
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
return 1 ;
switch ( conn - > asyncStatus )
{
case PGASYNC_READY :
case PGASYNC_READY_MORE :
/* there are some uncollected results */
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " cannot exit pipeline mode with uncollected results \n " ) ) ;
return 0 ;
case PGASYNC_BUSY :
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " cannot exit pipeline mode while busy \n " ) ) ;
return 0 ;
default :
/* OK */
break ;
}
/* still work to process */
if ( conn - > cmd_queue_head ! = NULL )
{
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " cannot exit pipeline mode with uncollected results \n " ) ) ;
return 0 ;
}
conn - > pipelineStatus = PQ_PIPELINE_OFF ;
conn - > asyncStatus = PGASYNC_IDLE ;
/* Flush any pending data in out buffer */
if ( pqFlush ( conn ) < 0 )
return 0 ; /* error message is setup already */
return 1 ;
}
/*
* pqCommandQueueAdvance
* Remove one query from the command queue , when we receive
* all results from the server that pertain to it .
*/
void
pqCommandQueueAdvance ( PGconn * conn )
{
PGcmdQueueEntry * prevquery ;
if ( conn - > cmd_queue_head = = NULL )
return ;
/* delink from queue */
prevquery = conn - > cmd_queue_head ;
conn - > cmd_queue_head = conn - > cmd_queue_head - > next ;
/* and make it recyclable */
prevquery - > next = NULL ;
pqRecycleCmdQueueEntry ( conn , prevquery ) ;
}
/*
* pqPipelineProcessQueue : subroutine for PQgetResult
* In pipeline mode , start processing the results of the next query in the queue .
*/
void
pqPipelineProcessQueue ( PGconn * conn )
{
switch ( conn - > asyncStatus )
{
case PGASYNC_COPY_IN :
case PGASYNC_COPY_OUT :
case PGASYNC_COPY_BOTH :
case PGASYNC_READY :
case PGASYNC_READY_MORE :
case PGASYNC_BUSY :
/* client still has to process current query or results */
return ;
case PGASYNC_IDLE :
/* next query please */
break ;
}
/* Nothing to do if not in pipeline mode, or queue is empty */
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF | |
conn - > cmd_queue_head = = NULL )
return ;
/* Initialize async result-accumulation state */
pqClearAsyncResult ( conn ) ;
/*
* Reset single - row processing mode . ( Client has to set it up for each
* query , if desired . )
*/
conn - > singleRowMode = false ;
if ( conn - > pipelineStatus = = PQ_PIPELINE_ABORTED & &
conn - > cmd_queue_head - > queryclass ! = PGQUERY_SYNC )
{
/*
* In an aborted pipeline we don ' t get anything from the server for
* each result ; we ' re just discarding commands from the queue until we
* get to the next sync from the server .
*
* The PGRES_PIPELINE_ABORTED results tell the client that its queries
* got aborted .
*/
conn - > result = PQmakeEmptyPGresult ( conn , PGRES_PIPELINE_ABORTED ) ;
if ( ! conn - > result )
{
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " out of memory \n " ) ) ;
pqSaveErrorResult ( conn ) ;
return ;
}
conn - > asyncStatus = PGASYNC_READY ;
}
else
{
/* allow parsing to continue */
conn - > asyncStatus = PGASYNC_BUSY ;
}
}
/*
* PQpipelineSync
* Send a Sync message as part of a pipeline , and flush to server
*
* It ' s legal to start submitting more commands in the pipeline immediately ,
* without waiting for the results of the current pipeline . There ' s no need to
* end pipeline mode and start it again .
*
* If a command in a pipeline fails , every subsequent command up to and including
* the result to the Sync message sent by PQpipelineSync gets set to
* PGRES_PIPELINE_ABORTED state . If the whole pipeline is processed without
* error , a PGresult with PGRES_PIPELINE_SYNC is produced .
*
* Queries can already have been sent before PQpipelineSync is called , but
* PQpipelineSync need to be called before retrieving command results .
*
* The connection will remain in pipeline mode and unavailable for new
* synchronous command execution functions until all results from the pipeline
* are processed by the client .
*/
int
PQpipelineSync ( PGconn * conn )
{
PGcmdQueueEntry * entry ;
if ( ! conn )
return 0 ;
if ( conn - > pipelineStatus = = PQ_PIPELINE_OFF )
{
appendPQExpBufferStr ( & conn - > errorMessage ,
libpq_gettext ( " cannot send pipeline when not in pipeline mode \n " ) ) ;
return 0 ;
}
switch ( conn - > asyncStatus )
{
case PGASYNC_COPY_IN :
case PGASYNC_COPY_OUT :
case PGASYNC_COPY_BOTH :
/* should be unreachable */
appendPQExpBufferStr ( & conn - > errorMessage ,
" internal error: cannot send pipeline while in COPY \n " ) ;
return 0 ;
case PGASYNC_READY :
case PGASYNC_READY_MORE :
case PGASYNC_BUSY :
case PGASYNC_IDLE :
/* OK to send sync */
break ;
}
entry = pqAllocCmdQueueEntry ( conn ) ;
if ( entry = = NULL )
return 0 ; /* error msg already set */
entry - > queryclass = PGQUERY_SYNC ;
entry - > query = NULL ;
/* construct the Sync message */
if ( pqPutMsgStart ( ' S ' , conn ) < 0 | |
pqPutMsgEnd ( conn ) < 0 )
goto sendFailed ;
pqAppendCmdQueueEntry ( conn , entry ) ;
/*
* Give the data a push . In nonblock mode , don ' t complain if we ' re unable
* to send it all ; PQgetResult ( ) will do any additional flushing needed .
*/
if ( PQflush ( conn ) < 0 )
goto sendFailed ;
/*
* Call pqPipelineProcessQueue so the user can call start calling
* PQgetResult .
*/
pqPipelineProcessQueue ( conn ) ;
return 1 ;
sendFailed :
pqRecycleCmdQueueEntry ( conn , entry ) ;
/* error message should be set up already */
return 0 ;
}
/* ====== accessor funcs for PGresult ======== */
@ -2569,7 +3107,7 @@ PQresultStatus(const PGresult *res)
char *
PQresStatus ( ExecStatusType status )
{
if ( ( unsigned int ) status > = sizeof pgresStatus / sizeof pgresStatus [ 0 ] )
if ( ( unsigned int ) status > = lengthof ( pgresStatus ) )
return libpq_gettext ( " invalid ExecStatusType code " ) ;
return pgresStatus [ status ] ;
}
@ -3152,6 +3690,23 @@ PQflush(PGconn *conn)
return pqFlush ( conn ) ;
}
/*
* pqPipelineFlush
*
* In pipeline mode , data will be flushed only when the out buffer reaches the
* threshold value . In non - pipeline mode , it behaves as stock pqFlush .
*
* Returns 0 on success .
*/
static int
pqPipelineFlush ( PGconn * conn )
{
if ( ( conn - > pipelineStatus ! = PQ_PIPELINE_ON ) | |
( conn - > outCount > = OUTBUFFER_THRESHOLD ) )
return pqFlush ( conn ) ;
return 0 ;
}
/*
* PQfreemem - safely frees memory allocated