|
|
|
|
@ -85,6 +85,7 @@ static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *data |
|
|
|
|
const char *lsn); |
|
|
|
|
static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, |
|
|
|
|
const char *slotname); |
|
|
|
|
static void drop_failover_replication_slots(struct LogicalRepInfo *dbinfo); |
|
|
|
|
static char *create_logical_replication_slot(PGconn *conn, |
|
|
|
|
struct LogicalRepInfo *dbinfo); |
|
|
|
|
static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, |
|
|
|
|
@ -1137,6 +1138,49 @@ drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotnam |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Drop failover replication slots on subscriber. After the transformation, |
|
|
|
|
* they have no use. |
|
|
|
|
* |
|
|
|
|
* XXX We do not fail here. Instead, we provide a warning so the user can drop |
|
|
|
|
* them later. |
|
|
|
|
*/ |
|
|
|
|
static void |
|
|
|
|
drop_failover_replication_slots(struct LogicalRepInfo *dbinfo) |
|
|
|
|
{ |
|
|
|
|
PGconn *conn; |
|
|
|
|
PGresult *res; |
|
|
|
|
|
|
|
|
|
conn = connect_database(dbinfo[0].subconninfo, false); |
|
|
|
|
if (conn != NULL) |
|
|
|
|
{ |
|
|
|
|
/* Get failover replication slot names */ |
|
|
|
|
res = PQexec(conn, |
|
|
|
|
"SELECT slot_name FROM pg_catalog.pg_replication_slots WHERE failover"); |
|
|
|
|
|
|
|
|
|
if (PQresultStatus(res) == PGRES_TUPLES_OK) |
|
|
|
|
{ |
|
|
|
|
/* Remove failover replication slots from subscriber */ |
|
|
|
|
for (int i = 0; i < PQntuples(res); i++) |
|
|
|
|
drop_replication_slot(conn, &dbinfo[0], PQgetvalue(res, i, 0)); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
pg_log_warning("could not obtain failover replication slot information: %s", |
|
|
|
|
PQresultErrorMessage(res)); |
|
|
|
|
pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files."); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
PQclear(res); |
|
|
|
|
disconnect_database(conn, false); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
pg_log_warning("could not drop failover replication slot"); |
|
|
|
|
pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files."); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Create a logical replication slot and returns a LSN. |
|
|
|
|
* |
|
|
|
|
@ -1268,7 +1312,7 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_ |
|
|
|
|
PQExpBuffer pg_ctl_cmd = createPQExpBuffer(); |
|
|
|
|
int rc; |
|
|
|
|
|
|
|
|
|
appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s", |
|
|
|
|
appendPQExpBuffer(pg_ctl_cmd, "\"%s\" start -D \"%s\" -s -o \"-c sync_replication_slots=off\"", |
|
|
|
|
pg_ctl_path, subscriber_dir); |
|
|
|
|
if (restricted_access) |
|
|
|
|
{ |
|
|
|
|
@ -2065,6 +2109,9 @@ main(int argc, char **argv) |
|
|
|
|
/* Remove primary_slot_name if it exists on primary */ |
|
|
|
|
drop_primary_replication_slot(dbinfo, primary_slot_name); |
|
|
|
|
|
|
|
|
|
/* Remove failover replication slots if they exist on subscriber */ |
|
|
|
|
drop_failover_replication_slots(dbinfo); |
|
|
|
|
|
|
|
|
|
/* Stop the subscriber */ |
|
|
|
|
pg_log_info("stopping the subscriber"); |
|
|
|
|
stop_standby_server(subscriber_dir); |
|
|
|
|
|