mirror of https://github.com/postgres/postgres
API and mechanism to allow generic messages to be inserted into WAL that are intended to be read by logical decoding plugins. This commit adds an optional new callback to the logical decoding API. Messages are either text or bytea. Messages can be transactional, or not, and are identified by a prefix to allow multiple concurrent decoding plugins. (Not to be confused with Generic WAL records, which are intended to allow crash recovery of extensible objects.) Author: Petr Jelinek and Andres Freund Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs Discussion: 5685F999.6010202@2ndquadrant.compull/11/head
parent
989be0810d
commit
3fe3511d05
@ -0,0 +1,79 @@ |
||||
-- predictability |
||||
SET synchronous_commit = on; |
||||
SET client_encoding = 'utf8'; |
||||
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); |
||||
?column? |
||||
---------- |
||||
init |
||||
(1 row) |
||||
|
||||
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); |
||||
?column? |
||||
---------- |
||||
msg1 |
||||
(1 row) |
||||
|
||||
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); |
||||
?column? |
||||
---------- |
||||
msg2 |
||||
(1 row) |
||||
|
||||
BEGIN; |
||||
SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3'); |
||||
?column? |
||||
---------- |
||||
msg3 |
||||
(1 row) |
||||
|
||||
SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4'); |
||||
?column? |
||||
---------- |
||||
msg4 |
||||
(1 row) |
||||
|
||||
ROLLBACK; |
||||
BEGIN; |
||||
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5'); |
||||
?column? |
||||
---------- |
||||
msg5 |
||||
(1 row) |
||||
|
||||
SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6'); |
||||
?column? |
||||
---------- |
||||
msg6 |
||||
(1 row) |
||||
|
||||
SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7'); |
||||
?column? |
||||
---------- |
||||
msg7 |
||||
(1 row) |
||||
|
||||
COMMIT; |
||||
SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň'); |
||||
?column? |
||||
--------------- |
||||
žluťoučký kůň |
||||
(1 row) |
||||
|
||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); |
||||
data |
||||
---------------------------------------------------------------------- |
||||
message: transactional: 1 prefix: test, sz: 4 content:msg1 |
||||
message: transactional: 0 prefix: test, sz: 4 content:msg2 |
||||
message: transactional: 0 prefix: test, sz: 4 content:msg4 |
||||
message: transactional: 0 prefix: test, sz: 4 content:msg6 |
||||
message: transactional: 1 prefix: test, sz: 4 content:msg5 |
||||
message: transactional: 1 prefix: test, sz: 4 content:msg7 |
||||
message: transactional: 1 prefix: test, sz: 19 content:žluťoučký kůň |
||||
(7 rows) |
||||
|
||||
SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); |
||||
?column? |
||||
---------- |
||||
init |
||||
(1 row) |
||||
|
@ -0,0 +1,25 @@ |
||||
-- predictability |
||||
SET synchronous_commit = on; |
||||
SET client_encoding = 'utf8'; |
||||
|
||||
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); |
||||
|
||||
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1'); |
||||
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2'); |
||||
|
||||
BEGIN; |
||||
SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3'); |
||||
SELECT 'msg4' FROM pg_logical_emit_message(false, 'test', 'msg4'); |
||||
ROLLBACK; |
||||
|
||||
BEGIN; |
||||
SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', 'msg5'); |
||||
SELECT 'msg6' FROM pg_logical_emit_message(false, 'test', 'msg6'); |
||||
SELECT 'msg7' FROM pg_logical_emit_message(true, 'test', 'msg7'); |
||||
COMMIT; |
||||
|
||||
SELECT 'žluťoučký kůň' FROM pg_logical_emit_message(true, 'test', 'žluťoučký kůň'); |
||||
|
||||
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1'); |
||||
|
||||
SELECT 'init' FROM pg_drop_replication_slot('regression_slot'); |
@ -0,0 +1,41 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* logicalmsgdesc.c |
||||
* rmgr descriptor routines for replication/logical/message.c |
||||
* |
||||
* Portions Copyright (c) 2015-2016, PostgreSQL Global Development Group |
||||
* |
||||
* |
||||
* IDENTIFICATION |
||||
* src/backend/access/rmgrdesc/logicalmsgdesc.c |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
#include "postgres.h" |
||||
|
||||
#include "replication/message.h" |
||||
|
||||
void |
||||
logicalmsg_desc(StringInfo buf, XLogReaderState *record) |
||||
{ |
||||
char *rec = XLogRecGetData(record); |
||||
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; |
||||
|
||||
if (info == XLOG_LOGICAL_MESSAGE) |
||||
{ |
||||
xl_logical_message *xlrec = (xl_logical_message *) rec; |
||||
|
||||
appendStringInfo(buf, "%s message size %zu bytes", |
||||
xlrec->transactional ? "transactional" : "nontransactional", |
||||
xlrec->message_size); |
||||
} |
||||
} |
||||
|
||||
const char * |
||||
logicalmsg_identify(uint8 info) |
||||
{ |
||||
if ((info & ~XLR_INFO_MASK) == XLOG_LOGICAL_MESSAGE) |
||||
return "MESSAGE"; |
||||
|
||||
return NULL; |
||||
} |
@ -0,0 +1,87 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* message.c |
||||
* Generic logical messages. |
||||
* |
||||
* Copyright (c) 2013-2016, PostgreSQL Global Development Group |
||||
* |
||||
* IDENTIFICATION |
||||
* src/backend/replication/logical/message.c |
||||
* |
||||
* NOTES |
||||
* |
||||
* Generic logical messages allow XLOG logging of arbitrary binary blobs that |
||||
* get passed to the logical decoding plugin. In normal XLOG processing they |
||||
* are same as NOOP. |
||||
* |
||||
* These messages can be either transactional or non-transactional. |
||||
* Transactional messages are part of current transaction and will be sent to |
||||
* decoding plugin using in a same way as DML operations. |
||||
* Non-transactional messages are sent to the plugin at the time when the |
||||
* logical decoding reads them from XLOG. This also means that transactional |
||||
* messages won't be delivered if the transaction was rolled back but the |
||||
* non-transactional one will be delivered always. |
||||
* |
||||
* Every message carries prefix to avoid conflicts between different decoding |
||||
* plugins. The plugin authors must take extra care to use unique prefix, |
||||
* good options seems to be for example to use the name of the extension. |
||||
* |
||||
* --------------------------------------------------------------------------- |
||||
*/ |
||||
|
||||
#include "postgres.h" |
||||
|
||||
#include "access/xact.h" |
||||
|
||||
#include "catalog/indexing.h" |
||||
|
||||
#include "nodes/execnodes.h" |
||||
|
||||
#include "replication/message.h" |
||||
#include "replication/logical.h" |
||||
|
||||
#include "utils/memutils.h" |
||||
|
||||
/*
|
||||
* Write logical decoding message into XLog. |
||||
*/ |
||||
XLogRecPtr |
||||
LogLogicalMessage(const char *prefix, const char *message, size_t size, |
||||
bool transactional) |
||||
{ |
||||
xl_logical_message xlrec; |
||||
|
||||
/*
|
||||
* Force xid to be allocated if we're emitting a transactional message. |
||||
*/ |
||||
if (transactional) |
||||
{ |
||||
Assert(IsTransactionState()); |
||||
GetCurrentTransactionId(); |
||||
} |
||||
|
||||
xlrec.transactional = transactional; |
||||
xlrec.prefix_size = strlen(prefix) + 1; |
||||
xlrec.message_size = size; |
||||
|
||||
XLogBeginInsert(); |
||||
XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage); |
||||
XLogRegisterData((char *) prefix, xlrec.prefix_size); |
||||
XLogRegisterData((char *) message, size); |
||||
|
||||
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); |
||||
} |
||||
|
||||
/*
|
||||
* Redo is basically just noop for logical decoding messages. |
||||
*/ |
||||
void |
||||
logicalmsg_redo(XLogReaderState *record) |
||||
{ |
||||
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; |
||||
|
||||
if (info != XLOG_LOGICAL_MESSAGE) |
||||
elog(PANIC, "logicalmsg_redo: unknown op code %u", info); |
||||
|
||||
/* This is only interesting for logical decoding, see decode.c. */ |
||||
} |
@ -1,23 +1,4 @@ |
||||
/pg_xlogdump |
||||
# Source files copied from src/backend/access/rmgrdesc/ |
||||
/brindesc.c |
||||
/clogdesc.c |
||||
/committsdesc.c |
||||
/dbasedesc.c |
||||
/genericdesc.c |
||||
/gindesc.c |
||||
/gistdesc.c |
||||
/hashdesc.c |
||||
/heapdesc.c |
||||
/mxactdesc.c |
||||
/nbtdesc.c |
||||
/relmapdesc.c |
||||
/replorigindesc.c |
||||
/seqdesc.c |
||||
/smgrdesc.c |
||||
/spgdesc.c |
||||
/standbydesc.c |
||||
/tblspcdesc.c |
||||
/xactdesc.c |
||||
/xlogdesc.c |
||||
/*desc.c |
||||
/xlogreader.c |
||||
|
@ -0,0 +1,41 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* message.h |
||||
* Exports from replication/logical/message.c |
||||
* |
||||
* Copyright (c) 2013-2016, PostgreSQL Global Development Group |
||||
* |
||||
* src/include/replication/message.h |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
#ifndef PG_LOGICAL_MESSAGE_H |
||||
#define PG_LOGICAL_MESSAGE_H |
||||
|
||||
#include "access/xlog.h" |
||||
#include "access/xlogdefs.h" |
||||
#include "access/xlogreader.h" |
||||
|
||||
/*
|
||||
* Generic logical decoding message wal record. |
||||
*/ |
||||
typedef struct xl_logical_message |
||||
{ |
||||
bool transactional; /* is message transactional? */ |
||||
Size prefix_size; /* length of prefix */ |
||||
Size message_size; /* size of the message */ |
||||
char message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null
|
||||
* terminated prefix of length |
||||
* prefix_size */ |
||||
} xl_logical_message; |
||||
|
||||
#define SizeOfLogicalMessage (offsetof(xl_logical_message, message)) |
||||
|
||||
extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message, |
||||
size_t size, bool transactional); |
||||
|
||||
/* RMGR API*/ |
||||
#define XLOG_LOGICAL_MESSAGE 0x00 |
||||
void logicalmsg_redo(XLogReaderState *record); |
||||
void logicalmsg_desc(StringInfo buf, XLogReaderState *record); |
||||
const char *logicalmsg_identify(uint8 info); |
||||
|
||||
#endif /* PG_LOGICAL_MESSAGE_H */ |
Loading…
Reference in new issue