@ -38,6 +38,7 @@ struct CreateSubscriberOptions
char * socket_dir ; /* directory for Unix-domain socket, if any */
char * sub_port ; /* subscriber port number */
const char * sub_username ; /* subscriber username */
bool two_phase ; /* enable-two-phase option */
SimpleStringList database_names ; /* list of database names */
SimpleStringList pub_names ; /* list of publication names */
SimpleStringList sub_names ; /* list of subscription names */
@ -45,6 +46,7 @@ struct CreateSubscriberOptions
int recovery_timeout ; /* stop recovery after this time */
} ;
/* per-database publication/subscription info */
struct LogicalRepInfo
{
char * dbname ; /* database name */
@ -58,6 +60,16 @@ struct LogicalRepInfo
bool made_publication ; /* publication was created */
} ;
/*
* Information shared across all the databases ( or publications and
* subscriptions ) .
*/
struct LogicalRepInfos
{
struct LogicalRepInfo * dbinfo ;
bool two_phase ; /* enable-two-phase option */
} ;
static void cleanup_objects_atexit ( void ) ;
static void usage ( ) ;
static char * get_base_conninfo ( const char * conninfo , char * * dbname ) ;
@ -117,7 +129,7 @@ static bool dry_run = false;
static bool success = false ;
static struct LogicalRepInfo * dbinfo ;
static struct LogicalRepInfos dbinfos ;
static int num_dbs = 0 ; /* number of specified databases */
static int num_pubs = 0 ; /* number of specified publications */
static int num_subs = 0 ; /* number of specified subscriptions */
@ -172,17 +184,17 @@ cleanup_objects_atexit(void)
for ( int i = 0 ; i < num_dbs ; i + + )
{
if ( dbinfo [ i ] . made_publication | | dbinfo [ i ] . made_replslot )
if ( dbinfos . dbinfo [ i ] . made_publication | | dbinfos . dbinfo [ i ] . made_replslot )
{
PGconn * conn ;
conn = connect_database ( dbinfo [ i ] . pubconninfo , false ) ;
conn = connect_database ( dbinfos . dbinfo [ i ] . pubconninfo , false ) ;
if ( conn ! = NULL )
{
if ( dbinfo [ i ] . made_publication )
drop_publication ( conn , & dbinfo [ i ] ) ;
if ( dbinfo [ i ] . made_replslot )
drop_replication_slot ( conn , & dbinfo [ i ] , dbinfo [ i ] . replslotname ) ;
if ( dbinfos . dbinfo [ i ] . made_publication )
drop_publication ( conn , & dbinfos . dbinfo [ i ] ) ;
if ( dbinfos . dbinfo [ i ] . made_replslot )
drop_replication_slot ( conn , & dbinfos . dbinfo [ i ] , dbinfos . dbinfo [ i ] . replslotname ) ;
disconnect_database ( conn , false ) ;
}
else
@ -192,16 +204,18 @@ cleanup_objects_atexit(void)
* that some objects were left on primary and should be
* removed before trying again .
*/
if ( dbinfo [ i ] . made_publication )
if ( dbinfos . dbinfo [ i ] . made_publication )
{
pg_log_warning ( " publication \" %s \" created in database \" %s \" on primary was left behind " ,
dbinfo [ i ] . pubname , dbinfo [ i ] . dbname ) ;
dbinfos . dbinfo [ i ] . pubname ,
dbinfos . dbinfo [ i ] . dbname ) ;
pg_log_warning_hint ( " Drop this publication before trying again. " ) ;
}
if ( dbinfo [ i ] . made_replslot )
if ( dbinfos . dbinfo [ i ] . made_replslot )
{
pg_log_warning ( " replication slot \" %s \" created in database \" %s \" on primary was left behind " ,
dbinfo [ i ] . replslotname , dbinfo [ i ] . dbname ) ;
dbinfos . dbinfo [ i ] . replslotname ,
dbinfos . dbinfo [ i ] . dbname ) ;
pg_log_warning_hint ( " Drop this replication slot soon to avoid retention of WAL files. " ) ;
}
}
@ -227,6 +241,7 @@ usage(void)
printf ( _ ( " -P, --publisher-server=CONNSTR publisher connection string \n " ) ) ;
printf ( _ ( " -s, --socketdir=DIR socket directory to use (default current dir.) \n " ) ) ;
printf ( _ ( " -t, --recovery-timeout=SECS seconds to wait for recovery to end \n " ) ) ;
printf ( _ ( " -T, --enable-two-phase enable two-phase commit for all subscriptions \n " ) ) ;
printf ( _ ( " -U, --subscriber-username=NAME user name for subscriber connection \n " ) ) ;
printf ( _ ( " -v, --verbose output verbose messages \n " ) ) ;
printf ( _ ( " --config-file=FILENAME use specified main server configuration \n "
@ -479,9 +494,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
dbinfo [ i ] . pubname ? dbinfo [ i ] . pubname : " (auto) " ,
dbinfo [ i ] . replslotname ? dbinfo [ i ] . replslotname : " (auto) " ,
dbinfo [ i ] . pubconninfo ) ;
pg_log_debug ( " subscriber(%d): subscription: %s ; connection string: %s " , i ,
pg_log_debug ( " subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s " , i ,
dbinfo [ i ] . subname ? dbinfo [ i ] . subname : " (auto) " ,
dbinfo [ i ] . subconninfo ) ;
dbinfo [ i ] . subconninfo ,
dbinfos . two_phase ? " true " : " false " ) ;
if ( num_pubs > 0 )
pubcell = pubcell - > next ;
@ -938,11 +954,12 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
failed = true ;
}
if ( max_prepared_transactions ! = 0 )
if ( max_prepared_transactions ! = 0 & & ! dbinfos . two_phase )
{
pg_log_warning ( " two_phase option will not be enabled for replication slots " ) ;
pg_log_warning_detail ( " Subscriptions will be created with the two_phase option disabled. "
" Prepared transactions will be replicated at COMMIT PREPARED. " ) ;
pg_log_warning_hint ( " You can use --enable-two-phase switch to enable two_phase. " ) ;
}
/*
@ -1345,8 +1362,9 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
slot_name_esc = PQescapeLiteral ( conn , slot_name , strlen ( slot_name ) ) ;
appendPQExpBuffer ( str ,
" SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false) " ,
slot_name_esc ) ;
" SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false) " ,
slot_name_esc ,
dbinfos . two_phase ? " true " : " false " ) ;
PQfreemem ( slot_name_esc ) ;
@ -1722,8 +1740,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
appendPQExpBuffer ( str ,
" CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
" WITH (create_slot = false, enabled = false, "
" slot_name = %s, copy_data = false) " ,
subname_esc , pubconninfo_esc , pubname_esc , replslotname_esc ) ;
" slot_name = %s, copy_data = false, two_phase = %s) " ,
subname_esc , pubconninfo_esc , pubname_esc , replslotname_esc ,
dbinfos . two_phase ? " true " : " false " ) ;
PQfreemem ( pubname_esc ) ;
PQfreemem ( subname_esc ) ;
@ -1895,6 +1914,7 @@ main(int argc, char **argv)
{ " publisher-server " , required_argument , NULL , ' P ' } ,
{ " socketdir " , required_argument , NULL , ' s ' } ,
{ " recovery-timeout " , required_argument , NULL , ' t ' } ,
{ " enable-two-phase " , no_argument , NULL , ' T ' } ,
{ " subscriber-username " , required_argument , NULL , ' U ' } ,
{ " verbose " , no_argument , NULL , ' v ' } ,
{ " version " , no_argument , NULL , ' V ' } ,
@ -1950,6 +1970,7 @@ main(int argc, char **argv)
opt . socket_dir = NULL ;
opt . sub_port = DEFAULT_SUB_PORT ;
opt . sub_username = NULL ;
opt . two_phase = false ;
opt . database_names = ( SimpleStringList )
{
0
@ -1972,7 +1993,7 @@ main(int argc, char **argv)
get_restricted_token ( ) ;
while ( ( c = getopt_long ( argc , argv , " d:D:np:P:s:t:U:v " ,
while ( ( c = getopt_long ( argc , argv , " d:D:np:P:s:t:T U:v " ,
long_options , & option_index ) ) ! = - 1 )
{
switch ( c )
@ -2009,6 +2030,9 @@ main(int argc, char **argv)
case ' t ' :
opt . recovery_timeout = atoi ( optarg ) ;
break ;
case ' T ' :
opt . two_phase = true ;
break ;
case ' U ' :
opt . sub_username = pg_strdup ( optarg ) ;
break ;
@ -2170,12 +2194,14 @@ main(int argc, char **argv)
/* Rudimentary check for a data directory */
check_data_directory ( subscriber_dir ) ;
dbinfos . two_phase = opt . two_phase ;
/*
* Store database information for publisher and subscriber . It should be
* called before atexit ( ) because its return is used in the
* cleanup_objects_atexit ( ) .
*/
dbinfo = store_pub_sub_info ( & opt , pub_base_conninfo , sub_base_conninfo ) ;
dbinfos . dbinfo = store_pub_sub_info ( & opt , pub_base_conninfo , sub_base_conninfo ) ;
/* Register a function to clean up objects in case of failure */
atexit ( cleanup_objects_atexit ) ;
@ -2184,7 +2210,7 @@ main(int argc, char **argv)
* Check if the subscriber data directory has the same system identifier
* than the publisher data directory .
*/
pub_sysid = get_primary_sysid ( dbinfo [ 0 ] . pubconninfo ) ;
pub_sysid = get_primary_sysid ( dbinfos . dbinfo [ 0 ] . pubconninfo ) ;
sub_sysid = get_standby_sysid ( subscriber_dir ) ;
if ( pub_sysid ! = sub_sysid )
pg_fatal ( " subscriber data directory is not a copy of the source database cluster " ) ;
@ -2214,10 +2240,10 @@ main(int argc, char **argv)
start_standby_server ( & opt , true , false ) ;
/* Check if the standby server is ready for logical replication */
check_subscriber ( dbinfo ) ;
check_subscriber ( dbinfos . dbinfo ) ;
/* Check if the primary server is ready for logical replication */
check_publisher ( dbinfo ) ;
check_publisher ( dbinfos . dbinfo ) ;
/*
* Stop the target server . The recovery process requires that the server
@ -2230,10 +2256,10 @@ main(int argc, char **argv)
stop_standby_server ( subscriber_dir ) ;
/* Create the required objects for each database on publisher */
consistent_lsn = setup_publisher ( dbinfo ) ;
consistent_lsn = setup_publisher ( dbinfos . dbinfo ) ;
/* Write the required recovery parameters */
setup_recovery ( dbinfo , subscriber_dir , consistent_lsn ) ;
setup_recovery ( dbinfos . dbinfo , subscriber_dir , consistent_lsn ) ;
/*
* Start subscriber so the recovery parameters will take effect . Wait
@ -2244,7 +2270,7 @@ main(int argc, char **argv)
start_standby_server ( & opt , true , true ) ;
/* Waiting the subscriber to be promoted */
wait_for_end_recovery ( dbinfo [ 0 ] . subconninfo , & opt ) ;
wait_for_end_recovery ( dbinfos . dbinfo [ 0 ] . subconninfo , & opt ) ;
/*
* Create the subscription for each database on subscriber . It does not
@ -2252,13 +2278,13 @@ main(int argc, char **argv)
* point to the LSN reported by setup_publisher ( ) . It also cleans up
* publications created by this tool and replication to the standby .
*/
setup_subscriber ( dbinfo , consistent_lsn ) ;
setup_subscriber ( dbinfos . dbinfo , consistent_lsn ) ;
/* Remove primary_slot_name if it exists on primary */
drop_primary_replication_slot ( dbinfo , primary_slot_name ) ;
drop_primary_replication_slot ( dbinfos . dbinfo , primary_slot_name ) ;
/* Remove failover replication slots if they exist on subscriber */
drop_failover_replication_slots ( dbinfo ) ;
drop_failover_replication_slots ( dbinfos . dbinfo ) ;
/* Stop the subscriber */
pg_log_info ( " stopping the subscriber " ) ;