@ -43,7 +43,7 @@
* the catalog . That ' s why our worker processes can also access the catalog
* information . ( In the Windows case , the workers are threads in the same
* process . To avoid problems , they work with cloned copies of the Archive
* data structure ; see init_spawned_worker_win32 ( ) . )
* data structure ; see RunWorker ( ) . )
*
* In the master process , the workerStatus field for each worker has one of
* the following values :
@ -83,9 +83,8 @@
*/
typedef struct
{
ArchiveHandle * AH ;
int pipeRead ;
int pipeWrite ;
ArchiveHandle * AH ; /* master database connection */
ParallelSlot * slot ; /* this worker's parallel slot */
} WorkerInfo ;
/* Windows implementation of pipe access */
@ -95,9 +94,6 @@ static int piperead(int s, char *buf, int len);
# else /* !WIN32 */
/* Signal handler flag */
static volatile sig_atomic_t wantAbort = 0 ;
/* Non-Windows implementation of pipe access */
# define pgpipe(a) pipe(a)
# define piperead(a,b,c) read(a,b,c)
@ -116,10 +112,37 @@ typedef struct ShutdownInformation
static ShutdownInformation shutdown_info ;
/*
* State info for signal handling .
* We assume signal_info initializes to zeroes .
*
* On Unix , myAH is the master DB connection in the master process , and the
* worker ' s own connection in worker processes . On Windows , we have only one
* instance of signal_info , so myAH is the master connection and the worker
* connections must be dug out of pstate - > parallelSlot [ ] .
*/
typedef struct DumpSignalInformation
{
ArchiveHandle * myAH ; /* database connection to issue cancel for */
ParallelState * pstate ; /* parallel state, if any */
bool handler_set ; /* signal handler set up in this process? */
# ifndef WIN32
bool am_worker ; /* am I a worker process? */
# endif
} DumpSignalInformation ;
static volatile DumpSignalInformation signal_info ;
# ifdef WIN32
static CRITICAL_SECTION signal_info_lock ;
# endif
/* Used from signal handlers, no buffering */
# define write_stderr(str) write(fileno(stderr), str, strlen(str))
# ifdef WIN32
/* file-scope variables */
static unsigned int tMasterThreadId = 0 ;
static HANDLE termEvent = INVALID_HANDLE_VALUE ;
static DWORD tls_index ;
/* globally visible variables (needed by exit_nicely) */
@ -134,7 +157,10 @@ static ParallelSlot *GetMyPSlot(ParallelState *pstate);
static void archive_close_connection ( int code , void * arg ) ;
static void ShutdownWorkersHard ( ParallelState * pstate ) ;
static void WaitForTerminatingWorkers ( ParallelState * pstate ) ;
static void RunWorker ( ArchiveHandle * AH , int pipefd [ 2 ] ) ;
static void setup_cancel_handler ( void ) ;
static void set_cancel_pstate ( ParallelState * pstate ) ;
static void set_cancel_slot_archive ( ParallelSlot * slot , ArchiveHandle * AH ) ;
static void RunWorker ( ArchiveHandle * AH , ParallelSlot * slot ) ;
static bool HasEveryWorkerTerminated ( ParallelState * pstate ) ;
static void lockTableForWorker ( ArchiveHandle * AH , TocEntry * te ) ;
static void WaitForCommands ( ArchiveHandle * AH , int pipefd [ 2 ] ) ;
@ -291,13 +317,13 @@ archive_close_connection(int code, void *arg)
if ( ! slot )
{
/*
* We ' re the master . Close our own database connection , if any ,
* and then forcibly shut down workers .
* We ' re the master . Forcibly shut down workers , then close our
* own database connection , if any .
*/
ShutdownWorkersHard ( si - > pstate ) ;
if ( si - > AHX )
DisconnectDatabase ( si - > AHX ) ;
ShutdownWorkersHard ( si - > pstate ) ;
}
else
{
@ -326,34 +352,13 @@ archive_close_connection(int code, void *arg)
}
}
/*
* Check to see if we ' ve been told to abort , and exit the process / thread if
* so . We don ' t print any error message ; that would just clutter the screen .
*
* If we have one worker that terminates for some reason , we ' d like the other
* threads to terminate as well ( and not finish with their 70 GB table dump
* first . . . ) . In Unix , the master sends SIGTERM and the worker ' s signal
* handler sets wantAbort to 1. In Windows we set a termEvent and this serves
* as the signal for worker threads to exit . Note that while we check this
* fairly frequently during data transfers , an idle worker doesn ' t come here
* at all , so additional measures are needed to force shutdown .
*
* XXX in parallel restore , slow server - side operations like CREATE INDEX
* are not interrupted by anything we do here . This needs more work .
*/
void
checkAborting ( ArchiveHandle * AH )
{
# ifdef WIN32
if ( WaitForSingleObject ( termEvent , 0 ) = = WAIT_OBJECT_0 )
# else
if ( wantAbort )
# endif
exit_nicely ( 1 ) ;
}
/*
* Forcibly shut down any remaining workers , waiting for them to finish .
*
* Note that we don ' t expect to come here during normal exit ( the workers
* should be long gone , and the ParallelState too ) . We ' re only here in an
* exit_horribly ( ) situation , so intervening to cancel active commands is
* appropriate .
*/
static void
ShutdownWorkersHard ( ParallelState * pstate )
@ -367,15 +372,37 @@ ShutdownWorkersHard(ParallelState *pstate)
for ( i = 0 ; i < pstate - > numWorkers ; i + + )
closesocket ( pstate - > parallelSlot [ i ] . pipeWrite ) ;
/*
* Force early termination of any commands currently in progress .
*/
# ifndef WIN32
/* On non-Windows, send SIGTERM to abort commands-in-progress. */
/* On non-Windows, send SIGTERM to each worker proc ess. */
for ( i = 0 ; i < pstate - > numWorkers ; i + + )
kill ( pstate - > parallelSlot [ i ] . pid , SIGTERM ) ;
{
pid_t pid = pstate - > parallelSlot [ i ] . pid ;
if ( pid ! = 0 )
kill ( pid , SIGTERM ) ;
}
# else
/* Non-idle workers monitor this event via checkAborting(). */
SetEvent ( termEvent ) ;
/*
* On Windows , send query cancels directly to the workers ' backends . Use
* a critical section to ensure worker threads don ' t change state .
*/
EnterCriticalSection ( & signal_info_lock ) ;
for ( i = 0 ; i < pstate - > numWorkers ; i + + )
{
ArchiveHandle * AH = pstate - > parallelSlot [ i ] . args - > AH ;
char errbuf [ 1 ] ;
if ( AH ! = NULL & & AH - > connCancel ! = NULL )
( void ) PQcancel ( AH - > connCancel , errbuf , sizeof ( errbuf ) ) ;
}
LeaveCriticalSection ( & signal_info_lock ) ;
# endif
/* Now wait for them to terminate. */
WaitForTerminatingWorkers ( pstate ) ;
}
@ -445,16 +472,316 @@ WaitForTerminatingWorkers(ParallelState *pstate)
}
}
/*
* Signal handler ( Unix only )
* Code for responding to cancel interrupts ( SIGINT , control - C , etc )
*
* This doesn ' t quite belong in this module , but it needs access to the
* ParallelState data , so there ' s not really a better place either .
*
* When we get a cancel interrupt , we could just die , but in pg_restore that
* could leave a SQL command ( e . g . , CREATE INDEX on a large table ) running
* for a long time . Instead , we try to send a cancel request and then die .
* pg_dump probably doesn ' t really need this , but we might as well use it
* there too . Note that sending the cancel directly from the signal handler
* is safe because PQcancel ( ) is written to make it so .
*
* In parallel operation on Unix , each process is responsible for canceling
* its own connection ( this must be so because nobody else has access to it ) .
* Furthermore , the master process should attempt to forward its signal to
* each child . In simple manual use of pg_dump / pg_restore , forwarding isn ' t
* needed because typing control - C at the console would deliver SIGINT to
* every member of the terminal process group - - - but in other scenarios it
* might be that only the master gets signaled .
*
* On Windows , the cancel handler runs in a separate thread , because that ' s
* how SetConsoleCtrlHandler works . We make it stop worker threads , send
* cancels on all active connections , and then return FALSE , which will allow
* the process to die . For safety ' s sake , we use a critical section to
* protect the PGcancel structures against being changed while the signal
* thread runs .
*/
# ifndef WIN32
/*
* Signal handler ( Unix only )
*/
static void
sigTermHandler ( SIGNAL_ARGS )
{
wantAbort = 1 ;
int i ;
char errbuf [ 1 ] ;
/*
* Some platforms allow delivery of new signals to interrupt an active
* signal handler . That could muck up our attempt to send PQcancel , so
* disable the signals that setup_cancel_handler enabled .
*/
pqsignal ( SIGINT , SIG_IGN ) ;
pqsignal ( SIGTERM , SIG_IGN ) ;
pqsignal ( SIGQUIT , SIG_IGN ) ;
/*
* If we ' re in the master , forward signal to all workers . ( It seems best
* to do this before PQcancel ; killing the master transaction will result
* in invalid - snapshot errors from active workers , which maybe we can
* quiet by killing workers first . ) Ignore any errors .
*/
if ( signal_info . pstate ! = NULL )
{
for ( i = 0 ; i < signal_info . pstate - > numWorkers ; i + + )
{
pid_t pid = signal_info . pstate - > parallelSlot [ i ] . pid ;
if ( pid ! = 0 )
kill ( pid , SIGTERM ) ;
}
}
/*
* Send QueryCancel if we have a connection to send to . Ignore errors ,
* there ' s not much we can do about them anyway .
*/
if ( signal_info . myAH ! = NULL & & signal_info . myAH - > connCancel ! = NULL )
( void ) PQcancel ( signal_info . myAH - > connCancel , errbuf , sizeof ( errbuf ) ) ;
/*
* Report we ' re quitting , using nothing more complicated than write ( 2 ) .
* When in parallel operation , only the master process should do this .
*/
if ( ! signal_info . am_worker )
{
if ( progname )
{
write_stderr ( progname ) ;
write_stderr ( " : " ) ;
}
write_stderr ( " terminated by user \n " ) ;
}
/* And die. */
exit ( 1 ) ;
}
/*
* Enable cancel interrupt handler , if not already done .
*/
static void
setup_cancel_handler ( void )
{
/*
* When forking , signal_info . handler_set will propagate into the new
* process , but that ' s fine because the signal handler state does too .
*/
if ( ! signal_info . handler_set )
{
signal_info . handler_set = true ;
pqsignal ( SIGINT , sigTermHandler ) ;
pqsignal ( SIGTERM , sigTermHandler ) ;
pqsignal ( SIGQUIT , sigTermHandler ) ;
}
}
# else /* WIN32 */
/*
* Console interrupt handler - - - runs in a newly - started thread .
*
* After stopping other threads and sending cancel requests on all open
* connections , we return FALSE which will allow the default ExitProcess ( )
* action to be taken .
*/
static BOOL WINAPI
consoleHandler ( DWORD dwCtrlType )
{
int i ;
char errbuf [ 1 ] ;
if ( dwCtrlType = = CTRL_C_EVENT | |
dwCtrlType = = CTRL_BREAK_EVENT )
{
/* Critical section prevents changing data we look at here */
EnterCriticalSection ( & signal_info_lock ) ;
/*
* If in parallel mode , stop worker threads and send QueryCancel to
* their connected backends . The main point of stopping the worker
* threads is to keep them from reporting the query cancels as errors ,
* which would clutter the user ' s screen . We needn ' t stop the master
* thread since it won ' t be doing much anyway . Do this before
* canceling the main transaction , else we might get invalid - snapshot
* errors reported before we can stop the workers . Ignore errors ,
* there ' s not much we can do about them anyway .
*/
if ( signal_info . pstate ! = NULL )
{
for ( i = 0 ; i < signal_info . pstate - > numWorkers ; i + + )
{
ParallelSlot * slot = & ( signal_info . pstate - > parallelSlot [ i ] ) ;
ArchiveHandle * AH = slot - > args - > AH ;
HANDLE hThread = ( HANDLE ) slot - > hThread ;
/*
* Using TerminateThread here may leave some resources leaked ,
* but it doesn ' t matter since we ' re about to end the whole
* process .
*/
if ( hThread ! = INVALID_HANDLE_VALUE )
TerminateThread ( hThread , 0 ) ;
if ( AH ! = NULL & & AH - > connCancel ! = NULL )
( void ) PQcancel ( AH - > connCancel , errbuf , sizeof ( errbuf ) ) ;
}
}
/*
* Send QueryCancel to master connection , if enabled . Ignore errors ,
* there ' s not much we can do about them anyway .
*/
if ( signal_info . myAH ! = NULL & & signal_info . myAH - > connCancel ! = NULL )
( void ) PQcancel ( signal_info . myAH - > connCancel ,
errbuf , sizeof ( errbuf ) ) ;
LeaveCriticalSection ( & signal_info_lock ) ;
/*
* Report we ' re quitting , using nothing more complicated than
* write ( 2 ) . ( We might be able to get away with using write_msg ( )
* here , but since we terminated other threads uncleanly above , it
* seems better to assume as little as possible . )
*/
if ( progname )
{
write_stderr ( progname ) ;
write_stderr ( " : " ) ;
}
write_stderr ( " terminated by user \n " ) ;
}
/* Always return FALSE to allow signal handling to continue */
return FALSE ;
}
/*
* Enable cancel interrupt handler , if not already done .
*/
static void
setup_cancel_handler ( void )
{
if ( ! signal_info . handler_set )
{
signal_info . handler_set = true ;
InitializeCriticalSection ( & signal_info_lock ) ;
SetConsoleCtrlHandler ( consoleHandler , TRUE ) ;
}
}
# endif /* WIN32 */
/*
* set_archive_cancel_info
*
* Fill AH - > connCancel with cancellation info for the specified database
* connection ; or clear it if conn is NULL .
*/
void
set_archive_cancel_info ( ArchiveHandle * AH , PGconn * conn )
{
PGcancel * oldConnCancel ;
/*
* Activate the interrupt handler if we didn ' t yet in this process . On
* Windows , this also initializes signal_info_lock ; therefore it ' s
* important that this happen at least once before we fork off any
* threads .
*/
setup_cancel_handler ( ) ;
/*
* On Unix , we assume that storing a pointer value is atomic with respect
* to any possible signal interrupt . On Windows , use a critical section .
*/
# ifdef WIN32
EnterCriticalSection ( & signal_info_lock ) ;
# endif
/* Free the old one if we have one */
oldConnCancel = AH - > connCancel ;
/* be sure interrupt handler doesn't use pointer while freeing */
AH - > connCancel = NULL ;
if ( oldConnCancel ! = NULL )
PQfreeCancel ( oldConnCancel ) ;
/* Set the new one if specified */
if ( conn )
AH - > connCancel = PQgetCancel ( conn ) ;
/*
* On Unix , there ' s only ever one active ArchiveHandle per process , so we
* can just set signal_info . myAH unconditionally . On Windows , do that
* only in the main thread ; worker threads have to make sure their
* ArchiveHandle appears in the pstate data , which is dealt with in
* RunWorker ( ) .
*/
# ifndef WIN32
signal_info . myAH = AH ;
# else
if ( mainThreadId = = GetCurrentThreadId ( ) )
signal_info . myAH = AH ;
# endif
# ifdef WIN32
LeaveCriticalSection ( & signal_info_lock ) ;
# endif
}
/*
* set_cancel_pstate
*
* Set signal_info . pstate to point to the specified ParallelState , if any .
* We need this mainly to have an interlock against Windows signal thread .
*/
static void
set_cancel_pstate ( ParallelState * pstate )
{
# ifdef WIN32
EnterCriticalSection ( & signal_info_lock ) ;
# endif
signal_info . pstate = pstate ;
# ifdef WIN32
LeaveCriticalSection ( & signal_info_lock ) ;
# endif
}
/*
* set_cancel_slot_archive
*
* Set ParallelSlot ' s AH field to point to the specified archive , if any .
* We need this mainly to have an interlock against Windows signal thread .
*/
static void
set_cancel_slot_archive ( ParallelSlot * slot , ArchiveHandle * AH )
{
# ifdef WIN32
EnterCriticalSection ( & signal_info_lock ) ;
# endif
slot - > args - > AH = AH ;
# ifdef WIN32
LeaveCriticalSection ( & signal_info_lock ) ;
# endif
}
/*
* This function is called by both Unix and Windows variants to set up
@ -462,19 +789,44 @@ sigTermHandler(SIGNAL_ARGS)
* upon return .
*/
static void
RunWorker ( ArchiveHandle * AH , int pipefd [ 2 ] )
RunWorker ( ArchiveHandle * AH , ParallelSlot * slot )
{
int pipefd [ 2 ] ;
/* fetch child ends of pipes */
pipefd [ PIPE_READ ] = slot - > pipeRevRead ;
pipefd [ PIPE_WRITE ] = slot - > pipeRevWrite ;
/*
* Clone the archive so that we have our own state to work with , and in
* particular our own database connection .
*
* We clone on Unix as well as Windows , even though technically we don ' t
* need to because fork ( ) gives us a copy in our own address space
* already . But CloneArchive resets the state information and also clones
* the database connection which both seem kinda helpful .
*/
AH = CloneArchive ( AH ) ;
/* Remember cloned archive where signal handler can find it */
set_cancel_slot_archive ( slot , AH ) ;
/*
* Call the setup worker function that ' s defined in the ArchiveHandle .
*/
( AH - > SetupWorkerPtr ) ( ( Archive * ) AH ) ;
Assert ( AH - > connection ! = NULL ) ;
/*
* Execute commands until done .
*/
WaitForCommands ( AH , pipefd ) ;
/*
* Disconnect from database and clean up .
*/
set_cancel_slot_archive ( slot , NULL ) ;
DisconnectDatabase ( & ( AH - > public ) ) ;
DeCloneArchive ( AH ) ;
}
/*
@ -484,22 +836,16 @@ RunWorker(ArchiveHandle *AH, int pipefd[2])
static unsigned __stdcall
init_spawned_worker_win32 ( WorkerInfo * wi )
{
ArchiveHandle * AH ;
int pipefd [ 2 ] = { wi - > pipeRead , wi - > pipeWrite } ;
/*
* Clone the archive so that we have our own state to work with , and in
* particular our own database connection .
*/
AH = CloneArchive ( wi - > AH ) ;
ArchiveHandle * AH = wi - > AH ;
ParallelSlot * slot = wi - > slot ;
/* Don't need WorkerInfo anymore */
free ( wi ) ;
/* Run the worker ... */
RunWorker ( AH , pipefd ) ;
RunWorker ( AH , slot ) ;
/* Clean up and exit the thread */
DeCloneArchive ( AH ) ;
/* Exit the thread */
_endthreadex ( 0 ) ;
return 0 ;
}
@ -519,9 +865,6 @@ ParallelBackupStart(ArchiveHandle *AH)
Assert ( AH - > public . numWorkers > 0 ) ;
/* Ensure stdio state is quiesced before forking */
fflush ( NULL ) ;
pstate = ( ParallelState * ) pg_malloc ( sizeof ( ParallelState ) ) ;
pstate - > numWorkers = AH - > public . numWorkers ;
@ -533,25 +876,31 @@ ParallelBackupStart(ArchiveHandle *AH)
pstate - > parallelSlot = ( ParallelSlot * ) pg_malloc ( slotSize ) ;
memset ( ( void * ) pstate - > parallelSlot , 0 , slotSize ) ;
/*
* Set the pstate in the shutdown_info . The exit handler uses pstate if
* set and falls back to AHX otherwise .
*/
shutdown_info . pstate = pstate ;
# ifdef WIN32
/* Set up thread management state */
tMasterThreadId = GetCurrentThreadId ( ) ;
termEvent = CreateEvent ( NULL , true , false , " Terminate " ) ;
/* Make fmtId() and fmtQualifiedId() use thread-local storage */
getLocalPQExpBuffer = getThreadLocalPQExpBuffer ;
# else
/* Set up signal handling state */
signal ( SIGTERM , sigTermHandler ) ;
signal ( SIGINT , sigTermHandler ) ;
signal ( SIGQUIT , sigTermHandler ) ;
# endif
/*
* Set the pstate in shutdown_info , to tell the exit handler that it must
* clean up workers as well as the main database connection . But we don ' t
* set this in signal_info yet , because we don ' t want child processes to
* inherit non - NULL signal_info . pstate .
*/
shutdown_info . pstate = pstate ;
/*
* Temporarily disable query cancellation on the master connection . This
* ensures that child processes won ' t inherit valid AH - > connCancel
* settings and thus won ' t try to issue cancels against the master ' s
* connection . No harm is done if we fail while it ' s disabled , because
* the master connection is idle at this point anyway .
*/
set_archive_cancel_info ( AH , NULL ) ;
/* Ensure stdio state is quiesced before forking */
fflush ( NULL ) ;
/* Create desired number of workers */
for ( i = 0 ; i < pstate - > numWorkers ; i + + )
{
@ -561,6 +910,7 @@ ParallelBackupStart(ArchiveHandle *AH)
# else
pid_t pid ;
# endif
ParallelSlot * slot = & ( pstate - > parallelSlot [ i ] ) ;
int pipeMW [ 2 ] ,
pipeWM [ 2 ] ;
@ -570,41 +920,40 @@ ParallelBackupStart(ArchiveHandle *AH)
" could not create communication channels: %s \n " ,
strerror ( errno ) ) ;
pstate - > parallelSlot [ i ] . workerStatus = WRKR_IDLE ;
pstate - > parallelSlot [ i ] . args = ( ParallelArgs * ) pg_malloc ( sizeof ( ParallelArgs ) ) ;
pstate - > parallelSlot [ i ] . args - > AH = NULL ;
pstate - > parallelSlot [ i ] . args - > te = NULL ;
slot - > workerStatus = WRKR_IDLE ;
slot - > args = ( ParallelArgs * ) pg_malloc ( sizeof ( ParallelArgs ) ) ;
slot - > args - > AH = NULL ;
slot - > args - > te = NULL ;
/* master's ends of the pipes */
pstate - > parallelSlot [ i ] . pipeRead = pipeWM [ PIPE_READ ] ;
pstate - > parallelSlot [ i ] . pipeWrite = pipeMW [ PIPE_WRITE ] ;
slot - > pipeRead = pipeWM [ PIPE_READ ] ;
slot - > pipeWrite = pipeMW [ PIPE_WRITE ] ;
/* child's ends of the pipes */
pstate - > parallelSlot [ i ] . pipeRevRead = pipeMW [ PIPE_READ ] ;
pstate - > parallelSlot [ i ] . pipeRevWrite = pipeWM [ PIPE_WRITE ] ;
slot - > pipeRevRead = pipeMW [ PIPE_READ ] ;
slot - > pipeRevWrite = pipeWM [ PIPE_WRITE ] ;
# ifdef WIN32
/* Create transient structure to pass args to worker function */
wi = ( WorkerInfo * ) pg_malloc ( sizeof ( WorkerInfo ) ) ;
wi - > AH = AH ;
wi - > pipeRead = pipeMW [ PIPE_READ ] ;
wi - > pipeWrite = pipeWM [ PIPE_WRITE ] ;
wi - > slot = slot ;
handle = _beginthreadex ( NULL , 0 , ( void * ) & init_spawned_worker_win32 ,
wi , 0 , & ( pstate - > parallelSlot [ i ] . threadId ) ) ;
pstate - > parallelSlot [ i ] . hThread = handle ;
wi , 0 , & ( slot - > threadId ) ) ;
slot - > hThread = handle ;
# else /* !WIN32 */
pid = fork ( ) ;
if ( pid = = 0 )
{
/* we are the worker */
int j ;
int pipefd [ 2 ] ;
pipefd [ 0 ] = pipeMW [ PIPE_READ ] ;
pipefd [ 1 ] = pipeWM [ PIPE_WRITE ] ;
/* this is needed for GetMyPSlot() */
slot - > pid = getpid ( ) ;
pstate - > parallelSlot [ i ] . pid = getpid ( ) ;
/* instruct signal handler that we're in a worker now */
signal_info . am_worker = true ;
/* close read end of Worker -> Master */
closesocket ( pipeWM [ PIPE_READ ] ) ;
@ -621,17 +970,8 @@ ParallelBackupStart(ArchiveHandle *AH)
closesocket ( pstate - > parallelSlot [ j ] . pipeWrite ) ;
}
/*
* Call CloneArchive on Unix as well as Windows , even though
* technically we don ' t need to because fork ( ) gives us a copy in
* our own address space already . But CloneArchive resets the
* state information and also clones the database connection which
* both seem kinda helpful .
*/
pstate - > parallelSlot [ i ] . args - > AH = CloneArchive ( AH ) ;
/* Run the worker ... */
RunWorker ( pstate - > parallelSlot [ i ] . args - > AH , pipefd ) ;
RunWorker ( AH , slot ) ;
/* We can just exit(0) when done */
exit ( 0 ) ;
@ -645,7 +985,7 @@ ParallelBackupStart(ArchiveHandle *AH)
}
/* In Master after successful fork */
pstate - > parallelSlot [ i ] . pid = pid ;
slot - > pid = pid ;
/* close read end of Master -> Worker */
closesocket ( pipeMW [ PIPE_READ ] ) ;
@ -660,9 +1000,22 @@ ParallelBackupStart(ArchiveHandle *AH)
* the workers to inherit this setting , though .
*/
# ifndef WIN32
signal ( SIGPIPE , SIG_IGN ) ;
pq signal( SIGPIPE , SIG_IGN ) ;
# endif
/*
* Re - establish query cancellation on the master connection .
*/
set_archive_cancel_info ( AH , AH - > connection ) ;
/*
* Tell the cancel signal handler to forward signals to worker processes ,
* too . ( As with query cancel , we did not need this earlier because the
* workers have not yet been given anything to do ; if we die before this
* point , any already - started workers will see EOF and quit promptly . )
*/
set_cancel_pstate ( pstate ) ;
return pstate ;
}
@ -692,10 +1045,11 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
WaitForTerminatingWorkers ( pstate ) ;
/*
* Unlink pstate from shutdown_info , so the exit handler will again fall
* back to closing AH - > connection ( if connected ) .
* Unlink pstate from shutdown_info , so the exit handler will not try to
* use it ; and likewise unlink from signal_info .
*/
shutdown_info . pstate = NULL ;
set_cancel_pstate ( NULL ) ;
/* Release state (mere neatnik-ism, since we're about to terminate) */
free ( pstate - > parallelSlot ) ;
@ -848,9 +1202,7 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
{
if ( ! ( command = getMessageFromMaster ( pipefd ) ) )
{
/* EOF ... clean up */
PQfinish ( AH - > connection ) ;
AH - > connection = NULL ;
/* EOF, so done */
return ;
}
@ -1114,44 +1466,20 @@ select_loop(int maxFd, fd_set *workerset)
int i ;
fd_set saveSet = * workerset ;
# ifdef WIN32
for ( ; ; )
{
/*
* Sleep a quarter of a second before checking if we should terminate .
*
* XXX we ' re not actually checking for a cancel interrupt . . . but we
* should be .
*/
struct timeval tv = { 0 , 250000 } ;
* workerset = saveSet ;
i = select ( maxFd + 1 , workerset , NULL , NULL , & tv ) ;
if ( i = = SOCKET_ERROR & & WSAGetLastError ( ) = = WSAEINTR )
continue ;
if ( i )
break ;
}
# else /* !WIN32 */
for ( ; ; )
{
* workerset = saveSet ;
i = select ( maxFd + 1 , workerset , NULL , NULL , NULL ) ;
/*
* If we Ctrl - C the master process , it ' s likely that we interrupt
* select ( ) here . The signal handler will set wantAbort = = true and
* the shutdown journey starts from here .
*/
if ( wantAbort )
exit_horribly ( modulename , " terminated by user \n " ) ;
# ifndef WIN32
if ( i < 0 & & errno = = EINTR )
continue ;
# else
if ( i = = SOCKET_ERROR & & WSAGetLastError ( ) = = WSAEINTR )
continue ;
# endif
break ;
}
# endif /* WIN32 */
return i ;
}