@ -240,6 +240,7 @@
# include "postgres.h"
# include "postgres.h"
# include "access/htup_details.h"
# include "access/htup_details.h"
# include "access/parallel.h"
# include "catalog/objectaccess.h"
# include "catalog/objectaccess.h"
# include "catalog/pg_aggregate.h"
# include "catalog/pg_aggregate.h"
# include "catalog/pg_proc.h"
# include "catalog/pg_proc.h"
@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node)
int numGroupingSets = Max ( node - > maxsets , 1 ) ;
int numGroupingSets = Max ( node - > maxsets , 1 ) ;
int setno ;
int setno ;
/*
* When ending a parallel worker , copy the statistics gathered by the
* worker back into shared memory so that it can be picked up by the main
* process to report in EXPLAIN ANALYZE .
*/
if ( node - > shared_info & & IsParallelWorker ( ) )
{
AggregateInstrumentation * si ;
Assert ( ParallelWorkerNumber < = node - > shared_info - > num_workers ) ;
si = & node - > shared_info - > sinstrument [ ParallelWorkerNumber ] ;
si - > hash_batches_used = node - > hash_batches_used ;
si - > hash_disk_used = node - > hash_disk_used ;
si - > hash_mem_peak = node - > hash_mem_peak ;
}
/* Make sure we have closed any open tuplesorts */
/* Make sure we have closed any open tuplesorts */
if ( node - > sort_in )
if ( node - > sort_in )
@ -4854,3 +4871,89 @@ aggregate_dummy(PG_FUNCTION_ARGS)
fcinfo - > flinfo - > fn_oid ) ;
fcinfo - > flinfo - > fn_oid ) ;
return ( Datum ) 0 ; /* keep compiler quiet */
return ( Datum ) 0 ; /* keep compiler quiet */
}
}
/* ----------------------------------------------------------------
* Parallel Query Support
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
/* ----------------------------------------------------------------
* ExecAggEstimate
*
* Estimate space required to propagate aggregate statistics .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
ExecAggEstimate ( AggState * node , ParallelContext * pcxt )
{
Size size ;
/* don't need this if not instrumenting or no workers */
if ( ! node - > ss . ps . instrument | | pcxt - > nworkers = = 0 )
return ;
size = mul_size ( pcxt - > nworkers , sizeof ( AggregateInstrumentation ) ) ;
size = add_size ( size , offsetof ( SharedAggInfo , sinstrument ) ) ;
shm_toc_estimate_chunk ( & pcxt - > estimator , size ) ;
shm_toc_estimate_keys ( & pcxt - > estimator , 1 ) ;
}
/* ----------------------------------------------------------------
* ExecAggInitializeDSM
*
* Initialize DSM space for aggregate statistics .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
ExecAggInitializeDSM ( AggState * node , ParallelContext * pcxt )
{
Size size ;
/* don't need this if not instrumenting or no workers */
if ( ! node - > ss . ps . instrument | | pcxt - > nworkers = = 0 )
return ;
size = offsetof ( SharedAggInfo , sinstrument )
+ pcxt - > nworkers * sizeof ( AggregateInstrumentation ) ;
node - > shared_info = shm_toc_allocate ( pcxt - > toc , size ) ;
/* ensure any unfilled slots will contain zeroes */
memset ( node - > shared_info , 0 , size ) ;
node - > shared_info - > num_workers = pcxt - > nworkers ;
shm_toc_insert ( pcxt - > toc , node - > ss . ps . plan - > plan_node_id ,
node - > shared_info ) ;
}
/* ----------------------------------------------------------------
* ExecAggInitializeWorker
*
* Attach worker to DSM space for aggregate statistics .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
ExecAggInitializeWorker ( AggState * node , ParallelWorkerContext * pwcxt )
{
node - > shared_info =
shm_toc_lookup ( pwcxt - > toc , node - > ss . ps . plan - > plan_node_id , true ) ;
}
/* ----------------------------------------------------------------
* ExecAggRetrieveInstrumentation
*
* Transfer aggregate statistics from DSM to private memory .
* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
*/
void
ExecAggRetrieveInstrumentation ( AggState * node )
{
Size size ;
SharedAggInfo * si ;
if ( node - > shared_info = = NULL )
return ;
size = offsetof ( SharedAggInfo , sinstrument )
+ node - > shared_info - > num_workers * sizeof ( AggregateInstrumentation ) ;
si = palloc ( size ) ;
memcpy ( si , node - > shared_info , size ) ;
node - > shared_info = si ;
}