You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
postgres/src/backend/access/nbtree/nbtree.c

884 lines
24 KiB

/*-------------------------------------------------------------------------
*
* nbtree.c
* Implementation of Lehman and Yao's btree management algorithm for
* Postgres.
*
* NOTES
* This file contains only the public interface routines.
*
*
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.96 2003/02/22 00:45:04 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
26 years ago
#include "access/nbtree.h"
#include "catalog/index.h"
#include "miscadmin.h"
#include "storage/freespace.h"
/*
 * Working state for btbuild and its callback.
 *
 * btbuild fills one of these in on its stack and hands its address to
 * IndexBuildHeapScan as the opaque "state" pointer; btbuildCallback then
 * receives it once per heap tuple.
 */
typedef struct
{
bool usefast; /* true: spool tuples for bottom-up build; false: insert each tuple directly */
bool isUnique; /* copied from indexInfo->ii_Unique */
bool haveDead; /* set by the callback once any dead tuple has gone into spool2 */
Relation heapRel; /* heap being indexed; passed through to _bt_doinsert */
BTSpool *spool; /* main spool; stays NULL when usefast is false */
/*
 * spool2 is needed only when the index is a unique index. Dead
 * tuples are put into spool2 instead of spool in order to avoid
 * uniqueness check.
 */
BTSpool *spool2;
double indtuples; /* running count of index tuples formed by the callback */
} BTBuildState;
/*
 * Build-strategy switch consulted by btbuild: the spool/sort build is used
 * only when this is true AND the backend is in normal processing mode.
 */
bool FastBuild = true; /* use SORT instead of insertion build */
/*
 * TEMPORARY FLAG FOR TESTING NEW FIX TREE
 * CODE WITHOUT AFFECTING ANYONE ELSE
 *
 * NOTE(review): not referenced by any routine visible in this file;
 * presumably read elsewhere in the nbtree code -- confirm before removing.
 */
bool FixBTree = true;
25 years ago
/* Forward declarations for file-local routines defined later in this file */

/* re-find the scan's current index tuple and re-lock its buffer */
static void _bt_restscan(IndexScanDesc scan);
/* per-heap-tuple callback handed to IndexBuildHeapScan by btbuild */
static void btbuildCallback(Relation index,
HeapTuple htup,
Datum *attdata,
char *nulls,
bool tupleIsAlive,
void *state);
/*
 *	AtEOXact_nbtree() --- end-of-transaction hook for the nbtree subsystem.
 *
 *		Meant to be run at transaction commit or abort.  There is currently
 *		no per-transaction nbtree state to clean up, so this does nothing.
 */
void
AtEOXact_nbtree(void)
{
	/* intentionally empty: nothing to do at the moment */
	return;
}
/*
* btbuild() -- build a new btree index.
*
* We use a global variable to record the fact that we're creating
* a new index. This is used to avoid high-concurrency locking,
* since the index won't be visible until this transaction commits
* and since building is guaranteed to be single-threaded.
*/
Datum
btbuild(PG_FUNCTION_ARGS)
{
Relation heap = (Relation) PG_GETARG_POINTER(0);
Relation index = (Relation) PG_GETARG_POINTER(1);
IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
double reltuples;
BTBuildState buildstate;
What looks like some *major* improvements to btree indexing... Patches from: aoki@CS.Berkeley.EDU (Paul M. Aoki) i gave jolly my btree bulkload code a long, long time ago but never gave him a bunch of my bugfixes. here's a diff against the 6.0 baseline. for some reason, this code has slowed down somewhat relative to the insertion-build code on very small tables. don't know why -- it used to be within about 10%. anyway, here are some (highly unscientific!) timings on a dec 3000/300 for synthetic tables with 10k, 100k and 1000k tuples (basically, 1mb, 10mb and 100mb heaps). 'c' means clustered (pre-sorted) inputs and 'u' means unclustered (randomly ordered) inputs. the 10k table basically fits in the buffer pool, but the 100k and 1000k tables don't. as you can see, insertion build is fine if you've sorted your heaps on your index key or if your heap fits in core, but is absolutely horrible on unordered data (yes, that's 7.5 hours to index 100mb of data...) because of the zillions of random i/os. if it doesn't work for you for whatever reason, you can always turn it back off by flipping the FastBuild flag in nbtree.c. i don't have time to maintain it. good luck! 
baseline code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 8.6 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 9.1 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.2 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 652.4 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.1 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 26772.9 bulkloading code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 11.3 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 10.4 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.5 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 63.5 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.9 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 701.0
29 years ago
/*
* bootstrap processing does something strange, so don't use
* sort/build for initial catalog indices. at some point i need to
* look harder at this. (there is some kind of incremental processing
* going on there.) -- pma 08/29/95
What looks like some *major* improvements to btree indexing... Patches from: aoki@CS.Berkeley.EDU (Paul M. Aoki) i gave jolly my btree bulkload code a long, long time ago but never gave him a bunch of my bugfixes. here's a diff against the 6.0 baseline. for some reason, this code has slowed down somewhat relative to the insertion-build code on very small tables. don't know why -- it used to be within about 10%. anyway, here are some (highly unscientific!) timings on a dec 3000/300 for synthetic tables with 10k, 100k and 1000k tuples (basically, 1mb, 10mb and 100mb heaps). 'c' means clustered (pre-sorted) inputs and 'u' means unclustered (randomly ordered) inputs. the 10k table basically fits in the buffer pool, but the 100k and 1000k tables don't. as you can see, insertion build is fine if you've sorted your heaps on your index key or if your heap fits in core, but is absolutely horrible on unordered data (yes, that's 7.5 hours to index 100mb of data...) because of the zillions of random i/os. if it doesn't work for you for whatever reason, you can always turn it back off by flipping the FastBuild flag in nbtree.c. i don't have time to maintain it. good luck! 
baseline code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 8.6 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 9.1 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.2 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 652.4 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.1 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 26772.9 bulkloading code: time psql -c 'create index c10 on k10 using btree (c int4_ops)' bttest real 11.3 time psql -c 'create index u10 on k10 using btree (b int4_ops)' bttest real 10.4 time psql -c 'create index c100 on k100 using btree (c int4_ops)' bttest real 59.5 time psql -c 'create index u100 on k100 using btree (b int4_ops)' bttest real 63.5 time psql -c 'create index c1000 on k1000 using btree (c int4_ops)' bttest real 636.9 time psql -c 'create index u1000 on k1000 using btree (b int4_ops)' bttest real 701.0
29 years ago
*/
buildstate.usefast = (FastBuild && IsNormalProcessingMode());
buildstate.isUnique = indexInfo->ii_Unique;
buildstate.haveDead = false;
buildstate.heapRel = heap;
buildstate.spool = NULL;
buildstate.spool2 = NULL;
buildstate.indtuples = 0;
#ifdef BTREE_BUILD_STATS
if (log_btree_build_stats)
ResetUsage();
#endif /* BTREE_BUILD_STATS */
/*
* We expect to be called exactly once for any index relation. If
* that's not the case, big trouble's what we have.
*/
if (RelationGetNumberOfBlocks(index) != 0)
elog(ERROR, "%s already contains data",
RelationGetRelationName(index));
/* initialize the btree index metadata page */
_bt_metapinit(index);
if (buildstate.usefast)
{
buildstate.spool = _bt_spoolinit(index, indexInfo->ii_Unique);
/*
* Different from spool, the uniqueness isn't checked for spool2.
*/
if (indexInfo->ii_Unique)
buildstate.spool2 = _bt_spoolinit(index, false);
}
/* do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo,
btbuildCallback, (void *) &buildstate);
/* okay, all heap tuples are indexed */
if (buildstate.spool2 && !buildstate.haveDead)
{
/* spool2 turns out to be unnecessary */
_bt_spooldestroy(buildstate.spool2);
buildstate.spool2 = NULL;
}
/*
* if we are doing bottom-up btree build, finish the build by (1)
* completing the sort of the spool file, (2) inserting the sorted
* tuples into btree pages and (3) building the upper levels.
*/
if (buildstate.usefast)
{
_bt_leafbuild(buildstate.spool, buildstate.spool2);
_bt_spooldestroy(buildstate.spool);
if (buildstate.spool2)
_bt_spooldestroy(buildstate.spool2);
}
#ifdef BTREE_BUILD_STATS
if (log_btree_build_stats)
{
ShowUsage("BTREE BUILD STATS");
ResetUsage();
}
#endif /* BTREE_BUILD_STATS */
/*
* Since we just counted the tuples in the heap, we update its stats
* in pg_class to guarantee that the planner takes advantage of the
* index we just created. But, only update statistics during normal
* index definitions, not for indices on system catalogs created
* during bootstrap processing. We must close the relations before
* updating statistics to guarantee that the relcache entries are
* flushed when we increment the command counter in UpdateStats(). But
* we do not release any locks on the relations; those will be held
* until end of transaction.
*/
if (IsNormalProcessingMode())
{
Oid hrelid = RelationGetRelid(heap);
Oid irelid = RelationGetRelid(index);
heap_close(heap, NoLock);
index_close(index);
UpdateStats(hrelid, reltuples);
UpdateStats(irelid, buildstate.indtuples);
}
PG_RETURN_VOID();
}
/*
 *	btbuildCallback() -- per-tuple callback from IndexBuildHeapScan.
 *
 *		Forms an index tuple for one heap tuple and either spools it (fast
 *		build) or inserts it straight into the btree.  "state" is the
 *		BTBuildState that btbuild passed to IndexBuildHeapScan.
 */
static void
btbuildCallback(Relation index,
				HeapTuple htup,
				Datum *attdata,
				char *nulls,
				bool tupleIsAlive,
				void *state)
{
	BTBuildState *bs = (BTBuildState *) state;
	IndexTuple	newTuple;
	BTItem		newItem;

	/* build the index tuple and make it point at the heap tuple */
	newTuple = index_formtuple(RelationGetDescr(index), attdata, nulls);
	newTuple->t_tid = htup->t_self;
	newItem = _bt_formitem(newTuple);

	if (!bs->usefast)
	{
		/* insertion build: put the item into the btree right away */
		InsertIndexResult insertResult;

		insertResult = _bt_doinsert(index, newItem,
									bs->isUnique, bs->heapRel);
		if (insertResult)
			pfree(insertResult);
	}
	else if (!tupleIsAlive && bs->spool2 != NULL)
	{
		/* bottom-up build: dead tuples are put into spool2 */
		bs->haveDead = true;
		_bt_spool(newItem, bs->spool2);
	}
	else
	{
		/* bottom-up build: live tuple, or no separate spool for dead ones */
		_bt_spool(newItem, bs->spool);
	}

	bs->indtuples += 1;

	/* the item and tuple are our own copies; release them */
	pfree(newItem);
	pfree(newTuple);
}
/*
 *	btinsert() -- insert an index tuple into a btree.
 *
 *		fmgr arguments:
 *			0	Relation	index relation
 *			1	Datum *		attribute values for the new index tuple
 *			2	char *		null flags for those attributes
 *			3	ItemPointer	TID of the heap tuple being indexed
 *			4	Relation	heap relation
 *			5	bool		whether to enforce uniqueness
 *
 *		Builds the index tuple, hands it to _bt_doinsert(), and returns the
 *		InsertIndexResult that _bt_doinsert() produced.
 */
Datum
btinsert(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	Datum	   *datum = (Datum *) PG_GETARG_POINTER(1);
	char	   *nulls = (char *) PG_GETARG_POINTER(2);
	ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
	Relation	heapRel = (Relation) PG_GETARG_POINTER(4);
	bool		checkUnique = PG_GETARG_BOOL(5);
	IndexTuple	newTuple;
	BTItem		newItem;
	InsertIndexResult result;

	/* generate an index tuple that points at the heap tuple */
	newTuple = index_formtuple(RelationGetDescr(rel), datum, nulls);
	newTuple->t_tid = *ht_ctid;
	newItem = _bt_formitem(newTuple);

	result = _bt_doinsert(rel, newItem, checkUnique, heapRel);

	/* our working copies are no longer needed */
	pfree(newItem);
	pfree(newTuple);

	PG_RETURN_POINTER(result);
}
/*
* btgettuple() -- Get the next tuple in the scan.
*/
Datum
btgettuple(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
BTScanOpaque so = (BTScanOpaque) scan->opaque;
Page page;
OffsetNumber offnum;
bool res;
/*
* If we've already initialized this scan, we can just advance it in
* the appropriate direction. If we haven't done so yet, we call a
* routine to get the first item in the scan.
*/
if (ItemPointerIsValid(&(scan->currentItemData)))
{
/*
* Restore scan position using heap TID returned by previous call
* to btgettuple(). _bt_restscan() re-grabs the read lock on the
* buffer, too.
*/
_bt_restscan(scan);
23 years ago
/*
* Check to see if we should kill the previously-fetched tuple.
*/
if (scan->kill_prior_tuple)
{
/*
23 years ago
* Yes, so mark it by setting the LP_DELETE bit in the item
* flags.
*/
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->btso_curbuf);
PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
23 years ago
/*
* Since this can be redone later if needed, it's treated the
23 years ago
* same as a commit-hint-bit status update for heap tuples: we
* mark the buffer dirty but don't make a WAL log entry.
*/
SetBufferCommitInfoNeedsSave(so->btso_curbuf);
}
23 years ago
/*
* Now continue the scan.
*/
res = _bt_next(scan, dir);
}
else
res = _bt_first(scan, dir);
/*
* Skip killed tuples if asked to.
*/
if (scan->ignore_killed_tuples)
{
while (res)
{
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->btso_curbuf);
if (!ItemIdDeleted(PageGetItemId(page, offnum)))
break;
res = _bt_next(scan, dir);
}
}
/*
* Save heap TID to use it in _bt_restscan. Then release the read
* lock on the buffer so that we aren't blocking other backends.
*
* NOTE: we do keep the pin on the buffer! This is essential to ensure
* that someone else doesn't delete the index entry we are stopped on.
*/
if (res)
{
((BTScanOpaque) scan->opaque)->curHeapIptr = scan->xs_ctup.t_self;
LockBuffer(((BTScanOpaque) scan->opaque)->btso_curbuf,
BUFFER_LOCK_UNLOCK);
}
PG_RETURN_BOOL(res);
}
/*
 *	btbeginscan() -- start a scan on a btree index
 *
 *		fmgr arguments:
 *			0	Relation	index relation to scan
 *			1	int			number of scan keys
 *			2	ScanKey		array of scan keys
 *
 *		Returns the IndexScanDesc created by RelationGetIndexScan().
 */
Datum
btbeginscan(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	int			keysz = PG_GETARG_INT32(1);
	ScanKey		scankey = (ScanKey) PG_GETARG_POINTER(2);
	IndexScanDesc newScan = RelationGetIndexScan(rel, keysz, scankey);

	PG_RETURN_POINTER(newScan);
}
/*
* btrescan() -- rescan an index relation
*/
Datum
btrescan(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
if (so == NULL) /* if called from btbeginscan */
{
so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
so->btso_curbuf = so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(&(so->curHeapIptr));
ItemPointerSetInvalid(&(so->mrkHeapIptr));
if (scan->numberOfKeys > 0)
so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
else
so->keyData = (ScanKey) NULL;
scan->opaque = so;
}
/* we aren't holding any read locks, but gotta drop the pins */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(&(so->curHeapIptr));
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(&(so->mrkHeapIptr));
ItemPointerSetInvalid(iptr);
}
/*
* Reset the scan keys. Note that keys ordering stuff moved to
* _bt_first. - vadim 05/05/97
*/
so->numberOfKeys = scan->numberOfKeys;
if (scan->numberOfKeys > 0)
{
memmove(scan->keyData,
scankey,
scan->numberOfKeys * sizeof(ScanKeyData));
memmove(so->keyData,
scankey,
so->numberOfKeys * sizeof(ScanKeyData));
}
PG_RETURN_VOID();
}
/*
 *	btmovescan() -- reposition a scan on a new value for its first key
 *
 *		Drops the pin on the current buffer (if the scan has a current
 *		position), invalidates that position, and stores "v" as the
 *		argument of the first scan key in the btree-private key array.
 */
void
btmovescan(IndexScanDesc scan, Datum v)
{
	BTScanOpaque so = (BTScanOpaque) scan->opaque;
	ItemPointer curPos = &(scan->currentItemData);

	/* we aren't holding any read locks, but gotta drop the pin */
	if (ItemPointerIsValid(curPos))
	{
		ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(curPos);
	}

	so->keyData[0].sk_argument = v;
}
/*
* btendscan() -- close down a scan
*/
Datum
btendscan(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pins */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
if (BufferIsValid(so->btso_curbuf))
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
if (BufferIsValid(so->btso_mrkbuf))
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
if (so->keyData != (ScanKey) NULL)
pfree(so->keyData);
pfree(so);
PG_RETURN_VOID();
}
/*
* btmarkpos() -- save current scan position
*/
Datum
btmarkpos(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
{
ReleaseBuffer(so->btso_mrkbuf);
so->btso_mrkbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump pin on current buffer for assignment to mark buffer */
if (ItemPointerIsValid(&(scan->currentItemData)))
{
so->btso_mrkbuf = ReadBuffer(scan->indexRelation,
BufferGetBlockNumber(so->btso_curbuf));
scan->currentMarkData = scan->currentItemData;
so->mrkHeapIptr = so->curHeapIptr;
}
PG_RETURN_VOID();
}
/*
* btrestrpos() -- restore scan to last saved position
*/
Datum
btrestrpos(PG_FUNCTION_ARGS)
{
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer iptr;
BTScanOpaque so;
so = (BTScanOpaque) scan->opaque;
/* we aren't holding any read locks, but gotta drop the pin */
if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
{
ReleaseBuffer(so->btso_curbuf);
so->btso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(iptr);
}
/* bump pin on marked buffer */
if (ItemPointerIsValid(&(scan->currentMarkData)))
{
so->btso_curbuf = ReadBuffer(scan->indexRelation,
BufferGetBlockNumber(so->btso_mrkbuf));
scan->currentItemData = scan->currentMarkData;
so->curHeapIptr = so->mrkHeapIptr;
}
PG_RETURN_VOID();
}
/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * fmgr arguments:
 *		0	Relation				index relation to clean
 *		1	IndexBulkDeleteCallback	returns true for heap TIDs being deleted
 *		2	void *					opaque state handed back to the callback
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
btbulkdelete(PG_FUNCTION_ARGS)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
void *callback_state = (void *) PG_GETARG_POINTER(2);
IndexBulkDeleteResult *result;
BlockNumber num_pages;
double tuples_removed;
double num_index_tuples;
IndexScanDesc scan;
BTScanOpaque so;
ItemPointer current;
/* counters reported back to the caller in the result struct */
tuples_removed = 0;
num_index_tuples = 0;
/*
 * We use a standard IndexScanDesc scan object, but to speed up the
 * loop, we skip most of the wrapper layers of index_getnext and
 * instead call _bt_step directly. This implies holding buffer lock
 * on a target page throughout the loop over the page's tuples.
 *
 * Whenever we step onto a new page, we have to trade in the read
 * lock acquired by _bt_first or _bt_step for an exclusive write lock
 * (fortunately, _bt_relbuf doesn't care which kind of lock it's
 * releasing when it comes time for _bt_step to release our lock).
 *
 * Note that we exclusive-lock every leaf page, or at least every one
 * containing data items. It sounds attractive to only exclusive-lock
 * those containing items we need to delete, but unfortunately that
 * is not safe: we could then pass a stopped indexscan, which could
 * in rare cases lead to deleting the item it needs to find when it
 * resumes. (See _bt_restscan --- this could only happen if an indexscan
 * stops on a deletable item and then a page split moves that item
 * into a page further to its right, which the indexscan will have no
 * pin on.)
 */
/* keyless scan with SnapshotAny: every index entry is visited */
scan = index_beginscan(NULL, rel, SnapshotAny, 0, (ScanKey) NULL);
so = (BTScanOpaque) scan->opaque;
current = &(scan->currentItemData);
/* Use _bt_first to get started, then _bt_step to remaining tuples */
if (_bt_first(scan, ForwardScanDirection))
{
Buffer buf;
/* block on which we currently hold the cleanup lock, if any */
BlockNumber lockedBlock = InvalidBlockNumber;
/* we have the buffer pinned and read-locked */
buf = so->btso_curbuf;
Assert(BufferIsValid(buf));
do
{
Page page;
BlockNumber blkno;
OffsetNumber offnum;
BTItem btitem;
BTPageOpaque opaque;
IndexTuple itup;
ItemPointer htup;
CHECK_FOR_INTERRUPTS();
/* current is the next index tuple */
page = BufferGetPage(buf);
blkno = ItemPointerGetBlockNumber(current);
/*
 * Make sure we have a super-exclusive write lock on this page.
 *
 * We assume that only concurrent insertions, not deletions,
 * can occur while we're not holding the page lock (the
 * caller should hold a suitable relation lock to ensure
 * this). Therefore, no items can escape being scanned because
 * of this temporary lock release.
 */
if (blkno != lockedBlock)
{
/* first visit to this page: swap the read lock for a cleanup lock */
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
LockBufferForCleanup(buf);
lockedBlock = blkno;
/*
 * If the page was formerly rightmost but was split while we
 * didn't hold the lock, and ip_posid is pointing to item
 * 1, then ip_posid now points at the high key not a valid
 * data item. In this case we need to step forward.
 */
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (current->ip_posid < P_FIRSTDATAKEY(opaque))
current->ip_posid = P_FIRSTDATAKEY(opaque);
}
/* fetch the index item at the current offset and the heap TID it holds */
offnum = ItemPointerGetOffsetNumber(current);
btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
itup = &btitem->bti_itup;
htup = &(itup->t_tid);
/* ask the caller whether this heap tuple is being deleted */
if (callback(htup, callback_state))
{
/* Okay to delete the item from the page */
_bt_itemdel(rel, buf, current);
/* Mark buffer dirty, but keep the lock and pin */
WriteNoReleaseBuffer(buf);
tuples_removed += 1;
/*
 * We now need to back up the scan one item, so that the next
 * cycle will re-examine the same offnum on this page (which
 * now holds the next item).
 */
current->ip_posid--;
}
else
num_index_tuples += 1;
} while (_bt_step(scan, &buf, ForwardScanDirection));
}
index_endscan(scan);
/* return statistics */
num_pages = RelationGetNumberOfBlocks(rel);
result = (IndexBulkDeleteResult *) palloc(sizeof(IndexBulkDeleteResult));
result->num_pages = num_pages;
result->num_index_tuples = num_index_tuples;
result->tuples_removed = tuples_removed;
result->pages_free = 0; /* not computed here */
PG_RETURN_POINTER(result);
}
/*
 * Post-VACUUM cleanup.
 *
 * Walks every index page after the metapage, collects the block numbers of
 * pages flagged as deleted, and reports them to the shared Free Space Map.
 *
 * fmgr arguments:
 *		0	Relation				index relation
 *		1	IndexVacuumCleanupInfo * (currently unused)
 *		2	IndexBulkDeleteResult *	stats struct to update; must not be NULL
 *
 * Result: the same stats struct, with num_pages and pages_free filled in.
 */
Datum
btvacuumcleanup(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
#ifdef NOT_USED
	IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1);
#endif
	IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2);
	BlockNumber num_pages;
	BlockNumber blkno;
	PageFreeSpaceInfo *pageSpaces;
	int			nFreePages;
	int			maxFreePages;

	Assert(stats != NULL);

	num_pages = RelationGetNumberOfBlocks(rel);

	/* No point in remembering more than MaxFSMPages pages */
	maxFreePages = MaxFSMPages;
	if ((BlockNumber) maxFreePages > num_pages)
		maxFreePages = (int) num_pages + 1;		/* +1 to avoid palloc(0) */

	pageSpaces = (PageFreeSpaceInfo *) palloc(maxFreePages * sizeof(PageFreeSpaceInfo));
	nFreePages = 0;

	/*
	 * Scan through all pages of index, except metapage.  (Any pages added
	 * after we start the scan will not be examined; this should be fine,
	 * since they can't possibly be empty.)
	 */
	blkno = BTREE_METAPAGE + 1;
	while (blkno < num_pages)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, BT_READ);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);

		/* XXX if safe-to-reclaim... */
		if (P_ISDELETED(opaque) && nFreePages < maxFreePages)
		{
			pageSpaces[nFreePages].blkno = blkno;
			/* The avail-space value is bogus, but must be < BLCKSZ */
			pageSpaces[nFreePages].avail = BLCKSZ - 1;
			nFreePages++;
		}

		_bt_relbuf(rel, buf);
		blkno++;
	}

	/*
	 * Update the shared Free Space Map with the info we now have about
	 * free space in the index, discarding any old info the map may have.
	 * We do not need to sort the page numbers; they're in order already.
	 */
	MultiRecordFreeSpace(&rel->rd_node, 0, nFreePages, pageSpaces);

	pfree(pageSpaces);

	/* update statistics */
	stats->num_pages = num_pages;
	stats->pages_free = nFreePages;

	PG_RETURN_POINTER(stats);
}
/*
 * Restore scan position when btgettuple is called to continue a scan.
 *
 * This is nontrivial because concurrent insertions might have moved the
 * index tuple we stopped on. We assume the tuple can only have moved to
 * the right from our stop point, because we kept a pin on the buffer,
 * and so no deletion can have occurred on that page.
 *
 * On entry, we have a pin but no read lock on the buffer that contained
 * the index tuple we stopped the scan on. On exit, we have pin and read
 * lock on the buffer that now contains that index tuple, and the scandesc's
 * current position is updated to point at it.
 *
 * The tuple is recognized by the heap TID saved in so->curHeapIptr.
 * Raises ERROR if the search runs off the right end of the index.
 */
static void
_bt_restscan(IndexScanDesc scan)
{
Relation rel = scan->indexRelation;
BTScanOpaque so = (BTScanOpaque) scan->opaque;
Buffer buf = so->btso_curbuf;
Page page;
ItemPointer current = &(scan->currentItemData);
/* start looking at the offset where the scan stopped */
OffsetNumber offnum = ItemPointerGetOffsetNumber(current),
maxoff;
BTPageOpaque opaque;
Buffer nextbuf;
/* heap TID that identifies the index tuple we must find again */
ItemPointer target = &(so->curHeapIptr);
BTItem item;
BlockNumber blkno;
/*
 * Reacquire read lock on the buffer. (We should still have
 * a reference-count pin on it, so need not get that.)
 */
LockBuffer(buf, BT_READ);
page = BufferGetPage(buf);
maxoff = PageGetMaxOffsetNumber(page);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/*
 * We use this as flag when first index tuple on page is deleted but
 * we do not move left (this would slow down vacuum) - so we set
 * current->ip_posid before first index tuple on the current page
 * (_bt_step will move it right)... XXX still needed?
 */
if (!ItemPointerIsValid(target))
{
ItemPointerSetOffsetNumber(current,
OffsetNumberPrev(P_FIRSTDATAKEY(opaque)));
return;
}
/*
 * The item we were on may have moved right due to insertions. Find it
 * again. We use the heap TID to identify the item uniquely.
 */
for (;;)
{
/* Check for item on this page */
for (;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum))
{
item = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
if (BTTidSame(item->bti_itup.t_tid, *target))
{
/* Found it */
current->ip_posid = offnum;
return;
}
}
/*
 * The item we're looking for moved right at least one page, so
 * move right. We are careful here to pin and read-lock the next
 * non-dead page before releasing the current one. This ensures that
 * a concurrent btbulkdelete scan cannot pass our position --- if it
 * did, it might be able to reach and delete our target item before
 * we can find it again.
 */
if (P_RIGHTMOST(opaque))
elog(ERROR, "_bt_restscan: my bits moved right off the end of the world!"
"\n\tRecreate index %s.", RelationGetRelationName(rel));
/* Advance to next non-dead page --- there must be one */
nextbuf = InvalidBuffer;
for (;;)
{
/* follow the right-link of the page examined last */
blkno = opaque->btpo_next;
/* drop the ignorable page picked up on the previous iteration */
if (nextbuf != InvalidBuffer)
_bt_relbuf(rel, nextbuf);
nextbuf = _bt_getbuf(rel, blkno, BT_READ);
page = BufferGetPage(nextbuf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
break;
if (P_RIGHTMOST(opaque))
elog(ERROR, "_bt_restscan: fell off the end of %s",
RelationGetRelationName(rel));
}
/* only now, with the new page held, let go of the old one */
_bt_relbuf(rel, buf);
so->btso_curbuf = buf = nextbuf;
maxoff = PageGetMaxOffsetNumber(page);
offnum = P_FIRSTDATAKEY(opaque);
/* move the scan's current position to the first data item of the new page */
ItemPointerSet(current, blkno, offnum);
}
}