Perform apply of large transactions by parallel workers.

Currently, for large transactions, the publisher sends the data in
multiple streams (changes divided into chunks depending upon
logical_decoding_work_mem), and then on the subscriber-side, the apply
worker writes the changes into temporary files and once it receives the
commit, it reads from those files and applies the entire transaction. To
improve the performance of such transactions, we can instead allow them to
be applied via parallel workers.

In this approach, we assign a new parallel apply worker (if available) as
soon as the xact's first stream is received and the leader apply worker
will send changes to this new worker via shared memory. The parallel apply
worker will directly apply the change instead of writing it to temporary
files. However, if the leader apply worker times out while attempting to
send a message to the parallel apply worker, it will switch to
"partial serialize" mode. In this mode, the leader serializes all
remaining changes to a file and notifies the parallel apply worker to
read and apply them at the end of the transaction. We send messages from
the leader apply worker to the parallel apply worker in a non-blocking
way to avoid deadlocks. We keep this parallel apply worker assigned until
the transaction commit is received and also wait for the worker to finish
at commit. This preserves commit ordering and avoids writing to and
reading from files in most cases. We still need to spill if there is no
worker available.
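
The fallback from direct sends to "partial serialize" mode can be pictured with a small simulation. This is only a sketch under stated assumptions: every name here (Leader, try_send, leader_handle_change) is hypothetical, the shared-memory queue is reduced to a counter, and a retry limit stands in for the send timeout. It is not the code in applyparallelworker.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* All names below are hypothetical; they model the control flow only. */
typedef enum
{
    MODE_SEND_TO_WORKER,        /* hand changes to the parallel apply worker */
    MODE_PARTIAL_SERIALIZE      /* spill the remaining changes to a file */
} LeaderMode;

typedef struct
{
    LeaderMode  mode;
    size_t      queue_capacity; /* slots in the simulated shared-memory queue */
    size_t      queue_used;     /* slots the worker has not consumed yet */
    size_t      sent;           /* changes passed through the queue */
    size_t      serialized;     /* changes written to the spill file */
} Leader;

/* Non-blocking send: report failure instead of waiting on a full queue. */
static bool
try_send(Leader *leader)
{
    if (leader->queue_used >= leader->queue_capacity)
        return false;
    leader->queue_used++;
    return true;
}

/*
 * Route one change. max_attempts stands in for the send timeout: once it is
 * exhausted, the leader serializes this change and every later one.
 */
static void
leader_handle_change(Leader *leader, int max_attempts)
{
    assert(max_attempts > 0);

    if (leader->mode == MODE_SEND_TO_WORKER)
    {
        for (int attempt = 0; attempt < max_attempts; attempt++)
        {
            if (try_send(leader))
            {
                leader->sent++;
                return;
            }
        }
        /* Timed out: do not return to the queue for this transaction. */
        leader->mode = MODE_PARTIAL_SERIALIZE;
    }
    leader->serialized++;
}
```

In this model the mode switch is one-way, so a queue that drains later does not bring the leader back to direct sends for the same transaction. That matches the "serializes all remaining changes" wording above.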

This patch also extends the SUBSCRIPTION 'streaming' parameter so that the
user can control whether to apply the streaming transaction in a parallel
apply worker or spill the change to disk. The user can set the streaming
parameter to 'on', 'off', or 'parallel'. The parameter value 'parallel'
means the streaming transaction will be applied via a parallel apply
worker, if available. The parameter value 'on' means the streaming
transaction will be spilled to disk. The default value is 'off' (same as
current behaviour).

In addition, the patch extends the logical replication STREAM_ABORT
message so that abort_lsn and abort_time can also be sent. These can be
used to update the replication origin in the parallel apply worker when
the streaming transaction is aborted. Because this message extension is
needed to support parallel streaming, parallel streaming is not supported
for publications on servers < PG16.

Author: Hou Zhijie, Wang wei, Amit Kapila with design inputs from Sawada Masahiko
Reviewed-by: Sawada Masahiko, Peter Smith, Dilip Kumar, Shi yu, Kuroda Hayato, Shveta Mallik
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
Amit Kapila 3 years ago
parent 5687e7810f
commit 216a784829
 doc/src/sgml/catalogs.sgml | 11
 doc/src/sgml/config.sgml | 28
 doc/src/sgml/logical-replication.sgml | 22
 doc/src/sgml/monitoring.sgml | 5
 doc/src/sgml/protocol.sgml | 29
 doc/src/sgml/ref/create_subscription.sgml | 24
 doc/src/sgml/system-views.sgml | 14
 src/backend/access/transam/xact.c | 22
 src/backend/commands/subscriptioncmds.c | 67
 src/backend/libpq/pqmq.c | 12
 src/backend/postmaster/bgworker.c | 3
 src/backend/postmaster/interrupt.c | 5
 src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 6
 src/backend/replication/logical/Makefile | 1
 src/backend/replication/logical/applyparallelworker.c | 1630
 src/backend/replication/logical/decode.c | 5
 src/backend/replication/logical/launcher.c | 216
 src/backend/replication/logical/meson.build | 1
 src/backend/replication/logical/origin.c | 24
 src/backend/replication/logical/proto.c | 37
 src/backend/replication/logical/reorderbuffer.c | 5
 src/backend/replication/logical/tablesync.c | 25
 src/backend/replication/logical/worker.c | 1296
 src/backend/replication/pgoutput/pgoutput.c | 22
 src/backend/storage/ipc/procsignal.c | 4
 src/backend/storage/lmgr/lmgr.c | 46
 src/backend/tcop/postgres.c | 3
 src/backend/utils/activity/wait_event.c | 6
 src/backend/utils/adt/lockfuncs.c | 16
 src/backend/utils/misc/guc_tables.c | 12
 src/backend/utils/misc/postgresql.conf.sample | 1
 src/bin/pg_dump/pg_dump.c | 6
 src/bin/psql/describe.c | 17
 src/include/catalog/catversion.h | 2
 src/include/catalog/pg_subscription.h | 21
 src/include/commands/subscriptioncmds.h | 2
 src/include/replication/logicallauncher.h | 1
 src/include/replication/logicalproto.h | 28
 src/include/replication/logicalworker.h | 9
 src/include/replication/origin.h | 2
 src/include/replication/pgoutput.h | 2
 src/include/replication/reorderbuffer.h | 4
 src/include/replication/walreceiver.h | 2
 src/include/replication/worker_internal.h | 227
 src/include/storage/lmgr.h | 5
 src/include/storage/lock.h | 17
 src/include/storage/procsignal.h | 1
 src/include/utils/wait_event.h | 2
 src/test/regress/expected/subscription.out | 46
 src/test/regress/sql/subscription.sql | 6
 src/test/subscription/t/015_stream.pl | 268
 src/test/subscription/t/016_stream_subxact.pl | 126
 src/test/subscription/t/017_stream_ddl.pl | 3
 src/test/subscription/t/018_stream_subxact_abort.pl | 158
 src/test/subscription/t/019_stream_subxact_ddl_abort.pl | 3
 src/test/subscription/t/022_twophase_cascade.pl | 2
 src/test/subscription/t/023_twophase_stream.pl | 225
 src/tools/pgindent/typedefs.list | 7

@ -7913,11 +7913,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>substream</structfield> <type>bool</type>
<structfield>substream</structfield> <type>char</type>
</para>
<para>
If true, the subscription will allow streaming of in-progress
transactions
Controls how to handle the streaming of in-progress transactions:
<literal>f</literal> = disallow streaming of in-progress transactions,
<literal>t</literal> = spill the changes of in-progress transactions to
disk and apply at once after the transaction is committed on the
publisher and received by the subscriber,
<literal>p</literal> = apply changes directly using a parallel apply
worker if available (same as 't' if no worker is available)
</para></entry>
</row>

@ -4968,7 +4968,8 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
<listitem>
<para>
Specifies maximum number of logical replication workers. This includes
both apply workers and table synchronization workers.
leader apply workers, parallel apply workers, and table synchronization
workers.
</para>
<para>
Logical replication workers are taken from the pool defined by
@ -5008,6 +5009,31 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>
<varlistentry id="guc-max-parallel-apply-workers-per-subscription" xreflabel="max_parallel_apply_workers_per_subscription">
<term><varname>max_parallel_apply_workers_per_subscription</varname> (<type>integer</type>)
<indexterm>
<primary><varname>max_parallel_apply_workers_per_subscription</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Maximum number of parallel apply workers per subscription. This
parameter controls the amount of parallelism for streaming of
in-progress transactions with subscription parameter
<literal>streaming = parallel</literal>.
</para>
<para>
The parallel apply workers are taken from the pool defined by
<varname>max_logical_replication_workers</varname>.
</para>
<para>
The default value is 2. This parameter can only be set in the
<filename>postgresql.conf</filename> file or on the server command
line.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
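
The interaction between the per-subscription limit and the shared worker pool described above can be sketched as follows. This is a simplified model, not the launcher code: WorkerSlot, count_pa_workers and can_launch_pa_worker are hypothetical names, and the real worker slots carry many more fields.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-in for a logical replication worker slot. */
typedef struct
{
    bool        in_use;
    unsigned    subid;          /* owning subscription */
    bool        parallel_apply; /* true for a parallel apply worker */
} WorkerSlot;

/* Count the parallel apply workers registered for one subscription. */
static int
count_pa_workers(const WorkerSlot *slots, int nslots, unsigned subid)
{
    int         count = 0;

    assert(nslots >= 0);
    for (int i = 0; i < nslots; i++)
    {
        if (slots[i].in_use && slots[i].parallel_apply &&
            slots[i].subid == subid)
            count++;
    }
    return count;
}

/*
 * A new parallel apply worker needs both a free slot in the shared pool
 * (modelling max_logical_replication_workers) and room under the
 * per-subscription limit (max_parallel_apply_workers_per_subscription).
 */
static bool
can_launch_pa_worker(const WorkerSlot *slots, int nslots, unsigned subid,
                     int max_per_subscription)
{
    bool        have_free_slot = false;

    for (int i = 0; i < nslots; i++)
    {
        if (!slots[i].in_use)
            have_free_slot = true;
    }
    if (!have_free_slot)
        return false;
    return count_pa_workers(slots, nslots, subid) < max_per_subscription;
}
```

With a pool of four slots and a limit of 2, a subscription that already owns two parallel apply workers gets no third one even if a slot is free, and in that case the streamed transaction is spilled to disk instead.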

@ -1501,6 +1501,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
might not violate any constraint. This can easily make the subscriber
inconsistent.
</para>
<para>
When the streaming mode is <literal>parallel</literal>, the finish LSN of
failed transactions may not be logged. In that case, it may be necessary to
change the streaming mode to <literal>on</literal> or <literal>off</literal> and
cause the same conflicts again so the finish LSN of the failed transaction will
be written to the server log. For the usage of finish LSN, please refer to <link
linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ...
SKIP</command></link>.
</para>
</sect1>
<sect1 id="logical-replication-restrictions">
@ -1809,8 +1819,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
<para>
<link linkend="guc-max-logical-replication-workers"><varname>max_logical_replication_workers</varname></link>
must be set to at least the number of subscriptions (for apply workers), plus
some reserve for the table synchronization workers.
must be set to at least the number of subscriptions (for leader apply
workers), plus some reserve for the table synchronization workers and
parallel apply workers.
</para>
<para>
@ -1827,6 +1838,13 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
subscription initialization or when new tables are added.
</para>
<para>
<link linkend="guc-max-parallel-apply-workers-per-subscription"><varname>max_parallel_apply_workers_per_subscription</varname></link>
controls the amount of parallelism for streaming of in-progress
transactions with subscription parameter
<literal>streaming = parallel</literal>.
</para>
<para>
Logical replication workers are also affected by
<link linkend="guc-wal-receiver-timeout"><varname>wal_receiver_timeout</varname></link>,

@ -1858,6 +1858,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><literal>advisory</literal></entry>
<entry>Waiting to acquire an advisory user lock.</entry>
</row>
<row>
<entry><literal>applytransaction</literal></entry>
<entry>Waiting to acquire a lock on a remote transaction being applied
by a logical replication subscriber.</entry>
</row>
<row>
<entry><literal>extend</literal></entry>
<entry>Waiting to extend a relation.</entry>

@ -3103,7 +3103,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<listitem>
<para>
Protocol version. Currently versions <literal>1</literal>, <literal>2</literal>,
and <literal>3</literal> are supported.
<literal>3</literal>, and <literal>4</literal> are supported.
</para>
<para>
Version <literal>2</literal> is supported only for server version 14
@ -3113,6 +3113,11 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
Version <literal>3</literal> is supported only for server version 15
and above, and it allows streaming of two-phase commits.
</para>
<para>
Version <literal>4</literal> is supported only for server version 16
and above, and it allows streams of large in-progress transactions to
be applied in parallel.
</para>
</listitem>
</varlistentry>
@ -6883,6 +6888,28 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>Int64 (XLogRecPtr)</term>
<listitem>
<para>
The LSN of the abort. This field is available since protocol version
4.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term>Int64 (TimestampTz)</term>
<listitem>
<para>
Abort timestamp of the transaction. The value is in number
of microseconds since PostgreSQL epoch (2000-01-01). This field is
available since protocol version 4.
</para>
</listitem>
</varlistentry>
</variablelist>
</listitem>
</varlistentry>
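
As an illustration of the extended Stream Abort layout, the sketch below serializes the message into a byte buffer. It assumes the existing prefix of the message (type byte 'A', then Int32 xid and Int32 subxid) and network byte order for integers. The helper names put_be and encode_stream_abort are hypothetical, and the decision of when to include the two new fields is reduced to a boolean flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Write the low nbytes of value at buf[off] in big-endian order. */
static size_t
put_be(uint8_t *buf, size_t off, uint64_t value, size_t nbytes)
{
    for (size_t i = 0; i < nbytes; i++)
        buf[off + i] = (uint8_t) (value >> (8 * (nbytes - 1 - i)));
    return off + nbytes;
}

/*
 * Hypothetical encoder for a Stream Abort message. Returns the number of
 * bytes written, or 0 if the buffer is too small.
 */
static size_t
encode_stream_abort(uint8_t *buf, size_t buflen,
                    uint32_t xid, uint32_t subxid,
                    bool with_abort_info,
                    uint64_t abort_lsn, int64_t abort_time)
{
    size_t      need = 1 + 4 + 4 + (with_abort_info ? 8 + 8 : 0);
    size_t      off = 0;

    assert(buf != NULL);
    if (buflen < need)
        return 0;

    buf[off++] = 'A';                       /* message type */
    off = put_be(buf, off, xid, 4);         /* Int32 xid */
    off = put_be(buf, off, subxid, 4);      /* Int32 subxid */
    if (with_abort_info)
    {
        /* Fields available since protocol version 4 */
        off = put_be(buf, off, abort_lsn, 8);               /* Int64 LSN */
        off = put_be(buf, off, (uint64_t) abort_time, 8);   /* Int64 usec */
    }
    return off;
}
```

Under these assumptions the message is 9 bytes without the abort information and 25 bytes with it, so a reader has to know the negotiated protocol version before parsing past the subxid.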

@ -228,13 +228,29 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</varlistentry>
<varlistentry>
<term><literal>streaming</literal> (<type>boolean</type>)</term>
<term><literal>streaming</literal> (<type>enum</type>)</term>
<listitem>
<para>
Specifies whether to enable streaming of in-progress transactions
for this subscription. By default, all transactions
are fully decoded on the publisher and only then sent to the
subscriber as a whole.
for this subscription. The default value is <literal>off</literal>,
meaning all transactions are fully decoded on the publisher and only
then sent to the subscriber as a whole.
</para>
<para>
If set to <literal>on</literal>, the incoming changes are written to
temporary files and then applied only after the transaction is
committed on the publisher and received by the subscriber.
</para>
<para>
If set to <literal>parallel</literal>, incoming changes are directly
applied via one of the parallel apply workers, if available. If no
parallel apply worker is free to handle streaming transactions then
the changes are written to temporary files and applied after the
transaction is committed. Note that if an error happens in a
parallel apply worker, the finish LSN of the remote transaction
might not be reported in the server log.
</para>
</listitem>
</varlistentry>

@ -1379,8 +1379,9 @@
<literal>virtualxid</literal>,
<literal>spectoken</literal>,
<literal>object</literal>,
<literal>userlock</literal>, or
<literal>advisory</literal>.
<literal>userlock</literal>,
<literal>advisory</literal>, or
<literal>applytransaction</literal>.
(See also <xref linkend="wait-event-lock-table"/>.)
</para></entry>
</row>
@ -1594,6 +1595,15 @@
so the <structfield>database</structfield> column is meaningful for an advisory lock.
</para>
<para>
Apply transaction locks are used in parallel mode to apply the transaction
in logical replication. The remote transaction id is displayed in the
<structfield>transactionid</structfield> column. The <structfield>objsubid</structfield>
displays the lock subtype which is 0 for the lock used to synchronize the
set of changes, and 1 for the lock used to wait for the transaction to
finish to ensure commit order.
</para>
<para>
<structname>pg_locks</structname> provides a global view of all locks
in the database cluster, not only those relevant to the current database.

@ -1713,6 +1713,7 @@ RecordTransactionAbort(bool isSubXact)
int nchildren;
TransactionId *children;
TimestampTz xact_time;
bool replorigin;
/*
* If we haven't been assigned an XID, nobody will care whether we aborted
@ -1743,6 +1744,13 @@ RecordTransactionAbort(bool isSubXact)
elog(PANIC, "cannot abort transaction %u, it was already committed",
xid);
/*
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
replorigin = (replorigin_session_origin != InvalidRepOriginId &&
replorigin_session_origin != DoNotReplicateId);
/* Fetch the data we need for the abort record */
nrels = smgrGetPendingDeletes(false, &rels);
nchildren = xactGetCommittedChildren(&children);
@ -1766,6 +1774,11 @@ RecordTransactionAbort(bool isSubXact)
MyXactFlags, InvalidTransactionId,
NULL);
if (replorigin)
/* Move LSNs forward for this replication origin */
replorigin_session_advance(replorigin_session_origin_lsn,
XactLastRecEnd);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
* flush this abort. There's nothing to be gained by delaying this, since
@ -5873,11 +5886,10 @@ XactLogAbortRecord(TimestampTz abort_time,
}
/*
* Dump transaction origin information only for abort prepared. We need
* this during recovery to update the replication origin progress.
* Dump transaction origin information. We need this during recovery to
* update the replication origin progress.
*/
if ((replorigin_session_origin != InvalidRepOriginId) &&
TransactionIdIsValid(twophase_xid))
if (replorigin_session_origin != InvalidRepOriginId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
@ -5934,7 +5946,7 @@ XactLogAbortRecord(TimestampTz abort_time,
if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
if (TransactionIdIsValid(twophase_xid))
/* Include the replication origin */
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
return XLogInsert(RM_XACT_ID, info);

@ -85,7 +85,7 @@ typedef struct SubOpts
bool copy_data;
bool refresh;
bool binary;
bool streaming;
char streaming;
bool twophase;
bool disableonerr;
char *origin;
@ -139,7 +139,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
opts->streaming = false;
opts->streaming = LOGICALREP_STREAM_OFF;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
@ -242,7 +242,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetBoolean(defel);
opts->streaming = defGetStreamingMode(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
@ -630,7 +630,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
values[Anum_pg_subscription_subtwophasestate - 1] =
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
@ -1099,7 +1099,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
values[Anum_pg_subscription_substream - 1] =
BoolGetDatum(opts.streaming);
CharGetDatum(opts.streaming);
replaces[Anum_pg_subscription_substream - 1] = true;
}
@ -2128,3 +2128,60 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
return oldpublist;
}
/*
* Extract the streaming mode value from a DefElem. This is like
* defGetBoolean() but also accepts the special value of "parallel".
*/
char
defGetStreamingMode(DefElem *def)
{
/*
* If no parameter value given, assume "true" is meant.
*/
if (!def->arg)
return LOGICALREP_STREAM_ON;
/*
* Allow 0, 1, "false", "true", "off", "on" or "parallel".
*/
switch (nodeTag(def->arg))
{
case T_Integer:
switch (intVal(def->arg))
{
case 0:
return LOGICALREP_STREAM_OFF;
case 1:
return LOGICALREP_STREAM_ON;
default:
/* otherwise, error out below */
break;
}
break;
default:
{
char *sval = defGetString(def);
/*
* The set of strings accepted here should match up with the
* grammar's opt_boolean_or_string production.
*/
if (pg_strcasecmp(sval, "false") == 0 ||
pg_strcasecmp(sval, "off") == 0)
return LOGICALREP_STREAM_OFF;
if (pg_strcasecmp(sval, "true") == 0 ||
pg_strcasecmp(sval, "on") == 0)
return LOGICALREP_STREAM_ON;
if (pg_strcasecmp(sval, "parallel") == 0)
return LOGICALREP_STREAM_PARALLEL;
}
break;
}
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s requires a Boolean value or \"parallel\"",
def->defname)));
return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
}
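
The value mapping performed by defGetStreamingMode() can be tried outside the server with a standalone sketch. The function below is a hypothetical reimplementation that takes a plain C string instead of a DefElem, treats "0" and "1" as strings rather than integer nodes, and returns '\0' where the real function raises an error. The 'f', 't' and 'p' codes follow the substream values documented in catalogs.sgml.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <strings.h>            /* strcasecmp (POSIX) */

#define STREAM_OFF      'f'
#define STREAM_ON       't'
#define STREAM_PARALLEL 'p'
#define STREAM_INVALID  '\0'    /* hypothetical: stands in for ereport(ERROR) */

/* Map a user-supplied option value to a streaming mode code. */
static char
parse_streaming_mode(const char *value)
{
    /* A bare "streaming" with no value is treated as "on". */
    if (value == NULL)
        return STREAM_ON;

    if (strcmp(value, "0") == 0 ||
        strcasecmp(value, "false") == 0 ||
        strcasecmp(value, "off") == 0)
        return STREAM_OFF;

    if (strcmp(value, "1") == 0 ||
        strcasecmp(value, "true") == 0 ||
        strcasecmp(value, "on") == 0)
        return STREAM_ON;

    if (strcasecmp(value, "parallel") == 0)
        return STREAM_PARALLEL;

    return STREAM_INVALID;
}
```

Matching is case-insensitive, as in the original, so "Parallel" and "PARALLEL" both select the parallel mode.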

@ -13,11 +13,13 @@
#include "postgres.h"
#include "access/parallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logicalworker.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
@ -162,9 +164,19 @@ mq_putmessage(char msgtype, const char *s, size_t len)
result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
if (pq_mq_parallel_leader_pid != 0)
{
if (IsLogicalParallelApplyWorker())
SendProcSignal(pq_mq_parallel_leader_pid,
PROCSIG_PARALLEL_APPLY_MESSAGE,
pq_mq_parallel_leader_backend_id);
else
{
Assert(IsParallelWorker());
SendProcSignal(pq_mq_parallel_leader_pid,
PROCSIG_PARALLEL_MESSAGE,
pq_mq_parallel_leader_backend_id);
}
}
if (result != SHM_MQ_WOULD_BLOCK)
break;

@ -128,6 +128,9 @@ static const struct
},
{
"ApplyWorkerMain", ApplyWorkerMain
},
{
"ParallelApplyWorkerMain", ParallelApplyWorkerMain
}
};

@ -98,8 +98,9 @@ SignalHandlerForCrashExit(SIGNAL_ARGS)
* shut down and exit.
*
* Typically, this handler would be used for SIGTERM, but some processes use
* other signals. In particular, the checkpointer exits on SIGUSR2, and the
* WAL writer exits on either SIGINT or SIGTERM.
* other signals. In particular, the checkpointer exits on SIGUSR2, and the WAL
* writer and the logical replication parallel apply worker exit on either
* SIGINT or SIGTERM.
*
* ShutdownRequestPending should be checked at a convenient place within the
* main loop, or else the main loop should call HandleMainLoopInterrupts.

@ -443,9 +443,9 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
appendStringInfo(&cmd, "proto_version '%u'",
options->proto.logical.proto_version);
if (options->proto.logical.streaming &&
PQserverVersion(conn->streamConn) >= 140000)
appendStringInfoString(&cmd, ", streaming 'on'");
if (options->proto.logical.streaming_str)
appendStringInfo(&cmd, ", streaming '%s'",
options->proto.logical.streaming_str);
if (options->proto.logical.twophase &&
PQserverVersion(conn->streamConn) >= 150000)

@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
OBJS = \
applyparallelworker.o \
decode.o \
launcher.o \
logical.o \

File diff suppressed because it is too large.

@ -822,10 +822,11 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
for (i = 0; i < parsed->nsubxacts; i++)
{
ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
buf->record->EndRecPtr);
buf->record->EndRecPtr, abort_time);
}
ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
abort_time);
}
/* update the decoding stats */

@ -55,6 +55,7 @@
/* GUC variables */
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
int max_parallel_apply_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
@ -74,6 +75,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
static int logicalrep_pa_worker_count(Oid subid);
static bool on_commit_launcher_wakeup = false;
@ -152,8 +154,10 @@ get_subscription_list(void)
*
* This is only needed for cleaning up the shared memory in case the worker
* fails to attach.
*
* Returns whether the attach was successful.
*/
static void
static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
uint16 generation,
BackgroundWorkerHandle *handle)
@ -169,11 +173,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
/* Worker either died or has started; no need to do anything. */
/* Worker either died or has started. Return false if died. */
if (!worker->in_use || worker->proc)
{
LWLockRelease(LogicalRepWorkerLock);
return;
return worker->in_use;
}
LWLockRelease(LogicalRepWorkerLock);
@ -188,7 +192,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
if (generation == worker->generation)
logicalrep_worker_cleanup(worker);
LWLockRelease(LogicalRepWorkerLock);
return;
return false;
}
/*
@ -210,6 +214,8 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/*
* Walks the workers array and searches for one that matches given
* subscription id and relid.
*
* We are only interested in the leader apply worker or table sync worker.
*/
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
@ -224,6 +230,10 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
/* Skip parallel apply workers. */
if (isParallelApplyWorker(w))
continue;
if (w->in_use && w->subid == subid && w->relid == relid &&
(!only_running || w->proc))
{
@ -260,11 +270,13 @@ logicalrep_workers_find(Oid subid, bool only_running)
}
/*
* Start new apply background worker, if possible.
* Start new logical replication background worker, if possible.
*
* Returns true on success, false on failure.
*/
void
bool
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
Oid relid)
Oid relid, dsm_handle subworker_dsm)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@ -273,7 +285,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int slot = 0;
LogicalRepWorker *worker = NULL;
int nsyncworkers;
int nparallelapplyworkers;
TimestampTz now;
bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
/* Sanity check - tablesync worker cannot be a subworker */
Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@ -351,7 +368,20 @@ retry:
if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return;
return false;
}
nparallelapplyworkers = logicalrep_pa_worker_count(subid);
/*
* Return false if the number of parallel apply workers reached the limit
* per subscription.
*/
if (is_parallel_apply_worker &&
nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
}
/*
@ -365,7 +395,7 @@ retry:
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of logical replication worker slots"),
errhint("You might need to increase max_logical_replication_workers.")));
return;
return false;
}
/* Prepare the worker slot. */
@ -380,6 +410,8 @@ retry:
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
worker->stream_fileset = NULL;
worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
worker->parallel_apply = is_parallel_apply_worker;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
@ -397,19 +429,34 @@ retry:
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
if (is_parallel_apply_worker)
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
else
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u sync %u", subid, relid);
else if (is_parallel_apply_worker)
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication parallel apply worker for subscription %u", subid);
else
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u", subid);
"logical replication apply worker for subscription %u", subid);
if (is_parallel_apply_worker)
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
else
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
if (is_parallel_apply_worker)
memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
/* Failed to start worker, so clean up the worker slot. */
@ -422,33 +469,23 @@ retry:
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
errhint("You might need to increase max_worker_processes.")));
return;
return false;
}
/* Now wait until it attaches. */
WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
/*
* Stop the logical replication worker for subid/relid, if any, and wait until
* it detaches from the slot.
* Internal function to stop the worker and wait until it detaches from the
* slot.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
static void
logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
{
LogicalRepWorker *worker;
uint16 generation;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid, relid, false);
/* No worker, nothing to do. */
if (!worker)
{
LWLockRelease(LogicalRepWorkerLock);
return;
}
Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
/*
* Remember which generation was our worker so we can check if what we see
@ -486,10 +523,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
* different, meaning that a different worker has taken the slot.
*/
if (!worker->in_use || worker->generation != generation)
{
LWLockRelease(LogicalRepWorkerLock);
return;
}
/* Worker has assigned proc, so it has started. */
if (worker->proc)
@ -497,7 +531,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/* Now terminate the worker ... */
kill(worker->proc->pid, SIGTERM);
kill(worker->proc->pid, signo);
/* ... and wait for it to die. */
for (;;)
@ -523,6 +557,53 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
}
}
/*
* Stop the logical replication worker for subid/relid, if any.
*/
void
logicalrep_worker_stop(Oid subid, Oid relid)
{
LogicalRepWorker *worker;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = logicalrep_worker_find(subid, relid, false);
if (worker)
{
Assert(!isParallelApplyWorker(worker));
logicalrep_worker_stop_internal(worker, SIGTERM);
}
LWLockRelease(LogicalRepWorkerLock);
}
/*
* Stop the logical replication parallel apply worker corresponding to the
* input slot number.
*
* Note that the function sends SIGINT instead of SIGTERM to the parallel apply
* worker so that the worker exits cleanly.
*/
void
logicalrep_pa_worker_stop(int slot_no, uint16 generation)
{
LogicalRepWorker *worker;
Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
worker = &LogicalRepCtx->workers[slot_no];
Assert(isParallelApplyWorker(worker));
/*
* Only stop the worker if the generation matches and the worker is alive.
*/
if (worker->generation == generation && worker->proc)
logicalrep_worker_stop_internal(worker, SIGINT);
LWLockRelease(LogicalRepWorkerLock);
}
@ -595,11 +676,40 @@ logicalrep_worker_attach(int slot)
}
/*
* Detach the worker (cleans up the worker info).
* Stop the parallel apply workers if any, and detach the leader apply worker
* (cleans up the worker info).
*/
static void
logicalrep_worker_detach(void)
{
/* Stop the parallel apply workers. */
if (am_leader_apply_worker())
{
List *workers;
ListCell *lc;
/*
* Detach from the error_mq_handle for all parallel apply workers
* before terminating them. This prevents the leader apply worker from
* receiving the worker termination message and sending it to logs
* when the same is already done by the parallel worker.
*/
pa_detach_all_error_mq();
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
foreach(lc, workers)
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
if (isParallelApplyWorker(w))
logicalrep_worker_stop_internal(w, SIGTERM);
}
LWLockRelease(LogicalRepWorkerLock);
}
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
@ -622,6 +732,8 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
worker->userid = InvalidOid;
worker->subid = InvalidOid;
worker->relid = InvalidOid;
worker->apply_leader_pid = InvalidPid;
worker->parallel_apply = false;
}
/*
@ -653,6 +765,13 @@ logicalrep_worker_onexit(int code, Datum arg)
if (MyLogicalRepWorker->stream_fileset != NULL)
FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
/*
* Session level locks may be acquired outside of a transaction in
* parallel apply mode and will not be released when the worker
* terminates, so manually release all locks before the worker exits.
*/
LockReleaseAll(DEFAULT_LOCKMETHOD, true);
ApplyLauncherWakeup();
}
@ -680,6 +799,33 @@ logicalrep_sync_worker_count(Oid subid)
return res;
}
/*
* Count the number of registered (but not necessarily running) parallel apply
* workers for a subscription.
*/
static int
logicalrep_pa_worker_count(Oid subid)
{
int i;
int res = 0;
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
/*
* Scan all attached parallel apply workers, only counting those which
* have the given subscription id.
*/
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
if (w->subid == subid && isParallelApplyWorker(w))
res++;
}
return res;
}
/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
@ -869,7 +1015,7 @@ ApplyLauncherMain(Datum main_arg)
wait_time = wal_retrieve_retry_interval;
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid);
sub->owner, InvalidOid, DSM_HANDLE_INVALID);
}
}
@ -952,6 +1098,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
if (OidIsValid(subid) && worker.subid != subid)
continue;
/* Skip if this is a parallel apply worker */
if (isParallelApplyWorker(&worker))
continue;
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);

@ -1,6 +1,7 @@
# Copyright (c) 2022-2023, PostgreSQL Global Development Group
backend_sources += files(
'applyparallelworker.c',
'decode.c',
'launcher.c',
'logical.c',

@ -1075,12 +1075,20 @@ ReplicationOriginExitCleanup(int code, Datum arg)
* array doesn't have to be searched when calling
* replorigin_session_advance().
*
* Obviously only one such cached origin can exist per process and the current
* cached value can only be set again after the previous value is torn down
* with replorigin_session_reset().
* Normally only one such cached origin can exist per process so the cached
* value can only be set again after the previous value is torn down with
* replorigin_session_reset(). For this normal case pass acquired_by = 0
* (meaning the slot is not allowed to be already acquired by another process).
*
* However, sometimes multiple processes can safely re-use the same origin slot
* (for example, multiple parallel apply processes can safely use the same
* origin, provided they maintain commit order by allowing only one process to
* commit at a time). For this case the first process must pass acquired_by =
* 0, and then the other processes sharing that same origin can pass
* acquired_by = PID of the first process.
*/
void
replorigin_session_setup(RepOriginId node)
replorigin_session_setup(RepOriginId node, int acquired_by)
{
static bool registered_cleanup;
int i;
@ -1122,7 +1130,7 @@ replorigin_session_setup(RepOriginId node)
if (curstate->roident != node)
continue;
else if (curstate->acquired_by != 0)
else if (curstate->acquired_by != 0 && acquired_by == 0)
{
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
@ -1153,7 +1161,11 @@ replorigin_session_setup(RepOriginId node)
Assert(session_replication_state->roident != InvalidRepOriginId);
if (acquired_by == 0)
session_replication_state->acquired_by = MyProcPid;
else if (session_replication_state->acquired_by != acquired_by)
elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
node, acquired_by);
LWLockRelease(ReplicationOriginLock);
@ -1337,7 +1349,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
replorigin_session_setup(origin);
replorigin_session_setup(origin, 0);
replorigin_session_origin = origin;
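
The `acquired_by` handling added to `replorigin_session_setup()` above can be sketched in isolation. This is a minimal toy model, not the PostgreSQL implementation: `OriginSlot` and `origin_slot_setup()` are hypothetical names, the shared-memory array scan and locking are omitted, and a `false` return stands in for the places where the real function raises an error.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for one replication origin state slot. */
typedef struct OriginSlot
{
    int acquired_by;    /* PID of the first process to acquire it, 0 if free */
} OriginSlot;

/*
 * Toy version of the acquired_by check. Returns true on success and false
 * where the real function would report an error.
 */
bool
origin_slot_setup(OriginSlot *slot, int my_pid, int acquired_by)
{
    assert(slot != NULL && my_pid != 0);

    /* Exclusive request (acquired_by = 0), but the slot is already held. */
    if (slot->acquired_by != 0 && acquired_by == 0)
        return false;

    if (acquired_by == 0)
        slot->acquired_by = my_pid;     /* first process takes ownership */
    else if (slot->acquired_by != acquired_by)
        return false;                   /* not held by the named process */

    return true;
}
```

In this model the first caller passes 0 and becomes the owner, and a later caller succeeds only if it names the owner's PID, which is the pattern the comment describes for parallel apply processes sharing one origin.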

@ -1164,10 +1164,14 @@ logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
/*
* Write STREAM ABORT to the output stream. Note that xid and subxid will be
* same for the top-level transaction abort.
*
* If write_abort_info is true, send the abort_lsn and abort_time fields,
* otherwise don't.
*/
void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid)
TransactionId subxid, XLogRecPtr abort_lsn,
TimestampTz abort_time, bool write_abort_info)
{
pq_sendbyte(out, LOGICAL_REP_MSG_STREAM_ABORT);
@ -1176,19 +1180,40 @@ logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
/* transaction ID */
pq_sendint32(out, xid);
pq_sendint32(out, subxid);
if (write_abort_info)
{
pq_sendint64(out, abort_lsn);
pq_sendint64(out, abort_time);
}
}
/*
* Read STREAM ABORT from the output stream.
*
* If read_abort_info is true, read the abort_lsn and abort_time fields,
* otherwise don't.
*/
void
logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid)
logicalrep_read_stream_abort(StringInfo in,
LogicalRepStreamAbortData *abort_data,
bool read_abort_info)
{
Assert(xid && subxid);
Assert(abort_data);
abort_data->xid = pq_getmsgint(in, 4);
abort_data->subxid = pq_getmsgint(in, 4);
*xid = pq_getmsgint(in, 4);
*subxid = pq_getmsgint(in, 4);
if (read_abort_info)
{
abort_data->abort_lsn = pq_getmsgint64(in);
abort_data->abort_time = pq_getmsgint64(in);
}
else
{
abort_data->abort_lsn = InvalidXLogRecPtr;
abort_data->abort_time = 0;
}
}
/*
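
The optional trailing fields of the STREAM ABORT message can be illustrated with a standalone encoder and decoder. This is only a sketch under stated assumptions: `StreamAbort`, `stream_abort_write()` and `stream_abort_read()` are hypothetical names, plain fixed-width integers replace the PostgreSQL types, the leading message-type byte is left out, and integers are written in network byte order as `pq_sendint32()`/`pq_sendint64()` do.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical mirror of LogicalRepStreamAbortData using plain integers. */
typedef struct StreamAbort
{
    uint32_t xid;
    uint32_t subxid;
    uint64_t abort_lsn;
    int64_t  abort_time;
} StreamAbort;

/* Append the low 'nbytes' bytes of 'v' in big-endian order. */
static size_t
put_be(uint8_t *buf, size_t off, uint64_t v, int nbytes)
{
    for (int i = nbytes - 1; i >= 0; i--)
        buf[off++] = (uint8_t) (v >> (8 * i));
    return off;
}

/* Read 'nbytes' big-endian bytes and advance the offset. */
static uint64_t
get_be(const uint8_t *buf, size_t *off, int nbytes)
{
    uint64_t v = 0;

    for (int i = 0; i < nbytes; i++)
        v = (v << 8) | buf[(*off)++];
    return v;
}

/* Write the message body; returns the number of bytes written (8 or 24). */
size_t
stream_abort_write(uint8_t *buf, const StreamAbort *a, bool write_abort_info)
{
    size_t off = 0;

    assert(buf != NULL && a != NULL);
    off = put_be(buf, off, a->xid, 4);
    off = put_be(buf, off, a->subxid, 4);
    if (write_abort_info)
    {
        off = put_be(buf, off, a->abort_lsn, 8);
        off = put_be(buf, off, (uint64_t) a->abort_time, 8);
    }
    return off;
}

/* Read the message body; returns the number of bytes consumed. */
size_t
stream_abort_read(const uint8_t *buf, StreamAbort *a, bool read_abort_info)
{
    size_t off = 0;

    assert(buf != NULL && a != NULL);
    a->xid = (uint32_t) get_be(buf, &off, 4);
    a->subxid = (uint32_t) get_be(buf, &off, 4);
    if (read_abort_info)
    {
        a->abort_lsn = get_be(buf, &off, 8);
        a->abort_time = (int64_t) get_be(buf, &off, 8);
    }
    else
    {
        a->abort_lsn = 0;       /* stands in for InvalidXLogRecPtr */
        a->abort_time = 0;
    }
    return off;
}
```

Both sides must agree on the flag, since nothing in the message body itself says whether the two extra fields are present.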

@ -2873,7 +2873,8 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
* disk.
*/
void
ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
TimestampTz abort_time)
{
ReorderBufferTXN *txn;
@ -2884,6 +2885,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
if (txn == NULL)
return;
txn->xact_time.abort_time = abort_time;
/* For streamed transactions notify the remote node about the abort. */
if (rbtxn_is_streamed(txn))
{

@ -14,7 +14,7 @@
* The initial data synchronization is done separately for each table,
* in a separate apply worker that only fetches the initial snapshot data
* from the publisher and then synchronizes the position in the stream with
* the main apply worker.
* the leader apply worker.
*
* There are several reasons for doing the synchronization this way:
* - It allows us to parallelize the initial data synchronization
@ -153,7 +153,7 @@ finish_sync_worker(void)
get_rel_name(MyLogicalRepWorker->relid))));
CommitTransactionCommand();
/* Find the main apply worker and signal it. */
/* Find the leader apply worker and signal it. */
logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
/* Stop gracefully */
@ -588,7 +588,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,
rstate->relid);
rstate->relid,
DSM_HANDLE_INVALID);
hentry->last_start_time = now;
}
}
@ -636,6 +637,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
void
process_syncing_tables(XLogRecPtr current_lsn)
{
/*
* Skip for parallel apply workers because they only operate on tables
* that are in a READY state. See pa_can_start() and
* should_apply_changes_for_rel().
*/
if (am_parallel_apply_worker())
return;
if (am_tablesync_worker())
process_syncing_tables_for_sync(current_lsn);
else
@ -1254,7 +1263,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
/*
* Here we use the slot name instead of the subscription name as the
* application_name, so that it is different from the main apply worker,
* application_name, so that it is different from the leader apply worker,
* so that synchronous replication can distinguish them.
*/
LogRepWorkerWalRcvConn =
@ -1302,7 +1311,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* time this tablesync was launched.
*/
originid = replorigin_by_name(originname, false);
replorigin_session_setup(originid);
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
*origin_startpos = replorigin_session_get_progress(false);
@ -1413,7 +1422,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
true /* go backward */ , true /* WAL log */ );
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
replorigin_session_setup(originid);
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
}
else
@ -1468,8 +1477,8 @@ copy_table_done:
SpinLockRelease(&MyLogicalRepWorker->relmutex);
/*
* Finally, wait until the main apply worker tells us to catch up and then
* return to let LogicalRepApplyLoop do it.
* Finally, wait until the leader apply worker tells us to catch up and
* then return to let LogicalRepApplyLoop do it.
*/
wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
return slotname;

File diff suppressed because it is too large

@ -18,6 +18,7 @@
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_subscription.h"
#include "commands/defrem.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "fmgr.h"
#include "nodes/makefuncs.h"
@ -290,7 +291,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool origin_option_given = false;
data->binary = false;
data->streaming = false;
data->streaming = LOGICALREP_STREAM_OFF;
data->messages = false;
data->two_phase = false;
@ -369,7 +370,7 @@ parse_output_parameters(List *options, PGOutputData *data)
errmsg("conflicting or redundant options")));
streaming_given = true;
data->streaming = defGetBoolean(defel);
data->streaming = defGetStreamingMode(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
@ -461,13 +462,20 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
* we only allow it with sufficient version of the protocol, and when
* the output plugin supports it.
*/
if (!data->streaming)
if (data->streaming == LOGICALREP_STREAM_OFF)
ctx->streaming = false;
else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
else if (data->streaming == LOGICALREP_STREAM_ON &&
data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support streaming, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
else if (!ctx->streaming)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -1841,6 +1849,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn)
{
ReorderBufferTXN *toptxn;
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
/*
* The abort should happen outside streaming block, even for streamed
@ -1854,7 +1864,9 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
Assert(rbtxn_is_streamed(toptxn));
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
txn->xact_time.abort_time, write_abort_info);
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(toptxn->xid, false);
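
The protocol-version gate in `pgoutput_startup()` and the `write_abort_info` decision in `pgoutput_stream_abort()` can be condensed into a small sketch. The function and enum names here are hypothetical; the mode characters and version numbers are copied from the `LOGICALREP_STREAM_xxx` and `LOGICALREP_PROTO_..._VERSION_NUM` definitions in this patch, and the final check on whether the output plugin supports streaming is not modelled.

```c
#include <assert.h>
#include <stdbool.h>

/* Local copies of the values defined in this patch. */
#define STREAM_OFF       'f'
#define STREAM_ON        't'
#define STREAM_PARALLEL  'p'
#define PROTO_STREAM_VERSION           2
#define PROTO_STREAM_PARALLEL_VERSION  4

/* Hypothetical result type; the real code raises an error instead. */
typedef enum StreamCheck
{
    STREAM_CHECK_DISABLED,          /* streaming = off was requested */
    STREAM_CHECK_OK,                /* requested mode is allowed */
    STREAM_CHECK_VERSION_TOO_OLD    /* proto_version cannot carry the mode */
} StreamCheck;

StreamCheck
check_streaming_request(char mode, int proto_version)
{
    if (mode == STREAM_OFF)
        return STREAM_CHECK_DISABLED;
    if (mode == STREAM_ON && proto_version < PROTO_STREAM_VERSION)
        return STREAM_CHECK_VERSION_TOO_OLD;
    if (mode == STREAM_PARALLEL &&
        proto_version < PROTO_STREAM_PARALLEL_VERSION)
        return STREAM_CHECK_VERSION_TOO_OLD;
    return STREAM_CHECK_OK;
}

/* abort_lsn and abort_time are sent only in parallel streaming mode. */
bool
sends_abort_info(char mode)
{
    return mode == STREAM_PARALLEL;
}
```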

@ -22,6 +22,7 @@
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logicalworker.h"
#include "replication/walsender.h"
#include "storage/condition_variable.h"
#include "storage/ipc.h"
@ -657,6 +658,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_LOG_MEMORY_CONTEXT))
HandleLogMemoryContextInterrupt();
if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
HandleParallelApplyMessageInterrupt();
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);

@ -1117,6 +1117,45 @@ UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
LockRelease(&tag, lockmode, true);
}
/*
* LockApplyTransactionForSession
*
* Obtain a session-level lock on a transaction being applied on a logical
* replication subscriber. See LockRelationIdForSession for notes about
* session-level locks.
*/
void
LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
LOCKMODE lockmode)
{
LOCKTAG tag;
SET_LOCKTAG_APPLY_TRANSACTION(tag,
MyDatabaseId,
suboid,
xid,
objid);
(void) LockAcquire(&tag, lockmode, true, false);
}
/*
* UnlockApplyTransactionForSession
*/
void
UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
LOCKMODE lockmode)
{
LOCKTAG tag;
SET_LOCKTAG_APPLY_TRANSACTION(tag,
MyDatabaseId,
suboid,
xid,
objid);
LockRelease(&tag, lockmode, true);
}
/*
* Append a description of a lockable object to buf.
@ -1202,6 +1241,13 @@ DescribeLockTag(StringInfo buf, const LOCKTAG *tag)
tag->locktag_field3,
tag->locktag_field4);
break;
case LOCKTAG_APPLY_TRANSACTION:
appendStringInfo(buf,
_("remote transaction %u of subscription %u of database %u"),
tag->locktag_field3,
tag->locktag_field2,
tag->locktag_field1);
break;
default:
appendStringInfo(buf,
_("unrecognized locktag type %d"),
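
How the four lock tag fields relate to the message built in `DescribeLockTag()` can be shown with a standalone sketch. `ApplyLockTag`, `apply_tag_set()` and `apply_tag_describe()` are hypothetical names used only for illustration; the field order (database OID, subscription OID, remote transaction ID, object ID) follows `SET_LOCKTAG_APPLY_TRANSACTION` in this patch, and the lock type and lock method fields are omitted.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, simplified stand-in for LOCKTAG. */
typedef struct ApplyLockTag
{
    uint32_t field1;    /* database OID */
    uint32_t field2;    /* subscription OID */
    uint32_t field3;    /* remote transaction ID */
    uint16_t field4;    /* object ID distinguishing locks on the same xact */
} ApplyLockTag;

void
apply_tag_set(ApplyLockTag *tag, uint32_t dboid, uint32_t suboid,
              uint32_t xid, uint16_t objid)
{
    assert(tag != NULL);
    memset(tag, 0, sizeof(*tag));
    tag->field1 = dboid;
    tag->field2 = suboid;
    tag->field3 = xid;
    tag->field4 = objid;
}

/* Format the tag the same way as the LOCKTAG_APPLY_TRANSACTION case. */
int
apply_tag_describe(char *buf, size_t len, const ApplyLockTag *tag)
{
    assert(buf != NULL && tag != NULL);
    return snprintf(buf, len,
                    "remote transaction %u of subscription %u of database %u",
                    (unsigned) tag->field3,
                    (unsigned) tag->field2,
                    (unsigned) tag->field1);
}
```

Note that the description prints the fields in reverse order of their position in the tag, and that the object ID in `field4` is not part of the message.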

@ -3391,6 +3391,9 @@ ProcessInterrupts(void)
if (LogMemoryContextPending)
ProcessLogMemoryContextInterrupt();
if (ParallelApplyMessagePending)
HandleParallelApplyMessages();
}
/*

@ -230,6 +230,9 @@ pgstat_get_wait_activity(WaitEventActivity w)
case WAIT_EVENT_LOGICAL_LAUNCHER_MAIN:
event_name = "LogicalLauncherMain";
break;
case WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN:
event_name = "LogicalParallelApplyMain";
break;
case WAIT_EVENT_RECOVERY_WAL_STREAM:
event_name = "RecoveryWalStream";
break;
@ -388,6 +391,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT:
event_name = "HashGrowBucketsReinsert";
break;
case WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE:
event_name = "LogicalParallelApplyStateChange";
break;
case WAIT_EVENT_LOGICAL_SYNC_DATA:
event_name = "LogicalSyncData";
break;

@ -37,10 +37,11 @@ const char *const LockTagTypeNames[] = {
"spectoken",
"object",
"userlock",
"advisory"
"advisory",
"applytransaction"
};
StaticAssertDecl(lengthof(LockTagTypeNames) == (LOCKTAG_ADVISORY + 1),
StaticAssertDecl(lengthof(LockTagTypeNames) == (LOCKTAG_LAST_TYPE + 1),
"array length mismatch");
/* This must match enum PredicateLockTargetType (predicate_internals.h) */
@ -312,6 +313,17 @@ pg_lock_status(PG_FUNCTION_ARGS)
nulls[8] = true;
nulls[9] = true;
break;
case LOCKTAG_APPLY_TRANSACTION:
values[1] = ObjectIdGetDatum(instance->locktag.locktag_field1);
values[8] = ObjectIdGetDatum(instance->locktag.locktag_field2);
values[6] = ObjectIdGetDatum(instance->locktag.locktag_field3);
values[9] = Int16GetDatum(instance->locktag.locktag_field4);
nulls[2] = true;
nulls[3] = true;
nulls[4] = true;
nulls[5] = true;
nulls[7] = true;
break;
case LOCKTAG_OBJECT:
case LOCKTAG_USERLOCK:
case LOCKTAG_ADVISORY:

@ -3002,6 +3002,18 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
{
{"max_parallel_apply_workers_per_subscription",
PGC_SIGHUP,
REPLICATION_SUBSCRIBERS,
gettext_noop("Maximum number of parallel apply workers per subscription."),
NULL,
},
&max_parallel_apply_workers_per_subscription,
2, 0, MAX_PARALLEL_WORKER_LIMIT,
NULL, NULL, NULL
},
{
{"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE,
gettext_noop("Sets the amount of time to wait before forcing "

@ -359,6 +359,7 @@
#max_logical_replication_workers = 4 # taken from max_worker_processes
# (change requires restart)
#max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers
#max_parallel_apply_workers_per_subscription = 2 # taken from max_logical_replication_workers
#------------------------------------------------------------------------------

@ -4533,7 +4533,7 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 140000)
appendPQExpBufferStr(query, " s.substream,\n");
else
appendPQExpBufferStr(query, " false AS substream,\n");
appendPQExpBufferStr(query, " 'f' AS substream,\n");
if (fout->remoteVersion >= 150000)
appendPQExpBufferStr(query,
@ -4670,8 +4670,10 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
if (strcmp(subinfo->subbinary, "t") == 0)
appendPQExpBufferStr(query, ", binary = true");
if (strcmp(subinfo->substream, "f") != 0)
if (strcmp(subinfo->substream, "t") == 0)
appendPQExpBufferStr(query, ", streaming = on");
else if (strcmp(subinfo->substream, "p") == 0)
appendPQExpBufferStr(query, ", streaming = parallel");
if (strcmp(subinfo->subtwophasestate, two_phase_disabled) != 0)
appendPQExpBufferStr(query, ", two_phase = on");
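
The mapping from the catalog value of `substream` to the dumped option in `dumpSubscription()` can be sketched as a pure function. `streaming_clause()` is a hypothetical helper that does not exist in pg_dump; it only restates the two `strcmp()` branches above and returns an empty string where the real code appends nothing.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Return the option text to append for a given substream value:
 * "t" -> streaming = on, "p" -> streaming = parallel, anything else
 * (including "f") -> nothing, because 'off' is the default.
 */
const char *
streaming_clause(const char *substream)
{
    assert(substream != NULL);

    if (strcmp(substream, "t") == 0)
        return ", streaming = on";
    if (strcmp(substream, "p") == 0)
        return ", streaming = parallel";
    return "";
}
```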

@ -6500,11 +6500,24 @@ describeSubscriptions(const char *pattern, bool verbose)
{
/* Binary mode and streaming are only supported in v14 and higher */
if (pset.sversion >= 140000)
{
appendPQExpBuffer(&buf,
", subbinary AS \"%s\"\n",
gettext_noop("Binary"));
if (pset.sversion >= 160000)
appendPQExpBuffer(&buf,
", (CASE substream\n"
" WHEN 'f' THEN 'off'\n"
" WHEN 't' THEN 'on'\n"
" WHEN 'p' THEN 'parallel'\n"
" END) AS \"%s\"\n",
gettext_noop("Streaming"));
else
appendPQExpBuffer(&buf,
", subbinary AS \"%s\"\n"
", substream AS \"%s\"\n",
gettext_noop("Binary"),
gettext_noop("Streaming"));
}
/* Two_phase and disable_on_error are only supported in v15 and higher */
if (pset.sversion >= 150000)

@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202212241
#define CATALOG_VERSION_NO 202301091
#endif

@ -80,7 +80,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
bool subbinary; /* True if the subscription wants the
* publisher to send data in binary */
bool substream; /* Stream in-progress transactions. */
char substream; /* Stream in-progress transactions. See
* LOGICALREP_STREAM_xxx constants. */
char subtwophasestate; /* Stream two-phase transactions */
@ -124,7 +125,8 @@ typedef struct Subscription
bool enabled; /* Indicates if the subscription is enabled */
bool binary; /* Indicates if the subscription wants data in
* binary format */
bool stream; /* Allow streaming in-progress transactions. */
char stream; /* Allow streaming in-progress transactions.
* See LOGICALREP_STREAM_xxx constants. */
char twophasestate; /* Allow streaming two-phase transactions */
bool disableonerr; /* Indicates if the subscription should be
* automatically disabled if a worker error
@ -137,6 +139,21 @@ typedef struct Subscription
* specified origin */
} Subscription;
/* Disallow streaming in-progress transactions. */
#define LOGICALREP_STREAM_OFF 'f'
/*
* Streaming in-progress transactions are written to a temporary file and
* applied only after the transaction is committed on upstream.
*/
#define LOGICALREP_STREAM_ON 't'
/*
* Streaming in-progress transactions are applied immediately via a parallel
* apply worker.
*/
#define LOGICALREP_STREAM_PARALLEL 'p'
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
extern void FreeSubscription(Subscription *sub);
extern void DisableSubscription(Oid subid);

@ -26,4 +26,6 @@ extern void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel);
extern ObjectAddress AlterSubscriptionOwner(const char *name, Oid newOwnerId);
extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId);
extern char defGetStreamingMode(DefElem *def);
#endif /* SUBSCRIPTIONCMDS_H */

@ -14,6 +14,7 @@
extern PGDLLIMPORT int max_logical_replication_workers;
extern PGDLLIMPORT int max_sync_workers_per_subscription;
extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription;
extern void ApplyLauncherRegister(void);
extern void ApplyLauncherMain(Datum main_arg);

@ -32,12 +32,17 @@
*
* LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with
* support for two-phase commit decoding (at prepare time). Introduced in PG15.
*
* LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM is the minimum protocol version
* where we support applying large streaming transactions in parallel.
* Introduced in PG16.
*/
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
#define LOGICALREP_PROTO_VERSION_NUM 1
#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
#define LOGICALREP_PROTO_TWOPHASE_VERSION_NUM 3
#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_TWOPHASE_VERSION_NUM
#define LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM 4
#define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM
/*
* Logical message types
@ -175,6 +180,17 @@ typedef struct LogicalRepRollbackPreparedTxnData
char gid[GIDSIZE];
} LogicalRepRollbackPreparedTxnData;
/*
* Transaction protocol information for stream abort.
*/
typedef struct LogicalRepStreamAbortData
{
TransactionId xid;
TransactionId subxid;
XLogRecPtr abort_lsn;
TimestampTz abort_time;
} LogicalRepStreamAbortData;
extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
extern void logicalrep_read_begin(StringInfo in,
LogicalRepBeginData *begin_data);
@ -246,9 +262,13 @@ extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn
extern TransactionId logicalrep_read_stream_commit(StringInfo in,
LogicalRepCommitData *commit_data);
extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid);
extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid);
TransactionId subxid,
XLogRecPtr abort_lsn,
TimestampTz abort_time,
bool write_abort_info);
extern void logicalrep_read_stream_abort(StringInfo in,
LogicalRepStreamAbortData *abort_data,
bool read_abort_info);
extern char *logicalrep_message_type(LogicalRepMsgType action);
#endif /* LOGICAL_PROTO_H */

@ -12,9 +12,18 @@
#ifndef LOGICALWORKER_H
#define LOGICALWORKER_H
#include <signal.h>
extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
extern void ApplyWorkerMain(Datum main_arg);
extern void ParallelApplyWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
extern bool IsLogicalParallelApplyWorker(void);
extern void HandleParallelApplyMessageInterrupt(void);
extern void HandleParallelApplyMessages(void);
extern void LogicalRepWorkersWakeupAtCommit(Oid subid);

@ -53,7 +53,7 @@ extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
extern void replorigin_session_advance(XLogRecPtr remote_commit,
XLogRecPtr local_commit);
extern void replorigin_session_setup(RepOriginId node);
extern void replorigin_session_setup(RepOriginId node, int acquired_by);
extern void replorigin_session_reset(void);
extern XLogRecPtr replorigin_session_get_progress(bool flush);

@ -26,7 +26,7 @@ typedef struct PGOutputData
List *publication_names;
List *publications;
bool binary;
bool streaming;
char streaming;
bool messages;
bool two_phase;
char *origin;

@ -316,6 +316,7 @@ typedef struct ReorderBufferTXN
{
TimestampTz commit_time;
TimestampTz prepare_time;
TimestampTz abort_time;
} xact_time;
/*
@ -678,7 +679,8 @@ extern void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
extern void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr commit_lsn,
XLogRecPtr end_lsn);
extern void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
extern void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
TimestampTz abort_time);
extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid);
extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
extern void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);

@ -182,7 +182,7 @@ typedef struct
uint32 proto_version; /* Logical protocol version */
List *publication_names; /* String list of publications */
bool binary; /* Ask publisher to use binary */
bool streaming; /* Streaming of large transactions */
char *streaming_str; /* Streaming of large transactions */
bool twophase; /* Streaming of two-phase transactions at
* prepare time */
char *origin; /* Only publish data originating from the

@ -17,8 +17,13 @@
#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "miscadmin.h"
#include "replication/logicalrelation.h"
#include "storage/buffile.h"
#include "storage/fileset.h"
#include "storage/lock.h"
#include "storage/shm_mq.h"
#include "storage/shm_toc.h"
#include "storage/spin.h"
@ -53,13 +58,24 @@ typedef struct LogicalRepWorker
/*
* Used to create the changes and subxact files for the streaming
* transactions. Upon the arrival of the first streaming transaction, the
* fileset will be initialized, and it will be deleted when the worker
* exits. Under this, separate buffiles would be created for each
* transaction which will be deleted after the transaction is finished.
* transactions. Upon the arrival of the first streaming transaction or
* when the first-time leader apply worker times out while sending changes
* to the parallel apply worker, the fileset will be initialized, and it
* will be deleted when the worker exits. Under this, separate buffiles
* would be created for each transaction which will be deleted after the
* transaction is finished.
*/
FileSet *stream_fileset;
/*
* PID of leader apply worker if this slot is used for a parallel apply
* worker, InvalidPid otherwise.
*/
pid_t apply_leader_pid;
/* Indicates whether apply can be performed in parallel. */
bool parallel_apply;
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
@ -68,9 +84,138 @@ typedef struct LogicalRepWorker
TimestampTz reply_time;
} LogicalRepWorker;
/*
* State of the transaction in parallel apply worker.
*
* The enum values must have the same order as the transaction state
* transitions.
*/
typedef enum ParallelTransState
{
PARALLEL_TRANS_UNKNOWN,
PARALLEL_TRANS_STARTED,
PARALLEL_TRANS_FINISHED
} ParallelTransState;
/*
* State of fileset used to communicate changes from leader to parallel
* apply worker.
*
* FS_EMPTY indicates an initial state where the leader doesn't need to use
* the file to communicate with the parallel apply worker.
*
* FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
* to the file.
*
* FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
* the file.
*
* FS_READY indicates that it is now ok for a parallel apply worker to
* read the file.
*/
typedef enum PartialFileSetState
{
FS_EMPTY,
FS_SERIALIZE_IN_PROGRESS,
FS_SERIALIZE_DONE,
FS_READY
} PartialFileSetState;
/*
* Struct for sharing information between leader apply worker and parallel
* apply workers.
*/
typedef struct ParallelApplyWorkerShared
{
slock_t mutex;
TransactionId xid;
/*
* State used to ensure commit ordering.
*
* The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
* handling the transaction finish commands while the apply leader will
* wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
* transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
* STREAM_ABORT).
*/
ParallelTransState xact_state;
/* Information from the corresponding LogicalRepWorker slot. */
uint16 logicalrep_worker_generation;
int logicalrep_worker_slot_no;
/*
* Indicates whether there are pending streaming blocks in the queue. The
* parallel apply worker will check it before starting to wait.
*/
pg_atomic_uint32 pending_stream_count;
/*
* XactLastCommitEnd from the parallel apply worker. This is required by
* the leader worker so it can update the lsn_mappings.
*/
XLogRecPtr last_commit_end;
/*
* After entering PARTIAL_SERIALIZE mode, the leader apply worker will
* serialize changes to the file, and share the fileset with the parallel
* apply worker when processing the transaction finish command. Then the
* parallel apply worker will apply all the spooled messages.
*
* FileSet is used here instead of SharedFileSet because we need it to
* survive after releasing the shared memory so that the leader apply
* worker can re-use the same fileset for the next streaming transaction.
*/
PartialFileSetState fileset_state;
FileSet fileset;
} ParallelApplyWorkerShared;
/*
* Information which is used to manage the parallel apply worker.
*/
typedef struct ParallelApplyWorkerInfo
{
/*
* This queue is used to send changes from the leader apply worker to the
* parallel apply worker.
*/
shm_mq_handle *mq_handle;
/*
* This queue is used to transfer error messages from the parallel apply
* worker to the leader apply worker.
*/
shm_mq_handle *error_mq_handle;
dsm_segment *dsm_seg;
/*
* Indicates whether the leader apply worker needs to serialize the
* remaining changes to a file due to timeout when attempting to send data
* to the parallel apply worker via shared memory.
*/
bool serialize_changes;
/*
* True if the worker is being used to process a parallel apply
* transaction. False indicates this worker is available for re-use.
*/
bool in_use;
ParallelApplyWorkerShared *shared;
} ParallelApplyWorkerInfo;
/* Main memory context for apply worker. Permanent during worker lifetime. */
extern PGDLLIMPORT MemoryContext ApplyContext;
extern PGDLLIMPORT MemoryContext ApplyMessageContext;
extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
/* libpqreceiver connection */
extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
@ -84,9 +229,11 @@ extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running);
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid);
extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
dsm_handle subworker_dsm);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
@ -103,10 +250,78 @@ extern void process_syncing_tables(XLogRecPtr current_lsn);
extern void invalidate_syncing_table_states(Datum arg, int cacheid,
uint32 hashvalue);
extern void stream_start_internal(TransactionId xid, bool first_segment);
extern void stream_stop_internal(TransactionId xid);
/* Common streaming function to apply all the spooled messages */
extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
XLogRecPtr lsn);
extern void apply_dispatch(StringInfo s);
extern void maybe_reread_subscription(void);
extern void stream_cleanup_files(Oid subid, TransactionId xid);
extern void InitializeApplyWorker(void);
extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
/* Function for apply error callback */
extern void apply_error_callback(void *arg);
extern void set_apply_error_context_origin(char *originname);
/* Parallel apply worker setup and interactions */
extern void pa_allocate_worker(TransactionId xid);
extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
extern void pa_detach_all_error_mq(void);
extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
const void *data);
extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
bool stream_locked);
extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
ParallelTransState in_xact);
extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
extern void pa_start_subtrans(TransactionId current_xid,
TransactionId top_xid);
extern void pa_reset_subtrans(void);
extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
PartialFileSetState fileset_state);
extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
#define isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid)
static inline bool
am_tablesync_worker(void)
{
return OidIsValid(MyLogicalRepWorker->relid);
}
static inline bool
am_leader_apply_worker(void)
{
return (!am_tablesync_worker() &&
!isParallelApplyWorker(MyLogicalRepWorker));
}
static inline bool
am_parallel_apply_worker(void)
{
return isParallelApplyWorker(MyLogicalRepWorker);
}
#endif /* WORKER_INTERNAL_H */
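The three inline predicates above classify the current worker from two fields of `MyLogicalRepWorker`. The following is a minimal standalone sketch of that classification, not the patch's code: `MockWorker` and the `mock_*` helpers are hypothetical stand-ins, and the `Oid`, `InvalidOid`, and `InvalidPid` definitions are simplified copies assumed to match PostgreSQL's.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Simplified stand-ins for PostgreSQL definitions (assumed values). */
typedef unsigned int Oid;
#define InvalidOid		((Oid) 0)
#define OidIsValid(oid)	((oid) != InvalidOid)
#define InvalidPid		(-1)

/* Hypothetical, trimmed-down stand-in for LogicalRepWorker. */
typedef struct MockWorker
{
	Oid			relid;				/* valid only for a tablesync worker */
	int			apply_leader_pid;	/* valid only for a parallel apply worker */
} MockWorker;

/* Same test as the macro in the header, applied to the mock struct. */
#define isParallelApplyWorker(worker) ((worker)->apply_leader_pid != InvalidPid)

static bool
mock_am_tablesync_worker(const MockWorker *w)
{
	assert(w != NULL);
	return OidIsValid(w->relid);
}

static bool
mock_am_parallel_apply_worker(const MockWorker *w)
{
	assert(w != NULL);
	return isParallelApplyWorker(w);
}

/* A leader apply worker is whatever is neither tablesync nor parallel. */
static bool
mock_am_leader_apply_worker(const MockWorker *w)
{
	return !mock_am_tablesync_worker(w) && !mock_am_parallel_apply_worker(w);
}
```

In this sketch the leader case is defined purely by exclusion, which mirrors the header: a worker with no valid `relid` and no leader PID is treated as the leader apply worker.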

@ -107,6 +107,11 @@ extern void LockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
extern void UnlockSharedObjectForSession(Oid classid, Oid objid, uint16 objsubid,
LOCKMODE lockmode);
extern void LockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
LOCKMODE lockmode);
extern void UnlockApplyTransactionForSession(Oid suboid, TransactionId xid, uint16 objid,
LOCKMODE lockmode);
/* Describe a locktag for error messages */
extern void DescribeLockTag(StringInfo buf, const LOCKTAG *tag);

@ -149,10 +149,12 @@ typedef enum LockTagType
LOCKTAG_SPECULATIVE_TOKEN, /* speculative insertion Xid and token */
LOCKTAG_OBJECT, /* non-relation database object */
LOCKTAG_USERLOCK, /* reserved for old contrib/userlock code */
LOCKTAG_ADVISORY /* advisory user locks */
LOCKTAG_ADVISORY, /* advisory user locks */
LOCKTAG_APPLY_TRANSACTION /* transaction being applied on a logical
* replication subscriber */
} LockTagType;
#define LOCKTAG_LAST_TYPE LOCKTAG_ADVISORY
#define LOCKTAG_LAST_TYPE LOCKTAG_APPLY_TRANSACTION
extern PGDLLIMPORT const char *const LockTagTypeNames[];
@ -278,6 +280,17 @@ typedef struct LOCKTAG
(locktag).locktag_type = LOCKTAG_ADVISORY, \
(locktag).locktag_lockmethodid = USER_LOCKMETHOD)
/*
* ID info for a remote transaction on a logical replication subscriber is: DB
* OID + SUBSCRIPTION OID + TRANSACTION ID + OBJID
*/
#define SET_LOCKTAG_APPLY_TRANSACTION(locktag,dboid,suboid,xid,objid) \
((locktag).locktag_field1 = (dboid), \
(locktag).locktag_field2 = (suboid), \
(locktag).locktag_field3 = (xid), \
(locktag).locktag_field4 = (objid), \
(locktag).locktag_type = LOCKTAG_APPLY_TRANSACTION, \
(locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
/*
* Per-locked-object lock information:
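To illustrate how `SET_LOCKTAG_APPLY_TRANSACTION` identifies a lock, here is a standalone sketch that fills a mock tag and compares two tags. `MockLockTag`, `mock_locktag_equal`, and the numeric values of the two `MOCK_*` constants are assumptions made for this example; only the field assignments follow the macro in the hunk above. The `objid` values 0 and 1 in the usage note are likewise hypothetical; the separate `pa_lock_stream` and `pa_lock_transaction` declarations suggest that `objid` is what distinguishes the two kinds of lock taken on one remote transaction.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for LOCKTAG; the field widths are an assumption. */
typedef struct MockLockTag
{
	uint32_t	locktag_field1;		/* database OID */
	uint32_t	locktag_field2;		/* subscription OID */
	uint32_t	locktag_field3;		/* remote transaction ID */
	uint16_t	locktag_field4;		/* object ID within the transaction */
	uint8_t		locktag_type;
	uint8_t		locktag_lockmethodid;
} MockLockTag;

/* Placeholder values; the real ones come from lock.h. */
#define MOCK_LOCKTAG_APPLY_TRANSACTION	11
#define MOCK_DEFAULT_LOCKMETHOD			1

/* Same shape as the macro added by the patch, on the mock struct. */
#define SET_MOCK_LOCKTAG_APPLY_TRANSACTION(locktag,dboid,suboid,xid,objid) \
	((locktag).locktag_field1 = (dboid), \
	 (locktag).locktag_field2 = (suboid), \
	 (locktag).locktag_field3 = (xid), \
	 (locktag).locktag_field4 = (objid), \
	 (locktag).locktag_type = MOCK_LOCKTAG_APPLY_TRANSACTION, \
	 (locktag).locktag_lockmethodid = MOCK_DEFAULT_LOCKMETHOD)

/* Two tags name the same lock only if every field matches. */
static bool
mock_locktag_equal(const MockLockTag *a, const MockLockTag *b)
{
	return a->locktag_field1 == b->locktag_field1 &&
		a->locktag_field2 == b->locktag_field2 &&
		a->locktag_field3 == b->locktag_field3 &&
		a->locktag_field4 == b->locktag_field4 &&
		a->locktag_type == b->locktag_type &&
		a->locktag_lockmethodid == b->locktag_lockmethodid;
}
```

With this sketch, tags built from the same database, subscription, and xid but with `objid` 0 and `objid` 1 compare as different locks, so a holder of one does not conflict with a holder of the other.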

@ -35,6 +35,7 @@ typedef enum
PROCSIG_WALSND_INIT_STOPPING, /* ask walsenders to prepare for shutdown */
PROCSIG_BARRIER, /* global barrier interrupt */
PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
/* Recovery conflict reasons */
PROCSIG_RECOVERY_CONFLICT_DATABASE,

@ -42,6 +42,7 @@ typedef enum
WAIT_EVENT_CHECKPOINTER_MAIN,
WAIT_EVENT_LOGICAL_APPLY_MAIN,
WAIT_EVENT_LOGICAL_LAUNCHER_MAIN,
WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN,
WAIT_EVENT_RECOVERY_WAL_STREAM,
WAIT_EVENT_SYSLOGGER_MAIN,
WAIT_EVENT_WAL_RECEIVER_MAIN,
@ -105,6 +106,7 @@ typedef enum
WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATE,
WAIT_EVENT_HASH_GROW_BUCKETS_ELECT,
WAIT_EVENT_HASH_GROW_BUCKETS_REINSERT,
WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE,
WAIT_EVENT_LOGICAL_SYNC_DATA,
WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
WAIT_EVENT_MQ_INTERNAL,

@ -117,7 +117,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | none | off | dbname=regress_doesnotexist | 0/0
regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
@ -125,7 +125,7 @@ ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@ -138,7 +138,7 @@ ERROR: invalid connection string syntax: missing "=" after "foobar" in connecti
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@ -158,7 +158,7 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/12345
(1 row)
-- ok - with lsn = NONE
@ -170,7 +170,7 @@ ERROR: invalid WAL location (LSN): 0/0
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | off | dbname=regress_doesnotexist2 | 0/0
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | off | dbname=regress_doesnotexist2 | 0/0
(1 row)
BEGIN;
@ -205,7 +205,7 @@ HINT: Available values: local, remote_write, remote_apply, on, off.
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+----------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | any | local | dbname=regress_doesnotexist2 | 0/0
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | local | dbname=regress_doesnotexist2 | 0/0
(1 row)
-- rename back to keep the rest simple
@ -242,7 +242,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
@ -251,13 +251,13 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
-- fail - streaming must be boolean
-- fail - streaming must be boolean or 'parallel'
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
ERROR: streaming requires a Boolean value
ERROR: streaming requires a Boolean value or "parallel"
-- now it works
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
WARNING: subscription was created, but is not connected
@ -266,7 +266,15 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
@ -275,7 +283,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication already exists
@ -293,7 +301,7 @@ ERROR: publication "testpub1" is already in subscription "regress_testsub"
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
-- fail - publication used more then once
@ -311,7 +319,7 @@ ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (ref
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
DROP SUBSCRIPTION regress_testsub;
@ -350,7 +358,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
--fail - alter of two_phase option not supported.
@ -362,7 +370,7 @@ ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -375,7 +383,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -391,7 +399,7 @@ HINT: To initiate replication, you must manually create the replication slot, e
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
@ -399,7 +407,7 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+----------
regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | any | off | dbname=regress_doesnotexist | 0/0
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | off | dbname=regress_doesnotexist | 0/0
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);

@ -167,7 +167,7 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub;
-- fail - streaming must be boolean
-- fail - streaming must be boolean or 'parallel'
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
-- now it works
@ -175,6 +175,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
\dRs+
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
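The regression cases above exercise three accepted spellings of the `streaming` option (`true`, `false`, `parallel`) and one rejected value (`foo`). A minimal sketch of such a three-way parse follows. It is not the patch's parser: `MockStreamMode` and `mock_parse_streaming` are hypothetical names, and the sketch accepts only the lower-case words `on`, `off`, `true`, `false`, and `parallel`, which is narrower than what the server accepts for Boolean options.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical three-valued mode matching the off/on/parallel display. */
typedef enum MockStreamMode
{
	MOCK_STREAM_OFF,
	MOCK_STREAM_ON,
	MOCK_STREAM_PARALLEL
} MockStreamMode;

/*
 * Parse a streaming option value. Returns true and sets *mode on success;
 * returns false for any other input, which the caller would report as
 * 'streaming requires a Boolean value or "parallel"'.
 */
static bool
mock_parse_streaming(const char *value, MockStreamMode *mode)
{
	assert(value != NULL && mode != NULL);

	if (strcmp(value, "parallel") == 0)
		*mode = MOCK_STREAM_PARALLEL;
	else if (strcmp(value, "on") == 0 || strcmp(value, "true") == 0)
		*mode = MOCK_STREAM_ON;
	else if (strcmp(value, "off") == 0 || strcmp(value, "false") == 0)
		*mode = MOCK_STREAM_OFF;
	else
		return false;

	return true;
}
```

Checking for `parallel` before the Boolean spellings keeps the existing Boolean behaviour untouched and adds the new value as one extra case.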

@ -8,56 +8,39 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
'logical_decoding_work_mem = 64kB');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
);
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
# Check that the parallel apply worker has finished applying the streaming
# transaction.
sub check_parallel_log
{
my ($node_subscriber, $offset, $is_parallel, $type) = @_;
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
if ($is_parallel)
{
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
$offset);
}
}
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
# Interleave a pair of transactions, each exceeding the 64kB limit.
my $in = '';
my $out = '';
my $offset = 0;
my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
on_error_stop => 0);
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
$in .= q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
@ -82,15 +65,21 @@ $h->finish; # errors make the next test fail, so ignore them here
$node_publisher->wait_for_catchup($appname);
$result =
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(3334|3334|3334), 'check extra columns contain local defaults');
is($result, qq(3334|3334|3334),
'check extra columns contain local defaults');
# Test the streaming in binary mode
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET (binary = on)");
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Insert, update and delete enough rows to exceed the 64kB limit.
$node_publisher->safe_psql(
'postgres', q{
@ -103,10 +92,13 @@ COMMIT;
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(6667|6667|6667), 'check extra columns contain local defaults');
is($result, qq(6667|6667|6667),
'check extra columns contain local defaults');
# Change the local values of the extra columns on the subscriber,
# update publisher, and check that subscriber retains the expected
@ -115,17 +107,211 @@ is($result, qq(6667|6667|6667), 'check extra columns contain local defaults');
$node_subscriber->safe_psql('postgres',
"UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'"
);
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
$node_publisher->safe_psql('postgres',
"UPDATE test_tab SET b = md5(a::text)");
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab"
);
is($result, qq(6667|6667|6667),
'check extra columns contain locally changed data');
# Clean up the test data
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE (a > 2)");
$node_publisher->wait_for_catchup($appname);
}
# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
'logical_decoding_work_mem = 64kB');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
);
$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab_2 (a int)");
$node_subscriber->safe_psql('postgres',
"CREATE UNIQUE INDEX idx_tab on test_tab_2(a)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab, test_tab_2");
my $appname = 'tap_sub';
################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
test_streaming($node_publisher, $node_subscriber, $appname, 0);
######################################
# Test using streaming mode 'parallel'
######################################
my $oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel, binary = off)");
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION";
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;
# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
# Test that the deadlock is detected among the leader and parallel apply
# workers.
$node_subscriber->append_conf('postgresql.conf', "deadlock_timeout = 10ms");
$node_subscriber->reload;
# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
# Interleave a pair of transactions, each exceeding the 64kB limit.
my $in = '';
my $out = '';
my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default);
my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
on_error_stop => 0);
# Confirm that a deadlock between the leader apply worker and the parallel
# apply worker is detected.
my $offset = -s $node_subscriber->logfile;
$in .= q{
BEGIN;
INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
};
$h->pump_nb;
# Ensure that the parallel apply worker executes the insert command before the
# leader worker.
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/,
$offset);
$node_publisher->safe_psql('postgres', "INSERT INTO test_tab_2 values(1)");
$in .= q{
COMMIT;
\q
};
$h->finish;
$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
$offset);
# In order for the two transactions to be completed normally without causing
# conflicts due to the unique index, we temporarily drop it.
$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab");
# Wait for this streaming transaction to be applied in the apply worker.
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(5001), 'data replicated to subscriber after dropping index');
# Clean up test data from the environment.
$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2");
$node_publisher->wait_for_catchup($appname);
$node_subscriber->safe_psql('postgres',
"CREATE UNIQUE INDEX idx_tab on test_tab_2(a)");
# Confirm that a deadlock between two parallel apply workers is detected.
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
$in .= q{
BEGIN;
INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i);
};
$h->pump_nb;
# Ensure that the first parallel apply worker executes the insert command
# before the second one.
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/,
$offset);
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)");
$in .= q{
COMMIT;
\q
};
$h->finish;
$node_subscriber->wait_for_log(qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/,
$offset);
# In order for the two transactions to be completed normally without causing
# conflicts due to the unique index, we temporarily drop it.
$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab");
# Wait for this streaming transaction to be applied in the apply worker.
$node_publisher->wait_for_catchup($appname);
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2");
is($result, qq(10000), 'data replicated to subscriber after dropping index');
$node_subscriber->stop;
$node_publisher->stop;

@ -8,6 +8,73 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Check that the parallel apply worker has finished applying the streaming
# transaction.
sub check_parallel_log
{
my ($node_subscriber, $offset, $is_parallel, $type) = @_;
if ($is_parallel)
{
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
$offset);
}
}
# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
my $offset = 0;
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s3;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s4;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(12|12|12),
'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
);
# Clean up the test data
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE (a > 2)");
$node_publisher->wait_for_catchup($appname);
}
# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
@ -37,6 +104,10 @@ $node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
@ -49,40 +120,33 @@ my $result =
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# Insert, update and delete some rows.
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s1;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6, 8) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s2;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9, 11) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s3;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(12, 14) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
SAVEPOINT s4;
INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(15, 17) s(i);
UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
DELETE FROM test_tab WHERE mod(a,3) = 0;
COMMIT;
});
test_streaming($node_publisher, $node_subscriber, $appname, 0);
$node_publisher->wait_for_catchup($appname);
######################################
# Test using streaming mode 'parallel'
######################################
my $oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(12|12|12),
'check data was copied to subscriber in streaming mode and extra columns contain local defaults'
);
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)");
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION";
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;
# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
$node_subscriber->stop;
$node_publisher->stop;

@@ -2,6 +2,9 @@
# Copyright (c) 2021-2023, PostgreSQL Global Development Group
# Test streaming of large transaction with DDL and subtransactions
#
# This file is mainly to test the DDL/DML interaction of the publisher side,
# so we didn't add a parallel apply version for the tests in this file.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;

@@ -8,45 +8,29 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
'logical_decoding_mode = immediate');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(2|0), 'check initial data was copied to subscriber');
# Check that the parallel apply worker has finished applying the streaming
# transaction.
sub check_parallel_log
{
my ($node_subscriber, $offset, $is_parallel, $type) = @_;
if ($is_parallel)
{
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
$offset);
}
}
# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
my $offset = 0;
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# streamed transaction with DDL, DML and ROLLBACKs
$node_publisher->safe_psql(
@@ -72,15 +56,20 @@ COMMIT;
$node_publisher->wait_for_catchup($appname);
$result =
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(6|0),
'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults'
);
# streamed transaction with subscriber receiving out of order subtransaction
# ROLLBACKs
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# streamed transaction with subscriber receiving out of order
# subtransaction ROLLBACKs
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
@@ -99,12 +88,17 @@ COMMIT;
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT');
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(7|0),
'check rollback to savepoint was reflected on subscriber');
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# streamed transaction with subscriber receiving rollback
$node_publisher->safe_psql(
'postgres', q{
@@ -119,11 +113,91 @@ ROLLBACK;
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'ABORT');
$result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(7|0), 'check rollback was reflected on subscriber');
# Cleanup the test data
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE (a > 2)");
$node_publisher->wait_for_catchup($appname);
}
# Create publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf('postgresql.conf',
'logical_decoding_mode = immediate');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c INT, d INT, e INT)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)"
);
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
my $result =
$node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c) FROM test_tab");
is($result, qq(2|0), 'check initial data was copied to subscriber');
test_streaming($node_publisher, $node_subscriber, $appname, 0);
######################################
# Test using streaming mode 'parallel'
######################################
my $oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)");
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION";
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;
# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
$node_subscriber->stop;
$node_publisher->stop;

@@ -3,6 +3,9 @@
# Test streaming of transaction with subtransactions, DDLs, DMLs, and
# rollbacks
#
# This file is mainly to test the DDL/DML interaction of the publisher side,
# so we didn't add a parallel apply version for the tests in this file.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;

@@ -5,6 +5,8 @@
#
# Includes tests for options 2PC (not-streaming) and also for 2PC (streaming).
#
# Two-phase and parallel apply will be tested in 023_twophase_stream, so we
# didn't add a parallel apply version for the tests in this file.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;

@@ -8,68 +8,26 @@ use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
###############################
# Setup
###############################
# Initialize publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf(
'postgresql.conf', qq(
max_prepared_transactions = 10
logical_decoding_mode = immediate
));
$node_publisher->start;
# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->append_conf(
'postgresql.conf', qq(
max_prepared_transactions = 10
));
$node_subscriber->start;
# Create some pre-existing content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
);
# Setup logical replication (streaming = on)
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
$node_subscriber->safe_psql(
'postgres', "
CREATE SUBSCRIPTION tap_sub
CONNECTION '$publisher_connstr application_name=$appname'
PUBLICATION tap_pub
WITH (streaming = on, two_phase = on)");
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for two-phase to be enabled
my $twophase_query =
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
$node_subscriber->poll_query_until('postgres', $twophase_query)
or die "Timed out while waiting for subscriber to enable twophase";
###############################
# Check initial data was copied to subscriber
###############################
my $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# Check that the parallel apply worker has finished applying the streaming
# transaction.
sub check_parallel_log
{
my ($node_subscriber, $offset, $is_parallel, $type) = @_;
if ($is_parallel)
{
$node_subscriber->wait_for_log(
qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/,
$offset);
}
}
# Common test steps for both the streaming=on and streaming=parallel cases.
sub test_streaming
{
my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_;
my $offset = 0;
###############################
# Test 2PC PREPARE / COMMIT PREPARED.
@@ -79,6 +37,9 @@ is($result, qq(2|2|2), 'check initial data was copied to subscriber');
# Expect all data is replicated on subscriber side after the commit.
###############################
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# check that 2PC gets replicated to subscriber
# Insert, update and delete some rows.
$node_publisher->safe_psql(
@@ -91,8 +52,10 @@ $node_publisher->safe_psql(
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
my $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(1), 'transaction is prepared on subscriber');
@@ -122,7 +85,11 @@ is($result, qq(0), 'transaction is committed on subscriber');
###############################
# First, delete the data except for 2 rows (will be replicated)
$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE a > 2;");
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Then insert, update and delete some rows.
$node_publisher->safe_psql(
@@ -135,6 +102,8 @@ $node_publisher->safe_psql(
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
@@ -150,7 +119,8 @@ $node_publisher->wait_for_catchup($appname);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2),
'Rows inserted by 2PC are rolled back, leaving only the original 2 rows');
'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'
);
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
@@ -166,6 +136,9 @@ is($result, qq(0), 'transaction is aborted on subscriber');
# Note: both publisher and subscriber do crash/restart.
###############################
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
$node_publisher->safe_psql(
'postgres', q{
BEGIN;
@@ -180,6 +153,10 @@ $node_publisher->stop('immediate');
$node_publisher->start;
$node_subscriber->start;
# We don't try to check the log for parallel option here as the subscriber
# may have stopped after finishing the prepare and before logging the
# appropriate message.
# commit post the restart
$node_publisher->safe_psql('postgres',
"COMMIT PREPARED 'test_prepared_tab';");
@@ -204,7 +181,11 @@ is($result, qq(4|4|4),
###############################
# First, delete the data except for 2 rows (will be replicated)
$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE a > 2;");
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Then insert, update and delete some rows.
$node_publisher->safe_psql(
@@ -217,6 +198,8 @@ $node_publisher->safe_psql(
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
@@ -237,7 +220,8 @@ $node_publisher->wait_for_catchup($appname);
# but the extra INSERT outside of the 2PC still was replicated
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(3|3|3), 'check the outside insert was copied to subscriber');
is($result, qq(3|3|3),
'check the outside insert was copied to subscriber');
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
@@ -255,7 +239,11 @@ is($result, qq(0), 'transaction is aborted on subscriber');
###############################
# First, delete the data except for 2 rows (will be replicated)
$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;");
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE a > 2;");
# Check the subscriber log from now on.
$offset = -s $node_subscriber->logfile;
# Then insert, update and delete some rows.
$node_publisher->safe_psql(
@@ -268,6 +256,8 @@ $node_publisher->safe_psql(
$node_publisher->wait_for_catchup($appname);
check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
# check that transaction is in prepared state on subscriber
$result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
@@ -295,6 +285,105 @@ $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*) FROM pg_prepared_xacts;");
is($result, qq(0), 'transaction is committed on subscriber');
# Cleanup the test data
$node_publisher->safe_psql('postgres',
"DELETE FROM test_tab WHERE a > 2;");
$node_publisher->wait_for_catchup($appname);
}
###############################
# Setup
###############################
# Initialize publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->append_conf(
'postgresql.conf', qq(
max_prepared_transactions = 10
logical_decoding_mode = immediate
));
$node_publisher->start;
# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->append_conf(
'postgresql.conf', qq(
max_prepared_transactions = 10
));
$node_subscriber->start;
# Create some pre-existing content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b varchar)");
$node_publisher->safe_psql('postgres',
"INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
# Setup structure on subscriber (columns a and b are compatible with same table name on publisher)
$node_subscriber->safe_psql('postgres',
"CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"
);
# Setup logical replication (streaming = on)
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
my $appname = 'tap_sub';
################################
# Test using streaming mode 'on'
################################
$node_subscriber->safe_psql(
'postgres', "
CREATE SUBSCRIPTION tap_sub
CONNECTION '$publisher_connstr application_name=$appname'
PUBLICATION tap_pub
WITH (streaming = on, two_phase = on)");
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
# Also wait for two-phase to be enabled
my $twophase_query =
"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');";
$node_subscriber->poll_query_until('postgres', $twophase_query)
or die "Timed out while waiting for subscriber to enable twophase";
# Check initial data was copied to subscriber
my $result = $node_subscriber->safe_psql('postgres',
"SELECT count(*), count(c), count(d = 999) FROM test_tab");
is($result, qq(2|2|2), 'check initial data was copied to subscriber');
test_streaming($node_publisher, $node_subscriber, $appname, 0);
######################################
# Test using streaming mode 'parallel'
######################################
my $oldpid = $node_publisher->safe_psql('postgres',
"SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
);
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)");
$node_publisher->poll_query_until('postgres',
"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';"
)
or die
"Timed out while waiting for apply to restart after changing SUBSCRIPTION";
# We need to check DEBUG logs to ensure that the parallel apply worker has
# applied the transaction. So, bump up the log verbosity.
$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
$node_subscriber->reload;
# Run a query to make sure that the reload has taken effect.
$node_subscriber->safe_psql('postgres', q{SELECT 1});
test_streaming($node_publisher, $node_subscriber, $appname, 1);
###############################
# check all the cleanup
###############################

@@ -1473,6 +1473,7 @@ LogicalRepRelId
LogicalRepRelMapEntry
LogicalRepRelation
LogicalRepRollbackPreparedTxnData
LogicalRepStreamAbortData
LogicalRepTupleData
LogicalRepTyp
LogicalRepWorker
@@ -1876,6 +1877,9 @@ PageXLogRecPtr
PagetableEntry
Pairs
ParallelAppendState
ParallelApplyWorkerEntry
ParallelApplyWorkerInfo
ParallelApplyWorkerShared
ParallelBitmapHeapState
ParallelBlockTableScanDesc
ParallelBlockTableScanWorker
@@ -1895,6 +1899,7 @@ ParallelSlotResultHandler
ParallelState
ParallelTableScanDesc
ParallelTableScanDescData
ParallelTransState
ParallelVacuumState
ParallelWorkerContext
ParallelWorkerInfo
@@ -1924,6 +1929,7 @@ ParserState
PartClauseInfo
PartClauseMatchStatus
PartClauseTarget
PartialFileSetState
PartitionBoundInfo
PartitionBoundInfoData
PartitionBoundSpec
@@ -2774,6 +2780,7 @@ TocEntry
TokenAuxData
TokenizedAuthLine
TrackItem
TransApplyAction
TransInvalidationInfo
TransState
TransactionId
