@ -10,18 +10,112 @@
* IDENTIFICATION
* src / backend / executor / nodeHashjoin . c
*
* PARALLELISM
*
* Hash joins can participate in parallel query execution in several ways . A
* parallel - oblivious hash join is one where the node is unaware that it is
* part of a parallel plan . In this case , a copy of the inner plan is used to
* build a copy of the hash table in every backend , and the outer plan could
* either be built from a partial or complete path , so that the results of the
* hash join are correspondingly either partial or complete . A parallel - aware
* hash join is one that behaves differently , coordinating work between
* backends , and appears as Parallel Hash Join in EXPLAIN output . A Parallel
* Hash Join always appears with a Parallel Hash node .
*
* Parallel - aware hash joins use the same per - backend state machine to track
* progress through the hash join algorithm as parallel - oblivious hash joins .
* In a parallel - aware hash join , there is also a shared state machine that
* co - operating backends use to synchronize their local state machines and
* program counters . The shared state machine is managed with a Barrier IPC
* primitive . When all attached participants arrive at a barrier , the phase
* advances and all waiting participants are released .
*
* When a participant begins working on a parallel hash join , it must first
* figure out how much progress has already been made , because participants
* don ' t wait for each other to begin . For this reason there are switch
* statements at key points in the code where we have to synchronize our local
* state machine with the phase , and then jump to the correct part of the
* algorithm so that we can get started .
*
* One barrier called build_barrier is used to coordinate the hashing phases .
* The phase is represented by an integer which begins at zero and increments
* one by one , but in the code it is referred to by symbolic names as follows :
*
* PHJ_BUILD_ELECTING - - initial state
* PHJ_BUILD_ALLOCATING - - one sets up the batches and table 0
* PHJ_BUILD_HASHING_INNER - - all hash the inner rel
* PHJ_BUILD_HASHING_OUTER - - ( multi - batch only ) all hash the outer
* PHJ_BUILD_DONE - - building done , probing can begin
*
* While in the phase PHJ_BUILD_HASHING_INNER a separate pair of barriers may
* be used repeatedly as required to coordinate expansions in the number of
* batches or buckets . Their phases are as follows :
*
* PHJ_GROW_BATCHES_ELECTING - - initial state
* PHJ_GROW_BATCHES_ALLOCATING - - one allocates new batches
* PHJ_GROW_BATCHES_REPARTITIONING - - all repartition
* PHJ_GROW_BATCHES_FINISHING - - one cleans up , detects skew
*
* PHJ_GROW_BUCKETS_ELECTING - - initial state
* PHJ_GROW_BUCKETS_ALLOCATING - - one allocates new buckets
* PHJ_GROW_BUCKETS_REINSERTING - - all insert tuples
*
* If the planner got the number of batches and buckets right , those won ' t be
* necessary , but on the other hand we might finish up needing to expand the
* buckets or batches multiple times while hashing the inner relation to stay
* within our memory budget and load factor target . For that reason it ' s a
* separate pair of barriers using circular phases .
*
* The PHJ_BUILD_HASHING_OUTER phase is required only for multi - batch joins ,
* because we need to divide the outer relation into batches up front in order
* to be able to process batches entirely independently . In contrast , the
* parallel - oblivious algorithm simply throws tuples ' forward ' to ' later '
* batches whenever it encounters them while scanning and probing , which it
* can do because it processes batches in serial order .
*
* Once PHJ_BUILD_DONE is reached , backends then split up and process
* different batches , or gang up and work together on probing batches if there
* aren ' t enough to go around . For each batch there is a separate barrier
* with the following phases :
*
* PHJ_BATCH_ELECTING - - initial state
* PHJ_BATCH_ALLOCATING - - one allocates buckets
* PHJ_BATCH_LOADING - - all load the hash table from disk
* PHJ_BATCH_PROBING - - all probe
* PHJ_BATCH_DONE - - end
*
* Batch 0 is a special case , because it starts out in phase
* PHJ_BATCH_PROBING ; populating batch 0 ' s hash table is done during
* PHJ_BUILD_HASHING_INNER so we can skip loading .
*
* Initially we try to plan for a single - batch hash join using the combined
* work_mem of all participants to create a large shared hash table . If that
* turns out either at planning or execution time to be impossible then we
* fall back to regular work_mem sized hash tables .
*
* To avoid deadlocks , we never wait for any barrier unless it is known that
* all other backends attached to it are actively executing the node or have
* already arrived . Practically , that means that we never return a tuple
* while attached to a barrier , unless the barrier has reached its final
* state . In the slightly special case of the per - batch barrier , we return
* tuples while in PHJ_BATCH_PROBING phase , but that ' s OK because we use
* BarrierArriveAndDetach ( ) to advance it to PHJ_BATCH_DONE without waiting .
*
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
# include "postgres.h"
# include "access/htup_details.h"
# include "access/parallel.h"
# include "executor/executor.h"
# include "executor/hashjoin.h"
# include "executor/nodeHash.h"
# include "executor/nodeHashjoin.h"
# include "miscadmin.h"
# include "pgstat.h"
# include "utils/memutils.h"
# include "utils/sharedtuplestore.h"
/*
@ -42,24 +136,34 @@
static TupleTableSlot * ExecHashJoinOuterGetTuple ( PlanState * outerNode ,
HashJoinState * hjstate ,
uint32 * hashvalue ) ;
static TupleTableSlot * ExecParallelHashJoinOuterGetTuple ( PlanState * outerNode ,
HashJoinState * hjstate ,
uint32 * hashvalue ) ;
static TupleTableSlot * ExecHashJoinGetSavedTuple ( HashJoinState * hjstate ,
BufFile * file ,
uint32 * hashvalue ,
TupleTableSlot * tupleSlot ) ;
static bool ExecHashJoinNewBatch ( HashJoinState * hjstate ) ;
static bool ExecParallelHashJoinNewBatch ( HashJoinState * hjstate ) ;
static void ExecParallelHashJoinPartitionOuter ( HashJoinState * node ) ;
/* ----------------------------------------------------------------
* ExecHashJoin
* ExecHashJoinImpl
*
* This function implements the Hybrid Hashjoin algorithm .
* This function implements the Hybrid Hashjoin algorithm . It is marked
* with an always - inline attribute so that ExecHashJoin ( ) and
* ExecParallelHashJoin ( ) can inline it . Compilers that respect the
* attribute should create versions specialized for parallel = = true and
* parallel = = false with unnecessary branches removed .
*
* Note : the relation we build hash table on is the " inner "
* the other one is " outer " .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
static TupleTableSlot * /* return: a tuple or NULL */
ExecHashJoin ( PlanState * pstate )
pg_attribute_always_inline
static inline TupleTableSlot *
ExecHashJoinImpl ( PlanState * pstate , bool parallel )
{
HashJoinState * node = castNode ( HashJoinState , pstate ) ;
PlanState * outerNode ;
@ -71,6 +175,7 @@ ExecHashJoin(PlanState *pstate)
TupleTableSlot * outerTupleSlot ;
uint32 hashvalue ;
int batchno ;
ParallelHashJoinState * parallel_state ;
/*
* get information from HashJoin node
@ -81,6 +186,7 @@ ExecHashJoin(PlanState *pstate)
outerNode = outerPlanState ( node ) ;
hashtable = node - > hj_HashTable ;
econtext = node - > js . ps . ps_ExprContext ;
parallel_state = hashNode - > parallel_state ;
/*
* Reset per - tuple memory context to free any expression evaluation
@ -138,6 +244,18 @@ ExecHashJoin(PlanState *pstate)
/* no chance to not build the hash table */
node - > hj_FirstOuterTupleSlot = NULL ;
}
else if ( parallel )
{
/*
* The empty - outer optimization is not implemented for
* shared hash tables , because no one participant can
* determine that there are no outer tuples , and it ' s not
* yet clear that it ' s worth the synchronization overhead
* of reaching consensus to figure that out . So we have
* to build the hash table .
*/
node - > hj_FirstOuterTupleSlot = NULL ;
}
else if ( HJ_FILL_OUTER ( node ) | |
( outerNode - > plan - > startup_cost < hashNode - > ps . plan - > total_cost & &
! node - > hj_OuterNotEmpty ) )
@ -155,15 +273,19 @@ ExecHashJoin(PlanState *pstate)
node - > hj_FirstOuterTupleSlot = NULL ;
/*
* create the hash table
* Create the hash table . If using Parallel Hash , then
* whoever gets here first will create the hash table and any
* later arrivals will merely attach to it .
*/
hashtable = ExecHashTableCreate ( ( Hash * ) hashNode - > ps . plan ,
hashtable = ExecHashTableCreate ( hashNode ,
node - > hj_HashOperators ,
HJ_FILL_INNER ( node ) ) ;
node - > hj_HashTable = hashtable ;
/*
* execute the Hash node , to build the hash table
* Execute the Hash node , to build the hash table . If using
* Parallel Hash , then we ' ll try to help hashing unless we
* arrived too late .
*/
hashNode - > hashtable = hashtable ;
( void ) MultiExecProcNode ( ( PlanState * ) hashNode ) ;
@ -189,7 +311,34 @@ ExecHashJoin(PlanState *pstate)
*/
node - > hj_OuterNotEmpty = false ;
node - > hj_JoinState = HJ_NEED_NEW_OUTER ;
if ( parallel )
{
Barrier * build_barrier ;
build_barrier = & parallel_state - > build_barrier ;
Assert ( BarrierPhase ( build_barrier ) = = PHJ_BUILD_HASHING_OUTER | |
BarrierPhase ( build_barrier ) = = PHJ_BUILD_DONE ) ;
if ( BarrierPhase ( build_barrier ) = = PHJ_BUILD_HASHING_OUTER )
{
/*
* If multi - batch , we need to hash the outer relation
* up front .
*/
if ( hashtable - > nbatch > 1 )
ExecParallelHashJoinPartitionOuter ( node ) ;
BarrierArriveAndWait ( build_barrier ,
WAIT_EVENT_HASH_BUILD_HASHING_OUTER ) ;
}
Assert ( BarrierPhase ( build_barrier ) = = PHJ_BUILD_DONE ) ;
/* Each backend should now select a batch to work on. */
hashtable - > curbatch = - 1 ;
node - > hj_JoinState = HJ_NEED_NEW_BATCH ;
continue ;
}
else
node - > hj_JoinState = HJ_NEED_NEW_OUTER ;
/* FALL THRU */
@ -198,9 +347,14 @@ ExecHashJoin(PlanState *pstate)
/*
* We don ' t have an outer tuple , try to get the next one
*/
outerTupleSlot = ExecHashJoinOuterGetTuple ( outerNode ,
node ,
& hashvalue ) ;
if ( parallel )
outerTupleSlot =
ExecParallelHashJoinOuterGetTuple ( outerNode , node ,
& hashvalue ) ;
else
outerTupleSlot =
ExecHashJoinOuterGetTuple ( outerNode , node , & hashvalue ) ;
if ( TupIsNull ( outerTupleSlot ) )
{
/* end of batch, or maybe whole join */
@ -240,10 +394,12 @@ ExecHashJoin(PlanState *pstate)
* Need to postpone this outer tuple to a later batch .
* Save it in the corresponding outer - batch file .
*/
Assert ( parallel_state = = NULL ) ;
Assert ( batchno > hashtable - > curbatch ) ;
ExecHashJoinSaveTuple ( ExecFetchSlotMinimalTuple ( outerTupleSlot ) ,
hashvalue ,
& hashtable - > outerBatchFile [ batchno ] ) ;
/* Loop around, staying in HJ_NEED_NEW_OUTER state */
continue ;
}
@ -258,11 +414,23 @@ ExecHashJoin(PlanState *pstate)
/*
* Scan the selected hash bucket for matches to current outer
*/
if ( ! ExecScanHashBucket ( node , econtext ) )
if ( parallel )
{
/* out of matches; check for possible outer-join fill */
node - > hj_JoinState = HJ_FILL_OUTER_TUPLE ;
continue ;
if ( ! ExecParallelScanHashBucket ( node , econtext ) )
{
/* out of matches; check for possible outer-join fill */
node - > hj_JoinState = HJ_FILL_OUTER_TUPLE ;
continue ;
}
}
else
{
if ( ! ExecScanHashBucket ( node , econtext ) )
{
/* out of matches; check for possible outer-join fill */
node - > hj_JoinState = HJ_FILL_OUTER_TUPLE ;
continue ;
}
}
/*
@ -362,8 +530,16 @@ ExecHashJoin(PlanState *pstate)
/*
* Try to advance to next batch . Done if there are no more .
*/
if ( ! ExecHashJoinNewBatch ( node ) )
return NULL ; /* end of join */
if ( parallel )
{
if ( ! ExecParallelHashJoinNewBatch ( node ) )
return NULL ; /* end of parallel-aware join */
}
else
{
if ( ! ExecHashJoinNewBatch ( node ) )
return NULL ; /* end of parallel-oblivious join */
}
node - > hj_JoinState = HJ_NEED_NEW_OUTER ;
break ;
@ -374,6 +550,38 @@ ExecHashJoin(PlanState *pstate)
}
}
/* ----------------------------------------------------------------
* ExecHashJoin
*
* Parallel - oblivious version .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
static TupleTableSlot * /* return: a tuple or NULL */
ExecHashJoin ( PlanState * pstate )
{
/*
* On sufficiently smart compilers this should be inlined with the
* parallel - aware branches removed .
*/
return ExecHashJoinImpl ( pstate , false ) ;
}
/* ----------------------------------------------------------------
* ExecParallelHashJoin
*
* Parallel - aware version .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
static TupleTableSlot * /* return: a tuple or NULL */
ExecParallelHashJoin ( PlanState * pstate )
{
/*
* On sufficiently smart compilers this should be inlined with the
* parallel - oblivious branches removed .
*/
return ExecHashJoinImpl ( pstate , true ) ;
}
/* ----------------------------------------------------------------
* ExecInitHashJoin
*
@ -400,6 +608,12 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
hjstate = makeNode ( HashJoinState ) ;
hjstate - > js . ps . plan = ( Plan * ) node ;
hjstate - > js . ps . state = estate ;
/*
* See ExecHashJoinInitializeDSM ( ) and ExecHashJoinInitializeWorker ( )
* where this function may be replaced with a parallel version , if we
* managed to launch a parallel query .
*/
hjstate - > js . ps . ExecProcNode = ExecHashJoin ;
/*
@ -581,9 +795,9 @@ ExecEndHashJoin(HashJoinState *node)
/*
* ExecHashJoinOuterGetTuple
*
* get the next outer tuple for hashjoin : either by
* executing the outer plan node in the first pass , or from
* the temp files for the hashjoin batches .
* get the next outer tuple for a parallel oblivious hashjoin : either by
* executing the outer plan node in the first pass , or from the temp
* files for the hashjoin batches .
*
* Returns a null slot if no more outer tuples ( within the current batch ) .
*
@ -661,6 +875,67 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
return NULL ;
}
/*
* ExecHashJoinOuterGetTuple variant for the parallel case .
*/
static TupleTableSlot *
ExecParallelHashJoinOuterGetTuple ( PlanState * outerNode ,
HashJoinState * hjstate ,
uint32 * hashvalue )
{
HashJoinTable hashtable = hjstate - > hj_HashTable ;
int curbatch = hashtable - > curbatch ;
TupleTableSlot * slot ;
/*
* In the Parallel Hash case we only run the outer plan directly for
* single - batch hash joins . Otherwise we have to go to batch files , even
* for batch 0.
*/
if ( curbatch = = 0 & & hashtable - > nbatch = = 1 )
{
slot = ExecProcNode ( outerNode ) ;
while ( ! TupIsNull ( slot ) )
{
ExprContext * econtext = hjstate - > js . ps . ps_ExprContext ;
econtext - > ecxt_outertuple = slot ;
if ( ExecHashGetHashValue ( hashtable , econtext ,
hjstate - > hj_OuterHashKeys ,
true , /* outer tuple */
HJ_FILL_OUTER ( hjstate ) ,
hashvalue ) )
return slot ;
/*
* That tuple couldn ' t match because of a NULL , so discard it and
* continue with the next one .
*/
slot = ExecProcNode ( outerNode ) ;
}
}
else if ( curbatch < hashtable - > nbatch )
{
MinimalTuple tuple ;
tuple = sts_parallel_scan_next ( hashtable - > batches [ curbatch ] . outer_tuples ,
hashvalue ) ;
if ( tuple ! = NULL )
{
slot = ExecStoreMinimalTuple ( tuple ,
hjstate - > hj_OuterTupleSlot ,
false ) ;
return slot ;
}
else
ExecClearTuple ( hjstate - > hj_OuterTupleSlot ) ;
}
/* End of this batch */
return NULL ;
}
/*
* ExecHashJoinNewBatch
* switch to a new hashjoin batch
@ -803,6 +1078,135 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
return true ;
}
/*
* Choose a batch to work on , and attach to it . Returns true if successful ,
* false if there are no more batches .
*/
static bool
ExecParallelHashJoinNewBatch ( HashJoinState * hjstate )
{
HashJoinTable hashtable = hjstate - > hj_HashTable ;
int start_batchno ;
int batchno ;
/*
* If we started up so late that the batch tracking array has been freed
* already by ExecHashTableDetach ( ) , then we are finished . See also
* ExecParallelHashEnsureBatchAccessors ( ) .
*/
if ( hashtable - > batches = = NULL )
return false ;
/*
* If we were already attached to a batch , remember not to bother checking
* it again , and detach from it ( possibly freeing the hash table if we are
* last to detach ) .
*/
if ( hashtable - > curbatch > = 0 )
{
hashtable - > batches [ hashtable - > curbatch ] . done = true ;
ExecHashTableDetachBatch ( hashtable ) ;
}
/*
* Search for a batch that isn ' t done . We use an atomic counter to start
* our search at a different batch in every participant when there are
* more batches than participants .
*/
batchno = start_batchno =
pg_atomic_fetch_add_u32 ( & hashtable - > parallel_state - > distributor , 1 ) %
hashtable - > nbatch ;
do
{
uint32 hashvalue ;
MinimalTuple tuple ;
TupleTableSlot * slot ;
if ( ! hashtable - > batches [ batchno ] . done )
{
SharedTuplestoreAccessor * inner_tuples ;
Barrier * batch_barrier =
& hashtable - > batches [ batchno ] . shared - > batch_barrier ;
switch ( BarrierAttach ( batch_barrier ) )
{
case PHJ_BATCH_ELECTING :
/* One backend allocates the hash table. */
if ( BarrierArriveAndWait ( batch_barrier ,
WAIT_EVENT_HASH_BATCH_ELECTING ) )
ExecParallelHashTableAlloc ( hashtable , batchno ) ;
/* Fall through. */
case PHJ_BATCH_ALLOCATING :
/* Wait for allocation to complete. */
BarrierArriveAndWait ( batch_barrier ,
WAIT_EVENT_HASH_BATCH_ALLOCATING ) ;
/* Fall through. */
case PHJ_BATCH_LOADING :
/* Start (or join in) loading tuples. */
ExecParallelHashTableSetCurrentBatch ( hashtable , batchno ) ;
inner_tuples = hashtable - > batches [ batchno ] . inner_tuples ;
sts_begin_parallel_scan ( inner_tuples ) ;
while ( ( tuple = sts_parallel_scan_next ( inner_tuples ,
& hashvalue ) ) )
{
slot = ExecStoreMinimalTuple ( tuple ,
hjstate - > hj_HashTupleSlot ,
false ) ;
ExecParallelHashTableInsertCurrentBatch ( hashtable , slot ,
hashvalue ) ;
}
sts_end_parallel_scan ( inner_tuples ) ;
BarrierArriveAndWait ( batch_barrier ,
WAIT_EVENT_HASH_BATCH_LOADING ) ;
/* Fall through. */
case PHJ_BATCH_PROBING :
/*
* This batch is ready to probe . Return control to
* caller . We stay attached to batch_barrier so that the
* hash table stays alive until everyone ' s finished
* probing it , but no participant is allowed to wait at
* this barrier again ( or else a deadlock could occur ) .
* All attached participants must eventually call
* BarrierArriveAndDetach ( ) so that the final phase
* PHJ_BATCH_DONE can be reached .
*/
ExecParallelHashTableSetCurrentBatch ( hashtable , batchno ) ;
sts_begin_parallel_scan ( hashtable - > batches [ batchno ] . outer_tuples ) ;
return true ;
case PHJ_BATCH_DONE :
/*
* Already done . Detach and go around again ( if any
* remain ) .
*/
BarrierDetach ( batch_barrier ) ;
/*
* We didn ' t work on this batch , but we need to observe
* its size for EXPLAIN .
*/
ExecParallelHashUpdateSpacePeak ( hashtable , batchno ) ;
hashtable - > batches [ batchno ] . done = true ;
hashtable - > curbatch = - 1 ;
break ;
default :
elog ( ERROR , " unexpected batch phase %d " ,
BarrierPhase ( batch_barrier ) ) ;
}
}
batchno = ( batchno + 1 ) % hashtable - > nbatch ;
} while ( batchno ! = start_batchno ) ;
return false ;
}
/*
* ExecHashJoinSaveTuple
* save a tuple to a batch file .
@ -964,3 +1368,176 @@ ExecReScanHashJoin(HashJoinState *node)
if ( node - > js . ps . lefttree - > chgParam = = NULL )
ExecReScan ( node - > js . ps . lefttree ) ;
}
void
ExecShutdownHashJoin ( HashJoinState * node )
{
if ( node - > hj_HashTable )
{
/*
* Detach from shared state before DSM memory goes away . This makes
* sure that we don ' t have any pointers into DSM memory by the time
* ExecEndHashJoin runs .
*/
ExecHashTableDetachBatch ( node - > hj_HashTable ) ;
ExecHashTableDetach ( node - > hj_HashTable ) ;
}
}
static void
ExecParallelHashJoinPartitionOuter ( HashJoinState * hjstate )
{
PlanState * outerState = outerPlanState ( hjstate ) ;
ExprContext * econtext = hjstate - > js . ps . ps_ExprContext ;
HashJoinTable hashtable = hjstate - > hj_HashTable ;
TupleTableSlot * slot ;
uint32 hashvalue ;
int i ;
Assert ( hjstate - > hj_FirstOuterTupleSlot = = NULL ) ;
/* Execute outer plan, writing all tuples to shared tuplestores. */
for ( ; ; )
{
slot = ExecProcNode ( outerState ) ;
if ( TupIsNull ( slot ) )
break ;
econtext - > ecxt_outertuple = slot ;
if ( ExecHashGetHashValue ( hashtable , econtext ,
hjstate - > hj_OuterHashKeys ,
true , /* outer tuple */
false , /* outer join, currently unsupported */
& hashvalue ) )
{
int batchno ;
int bucketno ;
ExecHashGetBucketAndBatch ( hashtable , hashvalue , & bucketno ,
& batchno ) ;
sts_puttuple ( hashtable - > batches [ batchno ] . outer_tuples ,
& hashvalue , ExecFetchSlotMinimalTuple ( slot ) ) ;
}
CHECK_FOR_INTERRUPTS ( ) ;
}
/* Make sure all outer partitions are readable by any backend. */
for ( i = 0 ; i < hashtable - > nbatch ; + + i )
sts_end_write ( hashtable - > batches [ i ] . outer_tuples ) ;
}
void
ExecHashJoinEstimate ( HashJoinState * state , ParallelContext * pcxt )
{
shm_toc_estimate_chunk ( & pcxt - > estimator , sizeof ( ParallelHashJoinState ) ) ;
shm_toc_estimate_keys ( & pcxt - > estimator , 1 ) ;
}
void
ExecHashJoinInitializeDSM ( HashJoinState * state , ParallelContext * pcxt )
{
int plan_node_id = state - > js . ps . plan - > plan_node_id ;
HashState * hashNode ;
ParallelHashJoinState * pstate ;
/*
* Disable shared hash table mode if we failed to create a real DSM
* segment , because that means that we don ' t have a DSA area to work with .
*/
if ( pcxt - > seg = = NULL )
return ;
ExecSetExecProcNode ( & state - > js . ps , ExecParallelHashJoin ) ;
/*
* Set up the state needed to coordinate access to the shared hash
* table ( s ) , using the plan node ID as the toc key .
*/
pstate = shm_toc_allocate ( pcxt - > toc , sizeof ( ParallelHashJoinState ) ) ;
shm_toc_insert ( pcxt - > toc , plan_node_id , pstate ) ;
/*
* Set up the shared hash join state with no batches initially .
* ExecHashTableCreate ( ) will prepare at least one later and set nbatch
* and space_allowed .
*/
pstate - > nbatch = 0 ;
pstate - > space_allowed = 0 ;
pstate - > batches = InvalidDsaPointer ;
pstate - > old_batches = InvalidDsaPointer ;
pstate - > nbuckets = 0 ;
pstate - > growth = PHJ_GROWTH_OK ;
pstate - > chunk_work_queue = InvalidDsaPointer ;
pg_atomic_init_u32 ( & pstate - > distributor , 0 ) ;
pstate - > nparticipants = pcxt - > nworkers + 1 ;
pstate - > total_tuples = 0 ;
LWLockInitialize ( & pstate - > lock ,
LWTRANCHE_PARALLEL_HASH_JOIN ) ;
BarrierInit ( & pstate - > build_barrier , 0 ) ;
BarrierInit ( & pstate - > grow_batches_barrier , 0 ) ;
BarrierInit ( & pstate - > grow_buckets_barrier , 0 ) ;
/* Set up the space we'll use for shared temporary files. */
SharedFileSetInit ( & pstate - > fileset , pcxt - > seg ) ;
/* Initialize the shared state in the hash node. */
hashNode = ( HashState * ) innerPlanState ( state ) ;
hashNode - > parallel_state = pstate ;
}
/* ----------------------------------------------------------------
* ExecHashJoinReInitializeDSM
*
* Reset shared state before beginning a fresh scan .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
ExecHashJoinReInitializeDSM ( HashJoinState * state , ParallelContext * cxt )
{
int plan_node_id = state - > js . ps . plan - > plan_node_id ;
ParallelHashJoinState * pstate =
shm_toc_lookup ( cxt - > toc , plan_node_id , false ) ;
/*
* It would be possible to reuse the shared hash table in single - batch
* cases by resetting and then fast - forwarding build_barrier to
* PHJ_BUILD_DONE and batch 0 ' s batch_barrier to PHJ_BATCH_PROBING , but
* currently shared hash tables are already freed by now ( by the last
* participant to detach from the batch ) . We could consider keeping it
* around for single - batch joins . We ' d also need to adjust
* finalize_plan ( ) so that it doesn ' t record a dummy dependency for
* Parallel Hash nodes , preventing the rescan optimization . For now we
* don ' t try .
*/
/* Detach, freeing any remaining shared memory. */
if ( state - > hj_HashTable ! = NULL )
{
ExecHashTableDetachBatch ( state - > hj_HashTable ) ;
ExecHashTableDetach ( state - > hj_HashTable ) ;
}
/* Clear any shared batch files. */
SharedFileSetDeleteAll ( & pstate - > fileset ) ;
/* Reset build_barrier to PHJ_BUILD_ELECTING so we can go around again. */
BarrierInit ( & pstate - > build_barrier , 0 ) ;
}
void
ExecHashJoinInitializeWorker ( HashJoinState * state ,
ParallelWorkerContext * pwcxt )
{
HashState * hashNode ;
int plan_node_id = state - > js . ps . plan - > plan_node_id ;
ParallelHashJoinState * pstate =
shm_toc_lookup ( pwcxt - > toc , plan_node_id , false ) ;
/* Attach to the space for shared temporary files. */
SharedFileSetAttach ( & pstate - > fileset , pwcxt - > seg ) ;
/* Attach to the shared state in the hash node. */
hashNode = ( HashState * ) innerPlanState ( state ) ;
hashNode - > parallel_state = pstate ;
ExecSetExecProcNode ( & state - > js . ps , ExecParallelHashJoin ) ;
}