chore(engine): Add simple plan optimizer (#16965)

This PR adds a plan optimizer that pushes down limit and filter predicates to the scan node.

### Example

Plan:
```
Limit #0xc0000110f8 offset=0 limit=1000
└── Filter #0xc00039dd10
    │   └── Predicate expr=LT(timestamp.builtin, 1742826126000000000)
    └── Filter #0xc00039ddd0
        │   └── Predicate expr=GT(age.metadata, 21)
        └── SortMerge #0xc00039de60 column=timestamp.builtin order=ASC
            ├── DataObjScan #0xc00011b420 location=obj1 stream_ids=(1, 2) projections=() direction=0 limit=0
            └── DataObjScan #0xc00011b490 location=obj2 stream_ids=(3, 4) projections=() direction=0 limit=0
```

Optimized plan:
```

Limit #0xc0000110f8 offset=0 limit=1000
└── SortMerge #0xc00039de60 column=timestamp.builtin order=ASC
    ├── DataObjScan #0xc00011b420 location=obj1 stream_ids=(1, 2) projections=() direction=0 limit=1000
    │       ├── Predicate expr=LT(timestamp.builtin, 1742826126000000000)
    │       └── Predicate expr=GT(age.metadata, 21)
    └── DataObjScan #0xc00011b490 location=obj2 stream_ids=(3, 4) projections=() direction=0 limit=1000
            ├── Predicate expr=LT(timestamp.builtin, 1742826126000000000)
            └── Predicate expr=GT(age.metadata, 21)
```

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/16902/head^2
Christian Haudum 3 months ago committed by GitHub
parent 40223d7320
commit 04d2ba3ca8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      pkg/engine/planner/physical/expressions.go
  2. 8
      pkg/engine/planner/physical/limit.go
  3. 187
      pkg/engine/planner/physical/optimizer.go
  4. 189
      pkg/engine/planner/physical/optimizer_test.go
  5. 25
      pkg/engine/planner/physical/plan.go
  6. 27
      pkg/engine/planner/physical/planner.go
  7. 6
      pkg/engine/planner/physical/planner_test.go
  8. 4
      pkg/engine/planner/physical/printer.go

@ -142,6 +142,15 @@ type ColumnExpr struct {
ref types.ColumnRef
}
// newColumnExpr builds a ColumnExpr referencing the given column name and
// column type.
func newColumnExpr(column string, ty types.ColumnType) *ColumnExpr {
	ref := types.ColumnRef{
		Column: column,
		Type:   ty,
	}
	return &ColumnExpr{ref: ref}
}
// isExpr is a marker method identifying ColumnExpr as an expression; it has no behavior.
func (e *ColumnExpr) isExpr() {}

// isColumnExpr is a marker method identifying ColumnExpr as a column expression; it has no behavior.
func (e *ColumnExpr) isColumnExpr() {}

@ -9,10 +9,10 @@ import "fmt"
// Limit is a physical plan node that skips a number of initial rows and caps
// how many rows are returned in total.
type Limit struct {
	id string
	// NOTE(review): Offset/Limit and Skip/Fetch look like duplicate pairs —
	// the printer and optimizer in this change read Skip/Fetch, so
	// Offset/Limit appear to be stale leftovers of a rename; confirm and
	// remove them together with their remaining writers.
	// Offset specifies how many initial rows should be skipped.
	Offset uint32
	// Limit specifies how many rows should be returned in total.
	Limit uint32
	// Skip specifies how many initial rows should be skipped.
	Skip uint32
	// Fetch specifies how many rows should be returned in total.
	Fetch uint32
}
// ID implements the [Node] interface.

@ -0,0 +1,187 @@
package physical
import (
"slices"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
// A rule is a transformation that can be applied on a Node.
type rule interface {
	// apply tries to apply the transformation on the node.
	// It returns a boolean indicating whether the transformation has been applied.
	apply(Node) bool
}
// removeNoopFilter is a rule that removes Filter nodes without predicates.
type removeNoopFilter struct {
	plan *Plan
}

// apply implements rule. It eliminates node from the plan when it is a
// [Filter] carrying no predicates, reporting whether an elimination happened.
func (r *removeNoopFilter) apply(node Node) bool {
	filter, ok := node.(*Filter)
	if !ok || len(filter.Predicates) > 0 {
		return false
	}
	r.plan.eliminateNode(filter)
	return true
}

var _ rule = (*removeNoopFilter)(nil)
// predicatePushdown is a rule that moves down filter predicates to the scan nodes.
type predicatePushdown struct {
	plan *Plan
}

// apply implements rule. For a [Filter] node it attempts to push each
// predicate down to the scan nodes and drops every predicate that was
// successfully pushed. It reports whether the filter was modified.
func (r *predicatePushdown) apply(node Node) bool {
	filter, ok := node.(*Filter)
	if !ok {
		return false
	}
	changed := false
	idx := 0
	for idx < len(filter.Predicates) {
		if !r.applyPredicatePushdown(filter, filter.Predicates[idx]) {
			idx++
			continue
		}
		// The predicate now lives on the scan nodes; remove it from the filter.
		filter.Predicates = slices.Delete(filter.Predicates, idx, idx+1)
		changed = true
	}
	return changed
}
// applyPredicatePushdown recursively walks the subtree below node and tries
// to attach the given predicate to every reachable [DataObjScan]. It returns
// true when the predicate was accepted on all visited paths, in which case
// the caller removes it from the originating [Filter].
func (r *predicatePushdown) applyPredicatePushdown(node Node, predicate Expression) bool {
	switch node := node.(type) {
	case *DataObjScan:
		// Scans only accept predicates that canApplyPredicate approves
		// (builtin/metadata columns and literals).
		if canApplyPredicate(predicate) {
			node.Predicates = append(node.Predicates, predicate)
			return true
		}
		return false
	}
	// NOTE(review): if an earlier child accepts the predicate but a later one
	// rejects it, the predicate stays on the Filter while already being
	// appended to some scans — duplicated evaluation, not incorrect, but
	// worth confirming this is intended.
	for _, child := range r.plan.Children(node) {
		if ok := r.applyPredicatePushdown(child, predicate); !ok {
			return ok
		}
	}
	// NOTE(review): a non-scan leaf falls through to here and reports
	// success, which would drop the predicate entirely — presumably every
	// plan branch terminates in a scan node; verify.
	return true
}
// canApplyPredicate reports whether a predicate expression is eligible for
// pushdown to a scan node: literals, builtin/metadata column references, and
// binary expressions composed of eligible operands qualify.
func canApplyPredicate(predicate Expression) bool {
	switch e := predicate.(type) {
	case *LiteralExpr:
		return true
	case *ColumnExpr:
		return e.ref.Type == types.ColumnTypeBuiltin || e.ref.Type == types.ColumnTypeMetadata
	case *BinaryExpr:
		return canApplyPredicate(e.Left) && canApplyPredicate(e.Right)
	default:
		return false
	}
}

var _ rule = (*predicatePushdown)(nil)
// limitPushdown is a rule that moves down the limit to the scan nodes.
type limitPushdown struct {
	plan *Plan
}

// apply implements rule. It pushes the fetch limit of a [Limit] node down to
// the scan nodes below it, reporting whether all paths accepted the limit.
func (r *limitPushdown) apply(node Node) bool {
	if limit, ok := node.(*Limit); ok {
		return r.applyLimitPushdown(limit, limit.Fetch)
	}
	return false
}

// applyLimitPushdown recursively propagates limit to every [DataObjScan]
// below node.
func (r *limitPushdown) applyLimitPushdown(node Node, limit uint32) bool {
	if scan, ok := node.(*DataObjScan); ok {
		// In case the scan node is reachable from multiple different limit
		// nodes, we need to take the largest limit.
		scan.Limit = max(scan.Limit, limit)
		return true
	}
	for _, child := range r.plan.Children(node) {
		if !r.applyLimitPushdown(child, limit) {
			return false
		}
	}
	return true
}

var _ rule = (*limitPushdown)(nil)
// optimization represents a single optimization pass and can hold multiple rules.
type optimization struct {
	plan  *Plan
	name  string
	rules []rule
}

// newOptimization creates a named optimization pass over the given plan with
// no rules attached yet.
func newOptimization(name string, plan *Plan) *optimization {
	return &optimization{plan: plan, name: name}
}

// withRules appends the given rules to the pass and returns the pass for
// chaining.
func (o *optimization) withRules(rules ...rule) *optimization {
	for _, r := range rules {
		o.rules = append(o.rules, r)
	}
	return o
}
// optimize repeatedly applies the pass' rules to the subtree rooted at node
// until a full pass produces no changes, capped at a fixed iteration count
// to guarantee termination.
func (o *optimization) optimize(node Node) {
	const maxIterations = 3
	for i := 0; i < maxIterations; i++ {
		if !o.applyRules(node) {
			// Stop immediately once a pass leaves the plan unchanged.
			return
		}
	}
}
// applyRules walks the subtree bottom-up (children before the node itself),
// applying every rule of the pass to each node. It reports whether any rule
// changed any node.
func (o *optimization) applyRules(node Node) bool {
	changed := false
	for _, child := range o.plan.Children(node) {
		if o.applyRules(child) {
			changed = true
		}
	}
	for _, r := range o.rules {
		if r.apply(node) {
			changed = true
		}
	}
	return changed
}
// The optimizer can optimize physical plans using the provided optimization passes.
type optimizer struct {
	plan   *Plan
	passes []*optimization
}

// newOptimizer creates an optimizer that runs the given passes over plan.
func newOptimizer(plan *Plan, passes []*optimization) *optimizer {
	return &optimizer{
		plan:   plan,
		passes: passes,
	}
}

// optimize runs every pass, in order, against the subtree rooted at node.
func (o *optimizer) optimize(node Node) {
	for i := range o.passes {
		o.passes[i].optimize(node)
	}
}

@ -0,0 +1,189 @@
package physical
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
// TestCanApplyPredicate checks which expression shapes are accepted for
// pushdown: literals and builtin-column references qualify, label columns do
// not, and binary expressions inherit eligibility from their operands.
func TestCanApplyPredicate(t *testing.T) {
	cases := []struct {
		predicate Expression
		want      bool
	}{
		{predicate: NewLiteral(123), want: true},
		{predicate: newColumnExpr("timestamp", types.ColumnTypeBuiltin), want: true},
		{predicate: newColumnExpr("foo", types.ColumnTypeLabel), want: false},
		{
			predicate: &BinaryExpr{
				Left:  newColumnExpr("timestamp", types.ColumnTypeBuiltin),
				Right: NewLiteral(uint64(1743424636000000000)),
				Op:    types.BinaryOpGt,
			},
			want: true,
		},
		{
			predicate: &BinaryExpr{
				Left:  newColumnExpr("foo", types.ColumnTypeLabel),
				Right: NewLiteral("bar"),
				Op:    types.BinaryOpEq,
			},
			want: false,
		},
	}

	for _, tc := range cases {
		t.Run(tc.predicate.String(), func(t *testing.T) {
			require.Equal(t, tc.want, canApplyPredicate(tc.predicate))
		})
	}
}
// dummyPlan constructs a plan of the shape
//
//	filter3 -> filter2 -> filter1 -> merge -> (scan1, scan2)
//
// where filter1 and filter2 each carry one timestamp predicate and filter3
// has no predicates at all.
// NOTE(review): the tests compare PrintAsTree output of this plan against
// hand-built expectations, so keep the node/edge insertion order stable —
// the rendered tree presumably depends on it; verify before reordering.
func dummyPlan() *Plan {
	plan := &Plan{}
	scan1 := plan.addNode(&DataObjScan{id: "scan1"})
	scan2 := plan.addNode(&DataObjScan{id: "scan2"})
	merge := plan.addNode(&SortMerge{id: "merge"})
	// filter1: timestamp > 1000000000
	filter1 := plan.addNode(&Filter{id: "filter1", Predicates: []Expression{
		&BinaryExpr{
			Left:  newColumnExpr("timestamp", types.ColumnTypeBuiltin),
			Right: NewLiteral(uint64(1000000000)),
			Op:    types.BinaryOpGt,
		},
	}})
	// filter2: timestamp <= 2000000000
	filter2 := plan.addNode(&Filter{id: "filter2", Predicates: []Expression{
		&BinaryExpr{
			Left:  newColumnExpr("timestamp", types.ColumnTypeBuiltin),
			Right: NewLiteral(uint64(2000000000)),
			Op:    types.BinaryOpLte,
		},
	}})
	// filter3 is a no-op filter without predicates.
	filter3 := plan.addNode(&Filter{id: "filter3", Predicates: []Expression{}})
	_ = plan.addEdge(Edge{Parent: filter3, Child: filter2})
	_ = plan.addEdge(Edge{Parent: filter2, Child: filter1})
	_ = plan.addEdge(Edge{Parent: filter1, Child: merge})
	_ = plan.addEdge(Edge{Parent: merge, Child: scan1})
	_ = plan.addEdge(Edge{Parent: merge, Child: scan2})
	return plan
}
// TestOptimizer exercises the individual optimizer rules against the plan
// built by dummyPlan, comparing PrintAsTree renderings of the optimized plan
// with hand-built expectation plans.
func TestOptimizer(t *testing.T) {
	// A pass without rules must leave the plan byte-identical.
	t.Run("noop", func(t *testing.T) {
		plan := dummyPlan()
		optimizations := []*optimization{
			newOptimization("noop", plan),
		}
		original := PrintAsTree(plan)
		o := newOptimizer(plan, optimizations)
		o.optimize(plan.Roots()[0])
		optimized := PrintAsTree(plan)
		require.Equal(t, original, optimized)
	})

	// Predicate pushdown alone moves both timestamp predicates onto each
	// scan node; the filter nodes remain in the plan but become empty.
	t.Run("filter predicate pushdown", func(t *testing.T) {
		plan := dummyPlan()
		optimizations := []*optimization{
			newOptimization("predicate pushdown", plan).withRules(
				&predicatePushdown{plan},
			),
		}
		o := newOptimizer(plan, optimizations)
		o.optimize(plan.Roots()[0])
		actual := PrintAsTree(plan)

		// Expected plan: both predicates on each scan, all filters emptied.
		optimized := &Plan{}
		scan1 := optimized.addNode(&DataObjScan{id: "scan1", Predicates: []Expression{
			&BinaryExpr{
				Left:  newColumnExpr("timestamp", types.ColumnTypeBuiltin),
				Right: NewLiteral(uint64(1000000000)),
				Op:    types.BinaryOpGt,
			},
			&BinaryExpr{
				Left:  newColumnExpr("timestamp", types.ColumnTypeBuiltin),
				Right: NewLiteral(uint64(2000000000)),
				Op:    types.BinaryOpLte,
			},
		}})
		scan2 := optimized.addNode(&DataObjScan{id: "scan2", Predicates: []Expression{
			&BinaryExpr{
				Left:  newColumnExpr("timestamp", types.ColumnTypeBuiltin),
				Right: NewLiteral(uint64(1000000000)),
				Op:    types.BinaryOpGt,
			},
			&BinaryExpr{
				Left:  newColumnExpr("timestamp", types.ColumnTypeBuiltin),
				Right: NewLiteral(uint64(2000000000)),
				Op:    types.BinaryOpLte,
			},
		}})
		merge := optimized.addNode(&SortMerge{id: "merge"})
		filter1 := optimized.addNode(&Filter{id: "filter1", Predicates: []Expression{}})
		filter2 := optimized.addNode(&Filter{id: "filter2", Predicates: []Expression{}})
		filter3 := optimized.addNode(&Filter{id: "filter3", Predicates: []Expression{}})
		_ = optimized.addEdge(Edge{Parent: filter3, Child: filter2})
		_ = optimized.addEdge(Edge{Parent: filter2, Child: filter1})
		_ = optimized.addEdge(Edge{Parent: filter1, Child: merge})
		_ = optimized.addEdge(Edge{Parent: merge, Child: scan1})
		_ = optimized.addEdge(Edge{Parent: merge, Child: scan2})
		expected := PrintAsTree(optimized)
		require.Equal(t, expected, actual)
	})

	// removeNoopFilter alone eliminates only filter3 (the predicate-less
	// filter); filter1 and filter2 keep their predicates.
	t.Run("filter remove", func(t *testing.T) {
		plan := dummyPlan()
		optimizations := []*optimization{
			newOptimization("noop filter", plan).withRules(
				&removeNoopFilter{plan},
			),
		}
		o := newOptimizer(plan, optimizations)
		o.optimize(plan.Roots()[0])
		actual := PrintAsTree(plan)

		// Expected plan: filter3 removed, filter2 reconnected as the root.
		optimized := &Plan{}
		scan1 := optimized.addNode(&DataObjScan{id: "scan1", Predicates: []Expression{}})
		scan2 := optimized.addNode(&DataObjScan{id: "scan2", Predicates: []Expression{}})
		merge := optimized.addNode(&SortMerge{id: "merge"})
		filter1 := optimized.addNode(&Filter{id: "filter1", Predicates: []Expression{
			&BinaryExpr{
				Left:  newColumnExpr("timestamp", types.ColumnTypeBuiltin),
				Right: NewLiteral(uint64(1000000000)),
				Op:    types.BinaryOpGt,
			},
		}})
		filter2 := optimized.addNode(&Filter{id: "filter2", Predicates: []Expression{
			&BinaryExpr{
				Left:  newColumnExpr("timestamp", types.ColumnTypeBuiltin),
				Right: NewLiteral(uint64(2000000000)),
				Op:    types.BinaryOpLte,
			},
		}})
		_ = optimized.addEdge(Edge{Parent: filter2, Child: filter1})
		_ = optimized.addEdge(Edge{Parent: filter1, Child: merge})
		_ = optimized.addEdge(Edge{Parent: merge, Child: scan1})
		_ = optimized.addEdge(Edge{Parent: merge, Child: scan2})
		expected := PrintAsTree(optimized)
		require.Equal(t, expected, actual)
	})
}

@ -198,6 +198,31 @@ func (p *Plan) addEdge(e Edge) error {
return nil
}
// eliminateNode removes a node from the plan and reconnects its parents to its children.
// This maintains the graph's connectivity by creating direct edges from each parent
// to each child of the removed node. The function also cleans up all references to
// the node in the plan's internal data structures.
func (p *Plan) eliminateNode(node Node) {
	// Bridge the gap first: connect every parent directly to every child.
	for _, parent := range p.Parents(node) {
		for _, child := range p.Children(node) {
			_ = p.addEdge(Edge{Parent: parent, Child: child})
		}
	}
	// Detach the node from its parents.
	// NOTE(review): these loops mutate p.parents[node]/p.children[node]
	// while iterating over the values returned by Parents/Children — this is
	// only safe if those accessors return copies; verify.
	for _, parent := range p.Parents(node) {
		p.children[parent].remove(node)
		p.parents[node].remove(parent)
	}
	// Detach the node from its children.
	for _, child := range p.Children(node) {
		p.parents[child].remove(node)
		p.children[node].remove(child)
	}
	// Finally drop the node itself from the plan's bookkeeping.
	p.nodes.remove(node)
	delete(p.nodesByID, node.ID())
}
// Len returns the number of nodes in the graph
func (p *Plan) Len() int {
return len(p.nodes)

@ -142,8 +142,8 @@ func (p *Planner) processSort(lp *logical.Sort) ([]Node, error) {
// Convert [logical.Limit] into one [Limit] node.
func (p *Planner) processLimit(lp *logical.Limit) ([]Node, error) {
node := &Limit{
Offset: lp.Skip,
Limit: lp.Fetch,
Skip: lp.Skip,
Fetch: lp.Fetch,
}
p.plan.addNode(node)
children, err := p.process(lp.Table)
@ -157,3 +157,26 @@ func (p *Planner) processLimit(lp *logical.Limit) ([]Node, error) {
}
return []Node{node}, nil
}
// Optimize tries to optimize the plan by pushing down filter predicates and limits
// to the scan nodes.
//
// It returns an error if the plan has more than one root node. The validation
// happens before any optimization pass runs, so an invalid plan is never
// partially mutated.
func (p *Planner) Optimize(plan *Plan) (*Plan, error) {
	roots := plan.Roots()
	// The original check only fired after the first root had already been
	// optimized, mutating the plan before reporting the error; validate first.
	if len(roots) > 1 {
		return nil, errors.New("physical plan must have exactly one root node")
	}
	for _, root := range roots {
		optimizations := []*optimization{
			// Predicate pushdown may empty out Filter nodes, which
			// removeNoopFilter then eliminates from the plan.
			newOptimization("PredicatePushdown", plan).withRules(
				&predicatePushdown{plan: plan},
				&removeNoopFilter{plan: plan},
			),
			newOptimization("LimitPushdown", plan).withRules(
				&limitPushdown{plan: plan},
			),
		}
		optimizer := newOptimizer(plan, optimizations)
		optimizer.optimize(root)
	}
	return plan, nil
}

@ -65,8 +65,12 @@ func TestPlanner_Convert(t *testing.T) {
},
}
planner := NewPlanner(catalog)
physicalPlan, err := planner.Build(logicalPlan)
require.NoError(t, err)
t.Logf("Physical plan\n%s\n", PrintAsTree(physicalPlan))
t.Logf("\n%s\n", PrintAsTree(physicalPlan))
physicalPlan, err = planner.Optimize(physicalPlan)
require.NoError(t, err)
t.Logf("Optimized plan\n%s\n", PrintAsTree(physicalPlan))
}

@ -55,8 +55,8 @@ func toTreeNode(n Node) *tree.Node {
}
case *Limit:
treeNode.Properties = []tree.Property{
tree.NewProperty("offset", false, node.Offset),
tree.NewProperty("limit", false, node.Limit),
tree.NewProperty("offset", false, node.Skip),
tree.NewProperty("limit", false, node.Fetch),
}
}
return treeNode

Loading…
Cancel
Save