|
|
|
@ -359,6 +359,14 @@ typedef struct HashAggBatch |
|
|
|
|
int64 input_tuples; /* number of tuples in this batch */ |
|
|
|
|
} HashAggBatch; |
|
|
|
|
|
|
|
|
|
/* used to find referenced colnos */ |
|
|
|
|
typedef struct FindColsContext |
|
|
|
|
{ |
|
|
|
|
bool is_aggref; /* is under an aggref */ |
|
|
|
|
Bitmapset *aggregated; /* column references under an aggref */ |
|
|
|
|
Bitmapset *unaggregated; /* other column references */ |
|
|
|
|
} FindColsContext; |
|
|
|
|
|
|
|
|
|
static void select_current_set(AggState *aggstate, int setno, bool is_hash); |
|
|
|
|
static void initialize_phase(AggState *aggstate, int newphase); |
|
|
|
|
static TupleTableSlot *fetch_input_tuple(AggState *aggstate); |
|
|
|
@ -391,8 +399,9 @@ static void finalize_aggregates(AggState *aggstate, |
|
|
|
|
AggStatePerAgg peragg, |
|
|
|
|
AggStatePerGroup pergroup); |
|
|
|
|
static TupleTableSlot *project_aggregates(AggState *aggstate); |
|
|
|
|
static Bitmapset *find_unaggregated_cols(AggState *aggstate); |
|
|
|
|
static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); |
|
|
|
|
static void find_cols(AggState *aggstate, Bitmapset **aggregated, |
|
|
|
|
Bitmapset **unaggregated); |
|
|
|
|
static bool find_cols_walker(Node *node, FindColsContext *context); |
|
|
|
|
static void build_hash_tables(AggState *aggstate); |
|
|
|
|
static void build_hash_table(AggState *aggstate, int setno, long nbuckets); |
|
|
|
|
static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, |
|
|
|
@ -425,8 +434,8 @@ static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); |
|
|
|
|
static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, |
|
|
|
|
int used_bits, uint64 input_tuples, |
|
|
|
|
double hashentrysize); |
|
|
|
|
static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, |
|
|
|
|
uint32 hash); |
|
|
|
|
static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, |
|
|
|
|
TupleTableSlot *slot, uint32 hash); |
|
|
|
|
static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, |
|
|
|
|
int setno); |
|
|
|
|
static void hashagg_tapeinfo_init(AggState *aggstate); |
|
|
|
@ -1375,26 +1384,28 @@ project_aggregates(AggState *aggstate) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* find_unaggregated_cols |
|
|
|
|
* Construct a bitmapset of the column numbers of un-aggregated Vars |
|
|
|
|
* appearing in our targetlist and qual (HAVING clause) |
|
|
|
|
* Walk tlist and qual to find referenced colnos, dividing them into |
|
|
|
|
* aggregated and unaggregated sets. |
|
|
|
|
*/ |
|
|
|
|
static Bitmapset * |
|
|
|
|
find_unaggregated_cols(AggState *aggstate) |
|
|
|
|
static void |
|
|
|
|
find_cols(AggState *aggstate, Bitmapset **aggregated, Bitmapset **unaggregated) |
|
|
|
|
{ |
|
|
|
|
Agg *node = (Agg *) aggstate->ss.ps.plan; |
|
|
|
|
Bitmapset *colnos; |
|
|
|
|
|
|
|
|
|
colnos = NULL; |
|
|
|
|
(void) find_unaggregated_cols_walker((Node *) node->plan.targetlist, |
|
|
|
|
&colnos); |
|
|
|
|
(void) find_unaggregated_cols_walker((Node *) node->plan.qual, |
|
|
|
|
&colnos); |
|
|
|
|
return colnos; |
|
|
|
|
Agg *agg = (Agg *) aggstate->ss.ps.plan; |
|
|
|
|
FindColsContext context; |
|
|
|
|
|
|
|
|
|
context.is_aggref = false; |
|
|
|
|
context.aggregated = NULL; |
|
|
|
|
context.unaggregated = NULL; |
|
|
|
|
|
|
|
|
|
(void) find_cols_walker((Node *) agg->plan.targetlist, &context); |
|
|
|
|
(void) find_cols_walker((Node *) agg->plan.qual, &context); |
|
|
|
|
|
|
|
|
|
*aggregated = context.aggregated; |
|
|
|
|
*unaggregated = context.unaggregated; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static bool |
|
|
|
|
find_unaggregated_cols_walker(Node *node, Bitmapset **colnos) |
|
|
|
|
find_cols_walker(Node *node, FindColsContext *context) |
|
|
|
|
{ |
|
|
|
|
if (node == NULL) |
|
|
|
|
return false; |
|
|
|
@ -1405,16 +1416,24 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos) |
|
|
|
|
/* setrefs.c should have set the varno to OUTER_VAR */ |
|
|
|
|
Assert(var->varno == OUTER_VAR); |
|
|
|
|
Assert(var->varlevelsup == 0); |
|
|
|
|
*colnos = bms_add_member(*colnos, var->varattno); |
|
|
|
|
if (context->is_aggref) |
|
|
|
|
context->aggregated = bms_add_member(context->aggregated, |
|
|
|
|
var->varattno); |
|
|
|
|
else |
|
|
|
|
context->unaggregated = bms_add_member(context->unaggregated, |
|
|
|
|
var->varattno); |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
if (IsA(node, Aggref) || IsA(node, GroupingFunc)) |
|
|
|
|
if (IsA(node, Aggref)) |
|
|
|
|
{ |
|
|
|
|
/* do not descend into aggregate exprs */ |
|
|
|
|
Assert(!context->is_aggref); |
|
|
|
|
context->is_aggref = true; |
|
|
|
|
expression_tree_walker(node, find_cols_walker, (void *) context); |
|
|
|
|
context->is_aggref = false; |
|
|
|
|
return false; |
|
|
|
|
} |
|
|
|
|
return expression_tree_walker(node, find_unaggregated_cols_walker, |
|
|
|
|
(void *) colnos); |
|
|
|
|
return expression_tree_walker(node, find_cols_walker, |
|
|
|
|
(void *) context); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -1532,13 +1551,27 @@ static void |
|
|
|
|
find_hash_columns(AggState *aggstate) |
|
|
|
|
{ |
|
|
|
|
Bitmapset *base_colnos; |
|
|
|
|
Bitmapset *aggregated_colnos; |
|
|
|
|
TupleDesc scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; |
|
|
|
|
List *outerTlist = outerPlanState(aggstate)->plan->targetlist; |
|
|
|
|
int numHashes = aggstate->num_hashes; |
|
|
|
|
EState *estate = aggstate->ss.ps.state; |
|
|
|
|
int j; |
|
|
|
|
|
|
|
|
|
/* Find Vars that will be needed in tlist and qual */ |
|
|
|
|
base_colnos = find_unaggregated_cols(aggstate); |
|
|
|
|
find_cols(aggstate, &aggregated_colnos, &base_colnos); |
|
|
|
|
aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos); |
|
|
|
|
aggstate->max_colno_needed = 0; |
|
|
|
|
aggstate->all_cols_needed = true; |
|
|
|
|
|
|
|
|
|
for (int i = 0; i < scanDesc->natts; i++) |
|
|
|
|
{ |
|
|
|
|
int colno = i + 1; |
|
|
|
|
if (bms_is_member(colno, aggstate->colnos_needed)) |
|
|
|
|
aggstate->max_colno_needed = colno; |
|
|
|
|
else |
|
|
|
|
aggstate->all_cols_needed = false; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (j = 0; j < numHashes; ++j) |
|
|
|
|
{ |
|
|
|
@ -2097,7 +2130,7 @@ lookup_hash_entries(AggState *aggstate) |
|
|
|
|
perhash->aggnode->numGroups, |
|
|
|
|
aggstate->hashentrysize); |
|
|
|
|
|
|
|
|
|
hashagg_spill_tuple(spill, slot, hash); |
|
|
|
|
hashagg_spill_tuple(aggstate, spill, slot, hash); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -2619,7 +2652,7 @@ agg_refill_hash_table(AggState *aggstate) |
|
|
|
|
HASHAGG_READ_BUFFER_SIZE); |
|
|
|
|
for (;;) |
|
|
|
|
{ |
|
|
|
|
TupleTableSlot *slot = aggstate->hash_spill_slot; |
|
|
|
|
TupleTableSlot *slot = aggstate->hash_spill_rslot; |
|
|
|
|
MinimalTuple tuple; |
|
|
|
|
uint32 hash; |
|
|
|
|
bool in_hash_table; |
|
|
|
@ -2655,7 +2688,7 @@ agg_refill_hash_table(AggState *aggstate) |
|
|
|
|
ngroups_estimate, aggstate->hashentrysize); |
|
|
|
|
} |
|
|
|
|
/* no memory for a new group, spill */ |
|
|
|
|
hashagg_spill_tuple(&spill, slot, hash); |
|
|
|
|
hashagg_spill_tuple(aggstate, &spill, slot, hash); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -2934,9 +2967,11 @@ hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, int used_bits, |
|
|
|
|
* partition. |
|
|
|
|
*/ |
|
|
|
|
static Size |
|
|
|
|
hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash) |
|
|
|
|
hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, |
|
|
|
|
TupleTableSlot *inputslot, uint32 hash) |
|
|
|
|
{ |
|
|
|
|
LogicalTapeSet *tapeset = spill->tapeset; |
|
|
|
|
TupleTableSlot *spillslot; |
|
|
|
|
int partition; |
|
|
|
|
MinimalTuple tuple; |
|
|
|
|
int tapenum; |
|
|
|
@ -2945,8 +2980,28 @@ hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash) |
|
|
|
|
|
|
|
|
|
Assert(spill->partitions != NULL); |
|
|
|
|
|
|
|
|
|
/* XXX: may contain unnecessary attributes, should project */ |
|
|
|
|
tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); |
|
|
|
|
/* spill only attributes that we actually need */ |
|
|
|
|
if (!aggstate->all_cols_needed) |
|
|
|
|
{ |
|
|
|
|
spillslot = aggstate->hash_spill_wslot; |
|
|
|
|
slot_getsomeattrs(inputslot, aggstate->max_colno_needed); |
|
|
|
|
ExecClearTuple(spillslot); |
|
|
|
|
for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++) |
|
|
|
|
{ |
|
|
|
|
if (bms_is_member(i + 1, aggstate->colnos_needed)) |
|
|
|
|
{ |
|
|
|
|
spillslot->tts_values[i] = inputslot->tts_values[i]; |
|
|
|
|
spillslot->tts_isnull[i] = inputslot->tts_isnull[i]; |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
spillslot->tts_isnull[i] = true; |
|
|
|
|
} |
|
|
|
|
ExecStoreVirtualTuple(spillslot); |
|
|
|
|
} |
|
|
|
|
else |
|
|
|
|
spillslot = inputslot; |
|
|
|
|
|
|
|
|
|
tuple = ExecFetchSlotMinimalTuple(spillslot, &shouldFree); |
|
|
|
|
|
|
|
|
|
partition = (hash & spill->mask) >> spill->shift; |
|
|
|
|
spill->ntuples[partition]++; |
|
|
|
@ -3563,8 +3618,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) |
|
|
|
|
aggstate->hash_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt, |
|
|
|
|
"HashAgg meta context", |
|
|
|
|
ALLOCSET_DEFAULT_SIZES); |
|
|
|
|
aggstate->hash_spill_slot = ExecInitExtraTupleSlot(estate, scanDesc, |
|
|
|
|
&TTSOpsMinimalTuple); |
|
|
|
|
aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc, |
|
|
|
|
&TTSOpsMinimalTuple); |
|
|
|
|
aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc, |
|
|
|
|
&TTSOpsVirtual); |
|
|
|
|
|
|
|
|
|
/* this is an array of pointers, not structures */ |
|
|
|
|
aggstate->hash_pergroup = pergroups; |
|
|
|
|