/*-------------------------------------------------------------------------
 *
 * execParallel.c
 *	  Support routines for parallel execution.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * This file contains routines that are intended to support setting up,
 * using, and tearing down a ParallelContext from within the PostgreSQL
 * executor.  The ParallelContext machinery will handle starting the
 * workers and ensuring that their state generally matches that of the
 * leader; see src/backend/access/transam/README.parallel for details.
 * However, we must save and restore relevant executor state, such as
 * any ParamListInfo associated with the query, buffer/WAL usage info, and
 * the actual plan to be passed down to the worker.
 *
 * IDENTIFICATION
 *	  src/backend/executor/execParallel.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "executor/nodeAppend.h"
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "executor/nodeIncrementalSort.h"
#include "executor/nodeIndexonlyscan.h"
#include "executor/nodeIndexscan.h"
#include "executor/nodeMemoize.h"
#include "executor/nodeSeqscan.h"
#include "executor/nodeSort.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
#include "jit/jit.h"
#include "nodes/nodeFuncs.h"
#include "pgstat.h"
#include "tcop/tcopprot.h"
#include "utils/datum.h"
#include "utils/dsa.h"
#include "utils/lsyscache.h"
#include "utils/snapmgr.h"
/*
 * Magic numbers for parallel executor communication.  We use constants
 * greater than any 32-bit integer here so that values < 2^32 can be used
 * by individual parallel nodes to store their own state.
 */
#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
#define PARALLEL_KEY_PARAMLISTINFO UINT64CONST(0xE000000000000003)
#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)

#define PARALLEL_TUPLE_QUEUE_SIZE 65536
/*
 * Fixed-size random stuff that we need to pass to parallel workers.
 */
typedef struct FixedParallelExecutorState
{
	int64 tuples_needed;	/* tuple bound, see ExecSetTupleBound */
	dsa_pointer param_exec;
	int eflags;
	int jit_flags;
} FixedParallelExecutorState;
/*
 * DSM structure for accumulating per-PlanState instrumentation.
 *
 * instrument_options: Same meaning here as in instrument.c.
 *
 * instrument_offset: Offset, relative to the start of this structure,
 * of the first Instrumentation object.  This will depend on the length of
 * the plan_node_id array.
 *
 * num_workers: Number of workers.
 *
 * num_plan_nodes: Number of plan nodes.
 *
 * plan_node_id: Array of plan nodes for which we are gathering instrumentation
 * from parallel workers.  The length of this array is given by num_plan_nodes.
 */
struct SharedExecutorInstrumentation
{
	int instrument_options;
	int instrument_offset;
	int num_workers;
	int num_plan_nodes;
	int plan_node_id[FLEXIBLE_ARRAY_MEMBER];
	/* array of num_plan_nodes * num_workers Instrumentation objects follows */
};

#define GetInstrumentationArray(sei) \
	(AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
	 (Instrumentation *) (((char *) sei) + sei->instrument_offset))
/* Context object for ExecParallelEstimate. */
typedef struct ExecParallelEstimateContext
{
	ParallelContext *pcxt;
	int nnodes;
} ExecParallelEstimateContext;

/* Context object for ExecParallelInitializeDSM. */
typedef struct ExecParallelInitializeDSMContext
{
	ParallelContext *pcxt;
	SharedExecutorInstrumentation *instrumentation;
	int nnodes;
} ExecParallelInitializeDSMContext;
/* Helper functions that run in the parallel leader. */
static char *ExecSerializePlan(Plan *plan, EState *estate);
static bool ExecParallelEstimate(PlanState *planstate,
								 ExecParallelEstimateContext *e);
static bool ExecParallelInitializeDSM(PlanState *planstate,
									  ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
													bool reinitialize);
static bool ExecParallelReInitializeDSM(PlanState *planstate,
										ParallelContext *pcxt);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
												SharedExecutorInstrumentation *instrumentation);

/* Helper function that runs in the parallel worker. */
static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
/*
 * Create a serialized representation of the plan to be sent to each worker.
 */
static char *
ExecSerializePlan(Plan *plan, EState *estate)
{
	PlannedStmt *pstmt;
	ListCell   *lc;

	/* We can't scribble on the original plan, so make a copy. */
	plan = copyObject(plan);

	/*
	 * The worker will start its own copy of the executor, and that copy will
	 * insert a junk filter if the toplevel node has any resjunk entries. We
	 * don't want that to happen, because while resjunk columns shouldn't be
	 * sent back to the user, here the tuples are coming back to another
	 * backend which may very well need them.  So mutate the target list
	 * accordingly.  This is sort of a hack; there might be better ways to do
	 * this...
	 */
	foreach(lc, plan->targetlist)
	{
		TargetEntry *tle = lfirst_node(TargetEntry, lc);

		tle->resjunk = false;
	}

	/*
	 * Create a dummy PlannedStmt.  Most of the fields don't need to be valid
	 * for our purposes, but the worker will need at least a minimal
	 * PlannedStmt to start the executor.
	 */
	pstmt = makeNode(PlannedStmt);
	pstmt->commandType = CMD_SELECT;
	pstmt->queryId = pgstat_get_my_query_id();
	pstmt->hasReturning = false;
	pstmt->hasModifyingCTE = false;
	pstmt->canSetTag = true;
	pstmt->transientPlan = false;
	pstmt->dependsOnRole = false;
	pstmt->parallelModeNeeded = false;
	pstmt->planTree = plan;
	pstmt->partPruneInfos = estate->es_part_prune_infos;
	pstmt->rtable = estate->es_range_table;
	pstmt->unprunableRelids = estate->es_unpruned_relids;
	pstmt->permInfos = estate->es_rteperminfos;
	pstmt->resultRelations = NIL;
	pstmt->appendRelations = NIL;

	/*
	 * Transfer only parallel-safe subplans, leaving a NULL "hole" in the list
	 * for unsafe ones (so that the list indexes of the safe ones are
	 * preserved).  This positively ensures that the worker won't try to run,
	 * or even do ExecInitNode on, an unsafe subplan.  That's important to
	 * protect, eg, non-parallel-aware FDWs from getting into trouble.
	 */
	pstmt->subplans = NIL;
	foreach(lc, estate->es_plannedstmt->subplans)
	{
		Plan	   *subplan = (Plan *) lfirst(lc);

		if (subplan && !subplan->parallel_safe)
			subplan = NULL;
		pstmt->subplans = lappend(pstmt->subplans, subplan);
	}

	pstmt->rewindPlanIDs = NULL;
	pstmt->rowMarks = NIL;
	pstmt->relationOids = NIL;
	pstmt->invalItems = NIL;	/* workers can't replan anyway... */
	pstmt->paramExecTypes = estate->es_plannedstmt->paramExecTypes;
	pstmt->utilityStmt = NULL;
	pstmt->stmt_location = -1;
	pstmt->stmt_len = -1;

	/* Return serialized copy of our dummy PlannedStmt. */
	return nodeToString(pstmt);
}
/*
 * Parallel-aware plan nodes (and occasionally others) may need some state
 * which is shared across all parallel workers.  Before we size the DSM, give
 * them a chance to call shm_toc_estimate_chunk or shm_toc_estimate_keys on
 * &pcxt->estimator.
 *
 * While we're at it, count the number of PlanState nodes in the tree, so
 * we know how many Instrumentation structures we need.
 */
static bool
ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
{
	if (planstate == NULL)
		return false;

	/* Count this node. */
	e->nnodes++;

	switch (nodeTag(planstate))
	{
		case T_SeqScanState:
			if (planstate->plan->parallel_aware)
				ExecSeqScanEstimate((SeqScanState *) planstate,
									e->pcxt);
			break;
		case T_IndexScanState:
			if (planstate->plan->parallel_aware)
				ExecIndexScanEstimate((IndexScanState *) planstate,
									  e->pcxt);
			break;
		case T_IndexOnlyScanState:
			if (planstate->plan->parallel_aware)
				ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
										  e->pcxt);
			break;
		case T_ForeignScanState:
			if (planstate->plan->parallel_aware)
				ExecForeignScanEstimate((ForeignScanState *) planstate,
										e->pcxt);
			break;
		case T_AppendState:
			if (planstate->plan->parallel_aware)
				ExecAppendEstimate((AppendState *) planstate,
								   e->pcxt);
			break;
		case T_CustomScanState:
			if (planstate->plan->parallel_aware)
				ExecCustomScanEstimate((CustomScanState *) planstate,
									   e->pcxt);
			break;
		case T_BitmapHeapScanState:
			if (planstate->plan->parallel_aware)
				ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
									   e->pcxt);
			break;
		case T_HashJoinState:
			if (planstate->plan->parallel_aware)
				ExecHashJoinEstimate((HashJoinState *) planstate,
									 e->pcxt);
			break;
		case T_HashState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecHashEstimate((HashState *) planstate, e->pcxt);
			break;
		case T_SortState:
			/* even when not parallel-aware, for EXPLAIN ANALYZE */
			ExecSortEstimate((SortState *) planstate, e->pcxt);
			break;
case T_IncrementalSortState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
break;
case T_AggState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecAggEstimate((AggState *) planstate, e->pcxt);
break;
case T_MemoizeState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecMemoizeEstimate((MemoizeState *) planstate, e->pcxt);
break;
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelEstimate, e);
}
/*
 * Estimate the amount of space required to serialize the indicated parameters.
 */
static Size
EstimateParamExecSpace(EState *estate, Bitmapset *params)
{
	int paramid;
	Size sz = sizeof(int);

	paramid = -1;
	while ((paramid = bms_next_member(params, paramid)) >= 0)
	{
		Oid typeOid;
		int16 typLen;
		bool typByVal;
		ParamExecData *prm;

		prm = &(estate->es_param_exec_vals[paramid]);
		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
							   paramid);

		sz = add_size(sz, sizeof(int)); /* space for paramid */

		/* space for datum/isnull */
		if (OidIsValid(typeOid))
			get_typlenbyval(typeOid, &typLen, &typByVal);
		else
		{
			/* If no type OID, assume by-value, like copyParamList does. */
			typLen = sizeof(Datum);
			typByVal = true;
		}
		sz = add_size(sz,
					  datumEstimateSpace(prm->value, prm->isnull,
										 typByVal, typLen));
	}
	return sz;
}
/*
 * Serialize specified PARAM_EXEC parameters.
 *
 * We write the number of parameters first, as a 4-byte integer, and then
 * write details for each parameter in turn.  The details for each parameter
 * consist of a 4-byte paramid (location of param in execution time internal
 * parameter array) and then the datum as serialized by datumSerialize().
 */
static dsa_pointer
SerializeParamExecParams(EState *estate, Bitmapset *params, dsa_area *area)
{
	Size size;
	int nparams;
	int paramid;
	ParamExecData *prm;
	dsa_pointer handle;
	char *start_address;

	/* Allocate enough space for the current parameter values. */
	size = EstimateParamExecSpace(estate, params);
	handle = dsa_allocate(area, size);
	start_address = dsa_get_address(area, handle);

	/* First write the number of parameters as a 4-byte integer. */
	nparams = bms_num_members(params);
	memcpy(start_address, &nparams, sizeof(int));
	start_address += sizeof(int);

	/* Write details for each parameter in turn. */
	paramid = -1;
	while ((paramid = bms_next_member(params, paramid)) >= 0)
	{
		Oid typeOid;
		int16 typLen;
		bool typByVal;

		prm = &(estate->es_param_exec_vals[paramid]);
		typeOid = list_nth_oid(estate->es_plannedstmt->paramExecTypes,
							   paramid);

		/* Write paramid. */
		memcpy(start_address, &paramid, sizeof(int));
		start_address += sizeof(int);

		/* Write datum/isnull */
		if (OidIsValid(typeOid))
			get_typlenbyval(typeOid, &typLen, &typByVal);
		else
		{
			/* If no type OID, assume by-value, like copyParamList does. */
			typLen = sizeof(Datum);
			typByVal = true;
		}
		datumSerialize(prm->value, prm->isnull, typByVal, typLen,
					   &start_address);
	}

	return handle;
}
/*
 * Restore specified PARAM_EXEC parameters.
 */
static void
RestoreParamExecParams(char *start_address, EState *estate)
{
	int nparams;
	int i;
	int paramid;

	memcpy(&nparams, start_address, sizeof(int));
	start_address += sizeof(int);

	for (i = 0; i < nparams; i++)
	{
		ParamExecData *prm;

		/* Read paramid */
		memcpy(&paramid, start_address, sizeof(int));
		start_address += sizeof(int);
		prm = &(estate->es_param_exec_vals[paramid]);

		/* Read datum/isnull. */
		prm->value = datumRestore(&start_address, &prm->isnull);
		prm->execPlan = NULL;
	}
}
/*
* Initialize the dynamic shared memory segment that will be used to control
* parallel execution.
*/
static bool
ExecParallelInitializeDSM(PlanState *planstate,
ExecParallelInitializeDSMContext *d)
{
if (planstate == NULL)
return false;
/* If instrumentation is enabled, initialize slot for this node. */
if (d->instrumentation != NULL)
d->instrumentation->plan_node_id[d->nnodes] =
planstate->plan->plan_node_id;
/* Count this node. */
d->nnodes++;
/*
* Call initializers for DSM-using plan nodes.
*
* Most plan nodes won't do anything here, but plan nodes that allocated
* DSM may need to initialize shared state in the DSM before parallel
* workers are launched. They can allocate the space they previously
* estimated using shm_toc_allocate, and add the keys they previously
* estimated using shm_toc_insert, in each case targeting pcxt->toc.
*/
switch (nodeTag(planstate))
{
case T_SeqScanState:
if (planstate->plan->parallel_aware)
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
ExecIndexScanInitializeDSM((IndexScanState *) planstate,
d->pcxt);
break;
case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware)
ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
d->pcxt);
break;
case T_ForeignScanState:
if (planstate->plan->parallel_aware)
ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
d->pcxt);
break;
case T_AppendState:
if (planstate->plan->parallel_aware)
ExecAppendInitializeDSM((AppendState *) planstate,
d->pcxt);
break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeDSM((CustomScanState *) planstate,
d->pcxt);
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
d->pcxt);
break;
case T_HashJoinState:
if (planstate->plan->parallel_aware)
ExecHashJoinInitializeDSM((HashJoinState *) planstate,
d->pcxt);
break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashInitializeDSM((HashState *) planstate, d->pcxt);
break;
case T_SortState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
break;
case T_IncrementalSortState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
break;
case T_AggState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
break;
case T_MemoizeState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecMemoizeInitializeDSM((MemoizeState *) planstate, d->pcxt);
break;
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelInitializeDSM, d);
}
/*
* Set up the response queues that the parallel workers use to return
* tuples to the leader, and make the leader the receiver for each queue.
* The workers themselves are launched later by the caller.
*/
static shm_mq_handle **
ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
{
shm_mq_handle **responseq;
char *tqueuespace;
int i;
/* Skip this if no workers. */
if (pcxt->nworkers == 0)
return NULL;
/* Allocate memory for shared memory queue handles. */
responseq = (shm_mq_handle **)
palloc(pcxt->nworkers * sizeof(shm_mq_handle *));
/*
* If not reinitializing, allocate space from the DSM for the queues;
* otherwise, find the already allocated space.
*/
if (!reinitialize)
tqueuespace =
shm_toc_allocate(pcxt->toc,
mul_size(PARALLEL_TUPLE_QUEUE_SIZE,
pcxt->nworkers));
else
tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, false);
/* Create the queues, and become the receiver for each. */
for (i = 0; i < pcxt->nworkers; ++i)
{
shm_mq *mq;
mq = shm_mq_create(tqueuespace +
((Size) i) * PARALLEL_TUPLE_QUEUE_SIZE,
(Size) PARALLEL_TUPLE_QUEUE_SIZE);
shm_mq_set_receiver(mq, MyProc);
responseq[i] = shm_mq_attach(mq, pcxt->seg, NULL);
}
/* Add array of queues to shm_toc, so others can find it. */
if (!reinitialize)
shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
/* Return array of handles. */
return responseq;
}
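/*
 * Illustrative sketch, not part of PostgreSQL: the loop above gives each
 * worker its own fixed-size slice of one contiguous allocation, found at
 * offset i * PARALLEL_TUPLE_QUEUE_SIZE. The standalone program fragment
 * below shows only that carving arithmetic. DemoQueue, DEMO_QUEUE_SIZE and
 * demo_setup_queues() are hypothetical names, and plain malloc() stands in
 * for shm_toc_allocate() and palloc().
 */

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for PARALLEL_TUPLE_QUEUE_SIZE; the real value differs. */
#define DEMO_QUEUE_SIZE ((size_t) 64)

/* Hypothetical handle: records where one queue's region starts. */
typedef struct DemoQueue
{
	char	   *base;
	size_t		size;
} DemoQueue;

/*
 * Carve one contiguous buffer into nqueues equal regions, one per worker.
 * The caller must supply at least nqueues * DEMO_QUEUE_SIZE bytes.
 * Returns a malloc'd array of handles, or NULL if there are no queues to
 * set up or the allocation fails.
 */
static DemoQueue *
demo_setup_queues(char *space, int nqueues)
{
	DemoQueue  *queues;
	int			i;

	assert(space != NULL);

	/* Skip this if no workers, as the real function does. */
	if (nqueues <= 0)
		return NULL;

	queues = malloc((size_t) nqueues * sizeof(DemoQueue));
	if (queues == NULL)
		return NULL;

	for (i = 0; i < nqueues; ++i)
	{
		/* Region i starts i * DEMO_QUEUE_SIZE bytes into the buffer. */
		queues[i].base = space + (size_t) i * DEMO_QUEUE_SIZE;
		queues[i].size = DEMO_QUEUE_SIZE;
		memset(queues[i].base, 0, queues[i].size);
	}
	return queues;
}
```

/*
 * Because the regions are derived purely from the index, a later pass can
 * find the same slices again from the base pointer alone, which is roughly
 * what the reinitialize path relies on when it looks the space up instead
 * of allocating it.
 */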
/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
ParallelExecutorInfo *
ExecInitParallelPlan(PlanState *planstate, EState *estate,
Bitmapset *sendParams, int nworkers,
int64 tuples_needed)
{
ParallelExecutorInfo *pei;
ParallelContext *pcxt;
ExecParallelEstimateContext e;
ExecParallelInitializeDSMContext d;
FixedParallelExecutorState *fpes;
char *pstmt_data;
char *pstmt_space;
char *paramlistinfo_space;
BufferUsage *bufusage_space;
WalUsage *walusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
SharedJitInstrumentation *jit_instrumentation = NULL;
int pstmt_len;
int paramlistinfo_len;
int instrumentation_len = 0;
int jit_instrumentation_len = 0;
int instrument_offset = 0;
Size dsa_minsize = dsa_minimum_size();
char *query_string;
int query_len;
/*
* Force any initplan outputs that we're going to pass to workers to be
* evaluated, if they weren't already.
*
* For simplicity, we use the EState's per-output-tuple ExprContext here.
* That risks intra-query memory leakage, since we might pass through here
* many times before that ExprContext gets reset; but ExecSetParamPlan
* doesn't normally leak any memory in the context (see its comments), so
* it doesn't seem worth complicating this function's API to pass it a
* shorter-lived ExprContext. This might need to change someday.
*/
ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
/* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo));
pei->finished = false;
pei->planstate = planstate;
/* Fix up and serialize plan to be sent to workers. */
pstmt_data = ExecSerializePlan(planstate->plan, estate);
/* Create a parallel context. */
pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
pei->pcxt = pcxt;
/*
* Before telling the parallel context to create a dynamic shared memory
* segment, we need to figure out how big it should be. Estimate space
* for the various things we need to store.
*/
/* Estimate space for fixed-size state. */
shm_toc_estimate_chunk(&pcxt->estimator,
sizeof(FixedParallelExecutorState));
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate space for query text. */
query_len = strlen(estate->es_sourceText);
shm_toc_estimate_chunk(&pcxt->estimator, query_len + 1);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate space for serialized PlannedStmt. */
pstmt_len = strlen(pstmt_data) + 1;
shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate space for serialized ParamListInfo. */
paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info);
shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/*
* Estimate space for BufferUsage.
*
* If EXPLAIN is not in use and there are no extensions loaded that care,
* we could skip this. But we have no way of knowing whether anyone's
* looking at pgBufferUsage, so do it unconditionally.
*/
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
/*
* Same thing for WalUsage.
*/
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate space for tuple queues. */
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
/*
* Give parallel-aware nodes a chance to add to the estimates, and get a
* count of how many PlanState nodes there are.
*/
e.pcxt = pcxt;
e.nnodes = 0;
ExecParallelEstimate(planstate, &e);
/* Estimate space for instrumentation, if required. */
if (estate->es_instrument)
{
instrumentation_len =
offsetof(SharedExecutorInstrumentation, plan_node_id) +
sizeof(int) * e.nnodes;
instrumentation_len = MAXALIGN(instrumentation_len);
instrument_offset = instrumentation_len;
instrumentation_len +=
mul_size(sizeof(Instrumentation),
mul_size(e.nnodes, nworkers));
shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/* Estimate space for JIT instrumentation, if required. */
if (estate->es_jit_flags != PGJIT_NONE)
{
jit_instrumentation_len =
offsetof(SharedJitInstrumentation, jit_instr) +
sizeof(JitInstrumentation) * nworkers;
shm_toc_estimate_chunk(&pcxt->estimator, jit_instrumentation_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
}
/* Estimate space for DSA area. */
shm_toc_estimate_chunk(&pcxt->estimator, dsa_minsize);
shm_toc_estimate_keys(&pcxt->estimator, 1);
/*
* InitializeParallelDSM() passes the active snapshot to the parallel
* worker, which uses it to set es_snapshot. Make sure we don't set
* es_snapshot differently in the child.
*/
Assert(GetActiveSnapshot() == estate->es_snapshot);
/* Everyone's had a chance to ask for space, so now create the DSM. */
InitializeParallelDSM(pcxt);
/*
* OK, now we have a dynamic shared memory segment, and it should be big
* enough to store all of the data we estimated we would want to put into
* it, plus whatever general stuff (not specifically executor-related) the
* ParallelContext itself needs to store there. None of the space we
* asked for has been allocated or initialized yet, though, so do that.
*/
/* Store fixed-size state. */
fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
fpes->tuples_needed = tuples_needed;
fpes->param_exec = InvalidDsaPointer;
fpes->eflags = estate->es_top_eflags;
fpes->jit_flags = estate->es_jit_flags;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
/* Store query string */
query_string = shm_toc_allocate(pcxt->toc, query_len + 1);
memcpy(query_string, estate->es_sourceText, query_len + 1);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, query_string);
/* Store serialized PlannedStmt. */
pstmt_space = shm_toc_allocate(pcxt->toc, pstmt_len);
memcpy(pstmt_space, pstmt_data, pstmt_len);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space);
/* Store serialized ParamListInfo. */
paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
/* Allocate space for each worker's BufferUsage; no need to initialize. */
bufusage_space = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
pei->buffer_usage = bufusage_space;
/* Same for WalUsage. */
walusage_space = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
pei->wal_usage = walusage_space;
/* Set up the tuple queues that the workers will write into. */
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
/* We don't need the TupleQueueReaders yet, though. */
pei->reader = NULL;
/*
* If instrumentation options were supplied, allocate space for the data.
* It only gets partially initialized here; the rest happens during
* ExecParallelInitializeDSM.
*/
if (estate->es_instrument)
{
Instrumentation *instrument;
int i;
instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
instrumentation->instrument_options = estate->es_instrument;
instrumentation->instrument_offset = instrument_offset;
instrumentation->num_workers = nworkers;
instrumentation->num_plan_nodes = e.nnodes;
instrument = GetInstrumentationArray(instrumentation);
for (i = 0; i < nworkers * e.nnodes; ++i)
InstrInit(&instrument[i], estate->es_instrument);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
instrumentation);
pei->instrumentation = instrumentation;
if (estate->es_jit_flags != PGJIT_NONE)
{
jit_instrumentation = shm_toc_allocate(pcxt->toc,
jit_instrumentation_len);
jit_instrumentation->num_workers = nworkers;
memset(jit_instrumentation->jit_instr, 0,
sizeof(JitInstrumentation) * nworkers);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
jit_instrumentation);
pei->jit_instrumentation = jit_instrumentation;
}
}
/*
* Create a DSA area that can be used by the leader and all workers.
* (However, if we failed to create a DSM and are using private memory
* instead, then skip this.)
*/
if (pcxt->seg != NULL)
{
char *area_space;
area_space = shm_toc_allocate(pcxt->toc, dsa_minsize);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_DSA, area_space);
pei->area = dsa_create_in_place(area_space, dsa_minsize,
LWTRANCHE_PARALLEL_QUERY_DSA,
pcxt->seg);
/*
* Serialize parameters, if any, using DSA storage. We don't dare use
* the main parallel query DSM for this because we might relaunch
* workers after the values have changed (and thus the amount of
* storage required has changed).
*/
if (!bms_is_empty(sendParams))
{
pei->param_exec = SerializeParamExecParams(estate, sendParams,
pei->area);
fpes->param_exec = pei->param_exec;
}
}
/*
* Give parallel-aware nodes a chance to initialize their shared data.
* This also initializes the elements of instrumentation->ps_instrument,
* if it exists.
*/
d.pcxt = pcxt;
d.instrumentation = instrumentation;
d.nnodes = 0;
/* Install our DSA area while initializing the plan. */
estate->es_query_dsa = pei->area;
ExecParallelInitializeDSM(planstate, &d);
estate->es_query_dsa = NULL;
/*
* Make sure that the world hasn't shifted under our feet. This could
* probably just be an Assert(), but let's be conservative for now.
*/
if (e.nnodes != d.nnodes)
elog(ERROR, "inconsistent count of PlanState nodes");
/* OK, we're ready to rock and roll. */
return pei;
}
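/*
 * Illustrative sketch, not part of PostgreSQL: ExecInitParallelPlan() works
 * in two phases. It first adds up every chunk it will need, then creates the
 * segment once, and only afterwards hands out the space. The fragment below
 * mimics that estimate-then-allocate pattern with a toy bump allocator.
 * DemoSegment and the demo_* functions are hypothetical; malloc() stands in
 * for the DSM segment, alignment is fixed at 8 bytes as an assumption, and
 * there is no overflow checking of the kind mul_size() provides.
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

/* Round a length up to a multiple of 8 bytes (assumed alignment). */
#define DEMO_ALIGN(len) (((len) + (size_t) 7) & ~((size_t) 7))

typedef struct DemoSegment
{
	size_t		estimated;		/* bytes requested during the estimate phase */
	size_t		used;			/* bytes handed out so far */
	char	   *base;			/* NULL until demo_segment_create() runs */
} DemoSegment;

/* Phase 1: record that a chunk of len bytes will be needed later. */
static void
demo_estimate_chunk(DemoSegment *seg, size_t len)
{
	assert(seg != NULL);
	seg->estimated += DEMO_ALIGN(len);
}

/* Between the phases: create the backing storage exactly once. */
static bool
demo_segment_create(DemoSegment *seg)
{
	assert(seg != NULL);
	seg->base = malloc(seg->estimated > 0 ? seg->estimated : 1);
	seg->used = 0;
	return seg->base != NULL;
}

/*
 * Phase 2: hand out a chunk. Returns NULL if the segment does not exist yet
 * or if the request exceeds what was estimated.
 */
static void *
demo_allocate(DemoSegment *seg, size_t len)
{
	size_t		need = DEMO_ALIGN(len);
	void	   *result;

	assert(seg != NULL);
	if (seg->base == NULL || need > seg->estimated - seg->used)
		return NULL;

	result = seg->base + seg->used;
	seg->used += need;
	return result;
}
```

/*
 * The point of the pattern is that every allocation in phase 2 must have a
 * matching estimate in phase 1; a request that was never estimated simply
 * does not fit.
 */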
/*
* Set up tuple queue readers to read the results of a parallel subplan.
*
* This is separate from ExecInitParallelPlan() because we can launch the
* worker processes and let them start doing something before we do this.
*/
void
ExecParallelCreateReaders(ParallelExecutorInfo *pei)
{
int nworkers = pei->pcxt->nworkers_launched;
int i;
Assert(pei->reader == NULL);
if (nworkers > 0)
{
pei->reader = (TupleQueueReader **)
palloc(nworkers * sizeof(TupleQueueReader *));
for (i = 0; i < nworkers; i++)
{
shm_mq_set_handle(pei->tqueue[i],
pei->pcxt->worker[i].bgwhandle);
pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i]);
}
}
}
/*
* Re-initialize the parallel executor shared memory state before launching
* a fresh batch of workers.
*/
void
ExecParallelReinitialize(PlanState *planstate,
ParallelExecutorInfo *pei,
Bitmapset *sendParams)
{
EState *estate = planstate->state;
FixedParallelExecutorState *fpes;
/* Old workers must already be shut down */
Assert(pei->finished);
/*
* Force any initplan outputs that we're going to pass to workers to be
* evaluated, if they weren't already (see comments in
* ExecInitParallelPlan).
*/
ExecSetParamPlanMulti(sendParams, GetPerTupleExprContext(estate));
ReinitializeParallelDSM(pei->pcxt);
pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
pei->reader = NULL;
pei->finished = false;
fpes = shm_toc_lookup(pei->pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
/* Free any serialized parameters from the last round. */
if (DsaPointerIsValid(fpes->param_exec))
{
dsa_free(pei->area, fpes->param_exec);
fpes->param_exec = InvalidDsaPointer;
}
/* Serialize current parameter values if required. */
if (!bms_is_empty(sendParams))
{
pei->param_exec = SerializeParamExecParams(estate, sendParams,
pei->area);
fpes->param_exec = pei->param_exec;
}
/* Traverse plan tree and let each child node reset associated state. */
estate->es_query_dsa = pei->area;
ExecParallelReInitializeDSM(planstate, pei->pcxt);
estate->es_query_dsa = NULL;
}
/*
* Traverse plan tree to reinitialize per-node dynamic shared memory state.
*/
static bool
ExecParallelReInitializeDSM(PlanState *planstate,
ParallelContext *pcxt)
{
if (planstate == NULL)
return false;
/*
* Call reinitializers for DSM-using plan nodes.
*/
switch (nodeTag(planstate))
{
case T_SeqScanState:
if (planstate->plan->parallel_aware)
ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
pcxt);
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
pcxt);
break;
case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware)
ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
pcxt);
break;
case T_ForeignScanState:
if (planstate->plan->parallel_aware)
ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
pcxt);
break;
case T_AppendState:
if (planstate->plan->parallel_aware)
ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
pcxt);
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
pcxt);
break;
case T_HashJoinState:
if (planstate->plan->parallel_aware)
ExecHashJoinReInitializeDSM((HashJoinState *) planstate,
pcxt);
break;
case T_HashState:
case T_SortState:
case T_IncrementalSortState:
case T_MemoizeState:
/* these nodes have DSM state, but no reinitialization is required */
break;
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
}
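/*
 * Illustrative sketch, not part of PostgreSQL: the function above is a
 * tree walker that dispatches on the node tag, resets only parallel-aware
 * nodes, and then recurses into the children. The fragment below shows the
 * same shape on a toy binary tree. DemoNode, DemoTag and
 * demo_reinit_walker() are hypothetical, and the explicit left/right
 * recursion stands in for planstate_tree_walker().
 */

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum DemoTag
{
	DEMO_SCAN,
	DEMO_JOIN,
	DEMO_SORT
} DemoTag;

typedef struct DemoNode
{
	DemoTag		tag;
	bool		parallel_aware;
	int			reset_count;	/* how many times shared state was reset */
	struct DemoNode *left;
	struct DemoNode *right;
} DemoNode;

/*
 * Visit every node, resetting shared state where the node type needs it.
 * Returns true to abort the walk early; this sketch never does so.
 */
static bool
demo_reinit_walker(DemoNode *node, int *visited)
{
	assert(visited != NULL);

	if (node == NULL)
		return false;

	++(*visited);

	switch (node->tag)
	{
		case DEMO_SCAN:
		case DEMO_JOIN:
			/* Only parallel-aware nodes own shared state to reset. */
			if (node->parallel_aware)
				node->reset_count++;
			break;
		case DEMO_SORT:
			/* Has shared state, but no reinitialization is required. */
			break;
		default:
			break;
	}

	/* Recurse into children, stopping if any callee asks to abort. */
	if (demo_reinit_walker(node->left, visited))
		return true;
	return demo_reinit_walker(node->right, visited);
}
```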
/*
* Copy instrumentation information about this node and its descendants from
* dynamic shared memory.
*/
static bool
ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation)
{
Instrumentation *instrument;
int i;
int n;
int ibytes;
int plan_node_id = planstate->plan->plan_node_id;
MemoryContext oldcontext;
/* Find the instrumentation for this node. */
for (i = 0; i < instrumentation->num_plan_nodes; ++i)
if (instrumentation->plan_node_id[i] == plan_node_id)
break;
if (i >= instrumentation->num_plan_nodes)
elog(ERROR, "plan node %d not found", plan_node_id);
/* Accumulate the statistics from all workers. */
instrument = GetInstrumentationArray(instrumentation);
instrument += i * instrumentation->num_workers;
for (n = 0; n < instrumentation->num_workers; ++n)
InstrAggNode(planstate->instrument, &instrument[n]);
/*
* Also store the per-worker detail.
*
* Worker instrumentation should be allocated in the same context as the
* regular instrumentation information, which is the per-query context.
* Switch into per-query memory context.
*/
oldcontext = MemoryContextSwitchTo(planstate->state->es_query_cxt);
ibytes = mul_size(instrumentation->num_workers, sizeof(Instrumentation));
planstate->worker_instrument =
palloc(ibytes + offsetof(WorkerInstrumentation, instrument));
MemoryContextSwitchTo(oldcontext);
planstate->worker_instrument->num_workers = instrumentation->num_workers;
memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
/* Perform any node-type-specific work that needs to be done. */
switch (nodeTag(planstate))
{
case T_SortState:
ExecSortRetrieveInstrumentation((SortState *) planstate);
break;
case T_IncrementalSortState:
ExecIncrementalSortRetrieveInstrumentation((IncrementalSortState *) planstate);
break;
case T_HashState:
ExecHashRetrieveInstrumentation((HashState *) planstate);
break;
case T_AggState:
ExecAggRetrieveInstrumentation((AggState *) planstate);
break;
case T_MemoizeState:
ExecMemoizeRetrieveInstrumentation((MemoizeState *) planstate);
break;
case T_BitmapHeapScanState:
ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
break;
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
instrumentation);
}
/*
* Add up the workers' JIT instrumentation from dynamic shared memory.
*/
static void
ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
SharedJitInstrumentation *shared_jit)
{
JitInstrumentation *combined;
int ibytes;
int n;
/*
* Accumulate worker JIT instrumentation into the combined JIT
* instrumentation, allocating it if required.
*/
if (!planstate->state->es_jit_worker_instr)
planstate->state->es_jit_worker_instr =
MemoryContextAllocZero(planstate->state->es_query_cxt, sizeof(JitInstrumentation));
combined = planstate->state->es_jit_worker_instr;
/* Accumulate all the workers' instrumentations. */
for (n = 0; n < shared_jit->num_workers; ++n)
InstrJitAgg(combined, &shared_jit->jit_instr[n]);
/*
* Store the per-worker detail.
*
* Similar to ExecParallelRetrieveInstrumentation(), allocate the
* instrumentation in per-query context.
*/
ibytes = offsetof(SharedJitInstrumentation, jit_instr)
+ mul_size(shared_jit->num_workers, sizeof(JitInstrumentation));
planstate->worker_jit_instrument =
MemoryContextAlloc(planstate->state->es_query_cxt, ibytes);
memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
}
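/*
 * Illustration only, not part of execParallel.c.  The size computation and
 * copy above can be sketched in standalone C as follows.  DemoCounters,
 * DemoShared and the demo_* functions are hypothetical stand-ins for
 * JitInstrumentation, SharedJitInstrumentation and the executor code; plain
 * malloc() replaces MemoryContextAlloc(), and the multiplication is not
 * overflow-checked the way mul_size() is.
 */
```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical per-worker counters (stand-in for JitInstrumentation). */
typedef struct DemoCounters
{
	long		functions;
} DemoCounters;

/* Hypothetical shared header followed by one entry per worker. */
typedef struct DemoShared
{
	int			num_workers;
	DemoCounters per_worker[];	/* flexible array member */
} DemoShared;

/* Bytes occupied by the header plus num_workers trailing entries. */
static size_t
demo_shared_size(int num_workers)
{
	return offsetof(DemoShared, per_worker)
		+ (size_t) num_workers * sizeof(DemoCounters);
}

/* Copy the shared struct into private memory; the caller frees the result. */
static DemoShared *
demo_copy_shared(const DemoShared *src)
{
	size_t		nbytes = demo_shared_size(src->num_workers);
	DemoShared *dst = malloc(nbytes);

	if (dst != NULL)
		memcpy(dst, src, nbytes);
	return dst;
}

/* Sum one counter over all workers, mirroring the accumulation loop. */
static long
demo_total_functions(const DemoShared *shared)
{
	long		total = 0;

	for (int n = 0; n < shared->num_workers; ++n)
		total += shared->per_worker[n].functions;
	return total;
}
```
/*
 * Sizing with offsetof() rather than sizeof() on the whole struct avoids
 * counting any padding after the header twice.
 */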
/*
* Finish parallel execution. We wait for parallel workers to finish, and
* accumulate their buffer/WAL usage.
*/
void
ExecParallelFinish(ParallelExecutorInfo *pei)
{
int nworkers = pei->pcxt->nworkers_launched;
int i;
/* Make this be a no-op if called twice in a row. */
if (pei->finished)
return;
/*
* Detach from tuple queues ASAP, so that any still-active workers will
* notice that no further results are wanted.
*/
if (pei->tqueue != NULL)
{
for (i = 0; i < nworkers; i++)
shm_mq_detach(pei->tqueue[i]);
pfree(pei->tqueue);
pei->tqueue = NULL;
}
/*
* While we're waiting for the workers to finish, let's get rid of the
* tuple queue readers. (Any other local cleanup could be done here too.)
*/
if (pei->reader != NULL)
{
for (i = 0; i < nworkers; i++)
DestroyTupleQueueReader(pei->reader[i]);
pfree(pei->reader);
pei->reader = NULL;
}
/* Now wait for the workers to finish. */
WaitForParallelWorkersToFinish(pei->pcxt);
/*
* Next, accumulate buffer/WAL usage. (This must wait for the workers to
* finish, or we might get incomplete data.)
*/
for (i = 0; i < nworkers; i++)
InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
pei->finished = true;
}
/*
* Accumulate instrumentation, and then clean up whatever ParallelExecutorInfo
* resources still exist after ExecParallelFinish. We separate these
* routines because someone might want to examine the contents of the DSM
* after ExecParallelFinish and before calling this routine.
*/
void
ExecParallelCleanup(ParallelExecutorInfo *pei)
{
/* Accumulate instrumentation, if any. */
if (pei->instrumentation)
ExecParallelRetrieveInstrumentation(pei->planstate,
pei->instrumentation);
/* Accumulate JIT instrumentation, if any. */
if (pei->jit_instrumentation)
ExecParallelRetrieveJitInstrumentation(pei->planstate,
pei->jit_instrumentation);
/* Free any serialized parameters. */
if (DsaPointerIsValid(pei->param_exec))
{
dsa_free(pei->area, pei->param_exec);
pei->param_exec = InvalidDsaPointer;
}
if (pei->area != NULL)
{
dsa_detach(pei->area);
pei->area = NULL;
}
if (pei->pcxt != NULL)
{
DestroyParallelContext(pei->pcxt);
pei->pcxt = NULL;
}
pfree(pei);
}
/*
* Create a DestReceiver to write tuples we produce to the shm_mq designated
* for that purpose.
*/
static DestReceiver *
ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc)
{
char *mqspace;
shm_mq *mq;
mqspace = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE, false);
mqspace += ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE;
mq = (shm_mq *) mqspace;
shm_mq_set_sender(mq, MyProc);
return CreateTupleQueueDestReceiver(shm_mq_attach(mq, seg, NULL));
}
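/*
 * Illustration only, not part of execParallel.c.  The pointer arithmetic
 * above gives each worker a fixed-size slice of one contiguous region.  A
 * minimal sketch, assuming a hypothetical slice size DEMO_QUEUE_SIZE in
 * place of PARALLEL_TUPLE_QUEUE_SIZE and a hypothetical helper
 * demo_worker_queue():
 */
```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical slice size, chosen small for the example. */
#define DEMO_QUEUE_SIZE 64

/* Return the start of the slice that belongs to the given worker. */
static char *
demo_worker_queue(char *base, int worker_number)
{
	return base + (size_t) worker_number * DEMO_QUEUE_SIZE;
}
```
/*
 * Because every slice has the same size, a worker needs only the base
 * address and its own number to find its queue; no per-worker lookup table
 * is required.
 */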
/*
* Create a QueryDesc for the PlannedStmt we are to execute, and return it.
*/
static QueryDesc *
ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver,
int instrument_options)
{
char *pstmtspace;
char *paramspace;
PlannedStmt *pstmt;
ParamListInfo paramLI;
char *queryString;
/* Get the query string from shared memory */
queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false);
/* Reconstruct leader-supplied PlannedStmt. */
pstmtspace = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT, false);
pstmt = (PlannedStmt *) stringToNode(pstmtspace);
/* Reconstruct ParamListInfo. */
paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false);
paramLI = RestoreParamList(&paramspace);
/* Create a QueryDesc for the query. */
return CreateQueryDesc(pstmt,
queryString,
GetActiveSnapshot(), InvalidSnapshot,
receiver, paramLI, NULL, instrument_options);
}
/*
* Copy instrumentation information from this node and its descendants into
* dynamic shared memory, so that the parallel leader can retrieve it.
*/
static bool
ExecParallelReportInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation)
{
int i;
int plan_node_id = planstate->plan->plan_node_id;
Instrumentation *instrument;
InstrEndLoop(planstate->instrument);
/*
* If we shuffled the plan_node_id values in ps_instrument into sorted
* order, we could use binary search here. This might matter someday if
* we're pushing down sufficiently large plan trees. For now, do it the
* slow, dumb way.
*/
for (i = 0; i < instrumentation->num_plan_nodes; ++i)
if (instrumentation->plan_node_id[i] == plan_node_id)
break;
if (i >= instrumentation->num_plan_nodes)
elog(ERROR, "plan node %d not found", plan_node_id);
/*
* Add our statistics to the per-node, per-worker totals. It's possible
* that this could happen more than once if we relaunched workers.
*/
instrument = GetInstrumentationArray(instrumentation);
instrument += i * instrumentation->num_workers;
Assert(IsParallelWorker());
Assert(ParallelWorkerNumber < instrumentation->num_workers);
InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
instrumentation);
}
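/*
 * Illustration only, not part of execParallel.c.  The lookup above treats
 * the instrumentation area as a node-major array with num_workers slots per
 * plan node.  A minimal sketch of that indexing, using hypothetical names
 * (DemoInstr, demo_find_node, demo_slot) rather than the executor's own
 * types:
 */
```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-node, per-worker statistics. */
typedef struct DemoInstr
{
	double		ntuples;
} DemoInstr;

/* Linear search for a plan node id; returns its index, or -1 if absent. */
static int
demo_find_node(const int *plan_node_ids, int num_plan_nodes, int plan_node_id)
{
	for (int i = 0; i < num_plan_nodes; ++i)
		if (plan_node_ids[i] == plan_node_id)
			return i;
	return -1;
}

/* Slot for (node_index, worker) in a node-major array of slots. */
static DemoInstr *
demo_slot(DemoInstr *slots, int node_index, int num_workers, int worker)
{
	return &slots[(size_t) node_index * num_workers + worker];
}
```
/*
 * Each worker writes only to its own slot, so workers do not contend with
 * one another when reporting statistics for the same plan node.
 */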
/*
* Initialize the PlanState and its descendants with the information
* retrieved from shared memory. This has to be done once the PlanState
* is allocated and initialized by executor; that is, after ExecutorStart().
*/
static bool
ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
{
if (planstate == NULL)
return false;
switch (nodeTag(planstate))
{
case T_SeqScanState:
if (planstate->plan->parallel_aware)
ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
ExecIndexScanInitializeWorker((IndexScanState *) planstate,
pwcxt);
break;
case T_IndexOnlyScanState:
if (planstate->plan->parallel_aware)
ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
pwcxt);
break;
case T_ForeignScanState:
if (planstate->plan->parallel_aware)
ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
pwcxt);
break;
case T_AppendState:
if (planstate->plan->parallel_aware)
ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
pwcxt);
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate,
pwcxt);
break;
case T_HashJoinState:
if (planstate->plan->parallel_aware)
ExecHashJoinInitializeWorker((HashJoinState *) planstate,
pwcxt);
break;
case T_HashState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecHashInitializeWorker((HashState *) planstate, pwcxt);
break;
case T_SortState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSortInitializeWorker((SortState *) planstate, pwcxt);
break;
case T_IncrementalSortState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
pwcxt);
break;
case T_AggState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecAggInitializeWorker((AggState *) planstate, pwcxt);
break;
case T_MemoizeState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecMemoizeInitializeWorker((MemoizeState *) planstate, pwcxt);
break;
default:
break;
}
return planstate_tree_walker(planstate, ExecParallelInitializeWorker,
pwcxt);
}
/*
* Main entrypoint for parallel query worker processes.
*
* We reach this function from ParallelWorkerMain, so the setup necessary to
* create a sensible parallel environment has already been done;
* ParallelWorkerMain worries about stuff like the transaction state, combo
* CID mappings, and GUC values, so we don't need to deal with any of that
* here.
*
* Our job is to deal with concerns specific to the executor. The parallel
* group leader will have stored a serialized PlannedStmt, and it's our job
* to execute that plan and write the resulting tuples to the appropriate
* tuple queue. Various bits of supporting information that we need in order
* to do this are also stored in the dsm_segment and can be accessed through
* the shm_toc.
*/
void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
	FixedParallelExecutorState *fpes;
	BufferUsage *buffer_usage;
	WalUsage *wal_usage;
	DestReceiver *receiver;
	QueryDesc *queryDesc;
	SharedExecutorInstrumentation *instrumentation;
	SharedJitInstrumentation *jit_instrumentation;
	int instrument_options = 0;
	void *area_space;
	dsa_area *area;
	ParallelWorkerContext pwcxt;

	/* Get fixed-size state. */
	fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);

	/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
	receiver = ExecParallelGetReceiver(seg, toc);
	instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
	if (instrumentation != NULL)
		instrument_options = instrumentation->instrument_options;
	jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
										 true);
	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);

	/* Setting debug_query_string for individual workers */
	debug_query_string = queryDesc->sourceText;

	/* Report workers' query for monitoring purposes */
	pgstat_report_activity(STATE_RUNNING, debug_query_string);

	/* Attach to the dynamic shared memory area. */
	area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
	area = dsa_attach_in_place(area_space, seg);

	/* Start up the executor */
	queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
	ExecutorStart(queryDesc, fpes->eflags);

	/* Special executor initialization steps for parallel workers */
	queryDesc->planstate->state->es_query_dsa = area;
	if (DsaPointerIsValid(fpes->param_exec))
	{
		char *paramexec_space;

		paramexec_space = dsa_get_address(area, fpes->param_exec);
		RestoreParamExecParams(paramexec_space, queryDesc->estate);
	}
	pwcxt.toc = toc;
	pwcxt.seg = seg;
	ExecParallelInitializeWorker(queryDesc->planstate, &pwcxt);

	/* Pass down any tuple bound */
	ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);

	/*
	 * Prepare to track buffer/WAL usage during query execution.
	 *
	 * We do this after starting up the executor to match what happens in the
	 * leader, which also doesn't count buffer accesses and WAL activity that
	 * occur during executor startup.
	 */
	InstrStartParallelQuery();

	/*
	 * Run the plan. If we specified a tuple bound, be careful not to demand
	 * more tuples than that.
	 */
	ExecutorRun(queryDesc,
				ForwardScanDirection,
				fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed);

	/* Shut down the executor */
	ExecutorFinish(queryDesc);

	/* Report buffer/WAL usage during parallel execution. */
	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
	InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
						  &wal_usage[ParallelWorkerNumber]);

	/* Report instrumentation data if any instrumentation options are set. */
	if (instrumentation != NULL)
		ExecParallelReportInstrumentation(queryDesc->planstate,
										  instrumentation);

	/* Report JIT instrumentation data if any */
	if (queryDesc->estate->es_jit && jit_instrumentation != NULL)
	{
		Assert(ParallelWorkerNumber < jit_instrumentation->num_workers);
		jit_instrumentation->jit_instr[ParallelWorkerNumber] =
			queryDesc->estate->es_jit->instr;
	}

	/* Must do this after capturing instrumentation. */
	ExecutorEnd(queryDesc);

	/* Cleanup. */
	dsa_detach(area);
	FreeQueryDesc(queryDesc);
	receiver->rDestroy(receiver);
}