@ -23,6 +23,7 @@
# include "postgres.h"
# include "executor/execExpr.h"
# include "executor/execParallel.h"
# include "executor/executor.h"
# include "executor/nodeBitmapHeapscan.h"
@ -38,7 +39,9 @@
# include "optimizer/planner.h"
# include "storage/spin.h"
# include "tcop/tcopprot.h"
# include "utils/datum.h"
# include "utils/dsa.h"
# include "utils/lsyscache.h"
# include "utils/memutils.h"
# include "utils/snapmgr.h"
# include "pgstat.h"
@ -50,7 +53,7 @@
*/
# define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
# define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
# define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000003)
# define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
# define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
# define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
# define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
@ -65,6 +68,7 @@
typedef struct FixedParallelExecutorState
{
int64 tuples_needed ; /* tuple bound, see ExecSetTupleBound */
/*
 * DSA pointer to the serialized PARAM_EXEC parameter values passed to
 * workers, or InvalidDsaPointer when no parameters are being sent.
 */
dsa_pointer param_exec ;
} FixedParallelExecutorState ;
/*
@ -266,6 +270,133 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
return planstate_tree_walker ( planstate , ExecParallelEstimate , e ) ;
}
/*
 * Compute how many bytes are needed to serialize the PARAM_EXEC parameters
 * whose ids are members of 'params'.
 *
 * The total covers a leading int holding the parameter count, plus, for each
 * parameter, an int for its paramid followed by whatever datumEstimateSpace()
 * reports for its current value.
 */
static Size
EstimateParamExecSpace(EState *estate, Bitmapset *params)
{
	Size		total = sizeof(int);	/* leading parameter count */
	int			id;

	for (id = bms_next_member(params, -1);
		 id >= 0;
		 id = bms_next_member(params, id))
	{
		ParamExecData *param = &estate->es_param_exec_vals[id];
		Oid			ptype = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
										 id);

		/*
		 * Defaults for a parameter with no type OID: assume by-value, like
		 * copyParamList does.  Overwritten below when the type is known.
		 */
		int16		plen = sizeof(Datum);
		bool		pbyval = true;

		/* room for the paramid itself */
		total = add_size(total, sizeof(int));

		/* room for the datum and its null flag */
		if (OidIsValid(ptype))
			get_typlenbyval(ptype, &plen, &pbyval);

		total = add_size(total,
						 datumEstimateSpace(param->value, param->isnull,
											pbyval, plen));
	}

	return total;
}
/*
 * Write the PARAM_EXEC parameters named in 'params' into freshly allocated
 * DSA memory and return a pointer to it.
 *
 * Layout of the serialized data:
 *		- the number of parameters, as a 4-byte integer;
 *		- then, for each parameter, its 4-byte paramid (its index in the
 *		  executor's internal parameter array) followed by the value as
 *		  written by datumSerialize().
 *
 * The chunk is allocated in estate->es_query_dsa and is sized by
 * EstimateParamExecSpace().
 */
static dsa_pointer
SerializeParamExecParams(EState *estate, Bitmapset *params)
{
	Size		nbytes;
	dsa_pointer result;
	char	   *cursor;
	int			count;
	int			id;

	/* Get a chunk big enough for the current parameter values. */
	nbytes = EstimateParamExecSpace(estate, params);
	result = dsa_allocate(estate->es_query_dsa, nbytes);
	cursor = dsa_get_address(estate->es_query_dsa, result);

	/* The parameter count goes first, as a 4-byte integer. */
	count = bms_num_members(params);
	memcpy(cursor, &count, sizeof(int));
	cursor += sizeof(int);

	/* Then one entry per parameter. */
	for (id = bms_next_member(params, -1);
		 id >= 0;
		 id = bms_next_member(params, id))
	{
		ParamExecData *param = &estate->es_param_exec_vals[id];
		Oid			ptype = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
										 id);

		/*
		 * Defaults for a parameter with no type OID: assume by-value, like
		 * copyParamList does.  Overwritten below when the type is known.
		 */
		int16		plen = sizeof(Datum);
		bool		pbyval = true;

		/* paramid */
		memcpy(cursor, &id, sizeof(int));
		cursor += sizeof(int);

		/* datum and null flag; datumSerialize advances the cursor */
		if (OidIsValid(ptype))
			get_typlenbyval(ptype, &plen, &pbyval);

		datumSerialize(param->value, param->isnull, pbyval, plen, &cursor);
	}

	return result;
}
/*
 * Load PARAM_EXEC parameter values from a buffer in the format produced by
 * SerializeParamExecParams() into estate->es_param_exec_vals.
 *
 * Each restored slot has its execPlan cleared.
 */
static void
RestoreParamExecParams(char *start_address, EState *estate)
{
	char	   *cursor = start_address;
	int			remaining;

	/* The buffer starts with the number of entries that follow. */
	memcpy(&remaining, cursor, sizeof(int));
	cursor += sizeof(int);

	while (remaining-- > 0)
	{
		ParamExecData *slot;
		int			id;

		/* Which parameter is this entry for? */
		memcpy(&id, cursor, sizeof(int));
		cursor += sizeof(int);
		slot = &estate->es_param_exec_vals[id];

		/* Pull out the value and null flag; datumRestore advances cursor. */
		slot->value = datumRestore(&cursor, &slot->isnull);
		slot->execPlan = NULL;
	}
}
/*
* Initialize the dynamic shared memory segment that will be used to control
* parallel execution .
@ -395,7 +526,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
* execution and return results to the main backend .
*/
ParallelExecutorInfo *
ExecInitParallelPlan ( PlanState * planstate , EState * estate , int nworkers ,
ExecInitParallelPlan ( PlanState * planstate , EState * estate ,
Bitmapset * sendParams , int nworkers ,
int64 tuples_needed )
{
ParallelExecutorInfo * pei ;
@ -405,17 +537,20 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
FixedParallelExecutorState * fpes ;
char * pstmt_data ;
char * pstmt_space ;
char * param_space ;
char * paramlistinfo _space ;
BufferUsage * bufusage_space ;
SharedExecutorInstrumentation * instrumentation = NULL ;
int pstmt_len ;
int param_len ;
int paramlistinfo _len ;
int instrumentation_len = 0 ;
int instrument_offset = 0 ;
Size dsa_minsize = dsa_minimum_size ( ) ;
char * query_string ;
int query_len ;
/* Force parameters we're going to pass to workers to be evaluated. */
ExecEvalParamExecParams ( sendParams , estate ) ;
/* Allocate object for return value. */
pei = palloc0 ( sizeof ( ParallelExecutorInfo ) ) ;
pei - > finished = false ;
@ -450,8 +585,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
shm_toc_estimate_keys ( & pcxt - > estimator , 1 ) ;
/* Estimate space for serialized ParamListInfo. */
param_len = EstimateParamListSpace ( estate - > es_param_list_info ) ;
shm_toc_estimate_chunk ( & pcxt - > estimator , param_len ) ;
paramlistinfo _len = EstimateParamListSpace ( estate - > es_param_list_info ) ;
shm_toc_estimate_chunk ( & pcxt - > estimator , paramlistinfo _len ) ;
shm_toc_estimate_keys ( & pcxt - > estimator , 1 ) ;
/*
@ -511,6 +646,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
/* Store fixed-size state. */
fpes = shm_toc_allocate ( pcxt - > toc , sizeof ( FixedParallelExecutorState ) ) ;
fpes - > tuples_needed = tuples_needed ;
fpes - > param_exec = InvalidDsaPointer ;
shm_toc_insert ( pcxt - > toc , PARALLEL_KEY_EXECUTOR_FIXED , fpes ) ;
/* Store query string */
@ -524,9 +660,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
shm_toc_insert ( pcxt - > toc , PARALLEL_KEY_PLANNEDSTMT , pstmt_space ) ;
/* Store serialized ParamListInfo. */
param_space = shm_toc_allocate ( pcxt - > toc , param_len ) ;
shm_toc_insert ( pcxt - > toc , PARALLEL_KEY_PARAMS , param_space ) ;
SerializeParamList ( estate - > es_param_list_info , & param_space ) ;
paramlistinfo _space = shm_toc_allocate ( pcxt - > toc , paramlistinfo _len ) ;
shm_toc_insert ( pcxt - > toc , PARALLEL_KEY_PARAMLI STINFO , paramlistinfo _space ) ;
SerializeParamList ( estate - > es_param_list_info , & paramlistinfo _space ) ;
/* Allocate space for each worker's BufferUsage; no need to initialize. */
bufusage_space = shm_toc_allocate ( pcxt - > toc ,
@ -577,13 +713,25 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
pei - > area = dsa_create_in_place ( area_space , dsa_minsize ,
LWTRANCHE_PARALLEL_QUERY_DSA ,
pcxt - > seg ) ;
}
/*
* Make the area available to executor nodes running in the leader . See
* also ParallelQueryMain which makes it available to workers .
*/
estate - > es_query_dsa = pei - > area ;
/*
* Make the area available to executor nodes running in the leader .
* See also ParallelQueryMain which makes it available to workers .
*/
estate - > es_query_dsa = pei - > area ;
/*
* Serialize parameters , if any , using DSA storage . We don ' t dare use
* the main parallel query DSM for this because we might relaunch
* workers after the values have changed ( and thus the amount of
* storage required has changed ) .
*/
if ( ! bms_is_empty ( sendParams ) )
{
pei - > param_exec = SerializeParamExecParams ( estate , sendParams ) ;
fpes - > param_exec = pei - > param_exec ;
}
}
/*
* Give parallel - aware nodes a chance to initialize their shared data .
@ -640,16 +788,39 @@ ExecParallelCreateReaders(ParallelExecutorInfo *pei)
*/
void
ExecParallelReinitialize ( PlanState * planstate ,
ParallelExecutorInfo * pei )
ParallelExecutorInfo * pei ,
Bitmapset * sendParams )
{
EState * estate = planstate - > state ;
FixedParallelExecutorState * fpes ;
/* Old workers must already be shut down */
Assert ( pei - > finished ) ;
/* Force parameters we're going to pass to workers to be evaluated. */
ExecEvalParamExecParams ( sendParams , estate ) ;
ReinitializeParallelDSM ( pei - > pcxt ) ;
pei - > tqueue = ExecParallelSetupTupleQueues ( pei - > pcxt , true ) ;
pei - > reader = NULL ;
pei - > finished = false ;
fpes = shm_toc_lookup ( pei - > pcxt - > toc , PARALLEL_KEY_EXECUTOR_FIXED , false ) ;
/* Free any serialized parameters from the last round. */
if ( DsaPointerIsValid ( fpes - > param_exec ) )
{
dsa_free ( estate - > es_query_dsa , fpes - > param_exec ) ;
fpes - > param_exec = InvalidDsaPointer ;
}
/* Serialize current parameter values if required. */
if ( ! bms_is_empty ( sendParams ) )
{
pei - > param_exec = SerializeParamExecParams ( estate , sendParams ) ;
fpes - > param_exec = pei - > param_exec ;
}
/* Traverse plan tree and let each child node reset associated state. */
ExecParallelReInitializeDSM ( planstate , pei - > pcxt ) ;
}
@ -831,6 +1002,12 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
void
ExecParallelCleanup ( ParallelExecutorInfo * pei )
{
/* Free any serialized parameters. */
if ( DsaPointerIsValid ( pei - > param_exec ) )
{
dsa_free ( pei - > area , pei - > param_exec ) ;
pei - > param_exec = InvalidDsaPointer ;
}
if ( pei - > area ! = NULL )
{
dsa_detach ( pei - > area ) ;
@ -882,7 +1059,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
pstmt = ( PlannedStmt * ) stringToNode ( pstmtspace ) ;
/* Reconstruct ParamListInfo. */
paramspace = shm_toc_lookup ( toc , PARALLEL_KEY_PARAMS , false ) ;
paramspace = shm_toc_lookup ( toc , PARALLEL_KEY_PARAMLI STINFO , false ) ;
paramLI = RestoreParamList ( & paramspace ) ;
/*
@ -1046,6 +1223,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Special executor initialization steps for parallel workers */
queryDesc - > planstate - > state - > es_query_dsa = area ;
if ( DsaPointerIsValid ( fpes - > param_exec ) )
{
char * paramexec_space ;
paramexec_space = dsa_get_address ( area , fpes - > param_exec ) ;
RestoreParamExecParams ( paramexec_space , queryDesc - > estate ) ;
}
ExecParallelInitializeWorker ( queryDesc - > planstate , toc ) ;
/* Pass down any tuple bound */