diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index b6353c7a125..5ebf23e963d 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -458,6 +458,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; @@ -472,6 +473,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamSequenceCB stream_sequence_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; @@ -481,9 +483,11 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); and commit_cb callbacks are required, while startup_cb, filter_by_origin_cb, truncate_cb, - and shutdown_cb are optional. - If truncate_cb is not set but a + sequence_cb, and shutdown_cb are + optional. If truncate_cb is not set but a TRUNCATE is to be decoded, the action will be ignored. + Similarly, if sequence_cb is not set and a sequence + change is to be decoded, the action will be ignored. @@ -492,7 +496,8 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); stream_stop_cb, stream_abort_cb, stream_commit_cb, stream_change_cb, and stream_prepare_cb - are required, while stream_message_cb and + are required, while stream_message_cb, + stream_sequence_cb, and stream_truncate_cb are optional. @@ -808,6 +813,35 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, + + Sequence Callback + + + The optional sequence_cb callback is called for + actions that update a sequence value. + +typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + + The txn parameter contains meta information about + the transaction the sequence change is part of. Note however that for + non-transactional increments, the transaction may be either NULL or not + NULL, depending on if the transaction already has XID assigned. + The sequence_lsn has WAL location of the sequence + update. The transactional says if the sequence has + to be replayed as part of the transaction or directly. + + The last_value, log_cnt and + is_called parameters describe the sequence change. + + + Prepare Filter Callback @@ -1017,6 +1051,26 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx + + Stream Sequence Callback + + The optional stream_sequence_cb callback is called + for actions that change a sequence in a block of streamed changes + (demarcated by stream_start_cb and + stream_stop_cb calls). + +typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + + + + Stream Truncate Callback @@ -1197,8 +1251,9 @@ OutputPluginWrite(ctx, true); in-progress transactions. There are multiple required streaming callbacks (stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb - and stream_change_cb) and two optional callbacks - (stream_message_cb and stream_truncate_cb). + and stream_change_cb) and multiple optional callbacks + (stream_message_cb, stream_sequence_cb, + and stream_truncate_cb). Also, if streaming of two-phase commands is to be supported, then additional callbacks must be provided. (See for details). diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 27cb6307581..ab592ce2f15 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -378,8 +378,13 @@ fill_seq_with_data(Relation rel, HeapTuple tuple) /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(rel)) + { GetTopTransactionId(); + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + START_CRIT_SECTION(); MarkBufferDirty(buf); @@ -399,6 +404,7 @@ fill_seq_with_data(Relation rel, HeapTuple tuple) XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); xlrec.node = rel->rd_node; + xlrec.created = true; XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) tuple->t_data, tuple->t_len); @@ -764,10 +770,28 @@ nextval_internal(Oid relid, bool check_permissions) * It's sufficient to ensure the toplevel transaction has an xid, no need * to assign xids subxacts, that'll already trigger an appropriate wait. * (Have to do that here, so we're outside the critical section) + * + * We have to ensure we have a proper XID, which will be included in + * the XLOG record by XLogRecordAssemble. Otherwise the first nextval() + * in a subxact (without any preceding changes) would get XID 0, and it + * would then be impossible to decide which top xact it belongs to. + * It'd also trigger assert in DecodeSequence. We only do that with + * wal_level=logical, though. + * + * XXX This might seem unnecessary, because if there's no XID the xact + * couldn't have done anything important yet, e.g. it could not have + * created a sequence. But that's incorrect, because of subxacts. The + * current subtransaction might not have done anything yet (thus no XID), + * but an earlier one might have created the sequence. */ if (logit && RelationNeedsWAL(seqrel)) + { GetTopTransactionId(); + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); @@ -803,6 +827,7 @@ nextval_internal(Oid relid, bool check_permissions) seq->log_cnt = 0; xlrec.node = seqrel->rd_node; + xlrec.created = false; XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); @@ -977,8 +1002,13 @@ do_setval(Oid relid, int64 next, bool iscalled) /* check the comment above nextval_internal()'s equivalent call. */ if (RelationNeedsWAL(seqrel)) + { GetTopTransactionId(); + if (XLogLogicalInfoActive()) + GetCurrentTransactionId(); + } + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); @@ -999,6 +1029,8 @@ do_setval(Oid relid, int64 next, bool iscalled) XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); xlrec.node = seqrel->rd_node; + xlrec.created = false; + XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 3fb5a92a1a1..18cf9318221 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -42,6 +42,7 @@ #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" #include "storage/standby.h" +#include "commands/sequence.h" /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -63,6 +64,7 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* common function to decode tuples */ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); +static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple); /* helper functions for decoding transactions */ static inline bool FilterPrepare(LogicalDecodingContext *ctx, @@ -1250,3 +1252,130 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || ctx->fast_forward || FilterByOrigin(ctx, origin_id)); } + +/* + * DecodeSeqTuple + * decode tuple describing the sequence increment + * + * Sequences are represented as a table with a single row, which gets updated + * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we + * simply copy it into the tuplebuf (similar to seq_redo). + */ +static void +DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) +{ + int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader; + + Assert(datalen >= 0); + + tuple->tuple.t_len = datalen + SizeofHeapTupleHeader; + + ItemPointerSetInvalid(&tuple->tuple.t_self); + + tuple->tuple.t_tableOid = InvalidOid; + + memcpy(((char *) tuple->tuple.t_data), + data + sizeof(xl_seq_rec), + SizeofHeapTupleHeader); + + memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader, + data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader, + datalen); +} + +/* + * Handle sequence decode + * + * Decoding sequences is a bit tricky, because while most sequence actions + * are non-transactional (not subject to rollback), some need to be handled + * as transactional. + * + * By default, a sequence increment is non-transactional - we must not queue + * it in a transaction as other changes, because the transaction might get + * rolled back and we'd discard the increment. The downstream would not be + * notified about the increment, which is wrong. + * + * On the other hand, the sequence may be created in a transaction. In this + * case we *should* queue the change as other changes in the transaction, + * because we don't want to send the increments for unknown sequence to the + * plugin - it might get confused about which sequence it's related to etc. + */ +void +sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferTupleBuf *tuplebuf; + RelFileNode target_node; + XLogReaderState *r = buf->record; + char *tupledata = NULL; + Size tuplelen; + Size datalen = 0; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; + xl_seq_rec *xlrec; + Snapshot snapshot; + RepOriginId origin_id = XLogRecGetOrigin(r); + bool transactional; + + /* only decode changes flagged with XLOG_SEQ_LOG */ + if (info != XLOG_SEQ_LOG) + elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info); + + /* + * If we don't have snapshot or we are just fast-forwarding, there is no + * point in decoding messages. + */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + ctx->fast_forward) + return; + + /* only interested in our database */ + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + tupledata = XLogRecGetData(r); + datalen = XLogRecGetDataLen(r); + tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec); + + /* extract the WAL record, with "created" flag */ + xlrec = (xl_seq_rec *) XLogRecGetData(r); + + /* XXX how could we have sequence change without data? */ + if(!datalen || !tupledata) + return; + + tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen); + DecodeSeqTuple(tupledata, datalen, tuplebuf); + + /* + * Should we handle the sequence increment as transactional or not? + * + * If the sequence was created in a still-running transaction, treat + * it as transactional and queue the increments. Otherwise it needs + * to be treated as non-transactional, in which case we send it to + * the plugin right away. + */ + transactional = ReorderBufferSequenceIsTransactional(ctx->reorder, + target_node, + xlrec->created); + + /* Skip the change if already processed (per the snapshot). */ + if (transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) + return; + else if (!transactional && + (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || + SnapBuildXactNeedsSkip(builder, buf->origptr))) + return; + + /* Queue the increment (or send immediately if not transactional). */ + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr, + origin_id, target_node, transactional, + xlrec->created, tuplebuf); +} diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 9bc3a2d8deb..934aa13f2d3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -73,6 +73,10 @@ static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, bool is_called); /* streaming callbacks */ static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -90,6 +94,10 @@ static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); +static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, bool is_called); static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change); @@ -218,6 +226,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->sequence = sequence_cb_wrapper; /* * To support streaming, we require start/stop/abort/commit/change @@ -234,6 +243,7 @@ StartupDecodingContext(List *output_plugin_options, (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || + (ctx->callbacks.stream_sequence_cb != NULL) || (ctx->callbacks.stream_truncate_cb != NULL); /* @@ -251,6 +261,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; + ctx->reorder->stream_sequence = stream_sequence_cb_wrapper; ctx->reorder->stream_truncate = stream_truncate_cb_wrapper; @@ -1203,6 +1214,42 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (ctx->callbacks.sequence_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "sequence"; + state.report_location = sequence_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = sequence_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional, + last_value, log_cnt, is_called); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr first_lsn) @@ -1508,6 +1555,47 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* this callback is optional */ + if (ctx->callbacks.stream_sequence_cb == NULL) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_sequence"; + state.report_location = sequence_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; + ctx->write_location = sequence_lsn; + + /* do the actual work: call callback */ + ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional, + last_value, log_cnt, is_called); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, int nrelations, Relation relations[], diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 19b2ba2000c..c2d9be81fae 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -77,6 +77,40 @@ * a bit more memory to the oldest subtransactions, because it's likely * they are the source for the next sequence of changes. * + * When decoding sequences, we differentiate between a sequences created + * in a (running) transaction, and sequences created in other (already + * committed) transactions. Changes for sequences created in the same + * top-level transaction are treated as "transactional" i.e. just like + * any other change from that transaction (and discarded in case of a + * rollback). Changes for sequences created earlier are treated as not + * transactional - are processed immediately, as if performed outside + * any transaction (and thus not rolled back). + * + * This mixed behavior is necessary - sequences are non-transactional + * (e.g. ROLLBACK does not undo the sequence increments). But for new + * sequences, we need to handle them in a transactional way, because if + * we ever get some DDL support, the sequence won't exist until the + * transaction gets applied. So we need to ensure the increments don't + * happen until the sequence gets created. + * + * To differentiate which sequences are "old" and which were created + * in a still-running transaction, we track sequences created in running + * transactions in a hash table. Sequences are identified by relfilenode, + * and we track XID of the (sub)transaction that created it. This means + * that if a transaction does something that changes the relfilenode + * (like an alter / reset of a sequence), the new relfilenode will be + * treated as if created in the transaction. The list of sequences gets + * discarded when the transaction completes (commit/rollback). + * + * We don't use the XID to check if it's the same top-level transaction. + * It's enough to know it was created in an in-progress transaction, + * and we know it must be the current one because otherwise it wouldn't + * see the sequence object. + * + * The XID may be valid even for non-transactional sequences - we simply + * keep the XID logged to WAL, it's up to the reorderbuffer to decide if + * the increment is transactional. + * * ------------------------------------------------------------------------- */ #include "postgres.h" @@ -91,6 +125,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/catalog.h" +#include "commands/sequence.h" #include "lib/binaryheap.h" #include "miscadmin.h" #include "pgstat.h" @@ -116,6 +151,13 @@ typedef struct ReorderBufferTXNByIdEnt ReorderBufferTXN *txn; } ReorderBufferTXNByIdEnt; +/* entry for hash table we use to track sequences created in running xacts */ +typedef struct ReorderBufferSequenceEnt +{ + RelFileNode rnode; + TransactionId xid; +} ReorderBufferSequenceEnt; + /* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */ typedef struct ReorderBufferTupleCidKey { @@ -339,6 +381,14 @@ ReorderBufferAllocate(void) buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* hash table of sequences, mapping relfilenode to XID of transaction */ + hash_ctl.keysize = sizeof(RelFileNode); + hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt); + hash_ctl.hcxt = buffer->context; + + buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + buffer->by_txn_last_xid = InvalidTransactionId; buffer->by_txn_last_txn = NULL; @@ -525,6 +575,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change, change->data.truncate.relids = NULL; } break; + case REORDER_BUFFER_CHANGE_SEQUENCE: + if (change->data.sequence.tuple) + { + ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple); + change->data.sequence.tuple = NULL; + } + break; case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: @@ -859,6 +916,230 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } } +/* + * Treat the sequence increment as transactional? + * + * The hash table tracks all sequences created in in-progress transactions, + * so we simply do a lookup (the sequence is identified by relfilende). If + * we find a match, the increment should be handled as transactional. + */ +bool +ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, + RelFileNode rnode, bool created) +{ + bool found = false; + + if (created) + return true; + + hash_search(rb->sequences, + (void *) &rnode, + HASH_FIND, + &found); + + return found; +} + +/* + * Cleanup sequences created in in-progress transactions. + * + * There's no way to search by XID, so we simply do a seqscan of all + * the entries in the hash table. Hopefully there are only a couple + * entries in most cases - people generally don't create many new + * sequences over and over. + */ +static void +ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid) +{ + HASH_SEQ_STATUS scan_status; + ReorderBufferSequenceEnt *ent; + + hash_seq_init(&scan_status, rb->sequences); + while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL) + { + /* skip sequences not from this transaction */ + if (ent->xid != xid) + continue; + + (void) hash_search(rb->sequences, + (void *) &(ent->rnode), + HASH_REMOVE, NULL); + } +} + +/* + * A transactional sequence increment is queued to be processed upon commit + * and a non-transactional increment gets processed immediately. + * + * A sequence update may be both transactional and non-transactional. When + * created in a running transaction, treat it as transactional and queue + * the change in it. Otherwise treat it as non-transactional, so that we + * don't forget the increment in case of a rollback. + */ +void +ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, + RelFileNode rnode, bool transactional, bool created, + ReorderBufferTupleBuf *tuplebuf) +{ + /* + * Change needs to be handled as transactional, because the sequence was + * created in a transaction that is still running. In that case all the + * changes need to be queued in that transaction, we must not send them + * to the downstream until the transaction commits. + * + * There's a bit of a trouble with subtransactions - we can't queue it + * into the subxact, because it might be rolled back and we'd lose the + * increment. We need to queue it into the same (sub)xact that created + * the sequence, which is why we track the XID in the hash table. + */ + if (transactional) + { + MemoryContext oldcontext; + ReorderBufferChange *change; + + /* lookup sequence by relfilenode */ + ReorderBufferSequenceEnt *ent; + bool found; + + /* transactional changes require a transaction */ + Assert(xid != InvalidTransactionId); + + /* search the lookup table (we ignore the return value, found is enough) */ + ent = hash_search(rb->sequences, + (void *) &rnode, + created ? HASH_ENTER : HASH_FIND, + &found); + + /* + * If this is the "create" increment, we must not have found any + * pre-existing entry in the hash table (i.e. there must not be + * any conflicting sequence). + */ + Assert(!(created && found)); + + /* But we must have either created or found an existing entry. */ + Assert(created || found); + + /* + * When creating the sequence, remember the XID of the transaction + * that created id. + */ + if (created) + ent->xid = xid; + + /* XXX Maybe check that we're still in the same top-level xact? */ + + /* OK, allocate and queue the change */ + oldcontext = MemoryContextSwitchTo(rb->context); + + change = ReorderBufferGetChange(rb); + + change->action = REORDER_BUFFER_CHANGE_SEQUENCE; + change->origin_id = origin_id; + + memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode)); + + change->data.sequence.tuple = tuplebuf; + + /* add it to the same subxact that created the sequence */ + ReorderBufferQueueChange(rb, ent->xid, lsn, change, false); + + MemoryContextSwitchTo(oldcontext); + } + else + { + /* + * This increment is for a sequence that was not created in any + * running transaction, so we treat it as non-transactional and + * just send it to the output plugin directly. + */ + ReorderBufferTXN *txn = NULL; + volatile Snapshot snapshot_now = snapshot; + bool using_subtxn; + +#ifdef USE_ASSERT_CHECKING + /* All "creates" have to be handled as transactional. */ + Assert(!created); + + /* Make sure the sequence is not in the hash table. */ + { + bool found; + hash_search(rb->sequences, + (void *) &rnode, + HASH_FIND, &found); + Assert(!found); + } +#endif + + if (xid != InvalidTransactionId) + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + /* setup snapshot to allow catalog access */ + SetupHistoricSnapshot(snapshot_now, NULL); + + /* + * Decoding needs access to syscaches et al., which in turn use + * heavyweight locks and such. Thus we need to have enough state around to + * keep track of those. The easiest way is to simply use a transaction + * internally. That also allows us to easily enforce that nothing writes + * to the database by checking for xid assignments. + * + * When we're called via the SQL SRF there's already a transaction + * started, so start an explicit subtransaction there. + */ + using_subtxn = IsTransactionOrTransactionBlock(); + + PG_TRY(); + { + Relation relation; + HeapTuple tuple; + Form_pg_sequence_data seq; + Oid reloid; + + if (using_subtxn) + BeginInternalSubTransaction("sequence"); + else + StartTransactionCommand(); + + reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode); + + if (reloid == InvalidOid) + elog(ERROR, "could not map filenode \"%s\" to relation OID", + relpathperm(rnode, + MAIN_FORKNUM)); + + relation = RelationIdGetRelation(reloid); + tuple = &tuplebuf->tuple; + seq = (Form_pg_sequence_data) GETSTRUCT(tuple); + + rb->sequence(rb, txn, lsn, relation, transactional, + seq->last_value, seq->log_cnt, seq->is_called); + + RelationClose(relation); + + TeardownHistoricSnapshot(false); + + AbortCurrentTransaction(); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + } + PG_CATCH(); + { + TeardownHistoricSnapshot(true); + + AbortCurrentTransaction(); + + if (using_subtxn) + RollbackAndReleaseCurrentSubTransaction(); + + PG_RE_THROW(); + } + PG_END_TRY(); + } +} + /* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer @@ -1535,6 +1816,9 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) &found); Assert(found); + /* Remove sequences created in this transaction (if any). */ + ReorderBufferSequenceCleanup(rb, txn->xid); + /* remove entries spilled to disk */ if (rbtxn_is_serialized(txn)) ReorderBufferRestoreCleanup(rb, txn); @@ -1950,6 +2234,29 @@ ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, change->data.msg.message); } +/* + * Helper function for ReorderBufferProcessTXN for applying sequences. + */ +static inline void +ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change, + bool streaming) +{ + HeapTuple tuple; + Form_pg_sequence_data seq; + + tuple = &change->data.sequence.tuple->tuple; + seq = (Form_pg_sequence_data) GETSTRUCT(tuple); + + /* Only ever called from ReorderBufferApplySequence, so transational. */ + if (streaming) + rb->stream_sequence(rb, txn, change->lsn, relation, true, + seq->last_value, seq->log_cnt, seq->is_called); + else + rb->sequence(rb, txn, change->lsn, relation, true, + seq->last_value, seq->log_cnt, seq->is_called); +} + /* * Function to store the command id and snapshot at the end of the current * stream so that we can reuse the same while sending the next stream. @@ -2392,6 +2699,31 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; + + case REORDER_BUFFER_CHANGE_SEQUENCE: + Assert(snapshot_now); + + reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode, + change->data.sequence.relnode.relNode); + + if (reloid == InvalidOid) + elog(ERROR, "could not map filenode \"%s\" to relation OID", + relpathperm(change->data.sequence.relnode, + MAIN_FORKNUM)); + + relation = RelationIdGetRelation(reloid); + + if (!RelationIsValid(relation)) + elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")", + reloid, + relpathperm(change->data.sequence.relnode, + MAIN_FORKNUM)); + + if (RelationIsLogicallyLogged(relation)) + ReorderBufferApplySequence(rb, txn, relation, change, streaming); + + RelationClose(relation); + break; } } @@ -3776,6 +4108,39 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, memcpy(data, change->data.truncate.relids, size); data += size; + break; + } + case REORDER_BUFFER_CHANGE_SEQUENCE: + { + char *data; + ReorderBufferTupleBuf *tup; + Size len = 0; + + tup = change->data.sequence.tuple; + + if (tup) + { + sz += sizeof(HeapTupleData); + len = tup->tuple.t_len; + sz += len; + } + + /* make sure we have enough space */ + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + if (len) + { + memcpy(data, &tup->tuple, sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + memcpy(data, tup->tuple.t_data, len); + data += len; + } + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -4040,6 +4405,22 @@ ReorderBufferChangeSize(ReorderBufferChange *change) { sz += sizeof(Oid) * change->data.truncate.nrelids; + break; + } + case REORDER_BUFFER_CHANGE_SEQUENCE: + { + ReorderBufferTupleBuf *tup; + Size len = 0; + + tup = change->data.sequence.tuple; + + if (tup) + { + sz += sizeof(HeapTupleData); + len = tup->tuple.t_len; + sz += len; + } + break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -4341,6 +4722,30 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } + + case REORDER_BUFFER_CHANGE_SEQUENCE: + if (change->data.sequence.tuple) + { + uint32 tuplelen = ((HeapTuple) data)->t_len; + + change->data.sequence.tuple = + ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); + + /* restore ->tuple */ + memcpy(&change->data.sequence.tuple->tuple, data, + sizeof(HeapTupleData)); + data += sizeof(HeapTupleData); + + /* reset t_data pointer into the new tuplebuf */ + change->data.sequence.tuple->tuple.t_data = + ReorderBufferTupleBufData(change->data.sequence.tuple); + + /* restore tuple data itself */ + memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen); + data += tuplelen; + } + break; + case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 9a74721c97c..cf8b6d48193 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -40,7 +40,7 @@ PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, btree_xlog PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL) PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL) PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, sequence_decode) PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL) PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL) PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL) diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index f7542f2bccb..9fecc41954e 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -48,6 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data; typedef struct xl_seq_rec { RelFileNode node; + bool created; /* creates a new relfilenode (CREATE/ALTER) */ /* SEQUENCE TUPLE DATA FOLLOWS AT THE END */ } xl_seq_rec; diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index a33c2a718a7..8e07bb7409a 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -27,6 +27,7 @@ extern void heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 41157fda7cc..a16bebf76ca 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -88,6 +88,18 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, Size message_size, const char *message); +/* + * Called for the generic logical decoding sequences. + */ +typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + /* * Filter changes by origin. */ @@ -199,6 +211,19 @@ typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx Size message_size, const char *message); +/* + * Called for the streaming generic logical decoding sequences from in-progress + * transactions. + */ +typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, + int64 log_cnt, + bool is_called); + /* * Callback for streaming truncates from in-progress transactions. */ @@ -219,6 +244,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; + LogicalDecodeSequenceCB sequence_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; @@ -237,6 +263,7 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; + LogicalDecodeStreamSequenceCB stream_sequence_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index aa0a73382f6..859424bbd9b 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -64,7 +64,8 @@ enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, - REORDER_BUFFER_CHANGE_TRUNCATE + REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_SEQUENCE }; /* forward declaration */ @@ -158,6 +159,13 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; + + /* Context data for Sequence changes */ + struct + { + RelFileNode relnode; + ReorderBufferTupleBuf *tuple; + } sequence; } data; /* @@ -430,6 +438,15 @@ typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb, const char *prefix, Size sz, const char *message); +/* sequence callback signature */ +typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); + /* begin prepare callback signature */ typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -496,6 +513,15 @@ typedef void (*ReorderBufferStreamMessageCB) ( const char *prefix, Size sz, const char *message); +/* stream sequence callback signature */ +typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, + Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, + bool is_called); + /* stream truncate callback signature */ typedef void (*ReorderBufferStreamTruncateCB) ( ReorderBuffer *rb, @@ -511,6 +537,12 @@ struct ReorderBuffer */ HTAB *by_txn; + /* + * relfilenode => XID lookup table for sequences created in a transaction + * (also includes altered sequences, which assigns new relfilenode) + */ + HTAB *sequences; + /* * Transactions that could be a toplevel xact, ordered by LSN of the first * record bearing that xid. @@ -541,6 +573,7 @@ struct ReorderBuffer ReorderBufferApplyTruncateCB apply_truncate; ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + ReorderBufferSequenceCB sequence; /* * Callbacks to be called when streaming a transaction at prepare time. @@ -560,6 +593,7 @@ struct ReorderBuffer ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; + ReorderBufferStreamSequenceCB stream_sequence; ReorderBufferStreamTruncateCB stream_truncate; /* @@ -635,6 +669,10 @@ void ReorderBufferQueueChange(ReorderBuffer *, TransactionId, void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, + Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id, + RelFileNode rnode, bool transactional, bool created, + ReorderBufferTupleBuf *tuplebuf); void ReorderBufferCommit(ReorderBuffer *, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); @@ -682,4 +720,7 @@ void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr); void StartupReorderBuffer(void); +bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb, + RelFileNode rnode, bool created); + #endif