|
|
|
|
@ -226,6 +226,7 @@ ReplicationSlotsShmemInit(void) |
|
|
|
|
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i]; |
|
|
|
|
|
|
|
|
|
/* everything else is zeroed by the memset above */ |
|
|
|
|
slot->active_proc = INVALID_PROC_NUMBER; |
|
|
|
|
SpinLockInit(&slot->mutex); |
|
|
|
|
LWLockInitialize(&slot->io_in_progress_lock, |
|
|
|
|
LWTRANCHE_REPLICATION_SLOT_IO); |
|
|
|
|
@ -461,7 +462,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, |
|
|
|
|
* be doing that. So it's safe to initialize the slot. |
|
|
|
|
*/ |
|
|
|
|
Assert(!slot->in_use); |
|
|
|
|
Assert(slot->active_pid == 0); |
|
|
|
|
Assert(slot->active_proc == INVALID_PROC_NUMBER); |
|
|
|
|
|
|
|
|
|
/* first initialize persistent data */ |
|
|
|
|
memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData)); |
|
|
|
|
@ -505,8 +506,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, |
|
|
|
|
|
|
|
|
|
/* We can now mark the slot active, and that makes it our slot. */ |
|
|
|
|
SpinLockAcquire(&slot->mutex); |
|
|
|
|
Assert(slot->active_pid == 0); |
|
|
|
|
slot->active_pid = MyProcPid; |
|
|
|
|
Assert(slot->active_proc == INVALID_PROC_NUMBER); |
|
|
|
|
slot->active_proc = MyProcNumber; |
|
|
|
|
SpinLockRelease(&slot->mutex); |
|
|
|
|
MyReplicationSlot = slot; |
|
|
|
|
|
|
|
|
|
@ -620,6 +621,7 @@ void |
|
|
|
|
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid) |
|
|
|
|
{ |
|
|
|
|
ReplicationSlot *s; |
|
|
|
|
ProcNumber active_proc; |
|
|
|
|
int active_pid; |
|
|
|
|
|
|
|
|
|
Assert(name != NULL); |
|
|
|
|
@ -672,17 +674,18 @@ retry: |
|
|
|
|
* to inactive_since in InvalidatePossiblyObsoleteSlot. |
|
|
|
|
*/ |
|
|
|
|
SpinLockAcquire(&s->mutex); |
|
|
|
|
if (s->active_pid == 0) |
|
|
|
|
s->active_pid = MyProcPid; |
|
|
|
|
active_pid = s->active_pid; |
|
|
|
|
if (s->active_proc == INVALID_PROC_NUMBER) |
|
|
|
|
s->active_proc = MyProcNumber; |
|
|
|
|
active_proc = s->active_proc; |
|
|
|
|
ReplicationSlotSetInactiveSince(s, 0, false); |
|
|
|
|
SpinLockRelease(&s->mutex); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
s->active_pid = active_pid = MyProcPid; |
|
|
|
|
s->active_proc = active_proc = MyProcNumber; |
|
|
|
|
ReplicationSlotSetInactiveSince(s, 0, true); |
|
|
|
|
} |
|
|
|
|
active_pid = GetPGProcByNumber(active_proc)->pid; |
|
|
|
|
LWLockRelease(ReplicationSlotControlLock); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -690,7 +693,7 @@ retry: |
|
|
|
|
* wait until the owning process signals us that it's been released, or |
|
|
|
|
* error out. |
|
|
|
|
*/ |
|
|
|
|
if (active_pid != MyProcPid) |
|
|
|
|
if (active_proc != MyProcNumber) |
|
|
|
|
{ |
|
|
|
|
if (!nowait) |
|
|
|
|
{ |
|
|
|
|
@ -762,7 +765,7 @@ ReplicationSlotRelease(void) |
|
|
|
|
bool is_logical; |
|
|
|
|
TimestampTz now = 0; |
|
|
|
|
|
|
|
|
|
Assert(slot != NULL && slot->active_pid != 0); |
|
|
|
|
Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER); |
|
|
|
|
|
|
|
|
|
is_logical = SlotIsLogical(slot); |
|
|
|
|
|
|
|
|
|
@ -815,7 +818,7 @@ ReplicationSlotRelease(void) |
|
|
|
|
* disconnecting, but wake up others that may be waiting for it. |
|
|
|
|
*/ |
|
|
|
|
SpinLockAcquire(&slot->mutex); |
|
|
|
|
slot->active_pid = 0; |
|
|
|
|
slot->active_proc = INVALID_PROC_NUMBER; |
|
|
|
|
ReplicationSlotSetInactiveSince(slot, now, false); |
|
|
|
|
SpinLockRelease(&slot->mutex); |
|
|
|
|
ConditionVariableBroadcast(&slot->active_cv); |
|
|
|
|
@ -877,7 +880,7 @@ restart: |
|
|
|
|
found_valid_logicalslot |= |
|
|
|
|
(SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE); |
|
|
|
|
|
|
|
|
|
if ((s->active_pid == MyProcPid && |
|
|
|
|
if ((s->active_proc == MyProcNumber && |
|
|
|
|
(!synced_only || s->data.synced))) |
|
|
|
|
{ |
|
|
|
|
Assert(s->data.persistency == RS_TEMPORARY); |
|
|
|
|
@ -1088,7 +1091,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) |
|
|
|
|
bool fail_softly = slot->data.persistency != RS_PERSISTENT; |
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&slot->mutex); |
|
|
|
|
slot->active_pid = 0; |
|
|
|
|
slot->active_proc = INVALID_PROC_NUMBER; |
|
|
|
|
SpinLockRelease(&slot->mutex); |
|
|
|
|
|
|
|
|
|
/* wake up anyone waiting on this slot */ |
|
|
|
|
@ -1110,7 +1113,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) |
|
|
|
|
* Also wake up processes waiting for it. |
|
|
|
|
*/ |
|
|
|
|
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); |
|
|
|
|
slot->active_pid = 0; |
|
|
|
|
slot->active_proc = INVALID_PROC_NUMBER; |
|
|
|
|
slot->in_use = false; |
|
|
|
|
LWLockRelease(ReplicationSlotControlLock); |
|
|
|
|
ConditionVariableBroadcast(&slot->active_cv); |
|
|
|
|
@ -1476,7 +1479,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive) |
|
|
|
|
/* count slots with spinlock held */ |
|
|
|
|
SpinLockAcquire(&s->mutex); |
|
|
|
|
(*nslots)++; |
|
|
|
|
if (s->active_pid != 0) |
|
|
|
|
if (s->active_proc != INVALID_PROC_NUMBER) |
|
|
|
|
(*nactive)++; |
|
|
|
|
SpinLockRelease(&s->mutex); |
|
|
|
|
} |
|
|
|
|
@ -1520,7 +1523,7 @@ restart: |
|
|
|
|
{ |
|
|
|
|
ReplicationSlot *s; |
|
|
|
|
char *slotname; |
|
|
|
|
int active_pid; |
|
|
|
|
ProcNumber active_proc; |
|
|
|
|
|
|
|
|
|
s = &ReplicationSlotCtl->replication_slots[i]; |
|
|
|
|
|
|
|
|
|
@ -1550,11 +1553,11 @@ restart: |
|
|
|
|
SpinLockAcquire(&s->mutex); |
|
|
|
|
/* can't change while ReplicationSlotControlLock is held */ |
|
|
|
|
slotname = NameStr(s->data.name); |
|
|
|
|
active_pid = s->active_pid; |
|
|
|
|
if (active_pid == 0) |
|
|
|
|
active_proc = s->active_proc; |
|
|
|
|
if (active_proc == INVALID_PROC_NUMBER) |
|
|
|
|
{ |
|
|
|
|
MyReplicationSlot = s; |
|
|
|
|
s->active_pid = MyProcPid; |
|
|
|
|
s->active_proc = MyProcNumber; |
|
|
|
|
} |
|
|
|
|
SpinLockRelease(&s->mutex); |
|
|
|
|
|
|
|
|
|
@ -1579,11 +1582,11 @@ restart: |
|
|
|
|
* XXX: We can consider shutting down the slot sync worker before |
|
|
|
|
* trying to drop synced temporary slots here. |
|
|
|
|
*/ |
|
|
|
|
if (active_pid) |
|
|
|
|
if (active_proc != INVALID_PROC_NUMBER) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_IN_USE), |
|
|
|
|
errmsg("replication slot \"%s\" is active for PID %d", |
|
|
|
|
slotname, active_pid))); |
|
|
|
|
slotname, GetPGProcByNumber(active_proc)->pid))); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* To avoid duplicating ReplicationSlotDropAcquired() and to avoid |
|
|
|
|
@ -1974,6 +1977,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, |
|
|
|
|
{ |
|
|
|
|
XLogRecPtr restart_lsn; |
|
|
|
|
NameData slotname; |
|
|
|
|
ProcNumber active_proc; |
|
|
|
|
int active_pid = 0; |
|
|
|
|
ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE; |
|
|
|
|
TimestampTz now = 0; |
|
|
|
|
@ -2027,7 +2031,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
slotname = s->data.name; |
|
|
|
|
active_pid = s->active_pid; |
|
|
|
|
active_proc = s->active_proc; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If the slot can be acquired, do so and mark it invalidated |
|
|
|
|
@ -2039,10 +2043,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, |
|
|
|
|
* is terminated. So, the inactive slot can only be invalidated |
|
|
|
|
* immediately without being terminated. |
|
|
|
|
*/ |
|
|
|
|
if (active_pid == 0) |
|
|
|
|
if (active_proc == INVALID_PROC_NUMBER) |
|
|
|
|
{ |
|
|
|
|
MyReplicationSlot = s; |
|
|
|
|
s->active_pid = MyProcPid; |
|
|
|
|
s->active_proc = MyProcNumber; |
|
|
|
|
s->data.invalidated = invalidation_cause; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -2058,6 +2062,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, |
|
|
|
|
/* Let caller know */ |
|
|
|
|
invalidated = true; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
active_pid = GetPGProcByNumber(active_proc)->pid; |
|
|
|
|
Assert(active_pid != 0); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
SpinLockRelease(&s->mutex); |
|
|
|
|
|
|
|
|
|
@ -2073,7 +2082,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, |
|
|
|
|
&slot_idle_usecs); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (active_pid != 0) |
|
|
|
|
if (active_proc != INVALID_PROC_NUMBER) |
|
|
|
|
{ |
|
|
|
|
/*
|
|
|
|
|
* Prepare the sleep on the slot's condition variable before |
|
|
|
|
@ -2107,7 +2116,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, |
|
|
|
|
if (MyBackendType == B_STARTUP) |
|
|
|
|
(void) SendProcSignal(active_pid, |
|
|
|
|
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, |
|
|
|
|
INVALID_PROC_NUMBER); |
|
|
|
|
active_proc); |
|
|
|
|
else |
|
|
|
|
(void) kill(active_pid, SIGTERM); |
|
|
|
|
|
|
|
|
|
@ -2875,7 +2884,7 @@ RestoreSlotFromDisk(const char *name) |
|
|
|
|
slot->candidate_restart_valid = InvalidXLogRecPtr; |
|
|
|
|
|
|
|
|
|
slot->in_use = true; |
|
|
|
|
slot->active_pid = 0; |
|
|
|
|
slot->active_proc = INVALID_PROC_NUMBER; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Set the time since the slot has become inactive after loading the |
|
|
|
|
@ -3158,7 +3167,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) |
|
|
|
|
SpinLockAcquire(&slot->mutex); |
|
|
|
|
restart_lsn = slot->data.restart_lsn; |
|
|
|
|
invalidated = slot->data.invalidated != RS_INVAL_NONE; |
|
|
|
|
inactive = slot->active_pid == 0; |
|
|
|
|
inactive = slot->active_proc == INVALID_PROC_NUMBER; |
|
|
|
|
SpinLockRelease(&slot->mutex); |
|
|
|
|
|
|
|
|
|
if (invalidated) |
|
|
|
|
|