mirror of https://github.com/postgres/postgres
on CORE previously. This module offers transaction ID's containing the original XID and the transaction epoch as a bigint value to the user level. It also provides a special txid_snapshot data type that contains an entire transactions visibility snapshot information, which is useful to determine if a particular txid was visible to a transaction or not. The module has been tested by porting Slony-I from using its original xxid data type. JanREL8_3_STABLE
parent
2a997a7065
commit
1f92630fc4
@ -0,0 +1,26 @@ |
||||
|
||||
MODULES = txid
|
||||
DATA_built = txid.sql
|
||||
DATA = uninstall_txid.sql
|
||||
DOCS = README.txid
|
||||
REGRESS = txid
|
||||
|
||||
|
||||
ifdef USE_PGXS |
||||
PG_CONFIG = pg_config
|
||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||
include $(PGXS) |
||||
else |
||||
subdir = contrib/txid
|
||||
top_builddir = ../..
|
||||
include $(top_builddir)/src/Makefile.global |
||||
include $(top_srcdir)/contrib/contrib-global.mk |
||||
endif |
||||
|
||||
|
||||
test: install |
||||
$(MAKE) installcheck || { less regression.diffs; exit 1; }
|
||||
|
||||
ack: |
||||
cp results/* expected/
|
||||
|
@ -0,0 +1,111 @@ |
||||
|
||||
txid - export transaction id's to user level |
||||
============================================ |
||||
|
||||
The goal is to make PostgreSQL internal transaction ID and snapshot |
||||
data usable externally. This allows very efficient queue |
||||
implementation done inside database. |
||||
|
||||
[towrite: what snapshot means] |
||||
|
||||
The module defines type txid_snapshot and following functions: |
||||
|
||||
|
||||
txid_current() returns int8 |
||||
|
||||
Current transaction ID. |
||||
|
||||
txid_current_snapshot() returns txid_snapshot |
||||
|
||||
Current snapshot. |
||||
|
||||
txid_snapshot_xmin( snap ) returns int8 |
||||
|
||||
Smallest TXID in snapshot. TXID's smaller than this |
||||
are all visible in snapshot. |
||||
|
||||
txid_snapshot_xmax( snap ) returns int8 |
||||
|
||||
Largest TXID in snapshot. TXID's starting from this one are |
||||
all invisible in snapshot. |
||||
|
||||
txid_snapshot_xip( snap ) setof int8 |
||||
|
||||
List of in-progress TXID's in snapshot, that are invisible. |
||||
Values are between xmin and xmax. |
||||
|
||||
txid_visible_in_snapshot(id, snap) returns bool |
||||
|
||||
Is TXID visible in snapshot? |
||||
|
||||
|
||||
Fetching events |
||||
--------------- |
||||
|
||||
Lets say there is following event table: |
||||
|
||||
CREATE TABLE events ( |
||||
ev_txid int8 not null default txid_current(), |
||||
ev_data text |
||||
); |
||||
CREATE INDEX ev_txid_idx ON events (ev_txid); |
||||
|
||||
Then event between 2 snapshots snap1 and snap2 can be fetched |
||||
with followign query: |
||||
|
||||
SELECT ev_data FROM events |
||||
WHERE ev_txid >= txid_snapshot_xmin(:snap1) |
||||
AND ev_txid < txid_snapshot_xmax(:snap2) |
||||
AND NOT txid_visible_in_snapshot(ev_txid, :snap1) |
||||
AND txid_visible_in_snapshot(ev_txid, :snap2); |
||||
|
||||
This is the simplest query but it has problem if there are long |
||||
transactions running - the txid_snapshot_xmin(snap1) will stay low |
||||
and the range will get very large. |
||||
|
||||
This can be fixed by fetching only snap1.xmax ... snap1.xmax by range and |
||||
fetching possible txids below snap1.xmax explicitly: |
||||
|
||||
SELECT ev_data FROM events |
||||
WHERE ((ev_txid >= txid_snapshot_xmax(:snap1) AND ev_txid < txid_snapshot_xmax(:snap2)) |
||||
OR |
||||
(ev_txid IN (SELECT * FROM txid_snapshot_xip(:snap1)))) |
||||
AND NOT txid_visible_in_snapshot(ev_txid, :snap1) |
||||
AND txid_visible_in_snapshot(ev_txid, :snap2); |
||||
|
||||
Note that although the above queries work, the PostgreSQL fails to |
||||
plan them correctly. For actual usage the values for txid_snapshot_xmin, |
||||
txid_snapshot_xmax and txid_snapshot_xip should be filled in directly, |
||||
only then will they use index. |
||||
|
||||
There are few more optimizations possible, like: |
||||
|
||||
- Picking out only TXIDs that were actually committed between snap1 and snap2. |
||||
|
||||
- Lowering the range from txid_snapshot_xmax(snap1) to decrease the list if TXIDs to be fetched separately. |
||||
|
||||
To see example code for that it's best to see pgq.batch_event_sql() function in Skytools. |
||||
|
||||
http://pgfoundry.org/projects/skytools/ |
||||
|
||||
|
||||
Dumping and restoring data containing TXIDs. |
||||
-------------------------------------------- |
||||
|
||||
[towrite: reason for epoch increase] |
||||
|
||||
You can look at current epoch with query: |
||||
|
||||
SELECT txid_current() >> 32 as epoch; |
||||
|
||||
So new epoch should be: |
||||
|
||||
SELECT (txid_current() >> 32) + 1 as newepoch; |
||||
|
||||
Epoch can be changed with pg_resetxlog command: |
||||
|
||||
pg_resetxlog -e NEWEPOCH DATADIR |
||||
|
||||
Database needs to be shut down for that moment. |
||||
|
||||
|
@ -0,0 +1,212 @@ |
||||
-- init |
||||
\set ECHO none |
||||
-- i/o |
||||
select '12:13:'::txid_snapshot; |
||||
txid_snapshot |
||||
--------------- |
||||
12:13: |
||||
(1 row) |
||||
|
||||
select '12:13:1,2'::txid_snapshot; |
||||
ERROR: illegal txid_snapshot input format |
||||
-- errors |
||||
select '31:12:'::txid_snapshot; |
||||
ERROR: illegal txid_snapshot input format |
||||
select '0:1:'::txid_snapshot; |
||||
ERROR: illegal txid_snapshot input format |
||||
select '12:13:0'::txid_snapshot; |
||||
ERROR: illegal txid_snapshot input format |
||||
select '12:16:14,13'::txid_snapshot; |
||||
ERROR: illegal txid_snapshot input format |
||||
select '12:16:14,14'::txid_snapshot; |
||||
ERROR: illegal txid_snapshot input format |
||||
create table snapshot_test ( |
||||
nr integer, |
||||
snap txid_snapshot |
||||
); |
||||
insert into snapshot_test values (1, '12:13:'); |
||||
insert into snapshot_test values (2, '12:20:13,15,18'); |
||||
insert into snapshot_test values (3, '100001:100009:100005,100007,100008'); |
||||
insert into snapshot_test values (4, '100:150:101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131'); |
||||
select snap from snapshot_test order by nr; |
||||
snap |
||||
------------------------------------------------------------------------------------------------------------------------------------- |
||||
12:13: |
||||
12:20:13,15,18 |
||||
100001:100009:100005,100007,100008 |
||||
100:150:101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131 |
||||
(4 rows) |
||||
|
||||
select txid_snapshot_xmin(snap), |
||||
txid_snapshot_xmax(snap), |
||||
txid_snapshot_xip(snap) |
||||
from snapshot_test order by nr; |
||||
txid_snapshot_xmin | txid_snapshot_xmax | txid_snapshot_xip |
||||
--------------------+--------------------+------------------- |
||||
12 | 20 | 13 |
||||
12 | 20 | 15 |
||||
12 | 20 | 18 |
||||
100001 | 100009 | 100005 |
||||
100001 | 100009 | 100007 |
||||
100001 | 100009 | 100008 |
||||
100 | 150 | 101 |
||||
100 | 150 | 102 |
||||
100 | 150 | 103 |
||||
100 | 150 | 104 |
||||
100 | 150 | 105 |
||||
100 | 150 | 106 |
||||
100 | 150 | 107 |
||||
100 | 150 | 108 |
||||
100 | 150 | 109 |
||||
100 | 150 | 110 |
||||
100 | 150 | 111 |
||||
100 | 150 | 112 |
||||
100 | 150 | 113 |
||||
100 | 150 | 114 |
||||
100 | 150 | 115 |
||||
100 | 150 | 116 |
||||
100 | 150 | 117 |
||||
100 | 150 | 118 |
||||
100 | 150 | 119 |
||||
100 | 150 | 120 |
||||
100 | 150 | 121 |
||||
100 | 150 | 122 |
||||
100 | 150 | 123 |
||||
100 | 150 | 124 |
||||
100 | 150 | 125 |
||||
100 | 150 | 126 |
||||
100 | 150 | 127 |
||||
100 | 150 | 128 |
||||
100 | 150 | 129 |
||||
100 | 150 | 130 |
||||
100 | 150 | 131 |
||||
(37 rows) |
||||
|
||||
select id, txid_visible_in_snapshot(id, snap) |
||||
from snapshot_test, generate_series(11, 21) id |
||||
where nr = 2; |
||||
id | txid_visible_in_snapshot |
||||
----+-------------------------- |
||||
11 | t |
||||
12 | t |
||||
13 | f |
||||
14 | t |
||||
15 | f |
||||
16 | t |
||||
17 | t |
||||
18 | f |
||||
19 | t |
||||
20 | f |
||||
21 | f |
||||
(11 rows) |
||||
|
||||
-- test bsearch |
||||
select id, txid_visible_in_snapshot(id, snap) |
||||
from snapshot_test, generate_series(90, 160) id |
||||
where nr = 4; |
||||
id | txid_visible_in_snapshot |
||||
-----+-------------------------- |
||||
90 | t |
||||
91 | t |
||||
92 | t |
||||
93 | t |
||||
94 | t |
||||
95 | t |
||||
96 | t |
||||
97 | t |
||||
98 | t |
||||
99 | t |
||||
100 | t |
||||
101 | f |
||||
102 | f |
||||
103 | f |
||||
104 | f |
||||
105 | f |
||||
106 | f |
||||
107 | f |
||||
108 | f |
||||
109 | f |
||||
110 | f |
||||
111 | f |
||||
112 | f |
||||
113 | f |
||||
114 | f |
||||
115 | f |
||||
116 | f |
||||
117 | f |
||||
118 | f |
||||
119 | f |
||||
120 | f |
||||
121 | f |
||||
122 | f |
||||
123 | f |
||||
124 | f |
||||
125 | f |
||||
126 | f |
||||
127 | f |
||||
128 | f |
||||
129 | f |
||||
130 | f |
||||
131 | f |
||||
132 | t |
||||
133 | t |
||||
134 | t |
||||
135 | t |
||||
136 | t |
||||
137 | t |
||||
138 | t |
||||
139 | t |
||||
140 | t |
||||
141 | t |
||||
142 | t |
||||
143 | t |
||||
144 | t |
||||
145 | t |
||||
146 | t |
||||
147 | t |
||||
148 | t |
||||
149 | t |
||||
150 | f |
||||
151 | f |
||||
152 | f |
||||
153 | f |
||||
154 | f |
||||
155 | f |
||||
156 | f |
||||
157 | f |
||||
158 | f |
||||
159 | f |
||||
160 | f |
||||
(71 rows) |
||||
|
||||
-- test current values also |
||||
select txid_current() >= txid_snapshot_xmin(txid_current_snapshot()); |
||||
?column? |
||||
---------- |
||||
t |
||||
(1 row) |
||||
|
||||
/* due to lazy xid alloc in 8.3 those are different in 8.2 and 8.3 |
||||
select txid_current() < txid_snapshot_xmax(txid_current_snapshot()); |
||||
|
||||
select txid_visible_in_snapshot(txid_current(), txid_current_snapshot()); |
||||
*/ |
||||
-- test 64bitness |
||||
select txid_snapshot '1000100010001000:1000100010001100:1000100010001012,1000100010001013'; |
||||
txid_snapshot |
||||
--------------------------------------------------------------------- |
||||
1000100010001000:1000100010001100:1000100010001012,1000100010001013 |
||||
(1 row) |
||||
|
||||
select txid_visible_in_snapshot('1000100010001012', '1000100010001000:1000100010001100:1000100010001012,1000100010001013'); |
||||
txid_visible_in_snapshot |
||||
-------------------------- |
||||
f |
||||
(1 row) |
||||
|
||||
select txid_visible_in_snapshot('1000100010001015', '1000100010001000:1000100010001100:1000100010001012,1000100010001013'); |
||||
txid_visible_in_snapshot |
||||
-------------------------- |
||||
t |
||||
(1 row) |
||||
|
@ -0,0 +1,58 @@ |
||||
-- init |
||||
\set ECHO none |
||||
set client_min_messages = 'warning'; |
||||
\i txid.sql |
||||
set client_min_messages = 'notice'; |
||||
\set ECHO all |
||||
|
||||
-- i/o |
||||
select '12:13:'::txid_snapshot; |
||||
select '12:13:1,2'::txid_snapshot; |
||||
|
||||
-- errors |
||||
select '31:12:'::txid_snapshot; |
||||
select '0:1:'::txid_snapshot; |
||||
select '12:13:0'::txid_snapshot; |
||||
select '12:16:14,13'::txid_snapshot; |
||||
select '12:16:14,14'::txid_snapshot; |
||||
|
||||
create table snapshot_test ( |
||||
nr integer, |
||||
snap txid_snapshot |
||||
); |
||||
|
||||
insert into snapshot_test values (1, '12:13:'); |
||||
insert into snapshot_test values (2, '12:20:13,15,18'); |
||||
insert into snapshot_test values (3, '100001:100009:100005,100007,100008'); |
||||
insert into snapshot_test values (4, '100:150:101,102,103,104,105,106,107,108,109,110,111,112,113,114,115,116,117,118,119,120,121,122,123,124,125,126,127,128,129,130,131'); |
||||
select snap from snapshot_test order by nr; |
||||
|
||||
select txid_snapshot_xmin(snap), |
||||
txid_snapshot_xmax(snap), |
||||
txid_snapshot_xip(snap) |
||||
from snapshot_test order by nr; |
||||
|
||||
select id, txid_visible_in_snapshot(id, snap) |
||||
from snapshot_test, generate_series(11, 21) id |
||||
where nr = 2; |
||||
|
||||
-- test bsearch |
||||
select id, txid_visible_in_snapshot(id, snap) |
||||
from snapshot_test, generate_series(90, 160) id |
||||
where nr = 4; |
||||
|
||||
-- test current values also |
||||
select txid_current() >= txid_snapshot_xmin(txid_current_snapshot()); |
||||
|
||||
/* due to lazy xid alloc in 8.3 those are different in 8.2 and 8.3 |
||||
select txid_current() < txid_snapshot_xmax(txid_current_snapshot()); |
||||
|
||||
select txid_visible_in_snapshot(txid_current(), txid_current_snapshot()); |
||||
*/ |
||||
|
||||
-- test 64bitness |
||||
|
||||
select txid_snapshot '1000100010001000:1000100010001100:1000100010001012,1000100010001013'; |
||||
select txid_visible_in_snapshot('1000100010001012', '1000100010001000:1000100010001100:1000100010001012,1000100010001013'); |
||||
select txid_visible_in_snapshot('1000100010001015', '1000100010001000:1000100010001100:1000100010001012,1000100010001013'); |
||||
|
@ -0,0 +1,475 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* txid.c |
||||
* |
||||
* Export backend internal tranasction id's to user level. |
||||
* |
||||
* Copyright (c) 2003-2007, PostgreSQL Global Development Group |
||||
* Author: Jan Wieck, Afilias USA INC. |
||||
* |
||||
* 64-bit txids: Marko Kreen, Skype Technologies |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
|
||||
#include "postgres.h" |
||||
|
||||
#include "access/transam.h" |
||||
#include "access/xact.h" |
||||
#include "funcapi.h" |
||||
|
||||
#ifdef PG_MODULE_MAGIC |
||||
PG_MODULE_MAGIC; |
||||
#endif |
||||
|
||||
#ifdef INT64_IS_BUSTED |
||||
#error txid needs working int64 |
||||
#endif |
||||
|
||||
/* txid will be signed int8 in database */ |
||||
#define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF) |
||||
|
||||
/*
|
||||
* If defined, use bsearch() function for searching |
||||
* txid's inside snapshots that have more than given values. |
||||
*/ |
||||
#define USE_BSEARCH_IF_NXIP_GREATER 30 |
||||
|
||||
/* format code for uint64 to appendStringInfo */ |
||||
#define TXID_FMT UINT64_FORMAT |
||||
|
||||
/* Use unsigned variant internally */ |
||||
typedef uint64 txid; |
||||
|
||||
/*
|
||||
* Snapshot for 8byte txids. |
||||
*/ |
||||
typedef struct |
||||
{ |
||||
/*
|
||||
* 4-byte length hdr, should not be touched directly. |
||||
* |
||||
* Explicit embedding is ok as we want always correct |
||||
* alignment anyway. |
||||
*/ |
||||
int32 __varsz; |
||||
|
||||
uint32 nxip; /* number of txids in xip array */ |
||||
txid xmin; |
||||
txid xmax; |
||||
txid xip[1]; /* in-progress txids */ |
||||
} TxidSnapshot; |
||||
|
||||
#define TXID_SNAPSHOT_SIZE(nxip) (offsetof(TxidSnapshot, xip) + sizeof(txid) * (nxip)) |
||||
|
||||
/*
|
||||
* Epoch values from backend. |
||||
*/ |
||||
typedef struct { |
||||
uint64 last_value; |
||||
uint64 epoch; |
||||
} TxidEpoch; |
||||
|
||||
/* public functions */ |
||||
Datum txid_snapshot_in(PG_FUNCTION_ARGS); |
||||
Datum txid_snapshot_out(PG_FUNCTION_ARGS); |
||||
Datum txid_current(PG_FUNCTION_ARGS); |
||||
Datum txid_current_snapshot(PG_FUNCTION_ARGS); |
||||
Datum txid_snapshot_xmin(PG_FUNCTION_ARGS); |
||||
Datum txid_snapshot_xmax(PG_FUNCTION_ARGS); |
||||
Datum txid_snapshot_xip(PG_FUNCTION_ARGS); |
||||
Datum txid_visible_in_snapshot(PG_FUNCTION_ARGS); |
||||
|
||||
/* public function tags */ |
||||
PG_FUNCTION_INFO_V1(txid_snapshot_in); |
||||
PG_FUNCTION_INFO_V1(txid_snapshot_out); |
||||
PG_FUNCTION_INFO_V1(txid_current); |
||||
PG_FUNCTION_INFO_V1(txid_current_snapshot); |
||||
PG_FUNCTION_INFO_V1(txid_snapshot_xmin); |
||||
PG_FUNCTION_INFO_V1(txid_snapshot_xmax); |
||||
PG_FUNCTION_INFO_V1(txid_snapshot_xip); |
||||
PG_FUNCTION_INFO_V1(txid_visible_in_snapshot); |
||||
|
||||
/*
|
||||
* do a TransactionId -> txid conversion |
||||
*/ |
||||
static txid |
||||
convert_xid(TransactionId xid, const TxidEpoch *state) |
||||
{ |
||||
uint64 epoch; |
||||
|
||||
/* return special xid's as-is */ |
||||
if (xid < FirstNormalTransactionId) |
||||
return xid; |
||||
|
||||
/* xid can on both sides on wrap-around */ |
||||
epoch = state->epoch; |
||||
if (TransactionIdPrecedes(xid, state->last_value)) { |
||||
if (xid > state->last_value) |
||||
epoch--; |
||||
} else if (TransactionIdFollows(xid, state->last_value)) { |
||||
if (xid < state->last_value) |
||||
epoch++; |
||||
} |
||||
return (epoch << 32) | xid; |
||||
} |
||||
|
||||
/*
|
||||
* Fetch epoch data from backend. |
||||
*/ |
||||
static void |
||||
load_xid_epoch(TxidEpoch *state) |
||||
{ |
||||
TransactionId xid; |
||||
uint32 epoch; |
||||
|
||||
GetNextXidAndEpoch(&xid, &epoch); |
||||
|
||||
state->epoch = epoch; |
||||
state->last_value = xid; |
||||
} |
||||
|
||||
/*
|
||||
* compare txid in memory. |
||||
*/ |
||||
static int |
||||
cmp_txid(const void *aa, const void *bb) |
||||
{ |
||||
const uint64 *a = aa; |
||||
const uint64 *b = bb; |
||||
if (*a < *b) |
||||
return -1; |
||||
if (*a > *b) |
||||
return 1; |
||||
return 0; |
||||
} |
||||
|
||||
/*
|
||||
* order txids, for bsearch(). |
||||
*/ |
||||
static void |
||||
sort_snapshot(TxidSnapshot *snap) |
||||
{ |
||||
if (snap->nxip > 1) |
||||
qsort(snap->xip, snap->nxip, sizeof(txid), cmp_txid); |
||||
} |
||||
|
||||
/*
|
||||
* check txid visibility. |
||||
*/ |
||||
static bool |
||||
is_visible_txid(txid value, const TxidSnapshot *snap) |
||||
{ |
||||
if (value < snap->xmin) |
||||
return true; |
||||
else if (value >= snap->xmax) |
||||
return false; |
||||
#ifdef USE_BSEARCH_IF_NXIP_GREATER |
||||
else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER) |
||||
{ |
||||
void *res; |
||||
res = bsearch(&value, snap->xip, snap->nxip, sizeof(txid), cmp_txid); |
||||
return (res) ? false : true; |
||||
} |
||||
#endif |
||||
else |
||||
{ |
||||
int i; |
||||
for (i = 0; i < snap->nxip; i++) |
||||
{ |
||||
if (value == snap->xip[i]) |
||||
return false; |
||||
} |
||||
return true; |
||||
} |
||||
} |
||||
|
||||
/*
|
||||
* helper functions to use StringInfo for TxidSnapshot creation. |
||||
*/ |
||||
|
||||
static StringInfo |
||||
buf_init(txid xmin, txid xmax) |
||||
{ |
||||
TxidSnapshot snap; |
||||
StringInfo buf; |
||||
|
||||
snap.xmin = xmin; |
||||
snap.xmax = xmax; |
||||
snap.nxip = 0; |
||||
|
||||
buf = makeStringInfo(); |
||||
appendBinaryStringInfo(buf, (char *)&snap, TXID_SNAPSHOT_SIZE(0)); |
||||
return buf; |
||||
} |
||||
|
||||
static void |
||||
buf_add_txid(StringInfo buf, txid xid) |
||||
{ |
||||
TxidSnapshot *snap = (TxidSnapshot *)buf->data; |
||||
|
||||
/* do it before possible realloc */ |
||||
snap->nxip++; |
||||
|
||||
appendBinaryStringInfo(buf, (char *)&xid, sizeof(xid)); |
||||
} |
||||
|
||||
static TxidSnapshot * |
||||
buf_finalize(StringInfo buf) |
||||
{ |
||||
TxidSnapshot *snap = (TxidSnapshot *)buf->data; |
||||
SET_VARSIZE(snap, buf->len); |
||||
|
||||
/* buf is not needed anymore */ |
||||
buf->data = NULL; |
||||
pfree(buf); |
||||
|
||||
return snap; |
||||
} |
||||
|
||||
/*
|
||||
* parse snapshot from cstring |
||||
*/ |
||||
static TxidSnapshot * |
||||
parse_snapshot(const char *str) |
||||
{ |
||||
txid xmin; |
||||
txid xmax; |
||||
txid last_val = 0, val; |
||||
char *endp; |
||||
StringInfo buf; |
||||
|
||||
xmin = (txid) strtoull(str, &endp, 0); |
||||
if (*endp != ':') |
||||
goto bad_format; |
||||
str = endp + 1; |
||||
|
||||
xmax = (txid) strtoull(str, &endp, 0); |
||||
if (*endp != ':') |
||||
goto bad_format; |
||||
str = endp + 1; |
||||
|
||||
/* it should look sane */ |
||||
if (xmin > xmax || xmin == 0 || xmax > MAX_TXID) |
||||
goto bad_format; |
||||
|
||||
/* allocate buffer */ |
||||
buf = buf_init(xmin, xmax); |
||||
|
||||
/* loop over values */ |
||||
while (*str != '\0') |
||||
{ |
||||
/* read next value */ |
||||
val = (txid) strtoull(str, &endp, 0); |
||||
str = endp; |
||||
|
||||
/* require the input to be in order */ |
||||
if (val < xmin || val <= last_val || val >= xmax) |
||||
goto bad_format; |
||||
|
||||
buf_add_txid(buf, val); |
||||
last_val = val; |
||||
|
||||
if (*str == ',') |
||||
str++; |
||||
else if (*str != '\0') |
||||
goto bad_format; |
||||
} |
||||
|
||||
return buf_finalize(buf); |
||||
|
||||
bad_format: |
||||
elog(ERROR, "illegal txid_snapshot input format"); |
||||
return NULL; |
||||
} |
||||
|
||||
/*
|
||||
* Public functions |
||||
*/ |
||||
|
||||
/*
|
||||
* txid_current() returns int8 |
||||
* |
||||
* Return the current transaction ID |
||||
*/ |
||||
Datum |
||||
txid_current(PG_FUNCTION_ARGS) |
||||
{ |
||||
txid val; |
||||
TxidEpoch state; |
||||
|
||||
load_xid_epoch(&state); |
||||
|
||||
val = convert_xid(GetTopTransactionId(), &state); |
||||
|
||||
PG_RETURN_INT64(val); |
||||
} |
||||
|
||||
/*
|
||||
* txid_current_snapshot() returns txid_snapshot |
||||
* |
||||
* Return current snapshot |
||||
*/ |
||||
Datum |
||||
txid_current_snapshot(PG_FUNCTION_ARGS) |
||||
{ |
||||
TxidSnapshot *snap; |
||||
unsigned nxip, i, size; |
||||
TxidEpoch state; |
||||
Snapshot cur; |
||||
|
||||
cur = SerializableSnapshot; |
||||
if (cur == NULL) |
||||
elog(ERROR, "get_current_snapshot: SerializableSnapshot == NULL"); |
||||
|
||||
load_xid_epoch(&state); |
||||
|
||||
/* allocate */ |
||||
nxip = cur->xcnt; |
||||
size = TXID_SNAPSHOT_SIZE(nxip); |
||||
snap = palloc(size); |
||||
SET_VARSIZE(snap, size); |
||||
|
||||
/* fill */ |
||||
snap->xmin = convert_xid(cur->xmin, &state); |
||||
snap->xmax = convert_xid(cur->xmax, &state); |
||||
snap->nxip = nxip; |
||||
for (i = 0; i < nxip; i++) |
||||
snap->xip[i] = convert_xid(cur->xip[i], &state); |
||||
|
||||
/* we want them guaranteed ascending order */ |
||||
sort_snapshot(snap); |
||||
|
||||
PG_RETURN_POINTER(snap); |
||||
} |
||||
|
||||
/*
|
||||
* txid_snapshot_in(cstring) returns txid_snapshot |
||||
* |
||||
* input function for type txid_snapshot |
||||
*/ |
||||
Datum |
||||
txid_snapshot_in(PG_FUNCTION_ARGS) |
||||
{ |
||||
TxidSnapshot *snap; |
||||
char *str = PG_GETARG_CSTRING(0); |
||||
|
||||
snap = parse_snapshot(str); |
||||
|
||||
PG_RETURN_POINTER(snap); |
||||
} |
||||
|
||||
/*
|
||||
* txid_snapshot_out(txid_snapshot) returns cstring |
||||
* |
||||
* output function for type txid_snapshot |
||||
*/ |
||||
Datum |
||||
txid_snapshot_out(PG_FUNCTION_ARGS) |
||||
{ |
||||
TxidSnapshot *snap; |
||||
StringInfoData str; |
||||
int i; |
||||
|
||||
snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); |
||||
|
||||
initStringInfo(&str); |
||||
|
||||
appendStringInfo(&str, TXID_FMT ":", snap->xmin); |
||||
appendStringInfo(&str, TXID_FMT ":", snap->xmax); |
||||
|
||||
for (i = 0; i < snap->nxip; i++) |
||||
{ |
||||
appendStringInfo(&str, "%s" TXID_FMT, |
||||
((i > 0) ? "," : ""), |
||||
snap->xip[i]); |
||||
} |
||||
|
||||
PG_FREE_IF_COPY(snap, 0); |
||||
|
||||
PG_RETURN_CSTRING(str.data); |
||||
} |
||||
|
||||
|
||||
/*
|
||||
* txid_visible_in_snapshot(int8, txid_snapshot) returns bool |
||||
* |
||||
* is txid visible in snapshot ? |
||||
*/ |
||||
Datum |
||||
txid_visible_in_snapshot(PG_FUNCTION_ARGS) |
||||
{ |
||||
txid value = PG_GETARG_INT64(0); |
||||
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(1); |
||||
int res; |
||||
|
||||
res = is_visible_txid(value, snap) ? true : false; |
||||
|
||||
PG_FREE_IF_COPY(snap, 1); |
||||
PG_RETURN_BOOL(res); |
||||
} |
||||
|
||||
/*
|
||||
* txid_snapshot_xmin(txid_snapshot) returns int8 |
||||
* |
||||
* return snapshot's xmin |
||||
*/ |
||||
Datum |
||||
txid_snapshot_xmin(PG_FUNCTION_ARGS) |
||||
{ |
||||
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); |
||||
txid res = snap->xmin; |
||||
PG_FREE_IF_COPY(snap, 0); |
||||
PG_RETURN_INT64(res); |
||||
} |
||||
|
||||
/*
|
||||
* txid_snapshot_xmax(txid_snapshot) returns int8 |
||||
* |
||||
* return snapshot's xmax |
||||
*/ |
||||
Datum |
||||
txid_snapshot_xmax(PG_FUNCTION_ARGS) |
||||
{ |
||||
TxidSnapshot *snap = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); |
||||
txid res = snap->xmax; |
||||
PG_FREE_IF_COPY(snap, 0); |
||||
PG_RETURN_INT64(res); |
||||
} |
||||
|
||||
/*
|
||||
* txid_snapshot_xip(txid_snapshot) returns setof int8 |
||||
* |
||||
* return in-progress TXIDs in snapshot. |
||||
*/ |
||||
Datum |
||||
txid_snapshot_xip(PG_FUNCTION_ARGS) |
||||
{ |
||||
FuncCallContext *fctx; |
||||
TxidSnapshot *snap; |
||||
txid value; |
||||
|
||||
/* on first call initialize snap_state and get copy of snapshot */ |
||||
if (SRF_IS_FIRSTCALL()) { |
||||
TxidSnapshot *arg; |
||||
|
||||
fctx = SRF_FIRSTCALL_INIT(); |
||||
|
||||
/* make a copy of user snapshot */ |
||||
arg = (TxidSnapshot *) PG_GETARG_VARLENA_P(0); |
||||
snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg)); |
||||
memcpy(snap, arg, VARSIZE(arg)); |
||||
PG_FREE_IF_COPY(arg, 0); |
||||
|
||||
fctx->user_fctx = snap; |
||||
} |
||||
|
||||
/* return values one-by-one */ |
||||
fctx = SRF_PERCALL_SETUP(); |
||||
snap = fctx->user_fctx; |
||||
if (fctx->call_cntr < snap->nxip) { |
||||
value = snap->xip[fctx->call_cntr]; |
||||
SRF_RETURN_NEXT(fctx, Int64GetDatum(value)); |
||||
} else { |
||||
SRF_RETURN_DONE(fctx); |
||||
} |
||||
} |
||||
|
@ -0,0 +1,68 @@ |
||||
-- ---------- |
||||
-- txid.sql |
||||
-- |
||||
-- SQL script for loading the transaction ID compatible datatype |
||||
-- |
||||
-- Copyright (c) 2003-2007, PostgreSQL Global Development Group |
||||
-- Author: Jan Wieck, Afilias USA INC. |
||||
-- |
||||
-- 64-bit txids: Marko Kreen, Skype Technologies |
||||
-- ---------- |
||||
|
||||
-- |
||||
-- A special transaction snapshot data type for faster visibility checks |
||||
-- |
||||
CREATE OR REPLACE FUNCTION txid_snapshot_in(cstring) |
||||
RETURNS txid_snapshot |
||||
AS 'MODULE_PATHNAME' LANGUAGE C |
||||
IMMUTABLE STRICT; |
||||
CREATE OR REPLACE FUNCTION txid_snapshot_out(txid_snapshot) |
||||
RETURNS cstring |
||||
AS 'MODULE_PATHNAME' LANGUAGE C |
||||
IMMUTABLE STRICT; |
||||
|
||||
-- |
||||
-- The data type itself |
||||
-- |
||||
CREATE TYPE txid_snapshot ( |
||||
INPUT = txid_snapshot_in, |
||||
OUTPUT = txid_snapshot_out, |
||||
INTERNALLENGTH = variable, |
||||
STORAGE = extended, |
||||
ALIGNMENT = double |
||||
); |
||||
|
||||
CREATE OR REPLACE FUNCTION txid_current() |
||||
RETURNS bigint |
||||
AS 'MODULE_PATHNAME', 'txid_current' LANGUAGE C |
||||
STABLE; |
||||
|
||||
CREATE OR REPLACE FUNCTION txid_current_snapshot() |
||||
RETURNS txid_snapshot |
||||
AS 'MODULE_PATHNAME', 'txid_current_snapshot' LANGUAGE C |
||||
STABLE; |
||||
|
||||
CREATE OR REPLACE FUNCTION txid_snapshot_xmin(txid_snapshot) |
||||
RETURNS bigint |
||||
AS 'MODULE_PATHNAME', 'txid_snapshot_xmin' LANGUAGE C |
||||
IMMUTABLE STRICT; |
||||
|
||||
CREATE OR REPLACE FUNCTION txid_snapshot_xmax(txid_snapshot) |
||||
RETURNS bigint |
||||
AS 'MODULE_PATHNAME', 'txid_snapshot_xmax' LANGUAGE C |
||||
IMMUTABLE STRICT; |
||||
|
||||
CREATE OR REPLACE FUNCTION txid_snapshot_xip(txid_snapshot) |
||||
RETURNS setof bigint |
||||
AS 'MODULE_PATHNAME', 'txid_snapshot_xip' LANGUAGE C |
||||
IMMUTABLE STRICT; |
||||
|
||||
|
||||
-- |
||||
-- Special comparision functions for visibility checks |
||||
-- |
||||
CREATE OR REPLACE FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot) |
||||
RETURNS boolean |
||||
AS 'MODULE_PATHNAME', 'txid_visible_in_snapshot' LANGUAGE C |
||||
IMMUTABLE STRICT; |
||||
|
@ -0,0 +1,15 @@ |
||||
|
||||
|
||||
DROP FUNCTION txid_current(); |
||||
DROP FUNCTION txid_current_snapshot(); |
||||
DROP FUNCTION txid_snapshot_xmin(txid_snapshot); |
||||
DROP FUNCTION txid_snapshot_xmax(txid_snapshot); |
||||
DROP FUNCTION txid_snapshot_xip(txid_snapshot); |
||||
DROP FUNCTION txid_visible_in_snapshot(bigint, txid_snapshot); |
||||
DROP FUNCTION txid_not_visible_in_snapshot(bigint, txid_snapshot); |
||||
|
||||
DROP TYPE txid_snapshot cascade; |
||||
-- need cascade to drop those: |
||||
-- DROP FUNCTION txid_snapshot_in(cstring); |
||||
-- DROP FUNCTION txid_snapshot_out(txid_snapshot); |
||||
|
Loading…
Reference in new issue