diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 5684901fb7a..0de795d7e73 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -2244,6 +2244,117 @@ asyncQueueAdvanceTail(void) LWLockRelease(NotifyQueueTailLock); } +/* + * AsyncNotifyFreezeXids + * + * Prepare the async notification queue for CLOG truncation by freezing + * transaction IDs that are about to become inaccessible. + * + * This function is called by VACUUM before advancing datfrozenxid. It scans + * the notification queue and replaces XIDs that would become inaccessible + * after CLOG truncation with special markers: + * - Committed transactions are set to FrozenTransactionId + * - Aborted/crashed transactions are set to InvalidTransactionId + * + * Only XIDs < newFrozenXid are processed, as those are the ones whose CLOG + * pages will be truncated. If XID < newFrozenXid, it cannot still be running + * (or it would have held back newFrozenXid through ProcArray). + * Therefore, if TransactionIdDidCommit returns false, we know the transaction + * either aborted explicitly or crashed, and we can safely mark it invalid. + */ +void +AsyncNotifyFreezeXids(TransactionId newFrozenXid) +{ + QueuePosition pos; + QueuePosition head; + int64 curpage = -1; + int slotno = -1; + char *page_buffer = NULL; + bool page_dirty = false; + + /* + * Acquire locks in the correct order to avoid deadlocks. As per the + * locking protocol: NotifyQueueTailLock, then NotifyQueueLock, then + * NotifySLRULock. + * + * We only need SHARED mode since we're just reading the head/tail + * positions, not modifying them. + */ + LWLockAcquire(NotifyQueueTailLock, LW_SHARED); + LWLockAcquire(NotifyQueueLock, LW_SHARED); + + pos = QUEUE_TAIL; + head = QUEUE_HEAD; + + /* Release NotifyQueueLock early, we only needed to read the positions */ + LWLockRelease(NotifyQueueLock); + + /* + * Scan the queue from tail to head, freezing XIDs as needed. We hold + * NotifyQueueTailLock throughout to ensure the tail doesn't move while + * we're working. + */ + while (!QUEUE_POS_EQUAL(pos, head)) + { + AsyncQueueEntry *qe; + TransactionId xid; + int64 pageno = QUEUE_POS_PAGE(pos); + int offset = QUEUE_POS_OFFSET(pos); + + /* If we need a different page, release old lock and get new one */ + if (pageno != curpage) + { + /* Release previous page if any */ + if (slotno >= 0) + { + if (page_dirty) + { + NotifyCtl->shared->page_dirty[slotno] = true; + page_dirty = false; + } + LWLockRelease(NotifySLRULock); + } + + LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE); + slotno = SimpleLruReadPage(NotifyCtl, pageno, true, + InvalidTransactionId); + page_buffer = NotifyCtl->shared->page_buffer[slotno]; + curpage = pageno; + } + + qe = (AsyncQueueEntry *) (page_buffer + offset); + xid = qe->xid; + + if (TransactionIdIsNormal(xid) && + TransactionIdPrecedes(xid, newFrozenXid)) + { + if (TransactionIdDidCommit(xid)) + { + qe->xid = FrozenTransactionId; + page_dirty = true; + } + else + { + qe->xid = InvalidTransactionId; + page_dirty = true; + } + } + + /* Advance to next entry */ + asyncQueueAdvance(&pos, qe->length); + } + + /* Release final page lock if we acquired one */ + if (slotno >= 0) + { + if (page_dirty) + NotifyCtl->shared->page_dirty[slotno] = true; + LWLockRelease(NotifySLRULock); + } + + LWLockRelease(NotifyQueueTailLock); +} + /* * ProcessIncomingNotify * diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 1c9b3d1a8f7..6322d2c53b8 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -35,6 +35,7 @@ #include "catalog/pg_database.h" #include "catalog/pg_inherits.h" #include "catalog/pg_namespace.h" +#include "commands/async.h" #include "commands/cluster.h" #include "commands/defrem.h" #include "commands/vacuum.h" @@ -1788,6 +1789,12 @@ vac_truncate_clog(TransactionId frozenXID, return; } + /* + * Freeze any old transaction IDs in the async notification queue before + * CLOG truncation. + */ + AsyncNotifyFreezeXids(frozenXID); + /* * Advance the oldest value for commit timestamps before truncating, so * that if a user requests a timestamp for a transaction we're truncating diff --git a/src/include/commands/async.h b/src/include/commands/async.h index 85bfb247682..e80fe3bad19 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -51,4 +51,7 @@ extern void HandleNotifyInterrupt(void); /* process interrupts */ extern void ProcessNotifyInterrupt(bool flush); +/* freeze old transaction IDs in notify queue (called by VACUUM) */ +extern void AsyncNotifyFreezeXids(TransactionId newFrozenXid); + #endif /* ASYNC_H */