@ -55,10 +55,12 @@
* pq_peekbyte - peek at next byte from connection
* pq_putbytes - send bytes to connection ( not flushed until pq_flush )
* pq_flush - flush pending output
* pq_flush_if_writable - flush pending output if writable without blocking
* pq_getbyte_if_available - get a byte if available without blocking
*
* message - level I / O ( and old - style - COPY - OUT cruft ) :
* pq_putmessage - send a normal message ( suppressed in COPY OUT mode )
* pq_putmessage_noblock - buffer a normal message ( suppressed in COPY OUT )
* pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
* pq_endcopyout - end a COPY OUT transfer
*
@ -92,6 +94,7 @@
# include "miscadmin.h"
# include "storage/ipc.h"
# include "utils/guc.h"
# include "utils/memutils.h"
/*
* Configuration options
@ -105,15 +108,21 @@ static char sock_path[MAXPGPATH];
/*
* Buffers for low - level I / O
* Buffers for low - level I / O .
*
* The receive buffer is fixed size . Send buffer is usually 8 k , but can be
* enlarged by pq_putmessage_noblock ( ) if the message doesn ' t fit otherwise .
*/
# define PQ_BUFFER_SIZE 8192
# define PQ_SEND_BUFFER_SIZE 8192
# define PQ_RECV_BUFFER_SIZE 8192
static char PqSendBuffer [ PQ_BUFFER_SIZE ] ;
static char * PqSendBuffer ;
static int PqSendBufferSize ; /* Size send buffer */
static int PqSendPointer ; /* Next index to store a byte in PqSendBuffer */
static int PqSendStart ; /* Next index to send a byte in PqSendBuffer */
static char PqRecvBuffer [ PQ_BUFFER_SIZE ] ;
static char PqRecvBuffer [ PQ_RECV_ BUFFER_SIZE ] ;
static int PqRecvPointer ; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength ; /* End of data available in PqRecvBuffer */
@ -128,6 +137,7 @@ static bool DoingCopyOut;
static void pq_close ( int code , Datum arg ) ;
static int internal_putbytes ( const char * s , size_t len ) ;
static int internal_flush ( void ) ;
static void pq_set_nonblocking ( bool nonblocking ) ;
# ifdef HAVE_UNIX_SOCKETS
static int Lock_AF_UNIX ( unsigned short portNumber , char * unixSocketName ) ;
@ -142,7 +152,9 @@ static int Setup_AF_UNIX(void);
void
pq_init ( void )
{
PqSendPointer = PqRecvPointer = PqRecvLength = 0 ;
PqSendBufferSize = PQ_SEND_BUFFER_SIZE ;
PqSendBuffer = MemoryContextAlloc ( TopMemoryContext , PqSendBufferSize ) ;
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0 ;
PqCommBusy = false ;
DoingCopyOut = false ;
on_proc_exit ( pq_close , 0 ) ;
@ -732,6 +744,42 @@ TouchSocketFile(void)
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
/* --------------------------------
* pq_set_nonblocking - set socket blocking / non - blocking
*
* Sets the socket non - blocking if nonblocking is TRUE , or sets it
* blocking otherwise .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
static void
pq_set_nonblocking ( bool nonblocking )
{
if ( MyProcPort - > noblock = = nonblocking )
return ;
# ifdef WIN32
pgwin32_noblock = nonblocking ? 1 : 0 ;
# else
/*
* Use COMMERROR on failure , because ERROR would try to send the error
* to the client , which might require changing the mode again , leading
* to infinite recursion .
*/
if ( nonblocking )
{
if ( ! pg_set_noblock ( MyProcPort - > sock ) )
ereport ( COMMERROR ,
( errmsg ( " could not set socket to non-blocking mode: %m " ) ) ) ;
}
else
{
if ( ! pg_set_block ( MyProcPort - > sock ) )
ereport ( COMMERROR ,
( errmsg ( " could not set socket to blocking mode: %m " ) ) ) ;
}
# endif
MyProcPort - > noblock = nonblocking ;
}
/* --------------------------------
* pq_recvbuf - load some bytes into the input buffer
@ -756,13 +804,16 @@ pq_recvbuf(void)
PqRecvLength = PqRecvPointer = 0 ;
}
/* Ensure that we're in blocking mode */
pq_set_nonblocking ( false ) ;
/* Can fill buffer from PqRecvLength and upwards */
for ( ; ; )
{
int r ;
r = secure_read ( MyProcPort , PqRecvBuffer + PqRecvLength ,
PQ_BUFFER_SIZE - PqRecvLength ) ;
PQ_RECV_ BUFFER_SIZE - PqRecvLength ) ;
if ( r < 0 )
{
@ -825,7 +876,6 @@ pq_peekbyte(void)
return ( unsigned char ) PqRecvBuffer [ PqRecvPointer ] ;
}
/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection ,
* if available
@ -845,72 +895,38 @@ pq_getbyte_if_available(unsigned char *c)
return 1 ;
}
/* Temporarily put the socket into non-blocking mode */
# ifdef WIN32
pgwin32_noblock = 1 ;
# else
if ( ! pg_set_noblock ( MyProcPort - > sock ) )
ereport ( ERROR ,
( errmsg ( " could not set socket to non-blocking mode: %m " ) ) ) ;
# endif
MyProcPort - > noblock = true ;
PG_TRY ( ) ;
/* Put the socket into non-blocking mode */
pq_set_nonblocking ( true ) ;
r = secure_read ( MyProcPort , c , 1 ) ;
if ( r < 0 )
{
r = secure_read ( MyProcPort , c , 1 ) ;
if ( r < 0 )
/*
* Ok if no data available without blocking or interrupted ( though
* EINTR really shouldn ' t happen with a non - blocking socket ) .
* Report other errors .
*/
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK | | errno = = EINTR )
r = 0 ;
else
{
/*
* Ok if no data available without blocking or interrupted ( though
* EINTR really shouldn ' t happen with a non - blocking socket ) .
* Report other errors .
* Careful : an ereport ( ) that tries to write to the client
* would cause recursion to here , leading to stack overflow
* and core dump ! This message must go * only * to the
* postmaster log .
*/
if ( errno = = EAGAIN | | errno = = EWOULDBLOCK | | errno = = EINTR )
r = 0 ;
else
{
/*
* Careful : an ereport ( ) that tries to write to the client
* would cause recursion to here , leading to stack overflow
* and core dump ! This message must go * only * to the
* postmaster log .
*/
ereport ( COMMERROR ,
( errcode_for_socket_access ( ) ,
errmsg ( " could not receive data from client: %m " ) ) ) ;
r = EOF ;
}
}
else if ( r = = 0 )
{
/* EOF detected */
ereport ( COMMERROR ,
( errcode_for_socket_access ( ) ,
errmsg ( " could not receive data from client: %m " ) ) ) ;
r = EOF ;
}
}
PG_CATCH ( ) ;
else if ( r = = 0 )
{
/*
* The rest of the backend code assumes the socket is in blocking
* mode , so treat failure as FATAL .
*/
# ifdef WIN32
pgwin32_noblock = 0 ;
# else
if ( ! pg_set_block ( MyProcPort - > sock ) )
ereport ( FATAL ,
( errmsg ( " could not set socket to blocking mode: %m " ) ) ) ;
# endif
MyProcPort - > noblock = false ;
PG_RE_THROW ( ) ;
/* EOF detected */
r = EOF ;
}
PG_END_TRY ( ) ;
# ifdef WIN32
pgwin32_noblock = 0 ;
# else
if ( ! pg_set_block ( MyProcPort - > sock ) )
ereport ( FATAL ,
( errmsg ( " could not set socket to blocking mode: %m " ) ) ) ;
# endif
MyProcPort - > noblock = false ;
return r ;
}
@ -1138,10 +1154,13 @@ internal_putbytes(const char *s, size_t len)
while ( len > 0 )
{
/* If buffer is full, then flush it out */
if ( PqSendPointer > = PQ_BUFFER_SIZE )
if ( PqSendPointer > = PqSendBufferSize )
{
pq_set_nonblocking ( false ) ;
if ( internal_flush ( ) )
return EOF ;
amount = PQ_BUFFER_SIZE - PqSendPointer ;
}
amount = PqSendBufferSize - PqSendPointer ;
if ( amount > len )
amount = len ;
memcpy ( PqSendBuffer + PqSendPointer , s , amount ) ;
@ -1167,17 +1186,25 @@ pq_flush(void)
if ( PqCommBusy )
return 0 ;
PqCommBusy = true ;
pq_set_nonblocking ( false ) ;
res = internal_flush ( ) ;
PqCommBusy = false ;
return res ;
}
/* --------------------------------
* internal_flush - flush pending output
*
* Returns 0 if OK ( meaning everything was sent , or operation would block
* and the socket is in non - blocking mode ) , or EOF if trouble .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
static int
internal_flush ( void )
{
static int last_reported_send_errno = 0 ;
char * bufptr = PqSendBuffer ;
char * bufptr = PqSendBuffer + PqSendStart ;
char * bufend = PqSendBuffer + PqSendPointer ;
while ( bufptr < bufend )
@ -1191,6 +1218,16 @@ internal_flush(void)
if ( errno = = EINTR )
continue ; /* Ok if we were interrupted */
/*
* Ok if no data writable without blocking , and the socket
* is in non - blocking mode .
*/
if ( errno = = EAGAIN | |
errno = = EWOULDBLOCK )
{
return 0 ;
}
/*
* Careful : an ereport ( ) that tries to write to the client would
* cause recursion to here , leading to stack overflow and core
@ -1212,18 +1249,56 @@ internal_flush(void)
* We drop the buffered data anyway so that processing can
* continue , even though we ' ll probably quit soon .
*/
PqSendPointer = 0 ;
PqSendStart = PqSend Pointer = 0 ;
return EOF ;
}
last_reported_send_errno = 0 ; /* reset after any successful send */
bufptr + = r ;
PqSendStart + = r ;
}
PqSendPointer = 0 ;
PqSendStart = PqSend Pointer = 0 ;
return 0 ;
}
/* --------------------------------
* pq_flush_if_writable - flush pending output if writable without blocking
*
* Returns 0 if OK , or EOF if trouble .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
int
pq_flush_if_writable ( void )
{
int res ;
/* Quick exit if nothing to do */
if ( PqSendPointer = = PqSendStart )
return 0 ;
/* No-op if reentrant call */
if ( PqCommBusy )
return 0 ;
/* Temporarily put the socket into non-blocking mode */
pq_set_nonblocking ( true ) ;
PqCommBusy = true ;
res = internal_flush ( ) ;
PqCommBusy = false ;
return res ;
}
/* --------------------------------
* pq_is_send_pending - is there any pending data in the output buffer ?
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
bool
pq_is_send_pending ( void )
{
return ( PqSendStart < PqSendPointer ) ;
}
/* --------------------------------
* Message - level I / O routines begin here .
@ -1285,6 +1360,33 @@ fail:
return EOF ;
}
/* --------------------------------
* pq_putmessage_noblock - like pq_putmessage , but never blocks
*
* If the output buffer is too small to hold the message , the buffer
* is enlarged .
*/
void
pq_putmessage_noblock ( char msgtype , const char * s , size_t len )
{
int res ;
int required ;
/*
* Ensure we have enough space in the output buffer for the message header
* as well as the message itself .
*/
required = PqSendPointer + 1 + 4 + len ;
if ( required > PqSendBufferSize )
{
PqSendBuffer = repalloc ( PqSendBuffer , required ) ;
PqSendBufferSize = required ;
}
res = pq_putmessage ( msgtype , s , len ) ;
Assert ( res = = 0 ) ; /* should not fail when the message fits in buffer */
}
/* --------------------------------
* pq_startcopyout - inform libpq that an old - style COPY OUT transfer
* is beginning