|
|
|
@ -35,7 +35,7 @@ |
|
|
|
|
#include "access/xlogrecovery.h" |
|
|
|
|
#include "catalog/pg_database.h" |
|
|
|
|
#include "commands/dbcommands.h" |
|
|
|
|
#include "replication/logical.h" |
|
|
|
|
#include "replication/slot.h" |
|
|
|
|
#include "replication/slotsync.h" |
|
|
|
|
#include "storage/ipc.h" |
|
|
|
|
#include "storage/lmgr.h" |
|
|
|
@ -368,14 +368,13 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) |
|
|
|
|
* XXX should this be changed to elog(DEBUG1) perhaps? |
|
|
|
|
*/ |
|
|
|
|
ereport(LOG, |
|
|
|
|
errmsg("could not sync slot information as remote slot precedes local slot:" |
|
|
|
|
" remote slot \"%s\": LSN (%X/%X), catalog xmin (%u) local slot: LSN (%X/%X), catalog xmin (%u)", |
|
|
|
|
remote_slot->name, |
|
|
|
|
LSN_FORMAT_ARGS(remote_slot->restart_lsn), |
|
|
|
|
remote_slot->catalog_xmin, |
|
|
|
|
LSN_FORMAT_ARGS(slot->data.restart_lsn), |
|
|
|
|
slot->data.catalog_xmin)); |
|
|
|
|
|
|
|
|
|
errmsg("could not sync slot \"%s\" as remote slot precedes local slot", |
|
|
|
|
remote_slot->name), |
|
|
|
|
errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.", |
|
|
|
|
LSN_FORMAT_ARGS(remote_slot->restart_lsn), |
|
|
|
|
remote_slot->catalog_xmin, |
|
|
|
|
LSN_FORMAT_ARGS(slot->data.restart_lsn), |
|
|
|
|
slot->data.catalog_xmin)); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -569,8 +568,12 @@ synchronize_slots(WalReceiverConn *wrconn) |
|
|
|
|
|
|
|
|
|
WalRcvExecResult *res; |
|
|
|
|
TupleTableSlot *tupslot; |
|
|
|
|
StringInfoData s; |
|
|
|
|
List *remote_slot_list = NIL; |
|
|
|
|
const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," |
|
|
|
|
" restart_lsn, catalog_xmin, two_phase, failover," |
|
|
|
|
" database, conflict_reason" |
|
|
|
|
" FROM pg_catalog.pg_replication_slots" |
|
|
|
|
" WHERE failover and NOT temporary"; |
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&SlotSyncCtx->mutex); |
|
|
|
|
if (SlotSyncCtx->syncing) |
|
|
|
@ -586,19 +589,8 @@ synchronize_slots(WalReceiverConn *wrconn) |
|
|
|
|
|
|
|
|
|
syncing_slots = true; |
|
|
|
|
|
|
|
|
|
initStringInfo(&s); |
|
|
|
|
|
|
|
|
|
/* Construct query to fetch slots with failover enabled. */ |
|
|
|
|
appendStringInfo(&s, |
|
|
|
|
"SELECT slot_name, plugin, confirmed_flush_lsn," |
|
|
|
|
" restart_lsn, catalog_xmin, two_phase, failover," |
|
|
|
|
" database, conflict_reason" |
|
|
|
|
" FROM pg_catalog.pg_replication_slots" |
|
|
|
|
" WHERE failover and NOT temporary"); |
|
|
|
|
|
|
|
|
|
/* Execute the query */ |
|
|
|
|
res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); |
|
|
|
|
pfree(s.data); |
|
|
|
|
res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow); |
|
|
|
|
|
|
|
|
|
if (res->status != WALRCV_OK_TUPLES) |
|
|
|
|
ereport(ERROR, |
|
|
|
@ -743,12 +735,12 @@ validate_remote_info(WalReceiverConn *wrconn) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s", |
|
|
|
|
PrimarySlotName, res->err), |
|
|
|
|
errhint("Check if \"primary_slot_name\" is configured correctly.")); |
|
|
|
|
errhint("Check if primary_slot_name is configured correctly.")); |
|
|
|
|
|
|
|
|
|
tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); |
|
|
|
|
if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) |
|
|
|
|
elog(ERROR, |
|
|
|
|
"failed to fetch tuple for the primary server slot specified by \"primary_slot_name\""); |
|
|
|
|
"failed to fetch tuple for the primary server slot specified by primary_slot_name"); |
|
|
|
|
|
|
|
|
|
remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); |
|
|
|
|
Assert(!isnull); |
|
|
|
@ -764,9 +756,9 @@ validate_remote_info(WalReceiverConn *wrconn) |
|
|
|
|
if (!primary_slot_valid) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
|
|
|
|
errmsg("bad configuration for slot synchronization"), |
|
|
|
|
errmsg("slot synchronization requires valid primary_slot_name"), |
|
|
|
|
/* translator: second %s is a GUC variable name */ |
|
|
|
|
errdetail("The replication slot \"%s\" specified by \"%s\" does not exist on the primary server.", |
|
|
|
|
errdetail("The replication slot \"%s\" specified by %s does not exist on the primary server.", |
|
|
|
|
PrimarySlotName, "primary_slot_name")); |
|
|
|
|
|
|
|
|
|
ExecClearTuple(tupslot); |
|
|
|
@ -792,8 +784,7 @@ ValidateSlotSyncParams(void) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
/* translator: %s is a GUC variable name */ |
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
|
|
|
|
errmsg("bad configuration for slot synchronization"), |
|
|
|
|
errhint("\"%s\" must be defined.", "primary_slot_name")); |
|
|
|
|
errmsg("slot synchronization requires %s to be defined", "primary_slot_name")); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* hot_standby_feedback must be enabled to cooperate with the physical |
|
|
|
@ -804,15 +795,14 @@ ValidateSlotSyncParams(void) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
/* translator: %s is a GUC variable name */ |
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
|
|
|
|
errmsg("bad configuration for slot synchronization"), |
|
|
|
|
errhint("\"%s\" must be enabled.", "hot_standby_feedback")); |
|
|
|
|
errmsg("slot synchronization requires %s to be enabled", |
|
|
|
|
"hot_standby_feedback")); |
|
|
|
|
|
|
|
|
|
/* Logical slot sync/creation requires wal_level >= logical. */ |
|
|
|
|
if (wal_level < WAL_LEVEL_LOGICAL) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
|
|
|
|
errmsg("bad configuration for slot synchronization"), |
|
|
|
|
errhint("\"wal_level\" must be >= logical.")); |
|
|
|
|
errmsg("slot synchronization requires wal_level >= \"logical\"")); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* The primary_conninfo is required to make connection to primary for |
|
|
|
@ -822,8 +812,8 @@ ValidateSlotSyncParams(void) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
/* translator: %s is a GUC variable name */ |
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
|
|
|
|
errmsg("bad configuration for slot synchronization"), |
|
|
|
|
errhint("\"%s\" must be defined.", "primary_conninfo")); |
|
|
|
|
errmsg("slot synchronization requires %s to be defined", |
|
|
|
|
"primary_conninfo")); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* The slot synchronization needs a database connection for walrcv_exec to |
|
|
|
@ -834,12 +824,11 @@ ValidateSlotSyncParams(void) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* translator: 'dbname' is a specific option; %s is a GUC variable |
|
|
|
|
* name |
|
|
|
|
* translator: dbname is a specific option; %s is a GUC variable name |
|
|
|
|
*/ |
|
|
|
|
errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
|
|
|
|
errmsg("bad configuration for slot synchronization"), |
|
|
|
|
errhint("'dbname' must be specified in \"%s\".", "primary_conninfo")); |
|
|
|
|
errmsg("slot synchronization requires dbname to be specified in %s", |
|
|
|
|
"primary_conninfo")); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|