@ -29,6 +29,7 @@
# include "getopt_long.h"
# include "getopt_long.h"
# define DEFAULT_SUB_PORT "50432"
# define DEFAULT_SUB_PORT "50432"
# define OBJECTTYPE_PUBLICATIONS 0x0001
/* Command-line options */
/* Command-line options */
struct CreateSubscriberOptions
struct CreateSubscriberOptions
@ -44,6 +45,7 @@ struct CreateSubscriberOptions
SimpleStringList sub_names ; /* list of subscription names */
SimpleStringList sub_names ; /* list of subscription names */
SimpleStringList replslot_names ; /* list of replication slot names */
SimpleStringList replslot_names ; /* list of replication slot names */
int recovery_timeout ; /* stop recovery after this time */
int recovery_timeout ; /* stop recovery after this time */
SimpleStringList objecttypes_to_remove ; /* list of object types to remove */
} ;
} ;
/* per-database publication/subscription info */
/* per-database publication/subscription info */
@ -68,6 +70,8 @@ struct LogicalRepInfos
{
{
struct LogicalRepInfo * dbinfo ;
struct LogicalRepInfo * dbinfo ;
bool two_phase ; /* enable-two-phase option */
bool two_phase ; /* enable-two-phase option */
bits32 objecttypes_to_remove ; /* flags indicating which object types
* to remove on subscriber */
} ;
} ;
static void cleanup_objects_atexit ( void ) ;
static void cleanup_objects_atexit ( void ) ;
@ -109,7 +113,9 @@ static void stop_standby_server(const char *datadir);
static void wait_for_end_recovery ( const char * conninfo ,
static void wait_for_end_recovery ( const char * conninfo ,
const struct CreateSubscriberOptions * opt ) ;
const struct CreateSubscriberOptions * opt ) ;
static void create_publication ( PGconn * conn , struct LogicalRepInfo * dbinfo ) ;
static void create_publication ( PGconn * conn , struct LogicalRepInfo * dbinfo ) ;
static void drop_publication ( PGconn * conn , struct LogicalRepInfo * dbinfo ) ;
static void drop_publication ( PGconn * conn , const char * pubname ,
const char * dbname , bool * made_publication ) ;
static void check_and_drop_publications ( PGconn * conn , struct LogicalRepInfo * dbinfo ) ;
static void create_subscription ( PGconn * conn , const struct LogicalRepInfo * dbinfo ) ;
static void create_subscription ( PGconn * conn , const struct LogicalRepInfo * dbinfo ) ;
static void set_replication_progress ( PGconn * conn , const struct LogicalRepInfo * dbinfo ,
static void set_replication_progress ( PGconn * conn , const struct LogicalRepInfo * dbinfo ,
const char * lsn ) ;
const char * lsn ) ;
@ -194,7 +200,8 @@ cleanup_objects_atexit(void)
if ( conn ! = NULL )
if ( conn ! = NULL )
{
{
if ( dbinfo - > made_publication )
if ( dbinfo - > made_publication )
drop_publication ( conn , dbinfo ) ;
drop_publication ( conn , dbinfo - > pubname , dbinfo - > dbname ,
& dbinfo - > made_publication ) ;
if ( dbinfo - > made_replslot )
if ( dbinfo - > made_replslot )
drop_replication_slot ( conn , dbinfo , dbinfo - > replslotname ) ;
drop_replication_slot ( conn , dbinfo , dbinfo - > replslotname ) ;
disconnect_database ( conn , false ) ;
disconnect_database ( conn , false ) ;
@ -241,6 +248,8 @@ usage(void)
printf ( _ ( " -n, --dry-run dry run, just show what would be done \n " ) ) ;
printf ( _ ( " -n, --dry-run dry run, just show what would be done \n " ) ) ;
printf ( _ ( " -p, --subscriber-port=PORT subscriber port number (default %s) \n " ) , DEFAULT_SUB_PORT ) ;
printf ( _ ( " -p, --subscriber-port=PORT subscriber port number (default %s) \n " ) , DEFAULT_SUB_PORT ) ;
printf ( _ ( " -P, --publisher-server=CONNSTR publisher connection string \n " ) ) ;
printf ( _ ( " -P, --publisher-server=CONNSTR publisher connection string \n " ) ) ;
printf ( _ ( " -R, --remove=OBJECTTYPE remove all objects of the specified type from specified \n "
" databases on the subscriber; accepts: publications \n " ) ) ;
printf ( _ ( " -s, --socketdir=DIR socket directory to use (default current dir.) \n " ) ) ;
printf ( _ ( " -s, --socketdir=DIR socket directory to use (default current dir.) \n " ) ) ;
printf ( _ ( " -t, --recovery-timeout=SECS seconds to wait for recovery to end \n " ) ) ;
printf ( _ ( " -t, --recovery-timeout=SECS seconds to wait for recovery to end \n " ) ) ;
printf ( _ ( " -T, --enable-two-phase enable two-phase commit for all subscriptions \n " ) ) ;
printf ( _ ( " -T, --enable-two-phase enable two-phase commit for all subscriptions \n " ) ) ;
@ -1193,12 +1202,8 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
*/
*/
check_and_drop_existing_subscriptions ( conn , & dbinfo [ i ] ) ;
check_and_drop_existing_subscriptions ( conn , & dbinfo [ i ] ) ;
/*
/* Check and drop the required publications in the given database. */
* Since the publication was created before the consistent LSN , it is
check_and_drop_publications ( conn , & dbinfo [ i ] ) ;
* available on the subscriber when the physical replica is promoted .
* Remove publications from the subscriber because it has no use .
*/
drop_publication ( conn , & dbinfo [ i ] ) ;
create_subscription ( conn , & dbinfo [ i ] ) ;
create_subscription ( conn , & dbinfo [ i ] ) ;
@ -1663,10 +1668,11 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
}
}
/*
/*
* Remove publication if it couldn ' t finish all steps .
* Drop the specified publication in the given database .
*/
*/
static void
static void
drop_publication ( PGconn * conn , struct LogicalRepInfo * dbinfo )
drop_publication ( PGconn * conn , const char * pubname , const char * dbname ,
bool * made_publication )
{
{
PQExpBuffer str = createPQExpBuffer ( ) ;
PQExpBuffer str = createPQExpBuffer ( ) ;
PGresult * res ;
PGresult * res ;
@ -1674,10 +1680,10 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
Assert ( conn ! = NULL ) ;
Assert ( conn ! = NULL ) ;
pubname_esc = PQescapeIdentifier ( conn , dbinfo - > pubname , strlen ( dbinfo - > pubname ) ) ;
pubname_esc = PQescapeIdentifier ( conn , pubname , strlen ( pubname ) ) ;
pg_log_info ( " dropping publication \" %s \" in database \" %s \" " ,
pg_log_info ( " dropping publication \" %s \" in database \" %s \" " ,
dbinfo - > pubname , dbinfo - > dbname ) ;
pubname , dbname ) ;
appendPQExpBuffer ( str , " DROP PUBLICATION %s " , pubname_esc ) ;
appendPQExpBuffer ( str , " DROP PUBLICATION %s " , pubname_esc ) ;
@ -1691,8 +1697,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
{
{
pg_log_error ( " could not drop publication \" %s \" in database \" %s \" : %s " ,
pg_log_error ( " could not drop publication \" %s \" in database \" %s \" : %s " ,
dbinfo - > pubname , dbinfo - > dbname , PQresultErrorMessage ( res ) ) ;
pubname , dbname , PQresultErrorMessage ( res ) ) ;
dbinfo - > made_publication = false ; /* don't try again. */
* made_publication = false ; /* don't try again. */
/*
/*
* Don ' t disconnect and exit here . This routine is used by primary
* Don ' t disconnect and exit here . This routine is used by primary
@ -1708,6 +1714,55 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
destroyPQExpBuffer ( str ) ;
destroyPQExpBuffer ( str ) ;
}
}
/*
* Retrieve and drop the publications .
*
* Since the publications were created before the consistent LSN , they
* remain on the subscriber even after the physical replica is
* promoted . Remove these publications from the subscriber because
* they have no use . Additionally , if requested , drop all pre - existing
* publications .
*/
static void
check_and_drop_publications ( PGconn * conn , struct LogicalRepInfo * dbinfo )
{
PGresult * res ;
bool drop_all_pubs = dbinfos . objecttypes_to_remove & OBJECTTYPE_PUBLICATIONS ;
Assert ( conn ! = NULL ) ;
if ( drop_all_pubs )
{
pg_log_info ( " dropping all existing publications in database \" %s \" " ,
dbinfo - > dbname ) ;
/* Fetch all publication names */
res = PQexec ( conn , " SELECT pubname FROM pg_catalog.pg_publication; " ) ;
if ( PQresultStatus ( res ) ! = PGRES_TUPLES_OK )
{
pg_log_error ( " could not obtain publication information: %s " ,
PQresultErrorMessage ( res ) ) ;
PQclear ( res ) ;
disconnect_database ( conn , true ) ;
}
/* Drop each publication */
for ( int i = 0 ; i < PQntuples ( res ) ; i + + )
drop_publication ( conn , PQgetvalue ( res , i , 0 ) , dbinfo - > dbname ,
& dbinfo - > made_publication ) ;
PQclear ( res ) ;
}
/*
* In dry - run mode , we don ' t create publications , but we still try to drop
* those to provide necessary information to the user .
*/
if ( ! drop_all_pubs | | dry_run )
drop_publication ( conn , dbinfo - > pubname , dbinfo - > dbname ,
& dbinfo - > made_publication ) ;
}
/*
/*
* Create a subscription with some predefined options .
* Create a subscription with some predefined options .
*
*
@ -1914,6 +1969,7 @@ main(int argc, char **argv)
{ " dry-run " , no_argument , NULL , ' n ' } ,
{ " dry-run " , no_argument , NULL , ' n ' } ,
{ " subscriber-port " , required_argument , NULL , ' p ' } ,
{ " subscriber-port " , required_argument , NULL , ' p ' } ,
{ " publisher-server " , required_argument , NULL , ' P ' } ,
{ " publisher-server " , required_argument , NULL , ' P ' } ,
{ " remove " , required_argument , NULL , ' R ' } ,
{ " socketdir " , required_argument , NULL , ' s ' } ,
{ " socketdir " , required_argument , NULL , ' s ' } ,
{ " recovery-timeout " , required_argument , NULL , ' t ' } ,
{ " recovery-timeout " , required_argument , NULL , ' t ' } ,
{ " enable-two-phase " , no_argument , NULL , ' T ' } ,
{ " enable-two-phase " , no_argument , NULL , ' T ' } ,
@ -1995,7 +2051,7 @@ main(int argc, char **argv)
get_restricted_token ( ) ;
get_restricted_token ( ) ;
while ( ( c = getopt_long ( argc , argv , " d:D:np:P:s:t:TU:v " ,
while ( ( c = getopt_long ( argc , argv , " d:D:np:P:R: s:t:TU:v " ,
long_options , & option_index ) ) ! = - 1 )
long_options , & option_index ) ) ! = - 1 )
{
{
switch ( c )
switch ( c )
@ -2025,6 +2081,12 @@ main(int argc, char **argv)
case ' P ' :
case ' P ' :
opt . pub_conninfo_str = pg_strdup ( optarg ) ;
opt . pub_conninfo_str = pg_strdup ( optarg ) ;
break ;
break ;
case ' R ' :
if ( ! simple_string_list_member ( & opt . objecttypes_to_remove , optarg ) )
simple_string_list_append ( & opt . objecttypes_to_remove , optarg ) ;
else
pg_fatal ( " object type \" %s \" is specified more than once for --remove " , optarg ) ;
break ;
case ' s ' :
case ' s ' :
opt . socket_dir = pg_strdup ( optarg ) ;
opt . socket_dir = pg_strdup ( optarg ) ;
canonicalize_path ( opt . socket_dir ) ;
canonicalize_path ( opt . socket_dir ) ;
@ -2189,6 +2251,19 @@ main(int argc, char **argv)
exit ( 1 ) ;
exit ( 1 ) ;
}
}
/* Verify the object types specified for removal from the subscriber */
for ( SimpleStringListCell * cell = opt . objecttypes_to_remove . head ; cell ; cell = cell - > next )
{
if ( pg_strcasecmp ( cell - > val , " publications " ) = = 0 )
dbinfos . objecttypes_to_remove | = OBJECTTYPE_PUBLICATIONS ;
else
{
pg_log_error ( " invalid object type \" %s \" specified for --remove " , cell - > val ) ;
pg_log_error_hint ( " The valid option is: \" publications \" " ) ;
exit ( 1 ) ;
}
}
/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
pg_ctl_path = get_exec_path ( argv [ 0 ] , " pg_ctl " ) ;
pg_ctl_path = get_exec_path ( argv [ 0 ] , " pg_ctl " ) ;
pg_resetwal_path = get_exec_path ( argv [ 0 ] , " pg_resetwal " ) ;
pg_resetwal_path = get_exec_path ( argv [ 0 ] , " pg_resetwal " ) ;