|
|
|
/*-------------------------------------------------------------------------
|
|
|
|
* launcher.c
|
|
|
|
* PostgreSQL logical replication worker launcher process
|
|
|
|
*
|
|
|
|
* Copyright (c) 2016-2021, PostgreSQL Global Development Group
|
|
|
|
*
|
|
|
|
* IDENTIFICATION
|
|
|
|
* src/backend/replication/logical/launcher.c
|
|
|
|
*
|
|
|
|
* NOTES
|
|
|
|
* This module contains the logical replication worker launcher which
|
|
|
|
* uses the background worker infrastructure to start the logical
|
|
|
|
* replication workers for every enabled subscription.
|
|
|
|
*
|
|
|
|
*-------------------------------------------------------------------------
|
|
|
|
*/
|
|
|
|
|
|
|
|
#include "postgres.h"
|
|
|
|
|
|
|
|
#include "access/heapam.h"
|
|
|
|
#include "access/htup.h"
|
|
|
|
#include "access/htup_details.h"
|
tableam: Add and use scan APIs.
Too allow table accesses to be not directly dependent on heap, several
new abstractions are needed. Specifically:
1) Heap scans need to be generalized into table scans. Do this by
introducing TableScanDesc, which will be the "base class" for
individual AMs. This contains the AM independent fields from
HeapScanDesc.
The previous heap_{beginscan,rescan,endscan} et al. have been
replaced with a table_ version.
There's no direct replacement for heap_getnext(), as that returned
a HeapTuple, which is undesirable for a other AMs. Instead there's
table_scan_getnextslot(). But note that heap_getnext() lives on,
it's still used widely to access catalog tables.
This is achieved by new scan_begin, scan_end, scan_rescan,
scan_getnextslot callbacks.
2) The portion of parallel scans that's shared between backends need
to be able to do so without the user doing per-AM work. To achieve
that new parallelscan_{estimate, initialize, reinitialize}
callbacks are introduced, which operate on a new
ParallelTableScanDesc, which again can be subclassed by AMs.
As it is likely that several AMs are going to be block oriented,
block oriented callbacks that can be shared between such AMs are
provided and used by heap. table_block_parallelscan_{estimate,
intiialize, reinitialize} as callbacks, and
table_block_parallelscan_{nextpage, init} for use in AMs. These
operate on a ParallelBlockTableScanDesc.
3) Index scans need to be able to access tables to return a tuple, and
there needs to be state across individual accesses to the heap to
store state like buffers. That's now handled by introducing a
sort-of-scan IndexFetchTable, which again is intended to be
subclassed by individual AMs (for heap IndexFetchHeap).
The relevant callbacks for an AM are index_fetch_{end, begin,
reset} to create the necessary state, and index_fetch_tuple to
retrieve an indexed tuple. Note that index_fetch_tuple
implementations need to be smarter than just blindly fetching the
tuples for AMs that have optimizations similar to heap's HOT - the
currently alive tuple in the update chain needs to be fetched if
appropriate.
Similar to table_scan_getnextslot(), it's undesirable to continue
to return HeapTuples. Thus index_fetch_heap (might want to rename
that later) now accepts a slot as an argument. Core code doesn't
have a lot of call sites performing index scans without going
through the systable_* API (in contrast to loads of heap_getnext
calls and working directly with HeapTuples).
Index scans now store the result of a search in
IndexScanDesc->xs_heaptid, rather than xs_ctup->t_self. As the
target is not generally a HeapTuple anymore that seems cleaner.
To be able to sensible adapt code to use the above, two further
callbacks have been introduced:
a) slot_callbacks returns a TupleTableSlotOps* suitable for creating
slots capable of holding a tuple of the AMs
type. table_slot_callbacks() and table_slot_create() are based
upon that, but have additional logic to deal with views, foreign
tables, etc.
While this change could have been done separately, nearly all the
call sites that needed to be adapted for the rest of this commit
also would have been needed to be adapted for
table_slot_callbacks(), making separation not worthwhile.
b) tuple_satisfies_snapshot checks whether the tuple in a slot is
currently visible according to a snapshot. That's required as a few
places now don't have a buffer + HeapTuple around, but a
slot (which in heap's case internally has that information).
Additionally a few infrastructure changes were needed:
I) SysScanDesc, as used by systable_{beginscan, getnext} et al. now
internally uses a slot to keep track of tuples. While
systable_getnext() still returns HeapTuples, and will so for the
foreseeable future, the index API (see 1) above) now only deals with
slots.
The remainder, and largest part, of this commit is then adjusting all
scans in postgres to use the new APIs.
Author: Andres Freund, Haribabu Kommi, Alvaro Herrera
Discussion:
https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
https://postgr.es/m/20160812231527.GA690404@alvherre.pgsql
6 years ago
|
|
|
#include "access/tableam.h"
|
|
|
|
#include "access/xact.h"
|
|
|
|
#include "catalog/pg_subscription.h"
|
|
|
|
#include "catalog/pg_subscription_rel.h"
|
|
|
|
#include "funcapi.h"
|
|
|
|
#include "libpq/pqsignal.h"
|
|
|
|
#include "miscadmin.h"
|
|
|
|
#include "pgstat.h"
|
|
|
|
#include "postmaster/bgworker.h"
|
|
|
|
#include "postmaster/fork_process.h"
|
|
|
|
#include "postmaster/interrupt.h"
|
|
|
|
#include "postmaster/postmaster.h"
|
|
|
|
#include "replication/logicallauncher.h"
|
|
|
|
#include "replication/logicalworker.h"
|
|
|
|
#include "replication/slot.h"
|
|
|
|
#include "replication/walreceiver.h"
|
|
|
|
#include "replication/worker_internal.h"
|
|
|
|
#include "storage/ipc.h"
|
|
|
|
#include "storage/proc.h"
|
|
|
|
#include "storage/procarray.h"
|
|
|
|
#include "storage/procsignal.h"
|
|
|
|
#include "tcop/tcopprot.h"
|
|
|
|
#include "utils/memutils.h"
|
|
|
|
#include "utils/pg_lsn.h"
|
|
|
|
#include "utils/ps_status.h"
|
|
|
|
#include "utils/snapmgr.h"
|
|
|
|
#include "utils/timeout.h"
|
|
|
|
|
|
|
|
/* max sleep time between cycles (3min) */
#define DEFAULT_NAPTIME_PER_CYCLE 180000L

/* GUCs: worker-pool sizing for logical replication (see guc.c defaults). */
int			max_logical_replication_workers = 4;
int			max_sync_workers_per_subscription = 2;

/*
 * Pointer to this process's own slot in the shared workers array; NULL in
 * processes that are not logical replication workers.
 */
LogicalRepWorker *MyLogicalRepWorker = NULL;

/*
 * Shared-memory state for the launcher and its workers.  Lives in shared
 * memory; the workers array is sized by max_logical_replication_workers.
 */
typedef struct LogicalRepCtxStruct
{
	/* Supervisor process. */
	pid_t		launcher_pid;

	/* Background workers. */
	LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER];
} LogicalRepCtxStruct;

LogicalRepCtxStruct *LogicalRepCtx;

/* Forward declarations of file-local helpers. */
static void ApplyLauncherWakeup(void);
static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);

/* Set when the current transaction should wake the launcher at commit. */
static bool on_commit_launcher_wakeup = false;

Datum		pg_stat_get_subscription(PG_FUNCTION_ARGS);
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Load the list of subscriptions.
|
|
|
|
*
|
|
|
|
* Only the fields interesting for worker start/stop functions are filled for
|
|
|
|
* each subscription.
|
|
|
|
*/
|
|
|
|
static List *
|
|
|
|
get_subscription_list(void)
|
|
|
|
{
|
|
|
|
List *res = NIL;
|
|
|
|
Relation rel;
|
tableam: Add and use scan APIs.
Too allow table accesses to be not directly dependent on heap, several
new abstractions are needed. Specifically:
1) Heap scans need to be generalized into table scans. Do this by
introducing TableScanDesc, which will be the "base class" for
individual AMs. This contains the AM independent fields from
HeapScanDesc.
The previous heap_{beginscan,rescan,endscan} et al. have been
replaced with a table_ version.
There's no direct replacement for heap_getnext(), as that returned
a HeapTuple, which is undesirable for a other AMs. Instead there's
table_scan_getnextslot(). But note that heap_getnext() lives on,
it's still used widely to access catalog tables.
This is achieved by new scan_begin, scan_end, scan_rescan,
scan_getnextslot callbacks.
2) The portion of parallel scans that's shared between backends need
to be able to do so without the user doing per-AM work. To achieve
that new parallelscan_{estimate, initialize, reinitialize}
callbacks are introduced, which operate on a new
ParallelTableScanDesc, which again can be subclassed by AMs.
As it is likely that several AMs are going to be block oriented,
block oriented callbacks that can be shared between such AMs are
provided and used by heap. table_block_parallelscan_{estimate,
intiialize, reinitialize} as callbacks, and
table_block_parallelscan_{nextpage, init} for use in AMs. These
operate on a ParallelBlockTableScanDesc.
3) Index scans need to be able to access tables to return a tuple, and
there needs to be state across individual accesses to the heap to
store state like buffers. That's now handled by introducing a
sort-of-scan IndexFetchTable, which again is intended to be
subclassed by individual AMs (for heap IndexFetchHeap).
The relevant callbacks for an AM are index_fetch_{end, begin,
reset} to create the necessary state, and index_fetch_tuple to
retrieve an indexed tuple. Note that index_fetch_tuple
implementations need to be smarter than just blindly fetching the
tuples for AMs that have optimizations similar to heap's HOT - the
currently alive tuple in the update chain needs to be fetched if
appropriate.
Similar to table_scan_getnextslot(), it's undesirable to continue
to return HeapTuples. Thus index_fetch_heap (might want to rename
that later) now accepts a slot as an argument. Core code doesn't
have a lot of call sites performing index scans without going
through the systable_* API (in contrast to loads of heap_getnext
calls and working directly with HeapTuples).
Index scans now store the result of a search in
IndexScanDesc->xs_heaptid, rather than xs_ctup->t_self. As the
target is not generally a HeapTuple anymore that seems cleaner.
To be able to sensible adapt code to use the above, two further
callbacks have been introduced:
a) slot_callbacks returns a TupleTableSlotOps* suitable for creating
slots capable of holding a tuple of the AMs
type. table_slot_callbacks() and table_slot_create() are based
upon that, but have additional logic to deal with views, foreign
tables, etc.
While this change could have been done separately, nearly all the
call sites that needed to be adapted for the rest of this commit
also would have been needed to be adapted for
table_slot_callbacks(), making separation not worthwhile.
b) tuple_satisfies_snapshot checks whether the tuple in a slot is
currently visible according to a snapshot. That's required as a few
places now don't have a buffer + HeapTuple around, but a
slot (which in heap's case internally has that information).
Additionally a few infrastructure changes were needed:
I) SysScanDesc, as used by systable_{beginscan, getnext} et al. now
internally uses a slot to keep track of tuples. While
systable_getnext() still returns HeapTuples, and will so for the
foreseeable future, the index API (see 1) above) now only deals with
slots.
The remainder, and largest part, of this commit is then adjusting all
scans in postgres to use the new APIs.
Author: Andres Freund, Haribabu Kommi, Alvaro Herrera
Discussion:
https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
https://postgr.es/m/20160812231527.GA690404@alvherre.pgsql
6 years ago
|
|
|
TableScanDesc scan;
|
|
|
|
HeapTuple tup;
|
|
|
|
MemoryContext resultcxt;
|
|
|
|
|
|
|
|
/* This is the context that we will allocate our output data in */
|
|
|
|
resultcxt = CurrentMemoryContext;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Start a transaction so we can access pg_database, and get a snapshot.
|
|
|
|
* We don't have a use for the snapshot itself, but we're interested in
|
|
|
|
* the secondary effect that it sets RecentGlobalXmin. (This is critical
|
|
|
|
* for anything that reads heap pages, because HOT may decide to prune
|
|
|
|
* them even if the process doesn't attempt to modify any tuples.)
|
snapshot scalability: Don't compute global horizons while building snapshots.
To make GetSnapshotData() more scalable, it cannot not look at at each proc's
xmin: While snapshot contents do not need to change whenever a read-only
transaction commits or a snapshot is released, a proc's xmin is modified in
those cases. The frequency of xmin modifications leads to, particularly on
higher core count systems, many cache misses inside GetSnapshotData(), despite
the data underlying a snapshot not changing. That is the most
significant source of GetSnapshotData() scaling poorly on larger systems.
Without accessing xmins, GetSnapshotData() cannot calculate accurate horizons /
thresholds as it has so far. But we don't really have to: The horizons don't
actually change that much between GetSnapshotData() calls. Nor are the horizons
actually used every time a snapshot is built.
The trick this commit introduces is to delay computation of accurate horizons
until there use and using horizon boundaries to determine whether accurate
horizons need to be computed.
The use of RecentGlobal[Data]Xmin to decide whether a row version could be
removed has been replaces with new GlobalVisTest* functions. These use two
thresholds to determine whether a row can be pruned:
1) definitely_needed, indicating that rows deleted by XIDs >= definitely_needed
are definitely still visible.
2) maybe_needed, indicating that rows deleted by XIDs < maybe_needed can
definitely be removed
GetSnapshotData() updates definitely_needed to be the xmin of the computed
snapshot.
When testing whether a row can be removed (with GlobalVisTestIsRemovableXid())
and the tested XID falls in between the two (i.e. XID >= maybe_needed && XID <
definitely_needed) the boundaries can be recomputed to be more accurate. As it
is not cheap to compute accurate boundaries, we limit the number of times that
happens in short succession. As the boundaries used by
GlobalVisTestIsRemovableXid() are never reset (with maybe_needed updated by
GetSnapshotData()), it is likely that further test can benefit from an earlier
computation of accurate horizons.
To avoid regressing performance when old_snapshot_threshold is set (as that
requires an accurate horizon to be computed), heap_page_prune_opt() doesn't
unconditionally call TransactionIdLimitedForOldSnapshots() anymore. Both the
computation of the limited horizon, and the triggering of errors (with
SetOldSnapshotThresholdTimestamp()) is now only done when necessary to remove
tuples.
This commit just removes the accesses to PGXACT->xmin from
GetSnapshotData(), but other members of PGXACT residing in the same
cache line are accessed. Therefore this in itself does not result in a
significant improvement. Subsequent commits will take advantage of the
fact that GetSnapshotData() now does not need to access xmins anymore.
Note: This contains a workaround in heap_page_prune_opt() to keep the
snapshot_too_old tests working. While that workaround is ugly, the tests
currently are not meaningful, and it seems best to address them separately.
Author: Andres Freund <andres@anarazel.de>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
Reviewed-By: Thomas Munro <thomas.munro@gmail.com>
Reviewed-By: David Rowley <dgrowleyml@gmail.com>
Discussion: https://postgr.es/m/20200301083601.ews6hz5dduc3w2se@alap3.anarazel.de
5 years ago
|
|
|
*
|
|
|
|
* FIXME: This comment is inaccurate / the code buggy. A snapshot that is
|
|
|
|
* not pushed/active does not reliably prevent HOT pruning (->xmin could
|
|
|
|
* e.g. be cleared when cache invalidations are processed).
|
|
|
|
*/
|
|
|
|
StartTransactionCommand();
|
|
|
|
(void) GetTransactionSnapshot();
|
|
|
|
|
|
|
|
rel = table_open(SubscriptionRelationId, AccessShareLock);
|
tableam: Add and use scan APIs.
Too allow table accesses to be not directly dependent on heap, several
new abstractions are needed. Specifically:
1) Heap scans need to be generalized into table scans. Do this by
introducing TableScanDesc, which will be the "base class" for
individual AMs. This contains the AM independent fields from
HeapScanDesc.
The previous heap_{beginscan,rescan,endscan} et al. have been
replaced with a table_ version.
There's no direct replacement for heap_getnext(), as that returned
a HeapTuple, which is undesirable for a other AMs. Instead there's
table_scan_getnextslot(). But note that heap_getnext() lives on,
it's still used widely to access catalog tables.
This is achieved by new scan_begin, scan_end, scan_rescan,
scan_getnextslot callbacks.
2) The portion of parallel scans that's shared between backends need
to be able to do so without the user doing per-AM work. To achieve
that new parallelscan_{estimate, initialize, reinitialize}
callbacks are introduced, which operate on a new
ParallelTableScanDesc, which again can be subclassed by AMs.
As it is likely that several AMs are going to be block oriented,
block oriented callbacks that can be shared between such AMs are
provided and used by heap. table_block_parallelscan_{estimate,
intiialize, reinitialize} as callbacks, and
table_block_parallelscan_{nextpage, init} for use in AMs. These
operate on a ParallelBlockTableScanDesc.
3) Index scans need to be able to access tables to return a tuple, and
there needs to be state across individual accesses to the heap to
store state like buffers. That's now handled by introducing a
sort-of-scan IndexFetchTable, which again is intended to be
subclassed by individual AMs (for heap IndexFetchHeap).
The relevant callbacks for an AM are index_fetch_{end, begin,
reset} to create the necessary state, and index_fetch_tuple to
retrieve an indexed tuple. Note that index_fetch_tuple
implementations need to be smarter than just blindly fetching the
tuples for AMs that have optimizations similar to heap's HOT - the
currently alive tuple in the update chain needs to be fetched if
appropriate.
Similar to table_scan_getnextslot(), it's undesirable to continue
to return HeapTuples. Thus index_fetch_heap (might want to rename
that later) now accepts a slot as an argument. Core code doesn't
have a lot of call sites performing index scans without going
through the systable_* API (in contrast to loads of heap_getnext
calls and working directly with HeapTuples).
Index scans now store the result of a search in
IndexScanDesc->xs_heaptid, rather than xs_ctup->t_self. As the
target is not generally a HeapTuple anymore that seems cleaner.
To be able to sensible adapt code to use the above, two further
callbacks have been introduced:
a) slot_callbacks returns a TupleTableSlotOps* suitable for creating
slots capable of holding a tuple of the AMs
type. table_slot_callbacks() and table_slot_create() are based
upon that, but have additional logic to deal with views, foreign
tables, etc.
While this change could have been done separately, nearly all the
call sites that needed to be adapted for the rest of this commit
also would have been needed to be adapted for
table_slot_callbacks(), making separation not worthwhile.
b) tuple_satisfies_snapshot checks whether the tuple in a slot is
currently visible according to a snapshot. That's required as a few
places now don't have a buffer + HeapTuple around, but a
slot (which in heap's case internally has that information).
Additionally a few infrastructure changes were needed:
I) SysScanDesc, as used by systable_{beginscan, getnext} et al. now
internally uses a slot to keep track of tuples. While
systable_getnext() still returns HeapTuples, and will so for the
foreseeable future, the index API (see 1) above) now only deals with
slots.
The remainder, and largest part, of this commit is then adjusting all
scans in postgres to use the new APIs.
Author: Andres Freund, Haribabu Kommi, Alvaro Herrera
Discussion:
https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
https://postgr.es/m/20160812231527.GA690404@alvherre.pgsql
6 years ago
|
|
|
scan = table_beginscan_catalog(rel, 0, NULL);
|
|
|
|
|
|
|
|
while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
|
|
|
|
{
|
|
|
|
Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
|
|
|
|
Subscription *sub;
|
|
|
|
MemoryContext oldcxt;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Allocate our results in the caller's context, not the
|
|
|
|
* transaction's. We do this inside the loop, and restore the original
|
|
|
|
* context at the end, so that leaky things like heap_getnext() are
|
|
|
|
* not called in a potentially long-lived context.
|
|
|
|
*/
|
|
|
|
oldcxt = MemoryContextSwitchTo(resultcxt);
|
|
|
|
|
|
|
|
sub = (Subscription *) palloc0(sizeof(Subscription));
|
Remove WITH OIDS support, change oid catalog column visibility.
Previously tables declared WITH OIDS, including a significant fraction
of the catalog tables, stored the oid column not as a normal column,
but as part of the tuple header.
This special column was not shown by default, which was somewhat odd,
as it's often (consider e.g. pg_class.oid) one of the more important
parts of a row. Neither pg_dump nor COPY included the contents of the
oid column by default.
The fact that the oid column was not an ordinary column necessitated a
significant amount of special case code to support oid columns. That
already was painful for the existing, but upcoming work aiming to make
table storage pluggable, would have required expanding and duplicating
that "specialness" significantly.
WITH OIDS has been deprecated since 2005 (commit ff02d0a05280e0).
Remove it.
Removing includes:
- CREATE TABLE and ALTER TABLE syntax for declaring the table to be
WITH OIDS has been removed (WITH (oids[ = true]) will error out)
- pg_dump does not support dumping tables declared WITH OIDS and will
issue a warning when dumping one (and ignore the oid column).
- restoring an pg_dump archive with pg_restore will warn when
restoring a table with oid contents (and ignore the oid column)
- COPY will refuse to load binary dump that includes oids.
- pg_upgrade will error out when encountering tables declared WITH
OIDS, they have to be altered to remove the oid column first.
- Functionality to access the oid of the last inserted row (like
plpgsql's RESULT_OID, spi's SPI_lastoid, ...) has been removed.
The syntax for declaring a table WITHOUT OIDS (or WITH (oids = false)
for CREATE TABLE) is still supported. While that requires a bit of
support code, it seems unnecessary to break applications / dumps that
do not use oids, and are explicit about not using them.
The biggest user of WITH OID columns was postgres' catalog. This
commit changes all 'magic' oid columns to be columns that are normally
declared and stored. To reduce unnecessary query breakage all the
newly added columns are still named 'oid', even if a table's column
naming scheme would indicate 'reloid' or such. This obviously
requires adapting a lot code, mostly replacing oid access via
HeapTupleGetOid() with access to the underlying Form_pg_*->oid column.
The bootstrap process now assigns oids for all oid columns in
genbki.pl that do not have an explicit value (starting at the largest
oid previously used), only oids assigned later by oids will be above
FirstBootstrapObjectId. As the oid column now is a normal column the
special bootstrap syntax for oids has been removed.
Oids are not automatically assigned during insertion anymore, all
backend code explicitly assigns oids with GetNewOidWithIndex(). For
the rare case that insertions into the catalog via SQL are called for
the new pg_nextoid() function can be used (which only works on catalog
tables).
The fact that oid columns on system tables are now normal columns
means that they will be included in the set of columns expanded
by * (i.e. SELECT * FROM pg_class will now include the table's oid,
previously it did not). It'd not technically be hard to hide oid
column by default, but that'd mean confusing behavior would either
have to be carried forward forever, or it'd cause breakage down the
line.
While it's not unlikely that further adjustments are needed, the
scope/invasiveness of the patch makes it worthwhile to get merge this
now. It's painful to maintain externally, too complicated to commit
after the code code freeze, and a dependency of a number of other
patches.
Catversion bump, for obvious reasons.
Author: Andres Freund, with contributions by John Naylor
Discussion: https://postgr.es/m/20180930034810.ywp2c7awz7opzcfr@alap3.anarazel.de
7 years ago
|
|
|
sub->oid = subform->oid;
|
|
|
|
sub->dbid = subform->subdbid;
|
|
|
|
sub->owner = subform->subowner;
|
|
|
|
sub->enabled = subform->subenabled;
|
|
|
|
sub->name = pstrdup(NameStr(subform->subname));
|
|
|
|
/* We don't fill fields we are not interested in. */
|
|
|
|
|
|
|
|
res = lappend(res, sub);
|
|
|
|
MemoryContextSwitchTo(oldcxt);
|
|
|
|
}
|
|
|
|
|
tableam: Add and use scan APIs.
Too allow table accesses to be not directly dependent on heap, several
new abstractions are needed. Specifically:
1) Heap scans need to be generalized into table scans. Do this by
introducing TableScanDesc, which will be the "base class" for
individual AMs. This contains the AM independent fields from
HeapScanDesc.
The previous heap_{beginscan,rescan,endscan} et al. have been
replaced with a table_ version.
There's no direct replacement for heap_getnext(), as that returned
a HeapTuple, which is undesirable for a other AMs. Instead there's
table_scan_getnextslot(). But note that heap_getnext() lives on,
it's still used widely to access catalog tables.
This is achieved by new scan_begin, scan_end, scan_rescan,
scan_getnextslot callbacks.
2) The portion of parallel scans that's shared between backends need
to be able to do so without the user doing per-AM work. To achieve
that new parallelscan_{estimate, initialize, reinitialize}
callbacks are introduced, which operate on a new
ParallelTableScanDesc, which again can be subclassed by AMs.
As it is likely that several AMs are going to be block oriented,
block oriented callbacks that can be shared between such AMs are
provided and used by heap. table_block_parallelscan_{estimate,
intiialize, reinitialize} as callbacks, and
table_block_parallelscan_{nextpage, init} for use in AMs. These
operate on a ParallelBlockTableScanDesc.
3) Index scans need to be able to access tables to return a tuple, and
there needs to be state across individual accesses to the heap to
store state like buffers. That's now handled by introducing a
sort-of-scan IndexFetchTable, which again is intended to be
subclassed by individual AMs (for heap IndexFetchHeap).
The relevant callbacks for an AM are index_fetch_{end, begin,
reset} to create the necessary state, and index_fetch_tuple to
retrieve an indexed tuple. Note that index_fetch_tuple
implementations need to be smarter than just blindly fetching the
tuples for AMs that have optimizations similar to heap's HOT - the
currently alive tuple in the update chain needs to be fetched if
appropriate.
Similar to table_scan_getnextslot(), it's undesirable to continue
to return HeapTuples. Thus index_fetch_heap (might want to rename
that later) now accepts a slot as an argument. Core code doesn't
have a lot of call sites performing index scans without going
through the systable_* API (in contrast to loads of heap_getnext
calls and working directly with HeapTuples).
Index scans now store the result of a search in
IndexScanDesc->xs_heaptid, rather than xs_ctup->t_self. As the
target is not generally a HeapTuple anymore that seems cleaner.
To be able to sensible adapt code to use the above, two further
callbacks have been introduced:
a) slot_callbacks returns a TupleTableSlotOps* suitable for creating
slots capable of holding a tuple of the AMs
type. table_slot_callbacks() and table_slot_create() are based
upon that, but have additional logic to deal with views, foreign
tables, etc.
While this change could have been done separately, nearly all the
call sites that needed to be adapted for the rest of this commit
also would have been needed to be adapted for
table_slot_callbacks(), making separation not worthwhile.
b) tuple_satisfies_snapshot checks whether the tuple in a slot is
currently visible according to a snapshot. That's required as a few
places now don't have a buffer + HeapTuple around, but a
slot (which in heap's case internally has that information).
Additionally a few infrastructure changes were needed:
I) SysScanDesc, as used by systable_{beginscan, getnext} et al. now
internally uses a slot to keep track of tuples. While
systable_getnext() still returns HeapTuples, and will so for the
foreseeable future, the index API (see 1) above) now only deals with
slots.
The remainder, and largest part, of this commit is then adjusting all
scans in postgres to use the new APIs.
Author: Andres Freund, Haribabu Kommi, Alvaro Herrera
Discussion:
https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
https://postgr.es/m/20160812231527.GA690404@alvherre.pgsql
6 years ago
|
|
|
table_endscan(scan);
|
|
|
|
table_close(rel, AccessShareLock);
|
|
|
|
|
|
|
|
CommitTransactionCommand();
|
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Wait for a background worker to start up and attach to the shmem context.
 *
 * This is only needed for cleaning up the shared memory in case the worker
 * fails to attach.
 *
 * 'worker' is the shared-memory slot reserved for the new worker, 'generation'
 * identifies that reservation (slots are reused), and 'handle' is the bgworker
 * handle returned by RegisterDynamicBackgroundWorker().
 */
static void
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
							   uint16 generation,
							   BackgroundWorkerHandle *handle)
{
	BgwHandleStatus status;
	int			rc;

	for (;;)
	{
		pid_t		pid;

		CHECK_FOR_INTERRUPTS();

		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

		/* Worker either died or has started; no need to do anything. */
		if (!worker->in_use || worker->proc)
		{
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		LWLockRelease(LogicalRepWorkerLock);

		/* Check if worker has died before attaching, and clean up after it. */
		status = GetBackgroundWorkerPid(handle, &pid);

		if (status == BGWH_STOPPED)
		{
			/*
			 * Exclusive lock: we may modify the slot below.  Recheck the
			 * generation under the lock, since the slot could already have
			 * been recycled for a different worker.
			 */
			LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
			/* Ensure that this was indeed the worker we waited for. */
			if (generation == worker->generation)
				logicalrep_worker_cleanup(worker);
			LWLockRelease(LogicalRepWorkerLock);
			return;
		}

		/*
		 * We need timeout because we generally don't get notified via latch
		 * about the worker attach.  But we don't expect to have to wait long.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   10L, WAIT_EVENT_BGWORKER_STARTUP);

		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Walks the workers array and searches for one that matches given
|
|
|
|
* subscription id and relid.
|
|
|
|
*/
|
|
|
|
LogicalRepWorker *
|
|
|
|
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
|
|
|
|
{
|
|
|
|
int i;
|
|
|
|
LogicalRepWorker *res = NULL;
|
|
|
|
|
|
|
|
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
|
|
|
|
|
|
|
|
/* Search for attached worker for a given subscription id. */
|
|
|
|
for (i = 0; i < max_logical_replication_workers; i++)
|
|
|
|
{
|
|
|
|
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
|
|
|
|
|
|
|
|
if (w->in_use && w->subid == subid && w->relid == relid &&
|
|
|
|
(!only_running || w->proc))
|
|
|
|
{
|
|
|
|
res = w;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Similar to logicalrep_worker_find(), but returns list of all workers for
|
|
|
|
* the subscription, instead just one.
|
|
|
|
*/
|
|
|
|
List *
|
|
|
|
logicalrep_workers_find(Oid subid, bool only_running)
|
|
|
|
{
|
|
|
|
int i;
|
|
|
|
List *res = NIL;
|
|
|
|
|
|
|
|
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
|
|
|
|
|
|
|
|
/* Search for attached worker for a given subscription id. */
|
|
|
|
for (i = 0; i < max_logical_replication_workers; i++)
|
|
|
|
{
|
|
|
|
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
|
|
|
|
|
|
|
|
if (w->in_use && w->subid == subid && (!only_running || w->proc))
|
|
|
|
res = lappend(res, w);
|
|
|
|
}
|
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Start new apply background worker, if possible.
 *
 * Claims a free slot in the shared workers array, fills it in, registers a
 * dynamic background worker running ApplyWorkerMain, and waits for it to
 * attach.  On failure (no free slot, sync-worker limit reached, or no free
 * bgworker slot) a WARNING is emitted and the function returns without
 * launching anything; only max_replication_slots == 0 is an ERROR.
 */
void
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
                         Oid relid)
{
    BackgroundWorker bgw;
    BackgroundWorkerHandle *bgw_handle;
    uint16      generation;
    int         i;
    int         slot = 0;
    LogicalRepWorker *worker = NULL;
    int         nsyncworkers;
    TimestampTz now;

    ereport(DEBUG1,
            (errmsg_internal("starting logical replication worker for subscription \"%s\"",
                             subname)));

    /* Report this after the initial starting message for consistency. */
    if (max_replication_slots == 0)
        ereport(ERROR,
                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                 errmsg("cannot start logical replication workers when max_replication_slots = 0")));

    /*
     * We need to do the modification of the shared memory under lock so that
     * we have consistent view.
     */
    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

retry:
    /* Find unused worker slot. */
    for (i = 0; i < max_logical_replication_workers; i++)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];

        if (!w->in_use)
        {
            worker = w;
            slot = i;
            break;
        }
    }

    nsyncworkers = logicalrep_sync_worker_count(subid);

    now = GetCurrentTimestamp();

    /*
     * If we didn't find a free slot, try to do garbage collection.  The
     * reason we do this is because if some worker failed to start up and its
     * parent has crashed while waiting, the in_use state was never cleared.
     */
    if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
    {
        bool        did_cleanup = false;

        for (i = 0; i < max_logical_replication_workers; i++)
        {
            LogicalRepWorker *w = &LogicalRepCtx->workers[i];

            /*
             * If the worker was marked in use but didn't manage to attach in
             * time, clean it up.  wal_receiver_timeout doubles as the attach
             * deadline here.
             */
            if (w->in_use && !w->proc &&
                TimestampDifferenceExceeds(w->launch_time, now,
                                           wal_receiver_timeout))
            {
                elog(WARNING,
                     "logical replication worker for subscription %u took too long to start; canceled",
                     w->subid);

                logicalrep_worker_cleanup(w);
                did_cleanup = true;
            }
        }

        /* Freed at least one slot; rescan for a free one from the top. */
        if (did_cleanup)
            goto retry;
    }

    /*
     * If we reached the sync worker limit per subscription, just exit
     * silently as we might get here because of an otherwise harmless race
     * condition.
     */
    if (nsyncworkers >= max_sync_workers_per_subscription)
    {
        LWLockRelease(LogicalRepWorkerLock);
        return;
    }

    /*
     * However if there are no more free worker slots, inform user about it
     * before exiting.
     */
    if (worker == NULL)
    {
        LWLockRelease(LogicalRepWorkerLock);
        ereport(WARNING,
                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                 errmsg("out of logical replication worker slots"),
                 errhint("You might need to increase max_logical_replication_workers.")));
        return;
    }

    /* Prepare the worker slot. */
    worker->launch_time = now;
    worker->in_use = true;
    worker->generation++;
    worker->proc = NULL;
    worker->dbid = dbid;
    worker->userid = userid;
    worker->subid = subid;
    worker->relid = relid;
    worker->relstate = SUBREL_STATE_UNKNOWN;
    worker->relstate_lsn = InvalidXLogRecPtr;
    worker->last_lsn = InvalidXLogRecPtr;
    TIMESTAMP_NOBEGIN(worker->last_send_time);
    TIMESTAMP_NOBEGIN(worker->last_recv_time);
    worker->reply_lsn = InvalidXLogRecPtr;
    TIMESTAMP_NOBEGIN(worker->reply_time);

    /* Before releasing lock, remember generation for future identification. */
    generation = worker->generation;

    LWLockRelease(LogicalRepWorkerLock);

    /* Register the new dynamic worker. */
    memset(&bgw, 0, sizeof(bgw));
    bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
        BGWORKER_BACKEND_DATABASE_CONNECTION;
    bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
    snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
    if (OidIsValid(relid))
        snprintf(bgw.bgw_name, BGW_MAXLEN,
                 "logical replication worker for subscription %u sync %u", subid, relid);
    else
        snprintf(bgw.bgw_name, BGW_MAXLEN,
                 "logical replication worker for subscription %u", subid);
    snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");

    bgw.bgw_restart_time = BGW_NEVER_RESTART;
    bgw.bgw_notify_pid = MyProcPid;
    /* The worker finds its shared slot via this array index. */
    bgw.bgw_main_arg = Int32GetDatum(slot);

    if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
    {
        /* Failed to start worker, so clean up the worker slot. */
        LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
        Assert(generation == worker->generation);
        logicalrep_worker_cleanup(worker);
        LWLockRelease(LogicalRepWorkerLock);

        ereport(WARNING,
                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                 errmsg("out of background worker slots"),
                 errhint("You might need to increase max_worker_processes.")));
        return;
    }

    /* Now wait until it attaches. */
    WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
|
|
|
|
|
|
|
|
/*
 * Stop the logical replication worker for subid/relid, if any, and wait until
 * it detaches from the slot.
 *
 * The worker's generation is captured up front so that, across the lock
 * release/reacquire cycles below, we can tell whether the slot still belongs
 * to "our" worker or has been recycled for a different one.
 */
void
logicalrep_worker_stop(Oid subid, Oid relid)
{
    LogicalRepWorker *worker;
    uint16      generation;

    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

    worker = logicalrep_worker_find(subid, relid, false);

    /* No worker, nothing to do. */
    if (!worker)
    {
        LWLockRelease(LogicalRepWorkerLock);
        return;
    }

    /*
     * Remember which generation was our worker so we can check if what we see
     * is still the same one.
     */
    generation = worker->generation;

    /*
     * If we found a worker but it does not have proc set then it is still
     * starting up; wait for it to finish starting and then kill it.
     *
     * Note: the lock is held whenever the loop condition is evaluated, and
     * is still held when the loop exits (via break or condition failure).
     */
    while (worker->in_use && !worker->proc)
    {
        int         rc;

        LWLockRelease(LogicalRepWorkerLock);

        /* Wait a bit --- we don't expect to have to wait long. */
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                       10L, WAIT_EVENT_BGWORKER_STARTUP);

        if (rc & WL_LATCH_SET)
        {
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }

        /* Recheck worker status. */
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

        /*
         * Check whether the worker slot is no longer used, which would mean
         * that the worker has exited, or whether the worker generation is
         * different, meaning that a different worker has taken the slot.
         */
        if (!worker->in_use || worker->generation != generation)
        {
            LWLockRelease(LogicalRepWorkerLock);
            return;
        }

        /* Worker has assigned proc, so it has started. */
        if (worker->proc)
            break;
    }

    /* Now terminate the worker ... */
    kill(worker->proc->pid, SIGTERM);

    /* ... and wait for it to die. */
    for (;;)
    {
        int         rc;

        /* is it gone?  (slot freed, or recycled by another worker) */
        if (!worker->proc || worker->generation != generation)
            break;

        LWLockRelease(LogicalRepWorkerLock);

        /* Wait a bit --- we don't expect to have to wait long. */
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                       10L, WAIT_EVENT_BGWORKER_SHUTDOWN);

        if (rc & WL_LATCH_SET)
        {
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }

        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
    }

    LWLockRelease(LogicalRepWorkerLock);
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Wake up (using latch) any logical replication worker for specified sub/rel.
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
logicalrep_worker_wakeup(Oid subid, Oid relid)
|
|
|
|
{
|
|
|
|
LogicalRepWorker *worker;
|
|
|
|
|
|
|
|
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
|
|
|
|
|
|
|
worker = logicalrep_worker_find(subid, relid, true);
|
|
|
|
|
|
|
|
if (worker)
|
|
|
|
logicalrep_worker_wakeup_ptr(worker);
|
|
|
|
|
|
|
|
LWLockRelease(LogicalRepWorkerLock);
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Wake up (using latch) the specified logical replication worker.
 *
 * Caller must hold lock, else worker->proc could change under us.  Caller
 * is also responsible for only passing a worker whose proc is set (e.g. one
 * returned by logicalrep_worker_find() with only_running = true).
 */
void
logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
{
    Assert(LWLockHeldByMe(LogicalRepWorkerLock));

    SetLatch(&worker->proc->procLatch);
}
|
|
|
|
|
|
|
|
/*
 * Attach to a slot.
 *
 * Called by a newly started logical replication worker; 'slot' is the index
 * into the shared workers array that the launcher passed via bgw_main_arg.
 * ERRORs out (after releasing the lock) if the slot was not reserved for us
 * or is already occupied by another process.
 */
void
logicalrep_worker_attach(int slot)
{
    /* Block concurrent access. */
    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

    Assert(slot >= 0 && slot < max_logical_replication_workers);
    MyLogicalRepWorker = &LogicalRepCtx->workers[slot];

    if (!MyLogicalRepWorker->in_use)
    {
        /* Release the lock before erroring out of this process. */
        LWLockRelease(LogicalRepWorkerLock);
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("logical replication worker slot %d is empty, cannot attach",
                        slot)));
    }

    if (MyLogicalRepWorker->proc)
    {
        LWLockRelease(LogicalRepWorkerLock);
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("logical replication worker slot %d is already used by "
                        "another worker, cannot attach", slot)));
    }

    /* Claim the slot and arrange for cleanup on process exit. */
    MyLogicalRepWorker->proc = MyProc;
    before_shmem_exit(logicalrep_worker_onexit, (Datum) 0);

    LWLockRelease(LogicalRepWorkerLock);
}
|
|
|
|
|
|
|
|
/*
 * Detach the worker (cleans up the worker info).
 *
 * Frees this process's slot in the shared workers array so the launcher can
 * reuse it.  Called from logicalrep_worker_onexit().
 */
static void
logicalrep_worker_detach(void)
{
    /* Block concurrent access. */
    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);

    logicalrep_worker_cleanup(MyLogicalRepWorker);

    LWLockRelease(LogicalRepWorkerLock);
}
|
|
|
|
|
|
|
|
/*
 * Clean up worker info.
 *
 * Marks the slot free and clears its identifying fields.  Note that
 * worker->generation is deliberately NOT reset: it is bumped on each reuse
 * (see logicalrep_worker_launch) so stale references can detect recycling.
 */
static void
logicalrep_worker_cleanup(LogicalRepWorker *worker)
{
    Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));

    worker->in_use = false;
    worker->proc = NULL;
    worker->dbid = InvalidOid;
    worker->userid = InvalidOid;
    worker->subid = InvalidOid;
    worker->relid = InvalidOid;
}
|
|
|
|
|
|
|
|
/*
 * Cleanup function for logical replication launcher.
 *
 * Called on logical replication launcher exit.  Clearing launcher_pid tells
 * ApplyLauncherWakeup() that there is no launcher to signal.
 */
static void
logicalrep_launcher_onexit(int code, Datum arg)
{
    LogicalRepCtx->launcher_pid = 0;
}
|
|
|
|
|
|
|
|
/*
 * Cleanup function.
 *
 * Called on logical replication worker exit (registered in
 * logicalrep_worker_attach).  Order matters: release external resources
 * first, then free the shared slot, then notify the launcher so it can
 * restart the worker if appropriate.
 */
static void
logicalrep_worker_onexit(int code, Datum arg)
{
    /* Disconnect gracefully from the remote side. */
    if (LogRepWorkerWalRcvConn)
        walrcv_disconnect(LogRepWorkerWalRcvConn);

    logicalrep_worker_detach();

    /* Cleanup filesets used for streaming transactions. */
    logicalrep_worker_cleanupfileset();

    ApplyLauncherWakeup();
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Count the number of registered (not necessarily running) sync workers
|
|
|
|
* for a subscription.
|
|
|
|
*/
|
|
|
|
int
|
|
|
|
logicalrep_sync_worker_count(Oid subid)
|
|
|
|
{
|
|
|
|
int i;
|
|
|
|
int res = 0;
|
|
|
|
|
|
|
|
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
|
|
|
|
|
|
|
|
/* Search for attached worker for a given subscription id. */
|
|
|
|
for (i = 0; i < max_logical_replication_workers; i++)
|
|
|
|
{
|
|
|
|
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
|
|
|
|
|
|
|
|
if (w->subid == subid && OidIsValid(w->relid))
|
|
|
|
res++;
|
|
|
|
}
|
|
|
|
|
|
|
|
return res;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* ApplyLauncherShmemSize
|
|
|
|
* Compute space needed for replication launcher shared memory
|
|
|
|
*/
|
|
|
|
Size
|
|
|
|
ApplyLauncherShmemSize(void)
|
|
|
|
{
|
|
|
|
Size size;
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Need the fixed struct and the array of LogicalRepWorker.
|
|
|
|
*/
|
|
|
|
size = sizeof(LogicalRepCtxStruct);
|
|
|
|
size = MAXALIGN(size);
|
|
|
|
size = add_size(size, mul_size(max_logical_replication_workers,
|
|
|
|
sizeof(LogicalRepWorker)));
|
|
|
|
return size;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * ApplyLauncherRegister
 *		Register a background worker running the logical replication launcher.
 *
 * Called at postmaster startup.  Does nothing when logical replication
 * workers are disabled via max_logical_replication_workers = 0.
 */
void
ApplyLauncherRegister(void)
{
    BackgroundWorker bgw;

    if (max_logical_replication_workers == 0)
        return;

    memset(&bgw, 0, sizeof(bgw));
    bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
        BGWORKER_BACKEND_DATABASE_CONNECTION;
    bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
    snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
    snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain");
    snprintf(bgw.bgw_name, BGW_MAXLEN,
             "logical replication launcher");
    snprintf(bgw.bgw_type, BGW_MAXLEN,
             "logical replication launcher");
    /* Restart automatically (after 5s) if the launcher dies. */
    bgw.bgw_restart_time = 5;
    bgw.bgw_notify_pid = 0;
    bgw.bgw_main_arg = (Datum) 0;

    RegisterBackgroundWorker(&bgw);
}
|
|
|
|
|
|
|
|
/*
 * ApplyLauncherShmemInit
 *		Allocate and initialize replication launcher shared memory
 *
 * Attaches to (or creates) the shared struct; initialization of the worker
 * slots is only performed by the first process to create it.
 */
void
ApplyLauncherShmemInit(void)
{
    bool        found;

    LogicalRepCtx = (LogicalRepCtxStruct *)
        ShmemInitStruct("Logical Replication Launcher Data",
                        ApplyLauncherShmemSize(),
                        &found);

    if (!found)
    {
        int         slot;

        memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());

        /* Initialize memory and spin locks for each worker slot. */
        for (slot = 0; slot < max_logical_replication_workers; slot++)
        {
            LogicalRepWorker *worker = &LogicalRepCtx->workers[slot];

            memset(worker, 0, sizeof(LogicalRepWorker));
            SpinLockInit(&worker->relmutex);
        }
    }
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Wakeup the launcher on commit if requested.
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
AtEOXact_ApplyLauncher(bool isCommit)
|
|
|
|
{
|
|
|
|
if (isCommit)
|
|
|
|
{
|
|
|
|
if (on_commit_launcher_wakeup)
|
|
|
|
ApplyLauncherWakeup();
|
|
|
|
}
|
|
|
|
|
|
|
|
on_commit_launcher_wakeup = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Request wakeup of the launcher on commit of the transaction.
|
|
|
|
*
|
|
|
|
* This is used to send launcher signal to stop sleeping and process the
|
|
|
|
* subscriptions when current transaction commits. Should be used when new
|
|
|
|
* tuple was added to the pg_subscription catalog.
|
|
|
|
*/
|
|
|
|
void
|
|
|
|
ApplyLauncherWakeupAtCommit(void)
|
|
|
|
{
|
|
|
|
if (!on_commit_launcher_wakeup)
|
|
|
|
on_commit_launcher_wakeup = true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/*
 * Signal the logical replication launcher, if one is running, so that it
 * rescans the subscriptions without waiting out its full nap time.
 */
static void
ApplyLauncherWakeup(void)
{
    /* launcher_pid is 0 when no launcher is running (cleared on its exit) */
    if (LogicalRepCtx->launcher_pid != 0)
        kill(LogicalRepCtx->launcher_pid, SIGUSR1);
}
|
|
|
|
|
|
|
|
/*
 * Main loop for the apply launcher process.
 *
 * Periodically scans pg_subscription and launches an apply worker for every
 * enabled subscription that does not already have one.  Sleeps on its latch
 * between cycles; ApplyLauncherWakeup() (SIGUSR1 sets the latch) shortens
 * the nap.  Never returns; exits via die() on SIGTERM or on postmaster
 * death (WL_EXIT_ON_PM_DEATH).
 */
void
ApplyLauncherMain(Datum main_arg)
{
    TimestampTz last_start_time = 0;

    ereport(DEBUG1,
            (errmsg_internal("logical replication launcher started")));

    before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);

    /* Advertise ourselves so ApplyLauncherWakeup() can find us. */
    Assert(LogicalRepCtx->launcher_pid == 0);
    LogicalRepCtx->launcher_pid = MyProcPid;

    /* Establish signal handlers. */
    pqsignal(SIGHUP, SignalHandlerForConfigReload);
    pqsignal(SIGTERM, die);
    BackgroundWorkerUnblockSignals();

    /*
     * Establish connection to nailed catalogs (we only ever access
     * pg_subscription).
     */
    BackgroundWorkerInitializeConnection(NULL, NULL, 0);

    /* Enter main loop */
    for (;;)
    {
        int         rc;
        List       *sublist;
        ListCell   *lc;
        MemoryContext subctx;
        MemoryContext oldctx;
        TimestampTz now;
        long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;

        CHECK_FOR_INTERRUPTS();

        now = GetCurrentTimestamp();

        /* Limit the start retry to once a wal_retrieve_retry_interval */
        if (TimestampDifferenceExceeds(last_start_time, now,
                                       wal_retrieve_retry_interval))
        {
            /* Use temporary context for the database list and worker info. */
            subctx = AllocSetContextCreate(TopMemoryContext,
                                           "Logical Replication Launcher sublist",
                                           ALLOCSET_DEFAULT_SIZES);
            oldctx = MemoryContextSwitchTo(subctx);

            /* search for subscriptions to start or stop. */
            sublist = get_subscription_list();

            /* Start the missing workers for enabled subscriptions. */
            foreach(lc, sublist)
            {
                Subscription *sub = (Subscription *) lfirst(lc);
                LogicalRepWorker *w;

                if (!sub->enabled)
                    continue;

                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
                w = logicalrep_worker_find(sub->oid, InvalidOid, false);
                LWLockRelease(LogicalRepWorkerLock);

                if (w == NULL)
                {
                    /* Throttle further launches, then start the worker. */
                    last_start_time = now;
                    wait_time = wal_retrieve_retry_interval;

                    logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
                                             sub->owner, InvalidOid);
                }
            }

            /* Switch back to original memory context. */
            MemoryContextSwitchTo(oldctx);
            /* Clean the temporary memory. */
            MemoryContextDelete(subctx);
        }
        else
        {
            /*
             * The wait in previous cycle was interrupted in less than
             * wal_retrieve_retry_interval since last worker was started, this
             * usually means crash of the worker, so we should retry in
             * wal_retrieve_retry_interval again.
             */
            wait_time = wal_retrieve_retry_interval;
        }

        /* Wait for more work. */
        rc = WaitLatch(MyLatch,
                       WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                       wait_time,
                       WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);

        if (rc & WL_LATCH_SET)
        {
            ResetLatch(MyLatch);
            CHECK_FOR_INTERRUPTS();
        }

        if (ConfigReloadPending)
        {
            ConfigReloadPending = false;
            ProcessConfigFile(PGC_SIGHUP);
        }
    }

    /* Not reachable */
}
|
|
|
|
|
|
|
|
/*
 * Is current process the logical replication launcher?
 */
bool
IsLogicalLauncher(void)
{
    /* launcher_pid is set by ApplyLauncherMain while the launcher runs */
    return LogicalRepCtx->launcher_pid == MyProcPid;
}
|
|
|
|
|
|
|
|
/*
|
|
|
|
* Returns state of the subscriptions.
|
|
|
|
*/
|
|
|
|
Datum
|
|
|
|
pg_stat_get_subscription(PG_FUNCTION_ARGS)
|
|
|
|
{
|
|
|
|
#define PG_STAT_GET_SUBSCRIPTION_COLS 8
|
|
|
|
Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
|
|
|
|
int i;
|
|
|
|
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
|
|
|
TupleDesc tupdesc;
|
|
|
|
Tuplestorestate *tupstore;
|
|
|
|
MemoryContext per_query_ctx;
|
|
|
|
MemoryContext oldcontext;
|
|
|
|
|
|
|
|
/* check to see if caller supports us returning a tuplestore */
|
|
|
|
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
|
|
errmsg("set-valued function called in context that cannot accept a set")));
|
|
|
|
if (!(rsinfo->allowedModes & SFRM_Materialize))
|
|
|
|
ereport(ERROR,
|
|
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
|
|
errmsg("materialize mode required, but it is not allowed in this context")));
|
|
|
|
|
|
|
|
/* Build a tuple descriptor for our result type */
|
|
|
|
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
|
|
|
|
elog(ERROR, "return type must be a row type");
|
|
|
|
|
|
|
|
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
|
|
|
|
oldcontext = MemoryContextSwitchTo(per_query_ctx);
|
|
|
|
|
|
|
|
tupstore = tuplestore_begin_heap(true, false, work_mem);
|
|
|
|
rsinfo->returnMode = SFRM_Materialize;
|
|
|
|
rsinfo->setResult = tupstore;
|
|
|
|
rsinfo->setDesc = tupdesc;
|
|
|
|
|
|
|
|
MemoryContextSwitchTo(oldcontext);
|
|
|
|
|
|
|
|
/* Make sure we get consistent view of the workers. */
|
|
|
|
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
|
|
|
|
|
|
|
|
for (i = 0; i <= max_logical_replication_workers; i++)
|
|
|
|
{
|
|
|
|
/* for each row */
|
|
|
|
Datum values[PG_STAT_GET_SUBSCRIPTION_COLS];
|
|
|
|
bool nulls[PG_STAT_GET_SUBSCRIPTION_COLS];
|
|
|
|
int worker_pid;
|
|
|
|
LogicalRepWorker worker;
|
|
|
|
|
|
|
|
memcpy(&worker, &LogicalRepCtx->workers[i],
|
|
|
|
sizeof(LogicalRepWorker));
|
|
|
|
if (!worker.proc || !IsBackendPid(worker.proc->pid))
|
|
|
|
continue;
|
|
|
|
|
|
|
|
if (OidIsValid(subid) && worker.subid != subid)
|
|
|
|
continue;
|
|
|
|
|
|
|
|
worker_pid = worker.proc->pid;
|
|
|
|
|
|
|
|
MemSet(values, 0, sizeof(values));
|
|
|
|
MemSet(nulls, 0, sizeof(nulls));
|
|
|
|
|
|
|
|
values[0] = ObjectIdGetDatum(worker.subid);
|
|
|
|
if (OidIsValid(worker.relid))
|
|
|
|
values[1] = ObjectIdGetDatum(worker.relid);
|
|
|
|
else
|
|
|
|
nulls[1] = true;
|
|
|
|
values[2] = Int32GetDatum(worker_pid);
|
|
|
|
if (XLogRecPtrIsInvalid(worker.last_lsn))
|
|
|
|
nulls[3] = true;
|
|
|
|
else
|
|
|
|
values[3] = LSNGetDatum(worker.last_lsn);
|
|
|
|
if (worker.last_send_time == 0)
|
|
|
|
nulls[4] = true;
|
|
|
|
else
|
|
|
|
values[4] = TimestampTzGetDatum(worker.last_send_time);
|
|
|
|
if (worker.last_recv_time == 0)
|
|
|
|
nulls[5] = true;
|
|
|
|
else
|
|
|
|
values[5] = TimestampTzGetDatum(worker.last_recv_time);
|
|
|
|
if (XLogRecPtrIsInvalid(worker.reply_lsn))
|
|
|
|
nulls[6] = true;
|
|
|
|
else
|
|
|
|
values[6] = LSNGetDatum(worker.reply_lsn);
|
|
|
|
if (worker.reply_time == 0)
|
|
|
|
nulls[7] = true;
|
|
|
|
else
|
|
|
|
values[7] = TimestampTzGetDatum(worker.reply_time);
|
|
|
|
|
|
|
|
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
|
|
|
|
|
|
|
|
/*
|
|
|
|
* If only a single subscription was requested, and we found it,
|
|
|
|
* break.
|
|
|
|
*/
|
|
|
|
if (OidIsValid(subid))
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
LWLockRelease(LogicalRepWorkerLock);
|
|
|
|
|
|
|
|
/* clean up and return the tuplestore */
|
|
|
|
tuplestore_donestoring(tupstore);
|
|
|
|
|
|
|
|
return (Datum) 0;
|
|
|
|
}
|