@ -53,11 +53,15 @@
static TupleTableSlot * BitmapHeapNext ( BitmapHeapScanState * node ) ;
static void bitgetpage ( HeapScanDesc scan , TBMIterateResult * tbmres ) ;
static inline void BitmapDoneInitializingSharedState (
ParallelBitmapHeapState * pstate ) ;
static inline void BitmapAdjustPrefetchIterator ( BitmapHeapScanState * node ,
TBMIterateResult * tbmres ) ;
static inline void BitmapAdjustPrefetchTarget ( BitmapHeapScanState * node ) ;
static inline void BitmapPrefetch ( BitmapHeapScanState * node ,
HeapScanDesc scan ) ;
static bool BitmapShouldInitializeSharedState (
ParallelBitmapHeapState * pstate ) ;
/* ----------------------------------------------------------------
@ -73,9 +77,12 @@ BitmapHeapNext(BitmapHeapScanState *node)
HeapScanDesc scan ;
TIDBitmap * tbm ;
TBMIterator * tbmiterator ;
TBMSharedIterator * shared_tbmiterator ;
TBMIterateResult * tbmres ;
OffsetNumber targoffset ;
TupleTableSlot * slot ;
ParallelBitmapHeapState * pstate = node - > pstate ;
dsa_area * dsa = node - > ss . ps . state - > es_query_dsa ;
/*
* extract necessary information from index scan node
@ -84,7 +91,10 @@ BitmapHeapNext(BitmapHeapScanState *node)
slot = node - > ss . ss_ScanTupleSlot ;
scan = node - > ss . ss_currentScanDesc ;
tbm = node - > tbm ;
tbmiterator = node - > tbmiterator ;
if ( pstate = = NULL )
tbmiterator = node - > tbmiterator ;
else
shared_tbmiterator = node - > shared_tbmiterator ;
tbmres = node - > tbmres ;
/*
@ -99,25 +109,82 @@ BitmapHeapNext(BitmapHeapScanState *node)
* node - > prefetch_maximum . This is to avoid doing a lot of prefetching in
* a scan that stops after a few tuples because of a LIMIT .
*/
if ( tbm = = NULL )
if ( ! node - > initialized )
{
tbm = ( TIDBitmap * ) MultiExecProcNode ( outerPlanState ( node ) ) ;
if ( ! pstate )
{
tbm = ( TIDBitmap * ) MultiExecProcNode ( outerPlanState ( node ) ) ;
if ( ! tbm | | ! IsA ( tbm , TIDBitmap ) )
elog ( ERROR , " unrecognized result from subplan " ) ;
if ( ! tbm | | ! IsA ( tbm , TIDBitmap ) )
elog ( ERROR , " unrecognized result from subplan " ) ;
node - > tbm = tbm ;
node - > tbmiterator = tbmiterator = tbm_begin_iterate ( tbm ) ;
node - > tbmres = tbmres = NULL ;
node - > tbm = tbm ;
node - > tbmiterator = tbmiterator = tbm_begin_iterate ( tbm ) ;
node - > tbmres = tbmres = NULL ;
# ifdef USE_PREFETCH
if ( node - > prefetch_maximum > 0 )
{
node - > prefetch_iterator = tbm_begin_iterate ( tbm ) ;
node - > prefetch_pages = 0 ;
node - > prefetch_target = - 1 ;
if ( node - > prefetch_maximum > 0 )
{
node - > prefetch_iterator = tbm_begin_iterate ( tbm ) ;
node - > prefetch_pages = 0 ;
node - > prefetch_target = - 1 ;
}
# endif /* USE_PREFETCH */
}
else
{
/*
* The leader will immediately come out of the function , but
* others will be blocked until leader populates the TBM and wakes
* them up .
*/
if ( BitmapShouldInitializeSharedState ( pstate ) )
{
tbm = ( TIDBitmap * ) MultiExecProcNode ( outerPlanState ( node ) ) ;
if ( ! tbm | | ! IsA ( tbm , TIDBitmap ) )
elog ( ERROR , " unrecognized result from subplan " ) ;
node - > tbm = tbm ;
/*
* Prepare to iterate over the TBM . This will return the
* dsa_pointer of the iterator state which will be used by
* multiple processes to iterate jointly .
*/
pstate - > tbmiterator = tbm_prepare_shared_iterate ( tbm ) ;
# ifdef USE_PREFETCH
if ( node - > prefetch_maximum > 0 )
{
pstate - > prefetch_iterator =
tbm_prepare_shared_iterate ( tbm ) ;
/*
* We don ' t need the mutex here as we haven ' t yet woke up
* others .
*/
pstate - > prefetch_pages = 0 ;
pstate - > prefetch_target = - 1 ;
}
# endif
/* We have initialized the shared state so wake up others. */
BitmapDoneInitializingSharedState ( pstate ) ;
}
/* Allocate a private iterator and attach the shared state to it */
node - > shared_tbmiterator = shared_tbmiterator =
tbm_attach_shared_iterate ( dsa , pstate - > tbmiterator ) ;
node - > tbmres = tbmres = NULL ;
# ifdef USE_PREFETCH
if ( node - > prefetch_maximum > 0 )
{
node - > shared_prefetch_iterator =
tbm_attach_shared_iterate ( dsa , pstate - > prefetch_iterator ) ;
}
# endif /* USE_PREFETCH */
}
node - > initialized = true ;
}
for ( ; ; )
@ -130,7 +197,10 @@ BitmapHeapNext(BitmapHeapScanState *node)
*/
if ( tbmres = = NULL )
{
node - > tbmres = tbmres = tbm_iterate ( tbmiterator ) ;
if ( ! pstate )
node - > tbmres = tbmres = tbm_iterate ( tbmiterator ) ;
else
node - > tbmres = tbmres = tbm_shared_iterate ( shared_tbmiterator ) ;
if ( tbmres = = NULL )
{
/* no more entries in the bitmap */
@ -182,8 +252,19 @@ BitmapHeapNext(BitmapHeapScanState *node)
* Try to prefetch at least a few pages even before we get to the
* second page if we don ' t stop reading after the first tuple .
*/
if ( node - > prefetch_target < node - > prefetch_maximum )
node - > prefetch_target + + ;
if ( ! pstate )
{
if ( node - > prefetch_target < node - > prefetch_maximum )
node - > prefetch_target + + ;
}
else if ( pstate - > prefetch_target < node - > prefetch_maximum )
{
/* take spinlock while updating shared state */
SpinLockAcquire ( & pstate - > mutex ) ;
if ( pstate - > prefetch_target < node - > prefetch_maximum )
pstate - > prefetch_target + + ;
SpinLockRelease ( & pstate - > mutex ) ;
}
# endif /* USE_PREFETCH */
}
@ -361,6 +442,21 @@ bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres)
scan - > rs_ntuples = ntup ;
}
/*
* BitmapDoneInitializingSharedState - Shared state is initialized
*
* By this time the leader has already populated the TBM and initialized the
* shared state so wake up other processes .
*/
static inline void
BitmapDoneInitializingSharedState ( ParallelBitmapHeapState * pstate )
{
SpinLockAcquire ( & pstate - > mutex ) ;
pstate - > state = BM_FINISHED ;
SpinLockRelease ( & pstate - > mutex ) ;
ConditionVariableBroadcast ( & pstate - > cv ) ;
}
/*
* BitmapAdjustPrefetchIterator - Adjust the prefetch iterator
*/
@ -369,20 +465,53 @@ BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
TBMIterateResult * tbmres )
{
# ifdef USE_PREFETCH
TBMIterator * prefetch_iterator = node - > prefetch_iterator ;
ParallelBitmapHeapState * pstate = node - > pstate ;
if ( node - > prefetch_pages > 0 )
if ( pstate = = NULL )
{
/* The main iterator has closed the distance by one page */
node - > prefetch_pages - - ;
TBMIterator * prefetch_iterator = node - > prefetch_iterator ;
if ( node - > prefetch_pages > 0 )
{
/* The main iterator has closed the distance by one page */
node - > prefetch_pages - - ;
}
else if ( prefetch_iterator )
{
/* Do not let the prefetch iterator get behind the main one */
TBMIterateResult * tbmpre = tbm_iterate ( prefetch_iterator ) ;
if ( tbmpre = = NULL | | tbmpre - > blockno ! = tbmres - > blockno )
elog ( ERROR , " prefetch and main iterators are out of sync " ) ;
}
return ;
}
else if ( prefetch_iterator )
if ( node - > prefetch_maximum > 0 )
{
/* Do not let the prefetch iterator get behind the main one */
TBMIterateResult * tbmpre = tbm_iterate ( prefetch_iterator ) ;
TBMSharedIterator * prefetch_iterator = node - > shared_prefetch_iterator ;
SpinLockAcquire ( & pstate - > mutex ) ;
if ( pstate - > prefetch_pages > 0 )
{
node - > prefetch_pages - - ;
SpinLockRelease ( & pstate - > mutex ) ;
}
else
{
/* Release the mutex before iterating */
SpinLockRelease ( & pstate - > mutex ) ;
if ( tbmpre = = NULL | | tbmpre - > blockno ! = tbmres - > blockno )
elog ( ERROR , " prefetch and main iterators are out of sync " ) ;
/*
* In case of shared mode , we can not ensure that the current
* blockno of the main iterator and that of the prefetch iterator
* are same . It ' s possible that whatever blockno we are
* prefetching will be processed by another process . Therefore , we
* don ' t validate the blockno here as we do in non - parallel case .
*/
if ( prefetch_iterator )
tbm_shared_iterate ( prefetch_iterator ) ;
}
}
# endif /* USE_PREFETCH */
}
@ -399,14 +528,35 @@ static inline void
BitmapAdjustPrefetchTarget ( BitmapHeapScanState * node )
{
# ifdef USE_PREFETCH
if ( node - > prefetch_target > = node - > prefetch_maximum )
/* don't increase any further */ ;
else if ( node - > prefetch_target > = node - > prefetch_maximum / 2 )
node - > prefetch_target = node - > prefetch_maximum ;
else if ( node - > prefetch_target > 0 )
node - > prefetch_target * = 2 ;
else
node - > prefetch_target + + ;
ParallelBitmapHeapState * pstate = node - > pstate ;
if ( pstate = = NULL )
{
if ( node - > prefetch_target > = node - > prefetch_maximum )
/* don't increase any further */ ;
else if ( node - > prefetch_target > = node - > prefetch_maximum / 2 )
node - > prefetch_target = node - > prefetch_maximum ;
else if ( node - > prefetch_target > 0 )
node - > prefetch_target * = 2 ;
else
node - > prefetch_target + + ;
return ;
}
/* Do an unlocked check first to save spinlock acquisitions. */
if ( pstate - > prefetch_target < node - > prefetch_maximum )
{
SpinLockAcquire ( & pstate - > mutex ) ;
if ( pstate - > prefetch_target > = node - > prefetch_maximum )
/* don't increase any further */ ;
else if ( pstate - > prefetch_target > = node - > prefetch_maximum / 2 )
pstate - > prefetch_target = node - > prefetch_maximum ;
else if ( pstate - > prefetch_target > 0 )
pstate - > prefetch_target * = 2 ;
else
pstate - > prefetch_target + + ;
SpinLockRelease ( & pstate - > mutex ) ;
}
# endif /* USE_PREFETCH */
}
@ -417,23 +567,70 @@ static inline void
BitmapPrefetch ( BitmapHeapScanState * node , HeapScanDesc scan )
{
# ifdef USE_PREFETCH
TBMIterator * prefetch_iterator = node - > prefetch_iterator ;
ParallelBitmapHeapState * pstate = node - > pstate ;
if ( prefetch_iterator )
if ( pstate = = NULL )
{
while ( node - > prefetch_pages < node - > prefetch_target )
TBMIterator * prefetch_iterator = node - > prefetch_iterator ;
if ( prefetch_iterator )
{
TBMIterateResult * tbmpre = tbm_iterate ( prefetch_iterator ) ;
while ( node - > prefetch_pages < node - > prefetch_target )
{
TBMIterateResult * tbmpre = tbm_iterate ( prefetch_iterator ) ;
if ( tbmpre = = NULL )
{
/* No more pages to prefetch */
tbm_end_iterate ( prefetch_iterator ) ;
node - > prefetch_iterator = NULL ;
break ;
}
node - > prefetch_pages + + ;
PrefetchBuffer ( scan - > rs_rd , MAIN_FORKNUM , tbmpre - > blockno ) ;
}
}
return ;
}
if ( tbmpre = = NULL )
if ( pstate - > prefetch_pages < pstate - > prefetch_target )
{
TBMSharedIterator * prefetch_iterator = node - > shared_prefetch_iterator ;
if ( prefetch_iterator )
{
while ( 1 )
{
/* No more pages to prefetch */
tbm_end_iterate ( prefetch_iterator ) ;
node - > prefetch_iterator = NULL ;
break ;
TBMIterateResult * tbmpre ;
bool do_prefetch = false ;
/*
* Recheck under the mutex . If some other process has already
* done enough prefetching then we need not to do anything .
*/
SpinLockAcquire ( & pstate - > mutex ) ;
if ( pstate - > prefetch_pages < pstate - > prefetch_target )
{
pstate - > prefetch_pages + + ;
do_prefetch = true ;
}
SpinLockRelease ( & pstate - > mutex ) ;
if ( ! do_prefetch )
return ;
tbmpre = tbm_shared_iterate ( prefetch_iterator ) ;
if ( tbmpre = = NULL )
{
/* No more pages to prefetch */
tbm_end_shared_iterate ( prefetch_iterator ) ;
node - > shared_prefetch_iterator = NULL ;
break ;
}
PrefetchBuffer ( scan - > rs_rd , MAIN_FORKNUM , tbmpre - > blockno ) ;
}
node - > prefetch_pages + + ;
PrefetchBuffer ( scan - > rs_rd , MAIN_FORKNUM , tbmpre - > blockno ) ;
}
}
# endif /* USE_PREFETCH */
@ -488,12 +685,36 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
tbm_end_iterate ( node - > tbmiterator ) ;
if ( node - > prefetch_iterator )
tbm_end_iterate ( node - > prefetch_iterator ) ;
if ( node - > shared_tbmiterator )
tbm_end_shared_iterate ( node - > shared_tbmiterator ) ;
if ( node - > shared_prefetch_iterator )
tbm_end_shared_iterate ( node - > shared_prefetch_iterator ) ;
if ( node - > tbm )
tbm_free ( node - > tbm ) ;
node - > tbm = NULL ;
node - > tbmiterator = NULL ;
node - > tbmres = NULL ;
node - > prefetch_iterator = NULL ;
node - > initialized = false ;
node - > shared_tbmiterator = NULL ;
node - > shared_prefetch_iterator = NULL ;
/* Reset parallel bitmap state, if present */
if ( node - > pstate )
{
dsa_area * dsa = node - > ss . ps . state - > es_query_dsa ;
node - > pstate - > state = BM_INITIAL ;
if ( DsaPointerIsValid ( node - > pstate - > tbmiterator ) )
tbm_free_shared_area ( dsa , node - > pstate - > tbmiterator ) ;
if ( DsaPointerIsValid ( node - > pstate - > prefetch_iterator ) )
tbm_free_shared_area ( dsa , node - > pstate - > prefetch_iterator ) ;
node - > pstate - > tbmiterator = InvalidDsaPointer ;
node - > pstate - > prefetch_iterator = InvalidDsaPointer ;
}
ExecScanReScan ( & node - > ss ) ;
@ -546,6 +767,10 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
tbm_end_iterate ( node - > prefetch_iterator ) ;
if ( node - > tbm )
tbm_free ( node - > tbm ) ;
if ( node - > shared_tbmiterator )
tbm_end_shared_iterate ( node - > shared_tbmiterator ) ;
if ( node - > shared_prefetch_iterator )
tbm_end_shared_iterate ( node - > shared_prefetch_iterator ) ;
/*
* close heap scan
@ -597,6 +822,10 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
scanstate - > prefetch_target = 0 ;
/* may be updated below */
scanstate - > prefetch_maximum = target_prefetch_pages ;
scanstate - > pscan_len = 0 ;
scanstate - > initialized = false ;
scanstate - > shared_tbmiterator = NULL ;
scanstate - > pstate = NULL ;
/*
* Miscellaneous initialization
@ -681,3 +910,108 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
*/
return scanstate ;
}
/*----------------
* BitmapShouldInitializeSharedState
*
* The first process to come here and see the state to the BM_INITIAL
* will become the leader for the parallel bitmap scan and will be
* responsible for populating the TIDBitmap . The other processes will
* be blocked by the condition variable until the leader wakes them up .
* - - - - - - - - - - - - - - -
*/
static bool
BitmapShouldInitializeSharedState ( ParallelBitmapHeapState * pstate )
{
SharedBitmapState state ;
while ( 1 )
{
SpinLockAcquire ( & pstate - > mutex ) ;
state = pstate - > state ;
if ( pstate - > state = = BM_INITIAL )
pstate - > state = BM_INPROGRESS ;
SpinLockRelease ( & pstate - > mutex ) ;
/* Exit if bitmap is done, or if we're the leader. */
if ( state ! = BM_INPROGRESS )
break ;
/* Wait for the leader to wake us up. */
ConditionVariableSleep ( & pstate - > cv , WAIT_EVENT_PARALLEL_BITMAP_SCAN ) ;
}
ConditionVariableCancelSleep ( ) ;
return ( state = = BM_INITIAL ) ;
}
/* ----------------------------------------------------------------
* ExecBitmapHeapEstimate
*
* estimates the space required to serialize bitmap scan node .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
ExecBitmapHeapEstimate ( BitmapHeapScanState * node ,
ParallelContext * pcxt )
{
EState * estate = node - > ss . ps . state ;
node - > pscan_len = add_size ( offsetof ( ParallelBitmapHeapState ,
phs_snapshot_data ) ,
EstimateSnapshotSpace ( estate - > es_snapshot ) ) ;
shm_toc_estimate_chunk ( & pcxt - > estimator , node - > pscan_len ) ;
shm_toc_estimate_keys ( & pcxt - > estimator , 1 ) ;
}
/* ----------------------------------------------------------------
* ExecBitmapHeapInitializeDSM
*
* Set up a parallel bitmap heap scan descriptor .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
ExecBitmapHeapInitializeDSM ( BitmapHeapScanState * node ,
ParallelContext * pcxt )
{
ParallelBitmapHeapState * pstate ;
EState * estate = node - > ss . ps . state ;
pstate = shm_toc_allocate ( pcxt - > toc , node - > pscan_len ) ;
pstate - > tbmiterator = 0 ;
pstate - > prefetch_iterator = 0 ;
/* Initialize the mutex */
SpinLockInit ( & pstate - > mutex ) ;
pstate - > prefetch_pages = 0 ;
pstate - > prefetch_target = 0 ;
pstate - > state = BM_INITIAL ;
ConditionVariableInit ( & pstate - > cv ) ;
SerializeSnapshot ( estate - > es_snapshot , pstate - > phs_snapshot_data ) ;
shm_toc_insert ( pcxt - > toc , node - > ss . ps . plan - > plan_node_id , pstate ) ;
node - > pstate = pstate ;
}
/* ----------------------------------------------------------------
* ExecBitmapHeapInitializeWorker
*
* Copy relevant information from TOC into planstate .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
ExecBitmapHeapInitializeWorker ( BitmapHeapScanState * node , shm_toc * toc )
{
ParallelBitmapHeapState * pstate ;
Snapshot snapshot ;
pstate = shm_toc_lookup ( toc , node - > ss . ps . plan - > plan_node_id ) ;
node - > pstate = pstate ;
snapshot = RestoreSnapshot ( pstate - > phs_snapshot_data ) ;
heap_update_snapshot ( node - > ss . ss_currentScanDesc , snapshot ) ;
}