Support existing publications in pg_createsubscriber.

Allow pg_createsubscriber to reuse existing publications instead of
failing when they already exist on the publisher.

Previously, pg_createsubscriber would fail if any specified publication
already existed. Now, existing publications are reused as-is with their
current configuration, and non-existing publications are created
automatically with FOR ALL TABLES.

This change provides flexibility when working with mixed scenarios of
existing and new publications. Users should verify that existing
publications have the desired configuration before reusing them, and can
use --dry-run with verbose mode to see which publications will be reused
and which will be created.

Only publications created by pg_createsubscriber are cleaned up during
error cleanup operations. Pre-existing publications are preserved unless
'--clean=publications' is explicitly specified, which drops all
publications.

This feature would be helpful for pub-sub configurations where users want
to subscribe to a subset of tables from the publisher.

Author: Shubham Khanna <khannashubham1197@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Zhijie Hou (Fujitsu) <houzj.fnst@fujitsu.com
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Reviewed-by: tianbing <tian_bing_0531@163.com>
Discussion: https://postgr.es/m/CAHv8Rj%2BsxWutv10WiDEAPZnygaCbuY2RqiLMj2aRMH-H3iZwyA%40mail.gmail.com
master
Amit Kapila 1 day ago
parent f4e797171e
commit 85ddcc2f4c
  1. 8
      doc/src/sgml/ref/pg_createsubscriber.sgml
  2. 83
      src/bin/pg_basebackup/pg_createsubscriber.c
  3. 54
      src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

@ -285,6 +285,14 @@ PostgreSQL documentation
a generated name is assigned to the publication name. This option cannot
be used together with <option>--all</option>.
</para>
<para>
If a specified publication already exists on the publisher, it is reused.
It is useful to partially replicate the database if the specified
publication includes a list of tables. If the publication does not exist,
it is automatically created with <literal>FOR ALL TABLES</literal>. Use
<option>--dry-run</option> option to preview which publications will be
reused and which will be created.
</para>
</listitem>
</varlistentry>

@ -116,6 +116,7 @@ static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery(const char *conninfo,
const struct CreateSubscriberOptions *opt);
static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
static void drop_publication(PGconn *conn, const char *pubname,
const char *dbname, bool *made_publication);
static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@ -763,6 +764,39 @@ generate_object_name(PGconn *conn)
return objname;
}
/*
* Does the publication exist in the specified database?
*/
static bool
find_publication(PGconn *conn, const char *pubname, const char *dbname)
{
PQExpBuffer str = createPQExpBuffer();
PGresult *res;
bool found = false;
char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
appendPQExpBuffer(str,
"SELECT 1 FROM pg_catalog.pg_publication "
"WHERE pubname = %s",
pubname_esc);
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
pubname, dbname, PQerrorMessage(conn));
disconnect_database(conn, true);
}
if (PQntuples(res) == 1)
found = true;
PQclear(res);
PQfreemem(pubname_esc);
destroyPQExpBuffer(str);
return found;
}
/*
* Create the publications and replication slots in preparation for logical
* replication. Returns the LSN from latest replication slot. It will be the
@ -799,13 +833,25 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
if (num_replslots == 0)
dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
{
/* Reuse existing publication on publisher. */
pg_log_info("use existing publication \"%s\" in database \"%s\"",
dbinfo[i].pubname, dbinfo[i].dbname);
/* Don't remove pre-existing publication if an error occurs. */
dbinfo[i].made_publication = false;
}
else
{
/*
* Create publication on publisher. This step should be executed
* *before* promoting the subscriber to avoid any transactions between
* consistent LSN and the new publication rows (such transactions
* wouldn't see the new publication rows resulting in an error).
* *before* promoting the subscriber to avoid any transactions
* between consistent LSN and the new publication rows (such
* transactions wouldn't see the new publication rows resulting in
* an error).
*/
create_publication(conn, &dbinfo[i]);
}
/* Create replication slot on publisher */
if (lsn)
@ -1749,11 +1795,10 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
/*
* Retrieve and drop the publications.
*
* Since the publications were created before the consistent LSN, they
* remain on the subscriber even after the physical replica is
* promoted. Remove these publications from the subscriber because
* they have no use. Additionally, if requested, drop all pre-existing
* publications.
* Publications copied during physical replication remain on the subscriber
* after promotion. If --clean=publications is specified, drop all existing
* publications in the subscriber database. Otherwise, only drop publications
* that were created by pg_createsubscriber during this operation.
*/
static void
check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
@ -1785,15 +1830,25 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
PQclear(res);
}
/*
* In dry-run mode, we don't create publications, but we still try to drop
* those to provide necessary information to the user.
*/
if (!drop_all_pubs || dry_run)
else
{
/* Drop publication only if it was created by this tool */
if (dbinfo->made_publication)
{
drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
&dbinfo->made_publication);
}
else
{
if (dry_run)
pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
dbinfo->pubname, dbinfo->dbname);
else
pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
dbinfo->pubname, dbinfo->dbname);
}
}
}
/*
* Create a subscription with some predefined options.

@ -443,10 +443,17 @@ is(scalar(() = $stderr =~ /would create the replication slot/g),
is(scalar(() = $stderr =~ /would create subscription/g),
3, "verify subscriptions are created for all databases");
# Create a user-defined publication, and a table that is not a member of that
# publication.
$node_p->safe_psql($db1, qq(
CREATE PUBLICATION test_pub3 FOR TABLE tbl1;
CREATE TABLE not_replicated (a int);
));
# Run pg_createsubscriber on node S. --verbose is used twice
# to show more information.
# In passing, also test the --enable-two-phase option and
# --clean option
#
# Test two phase and clean options. Use pre-existing publication.
command_ok(
[
'pg_createsubscriber',
@ -456,7 +463,7 @@ command_ok(
'--publisher-server' => $node_p->connstr($db1),
'--socketdir' => $node_s->host,
'--subscriber-port' => $node_s->port,
'--publication' => 'pub1',
'--publication' => 'test_pub3',
'--publication' => 'pub2',
'--replication-slot' => 'replslot1',
'--replication-slot' => 'replslot2',
@ -478,13 +485,16 @@ is($result, qq(0),
# Insert rows on P
$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')");
$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
$node_p->safe_psql($db1, "INSERT INTO not_replicated VALUES(0)");
# Start subscriber
$node_s->start;
# Confirm publications are removed from the subscriber node
is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
'0', 'all publications on subscriber have been removed');
is($node_s->safe_psql($db1, 'SELECT COUNT(*) FROM pg_publication'),
'0', 'all publications were removed from db1');
is($node_s->safe_psql($db2, 'SELECT COUNT(*) FROM pg_publication'),
'0', 'all publications were removed from db2');
# Verify that all subtwophase states are pending or enabled,
# e.g. there are no subscriptions where subtwophase is disabled ('d')
@ -525,6 +535,9 @@ is( $result, qq(first row
second row
third row),
"logical replication works in database $db1");
$result = $node_s->safe_psql($db1, 'SELECT * FROM not_replicated');
is($result, qq(),
"table is not replicated in database $db1");
# Check result in database $db2
$result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2');
@ -537,6 +550,37 @@ my $sysid_s = $node_s->safe_psql('postgres',
'SELECT system_identifier FROM pg_control_system()');
isnt($sysid_p, $sysid_s, 'system identifier was changed');
# Verify that pub2 was created in $db2
is($node_p->safe_psql($db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"),
'1', "publication pub2 was created in $db2");
# Get subscription and publication names
$result = $node_s->safe_psql(
'postgres', qq(
SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
ORDER BY subpublications;
));
like(
$result,
qr/^pg_createsubscriber_\d+_[0-9a-f]+ \|\{pub2\}\n
pg_createsubscriber_\d+_[0-9a-f]+ \|\{test_pub3\}$/x,
'subscription and publication names are ok');
# Verify that the correct publications are being used
$result = $node_s->safe_psql(
'postgres', qq(
SELECT d.datname, s.subpublications
FROM pg_subscription s
JOIN pg_database d ON d.oid = s.subdbid
WHERE subname ~ '^pg_createsubscriber_'
ORDER BY s.subdbid
)
);
is($result, qq($db1|{test_pub3}
$db2|{pub2}),
"subscriptions use the correct publications");
# clean up
$node_p->teardown_node;
$node_s->teardown_node;

Loading…
Cancel
Save