@ -39,6 +39,7 @@
# include "utils/dsa.h"
# include "utils/dsa.h"
# include "utils/memutils.h"
# include "utils/memutils.h"
# include "utils/snapmgr.h"
# include "utils/snapmgr.h"
# include "pgstat.h"
/*
/*
* Magic numbers for parallel executor communication . We use constants
* Magic numbers for parallel executor communication . We use constants
@ -51,6 +52,7 @@
# define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
# define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
# define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
# define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
# define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
# define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
# define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007)
# define PARALLEL_TUPLE_QUEUE_SIZE 65536
# define PARALLEL_TUPLE_QUEUE_SIZE 65536
@ -368,6 +370,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
int instrumentation_len = 0 ;
int instrumentation_len = 0 ;
int instrument_offset = 0 ;
int instrument_offset = 0 ;
Size dsa_minsize = dsa_minimum_size ( ) ;
Size dsa_minsize = dsa_minimum_size ( ) ;
char * query_string ;
int query_len ;
/* Allocate object for return value. */
/* Allocate object for return value. */
pei = palloc0 ( sizeof ( ParallelExecutorInfo ) ) ;
pei = palloc0 ( sizeof ( ParallelExecutorInfo ) ) ;
@ -387,6 +391,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* for the various things we need to store .
* for the various things we need to store .
*/
*/
/* Estimate space for query text. */
query_len = strlen ( estate - > es_sourceText ) ;
shm_toc_estimate_chunk ( & pcxt - > estimator , query_len ) ;
shm_toc_estimate_keys ( & pcxt - > estimator , 1 ) ;
/* Estimate space for serialized PlannedStmt. */
/* Estimate space for serialized PlannedStmt. */
pstmt_len = strlen ( pstmt_data ) + 1 ;
pstmt_len = strlen ( pstmt_data ) + 1 ;
shm_toc_estimate_chunk ( & pcxt - > estimator , pstmt_len ) ;
shm_toc_estimate_chunk ( & pcxt - > estimator , pstmt_len ) ;
@ -451,6 +460,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* asked for has been allocated or initialized yet , though , so do that .
* asked for has been allocated or initialized yet , though , so do that .
*/
*/
/* Store query string */
query_string = shm_toc_allocate ( pcxt - > toc , query_len ) ;
memcpy ( query_string , estate - > es_sourceText , query_len ) ;
shm_toc_insert ( pcxt - > toc , PARALLEL_KEY_QUERY_TEXT , query_string ) ;
/* Store serialized PlannedStmt. */
/* Store serialized PlannedStmt. */
pstmt_space = shm_toc_allocate ( pcxt - > toc , pstmt_len ) ;
pstmt_space = shm_toc_allocate ( pcxt - > toc , pstmt_len ) ;
memcpy ( pstmt_space , pstmt_data , pstmt_len ) ;
memcpy ( pstmt_space , pstmt_data , pstmt_len ) ;
@ -661,6 +675,10 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
char * paramspace ;
char * paramspace ;
PlannedStmt * pstmt ;
PlannedStmt * pstmt ;
ParamListInfo paramLI ;
ParamListInfo paramLI ;
char * queryString ;
/* Get the query string from shared memory */
queryString = shm_toc_lookup ( toc , PARALLEL_KEY_QUERY_TEXT ) ;
/* Reconstruct leader-supplied PlannedStmt. */
/* Reconstruct leader-supplied PlannedStmt. */
pstmtspace = shm_toc_lookup ( toc , PARALLEL_KEY_PLANNEDSTMT ) ;
pstmtspace = shm_toc_lookup ( toc , PARALLEL_KEY_PLANNEDSTMT ) ;
@ -679,7 +697,7 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
* revising this someday .
* revising this someday .
*/
*/
return CreateQueryDesc ( pstmt ,
return CreateQueryDesc ( pstmt ,
" <parallel query> " ,
queryString ,
GetActiveSnapshot ( ) , InvalidSnapshot ,
GetActiveSnapshot ( ) , InvalidSnapshot ,
receiver , paramLI , instrument_options ) ;
receiver , paramLI , instrument_options ) ;
}
}
@ -799,6 +817,12 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
instrument_options = instrumentation - > instrument_options ;
instrument_options = instrumentation - > instrument_options ;
queryDesc = ExecParallelGetQueryDesc ( toc , receiver , instrument_options ) ;
queryDesc = ExecParallelGetQueryDesc ( toc , receiver , instrument_options ) ;
/* Setting debug_query_string for individual workers */
debug_query_string = queryDesc - > sourceText ;
/* Report workers' query for monitoring purposes */
pgstat_report_activity ( STATE_RUNNING , debug_query_string ) ;
/* Prepare to track buffer usage during query execution. */
/* Prepare to track buffer usage during query execution. */
InstrStartParallelQuery ( ) ;
InstrStartParallelQuery ( ) ;