@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
const char * slot_name ) ;
static void pg_ctl_status ( const char * pg_ctl_cmd , int rc ) ;
static void start_standby_server ( const struct CreateSubscriberOptions * opt ,
bool restricted_access ) ;
bool restricted_access ,
bool restrict_logical_worker ) ;
static void stop_standby_server ( const char * datadir ) ;
static void wait_for_end_recovery ( const char * conninfo ,
const struct CreateSubscriberOptions * opt ) ;
@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
static void set_replication_progress ( PGconn * conn , const struct LogicalRepInfo * dbinfo ,
const char * lsn ) ;
static void enable_subscription ( PGconn * conn , const struct LogicalRepInfo * dbinfo ) ;
static void check_and_drop_existing_subscriptions ( PGconn * conn ,
const struct LogicalRepInfo * dbinfo ) ;
static void drop_existing_subscriptions ( PGconn * conn , const char * subname ,
const char * dbname ) ;
# define USEC_PER_SEC 1000000
# define WAIT_INTERVAL 1 /* 1 second */
@ -1025,6 +1030,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
exit ( 1 ) ;
}
/*
* Drop a specified subscription . This is to avoid duplicate subscriptions on
* the primary ( publisher node ) and the newly created subscriber . We
* shouldn ' t drop the associated slot as that would be used by the publisher
* node .
*/
static void
drop_existing_subscriptions ( PGconn * conn , const char * subname , const char * dbname )
{
PQExpBuffer query = createPQExpBuffer ( ) ;
PGresult * res ;
Assert ( conn ! = NULL ) ;
/*
* Construct a query string . These commands are allowed to be executed
* within a transaction .
*/
appendPQExpBuffer ( query , " ALTER SUBSCRIPTION %s DISABLE; " ,
subname ) ;
appendPQExpBuffer ( query , " ALTER SUBSCRIPTION %s SET (slot_name = NONE); " ,
subname ) ;
appendPQExpBuffer ( query , " DROP SUBSCRIPTION %s; " , subname ) ;
pg_log_info ( " dropping subscription \" %s \" on database \" %s \" " ,
subname , dbname ) ;
if ( ! dry_run )
{
res = PQexec ( conn , query - > data ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
pg_log_error ( " could not drop a subscription \" %s \" settings: %s " ,
subname , PQresultErrorMessage ( res ) ) ;
disconnect_database ( conn , true ) ;
}
PQclear ( res ) ;
}
destroyPQExpBuffer ( query ) ;
}
/*
* Retrieve and drop the pre - existing subscriptions .
*/
static void
check_and_drop_existing_subscriptions ( PGconn * conn ,
const struct LogicalRepInfo * dbinfo )
{
PQExpBuffer query = createPQExpBuffer ( ) ;
char * dbname ;
PGresult * res ;
Assert ( conn ! = NULL ) ;
dbname = PQescapeLiteral ( conn , dbinfo - > dbname , strlen ( dbinfo - > dbname ) ) ;
appendPQExpBuffer ( query ,
" SELECT s.subname FROM pg_catalog.pg_subscription s "
" INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
" WHERE d.datname = %s " ,
dbname ) ;
res = PQexec ( conn , query - > data ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
{
pg_log_error ( " could not obtain pre-existing subscriptions: %s " ,
PQresultErrorMessage ( res ) ) ;
disconnect_database ( conn , true ) ;
}
for ( int i = 0 ; i < PQntuples ( res ) ; i + + )
drop_existing_subscriptions ( conn , PQgetvalue ( res , i , 0 ) ,
dbinfo - > dbname ) ;
PQclear ( res ) ;
destroyPQExpBuffer ( query ) ;
}
/*
* Create the subscriptions , adjust the initial location for logical
* replication and enable the subscriptions . That ' s the last step for logical
@ -1040,6 +1126,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
/* Connect to subscriber. */
conn = connect_database ( dbinfo [ i ] . subconninfo , true ) ;
/*
* We don ' t need the pre - existing subscriptions on the newly formed
* subscriber . They can connect to other publisher nodes and either
* get some unwarranted data or can lead to ERRORs in connecting to
* such nodes .
*/
check_and_drop_existing_subscriptions ( conn , & dbinfo [ i ] ) ;
/*
* Since the publication was created before the consistent LSN , it is
* available on the subscriber when the physical replica is promoted .
@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
}
static void
start_standby_server ( const struct CreateSubscriberOptions * opt , bool restricted_access )
start_standby_server ( const struct CreateSubscriberOptions * opt , bool restricted_access ,
bool restrict_logical_worker )
{
PQExpBuffer pg_ctl_cmd = createPQExpBuffer ( ) ;
int rc ;
@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
if ( opt - > config_file ! = NULL )
appendPQExpBuffer ( pg_ctl_cmd , " -o \" -c config_file=%s \" " ,
opt - > config_file ) ;
/* Suppress to start logical replication if requested */
if ( restrict_logical_worker )
appendPQExpBuffer ( pg_ctl_cmd , " -o \" -c max_logical_replication_workers=0 \" " ) ;
pg_log_debug ( " pg_ctl command is: %s " , pg_ctl_cmd - > data ) ;
rc = system ( pg_ctl_cmd - > data ) ;
pg_ctl_status ( pg_ctl_cmd - > data , rc ) ;
@ -2067,7 +2167,7 @@ main(int argc, char **argv)
* transformation steps .
*/
pg_log_info ( " starting the standby with command-line options " ) ;
start_standby_server ( & opt , true ) ;
start_standby_server ( & opt , true , false ) ;
/* Check if the standby server is ready for logical replication */
check_subscriber ( dbinfo ) ;
@ -2098,10 +2198,11 @@ main(int argc, char **argv)
/*
* Start subscriber so the recovery parameters will take effect . Wait
* until accepting connections .
* until accepting connections . We don ' t want to start logical replication
* during setup .
*/
pg_log_info ( " starting the subscriber " ) ;
start_standby_server ( & opt , true ) ;
start_standby_server ( & opt , true , true ) ;
/* Waiting the subscriber to be promoted */
wait_for_end_recovery ( dbinfo [ 0 ] . subconninfo , & opt ) ;