|
|
|
|
@ -99,8 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL; |
|
|
|
|
int max_replication_slots = 0; /* the maximum number of replication
|
|
|
|
|
* slots */ |
|
|
|
|
|
|
|
|
|
static int ReplicationSlotAcquireInternal(ReplicationSlot *slot, |
|
|
|
|
const char *name, SlotAcquireBehavior behavior); |
|
|
|
|
static void ReplicationSlotDropAcquired(void); |
|
|
|
|
static void ReplicationSlotDropPtr(ReplicationSlot *slot); |
|
|
|
|
|
|
|
|
|
@ -374,34 +372,16 @@ SearchNamedReplicationSlot(const char *name, bool need_lock) |
|
|
|
|
/*
|
|
|
|
|
* Find a previously created slot and mark it as used by this process. |
|
|
|
|
* |
|
|
|
|
* The return value is only useful if behavior is SAB_Inquire, in which |
|
|
|
|
* it's zero if we successfully acquired the slot, -1 if the slot no longer |
|
|
|
|
* exists, or the PID of the owning process otherwise. If behavior is |
|
|
|
|
* SAB_Error, then trying to acquire an owned slot is an error. |
|
|
|
|
* If SAB_Block, we sleep until the slot is released by the owning process. |
|
|
|
|
* An error is raised if nowait is true and the slot is currently in use. If |
|
|
|
|
* nowait is false, we sleep until the slot is released by the owning process. |
|
|
|
|
*/ |
|
|
|
|
int |
|
|
|
|
ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior) |
|
|
|
|
{ |
|
|
|
|
return ReplicationSlotAcquireInternal(NULL, name, behavior); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Mark the specified slot as used by this process. |
|
|
|
|
* |
|
|
|
|
* Only one of slot and name can be specified. |
|
|
|
|
* If slot == NULL, search for the slot with the given name. |
|
|
|
|
* |
|
|
|
|
* See comments about the return value in ReplicationSlotAcquire(). |
|
|
|
|
*/ |
|
|
|
|
static int |
|
|
|
|
ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name, |
|
|
|
|
SlotAcquireBehavior behavior) |
|
|
|
|
void |
|
|
|
|
ReplicationSlotAcquire(const char *name, bool nowait) |
|
|
|
|
{ |
|
|
|
|
ReplicationSlot *s; |
|
|
|
|
int active_pid; |
|
|
|
|
|
|
|
|
|
AssertArg((slot == NULL) ^ (name == NULL)); |
|
|
|
|
AssertArg(name != NULL); |
|
|
|
|
|
|
|
|
|
retry: |
|
|
|
|
Assert(MyReplicationSlot == NULL); |
|
|
|
|
@ -412,17 +392,15 @@ retry: |
|
|
|
|
* Search for the slot with the specified name if the slot to acquire is |
|
|
|
|
* not given. If the slot is not found, we either return -1 or error out. |
|
|
|
|
*/ |
|
|
|
|
s = slot ? slot : SearchNamedReplicationSlot(name, false); |
|
|
|
|
s = SearchNamedReplicationSlot(name, false); |
|
|
|
|
if (s == NULL || !s->in_use) |
|
|
|
|
{ |
|
|
|
|
LWLockRelease(ReplicationSlotControlLock); |
|
|
|
|
|
|
|
|
|
if (behavior == SAB_Inquire) |
|
|
|
|
return -1; |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_UNDEFINED_OBJECT), |
|
|
|
|
errmsg("replication slot \"%s\" does not exist", |
|
|
|
|
name ? name : NameStr(slot->data.name)))); |
|
|
|
|
name))); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -436,7 +414,7 @@ retry: |
|
|
|
|
* (We may end up not sleeping, but we don't want to do this while |
|
|
|
|
* holding the spinlock.) |
|
|
|
|
*/ |
|
|
|
|
if (behavior == SAB_Block) |
|
|
|
|
if (!nowait) |
|
|
|
|
ConditionVariablePrepareToSleep(&s->active_cv); |
|
|
|
|
|
|
|
|
|
SpinLockAcquire(&s->mutex); |
|
|
|
|
@ -456,13 +434,11 @@ retry: |
|
|
|
|
*/ |
|
|
|
|
if (active_pid != MyProcPid) |
|
|
|
|
{ |
|
|
|
|
if (behavior == SAB_Error) |
|
|
|
|
if (!nowait) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_IN_USE), |
|
|
|
|
errmsg("replication slot \"%s\" is active for PID %d", |
|
|
|
|
NameStr(s->data.name), active_pid))); |
|
|
|
|
else if (behavior == SAB_Inquire) |
|
|
|
|
return active_pid; |
|
|
|
|
|
|
|
|
|
/* Wait here until we get signaled, and then restart */ |
|
|
|
|
ConditionVariableSleep(&s->active_cv, |
|
|
|
|
@ -470,7 +446,7 @@ retry: |
|
|
|
|
ConditionVariableCancelSleep(); |
|
|
|
|
goto retry; |
|
|
|
|
} |
|
|
|
|
else if (behavior == SAB_Block) |
|
|
|
|
else if (!nowait) |
|
|
|
|
ConditionVariableCancelSleep(); /* no sleep needed after all */ |
|
|
|
|
|
|
|
|
|
/* Let everybody know we've modified this slot */ |
|
|
|
|
@ -478,9 +454,6 @@ retry: |
|
|
|
|
|
|
|
|
|
/* We made this slot active, so it's ours now. */ |
|
|
|
|
MyReplicationSlot = s; |
|
|
|
|
|
|
|
|
|
/* success */ |
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
@ -588,7 +561,7 @@ ReplicationSlotDrop(const char *name, bool nowait) |
|
|
|
|
{ |
|
|
|
|
Assert(MyReplicationSlot == NULL); |
|
|
|
|
|
|
|
|
|
(void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block); |
|
|
|
|
ReplicationSlotAcquire(name, nowait); |
|
|
|
|
|
|
|
|
|
ReplicationSlotDropAcquired(); |
|
|
|
|
} |
|
|
|
|
@ -1271,8 +1244,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) |
|
|
|
|
WAIT_EVENT_REPLICATION_SLOT_DROP); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Re-acquire lock and start over; we expect to invalidate the slot |
|
|
|
|
* next time (unless another process acquires the slot in the |
|
|
|
|
* Re-acquire lock and start over; we expect to invalidate the |
|
|
|
|
* slot next time (unless another process acquires the slot in the |
|
|
|
|
* meantime). |
|
|
|
|
*/ |
|
|
|
|
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); |
|
|
|
|
|