Use ProcNumber rather than pid in ReplicationSlot

This helps the next commit.

Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://www.postgresql.org/message-id/4cc13ba1-4248-4884-b6ba-4805349e7f39@iki.fi
pull/272/head
Heikki Linnakangas 6 days ago
parent f33c585774
commit ddc3250208
  1. 2
      src/backend/replication/logical/slotsync.c
  2. 63
      src/backend/replication/slot.c
  3. 13
      src/backend/replication/slotfuncs.c
  4. 7
      src/include/replication/slot.h

@ -1757,7 +1757,7 @@ update_synced_slots_inactive_since(void)
Assert(SlotIsLogical(s));
/* The slot must not be acquired by any process */
Assert(s->active_pid == 0);
Assert(s->active_proc == INVALID_PROC_NUMBER);
/* Use the same inactive_since time for all the slots. */
if (now == 0)

@ -226,6 +226,7 @@ ReplicationSlotsShmemInit(void)
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
/* everything else is zeroed by the memset above */
slot->active_proc = INVALID_PROC_NUMBER;
SpinLockInit(&slot->mutex);
LWLockInitialize(&slot->io_in_progress_lock,
LWTRANCHE_REPLICATION_SLOT_IO);
@ -461,7 +462,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
* be doing that. So it's safe to initialize the slot.
*/
Assert(!slot->in_use);
Assert(slot->active_pid == 0);
Assert(slot->active_proc == INVALID_PROC_NUMBER);
/* first initialize persistent data */
memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
@ -505,8 +506,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
/* We can now mark the slot active, and that makes it our slot. */
SpinLockAcquire(&slot->mutex);
Assert(slot->active_pid == 0);
slot->active_pid = MyProcPid;
Assert(slot->active_proc == INVALID_PROC_NUMBER);
slot->active_proc = MyProcNumber;
SpinLockRelease(&slot->mutex);
MyReplicationSlot = slot;
@ -620,6 +621,7 @@ void
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
{
ReplicationSlot *s;
ProcNumber active_proc;
int active_pid;
Assert(name != NULL);
@ -672,17 +674,18 @@ retry:
* to inactive_since in InvalidatePossiblyObsoleteSlot.
*/
SpinLockAcquire(&s->mutex);
if (s->active_pid == 0)
s->active_pid = MyProcPid;
active_pid = s->active_pid;
if (s->active_proc == INVALID_PROC_NUMBER)
s->active_proc = MyProcNumber;
active_proc = s->active_proc;
ReplicationSlotSetInactiveSince(s, 0, false);
SpinLockRelease(&s->mutex);
}
else
{
s->active_pid = active_pid = MyProcPid;
s->active_proc = active_proc = MyProcNumber;
ReplicationSlotSetInactiveSince(s, 0, true);
}
active_pid = GetPGProcByNumber(active_proc)->pid;
LWLockRelease(ReplicationSlotControlLock);
/*
@ -690,7 +693,7 @@ retry:
* wait until the owning process signals us that it's been released, or
* error out.
*/
if (active_pid != MyProcPid)
if (active_proc != MyProcNumber)
{
if (!nowait)
{
@ -762,7 +765,7 @@ ReplicationSlotRelease(void)
bool is_logical;
TimestampTz now = 0;
Assert(slot != NULL && slot->active_pid != 0);
Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
is_logical = SlotIsLogical(slot);
@ -815,7 +818,7 @@ ReplicationSlotRelease(void)
* disconnecting, but wake up others that may be waiting for it.
*/
SpinLockAcquire(&slot->mutex);
slot->active_pid = 0;
slot->active_proc = INVALID_PROC_NUMBER;
ReplicationSlotSetInactiveSince(slot, now, false);
SpinLockRelease(&slot->mutex);
ConditionVariableBroadcast(&slot->active_cv);
@ -877,7 +880,7 @@ restart:
found_valid_logicalslot |=
(SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
if ((s->active_pid == MyProcPid &&
if ((s->active_proc == MyProcNumber &&
(!synced_only || s->data.synced)))
{
Assert(s->data.persistency == RS_TEMPORARY);
@ -1088,7 +1091,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
bool fail_softly = slot->data.persistency != RS_PERSISTENT;
SpinLockAcquire(&slot->mutex);
slot->active_pid = 0;
slot->active_proc = INVALID_PROC_NUMBER;
SpinLockRelease(&slot->mutex);
/* wake up anyone waiting on this slot */
@ -1110,7 +1113,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
* Also wake up processes waiting for it.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
slot->active_pid = 0;
slot->active_proc = INVALID_PROC_NUMBER;
slot->in_use = false;
LWLockRelease(ReplicationSlotControlLock);
ConditionVariableBroadcast(&slot->active_cv);
@ -1476,7 +1479,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
/* count slots with spinlock held */
SpinLockAcquire(&s->mutex);
(*nslots)++;
if (s->active_pid != 0)
if (s->active_proc != INVALID_PROC_NUMBER)
(*nactive)++;
SpinLockRelease(&s->mutex);
}
@ -1520,7 +1523,7 @@ restart:
{
ReplicationSlot *s;
char *slotname;
int active_pid;
ProcNumber active_proc;
s = &ReplicationSlotCtl->replication_slots[i];
@ -1550,11 +1553,11 @@ restart:
SpinLockAcquire(&s->mutex);
/* can't change while ReplicationSlotControlLock is held */
slotname = NameStr(s->data.name);
active_pid = s->active_pid;
if (active_pid == 0)
active_proc = s->active_proc;
if (active_proc == INVALID_PROC_NUMBER)
{
MyReplicationSlot = s;
s->active_pid = MyProcPid;
s->active_proc = MyProcNumber;
}
SpinLockRelease(&s->mutex);
@ -1579,11 +1582,11 @@ restart:
* XXX: We can consider shutting down the slot sync worker before
* trying to drop synced temporary slots here.
*/
if (active_pid)
if (active_proc != INVALID_PROC_NUMBER)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
slotname, active_pid)));
slotname, GetPGProcByNumber(active_proc)->pid)));
/*
* To avoid duplicating ReplicationSlotDropAcquired() and to avoid
@ -1974,6 +1977,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
{
XLogRecPtr restart_lsn;
NameData slotname;
ProcNumber active_proc;
int active_pid = 0;
ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
TimestampTz now = 0;
@ -2027,7 +2031,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
}
slotname = s->data.name;
active_pid = s->active_pid;
active_proc = s->active_proc;
/*
* If the slot can be acquired, do so and mark it invalidated
@ -2039,10 +2043,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
* is terminated. So, the inactive slot can only be invalidated
* immediately without being terminated.
*/
if (active_pid == 0)
if (active_proc == INVALID_PROC_NUMBER)
{
MyReplicationSlot = s;
s->active_pid = MyProcPid;
s->active_proc = MyProcNumber;
s->data.invalidated = invalidation_cause;
/*
@ -2058,6 +2062,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
/* Let caller know */
invalidated = true;
}
else
{
active_pid = GetPGProcByNumber(active_proc)->pid;
Assert(active_pid != 0);
}
SpinLockRelease(&s->mutex);
@ -2073,7 +2082,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
&slot_idle_usecs);
}
if (active_pid != 0)
if (active_proc != INVALID_PROC_NUMBER)
{
/*
* Prepare the sleep on the slot's condition variable before
@ -2107,7 +2116,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
if (MyBackendType == B_STARTUP)
(void) SendProcSignal(active_pid,
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
INVALID_PROC_NUMBER);
active_proc);
else
(void) kill(active_pid, SIGTERM);
@ -2875,7 +2884,7 @@ RestoreSlotFromDisk(const char *name)
slot->candidate_restart_valid = InvalidXLogRecPtr;
slot->in_use = true;
slot->active_pid = 0;
slot->active_proc = INVALID_PROC_NUMBER;
/*
* Set the time since the slot has become inactive after loading the
@ -3158,7 +3167,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
SpinLockAcquire(&slot->mutex);
restart_lsn = slot->data.restart_lsn;
invalidated = slot->data.invalidated != RS_INVAL_NONE;
inactive = slot->active_pid == 0;
inactive = slot->active_proc == INVALID_PROC_NUMBER;
SpinLockRelease(&slot->mutex);
if (invalidated)

@ -20,6 +20,7 @@
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
@ -309,10 +310,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = ObjectIdGetDatum(slot_contents.data.database);
values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
values[i++] = BoolGetDatum(slot_contents.active_proc != INVALID_PROC_NUMBER);
if (slot_contents.active_pid != 0)
values[i++] = Int32GetDatum(slot_contents.active_pid);
if (slot_contents.active_proc != INVALID_PROC_NUMBER)
values[i++] = Int32GetDatum(GetPGProcByNumber(slot_contents.active_proc)->pid);
else
nulls[i++] = true;
@ -377,13 +378,13 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
*/
if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
{
int pid;
ProcNumber procno;
SpinLockAcquire(&slot->mutex);
pid = slot->active_pid;
procno = slot->active_proc;
slot_contents.data.restart_lsn = slot->data.restart_lsn;
SpinLockRelease(&slot->mutex);
if (pid != 0)
if (procno != INVALID_PROC_NUMBER)
{
values[i++] = CStringGetTextDatum("unreserved");
walstate = WALAVAIL_UNRESERVED;

@ -185,8 +185,11 @@ typedef struct ReplicationSlot
/* is this slot defined */
bool in_use;
/* Who is streaming out changes for this slot? 0 in unused slots. */
pid_t active_pid;
/*
* Who is streaming out changes for this slot? INVALID_PROC_NUMBER in
* unused slots.
*/
ProcNumber active_proc;
/* any outstanding modifications? */
bool just_dirtied;

Loading…
Cancel
Save