btree: Support parallel index scans.

This isn't exposed to the optimizer or the executor yet; we'll add
support for those things in a separate patch.  But this puts the
basic mechanism in place: several processes can attach to a parallel
btree index scan, and each one will get a subset of the tuples that
would have been produced by a non-parallel scan.  Each index page
becomes the responsibility of a single worker, which then returns
all of the TIDs on that page.

Rahila Syed, Amit Kapila, Robert Haas, reviewed and tested by
Anastasia Lubennikova, Tushar Ahuja, and Haribabu Kommi.
Robert Haas 9 years ago
parent 8569955ee3
commit 569174f1be
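To make the page hand-off concrete, here is a minimal standalone C sketch (not part of the patch) of the seize/advance/release protocol that the new BTParallelScanDescData state machine implements. The names shared_scan_t, seize(), and release() are invented for illustration; the patch itself uses btps_mutex, btps_cv, and the _bt_parallel_seize()/_bt_parallel_release()/_bt_parallel_done() functions shown in the nbtree.c hunks below. One worker at a time seizes the scan, learns which page to read, immediately publishes the next page for the other participants, and only then processes the page it claimed, so each page is returned by exactly one worker.

/*
 * Minimal standalone sketch (not PostgreSQL code) of the page-at-a-time
 * hand-off: one worker at a time "seizes" the shared scan, advances it to
 * the next page, "releases" it, and then processes the page it claimed.
 * shared_scan_t, seize() and release() are invented names for illustration.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define LAST_PAGE   16          /* pretend the index has pages 0..16 */
#define NUM_WORKERS 4

typedef enum { SCAN_IDLE, SCAN_ADVANCING, SCAN_DONE } scan_state_t;

typedef struct
{
    pthread_mutex_t mutex;      /* plays the role of btps_mutex */
    pthread_cond_t  cv;         /* plays the role of btps_cv */
    int             next_page;  /* plays the role of btps_scanPage */
    scan_state_t    state;      /* plays the role of btps_pageStatus */
} shared_scan_t;

static shared_scan_t scan_shared = {
    PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, SCAN_IDLE
};

/* Seize the scan: wait until no one else is advancing it, then claim a page. */
static bool
seize(int *page)
{
    pthread_mutex_lock(&scan_shared.mutex);
    while (scan_shared.state == SCAN_ADVANCING)
        pthread_cond_wait(&scan_shared.cv, &scan_shared.mutex);
    if (scan_shared.state == SCAN_DONE)
    {
        pthread_mutex_unlock(&scan_shared.mutex);
        return false;           /* no pages left for anyone */
    }
    *page = scan_shared.next_page;
    scan_shared.state = SCAN_ADVANCING; /* others must wait until we release */
    pthread_mutex_unlock(&scan_shared.mutex);
    return true;
}

/* Release the scan, publishing the page the next claimant should take. */
static void
release(int next_page)
{
    pthread_mutex_lock(&scan_shared.mutex);
    if (next_page > LAST_PAGE)
        scan_shared.state = SCAN_DONE;  /* analogous to _bt_parallel_done() */
    else
    {
        scan_shared.next_page = next_page;
        scan_shared.state = SCAN_IDLE;
    }
    pthread_mutex_unlock(&scan_shared.mutex);
    pthread_cond_broadcast(&scan_shared.cv);
}

static void *
worker(void *arg)
{
    int id = *(int *) arg;
    int page;

    while (seize(&page))
    {
        /* in the real code the next page is the right-link found by reading */
        release(page + 1);
        printf("worker %d returns all TIDs on page %d\n", id, page);
    }
    return NULL;
}

int
main(void)
{
    pthread_t tids[NUM_WORKERS];
    int       ids[NUM_WORKERS];

    for (int i = 0; i < NUM_WORKERS; i++)
    {
        ids[i] = i;
        pthread_create(&tids[i], NULL, worker, &ids[i]);
    }
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_join(tids[i], NULL);
    return 0;
}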
 doc/src/sgml/monitoring.sgml          |   6
 src/backend/access/nbtree/nbtree.c    | 259
 src/backend/access/nbtree/nbtsearch.c | 286
 src/backend/access/nbtree/nbtutils.c  |   4
 src/backend/postmaster/pgstat.c       |   3
 src/include/access/nbtree.h           |  15
 src/include/pgstat.h                  |   1
 src/tools/pgindent/typedefs.list      |   3
 8 files changed

--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1207,7 +1207,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry>Waiting in an extension.</entry>
</row>
<row>
- <entry morerows="9"><literal>IPC</></entry>
<entry morerows="10"><literal>IPC</></entry>
<entry><literal>BgWorkerShutdown</></entry>
<entry>Waiting for background worker to shut down.</entry>
</row>
@@ -1215,6 +1215,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><literal>BgWorkerStartup</></entry>
<entry>Waiting for background worker to start up.</entry>
</row>
<row>
<entry><literal>BtreePage</></entry>
<entry>Waiting for the page number needed to continue a parallel btree scan to become available.</entry>
</row>
<row>
<entry><literal>ExecuteGather</></entry>
<entry>Waiting for activity from child process when executing <literal>Gather</> node.</entry>

--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -23,6 +23,8 @@
#include "access/xlog.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "pgstat.h"
#include "storage/condition_variable.h"
#include "storage/indexfsm.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -63,6 +65,45 @@ typedef struct
MemoryContext pagedelcontext;
} BTVacState;
/*
* BTPARALLEL_NOT_INITIALIZED indicates that the scan has not started.
*
* BTPARALLEL_ADVANCING indicates that some process is advancing the scan to
* a new page; others must wait.
*
* BTPARALLEL_IDLE indicates that no backend is currently advancing the scan
* to a new page; some process can start doing that.
*
* BTPARALLEL_DONE indicates that the scan is complete (including error exit).
* We reach this state once for every distinct combination of array keys.
*/
typedef enum
{
BTPARALLEL_NOT_INITIALIZED,
BTPARALLEL_ADVANCING,
BTPARALLEL_IDLE,
BTPARALLEL_DONE
} BTPS_State;
/*
* BTParallelScanDescData contains btree specific shared information required
* for parallel scan.
*/
typedef struct BTParallelScanDescData
{
BlockNumber btps_scanPage; /* latest or next page to be scanned */
BTPS_State btps_pageStatus; /* indicates whether next page is available
* for scan. see above for possible states of
* parallel scan. */
int btps_arrayKeyCount; /* count indicating number of array
* scan keys processed by parallel
* scan */
slock_t btps_mutex; /* protects above variables */
ConditionVariable btps_cv; /* used to synchronize parallel scan */
} BTParallelScanDescData;
typedef struct BTParallelScanDescData *BTParallelScanDesc;
static void btbuildCallback(Relation index,
HeapTuple htup,
@@ -118,9 +159,9 @@ bthandler(PG_FUNCTION_ARGS)
amroutine->amendscan = btendscan;
amroutine->ammarkpos = btmarkpos;
amroutine->amrestrpos = btrestrpos;
- amroutine->amestimateparallelscan = NULL;
- amroutine->aminitparallelscan = NULL;
- amroutine->amparallelrescan = NULL;
amroutine->amestimateparallelscan = btestimateparallelscan;
amroutine->aminitparallelscan = btinitparallelscan;
amroutine->amparallelrescan = btparallelrescan;
PG_RETURN_POINTER(amroutine);
}
@@ -491,6 +532,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
}
so->markItemIndex = -1;
so->arrayKeyCount = 0;
BTScanPosUnpinIfPinned(so->markPos);
BTScanPosInvalidate(so->markPos);
@@ -652,6 +694,217 @@ btrestrpos(IndexScanDesc scan)
}
}
/*
* btestimateparallelscan -- estimate storage for BTParallelScanDescData
*/
Size
btestimateparallelscan(void)
{
return sizeof(BTParallelScanDescData);
}
/*
* btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan
*/
void
btinitparallelscan(void *target)
{
BTParallelScanDesc bt_target = (BTParallelScanDesc) target;
SpinLockInit(&bt_target->btps_mutex);
bt_target->btps_scanPage = InvalidBlockNumber;
bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
bt_target->btps_arrayKeyCount = 0;
ConditionVariableInit(&bt_target->btps_cv);
}
/*
* btparallelrescan() -- reset parallel scan
*/
void
btparallelrescan(IndexScanDesc scan)
{
BTParallelScanDesc btscan;
ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
Assert(parallel_scan);
btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
parallel_scan->ps_offset);
/*
* In theory, we don't need to acquire the spinlock here, because there
* shouldn't be any other workers running at this point, but we do so for
* consistency.
*/
SpinLockAcquire(&btscan->btps_mutex);
btscan->btps_scanPage = InvalidBlockNumber;
btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
btscan->btps_arrayKeyCount = 0;
SpinLockRelease(&btscan->btps_mutex);
}
/*
* _bt_parallel_seize() -- Begin the process of advancing the scan to a new
* page. Other scans must wait until we call _bt_parallel_release() or
* _bt_parallel_done().
*
* The return value is true if we successfully seized the scan and false
* if we did not. The latter case occurs if no pages remain for the current
* set of scankeys.
*
* If the return value is true, *pageno returns the next or current page
* of the scan (depending on the scan direction). An invalid block number
* means the scan hasn't yet started, and P_NONE means we've reached the end.
* The first time a participating process reaches the last page, it will return
* true and set *pageno to P_NONE; after that, further attempts to seize the
* scan will return false.
*
* Callers should ignore the value of pageno if the return value is false.
*/
bool
_bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
BTPS_State pageStatus;
bool exit_loop = false;
bool status = true;
ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
BTParallelScanDesc btscan;
*pageno = P_NONE;
btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
parallel_scan->ps_offset);
while (1)
{
SpinLockAcquire(&btscan->btps_mutex);
pageStatus = btscan->btps_pageStatus;
if (so->arrayKeyCount < btscan->btps_arrayKeyCount)
{
/* Parallel scan has already advanced to a new set of scankeys. */
status = false;
}
else if (pageStatus == BTPARALLEL_DONE)
{
/*
* We're done with this set of scankeys. This may be the end, or
* there could be more sets to try.
*/
status = false;
}
else if (pageStatus != BTPARALLEL_ADVANCING)
{
/*
* We have successfully seized control of the scan for the purpose
* of advancing it to a new page!
*/
btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
*pageno = btscan->btps_scanPage;
exit_loop = true;
}
SpinLockRelease(&btscan->btps_mutex);
if (exit_loop || !status)
break;
ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE);
}
ConditionVariableCancelSleep();
return status;
}
/*
* _bt_parallel_release() -- Complete the process of advancing the scan to a
* new page. We now have the new value btps_scanPage; some other backend
* can now begin advancing the scan.
*/
void
_bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page)
{
ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
BTParallelScanDesc btscan;
btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
parallel_scan->ps_offset);
SpinLockAcquire(&btscan->btps_mutex);
btscan->btps_scanPage = scan_page;
btscan->btps_pageStatus = BTPARALLEL_IDLE;
SpinLockRelease(&btscan->btps_mutex);
ConditionVariableSignal(&btscan->btps_cv);
}
/*
* _bt_parallel_done() -- Mark the parallel scan as complete.
*
* When there are no pages left to scan, this function should be called to
* notify other workers. Otherwise, they might wait forever for the scan to
* advance to the next page.
*/
void
_bt_parallel_done(IndexScanDesc scan)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
BTParallelScanDesc btscan;
bool status_changed = false;
/* Do nothing, for non-parallel scans */
if (parallel_scan == NULL)
return;
btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
parallel_scan->ps_offset);
/*
* Mark the parallel scan as done for this combination of scan keys,
* unless some other process already did so. See also
* _bt_advance_array_keys.
*/
SpinLockAcquire(&btscan->btps_mutex);
if (so->arrayKeyCount >= btscan->btps_arrayKeyCount &&
btscan->btps_pageStatus != BTPARALLEL_DONE)
{
btscan->btps_pageStatus = BTPARALLEL_DONE;
status_changed = true;
}
SpinLockRelease(&btscan->btps_mutex);
/* wake up all the workers associated with this parallel scan */
if (status_changed)
ConditionVariableBroadcast(&btscan->btps_cv);
}
/*
* _bt_parallel_advance_array_keys() -- Advances the parallel scan for array
* keys.
*
* Updates the count of array keys processed for both local and parallel
* scans.
*/
void
_bt_parallel_advance_array_keys(IndexScanDesc scan)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
BTParallelScanDesc btscan;
btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
parallel_scan->ps_offset);
so->arrayKeyCount++;
SpinLockAcquire(&btscan->btps_mutex);
if (btscan->btps_pageStatus == BTPARALLEL_DONE)
{
btscan->btps_scanPage = InvalidBlockNumber;
btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
btscan->btps_arrayKeyCount++;
}
SpinLockRelease(&btscan->btps_mutex);
}
/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
* The set of target tuples is specified via a callback routine that tells

--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -30,9 +30,13 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
static void _bt_saveitem(BTScanOpaque so, int itemIndex,
OffsetNumber offnum, IndexTuple itup);
static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno,
ScanDirection dir);
static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir);
/*
@@ -544,8 +548,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
ScanKeyData notnullkeys[INDEX_MAX_KEYS];
int keysCount = 0;
int i;
bool status = true;
StrategyNumber strat_total;
BTScanPosItem *currItem;
BlockNumber blkno;
Assert(!BTScanPosIsValid(so->currPos));
@@ -564,6 +570,30 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
if (!so->qual_ok)
return false;
/*
* For parallel scans, get the starting page from shared state. If the
* scan has not started, proceed to find out first leaf page in the usual
* way while keeping other participating processes waiting. If the scan
* has already begun, use the page number from the shared structure.
*/
if (scan->parallel_scan != NULL)
{
status = _bt_parallel_seize(scan, &blkno);
if (!status)
return false;
else if (blkno == P_NONE)
{
_bt_parallel_done(scan);
return false;
}
else if (blkno != InvalidBlockNumber)
{
if (!_bt_parallel_readpage(scan, blkno, dir))
return false;
goto readcomplete;
}
}
/*----------
* Examine the scan keys to discover where we need to start the scan.
*
@@ -743,7 +773,19 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
* there.
*/
if (keysCount == 0)
- return _bt_endpoint(scan, dir);
{
bool match;
match = _bt_endpoint(scan, dir);
if (!match)
{
/* No match, so mark (parallel) scan finished */
_bt_parallel_done(scan);
}
return match;
}
/*
* We want to start the scan somewhere within the index. Set up an
@@ -773,7 +815,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
Assert(subkey->sk_flags & SK_ROW_MEMBER);
if (subkey->sk_flags & SK_ISNULL)
{
_bt_parallel_done(scan);
return false;
}
memcpy(scankeys + i, subkey, sizeof(ScanKeyData));
/*
@@ -993,25 +1038,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
* because nothing finer to lock exists.
*/
PredicateLockRelation(rel, scan->xs_snapshot);
/*
* mark parallel scan as done, so that all the workers can finish
* their scan
*/
_bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
else
PredicateLockPage(rel, BufferGetBlockNumber(buf),
scan->xs_snapshot);
- /* initialize moreLeft/moreRight appropriately for scan direction */
- if (ScanDirectionIsForward(dir))
- {
- so->currPos.moreLeft = false;
- so->currPos.moreRight = true;
- }
- else
- {
- so->currPos.moreLeft = true;
- so->currPos.moreRight = false;
- }
- so->numKilled = 0; /* just paranoia */
- Assert(so->markItemIndex == -1);
_bt_initialize_more_data(so, dir);
/* position to the precise item on the page */
offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey);
@@ -1060,6 +1101,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
_bt_drop_lock_and_maybe_pin(scan, &so->currPos);
}
readcomplete:
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
@@ -1132,6 +1174,10 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
* moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports
* that there can be no more matching tuples in the current scan direction.
*
* In the case of a parallel scan, caller must have called _bt_parallel_seize
* prior to calling this function; this function will invoke
* _bt_parallel_release before returning.
*
* Returns true if any matching items found on the page, false if none.
*/
static bool
@@ -1154,6 +1200,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
page = BufferGetPage(so->currPos.buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/* allow next page be processed by parallel worker */
if (scan->parallel_scan)
{
if (ScanDirectionIsForward(dir))
_bt_parallel_release(scan, opaque->btpo_next);
else
_bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
}
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
@@ -1278,21 +1334,16 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
* if pinned, we'll drop the pin before moving to next page. The buffer is
* not locked on entry.
*
- * On success exit, so->currPos is updated to contain data from the next
- * interesting page. For success on a scan using a non-MVCC snapshot we hold
- * a pin, but not a read lock, on that page. If we do not hold the pin, we
- * set so->currPos.buf to InvalidBuffer. We return TRUE to indicate success.
- *
- * If there are no more matching records in the given direction, we drop all
- * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
* For success on a scan using a non-MVCC snapshot we hold a pin, but not a
* read lock, on that page. If we do not hold the pin, we set so->currPos.buf
* to InvalidBuffer. We return TRUE to indicate success.
*/
static bool
_bt_steppage(IndexScanDesc scan, ScanDirection dir)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
- Relation rel;
- Page page;
- BTPageOpaque opaque;
BlockNumber blkno = InvalidBlockNumber;
bool status = true;
Assert(BTScanPosIsValid(so->currPos));
@@ -1319,25 +1370,103 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
so->markItemIndex = -1;
}
- rel = scan->indexRelation;
if (ScanDirectionIsForward(dir))
{
/* Walk right to the next page with data */
- /* We must rely on the previously saved nextPage link! */
- BlockNumber blkno = so->currPos.nextPage;
if (scan->parallel_scan != NULL)
{
/*
* Seize the scan to get the next block number; if the scan has
* ended already, bail out.
*/
status = _bt_parallel_seize(scan, &blkno);
if (!status)
{
/* release the previous buffer, if pinned */
BTScanPosUnpinIfPinned(so->currPos);
BTScanPosInvalidate(so->currPos);
return false;
}
}
else
{
/* Not parallel, so use the previously-saved nextPage link. */
blkno = so->currPos.nextPage;
}
/* Remember we left a page with data */
so->currPos.moreLeft = true;
/* release the previous buffer, if pinned */
BTScanPosUnpinIfPinned(so->currPos);
}
else
{
/* Remember we left a page with data */
so->currPos.moreRight = true;
if (scan->parallel_scan != NULL)
{
/*
* Seize the scan to get the current block number; if the scan has
* ended already, bail out.
*/
status = _bt_parallel_seize(scan, &blkno);
BTScanPosUnpinIfPinned(so->currPos);
if (!status)
{
BTScanPosInvalidate(so->currPos);
return false;
}
}
else
{
/* Not parallel, so just use our own notion of the current page */
blkno = so->currPos.currPage;
}
}
if (!_bt_readnextpage(scan, blkno, dir))
return false;
/* Drop the lock, and maybe the pin, on the current page */
_bt_drop_lock_and_maybe_pin(scan, &so->currPos);
return true;
}
/*
* _bt_readnextpage() -- Read next page containing valid data for scan
*
* On success exit, so->currPos is updated to contain data from the next
* interesting page. Caller is responsible to release lock and pin on
* buffer on success. We return TRUE to indicate success.
*
* If there are no more matching records in the given direction, we drop all
* locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
*/
static bool
_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
Relation rel;
Page page;
BTPageOpaque opaque;
bool status = true;
rel = scan->indexRelation;
if (ScanDirectionIsForward(dir))
{
for (;;)
{
- /* if we're at end of scan, give up */
/*
* if we're at end of scan, give up and mark parallel scan as
* done, so that all the workers can finish their scan
*/
if (blkno == P_NONE || !so->currPos.moreRight)
{
_bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
@@ -1359,14 +1488,32 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
}
/* nope, keep going */
- blkno = opaque->btpo_next;
if (scan->parallel_scan != NULL)
{
status = _bt_parallel_seize(scan, &blkno);
if (!status)
{
_bt_relbuf(rel, so->currPos.buf);
BTScanPosInvalidate(so->currPos);
return false;
}
}
else
blkno = opaque->btpo_next;
_bt_relbuf(rel, so->currPos.buf);
}
}
else
{
- /* Remember we left a page with data */
- so->currPos.moreRight = true;
/*
* Should only happen in parallel cases, when some other backend
* advanced the scan.
*/
if (so->currPos.currPage != blkno)
{
BTScanPosUnpinIfPinned(so->currPos);
so->currPos.currPage = blkno;
}
/*
* Walk left to the next page with data. This is much more complex
@@ -1401,6 +1548,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
if (!so->currPos.moreLeft)
{
_bt_relbuf(rel, so->currPos.buf);
_bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
@@ -1412,6 +1560,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
/* if we're physically at end of index, return failure */
if (so->currPos.buf == InvalidBuffer)
{
_bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
@@ -1432,9 +1581,46 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
break;
}
/*
* For parallel scans, get the last page scanned as it is quite
* possible that by the time we try to seize the scan, some other
* worker has already advanced the scan to a different page. We
* must continue based on the latest page scanned by any worker.
*/
if (scan->parallel_scan != NULL)
{
_bt_relbuf(rel, so->currPos.buf);
status = _bt_parallel_seize(scan, &blkno);
if (!status)
{
BTScanPosInvalidate(so->currPos);
return false;
}
so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
}
}
}
return true;
}
/*
* _bt_parallel_readpage() -- Read current page containing valid data for scan
*
* On success, release lock and maybe pin on buffer. We return TRUE to
* indicate success.
*/
static bool
_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
_bt_initialize_more_data(so, dir);
if (!_bt_readnextpage(scan, blkno, dir))
return false;
/* Drop the lock, and maybe the pin, on the current page */
_bt_drop_lock_and_maybe_pin(scan, &so->currPos);
@@ -1712,19 +1898,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
/* remember which buffer we have pinned */
so->currPos.buf = buf;
- /* initialize moreLeft/moreRight appropriately for scan direction */
- if (ScanDirectionIsForward(dir))
- {
- so->currPos.moreLeft = false;
- so->currPos.moreRight = true;
- }
- else
- {
- so->currPos.moreLeft = true;
- so->currPos.moreRight = false;
- }
- so->numKilled = 0; /* just paranoia */
- so->markItemIndex = -1; /* ditto */
_bt_initialize_more_data(so, dir);
/*
* Now load data from the first page of the scan.
@@ -1753,3 +1927,25 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
return true;
}
/*
* _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately
* for scan direction
*/
static inline void
_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir)
{
/* initialize moreLeft/moreRight appropriately for scan direction */
if (ScanDirectionIsForward(dir))
{
so->currPos.moreLeft = false;
so->currPos.moreRight = true;
}
else
{
so->currPos.moreLeft = true;
so->currPos.moreRight = false;
}
so->numKilled = 0; /* just paranoia */
so->markItemIndex = -1; /* ditto */
}

--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -590,6 +590,10 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
break;
}
/* advance parallel scan */
if (scan->parallel_scan != NULL)
_bt_parallel_advance_array_keys(scan);
return found;
}

--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3374,6 +3374,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_BGWORKER_STARTUP:
event_name = "BgWorkerStartup";
break;
case WAIT_EVENT_BTREE_PAGE:
event_name = "BtreePage";
break;
case WAIT_EVENT_EXECUTE_GATHER:
event_name = "ExecuteGather";
break;

--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -383,6 +383,8 @@ typedef struct BTScanOpaqueData
ScanKey arrayKeyData; /* modified copy of scan->keyData */
int numArrayKeys; /* number of equality-type array keys (-1 if
* there are any unsatisfiable array keys) */
int arrayKeyCount; /* count indicating number of array scan keys
* processed */
BTArrayKeyInfo *arrayKeys; /* info about each equality-type array key */
MemoryContext arrayContext; /* scan-lifespan context for array data */
@@ -426,7 +428,7 @@ typedef BTScanOpaqueData *BTScanOpaque;
#define SK_BT_NULLS_FIRST (INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT)
/*
- * prototypes for functions in nbtree.c (external entry points for btree)
* external entry points for btree, in nbtree.c
*/
extern IndexBuildResult *btbuild(Relation heap, Relation index,
struct IndexInfo *indexInfo);
@@ -436,10 +438,13 @@ extern bool btinsert(Relation rel, Datum *values, bool *isnull,
IndexUniqueCheck checkUnique,
struct IndexInfo *indexInfo);
extern IndexScanDesc btbeginscan(Relation rel, int nkeys, int norderbys);
extern Size btestimateparallelscan(void);
extern void btinitparallelscan(void *target);
extern bool btgettuple(IndexScanDesc scan, ScanDirection dir);
extern int64 btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm);
extern void btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
ScanKey orderbys, int norderbys);
extern void btparallelrescan(IndexScanDesc scan);
extern void btendscan(IndexScanDesc scan);
extern void btmarkpos(IndexScanDesc scan);
extern void btrestrpos(IndexScanDesc scan);
@@ -451,6 +456,14 @@ extern IndexBulkDeleteResult *btvacuumcleanup(IndexVacuumInfo *info,
IndexBulkDeleteResult *stats);
extern bool btcanreturn(Relation index, int attno);
/*
* prototypes for internal functions in nbtree.c
*/
extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno);
extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page);
extern void _bt_parallel_done(IndexScanDesc scan);
extern void _bt_parallel_advance_array_keys(IndexScanDesc scan);
/*
* prototypes for functions in nbtinsert.c
*/

--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -780,6 +780,7 @@ typedef enum
{
WAIT_EVENT_BGWORKER_SHUTDOWN = PG_WAIT_IPC,
WAIT_EVENT_BGWORKER_STARTUP,
WAIT_EVENT_BTREE_PAGE,
WAIT_EVENT_EXECUTE_GATHER,
WAIT_EVENT_MQ_INTERNAL,
WAIT_EVENT_MQ_PUT_MESSAGE,

--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -161,6 +161,9 @@ BTPageOpaque
BTPageOpaqueData
BTPageStat
BTPageState
BTParallelScanDesc
BTParallelScanDescData
BTPS_State
BTScanOpaque
BTScanOpaqueData
BTScanPos
