@ -287,7 +287,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
if ( ! reinitialize )
tqueuespace =
shm_toc_allocate ( pcxt - > toc ,
PARALLEL_TUPLE_QUEUE_SIZE * pcxt - > nworkers ) ;
mul_size ( PARALLEL_TUPLE_QUEUE_SIZE ,
pcxt - > nworkers ) ) ;
else
tqueuespace = shm_toc_lookup ( pcxt - > toc , PARALLEL_KEY_TUPLE_QUEUE ) ;
@ -296,7 +297,8 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
shm_mq * mq ;
mq = shm_mq_create ( tqueuespace + i * PARALLEL_TUPLE_QUEUE_SIZE ,
mq = shm_mq_create ( tqueuespace +
( ( Size ) i ) * PARALLEL_TUPLE_QUEUE_SIZE ,
( Size ) PARALLEL_TUPLE_QUEUE_SIZE ) ;
shm_mq_set_receiver ( mq , MyProc ) ;
@ -380,12 +382,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* looking at pgBufferUsage , so do it unconditionally .
*/
shm_toc_estimate_chunk ( & pcxt - > estimator ,
sizeof ( BufferUsage ) * pcxt - > nworkers ) ;
mul_size ( sizeof ( BufferUsage ) , pcxt - > nworkers ) ) ;
shm_toc_estimate_keys ( & pcxt - > estimator , 1 ) ;
/* Estimate space for tuple queues. */
shm_toc_estimate_chunk ( & pcxt - > estimator ,
PARALLEL_TUPLE_QUEUE_SIZE * pcxt - > nworkers ) ;
mul_size ( PARALLEL_TUPLE_QUEUE_SIZE , pcxt - > nworkers ) ) ;
shm_toc_estimate_keys ( & pcxt - > estimator , 1 ) ;
/*
@ -404,7 +406,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
sizeof ( int ) * e . nnodes ;
instrumentation_len = MAXALIGN ( instrumentation_len ) ;
instrument_offset = instrumentation_len ;
instrumentation_len + = sizeof ( Instrumentation ) * e . nnodes * nworkers ;
instrumentation_len + =
mul_size ( sizeof ( Instrumentation ) ,
mul_size ( e . nnodes , nworkers ) ) ;
shm_toc_estimate_chunk ( & pcxt - > estimator , instrumentation_len ) ;
shm_toc_estimate_keys ( & pcxt - > estimator , 1 ) ;
}
@ -432,7 +436,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
/* Allocate space for each worker's BufferUsage; no need to initialize. */
bufusage_space = shm_toc_allocate ( pcxt - > toc ,
sizeof ( BufferUsage ) * pcxt - > nworkers ) ;
mul_size ( sizeof ( BufferUsage ) , pcxt - > nworkers ) ) ;
shm_toc_insert ( pcxt - > toc , PARALLEL_KEY_BUFFER_USAGE , bufusage_space ) ;
pei - > buffer_usage = bufusage_space ;
@ -511,7 +515,7 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
InstrAggNode ( planstate - > instrument , & instrument [ n ] ) ;
/* Also store the per-worker detail. */
ibytes = instrumentation - > num_workers * sizeof ( Instrumentation ) ;
ibytes = mul_size ( instrumentation - > num_workers , sizeof ( Instrumentation ) ) ;
planstate - > worker_instrument =
palloc ( ibytes + offsetof ( WorkerInstrumentation , instrument ) ) ;
planstate - > worker_instrument - > num_workers = instrumentation - > num_workers ;