@ -34,6 +34,7 @@
# include "lib/stringinfo.h"
# include "libpq/pqsignal.h"
# include "miscadmin.h"
# include "nodes/pg_list.h"
# include "pgtime.h"
# include "postmaster/fork_process.h"
# include "postmaster/postmaster.h"
@ -93,11 +94,14 @@ static char *last_file_name = NULL;
static char * last_csv_file_name = NULL ;
/*
* Buffers for saving partial messages from different backends . We don ' t expect
* that there will be very many outstanding at one time , so 20 seems plenty of
* leeway . If this array gets full we won ' t lose messages , but we will lose
* the protocol protection against them being partially written or interleaved .
* Buffers for saving partial messages from different backends .
*
* Keep NBUFFER_LISTS lists of these , with the entry for a given source pid
* being in the list numbered ( pid % NBUFFER_LISTS ) , so as to cut down on
* the number of entries we have to examine for any one incoming message .
* There must never be more than one entry for the same source pid .
*
* An inactive buffer is not removed from its list , just held for re - use .
* An inactive buffer has pid = = 0 and undefined contents of data .
*/
typedef struct
@ -106,8 +110,8 @@ typedef struct
StringInfoData data ; /* accumulated data, as a StringInfo */
} save_buffer ;
# define CHUNK_SLOTS 20
static save_buffer saved_chunks [ CHUNK_SLO TS] ;
# define NBUFFER_LISTS 256
static List * buffer_lists [ NBUFFER_LIS TS] ;
/* These must be exported for EXEC_BACKEND case ... annoying */
# ifndef WIN32
@ -592,7 +596,7 @@ SysLogger_Start(void)
* Now we are done with the write end of the pipe .
* CloseHandle ( ) must not be called because the preceding
* close ( ) closes the underlying handle .
*/
*/
syslogPipe [ 1 ] = 0 ;
# endif
redirection_done = true ;
@ -734,6 +738,12 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
( p . is_last = = ' t ' | | p . is_last = = ' f ' | |
p . is_last = = ' T ' | | p . is_last = = ' F ' ) )
{
List * buffer_list ;
ListCell * cell ;
save_buffer * existing_slot = NULL ,
* free_slot = NULL ;
StringInfo str ;
chunklen = PIPE_HEADER_SIZE + p . len ;
/* Fall out of loop if we don't have the whole chunk yet */
@ -743,53 +753,54 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
dest = ( p . is_last = = ' T ' | | p . is_last = = ' F ' ) ?
LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR ;
if ( p . is_last = = ' f ' | | p . is_last = = ' F ' )
/* Locate any existing buffer for this source pid */
buffer_list = buffer_lists [ p . pid % NBUFFER_LISTS ] ;
foreach ( cell , buffer_list )
{
/*
* Save a complete non - final chunk in the per - pid buffer if
* possible - if not just write it out .
*/
int free_slot = - 1 ,
existing_slot = - 1 ;
int i ;
StringInfo str ;
save_buffer * buf = ( save_buffer * ) lfirst ( cell ) ;
for ( i = 0 ; i < CHUNK_SLOTS ; i + + )
if ( buf - > pid = = p . pid )
{
if ( saved_chunks [ i ] . pid = = p . pid )
{
existing_slot = i ;
break ;
}
if ( free_slot < 0 & & saved_chunks [ i ] . pid = = 0 )
free_slot = i ;
existing_slot = buf ;
break ;
}
if ( existing_slot > = 0 )
if ( buf - > pid = = 0 & & free_slot = = NULL )
free_slot = buf ;
}
if ( p . is_last = = ' f ' | | p . is_last = = ' F ' )
{
/*
* Save a complete non - final chunk in a per - pid buffer
*/
if ( existing_slot ! = NULL )
{
str = & ( saved_chunks [ existing_slot ] . data ) ;
/* Add chunk to data from preceding chunks */
str = & ( existing_slot - > data ) ;
appendBinaryStringInfo ( str ,
cursor + PIPE_HEADER_SIZE ,
p . len ) ;
}
else if ( free_slot > = 0 )
else
{
saved_chunks [ free_slot ] . pid = p . pid ;
str = & ( saved_chunks [ free_slot ] . data ) ;
/* First chunk of message, save in a new buffer */
if ( free_slot = = NULL )
{
/*
* Need a free slot , but there isn ' t one in the list ,
* so create a new one and extend the list with it .
*/
free_slot = palloc ( sizeof ( save_buffer ) ) ;
buffer_list = lappend ( buffer_list , free_slot ) ;
buffer_lists [ p . pid % NBUFFER_LISTS ] = buffer_list ;
}
free_slot - > pid = p . pid ;
str = & ( free_slot - > data ) ;
initStringInfo ( str ) ;
appendBinaryStringInfo ( str ,
cursor + PIPE_HEADER_SIZE ,
p . len ) ;
}
else
{
/*
* If there is no free slot we ' ll just have to take our
* chances and write out a partial message and hope that
* it ' s not followed by something from another pid .
*/
write_syslogger_file ( cursor + PIPE_HEADER_SIZE , p . len ,
dest ) ;
}
}
else
{
@ -797,26 +808,15 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
* Final chunk - - - add it to anything saved for that pid , and
* either way write the whole thing out .
*/
int existing_slot = - 1 ;
int i ;
StringInfo str ;
for ( i = 0 ; i < CHUNK_SLOTS ; i + + )
{
if ( saved_chunks [ i ] . pid = = p . pid )
{
existing_slot = i ;
break ;
}
}
if ( existing_slot > = 0 )
if ( existing_slot ! = NULL )
{
str = & ( saved_chunks [ existing_slot ] . data ) ;
str = & ( existing_slot - > data ) ;
appendBinaryStringInfo ( str ,
cursor + PIPE_HEADER_SIZE ,
p . len ) ;
write_syslogger_file ( str - > data , str - > len , dest ) ;
saved_chunks [ existing_slot ] . pid = 0 ;
/* Mark the buffer unused, and reclaim string storage */
existing_slot - > pid = 0 ;
pfree ( str - > data ) ;
}
else
@ -872,17 +872,27 @@ static void
flush_pipe_input ( char * logbuffer , int * bytes_in_logbuffer )
{
int i ;
StringInfo str ;
/* Dump any incomplete protocol messages */
for ( i = 0 ; i < CHUNK_SLO TS; i + + )
for ( i = 0 ; i < NBUFFER_LIS TS; i + + )
{
if ( saved_chunks [ i ] . pid ! = 0 )
List * list = buffer_lists [ i ] ;
ListCell * cell ;
foreach ( cell , list )
{
str = & ( saved_chunks [ i ] . data ) ;
write_syslogger_file ( str - > data , str - > len , LOG_DESTINATION_STDERR ) ;
saved_chunks [ i ] . pid = 0 ;
pfree ( str - > data ) ;
save_buffer * buf = ( save_buffer * ) lfirst ( cell ) ;
if ( buf - > pid ! = 0 )
{
StringInfo str = & ( buf - > data ) ;
write_syslogger_file ( str - > data , str - > len ,
LOG_DESTINATION_STDERR ) ;
/* Mark the buffer unused, and reclaim string storage */
buf - > pid = 0 ;
pfree ( str - > data ) ;
}
}
}