diff --git a/pkg/engine/internal/planner/physical/planner.go b/pkg/engine/internal/planner/physical/planner.go index a5f5f273ba..51fc4b334e 100644 --- a/pkg/engine/internal/planner/physical/planner.go +++ b/pkg/engine/internal/planner/physical/planner.go @@ -767,6 +767,9 @@ func (p *Planner) Optimize(plan *Plan) (*Plan, error) { newOptimization("ParallelPushdown", plan).withRules( ¶llelPushdown{plan: plan}, ), + newOptimization("AggregationSplit", plan).withRules( + &aggregationSplit{plan: plan}, + ), // Perform cleanups at the very end. newOptimization("Cleanup", plan).withRules( diff --git a/pkg/engine/internal/planner/physical/rule_aggregation_split.go b/pkg/engine/internal/planner/physical/rule_aggregation_split.go new file mode 100644 index 0000000000..0a1a8b8f93 --- /dev/null +++ b/pkg/engine/internal/planner/physical/rule_aggregation_split.go @@ -0,0 +1,156 @@ +package physical + +import ( + "github.com/oklog/ulid/v2" + + "github.com/grafana/loki/v3/pkg/engine/internal/types" +) + +// aggregationSplit is a rule that splits range aggregation nodes into parallel +// pieces by injecting a [Parallelize] node between two [RangeAggregation] nodes. +// +// Supported transformations: +// - max: max -> parallelize -> max +// - min: min -> parallelize -> min +// - sum: sum -> parallelize -> sum +// - count: sum -> parallelize -> count +// +// This rule runs after [parallelPushdown] and is skipped for nodes that have +// already been pushed into a [Parallelize] by that rule. When [parallelPushdown] +// leaves a [Parallelize] as a direct child (because it could not shift the node +// itself), this rule reuses that existing [Parallelize]. +type aggregationSplit struct { + plan *Plan + split map[Node]struct{} +} + +var _ rule = (*aggregationSplit)(nil) + +func (a *aggregationSplit) apply(root Node) bool { + if a.split == nil { + a.split = make(map[Node]struct{}) + } + + nodes := findMatchingNodes(a.plan, root, func(node Node) bool { + rangeAgg, ok := node.(*RangeAggregation) + if !ok { + return false + } + // Skip nodes already processed by this rule. + if _, done := a.split[node]; done { + return false + } + // Skip if already pushed down by ParallelPushdown. + if a.isAlreadyParallelized(rangeAgg) { + return false + } + return canSplitRangeAggregation(rangeAgg) + }) + + changed := false + for _, node := range nodes { + a.applySplit(node.(*RangeAggregation)) + changed = true + } + return changed +} + +// isAlreadyParallelized returns true if the given node has a [Parallelize] +// parent, indicating [parallelPushdown] has already pushed it into a parallel +// region. Because this rule runs after [parallelPushdown], any [Parallelize] +// that was previously deeper in the subtree will already have had intermediate +// nodes (e.g. [ColumnCompat]) shifted below it, leaving it as either the +// parent (node was shifted) or the direct child (node could not be shifted) of +// a [RangeAggregation]. +func (a *aggregationSplit) isAlreadyParallelized(node Node) bool { + for _, parent := range a.plan.Parent(node) { + if parent.Type() == NodeTypeParallelize { + return true + } + } + return false +} + +// applySplit transforms a single [RangeAggregation] into: +// +// outerRangeAgg -> Parallelize -> innerRangeAgg +// +// The original node becomes the outer aggregation (with potentially modified +// operation for count), and a clone becomes the inner (partial) aggregation. +// +// If a [Parallelize] is already a direct child of the node (left there by +// [parallelPushdown] when it could not shift the node), it is reused rather +// than a new one being created. +func (a *aggregationSplit) applySplit(rangeAgg *RangeAggregation) { + // Clone the original to use as the inner (partial) aggregation. + inner := rangeAgg.Clone().(*RangeAggregation) + + // Reuse an existing direct-child Parallelize if present; otherwise inject a + // new one between rangeAgg and its children. + var parallelize Node + for _, child := range a.plan.Children(rangeAgg) { + if child.Type() == NodeTypeParallelize { + parallelize = child + break + } + } + if parallelize == nil { + parallelize = &Parallelize{NodeID: ulid.Make()} + a.plan.graph.Inject(rangeAgg, parallelize) + } + + // Inject the inner clone between parallelize and its children. + a.plan.graph.Inject(parallelize, inner) + + // For count, the outer aggregation sums the partial counts. + if rangeAgg.Operation == types.RangeAggregationTypeCount { + rangeAgg.Operation = types.RangeAggregationTypeSum + } + // max, min, and sum outer operations are unchanged. + + // Set the outer range to the step size so that each outer window captures + // exactly one inner result point. + // + // The inner produces one point per step at timestamp t, representing the + // aggregation of raw data in (t-Range, t]. Consecutive inner points are + // Step apart. With outer.Range=Step, the outer window (t-Step, t] always + // contains exactly the inner point at t and excludes the point at t-Step + // (exclusive window start). This holds for all three window regimes: + // - overlapping (Step < Range): avoids collecting multiple inner points + // - aligned (Step == Range): outer.Range unchanged, already correct + // - gapped (Step > Range): outer.Range widens to Step, still one point + // + // Instant queries (Step == 0) are unaffected: there is only one window and + // one inner point at End, so any positive Range is correct. + if rangeAgg.Step > 0 { + rangeAgg.Range = rangeAgg.Step + } + + a.split[rangeAgg] = struct{}{} +} + +// canSplitRangeAggregation returns true if the range aggregation operation +// can be split into parallel pieces. +func canSplitRangeAggregation(rangeAgg *RangeAggregation) bool { + // Splitting aggregations with overlapping windows (Step < Range) can + // lead to traffic amplification (each raw datapoint can produce several + // aggregated datapoints from inner aggregation). However, if `by (...)` + // grouping is narrow (few labels) it should theoretically aggregate a lot + // of streams into a few datapoints. For now just skip all `without` groupings + // and `by` groupings with 5+ labels. + // TODO(spiridonov): Think if there is a better way to estimate amplification. + if rangeAgg.Step > 0 && rangeAgg.Step < rangeAgg.Range && + (rangeAgg.Grouping.Without || len(rangeAgg.Grouping.Columns) > 4) { + return false + } + + // Supported aggregation operations + switch rangeAgg.Operation { + case types.RangeAggregationTypeMax, + types.RangeAggregationTypeMin, + types.RangeAggregationTypeSum, + types.RangeAggregationTypeCount: + return true + } + return false +} diff --git a/pkg/engine/internal/planner/physical/rule_aggregation_split_test.go b/pkg/engine/internal/planner/physical/rule_aggregation_split_test.go new file mode 100644 index 0000000000..a8abafb096 --- /dev/null +++ b/pkg/engine/internal/planner/physical/rule_aggregation_split_test.go @@ -0,0 +1,358 @@ +package physical + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/engine/internal/types" + "github.com/grafana/loki/v3/pkg/engine/internal/util/dag" +) + +func TestAggregationSplit(t *testing.T) { + newOptimizer := func(plan *Plan) *Optimizer { + return NewOptimizer(plan, []*Optimization{ + newOptimization("AggregationSplit", plan).withRules(&aggregationSplit{plan: plan}), + }) + } + + t.Run("splits max", func(t *testing.T) { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeMax}) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + var expectedPlan Plan + { + outerMax := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeMax}) + parallelize := expectedPlan.graph.Add(&Parallelize{}) + innerMax := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeMax}) + scan := expectedPlan.graph.Add(&DataObjScan{}) + + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: outerMax, Child: parallelize})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: innerMax})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: innerMax, Child: scan})) + } + + require.Equal(t, PrintAsTree(&expectedPlan), PrintAsTree(&plan)) + }) + + t.Run("splits min", func(t *testing.T) { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeMin}) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + var expectedPlan Plan + { + outerMin := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeMin}) + parallelize := expectedPlan.graph.Add(&Parallelize{}) + innerMin := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeMin}) + scan := expectedPlan.graph.Add(&DataObjScan{}) + + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: outerMin, Child: parallelize})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: innerMin})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: innerMin, Child: scan})) + } + + require.Equal(t, PrintAsTree(&expectedPlan), PrintAsTree(&plan)) + }) + + t.Run("splits sum", func(t *testing.T) { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeSum}) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + var expectedPlan Plan + { + outerSum := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeSum}) + parallelize := expectedPlan.graph.Add(&Parallelize{}) + innerSum := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeSum}) + scan := expectedPlan.graph.Add(&DataObjScan{}) + + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: outerSum, Child: parallelize})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: innerSum})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: innerSum, Child: scan})) + } + + require.Equal(t, PrintAsTree(&expectedPlan), PrintAsTree(&plan)) + }) + + t.Run("splits count into sum over count", func(t *testing.T) { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeCount}) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + var expectedPlan Plan + { + outerSum := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeSum}) + parallelize := expectedPlan.graph.Add(&Parallelize{}) + innerCount := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeCount}) + scan := expectedPlan.graph.Add(&DataObjScan{}) + + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: outerSum, Child: parallelize})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: innerCount})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: innerCount, Child: scan})) + } + + require.Equal(t, PrintAsTree(&expectedPlan), PrintAsTree(&plan)) + }) + + t.Run("does not split unsupported operations", func(t *testing.T) { + for _, op := range []types.RangeAggregationType{ + types.RangeAggregationTypeAvg, + types.RangeAggregationTypeBytes, + } { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{Operation: op}) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + before := PrintAsTree(&plan) + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + require.Equal(t, before, PrintAsTree(&plan), "operation %s should not be split", op) + } + }) + + t.Run("does not split node when Parallelize is already parent", func(t *testing.T) { + // ParallelPushdown has already pushed the RangeAgg inside a Parallelize. + // Input: Parallelize -> RangeAgg -> Scan + var plan Plan + { + parallelize := plan.graph.Add(&Parallelize{}) + agg := plan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeSum}) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: agg})) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: agg, Child: scan})) + } + before := PrintAsTree(&plan) + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + require.Equal(t, before, PrintAsTree(&plan)) + }) + + t.Run("does not split when step < range with without grouping (overlapping windows)", func(t *testing.T) { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: 30 * time.Second, + Range: time.Minute, + Grouping: Grouping{Without: true}, + }) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + before := PrintAsTree(&plan) + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + require.Equal(t, before, PrintAsTree(&plan)) + }) + + t.Run("does not split when step < range with by grouping having 5+ labels (overlapping windows)", func(t *testing.T) { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: 30 * time.Second, + Range: time.Minute, + Grouping: Grouping{Columns: []ColumnExpression{ + newColumnExpr("a", types.ColumnTypeLabel), + newColumnExpr("b", types.ColumnTypeLabel), + newColumnExpr("c", types.ColumnTypeLabel), + newColumnExpr("d", types.ColumnTypeLabel), + newColumnExpr("e", types.ColumnTypeLabel), + }}, + }) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + before := PrintAsTree(&plan) + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + require.Equal(t, before, PrintAsTree(&plan)) + }) + + t.Run("splits when step < range with by grouping (overlapping windows)", func(t *testing.T) { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: 30 * time.Second, + Range: time.Minute, + }) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + var expectedPlan Plan + { + // Outer: Range is set to Step to capture exactly one inner result point. + outerSum := expectedPlan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: 30 * time.Second, + Range: 30 * time.Second, // outer.Range = Step = 30s + }) + parallelize := expectedPlan.graph.Add(&Parallelize{}) + // Inner: clone of original, Range unchanged. + innerSum := expectedPlan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: 30 * time.Second, + Range: time.Minute, + }) + scan := expectedPlan.graph.Add(&DataObjScan{}) + + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: outerSum, Child: parallelize})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: innerSum})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: innerSum, Child: scan})) + } + + require.Equal(t, PrintAsTree(&expectedPlan), PrintAsTree(&plan)) + }) + + t.Run("splits when step == range (aligned windows)", func(t *testing.T) { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: time.Minute, + Range: time.Minute, + }) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + var expectedPlan Plan + { + // Outer: Range is set to Step (== Range here, so unchanged). + outerSum := expectedPlan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: time.Minute, + Range: time.Minute, // outer.Range = Step = 1m + }) + parallelize := expectedPlan.graph.Add(&Parallelize{}) + // Inner: clone of original, Range unchanged. + innerSum := expectedPlan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: time.Minute, + Range: time.Minute, + }) + scan := expectedPlan.graph.Add(&DataObjScan{}) + + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: outerSum, Child: parallelize})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: innerSum})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: innerSum, Child: scan})) + } + + require.Equal(t, PrintAsTree(&expectedPlan), PrintAsTree(&plan)) + }) + + t.Run("splits when step > range (gapped windows)", func(t *testing.T) { + var plan Plan + { + rangeAgg := plan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: 2 * time.Minute, + Range: time.Minute, + }) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: rangeAgg, Child: scan})) + } + + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + var expectedPlan Plan + { + // Outer: Range is widened to Step. + outerSum := expectedPlan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: 2 * time.Minute, + Range: 2 * time.Minute, // outer.Range = Step = 2m + }) + parallelize := expectedPlan.graph.Add(&Parallelize{}) + // Inner: clone of original, Range unchanged. + innerSum := expectedPlan.graph.Add(&RangeAggregation{ + Operation: types.RangeAggregationTypeSum, + Step: 2 * time.Minute, + Range: time.Minute, + }) + scan := expectedPlan.graph.Add(&DataObjScan{}) + + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: outerSum, Child: parallelize})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: innerSum})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: innerSum, Child: scan})) + } + + require.Equal(t, PrintAsTree(&expectedPlan), PrintAsTree(&plan)) + }) + + t.Run("splits when Parallelize is already a direct child", func(t *testing.T) { + // After ParallelPushdown shifts intermediate nodes (e.g. Compat) below + // Parallelize, Parallelize may end up as a direct child of the RangeAgg + // when ParallelPushdown cannot shift the RangeAgg itself. + // AggregationSplit should reuse that existing Parallelize. + // Input: RangeAgg(sum) -> Parallelize -> Scan (simulates post-ParallelPushdown state) + var plan Plan + { + agg := plan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeSum}) + parallelize := plan.graph.Add(&Parallelize{}) + scan := plan.graph.Add(&DataObjScan{}) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: agg, Child: parallelize})) + require.NoError(t, plan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: scan})) + } + + root, _ := plan.graph.Root() + newOptimizer(&plan).Optimize(root) + + var expectedPlan Plan + { + outerSum := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeSum}) + parallelize := expectedPlan.graph.Add(&Parallelize{}) + innerSum := expectedPlan.graph.Add(&RangeAggregation{Operation: types.RangeAggregationTypeSum}) + scan := expectedPlan.graph.Add(&DataObjScan{}) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: outerSum, Child: parallelize})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: parallelize, Child: innerSum})) + require.NoError(t, expectedPlan.graph.AddEdge(dag.Edge[Node]{Parent: innerSum, Child: scan})) + } + + require.Equal(t, PrintAsTree(&expectedPlan), PrintAsTree(&plan)) + }) +} diff --git a/pkg/engine/internal/planner/physical/rule_parallel_pushdown.go b/pkg/engine/internal/planner/physical/rule_parallel_pushdown.go index f5d1ce7518..fc2eeb04e9 100644 --- a/pkg/engine/internal/planner/physical/rule_parallel_pushdown.go +++ b/pkg/engine/internal/planner/physical/rule_parallel_pushdown.go @@ -81,7 +81,7 @@ func (p *parallelPushdown) applyParallelization(node Node) bool { // RangeAggregation can be parallelized only when the operation // is associative and commutative with the parent vector aggregation. - if !canShardAggregation(vecAgg, node) { + if !p.canShardAggregation(vecAgg, node) { return false } @@ -134,8 +134,8 @@ func (p *parallelPushdown) findParentVectorAggregation(node Node) *VectorAggrega // Supported combinations: // - sum over sum/count: additive operations can be summed across partitions // - max over max: max of local maxes equals global max -// - min over min: min of loca mins equals global min -func canShardAggregation(vec *VectorAggregation, rng *RangeAggregation) bool { +// - min over min: min of local mins equals global min +func (p *parallelPushdown) canShardAggregation(vec *VectorAggregation, rng *RangeAggregation) bool { // without grouping is not pushed down. if vec.Grouping.Without || rng.Grouping.Without { return false diff --git a/pkg/engine/internal/planner/physical/rule_parallel_pushdown_test.go b/pkg/engine/internal/planner/physical/rule_parallel_pushdown_test.go index 0c5e0f56c0..839069381b 100644 --- a/pkg/engine/internal/planner/physical/rule_parallel_pushdown_test.go +++ b/pkg/engine/internal/planner/physical/rule_parallel_pushdown_test.go @@ -219,7 +219,7 @@ func TestParallelPushdown_canShardAggregation(t *testing.T) { t.Run(tc.name, func(t *testing.T) { vec := &VectorAggregation{Operation: tc.vecOp} rng := &RangeAggregation{Operation: tc.rangeOp} - require.Equal(t, tc.expected, canShardAggregation(vec, rng)) + require.Equal(t, tc.expected, (¶llelPushdown{}).canShardAggregation(vec, rng)) }) } }) @@ -231,7 +231,7 @@ func TestParallelPushdown_canShardAggregation(t *testing.T) { Grouping: Grouping{Without: true}, } rng := &RangeAggregation{Operation: types.RangeAggregationTypeSum} - require.False(t, canShardAggregation(vec, rng)) + require.False(t, (¶llelPushdown{}).canShardAggregation(vec, rng)) }) t.Run("rejects mismatched grouping columns", func(t *testing.T) { @@ -247,7 +247,7 @@ func TestParallelPushdown_canShardAggregation(t *testing.T) { Columns: []ColumnExpression{&ColumnExpr{Ref: types.ColumnRef{Column: "bar"}}}, }, } - require.False(t, canShardAggregation(vec, rng)) + require.False(t, (¶llelPushdown{}).canShardAggregation(vec, rng)) }) t.Run("accepts matching grouping columns", func(t *testing.T) { @@ -263,7 +263,7 @@ func TestParallelPushdown_canShardAggregation(t *testing.T) { Columns: []ColumnExpression{&ColumnExpr{Ref: types.ColumnRef{Column: "foo"}}}, }, } - require.True(t, canShardAggregation(vec, rng)) + require.True(t, (¶llelPushdown{}).canShardAggregation(vec, rng)) }) }) } diff --git a/pkg/engine/internal/planner/planner_test.go b/pkg/engine/internal/planner/planner_test.go index 963917b989..75607e42f6 100644 --- a/pkg/engine/internal/planner/planner_test.go +++ b/pkg/engine/internal/planner/planner_test.go @@ -210,18 +210,19 @@ VectorAggregation operation=sum group_by=(ambiguous.bar) query: `sum(count_over_time({app="foo"} | detected_level="error" | json | logfmt | drop __error__,__error_details__[1m]))`, expected: ` VectorAggregation operation=sum group_by=() -└── RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s +└── RangeAggregation operation=sum start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s └── Parallelize - └── Projection all=true drop=(ambiguous.__error__, ambiguous.__error_details__) - └── Compat src=parsed dst=parsed collisions=(label, metadata) - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) - └── Compat src=parsed dst=parsed collisions=(label, metadata) - └── Projection all=true expand=(PARSE_JSON(builtin.message, [], false, false)) - └── Filter predicate[0]=EQ(ambiguous.detected_level, "error") - └── Compat src=metadata dst=metadata collisions=(label) - └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) - ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() predicate[0]=EQ(metadata.detected_level, "error") - └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() predicate[0]=EQ(metadata.detected_level, "error") + └── RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s + └── Projection all=true drop=(ambiguous.__error__, ambiguous.__error_details__) + └── Compat src=parsed dst=parsed collisions=(label, metadata) + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) + └── Compat src=parsed dst=parsed collisions=(label, metadata) + └── Projection all=true expand=(PARSE_JSON(builtin.message, [], false, false)) + └── Filter predicate[0]=EQ(ambiguous.detected_level, "error") + └── Compat src=metadata dst=metadata collisions=(label) + └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) + ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() predicate[0]=EQ(metadata.detected_level, "error") + └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() predicate[0]=EQ(metadata.detected_level, "error") `, }, { @@ -230,12 +231,13 @@ VectorAggregation operation=sum group_by=() expected: ` VectorAggregation operation=sum group_by=(ambiguous.bar) └── Projection all=true expand=(DIV(generated.value, 300)) - └── RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s group_by=(ambiguous.bar) + └── RangeAggregation operation=sum start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s group_by=(ambiguous.bar) └── Parallelize - └── Compat src=metadata dst=metadata collisions=(label) - └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) - ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() - └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() + └── RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s group_by=(ambiguous.bar) + └── Compat src=metadata dst=metadata collisions=(label) + └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) + ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() + └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() `, }, {