mirror of https://github.com/postgres/postgres
The new facility makes it easier to optimize bulk loading, as the logic for buffering, WAL-logging, and syncing the relation only needs to be implemented once. It's also less error-prone: We have had a number of bugs in how a relation is fsync'd - or not - at the end of a bulk loading operation. By centralizing that logic to one place, we only need to write it correctly once. The new facility is faster for small relations: Instead of calling smgrimmedsync(), we register the fsync to happen at next checkpoint, which avoids the fsync latency. That can make a big difference if you are e.g. restoring a schema-only dump with lots of relations. It is also slightly more efficient with large relations, as the WAL logging is performed multiple pages at a time. That avoids some WAL header overhead. The sorted GiST index build did that already, this moves the buffering to the new facility. The changes to the pageinspect GiST test need an explanation: Before this patch, the sorted GiST index build set the LSN on every page to the special GistBuildLSN value, not the LSN of the WAL record, even though they were WAL-logged. There was no particular need for it, it just happened naturally when we wrote out the pages before WAL-logging them. Now we WAL-log the pages first, like in B-tree build, so the pages are stamped with the record's real LSN. When the build is not WAL-logged, we still use GistBuildLSN. To make the test output predictable, use an unlogged index. Reviewed-by: Andres Freund Discussion: https://www.postgresql.org/message-id/30e8f366-58b3-b239-c521-422122dd5150%40iki.fi (pull/157/head)
parent
e612384fc7
commit
8af2565248
@ -0,0 +1,298 @@ |
||||
/*-------------------------------------------------------------------------
|
||||
* |
||||
* bulk_write.c |
||||
* Efficiently and reliably populate a new relation |
||||
* |
||||
* The assumption is that no other backends access the relation while we are |
||||
* loading it, so we can take some shortcuts. Do not mix operations through |
||||
* the regular buffer manager and the bulk loading interface! |
||||
* |
||||
* We bypass the buffer manager to avoid the locking overhead, and call |
||||
* smgrextend() directly. A downside is that the pages will need to be |
||||
* re-read into shared buffers on first use after the build finishes. That's |
||||
* usually a good tradeoff for large relations, and for small relations, the |
||||
* overhead isn't very significant compared to creating the relation in the |
||||
* first place. |
||||
* |
||||
* The pages are WAL-logged if needed. To save on WAL header overhead, we |
||||
* WAL-log several pages in one record. |
||||
* |
||||
* One tricky point is that because we bypass the buffer manager, we need to |
||||
* register the relation for fsyncing at the next checkpoint ourselves, and |
||||
* make sure that the relation is correctly fsync'd by us or the checkpointer |
||||
* even if a checkpoint happens concurrently. |
||||
* |
||||
* |
||||
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group |
||||
* Portions Copyright (c) 1994, Regents of the University of California |
||||
* |
||||
* |
||||
* IDENTIFICATION |
||||
* src/backend/storage/smgr/bulk_write.c |
||||
* |
||||
*------------------------------------------------------------------------- |
||||
*/ |
||||
#include "postgres.h" |
||||
|
||||
#include "access/xloginsert.h" |
||||
#include "access/xlogrecord.h" |
||||
#include "storage/bufmgr.h" |
||||
#include "storage/bufpage.h" |
||||
#include "storage/bulk_write.h" |
||||
#include "storage/proc.h" |
||||
#include "storage/smgr.h" |
||||
#include "utils/rel.h" |
||||
|
||||
/*
 * Cap on the number of queued writes before we must flush.  A single WAL
 * record can register at most XLR_MAX_BLOCK_ID blocks, and smgr_bulk_flush()
 * WAL-logs the whole queue as one record, so the queue cannot be larger.
 */
#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID

/* An all-zeroes page, used to fill holes when writing blocks out of order */
static const PGIOAlignedBlock zero_buffer = {{0}};	/* worth BLCKSZ */

/* One page that has been queued with smgr_bulk_write() but not yet written */
typedef struct PendingWrite
{
	BulkWriteBuffer buf;		/* page contents; we own it until flushed */
	BlockNumber blkno;			/* target block number in the relation fork */
	bool		page_std;		/* standard page layout? (affects WAL-logging) */
} PendingWrite;

/*
 * Bulk writer state for one relation fork.
 */
typedef struct BulkWriteState
{
	/* Information about the target relation we're writing */
	SMgrRelation smgr;
	ForkNumber	forknum;
	bool		use_wal;

	/* We keep several writes queued, and WAL-log them in batches */
	int			npending;
	PendingWrite pending_writes[MAX_PENDING_WRITES];

	/* Current size of the relation */
	BlockNumber pages_written;

	/* The RedoRecPtr at the time that the bulk operation started */
	XLogRecPtr	start_RedoRecPtr;

	/* Context in which buffers from smgr_bulk_get_buf() are allocated */
	MemoryContext memcxt;
} BulkWriteState;

static void smgr_bulk_flush(BulkWriteState *bulkstate);
||||
|
||||
/*
|
||||
* Start a bulk write operation on a relation fork. |
||||
*/ |
||||
BulkWriteState * |
||||
smgr_bulk_start_rel(Relation rel, ForkNumber forknum) |
||||
{ |
||||
return smgr_bulk_start_smgr(RelationGetSmgr(rel), |
||||
forknum, |
||||
RelationNeedsWAL(rel) || forknum == INIT_FORKNUM); |
||||
} |
||||
|
||||
/*
|
||||
* Start a bulk write operation on a relation fork. |
||||
* |
||||
* This is like smgr_bulk_start_rel, but can be used without a relcache entry. |
||||
*/ |
||||
BulkWriteState * |
||||
smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal) |
||||
{ |
||||
BulkWriteState *state; |
||||
|
||||
state = palloc(sizeof(BulkWriteState)); |
||||
state->smgr = smgr; |
||||
state->forknum = forknum; |
||||
state->use_wal = use_wal; |
||||
|
||||
state->npending = 0; |
||||
state->pages_written = 0; |
||||
|
||||
state->start_RedoRecPtr = GetRedoRecPtr(); |
||||
|
||||
/*
|
||||
* Remember the memory context. We will use it to allocate all the |
||||
* buffers later. |
||||
*/ |
||||
state->memcxt = CurrentMemoryContext; |
||||
|
||||
return state; |
||||
} |
||||
|
||||
/*
|
||||
* Finish bulk write operation. |
||||
* |
||||
* This WAL-logs and flushes any remaining pending writes to disk, and fsyncs |
||||
* the relation if needed. |
||||
*/ |
||||
/*
 * Finish bulk write operation.
 *
 * This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
 * the relation if needed.
 */
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
	/* WAL-log and flush any remaining pages */
	smgr_bulk_flush(bulkstate);

	/*
	 * When we wrote out the pages, we passed skipFsync=true to avoid the
	 * overhead of registering all the writes with the checkpointer. Register
	 * the whole relation now.
	 *
	 * There is one hole in that idea: If a checkpoint occurred while we were
	 * writing the pages, it already missed fsyncing the pages we had written
	 * before the checkpoint started. A crash later on would replay the WAL
	 * starting from the checkpoint, therefore it wouldn't replay our earlier
	 * WAL records. So if a checkpoint started after the bulk write, fsync
	 * the files now.
	 */
	if (!SmgrIsTemp(bulkstate->smgr))
	{
		/*
		 * Prevent a checkpoint from starting between the GetRedoRecPtr() and
		 * smgrregistersync() calls.
		 */
		Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
		MyProc->delayChkptFlags |= DELAY_CHKPT_START;

		/*
		 * If the redo pointer moved since we started, a checkpoint began (and
		 * possibly finished) during the bulk load and may have missed our
		 * writes; fall back to an immediate fsync of the fork.
		 */
		if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
		{
			/*
			 * A checkpoint occurred and it didn't know about our writes, so
			 * fsync() the relation ourselves.
			 */
			/* Safe to release the flag before the sync: we sync directly. */
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
			smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
			elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
		}
		else
		{
			/*
			 * No concurrent checkpoint: hand the fsync off to the
			 * checkpointer.  The flag must stay set until after
			 * smgrregistersync(), so the registration cannot race with a
			 * starting checkpoint.
			 */
			smgrregistersync(bulkstate->smgr, bulkstate->forknum);
			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
		}
	}
}
||||
|
||||
static int |
||||
buffer_cmp(const void *a, const void *b) |
||||
{ |
||||
const PendingWrite *bufa = (const PendingWrite *) a; |
||||
const PendingWrite *bufb = (const PendingWrite *) b; |
||||
|
||||
/* We should not see duplicated writes for the same block */ |
||||
Assert(bufa->blkno != bufb->blkno); |
||||
if (bufa->blkno > bufb->blkno) |
||||
return 1; |
||||
else |
||||
return -1; |
||||
} |
||||
|
||||
/*
|
||||
* Finish all the pending writes. |
||||
*/ |
||||
/*
 * Finish all the pending writes.
 *
 * WAL-logs the queued pages in one record (if WAL is needed), then writes
 * them out through smgrextend()/smgrwrite() with skipFsync=true, filling any
 * holes with zero pages.  Frees each page buffer after writing it.
 */
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
	int			npending = bulkstate->npending;
	PendingWrite *pending_writes = bulkstate->pending_writes;

	if (npending == 0)
		return;

	/* Sort by block number, so we extend the relation in order. */
	if (npending > 1)
		qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);

	if (bulkstate->use_wal)
	{
		BlockNumber blknos[MAX_PENDING_WRITES];
		Page		pages[MAX_PENDING_WRITES];
		bool		page_std = true;

		for (int i = 0; i < npending; i++)
		{
			blknos[i] = pending_writes[i].blkno;
			pages[i] = pending_writes[i].buf->data;

			/*
			 * If any of the pages use !page_std, we log them all as such.
			 * That's a bit wasteful, but in practice, a mix of standard and
			 * non-standard page layout is rare. None of the built-in AMs do
			 * that.
			 */
			if (!pending_writes[i].page_std)
				page_std = false;
		}
		/* One WAL record covering all queued pages, to save header overhead */
		log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
					 npending, blknos, pages, page_std);
	}

	for (int i = 0; i < npending; i++)
	{
		BlockNumber blkno = pending_writes[i].blkno;
		Page		page = pending_writes[i].buf->data;

		/*
		 * Set the checksum only now, after WAL-logging: the checksum covers
		 * the page contents as written to disk, not as logged.
		 */
		PageSetChecksumInplace(page, blkno);

		if (blkno >= bulkstate->pages_written)
		{
			/*
			 * If we have to write pages nonsequentially, fill in the space
			 * with zeroes until we come back and overwrite. This is not
			 * logically necessary on standard Unix filesystems (unwritten
			 * space will read as zeroes anyway), but it should help to avoid
			 * fragmentation. The dummy pages aren't WAL-logged though.
			 */
			while (blkno > bulkstate->pages_written)
			{
				/* don't set checksum for all-zero page */
				smgrextend(bulkstate->smgr, bulkstate->forknum,
						   bulkstate->pages_written++,
						   &zero_buffer,
						   true);
			}

			/* skipFsync=true: smgr_bulk_finish() handles syncing */
			smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
			bulkstate->pages_written = pending_writes[i].blkno + 1;
		}
		else
			/* Block already exists (a hole we zero-filled earlier): overwrite */
			smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
		/* We took ownership of the buffer in smgr_bulk_write(); free it now */
		pfree(page);
	}

	bulkstate->npending = 0;
}
||||
|
||||
/*
|
||||
* Queue write of 'buf'. |
||||
* |
||||
* NB: this takes ownership of 'buf'! |
||||
* |
||||
* You are only allowed to write a given block once as part of one bulk write |
||||
* operation. |
||||
*/ |
||||
void |
||||
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std) |
||||
{ |
||||
PendingWrite *w; |
||||
|
||||
w = &bulkstate->pending_writes[bulkstate->npending++]; |
||||
w->buf = buf; |
||||
w->blkno = blocknum; |
||||
w->page_std = page_std; |
||||
|
||||
if (bulkstate->npending == MAX_PENDING_WRITES) |
||||
smgr_bulk_flush(bulkstate); |
||||
} |
||||
|
||||
/*
|
||||
* Allocate a new buffer which can later be written with smgr_bulk_write(). |
||||
* |
||||
* There is no function to free the buffer. When you pass it to |
||||
* smgr_bulk_write(), it takes ownership and frees it when it's no longer |
||||
* needed. |
||||
* |
||||
* This is currently implemented as a simple palloc, but could be implemented |
||||
* using a ring buffer or larger chunks in the future, so don't rely on it. |
||||
*/ |
||||
BulkWriteBuffer |
||||
smgr_bulk_get_buf(BulkWriteState *bulkstate) |
||||
{ |
||||
return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0); |
||||
} |
@ -1,6 +1,7 @@ |
||||
# Copyright (c) 2022-2024, PostgreSQL Global Development Group

# Storage manager (smgr) sources; bulk_write.c is the bulk-loading facility.
backend_sources += files(
  'bulk_write.c',
  'md.c',
  'smgr.c',
)
||||
|
@ -0,0 +1,40 @@ |
||||
/*-------------------------------------------------------------------------
 *
 * bulk_write.h
 *	  Efficiently and reliably populate a new relation
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/storage/bulk_write.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef BULK_WRITE_H
#define BULK_WRITE_H

#include "storage/smgr.h"
#include "utils/rel.h"

/* Opaque state for one bulk write operation; see bulk_write.c */
typedef struct BulkWriteState BulkWriteState;

/*
 * Temporary buffer to hold a page to until it's written out. Use
 * smgr_bulk_get_buf() to reserve one of these. This is a separate typedef to
 * distinguish it from other block-sized buffers passed around in the system.
 */
typedef PGIOAlignedBlock *BulkWriteBuffer;

/* forward declared from smgr.h */
struct SMgrRelationData;

/* Start a bulk write; the _smgr variant works without a relcache entry */
extern BulkWriteState *smgr_bulk_start_rel(Relation rel, ForkNumber forknum);
extern BulkWriteState *smgr_bulk_start_smgr(struct SMgrRelationData *smgr, ForkNumber forknum, bool use_wal);

/* Get a buffer, fill it, and queue it; smgr_bulk_write takes ownership */
extern BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate);
extern void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std);

/* Flush remaining writes and arrange for the relation to be fsync'd */
extern void smgr_bulk_finish(BulkWriteState *bulkstate);

#endif							/* BULK_WRITE_H */
Loading…
Reference in new issue