mirror of https://github.com/postgres/postgres
Replication slots are a crash-safe data structure which can be created on either a master or a standby to prevent premature removal of write-ahead log segments needed by a standby, as well as (with hot_standby_feedback=on) pruning of tuples whose removal would cause replication conflicts. Slots have some advantages over existing techniques, as explained in the documentation. In a few places, we refer to the type of replication slots introduced by this patch as "physical" slots, because forthcoming patches for logical decoding will also have slots, but with somewhat different properties. Andres Freund and Robert Haas
pull/6/head
parent
5bdef38b89
commit
858ec11858
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,193 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* slotfuncs.c |
||||
* Support functions for replication slots |
||||
* |
||||
* Copyright (c) 2012-2014, PostgreSQL Global Development Group |
||||
* |
||||
* IDENTIFICATION |
||||
* src/backend/replication/slotfuncs.c |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
|
||||
#include "postgres.h" |
||||
|
||||
#include "funcapi.h" |
||||
#include "miscadmin.h" |
||||
#include "access/htup_details.h" |
||||
#include "utils/builtins.h" |
||||
#include "replication/slot.h" |
||||
|
||||
/*
 * Prototypes for the SQL-callable functions defined in this file.
 * (pg_get_replication_slots is declared in replication/slot.h.)
 */
Datum		pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
Datum		pg_drop_replication_slot(PG_FUNCTION_ARGS);
||||
|
||||
static void |
||||
check_permissions(void) |
||||
{ |
||||
if (!superuser() && !has_rolreplication(GetUserId())) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), |
||||
(errmsg("must be superuser or replication role to use replication slots")))); |
||||
} |
||||
|
||||
/*
|
||||
* SQL function for creating a new physical (streaming replication) |
||||
* replication slot. |
||||
*/ |
||||
Datum |
||||
pg_create_physical_replication_slot(PG_FUNCTION_ARGS) |
||||
{ |
||||
Name name = PG_GETARG_NAME(0); |
||||
Datum values[2]; |
||||
bool nulls[2]; |
||||
TupleDesc tupdesc; |
||||
HeapTuple tuple; |
||||
Datum result; |
||||
|
||||
check_permissions(); |
||||
|
||||
CheckSlotRequirements(); |
||||
|
||||
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
||||
elog(ERROR, "return type must be a row type"); |
||||
|
||||
/* acquire replication slot, this will check for conflicting names*/ |
||||
ReplicationSlotCreate(NameStr(*name), false); |
||||
|
||||
values[0] = CStringGetTextDatum(NameStr(MyReplicationSlot->data.name)); |
||||
|
||||
nulls[0] = false; |
||||
nulls[1] = true; |
||||
|
||||
tuple = heap_form_tuple(tupdesc, values, nulls); |
||||
result = HeapTupleGetDatum(tuple); |
||||
|
||||
ReplicationSlotRelease(); |
||||
|
||||
PG_RETURN_DATUM(result); |
||||
} |
||||
|
||||
/*
|
||||
* SQL function for dropping a replication slot. |
||||
*/ |
||||
Datum |
||||
pg_drop_replication_slot(PG_FUNCTION_ARGS) |
||||
{ |
||||
Name name = PG_GETARG_NAME(0); |
||||
|
||||
check_permissions(); |
||||
|
||||
CheckSlotRequirements(); |
||||
|
||||
ReplicationSlotDrop(NameStr(*name)); |
||||
|
||||
PG_RETURN_VOID(); |
||||
} |
||||
|
||||
/*
|
||||
* pg_get_replication_slots - SQL SRF showing active replication slots. |
||||
*/ |
||||
Datum |
||||
pg_get_replication_slots(PG_FUNCTION_ARGS) |
||||
{ |
||||
#define PG_STAT_GET_REPLICATION_SLOTS_COLS 6 |
||||
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; |
||||
TupleDesc tupdesc; |
||||
Tuplestorestate *tupstore; |
||||
MemoryContext per_query_ctx; |
||||
MemoryContext oldcontext; |
||||
int slotno; |
||||
|
||||
/* check to see if caller supports us returning a tuplestore */ |
||||
if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
||||
errmsg("set-valued function called in context that cannot accept a set"))); |
||||
if (!(rsinfo->allowedModes & SFRM_Materialize)) |
||||
ereport(ERROR, |
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
||||
errmsg("materialize mode required, but it is not " \
|
||||
"allowed in this context"))); |
||||
|
||||
/* Build a tuple descriptor for our result type */ |
||||
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) |
||||
elog(ERROR, "return type must be a row type"); |
||||
|
||||
/*
|
||||
* We don't require any special permission to see this function's data |
||||
* because nothing should be sensitive. The most critical being the slot |
||||
* name, which shouldn't contain anything particularly sensitive. |
||||
*/ |
||||
|
||||
per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; |
||||
oldcontext = MemoryContextSwitchTo(per_query_ctx); |
||||
|
||||
tupstore = tuplestore_begin_heap(true, false, work_mem); |
||||
rsinfo->returnMode = SFRM_Materialize; |
||||
rsinfo->setResult = tupstore; |
||||
rsinfo->setDesc = tupdesc; |
||||
|
||||
MemoryContextSwitchTo(oldcontext); |
||||
|
||||
for (slotno = 0; slotno < max_replication_slots; slotno++) |
||||
{ |
||||
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; |
||||
Datum values[PG_STAT_GET_REPLICATION_SLOTS_COLS]; |
||||
bool nulls[PG_STAT_GET_REPLICATION_SLOTS_COLS]; |
||||
|
||||
TransactionId xmin; |
||||
XLogRecPtr restart_lsn; |
||||
bool active; |
||||
Oid database; |
||||
const char *slot_name; |
||||
|
||||
char restart_lsn_s[MAXFNAMELEN]; |
||||
int i; |
||||
|
||||
SpinLockAcquire(&slot->mutex); |
||||
if (!slot->in_use) |
||||
{ |
||||
SpinLockRelease(&slot->mutex); |
||||
continue; |
||||
} |
||||
else |
||||
{ |
||||
xmin = slot->data.xmin; |
||||
database = slot->data.database; |
||||
restart_lsn = slot->data.restart_lsn; |
||||
slot_name = pstrdup(NameStr(slot->data.name)); |
||||
|
||||
active = slot->active; |
||||
} |
||||
SpinLockRelease(&slot->mutex); |
||||
|
||||
memset(nulls, 0, sizeof(nulls)); |
||||
|
||||
snprintf(restart_lsn_s, sizeof(restart_lsn_s), "%X/%X", |
||||
(uint32) (restart_lsn >> 32), (uint32) restart_lsn); |
||||
|
||||
i = 0; |
||||
values[i++] = CStringGetTextDatum(slot_name); |
||||
if (database == InvalidOid) |
||||
values[i++] = CStringGetTextDatum("physical"); |
||||
else |
||||
values[i++] = CStringGetTextDatum("logical"); |
||||
values[i++] = database; |
||||
values[i++] = BoolGetDatum(active); |
||||
if (xmin != InvalidTransactionId) |
||||
values[i++] = TransactionIdGetDatum(xmin); |
||||
else |
||||
nulls[i++] = true; |
||||
if (restart_lsn != InvalidTransactionId) |
||||
values[i++] = CStringGetTextDatum(restart_lsn_s); |
||||
else |
||||
nulls[i++] = true; |
||||
|
||||
tuplestore_putvalues(tupstore, tupdesc, values, nulls); |
||||
} |
||||
|
||||
tuplestore_donestoring(tupstore); |
||||
|
||||
return (Datum) 0; |
||||
} |
||||
@ -0,0 +1,120 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* slot.h |
||||
* Replication slot management. |
||||
* |
||||
* Copyright (c) 2012-2014, PostgreSQL Global Development Group |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
#ifndef SLOT_H |
||||
#define SLOT_H |
||||
|
||||
#include "fmgr.h" |
||||
#include "access/xlog.h" |
||||
#include "access/xlogreader.h" |
||||
#include "storage/lwlock.h" |
||||
#include "storage/shmem.h" |
||||
#include "storage/spin.h" |
||||
|
||||
typedef struct ReplicationSlotPersistentData |
||||
{ |
||||
/* The slot's identifier */ |
||||
NameData name; |
||||
|
||||
/* database the slot is active on */ |
||||
Oid database; |
||||
|
||||
/*
|
||||
* xmin horizon for data |
||||
* |
||||
* NB: This may represent a value that hasn't been written to disk yet; |
||||
* see notes for effective_xmin, below. |
||||
*/ |
||||
TransactionId xmin; |
||||
|
||||
/* oldest LSN that might be required by this replication slot */ |
||||
XLogRecPtr restart_lsn; |
||||
|
||||
} ReplicationSlotPersistentData; |
||||
|
||||
/*
|
||||
* Shared memory state of a single replication slot. |
||||
*/ |
||||
typedef struct ReplicationSlot |
||||
{ |
||||
/* lock, on same cacheline as effective_xmin */ |
||||
slock_t mutex; |
||||
|
||||
/* is this slot defined */ |
||||
bool in_use; |
||||
|
||||
/* is somebody streaming out changes for this slot */ |
||||
bool active; |
||||
|
||||
/* any outstanding modifications? */ |
||||
bool just_dirtied; |
||||
bool dirty; |
||||
|
||||
/*
|
||||
* For logical decoding, it's extremely important that we never remove any |
||||
* data that's still needed for decoding purposes, even after a crash; |
||||
* otherwise, decoding will produce wrong answers. Ordinary streaming |
||||
* replication also needs to prevent old row versions from being removed |
||||
* too soon, but the worst consequence we might encounter there is unwanted |
||||
* query cancellations on the standby. Thus, for logical decoding, |
||||
* this value represents the latest xmin that has actually been |
||||
* written to disk, whereas for streaming replication, it's just the |
||||
* same as the persistent value (data.xmin). |
||||
*/ |
||||
TransactionId effective_xmin; |
||||
|
||||
/* data surviving shutdowns and crashes */ |
||||
ReplicationSlotPersistentData data; |
||||
|
||||
/* is somebody performing io on this slot? */ |
||||
LWLock *io_in_progress_lock; |
||||
} ReplicationSlot; |
||||
|
||||
/*
|
||||
* Shared memory control area for all of replication slots. |
||||
*/ |
||||
typedef struct ReplicationSlotCtlData |
||||
{ |
||||
ReplicationSlot replication_slots[1]; |
||||
} ReplicationSlotCtlData; |
||||
|
||||
/*
|
||||
* Pointers to shared memory |
||||
*/ |
||||
extern ReplicationSlotCtlData *ReplicationSlotCtl; |
||||
extern ReplicationSlot *MyReplicationSlot; |
||||
|
||||
/* GUCs */ |
||||
extern PGDLLIMPORT int max_replication_slots; |
||||
|
||||
/* shmem initialization functions */ |
||||
extern Size ReplicationSlotsShmemSize(void); |
||||
extern void ReplicationSlotsShmemInit(void); |
||||
|
||||
/* management of individual slots */ |
||||
extern void ReplicationSlotCreate(const char *name, bool db_specific); |
||||
extern void ReplicationSlotDrop(const char *name); |
||||
extern void ReplicationSlotAcquire(const char *name); |
||||
extern void ReplicationSlotRelease(void); |
||||
extern void ReplicationSlotSave(void); |
||||
extern void ReplicationSlotMarkDirty(void); |
||||
|
||||
/* misc stuff */ |
||||
extern bool ReplicationSlotValidateName(const char *name, int elevel); |
||||
extern void ReplicationSlotsComputeRequiredXmin(void); |
||||
extern void ReplicationSlotsComputeRequiredLSN(void); |
||||
extern void StartupReplicationSlots(XLogRecPtr checkPointRedo); |
||||
extern void CheckPointReplicationSlots(void); |
||||
|
||||
extern void CheckSlotRequirements(void); |
||||
extern void ReplicationSlotAtProcExit(void); |
||||
|
||||
/* SQL callable functions */ |
||||
extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS); |
||||
|
||||
#endif /* SLOT_H */ |
||||
Loading…
Reference in new issue