@ -124,8 +124,8 @@ static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *
static void enable_subscription ( PGconn * conn , const struct LogicalRepInfo * dbinfo ) ;
static void enable_subscription ( PGconn * conn , const struct LogicalRepInfo * dbinfo ) ;
static void check_and_drop_existing_subscriptions ( PGconn * conn ,
static void check_and_drop_existing_subscriptions ( PGconn * conn ,
const struct LogicalRepInfo * dbinfo ) ;
const struct LogicalRepInfo * dbinfo ) ;
static void drop_existing_subscriptions ( PGconn * conn , const char * subname ,
static void drop_existing_subscription ( PGconn * conn , const char * subname ,
const char * dbname ) ;
const char * dbname ) ;
static void get_publisher_databases ( struct CreateSubscriberOptions * opt ,
static void get_publisher_databases ( struct CreateSubscriberOptions * opt ,
bool dbnamespecified ) ;
bool dbnamespecified ) ;
@ -688,13 +688,20 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
cf - > system_identifier | = ( ( uint64 ) tv . tv_usec ) < < 12 ;
cf - > system_identifier | = ( ( uint64 ) tv . tv_usec ) < < 12 ;
cf - > system_identifier | = getpid ( ) & 0xFFF ;
cf - > system_identifier | = getpid ( ) & 0xFFF ;
if ( ! dry_run )
if ( dry_run )
pg_log_info ( " dry-run: would set system identifier to % " PRIu64 " on subscriber " ,
cf - > system_identifier ) ;
else
{
update_controlfile ( subscriber_dir , cf , true ) ;
update_controlfile ( subscriber_dir , cf , true ) ;
pg_log_info ( " system identifier is % " PRIu64 " on subscriber " ,
cf - > system_identifier ) ;
}
pg_log_info ( " system identifier is % " PRIu64 " on subscriber " ,
if ( dry_run )
cf - > system_identifier ) ;
pg_log_info ( " dry-run: would run pg_resetwal on the subscriber " ) ;
else
pg_log_info ( " running pg_resetwal on the subscriber " ) ;
pg_log_info ( " running pg_resetwal on the subscriber " ) ;
cmd_str = psprintf ( " \" %s \" -D \" %s \" > \" %s \" " , pg_resetwal_path ,
cmd_str = psprintf ( " \" %s \" -D \" %s \" > \" %s \" " , pg_resetwal_path ,
subscriber_dir , DEVNULL ) ;
subscriber_dir , DEVNULL ) ;
@ -983,9 +990,9 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
}
}
/*
/*
* V alidate ' max_slot_wal_keep_size ' . If this parameter is set to a
* In dry - run mode , v alidate ' max_slot_wal_keep_size ' . If this parameter
* non - default value , it may cause replication failures due to required
* is set to a non - default value , it may cause replication failures due to
* WAL files being prematurely removed .
* required WAL files being prematurely removed .
*/
*/
if ( dry_run & & ( strcmp ( max_slot_wal_keep_size , " -1 " ) ! = 0 ) )
if ( dry_run & & ( strcmp ( max_slot_wal_keep_size , " -1 " ) ! = 0 ) )
{
{
@ -1113,7 +1120,7 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
* node .
* node .
*/
*/
static void
static void
drop_existing_subscriptions ( PGconn * conn , const char * subname , const char * dbname )
drop_existing_subscription ( PGconn * conn , const char * subname , const char * dbname )
{
{
PQExpBuffer query = createPQExpBuffer ( ) ;
PQExpBuffer query = createPQExpBuffer ( ) ;
PGresult * res ;
PGresult * res ;
@ -1130,11 +1137,14 @@ drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbnam
subname ) ;
subname ) ;
appendPQExpBuffer ( query , " DROP SUBSCRIPTION %s; " , subname ) ;
appendPQExpBuffer ( query , " DROP SUBSCRIPTION %s; " , subname ) ;
pg_log_info ( " dropping subscription \" %s \" in database \" %s \" " ,
if ( dry_run )
subname , dbname ) ;
pg_log_info ( " dry-run: would drop subscription \" %s \" in database \" %s \" " ,
subname , dbname ) ;
if ( ! dry_run )
else
{
{
pg_log_info ( " dropping subscription \" %s \" in database \" %s \" " ,
subname , dbname ) ;
res = PQexec ( conn , query - > data ) ;
res = PQexec ( conn , query - > data ) ;
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
if ( PQresultStatus ( res ) ! = PGRES_COMMAND_OK )
@ -1180,8 +1190,8 @@ check_and_drop_existing_subscriptions(PGconn *conn,
}
}
for ( int i = 0 ; i < PQntuples ( res ) ; i + + )
for ( int i = 0 ; i < PQntuples ( res ) ; i + + )
drop_existing_subscriptions ( conn , PQgetvalue ( res , i , 0 ) ,
drop_existing_subscription ( conn , PQgetvalue ( res , i , 0 ) ,
dbinfo - > dbname ) ;
dbinfo - > dbname ) ;
PQclear ( res ) ;
PQclear ( res ) ;
destroyPQExpBuffer ( query ) ;
destroyPQExpBuffer ( query ) ;
@ -1275,7 +1285,7 @@ setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const c
if ( dry_run )
if ( dry_run )
{
{
appendPQExpBufferStr ( recoveryconfcontents , " # dry run mode " ) ;
appendPQExpBufferStr ( recoveryconfcontents , " # dry run mode \n " ) ;
appendPQExpBuffer ( recoveryconfcontents ,
appendPQExpBuffer ( recoveryconfcontents ,
" recovery_target_lsn = '%X/%08X' \n " ,
" recovery_target_lsn = '%X/%08X' \n " ,
LSN_FORMAT_ARGS ( ( XLogRecPtr ) InvalidXLogRecPtr ) ) ;
LSN_FORMAT_ARGS ( ( XLogRecPtr ) InvalidXLogRecPtr ) ) ;
@ -1381,8 +1391,12 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
Assert ( conn ! = NULL ) ;
Assert ( conn ! = NULL ) ;
pg_log_info ( " creating the replication slot \" %s \" in database \" %s \" on publisher " ,
if ( dry_run )
slot_name , dbinfo - > dbname ) ;
pg_log_info ( " dry-run: would create the replication slot \" %s \" in database \" %s \" on publisher " ,
slot_name , dbinfo - > dbname ) ;
else
pg_log_info ( " creating the replication slot \" %s \" in database \" %s \" on publisher " ,
slot_name , dbinfo - > dbname ) ;
slot_name_esc = PQescapeLiteral ( conn , slot_name , strlen ( slot_name ) ) ;
slot_name_esc = PQescapeLiteral ( conn , slot_name , strlen ( slot_name ) ) ;
@ -1430,8 +1444,12 @@ drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
Assert ( conn ! = NULL ) ;
Assert ( conn ! = NULL ) ;
pg_log_info ( " dropping the replication slot \" %s \" in database \" %s \" " ,
if ( dry_run )
slot_name , dbinfo - > dbname ) ;
pg_log_info ( " dry-run: would drop the replication slot \" %s \" in database \" %s \" " ,
slot_name , dbinfo - > dbname ) ;
else
pg_log_info ( " dropping the replication slot \" %s \" in database \" %s \" " ,
slot_name , dbinfo - > dbname ) ;
slot_name_esc = PQescapeLiteral ( conn , slot_name , strlen ( slot_name ) ) ;
slot_name_esc = PQescapeLiteral ( conn , slot_name , strlen ( slot_name ) ) ;
@ -1575,13 +1593,8 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
for ( ; ; )
for ( ; ; )
{
{
bool in_recovery = server_is_in_recovery ( conn ) ;
/* Did the recovery process finish? We're done if so. */
if ( dry_run | | ! server_is_in_recovery ( conn ) )
/*
* Does the recovery process finish ? In dry run mode , there is no
* recovery mode . Bail out as the recovery process has ended .
*/
if ( ! in_recovery | | dry_run )
{
{
status = POSTMASTER_READY ;
status = POSTMASTER_READY ;
recovery_ended = true ;
recovery_ended = true ;
@ -1657,8 +1670,12 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
PQclear ( res ) ;
PQclear ( res ) ;
resetPQExpBuffer ( str ) ;
resetPQExpBuffer ( str ) ;
pg_log_info ( " creating publication \" %s \" in database \" %s \" " ,
if ( dry_run )
dbinfo - > pubname , dbinfo - > dbname ) ;
pg_log_info ( " dry-run: would create publication \" %s \" in database \" %s \" " ,
dbinfo - > pubname , dbinfo - > dbname ) ;
else
pg_log_info ( " creating publication \" %s \" in database \" %s \" " ,
dbinfo - > pubname , dbinfo - > dbname ) ;
appendPQExpBuffer ( str , " CREATE PUBLICATION %s FOR ALL TABLES " ,
appendPQExpBuffer ( str , " CREATE PUBLICATION %s FOR ALL TABLES " ,
ipubname_esc ) ;
ipubname_esc ) ;
@ -1700,8 +1717,12 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
pubname_esc = PQescapeIdentifier ( conn , pubname , strlen ( pubname ) ) ;
pubname_esc = PQescapeIdentifier ( conn , pubname , strlen ( pubname ) ) ;
pg_log_info ( " dropping publication \" %s \" in database \" %s \" " ,
if ( dry_run )
pubname , dbname ) ;
pg_log_info ( " dry-run: would drop publication \" %s \" in database \" %s \" " ,
pubname , dbname ) ;
else
pg_log_info ( " dropping publication \" %s \" in database \" %s \" " ,
pubname , dbname ) ;
appendPQExpBuffer ( str , " DROP PUBLICATION %s " , pubname_esc ) ;
appendPQExpBuffer ( str , " DROP PUBLICATION %s " , pubname_esc ) ;
@ -1809,8 +1830,12 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
pubconninfo_esc = PQescapeLiteral ( conn , dbinfo - > pubconninfo , strlen ( dbinfo - > pubconninfo ) ) ;
pubconninfo_esc = PQescapeLiteral ( conn , dbinfo - > pubconninfo , strlen ( dbinfo - > pubconninfo ) ) ;
replslotname_esc = PQescapeLiteral ( conn , dbinfo - > replslotname , strlen ( dbinfo - > replslotname ) ) ;
replslotname_esc = PQescapeLiteral ( conn , dbinfo - > replslotname , strlen ( dbinfo - > replslotname ) ) ;
pg_log_info ( " creating subscription \" %s \" in database \" %s \" " ,
if ( dry_run )
dbinfo - > subname , dbinfo - > dbname ) ;
pg_log_info ( " dry-run: would create subscription \" %s \" in database \" %s \" " ,
dbinfo - > subname , dbinfo - > dbname ) ;
else
pg_log_info ( " creating subscription \" %s \" in database \" %s \" " ,
dbinfo - > subname , dbinfo - > dbname ) ;
appendPQExpBuffer ( str ,
appendPQExpBuffer ( str ,
" CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
" CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
@ -1907,8 +1932,12 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
*/
*/
originname = psprintf ( " pg_%u " , suboid ) ;
originname = psprintf ( " pg_%u " , suboid ) ;
pg_log_info ( " setting the replication progress (node name \" %s \" , LSN %s) in database \" %s \" " ,
if ( dry_run )
originname , lsnstr , dbinfo - > dbname ) ;
pg_log_info ( " dry-run: would set the replication progress (node name \" %s \" , LSN %s) in database \" %s \" " ,
originname , lsnstr , dbinfo - > dbname ) ;
else
pg_log_info ( " setting the replication progress (node name \" %s \" , LSN %s) in database \" %s \" " ,
originname , lsnstr , dbinfo - > dbname ) ;
resetPQExpBuffer ( str ) ;
resetPQExpBuffer ( str ) ;
appendPQExpBuffer ( str ,
appendPQExpBuffer ( str ,
@ -1953,8 +1982,12 @@ enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
subname = PQescapeIdentifier ( conn , dbinfo - > subname , strlen ( dbinfo - > subname ) ) ;
subname = PQescapeIdentifier ( conn , dbinfo - > subname , strlen ( dbinfo - > subname ) ) ;
pg_log_info ( " enabling subscription \" %s \" in database \" %s \" " ,
if ( dry_run )
dbinfo - > subname , dbinfo - > dbname ) ;
pg_log_info ( " dry-run: would enable subscription \" %s \" in database \" %s \" " ,
dbinfo - > subname , dbinfo - > dbname ) ;
else
pg_log_info ( " enabling subscription \" %s \" in database \" %s \" " ,
dbinfo - > subname , dbinfo - > dbname ) ;
appendPQExpBuffer ( str , " ALTER SUBSCRIPTION %s ENABLE " , subname ) ;
appendPQExpBuffer ( str , " ALTER SUBSCRIPTION %s ENABLE " , subname ) ;