@@ -26,24 +26,30 @@
 #include "utils/memutils.h"
 #include "utils/rel.h"

-/*
- * Tuple array for each worker
- */
-typedef struct GMReaderTupleBuffer
-{
-    HeapTuple  *tuple;
-    int         readCounter;
-    int         nTuples;
-    bool        done;
-} GMReaderTupleBuffer;
-
 /*
  * When we read tuples from workers, it's a good idea to read several at once
  * for efficiency when possible: this minimizes context-switching overhead.
  * But reading too many at a time wastes memory without improving performance.
  * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
  */
 #define MAX_TUPLE_STORE 10

+/*
+ * Pending-tuple array for each worker.  This holds additional tuples that
+ * we were able to fetch from the worker, but can't process yet.  In addition,
+ * this struct holds the "done" flag indicating the worker is known to have
+ * no more tuples.  (We do not use this struct for the leader; we don't keep
+ * any pending tuples for the leader, and the need_to_scan_locally flag serves
+ * as its "done" indicator.)
+ */
+typedef struct GMReaderTupleBuffer
+{
+    HeapTuple  *tuple;          /* array of length MAX_TUPLE_STORE */
+    int         nTuples;        /* number of tuples currently stored */
+    int         readCounter;    /* index of next tuple to extract */
+    bool        done;           /* true if reader is known exhausted */
+} GMReaderTupleBuffer;
+
 static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
 static int32 heap_compare_slots(Datum a, Datum b, void *arg);
 static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
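Note: GMReaderTupleBuffer is a plain drain-and-refill buffer, not a ring: readCounter chases nTuples, and both are reset to zero once the array is empty. The following standalone sketch of that discipline is illustrative only (plain C; ToyBuffer, fetch_nowait, and load_array are invented stand-ins for the real struct, gm_readnext_tuple(), and load_tuple_array(); this is not PostgreSQL code):

#include <stdbool.h>
#include <stdio.h>

#define MAX_TUPLE_STORE 10

typedef struct
{
    int     tuple[MAX_TUPLE_STORE]; /* stand-in for the HeapTuple array */
    int     nTuples;                /* number of items currently stored */
    int     readCounter;            /* index of next item to extract */
    bool    done;                   /* source known exhausted */
} ToyBuffer;

/* Pretend tuple source: yields 25 items, then reports done. */
static bool
fetch_nowait(int *out, bool *done)
{
    static int next = 0;

    if (next >= 25)
    {
        *done = true;
        return false;
    }
    *out = next++;
    return true;
}

/* Analogue of load_tuple_array(): top up the array without blocking. */
static void
load_array(ToyBuffer *buf)
{
    if (buf->nTuples == buf->readCounter)
        buf->nTuples = buf->readCounter = 0;    /* drained: reset counters */
    while (buf->nTuples < MAX_TUPLE_STORE &&
           fetch_nowait(&buf->tuple[buf->nTuples], &buf->done))
        buf->nTuples++;
}

int
main(void)
{
    ToyBuffer   buf = {{0}, 0, 0, false};

    while (!buf.done || buf.readCounter < buf.nTuples)
    {
        if (buf.readCounter == buf.nTuples)
            load_array(&buf);
        if (buf.readCounter < buf.nTuples)
            printf("%d\n", buf.tuple[buf.readCounter++]);
    }
    return 0;
}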
@@ -53,7 +59,7 @@ static void gather_merge_init(GatherMergeState *gm_state);
 static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
 static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
                       bool nowait);
-static void form_tuple_array(GatherMergeState *gm_state, int reader);
+static void load_tuple_array(GatherMergeState *gm_state, int reader);

 /* ----------------------------------------------------------------
  *      ExecInitGather
@@ -77,6 +83,9 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
     gm_state->ps.plan = (Plan *) node;
     gm_state->ps.state = estate;
     gm_state->ps.ExecProcNode = ExecGatherMerge;
+    gm_state->initialized = false;
+    gm_state->gm_initialized = false;
+    gm_state->tuples_needed = -1;

     /*
@ -87,10 +96,10 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
ExecAssignExprContext ( estate , & gm_state - > ps ) ;
/*
* initialize child expressions
* GatherMerge doesn ' t support checking a qual ( it ' s always more efficient
* to do it in the child node ) .
*/
gm_state - > ps . qual =
ExecInitQual ( node - > plan . qual , & gm_state - > ps ) ;
Assert ( ! node - > plan . qual ) ;
/*
* tuple table initialization
@@ -109,8 +118,6 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
     ExecAssignResultTypeFromTL(&gm_state->ps);
     ExecAssignProjectionInfo(&gm_state->ps, NULL);

-    gm_state->gm_initialized = false;
-
     /*
      * initialize sort-key information
      */
@@ -177,7 +184,7 @@ ExecGatherMerge(PlanState *pstate)
     if (!node->initialized)
     {
         EState     *estate = node->ps.state;
-        GatherMerge *gm = (GatherMerge *) node->ps.plan;
+        GatherMerge *gm = castNode(GatherMerge, node->ps.plan);

         /*
          * Sometimes we might have to run without parallelism; but if parallel
@@ -200,17 +207,16 @@ ExecGatherMerge(PlanState *pstate)
             /* Try to launch workers. */
             pcxt = node->pei->pcxt;
             LaunchParallelWorkers(pcxt);
+            /* We save # workers launched for the benefit of EXPLAIN */
             node->nworkers_launched = pcxt->nworkers_launched;
-            node->nreaders = 0;

             /* Set up tuple queue readers to read the results. */
             if (pcxt->nworkers_launched > 0)
             {
+                node->nreaders = 0;
                 node->reader = palloc(pcxt->nworkers_launched *
                                       sizeof(TupleQueueReader *));

-                Assert(gm->numCols);
-
                 for (i = 0; i < pcxt->nworkers_launched; ++i)
                 {
                     shm_mq_set_handle(node->pei->tqueue[i],
@@ -248,9 +254,7 @@ ExecGatherMerge(PlanState *pstate)
         return NULL;

     /*
-     * form the result tuple using ExecProject(), and return it --- unless the
-     * projection produces an empty set, in which case we must loop back
-     * around for another tuple
+     * Form the result tuple using ExecProject(), and return it.
      */
     econtext->ecxt_outertuple = slot;
     return ExecProject(node->ps.ps_ProjInfo);
@@ -374,17 +378,16 @@ static void
 gather_merge_init(GatherMergeState *gm_state)
 {
     int         nreaders = gm_state->nreaders;
-    bool        initialize = true;
+    bool        nowait = true;
     int         i;

     /*
-     * Allocate gm_slots for the number of worker + one more slot for leader.
+     * Allocate gm_slots for the number of workers + one more slot for leader.
      * Last slot is always for leader.  Leader always calls ExecProcNode() to
      * read the tuple which will return the TupleTableSlot.  Later it will
      * directly get assigned to gm_slot.  So just initialize leader gm_slot
-     * with NULL.  For other slots below code will call
-     * ExecInitExtraTupleSlot() which will do the initialization of worker
-     * slots.
+     * with NULL.  For other slots, code below will call
+     * ExecInitExtraTupleSlot() to create a slot for the worker's results.
      */
     gm_state->gm_slots =
         palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *));
@@ -393,10 +396,10 @@ gather_merge_init(GatherMergeState *gm_state)
     /* Initialize the tuple slot and tuple array for each worker */
     gm_state->gm_tuple_buffers =
         (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) *
-                                        (gm_state->nreaders + 1));
+                                        gm_state->nreaders);
     for (i = 0; i < gm_state->nreaders; i++)
     {
-        /* Allocate the tuple array with MAX_TUPLE_STORE size */
+        /* Allocate the tuple array with length MAX_TUPLE_STORE */
         gm_state->gm_tuple_buffers[i].tuple =
             (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
@@ -413,39 +416,53 @@ gather_merge_init(GatherMergeState *gm_state)
     /*
      * First, try to read a tuple from each worker (including leader) in
-     * nowait mode, so that we initialize read from each worker as well as
-     * leader.  After this, if all active workers are unable to produce a
-     * tuple, then re-read and this time use wait mode.  For workers that were
-     * able to produce a tuple in the earlier loop and are still active, just
-     * try to fill the tuple array if more tuples are avaiable.
+     * nowait mode.  After this, if not all workers were able to produce a
+     * tuple (or a "done" indication), then re-read from remaining workers,
+     * this time using wait mode.  Add all live readers (those producing at
+     * least one tuple) to the heap.
      */
 reread:
     for (i = 0; i < nreaders + 1; i++)
     {
         CHECK_FOR_INTERRUPTS();

-        if (!gm_state->gm_tuple_buffers[i].done &&
-            (TupIsNull(gm_state->gm_slots[i]) ||
-             gm_state->gm_slots[i]->tts_isempty))
+        /* ignore this source if already known done */
+        if ((i < nreaders) ? !gm_state->gm_tuple_buffers[i].done :
+            gm_state->need_to_scan_locally)
         {
-            if (gather_merge_readnext(gm_state, i, initialize))
+            if (TupIsNull(gm_state->gm_slots[i]))
             {
-                binaryheap_add_unordered(gm_state->gm_heap,
-                                         Int32GetDatum(i));
+                /* Don't have a tuple yet, try to get one */
+                if (gather_merge_readnext(gm_state, i, nowait))
+                    binaryheap_add_unordered(gm_state->gm_heap,
+                                             Int32GetDatum(i));
+            }
+            else
+            {
+                /*
+                 * We already got at least one tuple from this worker, but
+                 * might as well see if it has any more ready by now.
+                 */
+                load_tuple_array(gm_state, i);
             }
         }
-        else
-            form_tuple_array(gm_state, i);
     }
-    initialize = false;

+    /* need not recheck leader, since nowait doesn't matter for it */
     for (i = 0; i < nreaders; i++)
     {
         if (!gm_state->gm_tuple_buffers[i].done &&
-            (TupIsNull(gm_state->gm_slots[i]) ||
-             gm_state->gm_slots[i]->tts_isempty))
+            TupIsNull(gm_state->gm_slots[i]))
+        {
+            nowait = false;
             goto reread;
+        }
     }

+    /* Now heapify the heap. */
     binaryheap_build(gm_state->gm_heap);

     gm_state->gm_initialized = true;
 }
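Note: the rewritten initialization is a two-phase priming pass: poll every source without blocking first, then block only on sources that still owe a first tuple, and heapify once at the end. A minimal standalone sketch of that shape (plain C; read_source and the not_ready table are invented for illustration, not the patch's code):

#include <stdbool.h>
#include <stdio.h>

#define NSOURCES 3

/* Per-source countdown of "not ready yet" polls; 0 means ready now. */
static int  not_ready[NSOURCES] = {0, 2, 5};

/* Pretend reader: may fail in nowait mode, always succeeds in wait mode. */
static bool
read_source(int i, bool nowait, int *out)
{
    if (nowait && not_ready[i]-- > 0)
        return false;           /* nothing ready; we refuse to block */
    *out = 100 + i;
    return true;
}

int
main(void)
{
    int     first[NSOURCES];
    bool    have[NSOURCES] = {false, false, false};
    bool    nowait = true;
    int     i;

reread:
    /* Pass over all sources; block only once nowait has been cleared. */
    for (i = 0; i < NSOURCES; i++)
    {
        if (!have[i])
            have[i] = read_source(i, nowait, &first[i]);
    }

    /* Anyone still empty?  Retry the stragglers in wait mode. */
    for (i = 0; i < NSOURCES; i++)
    {
        if (!have[i])
        {
            nowait = false;
            goto reread;
        }
    }

    for (i = 0; i < NSOURCES; i++)
        printf("source %d primed with %d\n", i, first[i]);
    return 0;
}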
@@ -460,7 +477,7 @@ gather_merge_clear_slots(GatherMergeState *gm_state)
     for (i = 0; i < gm_state->nreaders; i++)
     {
         pfree(gm_state->gm_tuple_buffers[i].tuple);
-        gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]);
+        ExecClearTuple(gm_state->gm_slots[i]);
     }

     /* Free tuple array as we don't need it any more */
@@ -500,7 +517,10 @@ gather_merge_getnext(GatherMergeState *gm_state)
         if (gather_merge_readnext(gm_state, i, false))
             binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
         else
+        {
+            /* reader exhausted, remove it from heap */
             (void) binaryheap_remove_first(gm_state->gm_heap);
+        }
     }

     if (binaryheap_empty(gm_state->gm_heap))
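Note: this is the classic k-way merge step: the heap is keyed by each source's current tuple, and after emitting the top element the source either supplies its next tuple (sift it back into place) or drops out of the heap. A self-contained toy version of the loop shape (plain C; small int arrays stand in for worker queues, and the hand-rolled sift_down stands in for the binaryheap calls; none of this is the patch's code):

#include <stdbool.h>
#include <stdio.h>

#define NSRC 3

static const int data[NSRC][4] = {{1, 4, 7, -1}, {2, 3, 9, -1}, {5, 6, 8, -1}};
static int  pos[NSRC];          /* cursor per source; -1 marks exhaustion */
static int  heap[NSRC];         /* min-heap of source indexes */
static int  heap_n;

static int  key(int s) { return data[s][pos[s]]; }  /* source's current item */

static void
sift_down(int i)
{
    for (;;)
    {
        int     l = 2 * i + 1, r = l + 1, m = i, t;

        if (l < heap_n && key(heap[l]) < key(heap[m])) m = l;
        if (r < heap_n && key(heap[r]) < key(heap[m])) m = r;
        if (m == i) return;
        t = heap[i]; heap[i] = heap[m]; heap[m] = t;
        i = m;
    }
}

int
main(void)
{
    int     i;

    /* prime: every source has at least one item here */
    for (i = 0; i < NSRC; i++) heap[i] = i;
    heap_n = NSRC;
    for (i = heap_n / 2 - 1; i >= 0; i--) sift_down(i);     /* heapify */

    while (heap_n > 0)
    {
        int     s = heap[0];

        printf("%d ", key(s));  /* emit smallest current item */
        if (data[s][++pos[s]] != -1)
            sift_down(0);       /* like binaryheap_replace_first */
        else
        {
            heap[0] = heap[--heap_n];   /* like binaryheap_remove_first */
            sift_down(0);
        }
    }
    putchar('\n');              /* prints: 1 2 3 4 5 6 7 8 9 */
    return 0;
}

The real code gets the same effect from binaryheap_replace_first()/binaryheap_remove_first(), with heap_compare_slots() as the comparator over the sources' current slots.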
@@ -518,37 +538,37 @@
 }

 /*
- * Read the tuple for given reader in nowait mode, and form the tuple array.
+ * Read tuple(s) for given reader in nowait mode, and load into its tuple
+ * array, until we have MAX_TUPLE_STORE of them or would have to block.
  */
 static void
-form_tuple_array(GatherMergeState *gm_state, int reader)
+load_tuple_array(GatherMergeState *gm_state, int reader)
 {
-    GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+    GMReaderTupleBuffer *tuple_buffer;
     int         i;

-    /* Last slot is for leader and we don't build tuple array for leader */
+    /* Don't do anything if this is the leader. */
     if (reader == gm_state->nreaders)
         return;

-    /*
-     * We here because we already read all the tuples from the tuple array, so
-     * initialize the counter to zero.
-     */
+    tuple_buffer = &gm_state->gm_tuple_buffers[reader];
+
+    /* If there's nothing in the array, reset the counters to zero. */
     if (tuple_buffer->nTuples == tuple_buffer->readCounter)
         tuple_buffer->nTuples = tuple_buffer->readCounter = 0;

     /* Tuple array is already full? */
     if (tuple_buffer->nTuples == MAX_TUPLE_STORE)
         return;

+    /* Try to fill additional slots in the array. */
     for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
     {
-        tuple_buffer->tuple[i] = heap_copytuple(gm_readnext_tuple(gm_state,
-                                                                  reader,
-                                                                  false,
-                                                                  &tuple_buffer->done));
-        if (!HeapTupleIsValid(tuple_buffer->tuple[i]))
+        HeapTuple   tuple;
+
+        tuple = gm_readnext_tuple(gm_state,
+                                  reader,
+                                  true,
+                                  &tuple_buffer->done);
+        if (!HeapTupleIsValid(tuple))
             break;
+        tuple_buffer->tuple[i] = heap_copytuple(tuple);
         tuple_buffer->nTuples++;
     }
 }
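Note: heap_copytuple() does return NULL when handed an invalid tuple, so the old coding was not wrong, merely wasteful and harder to read; the rewritten loop validates first and copies only real tuples. A toy illustration of that validate-then-copy ordering (ToyTuple, fetch, and copy_tuple are invented names; unlike heap_copytuple, this toy copy does not accept NULL):

#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct { int datum; } ToyTuple;

/* Toy copy: asserts non-NULL input, so callers must validate first. */
static ToyTuple *
copy_tuple(const ToyTuple *src)
{
    ToyTuple   *dst;

    assert(src != NULL);
    dst = malloc(sizeof(ToyTuple));
    *dst = *src;
    return dst;
}

/* Stand-in for gm_readnext_tuple(): may return NULL and set *done. */
static ToyTuple *
fetch(bool *done)
{
    *done = true;
    return NULL;
}

int
main(void)
{
    bool        done = false;
    ToyTuple   *tup = fetch(&done);

    /* Validate first, copy second; no cycles spent copying nothing. */
    if (tup != NULL)
        tup = copy_tuple(tup);
    return 0;
}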
@@ -556,13 +576,15 @@ form_tuple_array(GatherMergeState *gm_state, int reader)
 /*
  * Store the next tuple for a given reader into the appropriate slot.
  *
- * Returns false if the reader is exhausted, and true otherwise.
+ * Returns true if successful, false if not (either reader is exhausted,
+ * or we didn't want to wait for a tuple).  Sets done flag if reader
+ * is found to be exhausted.
  */
 static bool
 gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
 {
     GMReaderTupleBuffer *tuple_buffer;
-    HeapTuple   tup = NULL;
+    HeapTuple   tup;

     /*
      * If we're being asked to generate a tuple from the leader, then we just
@@ -582,7 +604,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
             gm_state->gm_slots[reader] = outerTupleSlot;
             return true;
         }
-        gm_state->gm_tuple_buffers[reader].done = true;
+        /* need_to_scan_locally serves as "done" flag for leader */
         gm_state->need_to_scan_locally = false;
     }
     return false;
@@ -594,7 +616,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
     if (tuple_buffer->nTuples > tuple_buffer->readCounter)
     {
         /* Return any tuple previously read that is still buffered. */
-        tuple_buffer = &gm_state->gm_tuple_buffers[reader];
         tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
     }
     else if (tuple_buffer->done)
@@ -607,19 +628,19 @@
     else
     {
         /* Read and buffer next tuple. */
-        tup = heap_copytuple(gm_readnext_tuple(gm_state,
-                                               reader,
-                                               nowait,
-                                               &tuple_buffer->done));
+        tup = gm_readnext_tuple(gm_state,
+                                reader,
+                                nowait,
+                                &tuple_buffer->done);
+        if (!HeapTupleIsValid(tup))
+            return false;
+        tup = heap_copytuple(tup);

         /*
          * Attempt to read more tuples in nowait mode and store them in the
-         * tuple array.
+         * pending-tuple array for the reader.
          */
-        if (HeapTupleIsValid(tup))
-            form_tuple_array(gm_state, reader);
-        else
-            return false;
+        load_tuple_array(gm_state, reader);
     }

     Assert(HeapTupleIsValid(tup));
@@ -642,15 +663,10 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
                   bool *done)
 {
     TupleQueueReader *reader;
-    HeapTuple   tup = NULL;
+    HeapTuple   tup;
     MemoryContext oldContext;
     MemoryContext tupleContext;

-    tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
-
-    if (done != NULL)
-        *done = false;
-
     /* Check for async events, particularly messages from workers. */
     CHECK_FOR_INTERRUPTS();
@@ -658,6 +674,7 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
     reader = gm_state->reader[nreader];

+    /* Run TupleQueueReaders in per-tuple context */
+    tupleContext = gm_state->ps.ps_ExprContext->ecxt_per_tuple_memory;
     oldContext = MemoryContextSwitchTo(tupleContext);
     tup = TupleQueueReaderNext(reader, nowait, done);
     MemoryContextSwitchTo(oldContext);
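Note: the comment added above documents an existing convention rather than a behavior change: TupleQueueReaderNext() runs with the per-tuple expression context current, so anything it allocates is reclaimed wholesale by the regular per-tuple reset instead of accumulating for the life of the node. A toy sketch of that reset-able-arena pattern (plain C; the fixed scratch buffer stands in for ecxt_per_tuple_memory and is not PostgreSQL's MemoryContext API):

#include <stdio.h>

/* A reset-able scratch arena, standing in for per-tuple memory. */
static char     scratch[4096];
static size_t   scratch_used = 0;

static void *
scratch_alloc(size_t n)
{
    void       *p = scratch + scratch_used;

    scratch_used += n;          /* illustration only: no bounds check */
    return p;
}

/* Callee allocates freely; it never needs to free anything itself. */
static char *
read_next(int i)
{
    char       *s = scratch_alloc(32);

    snprintf(s, 32, "tuple %d", i);
    return s;
}

int
main(void)
{
    int         i;

    for (i = 0; i < 3; i++)
    {
        printf("%s\n", read_next(i));
        scratch_used = 0;       /* per-tuple reset frees it all at once */
    }
    return 0;
}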