@@ -2071,6 +2071,69 @@ ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
	hjstate->hj_CurTuple = NULL;
}
/*
 * Decide if this process is allowed to run the unmatched scan.  If so, the
 * batch barrier is advanced to PHJ_BATCH_SCAN and true is returned.
 * Otherwise the batch is detached and false is returned.
 */
bool
ExecParallelPrepHashTableForUnmatched(HashJoinState *hjstate)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	int			curbatch = hashtable->curbatch;
	ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;

	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE);

	/*
	 * It would not be deadlock-free to wait on the batch barrier, because it
	 * is in PHJ_BATCH_PROBE phase, and thus processes attached to it have
	 * already emitted tuples.  Therefore, we'll hold a wait-free election:
	 * only one process can continue to the next phase, and all others detach
	 * from this batch.  They can still go work on other batches, if there
	 * are any.
	 */
	if (!BarrierArriveAndDetachExceptLast(&batch->batch_barrier))
	{
		/* This process considers the batch to be done. */
		hashtable->batches[hashtable->curbatch].done = true;

		/* Make sure any temporary files are closed. */
		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);

		/*
		 * Track largest batch we've seen, which would normally happen in
		 * ExecHashTableDetachBatch().
		 */
		hashtable->spacePeak =
			Max(hashtable->spacePeak,
				batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets);
		hashtable->curbatch = -1;
		return false;
	}

	/* Now we are alone with this batch. */
	Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
	Assert(BarrierParticipants(&batch->batch_barrier) == 1);

	/*
	 * Has another process decided to give up early and command all processes
	 * to skip the unmatched scan?
	 */
	if (batch->skip_unmatched)
	{
		hashtable->batches[hashtable->curbatch].done = true;
		ExecHashTableDetachBatch(hashtable);
		return false;
	}

	/* Now prepare the process local state, just as for non-parallel join. */
	ExecPrepHashTableForUnmatched(hjstate);

	return true;
}
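/*
 * A minimal sketch (not part of the patch) of the wait-free election used
 * above, reduced to its essentials and assuming only the barrier primitives
 * from storage/ipc/barrier.h.  Exactly one attached process survives and
 * stays attached; every other process detaches immediately instead of
 * waiting, which is what keeps the protocol deadlock-free once processes
 * have already emitted tuples.
 */
static bool
example_wait_free_election(Barrier *barrier)
{
	if (!BarrierArriveAndDetachExceptLast(barrier))
		return false;			/* detached; free to work on other batches */

	/* We are the sole surviving participant. */
	Assert(BarrierParticipants(barrier) == 1);
	return true;
}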
/*
 * ExecScanHashTableForUnmatched
 *		scan the hash table for unmatched inner tuples
@@ -2145,6 +2208,72 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
	return false;
}
/*
 * ExecParallelScanHashTableForUnmatched
 *		scan the hash table for unmatched inner tuples, in parallel join
 *
 * On success, the inner tuple is stored into hjstate->hj_CurTuple and
 * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
 * for the latter.
 */
bool
ExecParallelScanHashTableForUnmatched(HashJoinState *hjstate,
									  ExprContext *econtext)
{
	HashJoinTable hashtable = hjstate->hj_HashTable;
	HashJoinTuple hashTuple = hjstate->hj_CurTuple;

	for (;;)
	{
		/*
		 * hj_CurTuple is the address of the tuple last returned from the
		 * current bucket, or NULL if it's time to start scanning a new
		 * bucket.
		 */
		if (hashTuple != NULL)
			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
			hashTuple = ExecParallelHashFirstTuple(hashtable,
												   hjstate->hj_CurBucketNo++);
		else
			break;				/* finished all buckets */

		while (hashTuple != NULL)
		{
			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
			{
				TupleTableSlot *inntuple;

				/* insert hashtable's tuple into exec slot */
				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
												 hjstate->hj_HashTupleSlot,
												 false);	/* do not pfree */
				econtext->ecxt_innertuple = inntuple;

				/*
				 * Reset temp memory each time; although this function doesn't
				 * do any qual eval, the caller will, so let's keep it
				 * parallel to ExecScanHashBucket.
				 */
				ResetExprContext(econtext);

				hjstate->hj_CurTuple = hashTuple;
				return true;
			}

			hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
		}

		/* allow this loop to be cancellable */
		CHECK_FOR_INTERRUPTS();
	}

	/*
	 * no more unmatched tuples
	 */
	return false;
}
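/*
 * A minimal sketch (not part of the patch) showing how a caller might chain
 * the two functions above for one batch.  The helper emit_null_extended() is
 * hypothetical; in the real executor the hash join state machine in
 * nodeHashjoin.c plays this role.
 */
extern void emit_null_extended(TupleTableSlot *slot);	/* hypothetical */

static void
example_drive_unmatched_scan(HashJoinState *hjstate, ExprContext *econtext)
{
	/* Hold the election; losers detach and go work on other batches. */
	if (!ExecParallelPrepHashTableForUnmatched(hjstate))
		return;

	/* We won: emit every inner tuple whose match bit was never set. */
	while (ExecParallelScanHashTableForUnmatched(hjstate, econtext))
		emit_null_extended(econtext->ecxt_innertuple);	/* hypothetical */

	/* Detach last, letting the batch advance to PHJ_BATCH_FREE. */
	ExecHashTableDetachBatch(hjstate->hj_HashTable);
}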
/*
 * ExecHashTableReset
 *
@@ -3088,6 +3217,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
		accessor->shared = shared;
		accessor->preallocated = 0;
		accessor->done = false;
		accessor->outer_eof = false;
		accessor->inner_tuples =
			sts_attach(ParallelHashJoinBatchInner(shared),
					   ParallelWorkerNumber + 1,
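/*
 * A minimal sketch (not part of the patch) of the intended write site for
 * the new outer_eof flag initialized above: a probing process records that
 * it consumed its share of the outer batch, so that ExecHashTableDetachBatch()
 * in the next hunk can tell a completed probe apart from one abandoned
 * early.  The function name and exact placement are assumptions for
 * illustration only.
 */
static void
example_outer_batch_exhausted(HashJoinTable hashtable)
{
	int			curbatch = hashtable->curbatch;

	/* No more outer tuples will be read for this batch in this process. */
	hashtable->batches[curbatch].outer_eof = true;
}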
@@ -3133,18 +3263,53 @@ ExecHashTableDetachBatch(HashJoinTable hashtable)
	{
		int			curbatch = hashtable->curbatch;
		ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
		bool		attached = true;

		/* Make sure any temporary files are closed. */
		sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
		sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);

		/* After attaching we always get at least to PHJ_BATCH_PROBE. */
		Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE ||
			   BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_SCAN);
		/*
		 * If we're abandoning the PHJ_BATCH_PROBE phase early without having
		 * reached the end of it, it means the plan doesn't want any more
		 * tuples, and it is happy to abandon any tuples buffered in this
		 * process's subplans.  For correctness, we can't allow any process to
		 * execute the PHJ_BATCH_SCAN phase, because we will never have the
		 * complete set of match bits.  Therefore we skip emitting unmatched
		 * tuples in all backends (if this is a full/right join), as if those
		 * tuples were all due to be emitted by this process and it has
		 * abandoned them too.
		 */
		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE &&
			!hashtable->batches[curbatch].outer_eof)
		{
			/*
			 * This flag may be written to by multiple backends during
			 * PHJ_BATCH_PROBE phase, but will only be read in PHJ_BATCH_SCAN
			 * phase so requires no extra locking.
			 */
			batch->skip_unmatched = true;
		}

		/*
		 * Even if we aren't doing a full/right outer join, we'll step through
		 * the PHJ_BATCH_SCAN phase just to maintain the invariant that
		 * freeing happens in PHJ_BATCH_FREE, but that'll be wait-free.
		 */
		if (BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_PROBE)
			attached = BarrierArriveAndDetachExceptLast(&batch->batch_barrier);
		if (attached && BarrierArriveAndDetach(&batch->batch_barrier))
{
			/*
			 * We are no longer attached to the batch barrier, but we're the
			 * process that was chosen to free resources and it's safe to
			 * assert the current phase.  The ParallelHashJoinBatch can't go
			 * away underneath us while we are attached to the build barrier,
			 * making this access safe.
			 */
			Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_FREE);