|
|
|
@ -0,0 +1,902 @@ |
|
|
|
|
/*-------------------------------------------------------------------------
 *
 * commit_ts.c
 *		PostgreSQL commit timestamp manager
 *
 * This module is a pg_clog-like system that stores the commit timestamp
 * for each transaction.
 *
 * XLOG interactions: this module generates an XLOG record whenever a new
 * CommitTs page is initialized to zeroes.  Also, one XLOG record is
 * generated for setting of values when the caller requests it; this allows
 * us to support values coming from places other than transaction commit.
 * Other writes of CommitTs come from recording of transaction commit in
 * xact.c, which generates its own XLOG records for these events and will
 * re-perform the status update on redo; so we need make no additional XLOG
 * entry here.
 *
 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/backend/access/transam/commit_ts.c
 *
 *-------------------------------------------------------------------------
 */
|
|
|
|
#include "postgres.h" |
|
|
|
|
|
|
|
|
|
#include "access/commit_ts.h" |
|
|
|
|
#include "access/htup_details.h" |
|
|
|
|
#include "access/slru.h" |
|
|
|
|
#include "access/transam.h" |
|
|
|
|
#include "catalog/pg_type.h" |
|
|
|
|
#include "funcapi.h" |
|
|
|
|
#include "miscadmin.h" |
|
|
|
|
#include "pg_trace.h" |
|
|
|
|
#include "utils/builtins.h" |
|
|
|
|
#include "utils/snapmgr.h" |
|
|
|
|
#include "utils/timestamp.h" |
|
|
|
|
|
|
|
|
|
/*
 * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
 * everywhere else in Postgres.
 *
 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
 * CommitTs page numbering also wraps around at
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
 * explicit notice of that fact in this module, except when comparing segment
 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
 */
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* We need 8+4 bytes per xact. Note that enlarging this struct might mean |
|
|
|
|
* the largest possible file name is more than 5 chars long; see |
|
|
|
|
* SlruScanDirectory. |
|
|
|
|
*/ |
|
|
|
|
typedef struct CommitTimestampEntry |
|
|
|
|
{ |
|
|
|
|
TimestampTz time; |
|
|
|
|
CommitTsNodeId nodeid; |
|
|
|
|
} CommitTimestampEntry; |
|
|
|
|
|
|
|
|
|
/*
 * Number of bytes of a CommitTimestampEntry that are actually stored: from
 * the start of the struct through the end of the nodeid field.
 */
#define SizeOfCommitTimestampEntry \
	(offsetof(CommitTimestampEntry, nodeid) + sizeof(CommitTsNodeId))

/* How many entries fit in one BLCKSZ-sized CommitTs page. */
#define COMMIT_TS_XACTS_PER_PAGE \
	(BLCKSZ / SizeOfCommitTimestampEntry)

/* Page number holding the entry for xid. */
#define TransactionIdToCTsPage(xid) \
	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)

/* Index of xid's entry within its page. */
#define TransactionIdToCTsEntry(xid) \
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Link to shared-memory data structures for CommitTs control |
|
|
|
|
*/ |
|
|
|
|
static SlruCtlData CommitTsCtlData; |
|
|
|
|
|
|
|
|
|
#define CommitTsCtl (&CommitTsCtlData) |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* We keep a cache of the last value set in shared memory. This is protected |
|
|
|
|
* by CommitTsLock. |
|
|
|
|
*/ |
|
|
|
|
typedef struct CommitTimestampShared |
|
|
|
|
{ |
|
|
|
|
TransactionId xidLastCommit; |
|
|
|
|
CommitTimestampEntry dataLastCommit; |
|
|
|
|
} CommitTimestampShared; |
|
|
|
|
|
|
|
|
|
CommitTimestampShared *commitTsShared; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/* GUC variable */ |
|
|
|
|
bool track_commit_timestamp; |
|
|
|
|
|
|
|
|
|
static CommitTsNodeId default_node_id = InvalidCommitTsNodeId; |
|
|
|
|
|
|
|
|
|
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids, |
|
|
|
|
TransactionId *subxids, TimestampTz ts, |
|
|
|
|
CommitTsNodeId nodeid, int pageno); |
|
|
|
|
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, |
|
|
|
|
CommitTsNodeId nodeid, int slotno); |
|
|
|
|
static int ZeroCommitTsPage(int pageno, bool writeXlog); |
|
|
|
|
static bool CommitTsPagePrecedes(int page1, int page2); |
|
|
|
|
static void WriteZeroPageXlogRec(int pageno); |
|
|
|
|
static void WriteTruncateXlogRec(int pageno); |
|
|
|
|
static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, |
|
|
|
|
TransactionId *subxids, TimestampTz timestamp, |
|
|
|
|
CommitTsNodeId nodeid); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* CommitTsSetDefaultNodeId |
|
|
|
|
* |
|
|
|
|
* Set default nodeid for current backend. |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
CommitTsSetDefaultNodeId(CommitTsNodeId nodeid) |
|
|
|
|
{ |
|
|
|
|
default_node_id = nodeid; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* CommitTsGetDefaultNodeId |
|
|
|
|
* |
|
|
|
|
* Set default nodeid for current backend. |
|
|
|
|
*/ |
|
|
|
|
CommitTsNodeId |
|
|
|
|
CommitTsGetDefaultNodeId(void) |
|
|
|
|
{ |
|
|
|
|
return default_node_id; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* TransactionTreeSetCommitTsData |
|
|
|
|
* |
|
|
|
|
* Record the final commit timestamp of transaction entries in the commit log |
|
|
|
|
* for a transaction and its subtransaction tree, as efficiently as possible. |
|
|
|
|
* |
|
|
|
|
* xid is the top level transaction id. |
|
|
|
|
* |
|
|
|
|
* subxids is an array of xids of length nsubxids, representing subtransactions |
|
|
|
|
* in the tree of xid. In various cases nsubxids may be zero. |
|
|
|
|
* The reason why tracking just the parent xid commit timestamp is not enough |
|
|
|
|
* is that the subtrans SLRU does not stay valid across crashes (it's not |
|
|
|
|
* permanent) so we need to keep the information about them here. If the |
|
|
|
|
* subtrans implementation changes in the future, we might want to revisit the |
|
|
|
|
* decision of storing timestamp info for each subxid. |
|
|
|
|
* |
|
|
|
|
* The do_xlog parameter tells us whether to include a XLog record of this |
|
|
|
|
* or not. Normal path through RecordTransactionCommit() will be related |
|
|
|
|
* to a transaction commit XLog record, and so should pass "false" here. |
|
|
|
|
* Other callers probably want to pass true, so that the given values persist |
|
|
|
|
* in case of crashes. |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids, |
|
|
|
|
TransactionId *subxids, TimestampTz timestamp, |
|
|
|
|
CommitTsNodeId nodeid, bool do_xlog) |
|
|
|
|
{ |
|
|
|
|
int i; |
|
|
|
|
TransactionId headxid; |
|
|
|
|
TransactionId newestXact; |
|
|
|
|
|
|
|
|
|
if (!track_commit_timestamp) |
|
|
|
|
return; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Comply with the WAL-before-data rule: if caller specified it wants |
|
|
|
|
* this value to be recorded in WAL, do so before touching the data. |
|
|
|
|
*/ |
|
|
|
|
if (do_xlog) |
|
|
|
|
WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Figure out the latest Xid in this batch: either the last subxid if |
|
|
|
|
* there's any, otherwise the parent xid. |
|
|
|
|
*/ |
|
|
|
|
if (nsubxids > 0) |
|
|
|
|
newestXact = subxids[nsubxids - 1]; |
|
|
|
|
else |
|
|
|
|
newestXact = xid; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* We split the xids to set the timestamp to in groups belonging to the |
|
|
|
|
* same SLRU page; the first element in each such set is its head. The |
|
|
|
|
* first group has the main XID as the head; subsequent sets use the |
|
|
|
|
* first subxid not on the previous page as head. This way, we only have |
|
|
|
|
* to lock/modify each SLRU page once. |
|
|
|
|
*/ |
|
|
|
|
for (i = 0, headxid = xid;;) |
|
|
|
|
{ |
|
|
|
|
int pageno = TransactionIdToCTsPage(headxid); |
|
|
|
|
int j; |
|
|
|
|
|
|
|
|
|
for (j = i; j < nsubxids; j++) |
|
|
|
|
{ |
|
|
|
|
if (TransactionIdToCTsPage(subxids[j]) != pageno) |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
/* subxids[i..j] are on the same page as the head */ |
|
|
|
|
|
|
|
|
|
SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid, |
|
|
|
|
pageno); |
|
|
|
|
|
|
|
|
|
/* if we wrote out all subxids, we're done. */ |
|
|
|
|
if (j + 1 >= nsubxids) |
|
|
|
|
break; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Set the new head and skip over it, as well as over the subxids |
|
|
|
|
* we just wrote. |
|
|
|
|
*/ |
|
|
|
|
headxid = subxids[j]; |
|
|
|
|
i += j - i + 1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* update the cached value in shared memory */ |
|
|
|
|
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
|
|
|
|
commitTsShared->xidLastCommit = xid; |
|
|
|
|
commitTsShared->dataLastCommit.time = timestamp; |
|
|
|
|
commitTsShared->dataLastCommit.nodeid = nodeid; |
|
|
|
|
|
|
|
|
|
/* and move forwards our endpoint, if needed */ |
|
|
|
|
if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTs, newestXact)) |
|
|
|
|
ShmemVariableCache->newestCommitTs = newestXact; |
|
|
|
|
LWLockRelease(CommitTsLock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Record the commit timestamp of transaction entries in the commit log for all |
|
|
|
|
* entries on a single page. Atomic only on this page. |
|
|
|
|
*/ |
|
|
|
|
static void |
|
|
|
|
SetXidCommitTsInPage(TransactionId xid, int nsubxids, |
|
|
|
|
TransactionId *subxids, TimestampTz ts, |
|
|
|
|
CommitTsNodeId nodeid, int pageno) |
|
|
|
|
{ |
|
|
|
|
int slotno; |
|
|
|
|
int i; |
|
|
|
|
|
|
|
|
|
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
|
|
|
|
|
|
|
|
|
slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid); |
|
|
|
|
|
|
|
|
|
TransactionIdSetCommitTs(xid, ts, nodeid, slotno); |
|
|
|
|
for (i = 0; i < nsubxids; i++) |
|
|
|
|
TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno); |
|
|
|
|
|
|
|
|
|
CommitTsCtl->shared->page_dirty[slotno] = true; |
|
|
|
|
|
|
|
|
|
LWLockRelease(CommitTsControlLock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Sets the commit timestamp of a single transaction. |
|
|
|
|
* |
|
|
|
|
* Must be called with CommitTsControlLock held |
|
|
|
|
*/ |
|
|
|
|
static void |
|
|
|
|
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, |
|
|
|
|
CommitTsNodeId nodeid, int slotno) |
|
|
|
|
{ |
|
|
|
|
int entryno = TransactionIdToCTsEntry(xid); |
|
|
|
|
CommitTimestampEntry entry; |
|
|
|
|
|
|
|
|
|
Assert(TransactionIdIsNormal(xid)); |
|
|
|
|
|
|
|
|
|
entry.time = ts; |
|
|
|
|
entry.nodeid = nodeid; |
|
|
|
|
|
|
|
|
|
memcpy(CommitTsCtl->shared->page_buffer[slotno] + |
|
|
|
|
SizeOfCommitTimestampEntry * entryno, |
|
|
|
|
&entry, SizeOfCommitTimestampEntry); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Interrogate the commit timestamp of a transaction. |
|
|
|
|
* |
|
|
|
|
* Return value indicates whether commit timestamp record was found for |
|
|
|
|
* given xid. |
|
|
|
|
*/ |
|
|
|
|
bool |
|
|
|
|
TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, |
|
|
|
|
CommitTsNodeId *nodeid) |
|
|
|
|
{ |
|
|
|
|
int pageno = TransactionIdToCTsPage(xid); |
|
|
|
|
int entryno = TransactionIdToCTsEntry(xid); |
|
|
|
|
int slotno; |
|
|
|
|
CommitTimestampEntry entry; |
|
|
|
|
TransactionId oldestCommitTs; |
|
|
|
|
TransactionId newestCommitTs; |
|
|
|
|
|
|
|
|
|
/* Error if module not enabled */ |
|
|
|
|
if (!track_commit_timestamp) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("could not get commit timestamp data"), |
|
|
|
|
errhint("Make sure the configuration parameter \"%s\" is set.", |
|
|
|
|
"track_commit_timestamp"))); |
|
|
|
|
|
|
|
|
|
/* error if the given Xid doesn't normally commit */ |
|
|
|
|
if (!TransactionIdIsNormal(xid)) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
|
|
|
|
errmsg("cannot retrieve commit timestamp for transaction %u", xid))); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Return empty if the requested value is outside our valid range. |
|
|
|
|
*/ |
|
|
|
|
LWLockAcquire(CommitTsLock, LW_SHARED); |
|
|
|
|
oldestCommitTs = ShmemVariableCache->oldestCommitTs; |
|
|
|
|
newestCommitTs = ShmemVariableCache->newestCommitTs; |
|
|
|
|
/* neither is invalid, or both are */ |
|
|
|
|
Assert(TransactionIdIsValid(oldestCommitTs) == TransactionIdIsValid(newestCommitTs)); |
|
|
|
|
LWLockRelease(CommitTsLock); |
|
|
|
|
|
|
|
|
|
if (!TransactionIdIsValid(oldestCommitTs) || |
|
|
|
|
TransactionIdPrecedes(xid, oldestCommitTs) || |
|
|
|
|
TransactionIdPrecedes(newestCommitTs, xid)) |
|
|
|
|
{ |
|
|
|
|
if (ts) |
|
|
|
|
*ts = 0; |
|
|
|
|
if (nodeid) |
|
|
|
|
*nodeid = InvalidCommitTsNodeId; |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Use an unlocked atomic read on our cached value in shared memory; if |
|
|
|
|
* it's a hit, acquire a lock and read the data, after verifying that it's |
|
|
|
|
* still what we initially read. Otherwise, fall through to read from |
|
|
|
|
* SLRU. |
|
|
|
|
*/ |
|
|
|
|
if (commitTsShared->xidLastCommit == xid) |
|
|
|
|
{ |
|
|
|
|
LWLockAcquire(CommitTsLock, LW_SHARED); |
|
|
|
|
if (commitTsShared->xidLastCommit == xid) |
|
|
|
|
{ |
|
|
|
|
if (ts) |
|
|
|
|
*ts = commitTsShared->dataLastCommit.time; |
|
|
|
|
if (nodeid) |
|
|
|
|
*nodeid = commitTsShared->dataLastCommit.nodeid; |
|
|
|
|
|
|
|
|
|
LWLockRelease(CommitTsLock); |
|
|
|
|
return *ts != 0; |
|
|
|
|
} |
|
|
|
|
LWLockRelease(CommitTsLock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* lock is acquired by SimpleLruReadPage_ReadOnly */ |
|
|
|
|
slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid); |
|
|
|
|
memcpy(&entry, |
|
|
|
|
CommitTsCtl->shared->page_buffer[slotno] + |
|
|
|
|
SizeOfCommitTimestampEntry * entryno, |
|
|
|
|
SizeOfCommitTimestampEntry); |
|
|
|
|
|
|
|
|
|
if (ts) |
|
|
|
|
*ts = entry.time; |
|
|
|
|
if (nodeid) |
|
|
|
|
*nodeid = entry.nodeid; |
|
|
|
|
|
|
|
|
|
LWLockRelease(CommitTsControlLock); |
|
|
|
|
return *ts != 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Return the Xid of the latest committed transaction. (As far as this module |
|
|
|
|
* is concerned, anyway; it's up to the caller to ensure the value is useful |
|
|
|
|
* for its purposes.) |
|
|
|
|
* |
|
|
|
|
* ts and extra are filled with the corresponding data; they can be passed |
|
|
|
|
* as NULL if not wanted. |
|
|
|
|
*/ |
|
|
|
|
TransactionId |
|
|
|
|
GetLatestCommitTsData(TimestampTz *ts, CommitTsNodeId *nodeid) |
|
|
|
|
{ |
|
|
|
|
TransactionId xid; |
|
|
|
|
|
|
|
|
|
/* Error if module not enabled */ |
|
|
|
|
if (!track_commit_timestamp) |
|
|
|
|
ereport(ERROR, |
|
|
|
|
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
|
|
|
|
errmsg("could not get commit timestamp data"), |
|
|
|
|
errhint("Make sure the configuration parameter \"%s\" is set.", |
|
|
|
|
"track_commit_timestamp"))); |
|
|
|
|
|
|
|
|
|
LWLockAcquire(CommitTsLock, LW_SHARED); |
|
|
|
|
xid = commitTsShared->xidLastCommit; |
|
|
|
|
if (ts) |
|
|
|
|
*ts = commitTsShared->dataLastCommit.time; |
|
|
|
|
if (nodeid) |
|
|
|
|
*nodeid = commitTsShared->dataLastCommit.nodeid; |
|
|
|
|
LWLockRelease(CommitTsLock); |
|
|
|
|
|
|
|
|
|
return xid; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* SQL-callable wrapper to obtain commit time of a transaction |
|
|
|
|
*/ |
|
|
|
|
Datum |
|
|
|
|
pg_xact_commit_timestamp(PG_FUNCTION_ARGS) |
|
|
|
|
{ |
|
|
|
|
TransactionId xid = PG_GETARG_UINT32(0); |
|
|
|
|
TimestampTz ts; |
|
|
|
|
bool found; |
|
|
|
|
|
|
|
|
|
found = TransactionIdGetCommitTsData(xid, &ts, NULL); |
|
|
|
|
|
|
|
|
|
if (!found) |
|
|
|
|
PG_RETURN_NULL(); |
|
|
|
|
|
|
|
|
|
PG_RETURN_TIMESTAMPTZ(ts); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Datum |
|
|
|
|
pg_last_committed_xact(PG_FUNCTION_ARGS) |
|
|
|
|
{ |
|
|
|
|
TransactionId xid; |
|
|
|
|
TimestampTz ts; |
|
|
|
|
Datum values[2]; |
|
|
|
|
bool nulls[2]; |
|
|
|
|
TupleDesc tupdesc; |
|
|
|
|
HeapTuple htup; |
|
|
|
|
|
|
|
|
|
/* and construct a tuple with our data */ |
|
|
|
|
xid = GetLatestCommitTsData(&ts, NULL); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Construct a tuple descriptor for the result row. This must match this |
|
|
|
|
* function's pg_proc entry! |
|
|
|
|
*/ |
|
|
|
|
tupdesc = CreateTemplateTupleDesc(2, false); |
|
|
|
|
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid", |
|
|
|
|
XIDOID, -1, 0); |
|
|
|
|
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp", |
|
|
|
|
TIMESTAMPTZOID, -1, 0); |
|
|
|
|
tupdesc = BlessTupleDesc(tupdesc); |
|
|
|
|
|
|
|
|
|
if (!TransactionIdIsNormal(xid)) |
|
|
|
|
{ |
|
|
|
|
memset(nulls, true, sizeof(nulls)); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
values[0] = TransactionIdGetDatum(xid); |
|
|
|
|
nulls[0] = false; |
|
|
|
|
|
|
|
|
|
values[1] = TimestampTzGetDatum(ts); |
|
|
|
|
nulls[1] = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
htup = heap_form_tuple(tupdesc, values, nulls); |
|
|
|
|
|
|
|
|
|
PG_RETURN_DATUM(HeapTupleGetDatum(htup)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Number of shared CommitTS buffers. |
|
|
|
|
* |
|
|
|
|
* We use a very similar logic as for the number of CLOG buffers; see comments |
|
|
|
|
* in CLOGShmemBuffers. |
|
|
|
|
*/ |
|
|
|
|
Size |
|
|
|
|
CommitTsShmemBuffers(void) |
|
|
|
|
{ |
|
|
|
|
return Min(16, Max(4, NBuffers / 1024)); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Shared memory sizing for CommitTs |
|
|
|
|
*/ |
|
|
|
|
Size |
|
|
|
|
CommitTsShmemSize(void) |
|
|
|
|
{ |
|
|
|
|
return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) + |
|
|
|
|
sizeof(CommitTimestampShared); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Initialize CommitTs at system startup (postmaster start or standalone |
|
|
|
|
* backend) |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
CommitTsShmemInit(void) |
|
|
|
|
{ |
|
|
|
|
bool found; |
|
|
|
|
|
|
|
|
|
CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; |
|
|
|
|
SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0, |
|
|
|
|
CommitTsControlLock, "pg_commit_ts"); |
|
|
|
|
|
|
|
|
|
commitTsShared = ShmemInitStruct("CommitTs shared", |
|
|
|
|
sizeof(CommitTimestampShared), |
|
|
|
|
&found); |
|
|
|
|
|
|
|
|
|
if (!IsUnderPostmaster) |
|
|
|
|
{ |
|
|
|
|
Assert(!found); |
|
|
|
|
|
|
|
|
|
commitTsShared->xidLastCommit = InvalidTransactionId; |
|
|
|
|
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time); |
|
|
|
|
commitTsShared->dataLastCommit.nodeid = InvalidCommitTsNodeId; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
Assert(found); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 *
 * It is intentionally empty at present.
 */
void
BootStrapCommitTs(void)
{
	/*
	 * Unlike most other SLRU modules there is nothing to do here: segments
	 * are created when the server is started with this module enabled.  See
	 * StartupCommitTs.
	 */
}
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Initialize (or reinitialize) a page of CommitTs to zeroes. |
|
|
|
|
* If writeXlog is TRUE, also emit an XLOG record saying we did this. |
|
|
|
|
* |
|
|
|
|
* The page is not actually written, just set up in shared memory. |
|
|
|
|
* The slot number of the new page is returned. |
|
|
|
|
* |
|
|
|
|
* Control lock must be held at entry, and will be held at exit. |
|
|
|
|
*/ |
|
|
|
|
static int |
|
|
|
|
ZeroCommitTsPage(int pageno, bool writeXlog) |
|
|
|
|
{ |
|
|
|
|
int slotno; |
|
|
|
|
|
|
|
|
|
slotno = SimpleLruZeroPage(CommitTsCtl, pageno); |
|
|
|
|
|
|
|
|
|
if (writeXlog) |
|
|
|
|
WriteZeroPageXlogRec(pageno); |
|
|
|
|
|
|
|
|
|
return slotno; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* This must be called ONCE during postmaster or standalone-backend startup, |
|
|
|
|
* after StartupXLOG has initialized ShmemVariableCache->nextXid. |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
StartupCommitTs(void) |
|
|
|
|
{ |
|
|
|
|
TransactionId xid = ShmemVariableCache->nextXid; |
|
|
|
|
int pageno = TransactionIdToCTsPage(xid); |
|
|
|
|
|
|
|
|
|
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Initialize our idea of the latest page number. |
|
|
|
|
*/ |
|
|
|
|
CommitTsCtl->shared->latest_page_number = pageno; |
|
|
|
|
|
|
|
|
|
LWLockRelease(CommitTsControlLock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* This must be called ONCE during postmaster or standalone-backend startup, |
|
|
|
|
* when commit timestamp is enabled. Must be called after recovery has |
|
|
|
|
* finished. |
|
|
|
|
* |
|
|
|
|
* This is in charge of creating the currently active segment, if it's not |
|
|
|
|
* already there. The reason for this is that the server might have been |
|
|
|
|
* running with this module disabled for a while and thus might have skipped |
|
|
|
|
* the normal creation point. |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
CompleteCommitTsInitialization(void) |
|
|
|
|
{ |
|
|
|
|
TransactionId xid = ShmemVariableCache->nextXid; |
|
|
|
|
int pageno = TransactionIdToCTsPage(xid); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Re-Initialize our idea of the latest page number. |
|
|
|
|
*/ |
|
|
|
|
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
|
|
|
|
CommitTsCtl->shared->latest_page_number = pageno; |
|
|
|
|
LWLockRelease(CommitTsControlLock); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If this module is not currently enabled, make sure we don't hand back |
|
|
|
|
* possibly-invalid data; also remove segments of old data. |
|
|
|
|
*/ |
|
|
|
|
if (!track_commit_timestamp) |
|
|
|
|
{ |
|
|
|
|
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
|
|
|
|
ShmemVariableCache->oldestCommitTs = InvalidTransactionId; |
|
|
|
|
ShmemVariableCache->newestCommitTs = InvalidTransactionId; |
|
|
|
|
LWLockRelease(CommitTsLock); |
|
|
|
|
|
|
|
|
|
TruncateCommitTs(ReadNewTransactionId()); |
|
|
|
|
|
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* If CommitTs is enabled, but it wasn't in the previous server run, we |
|
|
|
|
* need to set the oldest and newest values to the next Xid; that way, we |
|
|
|
|
* will not try to read data that might not have been set. |
|
|
|
|
* |
|
|
|
|
* XXX does this have a problem if a server is started with commitTs |
|
|
|
|
* enabled, then started with commitTs disabled, then restarted with it |
|
|
|
|
* enabled again? It doesn't look like it does, because there should be a |
|
|
|
|
* checkpoint that sets the value to InvalidTransactionId at end of |
|
|
|
|
* recovery; and so any chance of injecting new transactions without |
|
|
|
|
* CommitTs values would occur after the oldestCommitTs has been set to |
|
|
|
|
* Invalid temporarily. |
|
|
|
|
*/ |
|
|
|
|
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
|
|
|
|
if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId) |
|
|
|
|
{ |
|
|
|
|
ShmemVariableCache->oldestCommitTs = |
|
|
|
|
ShmemVariableCache->newestCommitTs = ReadNewTransactionId(); |
|
|
|
|
} |
|
|
|
|
LWLockRelease(CommitTsLock); |
|
|
|
|
|
|
|
|
|
/* Finally, create the current segment file, if necessary */ |
|
|
|
|
if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno)) |
|
|
|
|
{ |
|
|
|
|
int slotno; |
|
|
|
|
|
|
|
|
|
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
|
|
|
|
slotno = ZeroCommitTsPage(pageno, false); |
|
|
|
|
SimpleLruWritePage(CommitTsCtl, slotno); |
|
|
|
|
Assert(!CommitTsCtl->shared->page_dirty[slotno]); |
|
|
|
|
LWLockRelease(CommitTsControlLock); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* This must be called ONCE during postmaster or standalone-backend shutdown |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
ShutdownCommitTs(void) |
|
|
|
|
{ |
|
|
|
|
/* Flush dirty CommitTs pages to disk */ |
|
|
|
|
SimpleLruFlush(CommitTsCtl, false); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Perform a checkpoint --- either during shutdown, or on-the-fly |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
CheckPointCommitTs(void) |
|
|
|
|
{ |
|
|
|
|
/* Flush dirty CommitTs pages to disk */ |
|
|
|
|
SimpleLruFlush(CommitTsCtl, true); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Make sure that CommitTs has room for a newly-allocated XID. |
|
|
|
|
* |
|
|
|
|
* NB: this is called while holding XidGenLock. We want it to be very fast |
|
|
|
|
* most of the time; even when it's not so fast, no actual I/O need happen |
|
|
|
|
* unless we're forced to write out a dirty CommitTs or xlog page to make room |
|
|
|
|
* in shared memory. |
|
|
|
|
* |
|
|
|
|
* NB: the current implementation relies on track_commit_timestamp being |
|
|
|
|
* PGC_POSTMASTER. |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
ExtendCommitTs(TransactionId newestXact) |
|
|
|
|
{ |
|
|
|
|
int pageno; |
|
|
|
|
|
|
|
|
|
/* nothing to do if module not enabled */ |
|
|
|
|
if (!track_commit_timestamp) |
|
|
|
|
return; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* No work except at first XID of a page. But beware: just after |
|
|
|
|
* wraparound, the first XID of page zero is FirstNormalTransactionId. |
|
|
|
|
*/ |
|
|
|
|
if (TransactionIdToCTsEntry(newestXact) != 0 && |
|
|
|
|
!TransactionIdEquals(newestXact, FirstNormalTransactionId)) |
|
|
|
|
return; |
|
|
|
|
|
|
|
|
|
pageno = TransactionIdToCTsPage(newestXact); |
|
|
|
|
|
|
|
|
|
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
|
|
|
|
|
|
|
|
|
/* Zero the page and make an XLOG entry about it */ |
|
|
|
|
ZeroCommitTsPage(pageno, !InRecovery); |
|
|
|
|
|
|
|
|
|
LWLockRelease(CommitTsControlLock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Remove all CommitTs segments before the one holding the passed |
|
|
|
|
* transaction ID. |
|
|
|
|
* |
|
|
|
|
* Note that we don't need to flush XLOG here. |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
TruncateCommitTs(TransactionId oldestXact) |
|
|
|
|
{ |
|
|
|
|
int cutoffPage; |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* The cutoff point is the start of the segment containing oldestXact. We |
|
|
|
|
* pass the *page* containing oldestXact to SimpleLruTruncate. |
|
|
|
|
*/ |
|
|
|
|
cutoffPage = TransactionIdToCTsPage(oldestXact); |
|
|
|
|
|
|
|
|
|
/* Check to see if there's any files that could be removed */ |
|
|
|
|
if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence, |
|
|
|
|
&cutoffPage)) |
|
|
|
|
return; /* nothing to remove */ |
|
|
|
|
|
|
|
|
|
/* Write XLOG record */ |
|
|
|
|
WriteTruncateXlogRec(cutoffPage); |
|
|
|
|
|
|
|
|
|
/* Now we can remove the old CommitTs segment(s) */ |
|
|
|
|
SimpleLruTruncate(CommitTsCtl, cutoffPage); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Set the limit values between which commit TS can be consulted. |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact) |
|
|
|
|
{ |
|
|
|
|
/*
|
|
|
|
|
* Be careful not to overwrite values that are either further into the |
|
|
|
|
* "future" or signal a disabled committs. |
|
|
|
|
*/ |
|
|
|
|
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
|
|
|
|
if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId) |
|
|
|
|
{ |
|
|
|
|
if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact)) |
|
|
|
|
ShmemVariableCache->oldestCommitTs = oldestXact; |
|
|
|
|
if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTs)) |
|
|
|
|
ShmemVariableCache->newestCommitTs = newestXact; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
{ |
|
|
|
|
Assert(ShmemVariableCache->newestCommitTs == InvalidTransactionId); |
|
|
|
|
} |
|
|
|
|
LWLockRelease(CommitTsLock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Move forwards the oldest commitTS value that can be consulted |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
AdvanceOldestCommitTs(TransactionId oldestXact) |
|
|
|
|
{ |
|
|
|
|
LWLockAcquire(CommitTsLock, LW_EXCLUSIVE); |
|
|
|
|
if (ShmemVariableCache->oldestCommitTs != InvalidTransactionId && |
|
|
|
|
TransactionIdPrecedes(ShmemVariableCache->oldestCommitTs, oldestXact)) |
|
|
|
|
ShmemVariableCache->oldestCommitTs = oldestXact; |
|
|
|
|
LWLockRelease(CommitTsLock); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Decide which of two CLOG page numbers is "older" for truncation purposes. |
|
|
|
|
* |
|
|
|
|
* We need to use comparison of TransactionIds here in order to do the right |
|
|
|
|
* thing with wraparound XID arithmetic. However, if we are asked about |
|
|
|
|
* page number zero, we don't want to hand InvalidTransactionId to |
|
|
|
|
* TransactionIdPrecedes: it'll get weird about permanent xact IDs. So, |
|
|
|
|
* offset both xids by FirstNormalTransactionId to avoid that. |
|
|
|
|
*/ |
|
|
|
|
static bool |
|
|
|
|
CommitTsPagePrecedes(int page1, int page2) |
|
|
|
|
{ |
|
|
|
|
TransactionId xid1; |
|
|
|
|
TransactionId xid2; |
|
|
|
|
|
|
|
|
|
xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE; |
|
|
|
|
xid1 += FirstNormalTransactionId; |
|
|
|
|
xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE; |
|
|
|
|
xid2 += FirstNormalTransactionId; |
|
|
|
|
|
|
|
|
|
return TransactionIdPrecedes(xid1, xid2); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Write a ZEROPAGE xlog record |
|
|
|
|
*/ |
|
|
|
|
static void |
|
|
|
|
WriteZeroPageXlogRec(int pageno) |
|
|
|
|
{ |
|
|
|
|
XLogBeginInsert(); |
|
|
|
|
XLogRegisterData((char *) (&pageno), sizeof(int)); |
|
|
|
|
(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Write a TRUNCATE xlog record |
|
|
|
|
*/ |
|
|
|
|
static void |
|
|
|
|
WriteTruncateXlogRec(int pageno) |
|
|
|
|
{ |
|
|
|
|
XLogBeginInsert(); |
|
|
|
|
XLogRegisterData((char *) (&pageno), sizeof(int)); |
|
|
|
|
(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Write a SETTS xlog record |
|
|
|
|
*/ |
|
|
|
|
static void |
|
|
|
|
WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids, |
|
|
|
|
TransactionId *subxids, TimestampTz timestamp, |
|
|
|
|
CommitTsNodeId nodeid) |
|
|
|
|
{ |
|
|
|
|
xl_commit_ts_set record; |
|
|
|
|
|
|
|
|
|
record.timestamp = timestamp; |
|
|
|
|
record.nodeid = nodeid; |
|
|
|
|
record.mainxid = mainxid; |
|
|
|
|
|
|
|
|
|
XLogBeginInsert(); |
|
|
|
|
XLogRegisterData((char *) &record, |
|
|
|
|
offsetof(xl_commit_ts_set, mainxid) + |
|
|
|
|
sizeof(TransactionId)); |
|
|
|
|
XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId)); |
|
|
|
|
XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* CommitTS resource manager's routines |
|
|
|
|
*/ |
|
|
|
|
void |
|
|
|
|
commit_ts_redo(XLogReaderState *record) |
|
|
|
|
{ |
|
|
|
|
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; |
|
|
|
|
|
|
|
|
|
/* Backup blocks are not used in commit_ts records */ |
|
|
|
|
Assert(!XLogRecHasAnyBlockRefs(record)); |
|
|
|
|
|
|
|
|
|
if (info == COMMIT_TS_ZEROPAGE) |
|
|
|
|
{ |
|
|
|
|
int pageno; |
|
|
|
|
int slotno; |
|
|
|
|
|
|
|
|
|
memcpy(&pageno, XLogRecGetData(record), sizeof(int)); |
|
|
|
|
|
|
|
|
|
LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE); |
|
|
|
|
|
|
|
|
|
slotno = ZeroCommitTsPage(pageno, false); |
|
|
|
|
SimpleLruWritePage(CommitTsCtl, slotno); |
|
|
|
|
Assert(!CommitTsCtl->shared->page_dirty[slotno]); |
|
|
|
|
|
|
|
|
|
LWLockRelease(CommitTsControlLock); |
|
|
|
|
} |
|
|
|
|
else if (info == COMMIT_TS_TRUNCATE) |
|
|
|
|
{ |
|
|
|
|
int pageno; |
|
|
|
|
|
|
|
|
|
memcpy(&pageno, XLogRecGetData(record), sizeof(int)); |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* During XLOG replay, latest_page_number isn't set up yet; insert a |
|
|
|
|
* suitable value to bypass the sanity test in SimpleLruTruncate. |
|
|
|
|
*/ |
|
|
|
|
CommitTsCtl->shared->latest_page_number = pageno; |
|
|
|
|
|
|
|
|
|
SimpleLruTruncate(CommitTsCtl, pageno); |
|
|
|
|
} |
|
|
|
|
else if (info == COMMIT_TS_SETTS) |
|
|
|
|
{ |
|
|
|
|
xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record); |
|
|
|
|
int nsubxids; |
|
|
|
|
TransactionId *subxids; |
|
|
|
|
|
|
|
|
|
nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) / |
|
|
|
|
sizeof(TransactionId)); |
|
|
|
|
if (nsubxids > 0) |
|
|
|
|
{ |
|
|
|
|
subxids = palloc(sizeof(TransactionId) * nsubxids); |
|
|
|
|
memcpy(subxids, |
|
|
|
|
XLogRecGetData(record) + SizeOfCommitTsSet, |
|
|
|
|
sizeof(TransactionId) * nsubxids); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
subxids = NULL; |
|
|
|
|
|
|
|
|
|
TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids, |
|
|
|
|
setts->timestamp, setts->nodeid, false); |
|
|
|
|
if (subxids) |
|
|
|
|
pfree(subxids); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
elog(PANIC, "commit_ts_redo: unknown op code %u", info); |
|
|
|
|
} |