From cbe97e6c1abc1ca58e9dc8c42b53d360bc7f602b Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Thu, 20 Mar 2025 11:29:25 -0400 Subject: [PATCH] chore(engine): Update logical planner types to be structured as SSA (#16841) Signed-off-by: Robert Fratto Co-authored-by: Christian Haudum --- pkg/engine/planner/internal/tree/printer.go | 13 +- pkg/engine/planner/logical/aggregate_expr.go | 108 ---- pkg/engine/planner/logical/aggregate_plan.go | 74 --- pkg/engine/planner/logical/binop.go | 286 ---------- pkg/engine/planner/logical/builder.go | 65 +++ pkg/engine/planner/logical/builder_convert.go | 134 +++++ pkg/engine/planner/logical/column.go | 32 -- pkg/engine/planner/logical/column_ref.go | 65 +++ pkg/engine/planner/logical/dataframe.go | 126 ----- pkg/engine/planner/logical/expr.go | 129 ----- pkg/engine/planner/logical/filter.go | 51 -- pkg/engine/planner/logical/format_tree.go | 189 +++---- .../planner/logical/format_tree_test.go | 226 ++------ pkg/engine/planner/logical/limit.go | 85 --- pkg/engine/planner/logical/literal.go | 99 ---- pkg/engine/planner/logical/logical.go | 94 ++++ pkg/engine/planner/logical/logical_test.go | 59 ++ pkg/engine/planner/logical/node_binop.go | 109 ++++ pkg/engine/planner/logical/node_limit.go | 53 ++ pkg/engine/planner/logical/node_literal.go | 145 +++++ pkg/engine/planner/logical/node_maketable.go | 49 ++ pkg/engine/planner/logical/node_return.go | 14 + pkg/engine/planner/logical/node_select.go | 48 ++ pkg/engine/planner/logical/node_sort.go | 57 ++ pkg/engine/planner/logical/node_unaryop.go | 69 +++ pkg/engine/planner/logical/plan.go | 187 ------- pkg/engine/planner/logical/project.go | 50 -- pkg/engine/planner/logical/sort_expr.go | 59 -- pkg/engine/planner/logical/sort_plan.go | 54 -- pkg/engine/planner/logical/ssa.go | 512 ------------------ pkg/engine/planner/logical/ssa_test.go | 189 ------- pkg/engine/planner/logical/table.go | 43 -- pkg/engine/planner/physical/planner.go | 2 +- 33 files changed, 1086 insertions(+), 2389 deletions(-) delete mode 100644 pkg/engine/planner/logical/aggregate_expr.go delete mode 100644 pkg/engine/planner/logical/aggregate_plan.go delete mode 100644 pkg/engine/planner/logical/binop.go create mode 100644 pkg/engine/planner/logical/builder.go create mode 100644 pkg/engine/planner/logical/builder_convert.go delete mode 100644 pkg/engine/planner/logical/column.go create mode 100644 pkg/engine/planner/logical/column_ref.go delete mode 100644 pkg/engine/planner/logical/dataframe.go delete mode 100644 pkg/engine/planner/logical/expr.go delete mode 100644 pkg/engine/planner/logical/filter.go delete mode 100644 pkg/engine/planner/logical/limit.go delete mode 100644 pkg/engine/planner/logical/literal.go create mode 100644 pkg/engine/planner/logical/logical.go create mode 100644 pkg/engine/planner/logical/logical_test.go create mode 100644 pkg/engine/planner/logical/node_binop.go create mode 100644 pkg/engine/planner/logical/node_limit.go create mode 100644 pkg/engine/planner/logical/node_literal.go create mode 100644 pkg/engine/planner/logical/node_maketable.go create mode 100644 pkg/engine/planner/logical/node_return.go create mode 100644 pkg/engine/planner/logical/node_select.go create mode 100644 pkg/engine/planner/logical/node_sort.go create mode 100644 pkg/engine/planner/logical/node_unaryop.go delete mode 100644 pkg/engine/planner/logical/plan.go delete mode 100644 pkg/engine/planner/logical/project.go delete mode 100644 pkg/engine/planner/logical/sort_expr.go delete mode 100644 pkg/engine/planner/logical/sort_plan.go delete 
mode 100644 pkg/engine/planner/logical/ssa.go delete mode 100644 pkg/engine/planner/logical/ssa_test.go delete mode 100644 pkg/engine/planner/logical/table.go diff --git a/pkg/engine/planner/internal/tree/printer.go b/pkg/engine/planner/internal/tree/printer.go index bd0c779dd9..99963938a0 100644 --- a/pkg/engine/planner/internal/tree/printer.go +++ b/pkg/engine/planner/internal/tree/printer.go @@ -145,7 +145,7 @@ func (tp *Printer) printNode(node *Node) { tp.w.WriteString("\n") } -// printChildren recursively prints all children with appropriate indentation +// printChildren recursively prints all children with appropriate indentation. func (tp *Printer) printChildren(comments, children []*Node, prefix string) { hasChildren := len(children) > 0 @@ -155,16 +155,23 @@ func (tp *Printer) printChildren(comments, children []*Node, prefix string) { for i, node := range comments { isLast := i == len(comments)-1 + // Choose indentation symbols based on whether the node we're printing has + // any children to print. + indent := symIndent + if !hasChildren { + indent = symPrefix + } + // Choose connector symbols based on whether this is the last item connector := symPrefix + symConn - newPrefix := prefix + symIndent + symIndent + newPrefix := prefix + indent + symIndent if hasChildren { connector = symIndent + symConn } if isLast { connector = symPrefix + symLastConn - newPrefix = prefix + symIndent + symPrefix + newPrefix = prefix + indent + symPrefix if hasChildren { connector = symIndent + symLastConn } diff --git a/pkg/engine/planner/logical/aggregate_expr.go b/pkg/engine/planner/logical/aggregate_expr.go deleted file mode 100644 index 9625fb82b3..0000000000 --- a/pkg/engine/planner/logical/aggregate_expr.go +++ /dev/null @@ -1,108 +0,0 @@ -package logical - -import ( - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// AggregateOp represents the type of aggregation operation to perform. -// It is a string-based enum that identifies different aggregation functions -// that can be applied to expressions. -type AggregateOp string - -const ( - // AggregateOpSum represents a sum aggregation - AggregateOpSum AggregateOp = "sum" - // AggregateOpAvg represents an average aggregation - AggregateOpAvg AggregateOp = "avg" - // AggregateOpMin represents a minimum value aggregation - AggregateOpMin AggregateOp = "min" - // AggregateOpMax represents a maximum value aggregation - AggregateOpMax AggregateOp = "max" - // AggregateOpCount represents a count aggregation - AggregateOpCount AggregateOp = "count" -) - -// Convenience constructors for each aggregate operation -var ( - // Sum creates a sum aggregation expression - Sum = newAggregateExprConstructor(AggregateOpSum) - // Avg creates an average aggregation expression - Avg = newAggregateExprConstructor(AggregateOpAvg) - // Min creates a minimum value aggregation expression - Min = newAggregateExprConstructor(AggregateOpMin) - // Max creates a maximum value aggregation expression - Max = newAggregateExprConstructor(AggregateOpMax) - // Count creates a count aggregation expression - Count = newAggregateExprConstructor(AggregateOpCount) -) - -// AggregateExpr represents an aggregation operation on an expression. -// It encapsulates the operation to perform (sum, avg, etc.), the expression -// to aggregate, and a name for the result. 
-type AggregateExpr struct { - // name is the identifier for this aggregation - name string - // op specifies which aggregation operation to perform - op AggregateOp - // expr is the expression to aggregate - expr Expr -} - -// newAggregateExprConstructor creates a constructor function for a specific aggregate operation. -// This is a higher-order function that returns a function for creating aggregate expressions -// with a specific operation type. -func newAggregateExprConstructor(op AggregateOp) func(name string, expr Expr) AggregateExpr { - return func(name string, expr Expr) AggregateExpr { - return AggregateExpr{ - name: name, - op: op, - expr: expr, - } - } -} - -// Type returns the type of the expression. -// For aggregate expressions, this is always ExprTypeAggregate. -func (a AggregateExpr) Type() ExprType { - return ExprTypeAggregate -} - -// Name returns the name of the aggregation. -// This is used as the column name in the output schema. -func (a AggregateExpr) Name() string { - return a.name -} - -// Op returns the aggregation operation. -// This identifies which aggregation function to apply. -func (a AggregateExpr) Op() AggregateOp { - return a.op -} - -// SubExpr returns the expression being aggregated. -// This is the input to the aggregation function. -func (a AggregateExpr) SubExpr() Expr { - return a.expr -} - -// ToField converts the aggregation expression to a column schema. -// It determines the output type based on the input expression and -// the aggregation operation. -func (a AggregateExpr) ToField(p Plan) schema.ColumnSchema { - // Get the input field schema - - return schema.ColumnSchema{ - Name: a.name, - // Aggregations typically result in numeric types - Type: determineAggregationTypeFromFieldType(a.expr.ToField(p).Type, a.op), - } -} - -// determineAggregationTypeFromFieldType calculates the output type of an aggregation -// based on the input field type and the aggregation operation. -// Currently, this is a placeholder that always returns int64, but it should be -// implemented to handle different input types and operations correctly. -func determineAggregationTypeFromFieldType(_ schema.ValueType, _ AggregateOp) schema.ValueType { - // TODO: implement - return schema.ValueTypeInt64 -} diff --git a/pkg/engine/planner/logical/aggregate_plan.go b/pkg/engine/planner/logical/aggregate_plan.go deleted file mode 100644 index e1c4d6d019..0000000000 --- a/pkg/engine/planner/logical/aggregate_plan.go +++ /dev/null @@ -1,74 +0,0 @@ -package logical - -import ( - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// Aggregate represents a plan node that performs aggregation operations. -// The output schema is organized with grouping columns followed by aggregate expressions. -// It corresponds to the GROUP BY clause in SQL and is used to compute aggregate -// functions like SUM, AVG, MIN, MAX, and COUNT over groups of rows. -type Aggregate struct { - // input is the child plan node providing data to aggregate - input Plan - // groupExprs are the expressions to group by - groupExprs []Expr - // aggExprs are the aggregate expressions to compute - aggExprs []AggregateExpr -} - -// NewAggregate creates a new Aggregate plan node. -// The Aggregate logical plan calculates aggregates of underlying data such as -// calculating minimum, maximum, averages, and sums of data. Aggregates are often -// grouped by other columns (or expressions). -// A simple example would be SELECT region, SUM(sales) FROM orders GROUP BY region. 
-func newAggregate(input Plan, groupExprs []Expr, aggExprs []AggregateExpr) *Aggregate { - return &Aggregate{ - input: input, - groupExprs: groupExprs, - aggExprs: aggExprs, - } -} - -// Schema returns the schema of the data produced by this aggregate. -// The schema consists of group-by expressions followed by aggregate expressions. -// This ordering is important for downstream operations that expect group columns -// to come before aggregate columns. -func (a *Aggregate) Schema() schema.Schema { - var columns []schema.ColumnSchema - - // Group expressions come first - for _, expr := range a.groupExprs { - columns = append(columns, expr.ToField(a.input)) - } - - // Followed by aggregate expressions - for _, expr := range a.aggExprs { - columns = append(columns, expr.ToField(a.input)) - } - - return schema.FromColumns(columns) -} - -// Type implements the ast interface -func (a *Aggregate) Type() PlanType { - return PlanTypeAggregate -} - -// GroupExprs returns the list of expressions to group by. -// These expressions define the grouping keys for the aggregation. -func (a *Aggregate) GroupExprs() []Expr { - return a.groupExprs -} - -// AggregateExprs returns the list of aggregate expressions to compute. -// These expressions define the aggregate functions to apply to each group. -func (a *Aggregate) AggregateExprs() []AggregateExpr { - return a.aggExprs -} - -// Child returns the input plan. -// This is a convenience method for accessing the child plan. -func (a *Aggregate) Child() Plan { - return a.input -} diff --git a/pkg/engine/planner/logical/binop.go b/pkg/engine/planner/logical/binop.go deleted file mode 100644 index f7837059a9..0000000000 --- a/pkg/engine/planner/logical/binop.go +++ /dev/null @@ -1,286 +0,0 @@ -package logical - -import ( - "fmt" - - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -/* -This file defines the structures and methods for binary operations within the logical query plan. - -Key components: -1. BinOpType: An enum representing the type of binary operation (e.g., math, comparison, set). -2. BinOpExpr: A struct representing a binary operation expression, which includes: - - name: Identifier for the binary operation. - - ty: Type of binary operation (BinOpType). - - op: The actual operation (e.g., +, -, ==, !=, &&, ||). - - l: Left expression operand. - - r: Right expression operand. -3. Methods for BinOpExpr: - - ToField: Converts the binary operation expression to a column schema. - - Type: Returns the type of binary operation. - - Name: Returns the name of the binary operation. -*/ - -// UNKNOWN is a constant used for string representation of unknown operation types -const UNKNOWN = "unknown" - -// BinOpType is an enum representing the category of binary operation. -// It allows for grouping similar operations together for type-safe handling. -type BinOpType int - -const ( - BinOpTypeInvalid BinOpType = iota // Invalid or uninitialized binary operation - BinOpTypeMath // Mathematical operations (+, -, *, /, %) - BinOpTypeCmp // Comparison operations (==, !=, <, <=, >, >=) - BinOpTypeSet // Set operations (AND, OR, NOT, XOR) -) - -// String returns a human-readable representation of the binary operation type. -func (t BinOpType) String() string { - switch t { - case BinOpTypeMath: - return "math" - case BinOpTypeCmp: - return "cmp" - case BinOpTypeSet: - return "set" - default: - return UNKNOWN - } -} - -// BinOpExpr represents a binary operation expression in the query plan. 
-// It combines two expressions with an operation to produce a result. -type BinOpExpr struct { - // name is the identifier for this binary operation - name string - // ty is the type of binary operation (e.g. math, cmp, set) - ty BinOpType - // op is the actual operation (e.g. +, -, ==, !=, &&, ||) - op int - // l is the left expression - l Expr - // r is the right expression - r Expr -} - -// ToField converts the binary operation to a column schema. -// The name of the column is the name of the binary operation, -// and the type is derived from the left operand. -func (b BinOpExpr) ToField(p Plan) schema.ColumnSchema { - return schema.ColumnSchema{ - Name: b.name, - Type: b.l.ToField(p).Type, - } -} - -// Type returns the type of the binary operation. -func (b BinOpExpr) Type() BinOpType { - return b.ty -} - -// Name returns the name of the binary operation. -func (b BinOpExpr) Name() string { - return b.name -} - -// Left returns the left operand of the binary operation. -func (b BinOpExpr) Left() Expr { - return b.l -} - -// Right returns the right operand of the binary operation. -func (b BinOpExpr) Right() Expr { - return b.r -} - -// Op returns a string representation of the binary operation. -// It delegates to the appropriate type-specific operation based on the operation type. -func (b BinOpExpr) Op() fmt.Stringer { - switch b.ty { - case BinOpTypeMath: - return BinaryOpMath(b.op) - case BinOpTypeCmp: - return BinaryOpCmp(b.op) - case BinOpTypeSet: - return BinaryOpSet(b.op) - default: - panic(fmt.Sprintf("unknown binary operation type: %d", b.ty)) - } -} - -// BinaryOpMath represents mathematical binary operations -type BinaryOpMath int - -const ( - BinaryOpMathInvalid BinaryOpMath = iota - // BinaryOpAdd represents addition operation (+) - BinaryOpAdd - // BinaryOpSubtract represents subtraction operation (-) - BinaryOpSubtract - // BinaryOpMultiply represents multiplication operation (*) - BinaryOpMultiply - // BinaryOpDivide represents division operation (/) - BinaryOpDivide - // BinaryOpModulo represents modulo operation (%) - BinaryOpModulo -) - -// String returns a human-readable representation of the mathematical operation. -func (b BinaryOpMath) String() string { - switch b { - case BinaryOpAdd: - return "+" - case BinaryOpSubtract: - return "-" - case BinaryOpMultiply: - return "*" - case BinaryOpDivide: - return "/" - case BinaryOpModulo: - return "%" - default: - return "unknown" - } -} - -// BinaryOpCmp represents comparison binary operations -type BinaryOpCmp int - -const ( - BinaryOpCmpInvalid BinaryOpCmp = iota - // BinaryOpEq represents equality comparison (==) - BinaryOpEq - // BinaryOpNeq represents inequality comparison (!=) - BinaryOpNeq - // BinaryOpLt represents less than comparison (<) - BinaryOpLt - // BinaryOpLte represents less than or equal comparison (<=) - BinaryOpLte - // BinaryOpGt represents greater than comparison (>) - BinaryOpGt - // BinaryOpGte represents greater than or equal comparison (>=) - BinaryOpGte -) - -// String returns a human-readable representation of the comparison operation. 
-func (b BinaryOpCmp) String() string { - switch b { - case BinaryOpEq: - return "==" - case BinaryOpNeq: - return "!=" - case BinaryOpLt: - return "<" - case BinaryOpLte: - return "<=" - case BinaryOpGt: - return ">" - case BinaryOpGte: - return ">=" - default: - return UNKNOWN - } -} - -// BinaryOpSet represents set operations between boolean expressions -type BinaryOpSet int - -const ( - BinaryOpSetInvalid BinaryOpSet = iota - // BinaryOpAnd represents logical AND operation - BinaryOpAnd - // BinaryOpOr represents logical OR operation - BinaryOpOr - // BinaryOpNot represents logical NOT operation (also known as "unless") - BinaryOpNot - // BinaryOpXor represents logical XOR operation - BinaryOpXor -) - -// String returns a human-readable representation of the set operation. -func (b BinaryOpSet) String() string { - switch b { - case BinaryOpAnd: - return "and" - case BinaryOpOr: - return "or" - case BinaryOpNot: - return "not" - case BinaryOpXor: - return "xor" - default: - return UNKNOWN - } -} - -// newBinOpConstructor creates a constructor function for binary operations of a specific type. -// This is a higher-order function that returns a function for creating binary operations. -func newBinOpConstructor(t BinOpType, op int) func(name string, l Expr, r Expr) Expr { - return func(name string, l Expr, r Expr) Expr { - binop := BinOpExpr{ - name: name, - ty: t, - op: op, - l: l, - r: r, - } - return NewBinOpExpr(binop) - } -} - -// newBinOpSetConstructor creates a constructor function for set operations. -func newBinOpSetConstructor(op BinaryOpSet) func(name string, l Expr, r Expr) Expr { - return newBinOpConstructor(BinOpTypeSet, int(op)) -} - -// newBinOpCmpConstructor creates a constructor function for comparison operations. -func newBinOpCmpConstructor(op BinaryOpCmp) func(name string, l Expr, r Expr) Expr { - return newBinOpConstructor(BinOpTypeCmp, int(op)) -} - -// newBinOpMathConstructor creates a constructor function for mathematical operations. 
-func newBinOpMathConstructor(op BinaryOpMath) func(name string, l Expr, r Expr) Expr { - return newBinOpConstructor(BinOpTypeMath, int(op)) -} - -var ( - // And creates a logical AND expression - And = newBinOpSetConstructor(BinaryOpAnd) - // Or creates a logical OR expression - Or = newBinOpSetConstructor(BinaryOpOr) - // Not creates a logical NOT expression - Not = newBinOpSetConstructor(BinaryOpNot) - // Xor creates a logical XOR expression - Xor = newBinOpSetConstructor(BinaryOpXor) -) - -var ( - // Eq creates an equality comparison expression - Eq = newBinOpCmpConstructor(BinaryOpEq) - // Neq creates an inequality comparison expression - Neq = newBinOpCmpConstructor(BinaryOpNeq) - // Lt creates a less than comparison expression - Lt = newBinOpCmpConstructor(BinaryOpLt) - // Lte creates a less than or equal comparison expression - Lte = newBinOpCmpConstructor(BinaryOpLte) - // Gt creates a greater than comparison expression - Gt = newBinOpCmpConstructor(BinaryOpGt) - // Gte creates a greater than or equal comparison expression - Gte = newBinOpCmpConstructor(BinaryOpGte) -) - -var ( - // Add creates a binary addition expression - Add = newBinOpMathConstructor(BinaryOpAdd) - // Subtract creates a binary subtraction expression - Subtract = newBinOpMathConstructor(BinaryOpSubtract) - // Multiply creates a binary multiplication expression - Multiply = newBinOpMathConstructor(BinaryOpMultiply) - // Divide creates a binary division expression - Divide = newBinOpMathConstructor(BinaryOpDivide) - // Modulo creates a binary modulo expression - Modulo = newBinOpMathConstructor(BinaryOpModulo) -) diff --git a/pkg/engine/planner/logical/builder.go b/pkg/engine/planner/logical/builder.go new file mode 100644 index 0000000000..7cf771a9e2 --- /dev/null +++ b/pkg/engine/planner/logical/builder.go @@ -0,0 +1,65 @@ +package logical + +import ( + "github.com/grafana/loki/v3/pkg/engine/planner/schema" +) + +// Builder provides an ergonomic interface for constructing a [Plan]. +type Builder struct { + val Value +} + +// NewBuilder creates a new Builder from a Value, where the starting Value is +// usually a [MakeTable]. +func NewBuilder(val Value) *Builder { + return &Builder{val: val} +} + +// Select applies a [Select] operation to the Builder. +func (b *Builder) Select(predicate Value) *Builder { + return &Builder{ + val: &Select{ + Table: b.val, + Predicate: predicate, + }, + } +} + +// Limit applies a [Limit] operation to the Builder. +func (b *Builder) Limit(skip uint64, fetch uint64) *Builder { + return &Builder{ + val: &Limit{ + Table: b.val, + Skip: skip, + Fetch: fetch, + }, + } +} + +// Sort applies a [Sort] operation to the Builder. +func (b *Builder) Sort(column ColumnRef, ascending, nullsFirst bool) *Builder { + return &Builder{ + val: &Sort{ + Table: b.val, + + Column: column, + Ascending: ascending, + NullsFirst: nullsFirst, + }, + } +} + +// Schema returns the schema of the data that will be produced by this Builder. +func (b *Builder) Schema() *schema.Schema { + return b.val.Schema() +} + +// Value returns the underlying [Value]. This is useful when you need to access +// the value directly, such as when passing it to a function that operates on +// values rather than a Builder. +func (b *Builder) Value() Value { return b.val } + +// ToPlan converts the Builder to a Plan. 
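+//
+// For illustration only, a typical chain might look like:
+//
+//	table := &MakeTable{Selector: selector}
+//	plan, err := NewBuilder(table).Select(predicate).Limit(0, 1000).ToPlan()
+//
+// where selector and predicate are placeholder [Value] expressions (for
+// example, *BinOp comparisons over [ColumnRef] and [Literal] operands) and the
+// limit arguments are arbitrary.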
+func (b *Builder) ToPlan() (*Plan, error) {
+	return convertToPlan(b.val)
+}
diff --git a/pkg/engine/planner/logical/builder_convert.go b/pkg/engine/planner/logical/builder_convert.go
new file mode 100644
index 0000000000..49ddd00a80
--- /dev/null
+++ b/pkg/engine/planner/logical/builder_convert.go
@@ -0,0 +1,134 @@
+package logical
+
+import (
+	"fmt"
+)
+
+// convertToPlan converts a [Value] into a [Plan]. The value becomes the
+// operand of the final [Return] instruction.
+func convertToPlan(value Value) (*Plan, error) {
+	var builder ssaBuilder
+	value, err := builder.process(value)
+	if err != nil {
+		return nil, fmt.Errorf("error converting plan to SSA: %w", err)
+	}
+
+	// Add the final Return instruction based on the last value.
+	builder.instructions = append(builder.instructions, &Return{Value: value})
+
+	return &Plan{Instructions: builder.instructions}, nil
+}
+
+// ssaBuilder is a helper type for building the SSA form of a plan.
+type ssaBuilder struct {
+	instructions []Instruction
+	nextID       int
+}
+
+func (b *ssaBuilder) getID() int {
+	b.nextID++
+	return b.nextID
+}
+
+// process recursively processes a value and returns the resulting Value.
+func (b *ssaBuilder) process(value Value) (Value, error) {
+	switch value := value.(type) {
+	case *MakeTable:
+		return b.processMakeTablePlan(value)
+	case *Select:
+		return b.processSelectPlan(value)
+	case *Limit:
+		return b.processLimitPlan(value)
+	case *Sort:
+		return b.processSortPlan(value)
+
+	case *UnaryOp:
+		return b.processUnaryOp(value)
+	case *BinOp:
+		return b.processBinOp(value)
+	case *ColumnRef:
+		return b.processColumnRef(value)
+	case *Literal:
+		return b.processLiteral(value)
+
+	default:
+		return nil, fmt.Errorf("unsupported value type %T", value)
+	}
+}
+
+func (b *ssaBuilder) processMakeTablePlan(plan *MakeTable) (Value, error) {
+	if _, err := b.process(plan.Selector); err != nil {
+		return nil, err
+	}
+
+	plan.id = fmt.Sprintf("%%%d", b.getID())
+	b.instructions = append(b.instructions, plan)
+	return plan, nil
+}
+
+func (b *ssaBuilder) processSelectPlan(plan *Select) (Value, error) {
+	// Process the child plan first
+	if _, err := b.process(plan.Table); err != nil {
+		return nil, err
+	} else if _, err := b.process(plan.Predicate); err != nil {
+		return nil, err
+	}
+
+	// Create an instruction for the select
+	plan.id = fmt.Sprintf("%%%d", b.getID())
+	b.instructions = append(b.instructions, plan)
+	return plan, nil
+}
+
+func (b *ssaBuilder) processLimitPlan(plan *Limit) (Value, error) {
+	if _, err := b.process(plan.Table); err != nil {
+		return nil, err
+	}
+
+	plan.id = fmt.Sprintf("%%%d", b.getID())
+	b.instructions = append(b.instructions, plan)
+	return plan, nil
+}
+
+func (b *ssaBuilder) processSortPlan(plan *Sort) (Value, error) {
+	if _, err := b.process(plan.Table); err != nil {
+		return nil, err
+	}
+
+	plan.id = fmt.Sprintf("%%%d", b.getID())
+	b.instructions = append(b.instructions, plan)
+	return plan, nil
+}
+
+func (b *ssaBuilder) processUnaryOp(value *UnaryOp) (Value, error) {
+	if _, err := b.process(value.Value); err != nil {
+		return nil, err
+	}
+
+	// Create an instruction for the unary operation
+	value.id = fmt.Sprintf("%%%d", b.getID())
+	b.instructions = append(b.instructions, value)
+	return value, nil
+}
+
+func (b *ssaBuilder) processBinOp(expr *BinOp) (Value, error) {
+	if _, err := b.process(expr.Left); err != nil {
+		return nil, err
+	} else if _, err := b.process(expr.Right); err != nil {
+		return nil, err
+	}
+
+	expr.id = fmt.Sprintf("%%%d", b.getID())
+	b.instructions = append(b.instructions, expr)
+	return expr, nil
+}
+
+func (b *ssaBuilder) processColumnRef(value *ColumnRef) (Value, error) {
+	// Nothing to do.
+	return value, nil
+}
+
+func (b *ssaBuilder) processLiteral(expr *Literal) (Value, error) {
+	// Nothing to do.
+	return expr, nil
+}
diff --git a/pkg/engine/planner/logical/column.go b/pkg/engine/planner/logical/column.go
deleted file mode 100644
index 30b04564eb..0000000000
--- a/pkg/engine/planner/logical/column.go
+++ /dev/null
@@ -1,32 +0,0 @@
-package logical
-
-import (
-	"fmt"
-
-	"github.com/grafana/loki/v3/pkg/engine/planner/schema"
-)
-
-// ColumnExpr represents a reference to a column in the input data
-type ColumnExpr struct {
-	// name is the identifier of the referenced column
-	name string
-}
-
-// Col creates a column reference expression
-func Col(name string) Expr {
-	return NewColumnExpr(ColumnExpr{name: name})
-}
-
-// ToField looks up and returns the schema for the referenced column
-func (c ColumnExpr) ToField(p Plan) schema.ColumnSchema {
-	for _, col := range p.Schema().Columns {
-		if col.Name == c.name {
-			return col
-		}
-	}
-	panic(fmt.Sprintf("column %s not found", c.name))
-}
-
-func (c ColumnExpr) ColumnName() string {
-	return c.name
-}
diff --git a/pkg/engine/planner/logical/column_ref.go b/pkg/engine/planner/logical/column_ref.go
new file mode 100644
index 0000000000..a21103395f
--- /dev/null
+++ b/pkg/engine/planner/logical/column_ref.go
@@ -0,0 +1,65 @@
+package logical
+
+import (
+	"fmt"
+
+	"github.com/grafana/loki/v3/pkg/engine/planner/schema"
+)
+
+// ColumnType denotes the column type for a [ColumnRef].
+type ColumnType int
+
+// Recognized values of [ColumnType].
+const (
+	// ColumnTypeInvalid indicates an invalid column type.
+	ColumnTypeInvalid ColumnType = iota
+
+	ColumnTypeBuiltin  // ColumnTypeBuiltin represents a builtin column (such as timestamp).
+	ColumnTypeLabel    // ColumnTypeLabel represents a column from a stream label.
+	ColumnTypeMetadata // ColumnTypeMetadata represents a column from log metadata.
+)
+
+// String returns a human-readable representation of the column type.
+func (ct ColumnType) String() string {
+	switch ct {
+	case ColumnTypeInvalid:
+		return "invalid"
+	case ColumnTypeBuiltin:
+		return "builtin"
+	case ColumnTypeLabel:
+		return "label"
+	case ColumnTypeMetadata:
+		return "metadata"
+	default:
+		return fmt.Sprintf("ColumnType(%d)", ct)
+	}
+}
+
+// A ColumnRef references a column within a table relation. ColumnRef only
+// implements [Value].
+type ColumnRef struct {
+	Column string     // Name of the column being referenced.
+	Type   ColumnType // Type of the column being referenced.
+}
+
+var (
+	_ Value = (*ColumnRef)(nil)
+)
+
+// Name returns the identifier of the ColumnRef, which combines the column type
+// and column name being referenced.
+func (c *ColumnRef) Name() string {
+	return fmt.Sprintf("%s.%s", c.Type, c.Column)
+}
+
+// String returns [ColumnRef.Name].
+func (c *ColumnRef) String() string { return c.Name() }
+
+// Schema returns the schema of the column being referenced.
+func (c *ColumnRef) Schema() *schema.Schema {
+	// TODO(rfratto): Update *schema.Schema to allow representing a single
+	// column.
+ return nil +} + +func (c *ColumnRef) isValue() {} diff --git a/pkg/engine/planner/logical/dataframe.go b/pkg/engine/planner/logical/dataframe.go deleted file mode 100644 index 953724e751..0000000000 --- a/pkg/engine/planner/logical/dataframe.go +++ /dev/null @@ -1,126 +0,0 @@ -package logical - -import ( - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// DataFrame provides an ergonomic interface for building logical query plans. -// It wraps a logical Plan and provides fluent methods for common operations -// like projection, filtering, and aggregation. This makes it easier to build -// complex query plans in a readable and maintainable way. -type DataFrame struct { - plan Plan -} - -// NewDataFrame creates a new DataFrame from a logical plan. -// This is typically used to wrap a table scan plan as the starting point -// for building a more complex query. -func NewDataFrame(plan Plan) *DataFrame { - return &DataFrame{plan: plan} -} - -// Project applies a projection to the DataFrame. -// It creates a new DataFrame with a projection plan that selects or computes -// the specified expressions from the input DataFrame. -// This corresponds to the SELECT clause in SQL. -func (df *DataFrame) Project(exprs []Expr) *DataFrame { - return &DataFrame{ - plan: NewProjection(df.plan, exprs), - } -} - -// Filter applies a filter to the DataFrame. -// It creates a new DataFrame with a filter plan that selects rows from the -// input DataFrame based on the specified boolean expression. -// This corresponds to the WHERE clause in SQL. -func (df *DataFrame) Filter(expr Expr) *DataFrame { - return &DataFrame{ - plan: NewFilter(df.plan, expr), - } -} - -// Aggregate applies grouping and aggregation to the DataFrame. -// It creates a new DataFrame with an aggregate plan that groups rows by the -// specified expressions and computes the specified aggregate expressions. -// This corresponds to the GROUP BY clause in SQL. -func (df *DataFrame) Aggregate(groupBy []Expr, aggExprs []AggregateExpr) *DataFrame { - return &DataFrame{ - plan: NewAggregate(df.plan, groupBy, aggExprs), - } -} - -// Limit applies a row limit to the DataFrame. -// It creates a new DataFrame with a limit plan that restricts the number of rows -// returned, optionally with an offset to skip initial rows. -// This corresponds to the LIMIT and OFFSET clauses in SQL. -// -// Parameters: -// - skip: Number of rows to skip before returning results (OFFSET in SQL). -// Use 0 to start from the first row. -// - fetch: Maximum number of rows to return after skipping (LIMIT in SQL). -// Use 0 to return all remaining rows. -// -// Example usage: -// -// // Return the first 10 rows -// df = df.Limit(0, 10) -// -// // Skip the first 20 rows and return the next 10 -// df = df.Limit(20, 10) -// -// // Skip the first 100 rows and return all remaining rows -// df = df.Limit(100, 0) -// -// The Limit operation is typically applied as the final step in a query, -// after filtering, projection, and aggregation. -func (df *DataFrame) Limit(skip uint64, fetch uint64) *DataFrame { - return &DataFrame{ - plan: NewLimit(df.plan, skip, fetch), - } -} - -// Schema returns the schema of the data that will be produced by this DataFrame. -// This is useful for understanding the structure of the data that will result -// from executing the query plan. -func (df *DataFrame) Schema() schema.Schema { - return df.plan.Schema() -} - -// LogicalPlan returns the underlying logical plan. 
-// This is useful when you need to access the plan directly, such as when -// passing it to a function that operates on plans rather than DataFrames. -func (df *DataFrame) LogicalPlan() Plan { - return df.plan -} - -// ToSSA converts the DataFrame to SSA form. -// This is useful for optimizing and executing the query plan, as the SSA form -// is easier to analyze and transform than the tree-based logical plan. -func (df *DataFrame) ToSSA() (*SSAForm, error) { - return ConvertToSSA(df.plan) -} - -// Sort applies a sort operation to the DataFrame. -// It creates a new DataFrame with a sort plan that orders rows from the -// input DataFrame based on the specified sort expression. -// This corresponds to the ORDER BY clause in SQL. -// -// Parameters: -// - expr: The sort expression specifying the column to sort by, sort direction, -// and NULL handling. -// -// Example usage: -// -// // Sort by age in ascending order, NULLs last -// df = df.Sort(NewSortExpr("sort_by_age", Col("age"), true, false)) -// -// // Sort by name in descending order, NULLs first -// df = df.Sort(NewSortExpr("sort_by_name", Col("name"), false, true)) -// -// The Sort operation is typically applied after filtering and projection, -// but before limiting the results. -func (df *DataFrame) Sort(expr SortExpr) *DataFrame { - return &DataFrame{ - plan: NewSort(df.plan, expr), - } -} diff --git a/pkg/engine/planner/logical/expr.go b/pkg/engine/planner/logical/expr.go deleted file mode 100644 index 18d8e7463d..0000000000 --- a/pkg/engine/planner/logical/expr.go +++ /dev/null @@ -1,129 +0,0 @@ -package logical - -import ( - "fmt" - - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// ExprType is an enum representing the type of expression. -// It allows consumers to determine the concrete type of an Expr -// and safely cast to the appropriate interface. 
-type ExprType int - -const ( - ExprTypeInvalid ExprType = iota - ExprTypeColumn // Represents a reference to a column in the input - ExprTypeLiteral // Represents a literal value - ExprTypeBinaryOp // Represents a binary operation (e.g., a + b) - ExprTypeAggregate // Represents an aggregate function (e.g., SUM(a)) - ExprTypeSort // Represents a sort expression -) - -func (t ExprType) String() string { - switch t { - case ExprTypeColumn: - return "Column" - case ExprTypeLiteral: - return "Literal" - case ExprTypeBinaryOp: - return "BinaryOp" - case ExprTypeAggregate: - return "Aggregate" - case ExprTypeSort: - return "Sort" - default: - return "Unknown" - } -} - -type Expr struct { - ty ExprType - val any -} - -func (e Expr) Type() ExprType { - return e.ty -} - -func (e Expr) ToField(p Plan) schema.ColumnSchema { - switch e.ty { - case ExprTypeColumn: - return e.val.(*ColumnExpr).ToField(p) - case ExprTypeLiteral: - return e.val.(*LiteralExpr).ToField(p) - case ExprTypeBinaryOp: - return e.val.(*BinOpExpr).ToField(p) - case ExprTypeAggregate: - return e.val.(*AggregateExpr).ToField(p) - default: - panic(fmt.Sprintf("unsupported expression type: %d", e.ty)) - } -} - -// shortcut: must be checked elsewhere -func (e Expr) Column() *ColumnExpr { - if e.ty != ExprTypeColumn { - panic(fmt.Sprintf("expression is not a column: %d", e.ty)) - } - return e.val.(*ColumnExpr) -} - -func NewColumnExpr(expr ColumnExpr) Expr { - return Expr{ - ty: ExprTypeColumn, - val: &expr, - } -} - -func NewLiteralExpr(expr LiteralExpr) Expr { - return Expr{ - ty: ExprTypeLiteral, - val: &expr, - } -} - -func NewBinOpExpr(expr BinOpExpr) Expr { - return Expr{ - ty: ExprTypeBinaryOp, - val: &expr, - } -} - -func NewAggregateExpr(expr AggregateExpr) Expr { - return Expr{ - ty: ExprTypeAggregate, - val: &expr, - } -} - -// shortcut: must be checked elsewhere -func (e Expr) Literal() *LiteralExpr { - if e.ty != ExprTypeLiteral { - panic(fmt.Sprintf("expression is not a literal: %d", e.ty)) - } - return e.val.(*LiteralExpr) -} - -// shortcut: must be checked elsewhere -func (e Expr) BinaryOp() *BinOpExpr { - if e.ty != ExprTypeBinaryOp { - panic(fmt.Sprintf("expression is not a binary operation: %d", e.ty)) - } - return e.val.(*BinOpExpr) -} - -// shortcut: must be checked elsewhere -func (e Expr) Aggregate() *AggregateExpr { - if e.ty != ExprTypeAggregate { - panic(fmt.Sprintf("expression is not an aggregate: %d", e.ty)) - } - return e.val.(*AggregateExpr) -} - -func (e Expr) Sort() *SortExpr { - if e.ty != ExprTypeSort { - panic(fmt.Sprintf("expression is not a sort: %d", e.ty)) - } - return e.val.(*SortExpr) -} diff --git a/pkg/engine/planner/logical/filter.go b/pkg/engine/planner/logical/filter.go deleted file mode 100644 index b212fc825a..0000000000 --- a/pkg/engine/planner/logical/filter.go +++ /dev/null @@ -1,51 +0,0 @@ -package logical - -import ( - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// Filter represents a plan node that filters rows based on a boolean expression. -// It corresponds to the WHERE clause in SQL and is used to select a subset of rows -// from the input plan based on a predicate expression. -type Filter struct { - // input is the child plan node providing data to filter - input Plan - // expr is the boolean expression used to filter rows - expr Expr -} - -// newFilter creates a new Filter plan node. -// It takes an input plan and a boolean expression that determines which rows -// should be selected (included) in its output. This is represented by the WHERE -// clause in SQL. 
A simple example would be SELECT * FROM foo WHERE a > 5. -// The filter expression needs to evaluate to a Boolean result. -func newFilter(input Plan, expr Expr) *Filter { - return &Filter{ - input: input, - expr: expr, - } -} - -// Schema returns the schema of the filter plan. -// The schema of a filter is the same as the schema of its input, -// as filtering only removes rows and doesn't modify the structure. -func (f *Filter) Schema() schema.Schema { - return f.input.Schema() -} - -// Child returns the input plan. -// This is a convenience method for accessing the child plan. -func (f *Filter) Child() Plan { - return f.input -} - -// FilterExpr returns the filter expression. -// This is the boolean expression used to determine which rows to include. -func (f *Filter) FilterExpr() Expr { - return f.expr -} - -// Type implements the Plan interface -func (f *Filter) Type() PlanType { - return PlanTypeFilter -} diff --git a/pkg/engine/planner/logical/format_tree.go b/pkg/engine/planner/logical/format_tree.go index 120122faec..658a0ffddf 100644 --- a/pkg/engine/planner/logical/format_tree.go +++ b/pkg/engine/planner/logical/format_tree.go @@ -2,176 +2,109 @@ package logical import ( "fmt" - "strings" + "io" "github.com/grafana/loki/v3/pkg/engine/planner/internal/tree" ) -// TreeFormatter formats a logical plan as a tree structure. -type TreeFormatter struct{} +// PrintTree prints the given value and its dependencies as a tree structure to +// w. +func PrintTree(w io.StringWriter, value Value) { + p := tree.NewPrinter(w) -// Format formats a logical plan as a tree structure, similar to the Unix 'tree' command. -// It takes a [Plan] as input and returns a string representation of the plan tree. -func (t *TreeFormatter) Format(ast Plan) string { - var sb strings.Builder - p := tree.NewPrinter(&sb) - p.Print(t.convert(ast)) - return sb.String() + var t treeFormatter + p.Print(t.convert(value)) } -// convert dispatches to the appropriate method based on the plan type and -// returns the newly created [tree.Node]. 
-func (t *TreeFormatter) convert(ast Plan) *tree.Node { - switch ast.Type() { - case PlanTypeTable: - return t.convertMakeTable(ast.Table()) - case PlanTypeFilter: - return t.convertFilter(ast.Filter()) - case PlanTypeProjection: - return t.convertProjection(ast.Projection()) - case PlanTypeAggregate: - return t.convertAggregation(ast.Aggregate()) - case PlanTypeLimit: - return t.convertLimit(ast.Limit()) - case PlanTypeSort: - return t.convertSort(ast.Sort()) +type treeFormatter struct{} + +func (t *treeFormatter) convert(value Value) *tree.Node { + switch value := value.(type) { + case *MakeTable: + return t.convertMakeTable(value) + case *Select: + return t.convertSelect(value) + case *Limit: + return t.convertLimit(value) + case *Sort: + return t.convertSort(value) + + case *UnaryOp: + return t.convertUnaryOp(value) + case *BinOp: + return t.convertBinOp(value) + case *ColumnRef: + return t.convertColumnRef(value) + case *Literal: + return t.convertLiteral(value) + default: - panic(fmt.Sprintf("unknown plan type: %v", ast.Type())) + panic(fmt.Sprintf("unknown value type %T", value)) } } -func (t *TreeFormatter) convertMakeTable(ast *MakeTable) *tree.Node { - return tree.NewNode("MakeTable", "", tree.Property{Key: "name", Values: []any{ast.TableName()}}) -} - -func (t *TreeFormatter) convertFilter(ast *Filter) *tree.Node { - node := tree.NewNode("Filter", "", tree.NewProperty("expr", false, ast.FilterExpr().ToField(ast.Child()).Name)) - node.Comments = append(node.Comments, t.convertExpr(ast.FilterExpr())) - node.Children = append(node.Children, t.convert(ast.Child())) +func (t *treeFormatter) convertMakeTable(ast *MakeTable) *tree.Node { + node := tree.NewNode("MakeTable", "") + node.Comments = append(node.Children, t.convert(ast.Selector)) return node } -func (t *TreeFormatter) convertProjection(ast *Projection) *tree.Node { - node := tree.NewNode("Projection", "") - for _, expr := range ast.ProjectExprs() { - field := expr.ToField(ast.Child()) - node.Properties = append(node.Properties, tree.NewProperty(field.Name, false, field.Type.String())) - node.Comments = append(node.Comments, t.convertExpr(expr)) - } - node.Children = append(node.Children, t.convert(ast.Child())) +func (t *treeFormatter) convertSelect(ast *Select) *tree.Node { + node := tree.NewNode("Select", "") + node.Comments = append(node.Comments, t.convert(ast.Predicate)) + node.Children = append(node.Children, t.convert(ast.Table)) return node } -func (t *TreeFormatter) convertAggregation(ast *Aggregate) *tree.Node { - // Collect grouping names - var groupNames []string - for _, expr := range ast.GroupExprs() { - groupNames = append(groupNames, expr.ToField(ast.Child()).Name) - } - - // Collect aggregate names - var aggNames []string - for _, expr := range ast.AggregateExprs() { - aggNames = append(aggNames, expr.ToField(ast.Child()).Name) - } - - node := tree.NewNode("Aggregate", "", - tree.NewProperty("groupings", true, groupNames), - tree.NewProperty("aggregates", true, aggNames), - ) - - // Format grouping expressions - groupNode := tree.NewNode("GroupExpr", "") - for _, expr := range ast.GroupExprs() { - groupNode.Children = append(groupNode.Children, t.convertExpr(expr)) - } - node.Comments = append(node.Comments, groupNode) - - // Format aggregate expressions - aggNode := tree.NewNode("AggregateExpr", "") - for _, expr := range ast.AggregateExprs() { - aggNode.Children = append(aggNode.Children, t.convertAggregateExpr(&expr)) - } - node.Comments = append(node.Comments, aggNode) - - node.Children = 
append(node.Children, t.convert(ast.Child())) - return node -} - -func (t *TreeFormatter) convertLimit(ast *Limit) *tree.Node { +func (t *treeFormatter) convertLimit(ast *Limit) *tree.Node { node := tree.NewNode("Limit", "", - tree.NewProperty("offset", false, ast.Skip()), - tree.NewProperty("fetch", false, ast.Fetch()), + tree.NewProperty("offset", false, ast.Skip), + tree.NewProperty("fetch", false, ast.Fetch), ) - node.Children = append(node.Children, t.convert(ast.Child())) + node.Children = append(node.Children, t.convert(ast.Table)) return node } -func (t *TreeFormatter) convertSort(ast *Sort) *tree.Node { +func (t *treeFormatter) convertSort(ast *Sort) *tree.Node { direction := "asc" - if !ast.Expr().Asc() { + if !ast.Ascending { direction = "desc" } nullsPosition := "last" - if ast.Expr().NullsFirst() { + if ast.NullsFirst { nullsPosition = "first" } node := tree.NewNode("Sort", "", - tree.NewProperty("expr", false, ast.Expr().Name()), tree.NewProperty("direction", false, direction), tree.NewProperty("nulls", false, nullsPosition), ) - node.Comments = append(node.Comments, t.convertExpr(ast.Expr().Expr())) - node.Children = append(node.Children, t.convert(ast.Child())) + node.Comments = append(node.Comments, t.convert(&ast.Column)) + node.Children = append(node.Children, t.convert(ast.Table)) return node } -// convert dispatches to the appropriate method based on the expression type and -// returns the newly created [tree.Node], which can be used as comment for the -// parent node. -func (t *TreeFormatter) convertExpr(expr Expr) *tree.Node { - switch expr.Type() { - case ExprTypeColumn: - return t.convertColumnExpr(expr.Column()) - case ExprTypeLiteral: - return t.convertLiteralExpr(expr.Literal()) - case ExprTypeBinaryOp: - return t.convertBinaryOpExpr(expr.BinaryOp()) - case ExprTypeAggregate: - return t.convertAggregateExpr(expr.Aggregate()) - default: - panic(fmt.Sprintf("unknown expr type: (named: %v, type: %T)", expr.Type(), expr)) - } -} - -func (t *TreeFormatter) convertColumnExpr(expr *ColumnExpr) *tree.Node { - return tree.NewNode("Column", expr.ColumnName()) +func (t *treeFormatter) convertUnaryOp(expr *UnaryOp) *tree.Node { + node := tree.NewNode("UnaryOp", "", tree.NewProperty("op", false, expr.Op.String())) + node.Children = append(node.Children, t.convert(expr.Value)) + return node } -func (t *TreeFormatter) convertLiteralExpr(expr *LiteralExpr) *tree.Node { - return tree.NewNode("Literal", "", - tree.NewProperty("value", false, expr.ValueString()), - tree.NewProperty("type", false, expr.ValueType()), - ) +func (t *treeFormatter) convertBinOp(expr *BinOp) *tree.Node { + node := tree.NewNode("BinOp", "", tree.NewProperty("op", false, expr.Op.String())) + node.Children = append(node.Children, t.convert(expr.Left)) + node.Children = append(node.Children, t.convert(expr.Right)) + return node } -func (t *TreeFormatter) convertBinaryOpExpr(expr *BinOpExpr) *tree.Node { - node := tree.NewNode(ExprTypeBinaryOp.String(), "", - tree.NewProperty("type", false, expr.Type().String()), - tree.NewProperty("op", false, fmt.Sprintf(`"%s"`, expr.Op())), - tree.NewProperty("name", false, expr.Name()), - ) - node.Children = append(node.Children, t.convertExpr(expr.Left())) - node.Children = append(node.Children, t.convertExpr(expr.Right())) - return node +func (t *treeFormatter) convertColumnRef(expr *ColumnRef) *tree.Node { + return tree.NewNode("ColumnRef", expr.Name()) } -func (t *TreeFormatter) convertAggregateExpr(expr *AggregateExpr) *tree.Node { - node := 
tree.NewNode(ExprTypeAggregate.String(), "", - tree.NewProperty("op", false, expr.Op()), +func (t *treeFormatter) convertLiteral(expr *Literal) *tree.Node { + return tree.NewNode("Literal", "", + tree.NewProperty("value", false, expr.String()), + tree.NewProperty("kind", false, expr.Kind()), ) - node.Children = append(node.Children, t.convertExpr(expr.SubExpr())) - return node } diff --git a/pkg/engine/planner/logical/format_tree_test.go b/pkg/engine/planner/logical/format_tree_test.go index 6b721c6fe7..2be9339778 100644 --- a/pkg/engine/planner/logical/format_tree_test.go +++ b/pkg/engine/planner/logical/format_tree_test.go @@ -1,6 +1,7 @@ package logical import ( + "strings" "testing" "github.com/stretchr/testify/require" @@ -18,201 +19,80 @@ func (t *testDataSource) Name() string { return t.name } func TestFormatSimpleQuery(t *testing.T) { // Build a simple query plan: - // SELECT id, name FROM users WHERE age > 21 - ds := &testDataSource{ - name: "users", - schema: schema.Schema{ - Columns: []schema.ColumnSchema{ - {Name: "id", Type: schema.ValueTypeUint64}, - {Name: "name", Type: schema.ValueTypeString}, - {Name: "age", Type: schema.ValueTypeUint64}, + // { app="users" } | age > 21 + b := NewBuilder( + &MakeTable{ + Selector: &BinOp{ + Left: &ColumnRef{Column: "app", Type: ColumnTypeLabel}, + Right: LiteralString("users"), + Op: BinOpKindEq, }, }, - } - - scan := NewScan(ds.Name(), ds.Schema()) - filter := NewFilter(scan, Gt("age_gt_21", Col("age"), LitI64(21))) - proj := NewProjection(filter, []Expr{Col("id"), Col("name")}) - - var f TreeFormatter - - actual := "\n" + f.Format(proj) - t.Logf("Actual output:\n%s", actual) - - expected := ` -Projection id=VALUE_TYPE_UINT64 name=VALUE_TYPE_STRING -│ ├── Column #id -│ └── Column #name -└── Filter expr=age_gt_21 - │ └── BinaryOp type=cmp op=">" name=age_gt_21 - │ ├── Column #age - │ └── Literal value=21 type=VALUE_TYPE_INT64 - └── MakeTable name=users -` - - require.Equal(t, expected, actual) -} - -func TestFormatDataFrameQuery(t *testing.T) { - // Calculate the sum of sales per region for the year 2020 - ds := &testDataSource{ - name: "orders", - schema: schema.Schema{ - Columns: []schema.ColumnSchema{ - {Name: "region", Type: schema.ValueTypeString}, - {Name: "sales", Type: schema.ValueTypeUint64}, - {Name: "year", Type: schema.ValueTypeUint64}, - }, + ).Select( + &BinOp{ + Left: &ColumnRef{Column: "age", Type: ColumnTypeMetadata}, + Right: LiteralInt64(21), + Op: BinOpKindGt, }, - } - - df := NewDataFrame( - NewScan(ds.Name(), ds.Schema()), - ).Filter( - Eq("year_2020", Col("year"), LitI64(2020)), - ).Project( - []Expr{ - Col("region"), - Col("sales"), - Col("year"), - }, - ).Aggregate( - []Expr{Col("region")}, - []AggregateExpr{ - Sum("total_sales", Col("sales")), - }, - ).Limit( - 0, - 10, ) - var f TreeFormatter + var sb strings.Builder + PrintTree(&sb, b.Value()) - actual := "\n" + f.Format(df.LogicalPlan()) + actual := "\n" + sb.String() t.Logf("Actual output:\n%s", actual) expected := ` -Limit offset=0 fetch=10 -└── Aggregate groupings=([region]) aggregates=([total_sales]) - │ ├── GroupExpr - │ │ └── Column #region - │ └── AggregateExpr - │ └── Aggregate op=sum - │ └── Column #sales - └── Projection region=VALUE_TYPE_STRING sales=VALUE_TYPE_UINT64 year=VALUE_TYPE_UINT64 - │ ├── Column #region - │ ├── Column #sales - │ └── Column #year - └── Filter expr=year_2020 - │ └── BinaryOp type=cmp op="==" name=year_2020 - │ ├── Column #year - │ └── Literal value=2020 type=VALUE_TYPE_INT64 - └── MakeTable name=orders +Select +│ └── BinOp 
op=GT +│ ├── ColumnRef #metadata.age +│ └── Literal value=21 kind=int64 +└── MakeTable + └── BinOp op=EQ + ├── ColumnRef #label.app + └── Literal value="users" kind=string ` - require.Equal(t, expected, actual) -} - -func TestFormatSortQuery(t *testing.T) { - // Build a query plan with sorting: - // SELECT id, name, age FROM users WHERE age > 21 ORDER BY age ASC, name DESC - ds := &testDataSource{ - name: "users", - schema: schema.Schema{ - Columns: []schema.ColumnSchema{ - {Name: "id", Type: schema.ValueTypeUint64}, - {Name: "name", Type: schema.ValueTypeString}, - {Name: "age", Type: schema.ValueTypeUint64}, - }, - }, - } - - scan := NewScan(ds.Name(), ds.Schema()) - filter := NewFilter(scan, Gt("age_gt_21", Col("age"), LitI64(21))) - proj := NewProjection(filter, []Expr{Col("id"), Col("name"), Col("age")}) - - // Sort by age ascending, nulls last - sortByAge := NewSort(proj, NewSortExpr("sort_by_age", Col("age"), true, false)) - - var f TreeFormatter - actual := "\n" + f.Format(sortByAge) - t.Logf("Actual output:\n%s", actual) - - expected := ` -Sort expr=sort_by_age direction=asc nulls=last -│ └── Column #age -└── Projection id=VALUE_TYPE_UINT64 name=VALUE_TYPE_STRING age=VALUE_TYPE_UINT64 - │ ├── Column #id - │ ├── Column #name - │ └── Column #age - └── Filter expr=age_gt_21 - │ └── BinaryOp type=cmp op=">" name=age_gt_21 - │ ├── Column #age - │ └── Literal value=21 type=VALUE_TYPE_INT64 - └── MakeTable name=users -` require.Equal(t, expected, actual) } -func TestFormatDataFrameWithSortQuery(t *testing.T) { - // Calculate the sum of sales per region for the year 2020, sorted by total sales descending - ds := &testDataSource{ - name: "orders", - schema: schema.Schema{ - Columns: []schema.ColumnSchema{ - {Name: "region", Type: schema.ValueTypeString}, - {Name: "sales", Type: schema.ValueTypeUint64}, - {Name: "year", Type: schema.ValueTypeUint64}, +func TestFormatSortQuery(t *testing.T) { + // Build a query plan for this query sorted by `age` in ascending order: + // + // { app="users" } | age > 21 + b := NewBuilder( + &MakeTable{ + Selector: &BinOp{ + Left: &ColumnRef{Column: "app", Type: ColumnTypeLabel}, + Right: LiteralString("users"), + Op: BinOpKindEq, }, }, - } - - df := NewDataFrame( - NewScan(ds.Name(), ds.Schema()), - ).Filter( - Eq("year_2020", Col("year"), LitI64(2020)), - ).Project( - []Expr{ - Col("region"), - Col("sales"), - Col("year"), - }, - ).Aggregate( - []Expr{Col("region")}, - []AggregateExpr{ - Sum("total_sales", Col("sales")), + ).Select( + &BinOp{ + Left: &ColumnRef{Column: "age", Type: ColumnTypeMetadata}, + Right: LiteralInt64(21), + Op: BinOpKindGt, }, - ).Sort( - NewSortExpr("sort_by_sales", Col("total_sales"), false, true), // Sort by total_sales descending, nulls first - ).Limit( - 0, - 10, - ) + ).Sort(ColumnRef{Column: "age", Type: ColumnTypeMetadata}, true, false) - var f TreeFormatter + var sb strings.Builder + PrintTree(&sb, b.Value()) - actual := "\n" + f.Format(df.LogicalPlan()) + actual := "\n" + sb.String() t.Logf("Actual output:\n%s", actual) expected := ` -Limit offset=0 fetch=10 -└── Sort expr=sort_by_sales direction=desc nulls=first - │ └── Column #total_sales - └── Aggregate groupings=([region]) aggregates=([total_sales]) - │ ├── GroupExpr - │ │ └── Column #region - │ └── AggregateExpr - │ └── Aggregate op=sum - │ └── Column #sales - └── Projection region=VALUE_TYPE_STRING sales=VALUE_TYPE_UINT64 year=VALUE_TYPE_UINT64 - │ ├── Column #region - │ ├── Column #sales - │ └── Column #year - └── Filter expr=year_2020 - │ └── BinaryOp type=cmp 
op="==" name=year_2020 - │ ├── Column #year - │ └── Literal value=2020 type=VALUE_TYPE_INT64 - └── MakeTable name=orders +Sort direction=asc nulls=last +│ └── ColumnRef #metadata.age +└── Select + │ └── BinOp op=GT + │ ├── ColumnRef #metadata.age + │ └── Literal value=21 kind=int64 + └── MakeTable + └── BinOp op=EQ + ├── ColumnRef #label.app + └── Literal value="users" kind=string ` require.Equal(t, expected, actual) } diff --git a/pkg/engine/planner/logical/limit.go b/pkg/engine/planner/logical/limit.go deleted file mode 100644 index 977b6979de..0000000000 --- a/pkg/engine/planner/logical/limit.go +++ /dev/null @@ -1,85 +0,0 @@ -package logical - -import ( - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// Limit represents a plan node that limits the number of rows returned. -// It corresponds to the LIMIT clause in SQL and is used to restrict the -// number of rows returned by a query, optionally with an offset. -// -// The Limit plan is typically the final operation in a query plan, applied -// after filtering, projection, and aggregation. It's useful for pagination -// and for reducing the amount of data returned to the client. -type Limit struct { - // input is the child plan node providing data to limit - input Plan - // skip is the number of rows to skip before returning results (OFFSET) - // A value of 0 means no rows are skipped - skip uint64 - // fetch is the maximum number of rows to return (LIMIT) - // A value of 0 means all rows are returned (after applying skip) - fetch uint64 -} - -// Special values for skip and fetch -const ( - // NoSkip indicates that no rows should be skipped (OFFSET 0) - NoSkip uint64 = 0 - // NoLimit indicates that all rows should be returned (no LIMIT clause) - NoLimit uint64 = 0 -) - -// newLimit creates a new Limit plan node. -// The Limit logical plan restricts the number of rows returned by a query. -// It takes an input plan, a skip value (for OFFSET), and a fetch value (for LIMIT). -// If skip is 0, no rows are skipped. If fetch is 0, all rows are returned after applying skip. -// -// Example usage: -// -// // Return the first 10 rows -// limit := newLimit(inputPlan, 0, 10) -// -// // Skip the first 20 rows and return the next 10 -// limit := newLimit(inputPlan, 20, 10) -// -// // Skip the first 100 rows and return all remaining rows -// limit := newLimit(inputPlan, 100, 0) -func newLimit(input Plan, skip uint64, fetch uint64) *Limit { - return &Limit{ - input: input, - skip: skip, - fetch: fetch, - } -} - -// Schema returns the schema of the limit operation. -// The schema is the same as the input plan's schema since limiting -// only affects the number of rows, not their structure. -func (l *Limit) Schema() schema.Schema { - return l.input.Schema() -} - -// Type returns the plan type for this node. -func (l *Limit) Type() PlanType { - return PlanTypeLimit -} - -// Child returns the input plan. -func (l *Limit) Child() Plan { - return l.input -} - -// Skip returns the number of rows to skip. -// This is used for implementing the OFFSET clause in SQL. -// A value of 0 means no rows are skipped. -func (l *Limit) Skip() uint64 { - return l.skip -} - -// Fetch returns the maximum number of rows to return. -// This is used for implementing the LIMIT clause in SQL. -// A value of 0 means all rows are returned (after applying skip). 
-func (l *Limit) Fetch() uint64 { - return l.fetch -} diff --git a/pkg/engine/planner/logical/literal.go b/pkg/engine/planner/logical/literal.go deleted file mode 100644 index 66ea196369..0000000000 --- a/pkg/engine/planner/logical/literal.go +++ /dev/null @@ -1,99 +0,0 @@ -package logical - -import ( - "fmt" - - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// LiteralType is an enum representing the type of a literal value. -// It allows for type-safe handling of different kinds of literal values. -type LiteralType int - -// LiteralType constants define the supported types of literal values. -// These loosely match the datasetmd.ValueType enum; -// consider using that directly in the future. -const ( - LiteralTypeInvalid LiteralType = iota // Invalid or uninitialized literal - LiteralTypeString // String literal - LiteralTypeInt64 // 64-bit integer literal -) - -// String returns a human-readable representation of the literal type. -func (t LiteralType) String() string { - switch t { - case LiteralTypeString: - return "string" - case LiteralTypeInt64: - return "int64" - default: - return "unknown" - } -} - -// LiteralExpr represents a literal value in the query plan. -// It can hold different types of values, such as strings or integers. -type LiteralExpr struct { - ty LiteralType // The type of the literal value - val any // The actual literal value -} - -// ValueString returns a string representation of the literal value. -// This is used for display and debugging purposes. -func (l LiteralExpr) ValueString() string { - switch l.ty { - case LiteralTypeString: - return l.val.(string) - } - return fmt.Sprintf("%v", l.val) -} - -// ToField converts the literal to a column schema. -// The name of the column is derived from the string representation of the value. -func (l LiteralExpr) ToField(_ Plan) schema.ColumnSchema { - switch l.ty { - case LiteralTypeString: - return schema.ColumnSchema{ - Name: l.val.(string), - Type: l.ValueType(), - } - case LiteralTypeInt64: - return schema.ColumnSchema{ - Name: fmt.Sprint(l.val.(int64)), - Type: l.ValueType(), - } - default: - panic(fmt.Sprintf("unsupported literal type: %d", l.ty)) - } -} - -// ValueType returns the schema.ValueType corresponding to this literal type. -// This is used to determine the type of the column in the output schema. -func (l LiteralExpr) ValueType() schema.ValueType { - switch l.ty { - case LiteralTypeString: - return schema.ValueTypeString - case LiteralTypeInt64: - return schema.ValueTypeInt64 - default: - panic(fmt.Sprintf("unsupported literal type: %d", l.ty)) - } -} - -// LitStr creates a string literal expression with the given value. -// Example: LitStr("hello") creates a string literal with value "hello". -func LitStr(v string) Expr { - return NewLiteralExpr(LiteralExpr{ - ty: LiteralTypeString, - val: v, - }) -} - -// LitI64 creates a 64-bit integer literal expression with the given value. -// Example: LitI64(42) creates an integer literal with value 42. -func LitI64(v int64) Expr { - return NewLiteralExpr(LiteralExpr{ - ty: LiteralTypeInt64, - val: v, - }) -} diff --git a/pkg/engine/planner/logical/logical.go b/pkg/engine/planner/logical/logical.go new file mode 100644 index 0000000000..7563d48ea7 --- /dev/null +++ b/pkg/engine/planner/logical/logical.go @@ -0,0 +1,94 @@ +// Package logical provides a logical query plan representation for data +// processing operations. 
+// +// The logical plan is represented using static single-assignment (SSA) form of +// intermediate representation (IR) for the operations performed on log data. +// +// For an introduction to SSA form, see +// https://en.wikipedia.org/wiki/Static_single_assignment_form. +// +// The primary interfaces of this package are: +// +// - [Value], an expression that yields a value. +// - [Instruction], a statement that consumes values and performs computation. +// - [Plan], a sequence of instructions that produces a result. +// +// A computation that also yields a result implements both the [Value] and +// [Instruction] interfaces. See the documentation comments on each type for +// which of those interfaces it implements. +// +// Values are representable as either: +// +// - A column value (such as in [ColumnRef]), +// - a relation (such as in [Select]), or +// - a value literal (such as in [Literal]). +// +// The SSA form forms a graph: each [Value] may appear as an operand of one or +// more [Instruction]s. +package logical + +import ( + "fmt" + "strings" + + "github.com/grafana/loki/v3/pkg/engine/planner/schema" +) + +// An Instruction is an SSA instruction that computes a new [Value] or has some +// effect. +// +// Instructions that define a value (e.g., BinOp) also implement the Value +// interface; an Instruction that only has an effect (e.g., Return) does not. +type Instruction interface { + // String returns the disassembled SSA form of the Instruction. This does not + // include the name of the Value if the Instruction also implements [Value]. + String() string + + // isInstruction is a marker method to prevent external implementations. + isInstruction() +} + +// A Value is an SSA value that can be referenced by an [Instruction]. +type Value interface { + // Name returns an identifier for this Value (such as "%1"), which is used + // when this Value appears as an operand of an Instruction. + // + // If the Value was not created by the logical planner, Name instead returns + // the pointer address of the Value. + Name() string + + // String returns human-readable information about the Value. If Value also + // implements [Instruction], String returns the disassembled form of the + // Instruction as documented by [Instruction.String]. + String() string + + // Schema returns the type of this Value. + Schema() *schema.Schema + + // isValue is a marker method to prevent external implementations. + isValue() +} + +// A Plan represents a sequence of [Instruction]s that ultimately produce a +// [Value]. +// +// The first [Return] instruction in the plan denotes the final output. +type Plan struct { + Instructions []Instruction // Instructions of the plan in order. +} + +// String prints out the entire plan SSA. 
+func (p Plan) String() string { + var sb strings.Builder + + for _, inst := range p.Instructions { + switch inst := inst.(type) { + case Value: + fmt.Fprintf(&sb, "%s = %s\n", inst.Name(), inst.String()) + case Instruction: + fmt.Fprintf(&sb, "%s\n", inst.String()) + } + } + + return sb.String() +} diff --git a/pkg/engine/planner/logical/logical_test.go b/pkg/engine/planner/logical/logical_test.go new file mode 100644 index 0000000000..1fd31f613b --- /dev/null +++ b/pkg/engine/planner/logical/logical_test.go @@ -0,0 +1,59 @@ +package logical + +import ( + "fmt" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPlan_String(t *testing.T) { + // Build a query plan for this query sorted by `age` in ascending order: + // + // { app="users" } | age > 21 + b := NewBuilder( + &MakeTable{ + Selector: &BinOp{ + Left: &ColumnRef{Column: "app", Type: ColumnTypeLabel}, + Right: LiteralString("users"), + Op: BinOpKindEq, + }, + }, + ).Select( + &BinOp{ + Left: &ColumnRef{Column: "age", Type: ColumnTypeMetadata}, + Right: LiteralInt64(21), + Op: BinOpKindGt, + }, + ).Sort(ColumnRef{Column: "age", Type: ColumnTypeMetadata}, true, false) + + // Convert to SSA + ssaForm, err := b.ToPlan() + require.NoError(t, err) + require.NotNil(t, ssaForm) + + t.Logf("SSA Form:\n%s", ssaForm.String()) + + // Define expected output + exp := ` +%1 = EQ label.app, "users" +%2 = MAKE_TABLE [selector=%1] +%3 = GT metadata.age, 21 +%4 = SELECT %2 [predicate=%3] +%5 = SORT %4 [column=metadata.age, asc=true, nulls_first=false] +RETURN %5 +` + exp = strings.TrimSpace(exp) + + // Get the actual output without the RETURN statement + ssaOutput := ssaForm.String() + ssaLines := strings.Split(strings.TrimSpace(ssaOutput), "\n") + + expLines := strings.Split(exp, "\n") + require.Equal(t, len(expLines), len(ssaLines), "Expected and actual SSA output line counts do not match") + + for i, line := range expLines { + require.Equal(t, strings.TrimSpace(line), strings.TrimSpace(ssaLines[i]), fmt.Sprintf("Mismatch at line %d", i+1)) + } +} diff --git a/pkg/engine/planner/logical/node_binop.go b/pkg/engine/planner/logical/node_binop.go new file mode 100644 index 0000000000..5130829fcc --- /dev/null +++ b/pkg/engine/planner/logical/node_binop.go @@ -0,0 +1,109 @@ +package logical + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/engine/planner/schema" +) + +// BinOpKind denotes the kind of [BinOp] operation to perform. +type BinOpKind int + +// Recognized values of [BinOpKind]. +const ( + // BinOpKindInvalid indicates an invalid binary operation. + BinOpKindInvalid BinOpKind = iota + + BinOpKindEq // Equality comparison (==). + BinOpKindNeq // Inequality comparison (!=). + BinOpKindGt // Greater than comparison (>). + BinOpKindGte // Greater than or equal comparison (>=). + BinOpKindLt // Less than comparison (<). + BinOpKindLte // Less than or equal comparison (<=). + BinOpKindAnd // Logical AND operation (&&). + BinOpKindOr // Logical OR operation (||). + BinOpKindXor // Logical XOR operation (^). + BinOpKindNot // Logical NOT operation (!). + + BinOpKindAdd // Addition operation (+). + BinOpKindSub // Subtraction operation (-). + BinOpKindMul // Multiplication operation (*). + BinOpKindDiv // Division operation (/). + BinOpKindMod // Modulo operation (%). + + BinOpKindMatchStr // String matching operation. + BinOpKindNotMatchStr // String non-matching operation. + BinOpKindMatchRe // Regular expression matching operation. + BinOpKindNotMatchRe // Regular expression non-matching operation. 
+) + +var binOpKindStrings = map[BinOpKind]string{ + BinOpKindInvalid: "invalid", + + BinOpKindEq: "EQ", + BinOpKindNeq: "NEQ", + BinOpKindGt: "GT", + BinOpKindGte: "GTE", + BinOpKindLt: "LT", + BinOpKindLte: "LTE", + BinOpKindAnd: "AND", + BinOpKindOr: "OR", + BinOpKindXor: "XOR", + BinOpKindNot: "NOT", + + BinOpKindAdd: "ADD", + BinOpKindSub: "SUB", + BinOpKindMul: "MUL", + BinOpKindDiv: "DIV", + BinOpKindMod: "MOD", + + BinOpKindMatchStr: "MATCH_STR", + BinOpKindNotMatchStr: "NOT_MATCH_STR", + BinOpKindMatchRe: "MATCH_RE", + BinOpKindNotMatchRe: "NOT_MATCH_RE", +} + +// String returns a human-readable representation of the binary operation kind. +func (k BinOpKind) String() string { + if s, ok := binOpKindStrings[k]; ok { + return s + } + return fmt.Sprintf("BinOpKind(%d)", k) +} + +// The BinOp instruction yields the result of binary operation Left Op Right. +// BinOp implements both [Instruction] and [Value]. +type BinOp struct { + id string + + Left, Right Value + Op BinOpKind +} + +var ( + _ Value = (*BinOp)(nil) + _ Instruction = (*BinOp)(nil) +) + +// Name returns an identifier for the BinOp operation. +func (b *BinOp) Name() string { + if b.id != "" { + return b.id + } + return fmt.Sprintf("<%p>", b) +} + +// String returns the disassembled SSA form of the BinOp instruction. +func (b *BinOp) String() string { + return fmt.Sprintf("%s %s, %s", b.Op, b.Left.Name(), b.Right.Name()) +} + +// Schema returns the schema of the BinOp operation. +func (b *BinOp) Schema() *schema.Schema { + // TODO(rfratto): What should be returned here? Should the schema of BinOp + // take on the schema of its LHS or RHS? Does it depend on the operation? + return nil +} + +func (b *BinOp) isValue() {} +func (b *BinOp) isInstruction() {} diff --git a/pkg/engine/planner/logical/node_limit.go b/pkg/engine/planner/logical/node_limit.go new file mode 100644 index 0000000000..1c2ecc826e --- /dev/null +++ b/pkg/engine/planner/logical/node_limit.go @@ -0,0 +1,53 @@ +package logical + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/engine/planner/schema" +) + +// The Limit instruction limits the number of rows from a table relation. Limit +// implements [Instruction] and [Value]. +type Limit struct { + id string + + Table Value // Table relation to limit. + + // Skip is the number of rows to skip before returning results. A value of 0 + // means no rows are skipped. + Skip uint64 + + // Fetch is the maximum number of rows to return. A value of 0 means all rows + // are returned (after applying Skip). + Fetch uint64 +} + +var ( + _ Value = (*Limit)(nil) + _ Instruction = (*Limit)(nil) +) + +// Name returns an identifier for the Limit operation. +func (l *Limit) Name() string { + if l.id != "" { + return l.id + } + return fmt.Sprintf("<%p>", l) +} + +// String returns the disassembled SSA form of the Limit instruction. +func (l *Limit) String() string { + // TODO(rfratto): change the type of l.Input to [Value] so we can use + // s.Value.Name here. + return fmt.Sprintf("limit %v [skip=%d, fetch=%d]", l.Table.Name(), l.Skip, l.Fetch) +} + +// Schema returns the schema of the limit operation. +func (l *Limit) Schema() *schema.Schema { + // The schema is the same as the input plan's schema since limiting + // only affects the number of rows, not their structure. 
+ return l.Table.Schema() +} + +func (l *Limit) isInstruction() {} +func (l *Limit) isValue() {} diff --git a/pkg/engine/planner/logical/node_literal.go b/pkg/engine/planner/logical/node_literal.go new file mode 100644 index 0000000000..fb0bbe12b7 --- /dev/null +++ b/pkg/engine/planner/logical/node_literal.go @@ -0,0 +1,145 @@ +package logical + +import ( + "fmt" + "strconv" + + "github.com/grafana/loki/v3/pkg/engine/planner/schema" +) + +// LiteralKind denotes the kind of [Literal] value. +type LiteralKind int + +// Recognized values of [LiteralKind]. +const ( + // LiteralKindInvalid indicates an invalid literal value. + LiteralKindInvalid LiteralKind = iota + + LiteralKindNull // NULL literal value. + LiteralKindString // String literal value. + LiteralKindInt64 // 64-bit integer literal value. + LiteralKindUint64 // 64-bit unsigned integer literal value. + LiteralKindByteArray // Byte array literal value. +) + +var literalKindStrings = map[LiteralKind]string{ + LiteralKindInvalid: "invalid", + + LiteralKindNull: "null", + LiteralKindString: "string", + LiteralKindInt64: "int64", + LiteralKindUint64: "uint64", + LiteralKindByteArray: "[]byte", +} + +// String returns the string representation of the LiteralKind. +func (k LiteralKind) String() string { + if s, ok := literalKindStrings[k]; ok { + return s + } + return fmt.Sprintf("LiteralKind(%d)", k) +} + +// A Literal represents a literal value known at plan time. Literal only +// implements [Value]. +// +// The zero value of a Literal is a NULL value. +type Literal struct { + val any +} + +var _ Value = (*Literal)(nil) + +// LiteralString creates a new Literal value from a string. +func LiteralString(v string) *Literal { return &Literal{val: v} } + +// LiteralInt64 creates a new Literal value from a 64-bit integer. +func LiteralInt64(v int64) *Literal { return &Literal{val: v} } + +// LiteralUint64 creates a new Literal value from a 64-bit unsigned integer. +func LiteralUint64(v uint64) *Literal { return &Literal{val: v} } + +// LiteralByteArray creates a new Literal value from a byte slice. +func LiteralByteArray(v []byte) *Literal { return &Literal{val: v} } + +// Kind returns the kind of value represented by the literal. +func (lit Literal) Kind() LiteralKind { + switch lit.val.(type) { + case nil: + return LiteralKindNull + case string: + return LiteralKindString + case int64: + return LiteralKindInt64 + case uint64: + return LiteralKindUint64 + case []byte: + return LiteralKindByteArray + default: + return LiteralKindInvalid + } +} + +// Name returns the string form of the literal. +func (lit Literal) Name() string { + return lit.String() +} + +// String returns a printable form of the literal, even if lit is not a +// [LiteralKindString]. +func (lit Literal) String() string { + switch lit.Kind() { + case LiteralKindNull: + return "NULL" + case LiteralKindString: + return strconv.Quote(lit.val.(string)) + case LiteralKindInt64: + return strconv.FormatInt(lit.Int64(), 10) + case LiteralKindUint64: + return strconv.FormatUint(lit.Uint64(), 10) + case LiteralKindByteArray: + return fmt.Sprintf("%v", lit.val) + default: + return fmt.Sprintf("Literal(%s)", lit.Kind()) + } +} + +// IsNull returns true if lit is a [LiteralKindNull] value. +func (lit Literal) IsNull() bool { + return lit.Kind() == LiteralKindNull +} + +// Int64 returns lit's value as an int64. It panics if lit is not a +// [LiteralKindInt64]. 
+func (lit Literal) Int64() int64 { + if expect, actual := LiteralKindInt64, lit.Kind(); expect != actual { + panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) + } + return lit.val.(int64) +} + +// Uint64 returns lit's value as a uint64. It panics if lit is not a +// [LiteralKindUint64]. +func (lit Literal) Uint64() uint64 { + if expect, actual := LiteralKindUint64, lit.Kind(); expect != actual { + panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) + } + return lit.val.(uint64) +} + +// ByteArray returns lit's value as a byte slice. It panics if lit is not a +// [LiteralKindByteArray]. +func (lit Literal) ByteArray() []byte { + if expect, actual := LiteralKindByteArray, lit.Kind(); expect != actual { + panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) + } + return lit.val.([]byte) +} + +func (lit *Literal) Schema() *schema.Schema { + // TODO(rfratto): schema.Schema needs to be updated to be a more general + // "type" instead. + return nil +} + +func (lit *Literal) isValue() {} diff --git a/pkg/engine/planner/logical/node_maketable.go b/pkg/engine/planner/logical/node_maketable.go new file mode 100644 index 0000000000..b5ebb54027 --- /dev/null +++ b/pkg/engine/planner/logical/node_maketable.go @@ -0,0 +1,49 @@ +package logical + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/engine/planner/schema" +) + +// The MakeTable instruction yields a table relation from an identifier. +// MakeTable implements both [Instruction] and [Value]. +type MakeTable struct { + id string + + // Selector is used to generate a table relation. All streams for which the + // selector passes are included in the resulting table. + // + // It is invalid for Selector to include a [ColumnRef] that is not + // [ColumnTypeBuiltin] or [ColumnTypeLabel]. + Selector Value +} + +var ( + _ Value = (*MakeTable)(nil) + _ Instruction = (*MakeTable)(nil) +) + +// Name returns an identifier for the MakeTable operation. +func (t *MakeTable) Name() string { + if t.id != "" { + return t.id + } + return fmt.Sprintf("<%p>", t) +} + +// String returns the disassembled SSA form of the MakeTable instruction. +func (t *MakeTable) String() string { + return fmt.Sprintf("MAKE_TABLE [selector=%s]", t.Selector.Name()) +} + +// Schema returns the schema of the table. +// This implements part of the Plan interface. +func (t *MakeTable) Schema() *schema.Schema { + // TODO(rfratto): What should we return here? What's possible for the logical + // planner to know about the selector at planning time? + return nil +} + +func (t *MakeTable) isInstruction() {} +func (t *MakeTable) isValue() {} diff --git a/pkg/engine/planner/logical/node_return.go b/pkg/engine/planner/logical/node_return.go new file mode 100644 index 0000000000..e8cda19cfb --- /dev/null +++ b/pkg/engine/planner/logical/node_return.go @@ -0,0 +1,14 @@ +package logical + +// The Return instruction yields a value to return from a plan. Return +// implements [Instruction]. +type Return struct { + Value Value // The value to return. +} + +// String returns the disassembled SSA form of r. 
+func (r *Return) String() string { + return "RETURN " + r.Value.Name() +} + +func (r *Return) isInstruction() {} diff --git a/pkg/engine/planner/logical/node_select.go b/pkg/engine/planner/logical/node_select.go new file mode 100644 index 0000000000..13d5132f9d --- /dev/null +++ b/pkg/engine/planner/logical/node_select.go @@ -0,0 +1,48 @@ +package logical + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/engine/planner/schema" +) + +// The Select instruction filters rows from a table relation. Select implements +// both [Instruction] and [Value]. +type Select struct { + id string + + Table Value // The table relation to filter. + + // Predicate is used to filter rows from Table. Each row is checked against + // the given Predicate, and only rows for which the Predicate is true are + // returned. + Predicate Value +} + +var ( + _ Value = (*Select)(nil) + _ Instruction = (*Select)(nil) +) + +// Name returns an identifier for the Select operation. +func (s *Select) Name() string { + if s.id != "" { + return s.id + } + return fmt.Sprintf("<%p>", s) +} + +// String returns the disassembled SSA form of the Select instruction. +func (s *Select) String() string { + return fmt.Sprintf("SELECT %s [predicate=%s]", s.Table.Name(), s.Predicate.Name()) +} + +// Schema returns the schema of the Select plan. +func (s *Select) Schema() *schema.Schema { + // Since Select only filters rows from a table, the schema is the same as the + // input table relation. + return s.Table.Schema() +} + +func (s *Select) isInstruction() {} +func (s *Select) isValue() {} diff --git a/pkg/engine/planner/logical/node_sort.go b/pkg/engine/planner/logical/node_sort.go new file mode 100644 index 0000000000..0a356c6abf --- /dev/null +++ b/pkg/engine/planner/logical/node_sort.go @@ -0,0 +1,57 @@ +package logical + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/engine/planner/schema" +) + +// Sort represents a plan node that sorts rows based on sort expressions. +// It corresponds to the ORDER BY clause in SQL and is used to order +// the results of a query based on one or more sort expressions. + +// The Sort instruction sorts rows from a table relation. Sort implements both +// [Instruction] and [Value]. +type Sort struct { + id string + + Table Value // The table relation to sort. + + Column ColumnRef // The column to sort by. + Ascending bool // Whether to sort in ascending order. + NullsFirst bool // Controls whether NULLs appear first (true) or last (false). +} + +var ( + _ Value = (*Sort)(nil) + _ Instruction = (*Sort)(nil) +) + +// Name returns an identifier for the Sort operation. +func (s *Sort) Name() string { + if s.id != "" { + return s.id + } + return fmt.Sprintf("<%p>", s) +} + +// String returns the disassembled SSA form of the Sort instruction. +func (s *Sort) String() string { + return fmt.Sprintf( + "SORT %s [column=%s, asc=%t, nulls_first=%t]", + s.Table.Name(), + s.Column.String(), + s.Ascending, + s.NullsFirst, + ) +} + +// Schema returns the schema of the sort plan. +func (s *Sort) Schema() *schema.Schema { + // The schema is the same as the input plan's schema since sorting only + // affects the order of rows, not their structure. 
+ return s.Table.Schema() +} + +func (s *Sort) isInstruction() {} +func (s *Sort) isValue() {} diff --git a/pkg/engine/planner/logical/node_unaryop.go b/pkg/engine/planner/logical/node_unaryop.go new file mode 100644 index 0000000000..9efe7bf377 --- /dev/null +++ b/pkg/engine/planner/logical/node_unaryop.go @@ -0,0 +1,69 @@ +package logical + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/engine/planner/schema" +) + +// UnaryOpKind denotes the kind of [UnaryOp] operation to perform. +type UnaryOpKind int + +// Recognized values of [UnaryOpKind]. +const ( + // UnaryOpKindInvalid indicates an invalid unary operation. + UnaryOpKindInvalid UnaryOpKind = iota + + UnaryOpKindNot // Logical NOT operation (!). +) + +var unaryOpKindStrings = map[UnaryOpKind]string{ + UnaryOpKindInvalid: "invalid", + + UnaryOpKindNot: "NOT", +} + +// String returns the string representation of the UnaryOpKind. +func (k UnaryOpKind) String() string { + if s, ok := unaryOpKindStrings[k]; ok { + return s + } + return fmt.Sprintf("UnaryOpKind(%d)", k) +} + +// The UnaryOp instruction yields the result of unary operation Op Value. +// UnaryOp implements both [Instruction] and [Value]. +type UnaryOp struct { + id string + + Op UnaryOpKind + Value Value +} + +var ( + _ Value = (*UnaryOp)(nil) + _ Instruction = (*UnaryOp)(nil) +) + +// Name returns an identifier for the UnaryOp operation. +func (u *UnaryOp) Name() string { + if u.id != "" { + return u.id + } + return fmt.Sprintf("<%p>", u) +} + +// String returns the disassembled SSA form of the UnaryOp instruction. +func (u *UnaryOp) String() string { + return fmt.Sprintf("%s %s", u.Op, u.Value.Name()) +} + +// Schema returns the schema of the UnaryOp plan. +func (u *UnaryOp) Schema() *schema.Schema { + // TODO(rfratto): What should be returned here? Should the schema of BinOp + // take on the schema of its Value? Does it depend on the operation? + return nil +} + +func (u *UnaryOp) isValue() {} +func (u *UnaryOp) isInstruction() {} diff --git a/pkg/engine/planner/logical/plan.go b/pkg/engine/planner/logical/plan.go deleted file mode 100644 index 7ba37b243f..0000000000 --- a/pkg/engine/planner/logical/plan.go +++ /dev/null @@ -1,187 +0,0 @@ -// Package logical provides a logical query plan representation for data processing operations. -// It defines a type system for expressions and plan nodes that can be used to build and -// manipulate query plans in a structured way. -package logical - -import ( - "fmt" - - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// PlanType is an enum representing the type of plan node. -// It allows consumers to determine the concrete type of a Plan -// and safely cast to the appropriate interface. -type PlanType int - -const ( - PlanTypeInvalid PlanType = iota // Invalid or uninitialized plan - PlanTypeTable // Represents a table scan operation - PlanTypeFilter // Represents a filter operation - PlanTypeProjection // Represents a projection operation - PlanTypeAggregate // Represents an aggregation operation - PlanTypeLimit // Represents a limit operation - PlanTypeSort // Represents a sort operation -) - -// String returns a string representation of the plan type. -// This is useful for debugging and error messages. 
-func (t PlanType) String() string { - switch t { - case PlanTypeInvalid: - return "Invalid" - case PlanTypeTable: - return "Table" - case PlanTypeFilter: - return "Filter" - case PlanTypeProjection: - return "Projection" - case PlanTypeAggregate: - return "Aggregate" - case PlanTypeLimit: - return "Limit" - case PlanTypeSort: - return "Sort" - default: - return "Unknown" - } -} - -// Plan is the core plan type in the logical package. -// It wraps a concrete plan type (MakeTable, Filter, etc.) -// and provides methods to safely access the underlying value. -// This approach replaces the previous interface-based design to reduce -// indirection and improve code clarity. -type Plan struct { - ty PlanType // The type of plan - val any // The concrete plan value -} - -// Type returns the type of the plan. -// This allows consumers to determine the concrete type of the plan -// and safely cast to the appropriate interface. -func (p Plan) Type() PlanType { - return p.ty -} - -// Schema returns the schema of the data produced by this plan node. -// It delegates to the appropriate concrete plan type based on the plan type. -func (p Plan) Schema() schema.Schema { - switch p.ty { - case PlanTypeTable: - return p.val.(*MakeTable).Schema() - case PlanTypeFilter: - return p.val.(*Filter).Schema() - case PlanTypeProjection: - return p.val.(*Projection).Schema() - case PlanTypeAggregate: - return p.val.(*Aggregate).Schema() - case PlanTypeLimit: - return p.val.(*Limit).Schema() - case PlanTypeSort: - return p.val.(*Sort).Schema() - default: - panic(fmt.Sprintf("unknown plan type: %v", p.ty)) - } -} - -// Table returns the concrete table plan if this is a table plan. -// Panics if this is not a table plan. -func (p Plan) Table() *MakeTable { - if p.ty != PlanTypeTable { - panic(fmt.Sprintf("not a table plan: %v", p.ty)) - } - return p.val.(*MakeTable) -} - -// Filter returns the concrete filter plan if this is a filter plan. -// Panics if this is not a filter plan. -func (p Plan) Filter() *Filter { - if p.ty != PlanTypeFilter { - panic(fmt.Sprintf("not a filter plan: %v", p.ty)) - } - return p.val.(*Filter) -} - -// Projection returns the concrete projection plan if this is a projection plan. -// Panics if this is not a projection plan. -func (p Plan) Projection() *Projection { - if p.ty != PlanTypeProjection { - panic(fmt.Sprintf("not a projection plan: %v", p.ty)) - } - return p.val.(*Projection) -} - -// Aggregate returns the concrete aggregate plan if this is an aggregate plan. -// Panics if this is not an aggregate plan. -func (p Plan) Aggregate() *Aggregate { - if p.ty != PlanTypeAggregate { - panic(fmt.Sprintf("not an aggregate plan: %v", p.ty)) - } - return p.val.(*Aggregate) -} - -// Limit returns the concrete limit plan if this is a limit plan. -// Panics if this is not a limit plan. -func (p Plan) Limit() *Limit { - if p.ty != PlanTypeLimit { - panic(fmt.Sprintf("not a limit plan: %v", p.ty)) - } - return p.val.(*Limit) -} - -// Sort returns the concrete sort plan if this is a sort plan. -// Panics if this is not a sort plan. -func (p Plan) Sort() *Sort { - if p.ty != PlanTypeSort { - panic(fmt.Sprintf("not a sort plan: %v", p.ty)) - } - return p.val.(*Sort) -} - -// newPlan creates a new plan with the given type and value. -// This is a helper function for creating plans of different types. -func newPlan(ty PlanType, val any) Plan { - return Plan{ - ty: ty, - val: val, - } -} - -// NewScan creates a new table scan plan. 
-// This is the entry point for building a query plan, as all queries -// start with scanning a table. -func NewScan(name string, schema schema.Schema) Plan { - return newPlan(PlanTypeTable, makeTable(name, schema)) -} - -// NewFilter creates a new filter plan. -// This applies a boolean expression to filter rows from the input plan. -func NewFilter(input Plan, expr Expr) Plan { - return newPlan(PlanTypeFilter, newFilter(input, expr)) -} - -// NewProjection creates a new projection plan. -// This applies a list of expressions to project columns from the input plan. -func NewProjection(input Plan, exprs []Expr) Plan { - return newPlan(PlanTypeProjection, newProjection(input, exprs)) -} - -// NewAggregate creates a new aggregate plan. -// This applies grouping and aggregation to the input plan. -func NewAggregate(input Plan, groupExprs []Expr, aggExprs []AggregateExpr) Plan { - return newPlan(PlanTypeAggregate, newAggregate(input, groupExprs, aggExprs)) -} - -// NewLimit creates a new limit plan. -// This limits the number of rows returned by the input plan. -// If skip is 0, no rows are skipped. If fetch is 0, all rows are returned after applying skip. -func NewLimit(input Plan, skip uint64, fetch uint64) Plan { - return newPlan(PlanTypeLimit, newLimit(input, skip, fetch)) -} - -// NewSort creates a new sort plan. -// This sorts the rows from the input plan based on the provided sort expression. -func NewSort(input Plan, expr SortExpr) Plan { - return newPlan(PlanTypeSort, newSort(input, expr)) -} diff --git a/pkg/engine/planner/logical/project.go b/pkg/engine/planner/logical/project.go deleted file mode 100644 index da7df4300b..0000000000 --- a/pkg/engine/planner/logical/project.go +++ /dev/null @@ -1,50 +0,0 @@ -package logical - -import ( - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// Projection represents a plan node that projects expressions from its input. -// It corresponds to the SELECT clause in SQL and is used to select, transform, -// or compute columns from the input plan. -type Projection struct { - // input is the child plan node providing data to project - input Plan - // exprs is the list of expressions to project - exprs []Expr -} - -// newProjection creates a new Projection plan node. -// The Projection logical plan applies a list of expressions to the input data, -// producing a new set of columns. This is represented by the SELECT clause in SQL. -// Sometimes this is as simple as a list of columns, such as SELECT a, b, c FROM foo, -// but it could also include any other type of expression that is supported. -// A more complex example would be SELECT (CAST(a AS float) * 3.141592)) AS my_float FROM foo. -func newProjection(input Plan, exprs []Expr) *Projection { - return &Projection{ - input: input, - exprs: exprs, - } -} - -// Schema returns the schema of the projection plan. -// The schema is derived from the projected expressions. -func (p *Projection) Schema() schema.Schema { - cols := make([]schema.ColumnSchema, len(p.exprs)) - for i, expr := range p.exprs { - cols[i] = expr.ToField(p.input) - } - return schema.Schema{Columns: cols} -} - -// Child returns the input plan. -// This is a convenience method for accessing the child plan. -func (p *Projection) Child() Plan { - return p.input -} - -// ProjectExprs returns the list of projection expressions. -// These are the expressions that define the output columns. 
-func (p *Projection) ProjectExprs() []Expr { - return p.exprs -} diff --git a/pkg/engine/planner/logical/sort_expr.go b/pkg/engine/planner/logical/sort_expr.go deleted file mode 100644 index 26337ad994..0000000000 --- a/pkg/engine/planner/logical/sort_expr.go +++ /dev/null @@ -1,59 +0,0 @@ -package logical - -// SortExpr represents a sort expression in a query plan. -// It encapsulates an expression to sort on, a sort direction (ascending or descending), -// and how NULL values should be handled (first or last). -type SortExpr struct { - // name is a descriptive name for the sort expression - name string - // expr is the expression to sort on - expr Expr - // asc indicates whether to sort in ascending order (true) or descending order (false) - asc bool - // nullsFirst indicates whether NULL values should appear first (true) or last (false) - nullsFirst bool -} - -// NewSortExpr creates a new sort expression. -// -// Parameters: -// - name: A descriptive name for the sort expression -// - expr: The expression to sort on -// - asc: Whether to sort in ascending order (true) or descending order (false) -// - nullsFirst: Whether NULL values should appear first (true) or last (false) -// -// Example usage: -// -// // Sort by age in ascending order, NULLs last -// sortExpr := NewSortExpr("sort_by_age", Col("age"), true, false) -// -// // Sort by name in descending order, NULLs first -// sortExpr := NewSortExpr("sort_by_name", Col("name"), false, true) -func NewSortExpr(name string, expr Expr, asc bool, nullsFirst bool) SortExpr { - return SortExpr{ - name: name, - expr: expr, - asc: asc, - nullsFirst: nullsFirst, - } -} - -// Name returns the name of the sort expression. -func (s SortExpr) Name() string { - return s.name -} - -// Expr returns the expression to sort on. -func (s SortExpr) Expr() Expr { - return s.expr -} - -// Asc returns whether the sort is in ascending order. -func (s SortExpr) Asc() bool { - return s.asc -} - -// NullsFirst returns whether NULL values should appear first. -func (s SortExpr) NullsFirst() bool { - return s.nullsFirst -} diff --git a/pkg/engine/planner/logical/sort_plan.go b/pkg/engine/planner/logical/sort_plan.go deleted file mode 100644 index 36bf8faa43..0000000000 --- a/pkg/engine/planner/logical/sort_plan.go +++ /dev/null @@ -1,54 +0,0 @@ -package logical - -import ( - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// Sort represents a plan node that sorts rows based on sort expressions. -// It corresponds to the ORDER BY clause in SQL and is used to order -// the results of a query based on one or more sort expressions. -type Sort struct { - // input is the child plan node providing data to sort - input Plan - // expr is the sort expression to apply - expr SortExpr -} - -// newSort creates a new Sort plan node. -// It takes an input plan and a vector of sort expressions to apply. -// -// Example usage: -// -// // Sort by age in ascending order, NULLs last, then by name in descending order, NULLs first -// sort := newSort(inputPlan, []SortExpr{ -// NewSortExpr("sort_by_age", Col("age"), true, false), -// NewSortExpr("sort_by_name", Col("name"), false, true), -// }) -func newSort(input Plan, expr SortExpr) *Sort { - return &Sort{ - input: input, - expr: expr, - } -} - -// Schema returns the schema of the sort plan. -// The schema is the same as the input plan's schema since sorting -// only affects the order of rows, not their structure. 
-func (s *Sort) Schema() schema.Schema { - return s.input.Schema() -} - -// Type returns the plan type for this node. -func (s *Sort) Type() PlanType { - return PlanTypeSort -} - -// Child returns the input plan. -func (s *Sort) Child() Plan { - return s.input -} - -// SortExpr returns the sort expression. -func (s *Sort) Expr() SortExpr { - return s.expr -} diff --git a/pkg/engine/planner/logical/ssa.go b/pkg/engine/planner/logical/ssa.go deleted file mode 100644 index 57c220870d..0000000000 --- a/pkg/engine/planner/logical/ssa.go +++ /dev/null @@ -1,512 +0,0 @@ -package logical - -import ( - "fmt" - "strings" -) - -// SSANode represents a single node in the SSA (Static Single Assignment) form -// Each node has a unique ID, a type, and ordered properties and references to other nodes -type SSANode struct { - // ID is the unique identifier for this node - ID int - // NodeType is the type of this node (e.g., "MakeTable", "ColumnRef", etc.) - NodeType string - // Tuples represents the ordered properties of this node - Tuples []nodeProperty - // References to other nodes in the SSA form - References []int -} - -// nodeProperty represents a key-value property of an SSA node -type nodeProperty struct { - Key string - Value string -} - -// String returns a string representation of this node -// Format: %ID = NodeType [prop1=value1, prop2=value2, ...] -func (n *SSANode) String() string { - var sb strings.Builder - - // Format the node ID and type - sb.WriteString(fmt.Sprintf("%%%d = %s", n.ID, n.NodeType)) - - // Add properties in brackets if any exist - if len(n.Tuples) > 0 { - sb.WriteString(" [") - - // Properties are already in the correct order - for i, prop := range n.Tuples { - if i > 0 { - sb.WriteString(", ") - } - sb.WriteString(fmt.Sprintf("%s=%s", prop.Key, prop.Value)) - } - - sb.WriteString("]") - } - - return sb.String() -} - -// SSAForm represents a full query plan in SSA form -// It contains a list of nodes and the ID of the root node -type SSAForm struct { - // nodes is an ordered list of SSA nodes, where each node's dependencies - // are guaranteed to appear earlier in the list - nodes []SSANode -} - -// ConvertToSSA converts a logical plan to SSA form -// It performs a post-order traversal of the plan, adding nodes as it goes -func ConvertToSSA(plan Plan) (*SSAForm, error) { - // Initialize the builder with an empty node at index 0 - builder := &ssaBuilder{ - nodes: []SSANode{{}}, // Start with an empty node at index 0 - nodeMap: make(map[string]int), - nextID: 1, - exprTypes: make(map[Expr]string), - } - - _, err := builder.processPlan(plan) - if err != nil { - return nil, fmt.Errorf("error converting plan to SSA: %w", err) - } - - return &SSAForm{ - nodes: builder.nodes, - }, nil -} - -// ssaBuilder is a helper type for building SSA forms -type ssaBuilder struct { - nodes []SSANode - nodeMap map[string]int // Maps node key to its ID - nextID int - exprTypes map[Expr]string // Cache for expression types -} - -// getID generates a unique ID for a node -func (b *ssaBuilder) getID() int { - id := b.nextID - b.nextID++ - return id -} - -// processPlan processes a logical plan and returns the ID of the resulting SSA node. 
-func (b *ssaBuilder) processPlan(plan Plan) (int, error) { - switch plan.Type() { - case PlanTypeTable: - return b.processTablePlan(plan.Table()) - case PlanTypeFilter: - return b.processFilterPlan(plan.Filter()) - case PlanTypeProjection: - return b.processProjectionPlan(plan.Projection()) - case PlanTypeAggregate: - return b.processAggregatePlan(plan.Aggregate()) - case PlanTypeLimit: - return b.processLimitPlan(plan.Limit()) - case PlanTypeSort: - return b.processSortPlan(plan.Sort()) - default: - return 0, fmt.Errorf("unsupported plan type: %v", plan.Type()) - } -} - -// processTablePlan processes a table plan node -// It creates a MakeTable node with the table name -func (b *ssaBuilder) processTablePlan(plan *MakeTable) (int, error) { - // Create a node for the table - id := b.getID() - node := SSANode{ - ID: id, - NodeType: "MakeTable", - Tuples: []nodeProperty{ - {Key: "name", Value: plan.TableName()}, - }, - } - - b.nodes = append(b.nodes, node) - return id, nil -} - -// processFilterPlan processes a filter plan node -// It processes the child plan and filter expression, then creates a Filter node -func (b *ssaBuilder) processFilterPlan(plan *Filter) (int, error) { - // Process the child plan first - childID, err := b.processPlan(plan.Child()) - if err != nil { - return 0, err - } - - // Process the filter expression - exprID, err := b.processExpr(plan.FilterExpr(), plan.Child()) - if err != nil { - return 0, err - } - - // Get the name of the expression - var exprName string - - if plan.FilterExpr().Type() == ExprTypeBinaryOp { - exprName = plan.FilterExpr().BinaryOp().Name() - } else { - exprName = plan.FilterExpr().ToField(plan.Child()).Name - } - - // Create a node for the filter - id := b.getID() - node := SSANode{ - ID: id, - NodeType: "Filter", - Tuples: []nodeProperty{ - {Key: "name", Value: exprName}, - {Key: "predicate", Value: fmt.Sprintf("%%%d", exprID)}, - {Key: "plan", Value: fmt.Sprintf("%%%d", childID)}, - }, - References: []int{exprID, childID}, - } - - b.nodes = append(b.nodes, node) - return id, nil -} - -// processProjectionPlan processes a projection plan node -// It processes the child plan and all projection expressions, then creates a Project node -func (b *ssaBuilder) processProjectionPlan(plan *Projection) (int, error) { - // Process the child plan first - childID, err := b.processPlan(plan.Child()) - if err != nil { - return 0, err - } - - // Process all projection expressions - var props []nodeProperty - var references []int - - // Process expressions and build properties in a stable order - // determined by the order of expressions in the plan - for _, expr := range plan.ProjectExprs() { - exprID, err := b.processExpr(expr, plan.Child()) - if err != nil { - return 0, err - } - - field := expr.ToField(plan.Child()) - props = append(props, nodeProperty{ - Key: field.Name, - Value: fmt.Sprintf("%%%d", exprID), - }) - references = append(references, exprID) - } - - // Create a node for the projection - id := b.getID() - node := SSANode{ - ID: id, - NodeType: "Project", - Tuples: props, - References: append(references, childID), // Add childID to references - } - - b.nodes = append(b.nodes, node) - return id, nil -} - -// processAggregatePlan processes an aggregate plan node -// It processes the child plan, group expressions, and aggregate expressions, -// then creates an AggregatePlan node -func (b *ssaBuilder) processAggregatePlan(plan *Aggregate) (int, error) { - // Process the child plan first - _, err := b.processPlan(plan.Child()) - if err != nil { - 
return 0, err - } - - // Process group expressions - var groupingRefs []string - var groupingIDs []int - - for _, expr := range plan.GroupExprs() { - exprID, err := b.processExpr(expr, plan.Child()) - if err != nil { - return 0, err - } - - groupingRefs = append(groupingRefs, fmt.Sprintf("%%%d", exprID)) - groupingIDs = append(groupingIDs, exprID) - } - - // Process aggregate expressions - var aggregationRefs []string - var aggregationIDs []int - - for _, expr := range plan.AggregateExprs() { - exprID, err := b.processAggregateExpr(&expr, plan.Child()) - if err != nil { - return 0, err - } - - aggregationRefs = append(aggregationRefs, fmt.Sprintf("%%%d", exprID)) - aggregationIDs = append(aggregationIDs, exprID) - } - - // Create a node for the aggregate plan - id := b.getID() - node := SSANode{ - ID: id, - NodeType: "AggregatePlan", - Tuples: []nodeProperty{ - {Key: "aggregations", Value: fmt.Sprintf("[%s]", strings.Join(aggregationRefs, ", "))}, - {Key: "groupings", Value: fmt.Sprintf("[%s]", strings.Join(groupingRefs, ", "))}, - }, - References: append(groupingIDs, aggregationIDs...), - } - - b.nodes = append(b.nodes, node) - return id, nil -} - -// processExpr processes an expression and returns its ID -// It handles different expression types by delegating to specific processing methods -func (b *ssaBuilder) processExpr(expr Expr, parent Plan) (int, error) { - switch expr.Type() { - case ExprTypeColumn: - return b.processColumnExpr(expr.Column(), parent) - case ExprTypeLiteral: - return b.processLiteralExpr(expr.Literal()) - case ExprTypeBinaryOp: - return b.processBinaryOpExpr(expr.BinaryOp(), parent) - case ExprTypeAggregate: - return b.processAggregateExpr(expr.Aggregate(), parent) - default: - return 0, fmt.Errorf("unknown expression type: %v", expr.Type()) - } -} - -// processColumnExpr processes a column expression -// It creates a ColumnRef node with the column name and type -func (b *ssaBuilder) processColumnExpr(expr *ColumnExpr, parent Plan) (int, error) { - field := expr.ToField(parent) - - // Create a node for the column reference - id := b.getID() - node := SSANode{ - ID: id, - NodeType: "ColumnRef", - Tuples: []nodeProperty{ - {Key: "name", Value: field.Name}, - {Key: "type", Value: field.Type.String()}, - }, - } - - b.nodes = append(b.nodes, node) - return id, nil -} - -// processLiteralExpr processes a literal expression -// It creates a Literal node with the value and type -func (b *ssaBuilder) processLiteralExpr(expr *LiteralExpr) (int, error) { - // Create a node for the literal - id := b.getID() - node := SSANode{ - ID: id, - NodeType: "Literal", - Tuples: []nodeProperty{ - {Key: "val", Value: expr.ValueString()}, - {Key: "type", Value: expr.ValueType().String()}, - }, - } - - b.nodes = append(b.nodes, node) - return id, nil -} - -// processBinaryOpExpr processes a binary operation expression -// It processes the left and right operands, then creates a BinaryOp node -func (b *ssaBuilder) processBinaryOpExpr(expr *BinOpExpr, parent Plan) (int, error) { - // Process the left and right operands first - leftID, err := b.processExpr(expr.Left(), parent) - if err != nil { - return 0, err - } - - rightID, err := b.processExpr(expr.Right(), parent) - if err != nil { - return 0, err - } - - // Create a node for the binary operation - id := b.getID() - node := SSANode{ - ID: id, - NodeType: "BinaryOp", - Tuples: []nodeProperty{ - {Key: "op", Value: fmt.Sprintf("(%s)", expr.Op().String())}, - {Key: "name", Value: expr.Name()}, - {Key: "left", Value: fmt.Sprintf("%%%d", 
leftID)}, - {Key: "right", Value: fmt.Sprintf("%%%d", rightID)}, - }, - References: []int{leftID, rightID}, - } - - b.nodes = append(b.nodes, node) - return id, nil -} - -// processAggregateExpr processes an aggregate expression -// It processes the input expression first if it exists, then creates an AggregationExpr node -func (b *ssaBuilder) processAggregateExpr(expr *AggregateExpr, parent Plan) (int, error) { - // Process the input expression first if it exists - exprID, err := b.processExpr(expr.SubExpr(), parent) - if err != nil { - return 0, err - } - - // Create a node for the aggregate expression - id := b.getID() - node := SSANode{ - ID: id, - NodeType: "AggregationExpr", - Tuples: []nodeProperty{ - {Key: "name", Value: expr.Name()}, - {Key: "op", Value: string(expr.Op())}, - }, - References: []int{exprID}, - } - - b.nodes = append(b.nodes, node) - return id, nil -} - -// processLimitPlan processes a limit plan and returns the ID of the resulting SSA node. -// This converts a Limit logical plan node to its SSA representation. -// -// The SSA node for a Limit plan has the following format: -// -// %ID = Limit [Skip=X, Fetch=Y] -// -// Where X is the number of rows to skip and Y is the maximum number of rows to return. -// The Limit node always includes both Skip and Fetch properties, even when they are zero. -// This ensures consistent representation and allows for optimization in a separate step. -// -// The Limit node references its input plan as a dependency. -func (b *ssaBuilder) processLimitPlan(plan *Limit) (int, error) { - // Process the input plan - inputID, err := b.processPlan(plan.Child()) - if err != nil { - return 0, fmt.Errorf("failed to process limit input plan: %w", err) - } - - // Create properties for the limit node - tuples := []nodeProperty{ - { - Key: "Skip", - Value: fmt.Sprintf("%d", plan.Skip()), - }, - { - Key: "Fetch", - Value: fmt.Sprintf("%d", plan.Fetch()), - }, - } - - // Create the limit node - id := b.getID() - b.nodes = append(b.nodes, SSANode{ - ID: id, - NodeType: "Limit", - Tuples: tuples, - References: []int{inputID}, - }) - return id, nil -} - -// processSortPlan processes a sort plan and returns the ID of the resulting SSA node. -// This converts a Sort logical plan node to its SSA representation. -// -// The SSA node for a Sort plan has the following format: -// -// %ID = Sort [expr=X, direction=Y, nulls=Z] -// -// Where X is the name of the sort expression, Y is the sort direction, and Z is the nulls position. -// The Sort node references its input plan and sort expression as dependencies. 
-func (b *ssaBuilder) processSortPlan(plan *Sort) (int, error) { - // Process the child plan first - childID, err := b.processPlan(plan.Child()) - if err != nil { - return 0, err - } - - // Process the sort expression - exprID, err := b.processExpr(plan.Expr().Expr(), plan.Child()) - if err != nil { - return 0, err - } - - // Create direction and nulls position properties - direction := "asc" - if !plan.Expr().Asc() { - direction = "desc" - } - - nullsPosition := "last" - if plan.Expr().NullsFirst() { - nullsPosition = "first" - } - - // Create the Sort node - id := b.getID() - node := SSANode{ - ID: id, - NodeType: "Sort", - Tuples: []nodeProperty{ - {Key: "expr", Value: plan.Expr().Name()}, - {Key: "direction", Value: direction}, - {Key: "nulls", Value: nullsPosition}, - }, - References: []int{exprID, childID}, - } - - b.nodes = append(b.nodes, node) - return id, nil -} - -// String returns a string representation of the SSA form with the RETURN statement -func (f *SSAForm) String() string { - if len(f.nodes) <= 1 { - return "" - } - - // The root is the last node added - lastNodeID := f.nodes[len(f.nodes)-1].ID - return f.Format() + fmt.Sprintf("\nRETURN %%%d", lastNodeID) -} - -// Format returns a formatted string representation of the SSA form -// It includes all nodes but not the RETURN statement -func (f *SSAForm) Format() string { - var sb strings.Builder - - // Add each node (skip node 0 if it exists) - for i := 1; i < len(f.nodes); i++ { - if i > 1 { - sb.WriteString("\n") - } - sb.WriteString(f.nodes[i].String()) - } - - return sb.String() -} - -// Nodes returns a map of node IDs to their string representations -// This is primarily used for testing -func (f *SSAForm) Nodes() map[int]string { - result := make(map[int]string) - for _, node := range f.nodes { - if node.ID > 0 { - result[node.ID] = node.String() - } - } - return result -} diff --git a/pkg/engine/planner/logical/ssa_test.go b/pkg/engine/planner/logical/ssa_test.go deleted file mode 100644 index c67ff1e0c8..0000000000 --- a/pkg/engine/planner/logical/ssa_test.go +++ /dev/null @@ -1,189 +0,0 @@ -package logical - -import ( - "fmt" - "strings" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -func TestConvertSimpleQueryToSSA(t *testing.T) { - // Build a simple query plan: - // SELECT id, name FROM users WHERE age > 21 - ds := &testDataSource{ - name: "users", - schema: schema.Schema{ - Columns: []schema.ColumnSchema{ - {Name: "id", Type: schema.ValueTypeUint64}, - {Name: "name", Type: schema.ValueTypeString}, - {Name: "age", Type: schema.ValueTypeUint64}, - }, - }, - } - - scan := NewScan(ds.Name(), ds.Schema()) - filter := NewFilter(scan, Gt("age_gt_21", Col("age"), LitI64(21))) - proj := NewProjection(filter, []Expr{Col("id"), Col("name")}) - - // Convert to SSA - ssaForm, err := ConvertToSSA(proj) - require.NoError(t, err) - require.NotNil(t, ssaForm) - - // Get the string representation for debugging - ssaString := ssaForm.String() - t.Logf("SSA Form:\n%s", ssaString) - - // Define expected output - exp := ` -%1 = MakeTable [name=users] -%2 = ColumnRef [name=age, type=VALUE_TYPE_UINT64] -%3 = Literal [val=21, type=VALUE_TYPE_INT64] -%4 = BinaryOp [op=(>), name=age_gt_21, left=%2, right=%3] -%5 = Filter [name=age_gt_21, predicate=%4, plan=%1] -%6 = ColumnRef [name=id, type=VALUE_TYPE_UINT64] -%7 = ColumnRef [name=name, type=VALUE_TYPE_STRING] -%8 = Project [id=%6, name=%7] -` - exp = strings.TrimSpace(exp) - - // Get the actual output without the 
RETURN statement - ssaOutput := ssaForm.Format() - ssaLines := strings.Split(strings.TrimSpace(ssaOutput), "\n") - - expLines := strings.Split(exp, "\n") - require.Equal(t, len(expLines), len(ssaLines), "Expected and actual SSA output line counts do not match") - - for i, line := range expLines { - require.Equal(t, strings.TrimSpace(line), strings.TrimSpace(ssaLines[i]), fmt.Sprintf("Mismatch at line %d", i+1)) - } -} - -func TestConvertComplexQueryToSSA(t *testing.T) { - // Calculate the sum of sales per region for the year 2020 - ds := &testDataSource{ - name: "orders", - schema: schema.Schema{ - Columns: []schema.ColumnSchema{ - {Name: "region", Type: schema.ValueTypeString}, - {Name: "sales", Type: schema.ValueTypeUint64}, - {Name: "year", Type: schema.ValueTypeUint64}, - }, - }, - } - - df := NewDataFrame( - NewScan(ds.Name(), ds.Schema()), - ).Filter( - Eq("year_2020", Col("year"), LitI64(2020)), - ).Project( - []Expr{ - Col("region"), - Col("sales"), - Col("year"), - }, - ).Aggregate( - []Expr{Col("region")}, - []AggregateExpr{ - Sum("total_sales", Col("sales")), - }, - ).Limit( - 0, - 10, - ) - - // Convert to SSA - ssaForm, err := ConvertToSSA(df.LogicalPlan()) - require.NoError(t, err) - require.NotNil(t, ssaForm) - - // Get the string representation for debugging - ssaString := ssaForm.String() - t.Logf("SSA Form:\n%s", ssaString) - - // Define expected output - exp := ` -%1 = MakeTable [name=orders] -%2 = ColumnRef [name=year, type=VALUE_TYPE_UINT64] -%3 = Literal [val=2020, type=VALUE_TYPE_INT64] -%4 = BinaryOp [op=(==), name=year_2020, left=%2, right=%3] -%5 = Filter [name=year_2020, predicate=%4, plan=%1] -%6 = ColumnRef [name=region, type=VALUE_TYPE_STRING] -%7 = ColumnRef [name=sales, type=VALUE_TYPE_UINT64] -%8 = ColumnRef [name=year, type=VALUE_TYPE_UINT64] -%9 = Project [region=%6, sales=%7, year=%8] -%10 = ColumnRef [name=region, type=VALUE_TYPE_STRING] -%11 = ColumnRef [name=sales, type=VALUE_TYPE_UINT64] -%12 = AggregationExpr [name=total_sales, op=sum] -%13 = AggregatePlan [aggregations=[%12], groupings=[%10]] -%14 = Limit [Skip=0, Fetch=10] -` - exp = strings.TrimSpace(exp) - - // Get the actual output without the RETURN statement - ssaOutput := ssaForm.Format() - ssaLines := strings.Split(strings.TrimSpace(ssaOutput), "\n") - - expLines := strings.Split(exp, "\n") - require.Equal(t, len(expLines), len(ssaLines), "Expected and actual SSA output line counts do not match") - - for i, line := range expLines { - require.Equal(t, strings.TrimSpace(line), strings.TrimSpace(ssaLines[i]), fmt.Sprintf("Mismatch at line %d", i+1)) - } -} - -func TestConvertSortQueryToSSA(t *testing.T) { - // Build a query plan with sorting: - // SELECT id, name, age FROM users WHERE age > 21 ORDER BY age ASC, name DESC - ds := &testDataSource{ - name: "users", - schema: schema.Schema{ - Columns: []schema.ColumnSchema{ - {Name: "id", Type: schema.ValueTypeUint64}, - {Name: "name", Type: schema.ValueTypeString}, - {Name: "age", Type: schema.ValueTypeUint64}, - }, - }, - } - - scan := NewScan(ds.Name(), ds.Schema()) - filter := NewFilter(scan, Gt("age_gt_21", Col("age"), LitI64(21))) - proj := NewProjection(filter, []Expr{Col("id"), Col("name"), Col("age")}) - - // Sort by age ascending, nulls last - sortByAge := NewSort(proj, NewSortExpr("sort_by_age", Col("age"), true, false)) - - ssa, err := ConvertToSSA(sortByAge) - require.NoError(t, err) - - t.Logf("SSA Form:\n%s", ssa.Format()) - - // Verify the structure of the SSA form - nodes := ssa.Nodes() - require.NotEmpty(t, nodes) - - // The last 
node should be the Sort node - lastNodeID := len(ssa.nodes) - 1 - lastNode := ssa.nodes[lastNodeID] - require.Equal(t, "Sort", lastNode.NodeType) - - // Verify the properties of the Sort node - var exprName, direction, nulls string - for _, tuple := range lastNode.Tuples { - switch tuple.Key { - case "expr": - exprName = tuple.Value - case "direction": - direction = tuple.Value - case "nulls": - nulls = tuple.Value - } - } - - require.Equal(t, "sort_by_age", exprName) - require.Equal(t, "asc", direction) - require.Equal(t, "last", nulls) -} diff --git a/pkg/engine/planner/logical/table.go b/pkg/engine/planner/logical/table.go deleted file mode 100644 index f8fdeb81ca..0000000000 --- a/pkg/engine/planner/logical/table.go +++ /dev/null @@ -1,43 +0,0 @@ -package logical - -import ( - "github.com/grafana/loki/v3/pkg/engine/planner/schema" -) - -// MakeTable represents a plan node that scans input data. -// It is the leaf node in the query tree, representing the initial data source -// from which all other operations will read. This is equivalent to a table scan -// operation in a relational database. -type MakeTable struct { - // name is the identifier of the table to scan - name string - // schema defines the structure of the data in the table - schema schema.Schema -} - -// makeTable creates a new MakeTable plan node with the given name and schema. -// This is an internal constructor used by the public NewScan function. -func makeTable(name string, schema schema.Schema) *MakeTable { - return &MakeTable{ - name: name, - schema: schema, - } -} - -// Schema returns the schema of the table. -// This implements part of the Plan interface. -func (t *MakeTable) Schema() schema.Schema { - return t.schema -} - -// TableSchema returns the schema of the table. -// This is a convenience method that returns the same value as Schema(). -func (t *MakeTable) TableSchema() schema.Schema { - return t.schema -} - -// TableName returns the name of the table. -// This is used for identifying the data source in the query plan. -func (t *MakeTable) TableName() string { - return t.name -} diff --git a/pkg/engine/planner/physical/planner.go b/pkg/engine/planner/physical/planner.go index a81ad09ec5..f47f389fd3 100644 --- a/pkg/engine/planner/physical/planner.go +++ b/pkg/engine/planner/physical/planner.go @@ -30,7 +30,7 @@ func (p *Planner) processMakeTable(_ *logical.MakeTable) error { return nil } -func (p *Planner) processSelect(_ *logical.Filter) error { +func (p *Planner) processSelect(_ *logical.Select) error { return nil }
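
The new and updated tests in this patch double as usage documentation for the builder and SSA APIs. The following is a condensed sketch that mirrors the usage shown in logical_test.go and format_tree_test.go above; it assumes only the exported identifiers introduced by this diff (NewBuilder, MakeTable, BinOp, ColumnRef, LiteralString, LiteralInt64, Select, Sort, ToPlan, PrintTree), and the main-package wrapper around them is illustrative, not part of the change:

package main

import (
	"fmt"
	"strings"

	"github.com/grafana/loki/v3/pkg/engine/planner/logical"
)

func main() {
	// Build the plan for `{ app="users" } | age > 21`, sorted by
	// metadata.age in ascending order, mirroring the tests in this patch.
	b := logical.NewBuilder(
		&logical.MakeTable{
			Selector: &logical.BinOp{
				Left:  &logical.ColumnRef{Column: "app", Type: logical.ColumnTypeLabel},
				Right: logical.LiteralString("users"),
				Op:    logical.BinOpKindEq,
			},
		},
	).Select(
		&logical.BinOp{
			Left:  &logical.ColumnRef{Column: "age", Type: logical.ColumnTypeMetadata},
			Right: logical.LiteralInt64(21),
			Op:    logical.BinOpKindGt,
		},
	).Sort(logical.ColumnRef{Column: "age", Type: logical.ColumnTypeMetadata}, true, false)

	// Render the builder value as a tree, as the format_tree tests do.
	var tree strings.Builder
	logical.PrintTree(&tree, b.Value())
	fmt.Println(tree.String())

	// Lower the builder into SSA form and print the instruction listing,
	// which the Plan test expects to look like:
	//   %1 = EQ label.app, "users"
	//   ...
	//   RETURN %5
	plan, err := b.ToPlan()
	if err != nil {
		panic(err)
	}
	fmt.Println(plan.String())
}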