|
|
|
@ -16,20 +16,20 @@ |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Parallel operation works like this: |
|
|
|
* Parallel operation works like this: |
|
|
|
* |
|
|
|
* |
|
|
|
* The original, master process calls ParallelBackupStart(), which forks off |
|
|
|
* The original, leader process calls ParallelBackupStart(), which forks off |
|
|
|
* the desired number of worker processes, which each enter WaitForCommands(). |
|
|
|
* the desired number of worker processes, which each enter WaitForCommands(). |
|
|
|
* |
|
|
|
* |
|
|
|
* The master process dispatches an individual work item to one of the worker |
|
|
|
* The leader process dispatches an individual work item to one of the worker |
|
|
|
* processes in DispatchJobForTocEntry(). We send a command string such as |
|
|
|
* processes in DispatchJobForTocEntry(). We send a command string such as |
|
|
|
* "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID. |
|
|
|
* "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID. |
|
|
|
* The worker process receives and decodes the command and passes it to the |
|
|
|
* The worker process receives and decodes the command and passes it to the |
|
|
|
* routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr, |
|
|
|
* routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr, |
|
|
|
* which are routines of the current archive format. That routine performs |
|
|
|
* which are routines of the current archive format. That routine performs |
|
|
|
* the required action (dump or restore) and returns an integer status code. |
|
|
|
* the required action (dump or restore) and returns an integer status code. |
|
|
|
* This is passed back to the master where we pass it to the |
|
|
|
* This is passed back to the leader where we pass it to the |
|
|
|
* ParallelCompletionPtr callback function that was passed to |
|
|
|
* ParallelCompletionPtr callback function that was passed to |
|
|
|
* DispatchJobForTocEntry(). The callback function does state updating |
|
|
|
* DispatchJobForTocEntry(). The callback function does state updating |
|
|
|
* for the master control logic in pg_backup_archiver.c. |
|
|
|
* for the leader control logic in pg_backup_archiver.c. |
|
|
|
* |
|
|
|
* |
|
|
|
* In principle additional archive-format-specific information might be needed |
|
|
|
* In principle additional archive-format-specific information might be needed |
|
|
|
* in commands or worker status responses, but so far that hasn't proved |
|
|
|
* in commands or worker status responses, but so far that hasn't proved |
|
|
|
@ -40,7 +40,7 @@ |
|
|
|
* threads in the same process. To avoid problems, they work with cloned |
|
|
|
* threads in the same process. To avoid problems, they work with cloned |
|
|
|
* copies of the Archive data structure; see RunWorker().) |
|
|
|
* copies of the Archive data structure; see RunWorker().) |
|
|
|
* |
|
|
|
* |
|
|
|
* In the master process, the workerStatus field for each worker has one of |
|
|
|
* In the leader process, the workerStatus field for each worker has one of |
|
|
|
* the following values: |
|
|
|
* the following values: |
|
|
|
* WRKR_NOT_STARTED: we've not yet forked this worker |
|
|
|
* WRKR_NOT_STARTED: we've not yet forked this worker |
|
|
|
* WRKR_IDLE: it's waiting for a command |
|
|
|
* WRKR_IDLE: it's waiting for a command |
|
|
|
@ -88,8 +88,8 @@ typedef enum |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Private per-parallel-worker state (typedef for this is in parallel.h). |
|
|
|
* Private per-parallel-worker state (typedef for this is in parallel.h). |
|
|
|
* |
|
|
|
* |
|
|
|
* Much of this is valid only in the master process (or, on Windows, should |
|
|
|
* Much of this is valid only in the leader process (or, on Windows, should |
|
|
|
* be touched only by the master thread). But the AH field should be touched |
|
|
|
* be touched only by the leader thread). But the AH field should be touched |
|
|
|
* only by workers. The pipe descriptors are valid everywhere. |
|
|
|
* only by workers. The pipe descriptors are valid everywhere. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
struct ParallelSlot |
|
|
|
struct ParallelSlot |
|
|
|
@ -102,7 +102,7 @@ struct ParallelSlot |
|
|
|
|
|
|
|
|
|
|
|
ArchiveHandle *AH; /* Archive data worker is using */ |
|
|
|
ArchiveHandle *AH; /* Archive data worker is using */ |
|
|
|
|
|
|
|
|
|
|
|
int pipeRead; /* master's end of the pipes */ |
|
|
|
int pipeRead; /* leader's end of the pipes */ |
|
|
|
int pipeWrite; |
|
|
|
int pipeWrite; |
|
|
|
int pipeRevRead; /* child's end of the pipes */ |
|
|
|
int pipeRevRead; /* child's end of the pipes */ |
|
|
|
int pipeRevWrite; |
|
|
|
int pipeRevWrite; |
|
|
|
@ -124,7 +124,7 @@ struct ParallelSlot |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
typedef struct |
|
|
|
typedef struct |
|
|
|
{ |
|
|
|
{ |
|
|
|
ArchiveHandle *AH; /* master database connection */ |
|
|
|
ArchiveHandle *AH; /* leader database connection */ |
|
|
|
ParallelSlot *slot; /* this worker's parallel slot */ |
|
|
|
ParallelSlot *slot; /* this worker's parallel slot */ |
|
|
|
} WorkerInfo; |
|
|
|
} WorkerInfo; |
|
|
|
|
|
|
|
|
|
|
|
@ -157,9 +157,9 @@ static ShutdownInformation shutdown_info; |
|
|
|
* State info for signal handling. |
|
|
|
* State info for signal handling. |
|
|
|
* We assume signal_info initializes to zeroes. |
|
|
|
* We assume signal_info initializes to zeroes. |
|
|
|
* |
|
|
|
* |
|
|
|
* On Unix, myAH is the master DB connection in the master process, and the |
|
|
|
* On Unix, myAH is the leader DB connection in the leader process, and the |
|
|
|
* worker's own connection in worker processes. On Windows, we have only one |
|
|
|
* worker's own connection in worker processes. On Windows, we have only one |
|
|
|
* instance of signal_info, so myAH is the master connection and the worker |
|
|
|
* instance of signal_info, so myAH is the leader connection and the worker |
|
|
|
* connections must be dug out of pstate->parallelSlot[]. |
|
|
|
* connections must be dug out of pstate->parallelSlot[]. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
typedef struct DumpSignalInformation |
|
|
|
typedef struct DumpSignalInformation |
|
|
|
@ -216,8 +216,8 @@ static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te); |
|
|
|
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); |
|
|
|
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); |
|
|
|
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, |
|
|
|
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, |
|
|
|
bool do_wait); |
|
|
|
bool do_wait); |
|
|
|
static char *getMessageFromMaster(int pipefd[2]); |
|
|
|
static char *getMessageFromLeader(int pipefd[2]); |
|
|
|
static void sendMessageToMaster(int pipefd[2], const char *str); |
|
|
|
static void sendMessageToLeader(int pipefd[2], const char *str); |
|
|
|
static int select_loop(int maxFd, fd_set *workerset); |
|
|
|
static int select_loop(int maxFd, fd_set *workerset); |
|
|
|
static char *getMessageFromWorker(ParallelState *pstate, |
|
|
|
static char *getMessageFromWorker(ParallelState *pstate, |
|
|
|
bool do_wait, int *worker); |
|
|
|
bool do_wait, int *worker); |
|
|
|
@ -277,7 +277,7 @@ init_parallel_dump_utils(void) |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Find the ParallelSlot for the current worker process or thread. |
|
|
|
* Find the ParallelSlot for the current worker process or thread. |
|
|
|
* |
|
|
|
* |
|
|
|
* Returns NULL if no matching slot is found (this implies we're the master). |
|
|
|
* Returns NULL if no matching slot is found (this implies we're the leader). |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
static ParallelSlot * |
|
|
|
static ParallelSlot * |
|
|
|
GetMyPSlot(ParallelState *pstate) |
|
|
|
GetMyPSlot(ParallelState *pstate) |
|
|
|
@ -367,7 +367,7 @@ archive_close_connection(int code, void *arg) |
|
|
|
if (!slot) |
|
|
|
if (!slot) |
|
|
|
{ |
|
|
|
{ |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* We're the master. Forcibly shut down workers, then close our |
|
|
|
* We're the leader. Forcibly shut down workers, then close our |
|
|
|
* own database connection, if any. |
|
|
|
* own database connection, if any. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
ShutdownWorkersHard(si->pstate); |
|
|
|
ShutdownWorkersHard(si->pstate); |
|
|
|
@ -381,7 +381,7 @@ archive_close_connection(int code, void *arg) |
|
|
|
* We're a worker. Shut down our own DB connection if any. On |
|
|
|
* We're a worker. Shut down our own DB connection if any. On |
|
|
|
* Windows, we also have to close our communication sockets, to |
|
|
|
* Windows, we also have to close our communication sockets, to |
|
|
|
* emulate what will happen on Unix when the worker process exits. |
|
|
|
* emulate what will happen on Unix when the worker process exits. |
|
|
|
* (Without this, if this is a premature exit, the master would |
|
|
|
* (Without this, if this is a premature exit, the leader would |
|
|
|
* fail to detect it because there would be no EOF condition on |
|
|
|
* fail to detect it because there would be no EOF condition on |
|
|
|
* the other end of the pipe.) |
|
|
|
* the other end of the pipe.) |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@ -396,7 +396,7 @@ archive_close_connection(int code, void *arg) |
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
else |
|
|
|
{ |
|
|
|
{ |
|
|
|
/* Non-parallel operation: just kill the master DB connection */ |
|
|
|
/* Non-parallel operation: just kill the leader DB connection */ |
|
|
|
if (si->AHX) |
|
|
|
if (si->AHX) |
|
|
|
DisconnectDatabase(si->AHX); |
|
|
|
DisconnectDatabase(si->AHX); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -541,11 +541,11 @@ WaitForTerminatingWorkers(ParallelState *pstate) |
|
|
|
* |
|
|
|
* |
|
|
|
* In parallel operation on Unix, each process is responsible for canceling |
|
|
|
* In parallel operation on Unix, each process is responsible for canceling |
|
|
|
* its own connection (this must be so because nobody else has access to it). |
|
|
|
* its own connection (this must be so because nobody else has access to it). |
|
|
|
* Furthermore, the master process should attempt to forward its signal to |
|
|
|
* Furthermore, the leader process should attempt to forward its signal to |
|
|
|
* each child. In simple manual use of pg_dump/pg_restore, forwarding isn't |
|
|
|
* each child. In simple manual use of pg_dump/pg_restore, forwarding isn't |
|
|
|
* needed because typing control-C at the console would deliver SIGINT to |
|
|
|
* needed because typing control-C at the console would deliver SIGINT to |
|
|
|
* every member of the terminal process group --- but in other scenarios it |
|
|
|
* every member of the terminal process group --- but in other scenarios it |
|
|
|
* might be that only the master gets signaled. |
|
|
|
* might be that only the leader gets signaled. |
|
|
|
* |
|
|
|
* |
|
|
|
* On Windows, the cancel handler runs in a separate thread, because that's |
|
|
|
* On Windows, the cancel handler runs in a separate thread, because that's |
|
|
|
* how SetConsoleCtrlHandler works. We make it stop worker threads, send |
|
|
|
* how SetConsoleCtrlHandler works. We make it stop worker threads, send |
|
|
|
@ -576,8 +576,8 @@ sigTermHandler(SIGNAL_ARGS) |
|
|
|
pqsignal(SIGQUIT, SIG_IGN); |
|
|
|
pqsignal(SIGQUIT, SIG_IGN); |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* If we're in the master, forward signal to all workers. (It seems best |
|
|
|
* If we're in the leader, forward signal to all workers. (It seems best |
|
|
|
* to do this before PQcancel; killing the master transaction will result |
|
|
|
* to do this before PQcancel; killing the leader transaction will result |
|
|
|
* in invalid-snapshot errors from active workers, which maybe we can |
|
|
|
* in invalid-snapshot errors from active workers, which maybe we can |
|
|
|
* quiet by killing workers first.) Ignore any errors. |
|
|
|
* quiet by killing workers first.) Ignore any errors. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@ -601,7 +601,7 @@ sigTermHandler(SIGNAL_ARGS) |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Report we're quitting, using nothing more complicated than write(2). |
|
|
|
* Report we're quitting, using nothing more complicated than write(2). |
|
|
|
* When in parallel operation, only the master process should do this. |
|
|
|
* When in parallel operation, only the leader process should do this. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
if (!signal_info.am_worker) |
|
|
|
if (!signal_info.am_worker) |
|
|
|
{ |
|
|
|
{ |
|
|
|
@ -665,7 +665,7 @@ consoleHandler(DWORD dwCtrlType) |
|
|
|
* If in parallel mode, stop worker threads and send QueryCancel to |
|
|
|
* If in parallel mode, stop worker threads and send QueryCancel to |
|
|
|
* their connected backends. The main point of stopping the worker |
|
|
|
* their connected backends. The main point of stopping the worker |
|
|
|
* threads is to keep them from reporting the query cancels as errors, |
|
|
|
* threads is to keep them from reporting the query cancels as errors, |
|
|
|
* which would clutter the user's screen. We needn't stop the master |
|
|
|
* which would clutter the user's screen. We needn't stop the leader |
|
|
|
* thread since it won't be doing much anyway. Do this before |
|
|
|
* thread since it won't be doing much anyway. Do this before |
|
|
|
* canceling the main transaction, else we might get invalid-snapshot |
|
|
|
* canceling the main transaction, else we might get invalid-snapshot |
|
|
|
* errors reported before we can stop the workers. Ignore errors, |
|
|
|
* errors reported before we can stop the workers. Ignore errors, |
|
|
|
@ -693,7 +693,7 @@ consoleHandler(DWORD dwCtrlType) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Send QueryCancel to master connection, if enabled. Ignore errors, |
|
|
|
* Send QueryCancel to leader connection, if enabled. Ignore errors, |
|
|
|
* there's not much we can do about them anyway. |
|
|
|
* there's not much we can do about them anyway. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL) |
|
|
|
if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL) |
|
|
|
@ -949,11 +949,11 @@ ParallelBackupStart(ArchiveHandle *AH) |
|
|
|
shutdown_info.pstate = pstate; |
|
|
|
shutdown_info.pstate = pstate; |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Temporarily disable query cancellation on the master connection. This |
|
|
|
* Temporarily disable query cancellation on the leader connection. This |
|
|
|
* ensures that child processes won't inherit valid AH->connCancel |
|
|
|
* ensures that child processes won't inherit valid AH->connCancel |
|
|
|
* settings and thus won't try to issue cancels against the master's |
|
|
|
* settings and thus won't try to issue cancels against the leader's |
|
|
|
* connection. No harm is done if we fail while it's disabled, because |
|
|
|
* connection. No harm is done if we fail while it's disabled, because |
|
|
|
* the master connection is idle at this point anyway. |
|
|
|
* the leader connection is idle at this point anyway. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
set_archive_cancel_info(AH, NULL); |
|
|
|
set_archive_cancel_info(AH, NULL); |
|
|
|
|
|
|
|
|
|
|
|
@ -977,7 +977,7 @@ ParallelBackupStart(ArchiveHandle *AH) |
|
|
|
if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0) |
|
|
|
if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0) |
|
|
|
fatal("could not create communication channels: %m"); |
|
|
|
fatal("could not create communication channels: %m"); |
|
|
|
|
|
|
|
|
|
|
|
/* master's ends of the pipes */ |
|
|
|
/* leader's ends of the pipes */ |
|
|
|
slot->pipeRead = pipeWM[PIPE_READ]; |
|
|
|
slot->pipeRead = pipeWM[PIPE_READ]; |
|
|
|
slot->pipeWrite = pipeMW[PIPE_WRITE]; |
|
|
|
slot->pipeWrite = pipeMW[PIPE_WRITE]; |
|
|
|
/* child's ends of the pipes */ |
|
|
|
/* child's ends of the pipes */ |
|
|
|
@ -1008,13 +1008,13 @@ ParallelBackupStart(ArchiveHandle *AH) |
|
|
|
/* instruct signal handler that we're in a worker now */ |
|
|
|
/* instruct signal handler that we're in a worker now */ |
|
|
|
signal_info.am_worker = true; |
|
|
|
signal_info.am_worker = true; |
|
|
|
|
|
|
|
|
|
|
|
/* close read end of Worker -> Master */ |
|
|
|
/* close read end of Worker -> Leader */ |
|
|
|
closesocket(pipeWM[PIPE_READ]); |
|
|
|
closesocket(pipeWM[PIPE_READ]); |
|
|
|
/* close write end of Master -> Worker */ |
|
|
|
/* close write end of Leader -> Worker */ |
|
|
|
closesocket(pipeMW[PIPE_WRITE]); |
|
|
|
closesocket(pipeMW[PIPE_WRITE]); |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Close all inherited fds for communication of the master with |
|
|
|
* Close all inherited fds for communication of the leader with |
|
|
|
* previously-forked workers. |
|
|
|
* previously-forked workers. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
for (j = 0; j < i; j++) |
|
|
|
for (j = 0; j < i; j++) |
|
|
|
@ -1035,19 +1035,19 @@ ParallelBackupStart(ArchiveHandle *AH) |
|
|
|
fatal("could not create worker process: %m"); |
|
|
|
fatal("could not create worker process: %m"); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/* In Master after successful fork */ |
|
|
|
/* In Leader after successful fork */ |
|
|
|
slot->pid = pid; |
|
|
|
slot->pid = pid; |
|
|
|
slot->workerStatus = WRKR_IDLE; |
|
|
|
slot->workerStatus = WRKR_IDLE; |
|
|
|
|
|
|
|
|
|
|
|
/* close read end of Master -> Worker */ |
|
|
|
/* close read end of Leader -> Worker */ |
|
|
|
closesocket(pipeMW[PIPE_READ]); |
|
|
|
closesocket(pipeMW[PIPE_READ]); |
|
|
|
/* close write end of Worker -> Master */ |
|
|
|
/* close write end of Worker -> Leader */ |
|
|
|
closesocket(pipeWM[PIPE_WRITE]); |
|
|
|
closesocket(pipeWM[PIPE_WRITE]); |
|
|
|
#endif /* WIN32 */ |
|
|
|
#endif /* WIN32 */ |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Having forked off the workers, disable SIGPIPE so that master isn't |
|
|
|
* Having forked off the workers, disable SIGPIPE so that leader isn't |
|
|
|
* killed if it tries to send a command to a dead worker. We don't want |
|
|
|
* killed if it tries to send a command to a dead worker. We don't want |
|
|
|
* the workers to inherit this setting, though. |
|
|
|
* the workers to inherit this setting, though. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@ -1056,7 +1056,7 @@ ParallelBackupStart(ArchiveHandle *AH) |
|
|
|
#endif |
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Re-establish query cancellation on the master connection. |
|
|
|
* Re-establish query cancellation on the leader connection. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
set_archive_cancel_info(AH, AH->connection); |
|
|
|
set_archive_cancel_info(AH, AH->connection); |
|
|
|
|
|
|
|
|
|
|
|
@ -1162,12 +1162,12 @@ parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, |
|
|
|
Assert(*te != NULL); |
|
|
|
Assert(*te != NULL); |
|
|
|
} |
|
|
|
} |
|
|
|
else |
|
|
|
else |
|
|
|
fatal("unrecognized command received from master: \"%s\"", |
|
|
|
fatal("unrecognized command received from leader: \"%s\"", |
|
|
|
msg); |
|
|
|
msg); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* buildWorkerResponse: format a response string to send to the master. |
|
|
|
* buildWorkerResponse: format a response string to send to the leader. |
|
|
|
* |
|
|
|
* |
|
|
|
* The string is built in the caller-supplied buffer of size buflen. |
|
|
|
* The string is built in the caller-supplied buffer of size buflen. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
@ -1299,16 +1299,16 @@ IsEveryWorkerIdle(ParallelState *pstate) |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Acquire lock on a table to be dumped by a worker process. |
|
|
|
* Acquire lock on a table to be dumped by a worker process. |
|
|
|
* |
|
|
|
* |
|
|
|
* The master process is already holding an ACCESS SHARE lock. Ordinarily |
|
|
|
* The leader process is already holding an ACCESS SHARE lock. Ordinarily |
|
|
|
* it's no problem for a worker to get one too, but if anything else besides |
|
|
|
* it's no problem for a worker to get one too, but if anything else besides |
|
|
|
* pg_dump is running, there's a possible deadlock: |
|
|
|
* pg_dump is running, there's a possible deadlock: |
|
|
|
* |
|
|
|
* |
|
|
|
* 1) Master dumps the schema and locks all tables in ACCESS SHARE mode. |
|
|
|
* 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode. |
|
|
|
* 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted |
|
|
|
* 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted |
|
|
|
* because the master holds a conflicting ACCESS SHARE lock). |
|
|
|
* because the leader holds a conflicting ACCESS SHARE lock). |
|
|
|
* 3) A worker process also requests an ACCESS SHARE lock to read the table. |
|
|
|
* 3) A worker process also requests an ACCESS SHARE lock to read the table. |
|
|
|
* The worker is enqueued behind the ACCESS EXCLUSIVE lock request. |
|
|
|
* The worker is enqueued behind the ACCESS EXCLUSIVE lock request. |
|
|
|
* 4) Now we have a deadlock, since the master is effectively waiting for |
|
|
|
* 4) Now we have a deadlock, since the leader is effectively waiting for |
|
|
|
* the worker. The server cannot detect that, however. |
|
|
|
* the worker. The server cannot detect that, however. |
|
|
|
* |
|
|
|
* |
|
|
|
* To prevent an infinite wait, prior to touching a table in a worker, request |
|
|
|
* To prevent an infinite wait, prior to touching a table in a worker, request |
|
|
|
@ -1349,7 +1349,7 @@ lockTableForWorker(ArchiveHandle *AH, TocEntry *te) |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* WaitForCommands: main routine for a worker process. |
|
|
|
* WaitForCommands: main routine for a worker process. |
|
|
|
* |
|
|
|
* |
|
|
|
* Read and execute commands from the master until we see EOF on the pipe. |
|
|
|
* Read and execute commands from the leader until we see EOF on the pipe. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
static void |
|
|
|
static void |
|
|
|
WaitForCommands(ArchiveHandle *AH, int pipefd[2]) |
|
|
|
WaitForCommands(ArchiveHandle *AH, int pipefd[2]) |
|
|
|
@ -1362,7 +1362,7 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) |
|
|
|
|
|
|
|
|
|
|
|
for (;;) |
|
|
|
for (;;) |
|
|
|
{ |
|
|
|
{ |
|
|
|
if (!(command = getMessageFromMaster(pipefd))) |
|
|
|
if (!(command = getMessageFromLeader(pipefd))) |
|
|
|
{ |
|
|
|
{ |
|
|
|
/* EOF, so done */ |
|
|
|
/* EOF, so done */ |
|
|
|
return; |
|
|
|
return; |
|
|
|
@ -1387,10 +1387,10 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) |
|
|
|
else |
|
|
|
else |
|
|
|
Assert(false); |
|
|
|
Assert(false); |
|
|
|
|
|
|
|
|
|
|
|
/* Return status to master */ |
|
|
|
/* Return status to leader */ |
|
|
|
buildWorkerResponse(AH, te, act, status, buf, sizeof(buf)); |
|
|
|
buildWorkerResponse(AH, te, act, status, buf, sizeof(buf)); |
|
|
|
|
|
|
|
|
|
|
|
sendMessageToMaster(pipefd, buf); |
|
|
|
sendMessageToLeader(pipefd, buf); |
|
|
|
|
|
|
|
|
|
|
|
/* command was pg_malloc'd and we are responsible for free()ing it. */ |
|
|
|
/* command was pg_malloc'd and we are responsible for free()ing it. */ |
|
|
|
free(command); |
|
|
|
free(command); |
|
|
|
@ -1464,7 +1464,7 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) |
|
|
|
* Any received results are passed to the callback specified to |
|
|
|
* Any received results are passed to the callback specified to |
|
|
|
* DispatchJobForTocEntry. |
|
|
|
* DispatchJobForTocEntry. |
|
|
|
* |
|
|
|
* |
|
|
|
* This function is executed in the master process. |
|
|
|
* This function is executed in the leader process. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
void |
|
|
|
void |
|
|
|
WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode) |
|
|
|
WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode) |
|
|
|
@ -1525,25 +1525,25 @@ WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Read one command message from the master, blocking if necessary |
|
|
|
* Read one command message from the leader, blocking if necessary |
|
|
|
* until one is available, and return it as a malloc'd string. |
|
|
|
* until one is available, and return it as a malloc'd string. |
|
|
|
* On EOF, return NULL. |
|
|
|
* On EOF, return NULL. |
|
|
|
* |
|
|
|
* |
|
|
|
* This function is executed in worker processes. |
|
|
|
* This function is executed in worker processes. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
static char * |
|
|
|
static char * |
|
|
|
getMessageFromMaster(int pipefd[2]) |
|
|
|
getMessageFromLeader(int pipefd[2]) |
|
|
|
{ |
|
|
|
{ |
|
|
|
return readMessageFromPipe(pipefd[PIPE_READ]); |
|
|
|
return readMessageFromPipe(pipefd[PIPE_READ]); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Send a status message to the master. |
|
|
|
* Send a status message to the leader. |
|
|
|
* |
|
|
|
* |
|
|
|
* This function is executed in worker processes. |
|
|
|
* This function is executed in worker processes. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
static void |
|
|
|
static void |
|
|
|
sendMessageToMaster(int pipefd[2], const char *str) |
|
|
|
sendMessageToLeader(int pipefd[2], const char *str) |
|
|
|
{ |
|
|
|
{ |
|
|
|
int len = strlen(str) + 1; |
|
|
|
int len = strlen(str) + 1; |
|
|
|
|
|
|
|
|
|
|
|
@ -1592,7 +1592,7 @@ select_loop(int maxFd, fd_set *workerset) |
|
|
|
* that's hard to distinguish from the no-data-available case, but for now |
|
|
|
* that's hard to distinguish from the no-data-available case, but for now |
|
|
|
* our one caller is okay with that. |
|
|
|
* our one caller is okay with that. |
|
|
|
* |
|
|
|
* |
|
|
|
* This function is executed in the master process. |
|
|
|
* This function is executed in the leader process. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
static char * |
|
|
|
static char * |
|
|
|
getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) |
|
|
|
getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) |
|
|
|
@ -1657,7 +1657,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* Send a command message to the specified worker process. |
|
|
|
* Send a command message to the specified worker process. |
|
|
|
* |
|
|
|
* |
|
|
|
* This function is executed in the master process. |
|
|
|
* This function is executed in the leader process. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
static void |
|
|
|
static void |
|
|
|
sendMessageToWorker(ParallelState *pstate, int worker, const char *str) |
|
|
|
sendMessageToWorker(ParallelState *pstate, int worker, const char *str) |
|
|
|
@ -1688,7 +1688,7 @@ readMessageFromPipe(int fd) |
|
|
|
/*
|
|
|
|
/*
|
|
|
|
* In theory, if we let piperead() read multiple bytes, it might give us |
|
|
|
* In theory, if we let piperead() read multiple bytes, it might give us |
|
|
|
* back fragments of multiple messages. (That can't actually occur, since |
|
|
|
* back fragments of multiple messages. (That can't actually occur, since |
|
|
|
* neither master nor workers send more than one message without waiting |
|
|
|
* neither leader nor workers send more than one message without waiting |
|
|
|
* for a reply, but we don't wish to assume that here.) For simplicity, |
|
|
|
* for a reply, but we don't wish to assume that here.) For simplicity, |
|
|
|
* read a byte at a time until we get the terminating '\0'. This method |
|
|
|
* read a byte at a time until we get the terminating '\0'. This method |
|
|
|
* is a bit inefficient, but since this is only used for relatively short |
|
|
|
* is a bit inefficient, but since this is only used for relatively short |
|
|
|
|