mirror of https://github.com/grafana/loki
chore(engine): Update logical planner types to be structured as SSA (#16841)
Signed-off-by: Robert Fratto <robertfratto@gmail.com> Co-authored-by: Christian Haudum <christian.haudum@gmail.com>pull/16850/head
parent
110548e621
commit
cbe97e6c1a
@ -1,108 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// AggregateOp names an aggregation function that can be applied to an
// expression. It is a string-backed enum whose recognized values are the
// AggregateOp* constants.
type AggregateOp string

// Recognized values of AggregateOp.
const (
	AggregateOpSum   AggregateOp = "sum"   // AggregateOpSum is a sum aggregation.
	AggregateOpAvg   AggregateOp = "avg"   // AggregateOpAvg is an average aggregation.
	AggregateOpMin   AggregateOp = "min"   // AggregateOpMin is a minimum-value aggregation.
	AggregateOpMax   AggregateOp = "max"   // AggregateOpMax is a maximum-value aggregation.
	AggregateOpCount AggregateOp = "count" // AggregateOpCount is a count aggregation.
)
||||
|
||||
// Convenience constructors for each aggregate operation
|
||||
var ( |
||||
// Sum creates a sum aggregation expression
|
||||
Sum = newAggregateExprConstructor(AggregateOpSum) |
||||
// Avg creates an average aggregation expression
|
||||
Avg = newAggregateExprConstructor(AggregateOpAvg) |
||||
// Min creates a minimum value aggregation expression
|
||||
Min = newAggregateExprConstructor(AggregateOpMin) |
||||
// Max creates a maximum value aggregation expression
|
||||
Max = newAggregateExprConstructor(AggregateOpMax) |
||||
// Count creates a count aggregation expression
|
||||
Count = newAggregateExprConstructor(AggregateOpCount) |
||||
) |
||||
|
||||
// AggregateExpr represents an aggregation operation on an expression.
|
||||
// It encapsulates the operation to perform (sum, avg, etc.), the expression
|
||||
// to aggregate, and a name for the result.
|
||||
type AggregateExpr struct { |
||||
// name is the identifier for this aggregation
|
||||
name string |
||||
// op specifies which aggregation operation to perform
|
||||
op AggregateOp |
||||
// expr is the expression to aggregate
|
||||
expr Expr |
||||
} |
||||
|
||||
// newAggregateExprConstructor creates a constructor function for a specific aggregate operation.
|
||||
// This is a higher-order function that returns a function for creating aggregate expressions
|
||||
// with a specific operation type.
|
||||
func newAggregateExprConstructor(op AggregateOp) func(name string, expr Expr) AggregateExpr { |
||||
return func(name string, expr Expr) AggregateExpr { |
||||
return AggregateExpr{ |
||||
name: name, |
||||
op: op, |
||||
expr: expr, |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Type returns the type of the expression.
|
||||
// For aggregate expressions, this is always ExprTypeAggregate.
|
||||
func (a AggregateExpr) Type() ExprType { |
||||
return ExprTypeAggregate |
||||
} |
||||
|
||||
// Name returns the name of the aggregation.
|
||||
// This is used as the column name in the output schema.
|
||||
func (a AggregateExpr) Name() string { |
||||
return a.name |
||||
} |
||||
|
||||
// Op returns the aggregation operation.
|
||||
// This identifies which aggregation function to apply.
|
||||
func (a AggregateExpr) Op() AggregateOp { |
||||
return a.op |
||||
} |
||||
|
||||
// SubExpr returns the expression being aggregated.
|
||||
// This is the input to the aggregation function.
|
||||
func (a AggregateExpr) SubExpr() Expr { |
||||
return a.expr |
||||
} |
||||
|
||||
// ToField converts the aggregation expression to a column schema.
|
||||
// It determines the output type based on the input expression and
|
||||
// the aggregation operation.
|
||||
func (a AggregateExpr) ToField(p Plan) schema.ColumnSchema { |
||||
// Get the input field schema
|
||||
|
||||
return schema.ColumnSchema{ |
||||
Name: a.name, |
||||
// Aggregations typically result in numeric types
|
||||
Type: determineAggregationTypeFromFieldType(a.expr.ToField(p).Type, a.op), |
||||
} |
||||
} |
||||
|
||||
// determineAggregationTypeFromFieldType calculates the output type of an aggregation
|
||||
// based on the input field type and the aggregation operation.
|
||||
// Currently, this is a placeholder that always returns int64, but it should be
|
||||
// implemented to handle different input types and operations correctly.
|
||||
func determineAggregationTypeFromFieldType(_ schema.ValueType, _ AggregateOp) schema.ValueType { |
||||
// TODO: implement
|
||||
return schema.ValueTypeInt64 |
||||
} |
||||
@ -1,74 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// Aggregate represents a plan node that performs aggregation operations.
|
||||
// The output schema is organized with grouping columns followed by aggregate expressions.
|
||||
// It corresponds to the GROUP BY clause in SQL and is used to compute aggregate
|
||||
// functions like SUM, AVG, MIN, MAX, and COUNT over groups of rows.
|
||||
type Aggregate struct { |
||||
// input is the child plan node providing data to aggregate
|
||||
input Plan |
||||
// groupExprs are the expressions to group by
|
||||
groupExprs []Expr |
||||
// aggExprs are the aggregate expressions to compute
|
||||
aggExprs []AggregateExpr |
||||
} |
||||
|
||||
// NewAggregate creates a new Aggregate plan node.
|
||||
// The Aggregate logical plan calculates aggregates of underlying data such as
|
||||
// calculating minimum, maximum, averages, and sums of data. Aggregates are often
|
||||
// grouped by other columns (or expressions).
|
||||
// A simple example would be SELECT region, SUM(sales) FROM orders GROUP BY region.
|
||||
func newAggregate(input Plan, groupExprs []Expr, aggExprs []AggregateExpr) *Aggregate { |
||||
return &Aggregate{ |
||||
input: input, |
||||
groupExprs: groupExprs, |
||||
aggExprs: aggExprs, |
||||
} |
||||
} |
||||
|
||||
// Schema returns the schema of the data produced by this aggregate.
|
||||
// The schema consists of group-by expressions followed by aggregate expressions.
|
||||
// This ordering is important for downstream operations that expect group columns
|
||||
// to come before aggregate columns.
|
||||
func (a *Aggregate) Schema() schema.Schema { |
||||
var columns []schema.ColumnSchema |
||||
|
||||
// Group expressions come first
|
||||
for _, expr := range a.groupExprs { |
||||
columns = append(columns, expr.ToField(a.input)) |
||||
} |
||||
|
||||
// Followed by aggregate expressions
|
||||
for _, expr := range a.aggExprs { |
||||
columns = append(columns, expr.ToField(a.input)) |
||||
} |
||||
|
||||
return schema.FromColumns(columns) |
||||
} |
||||
|
||||
// Type implements the ast interface
|
||||
func (a *Aggregate) Type() PlanType { |
||||
return PlanTypeAggregate |
||||
} |
||||
|
||||
// GroupExprs returns the list of expressions to group by.
|
||||
// These expressions define the grouping keys for the aggregation.
|
||||
func (a *Aggregate) GroupExprs() []Expr { |
||||
return a.groupExprs |
||||
} |
||||
|
||||
// AggregateExprs returns the list of aggregate expressions to compute.
|
||||
// These expressions define the aggregate functions to apply to each group.
|
||||
func (a *Aggregate) AggregateExprs() []AggregateExpr { |
||||
return a.aggExprs |
||||
} |
||||
|
||||
// Child returns the input plan.
|
||||
// This is a convenience method for accessing the child plan.
|
||||
func (a *Aggregate) Child() Plan { |
||||
return a.input |
||||
} |
||||
@ -1,286 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
/* |
||||
This file defines the structures and methods for binary operations within the logical query plan. |
||||
|
||||
Key components: |
||||
1. BinOpType: An enum representing the type of binary operation (e.g., math, comparison, set). |
||||
2. BinOpExpr: A struct representing a binary operation expression, which includes: |
||||
- name: Identifier for the binary operation. |
||||
- ty: Type of binary operation (BinOpType). |
||||
- op: The actual operation (e.g., +, -, ==, !=, &&, ||). |
||||
- l: Left expression operand. |
||||
- r: Right expression operand. |
||||
3. Methods for BinOpExpr: |
||||
- ToField: Converts the binary operation expression to a column schema. |
||||
- Type: Returns the type of binary operation. |
||||
- Name: Returns the name of the binary operation. |
||||
*/ |
||||
|
||||
// UNKNOWN is the string used when rendering an operation or operation type
// that is not recognized.
const UNKNOWN = "unknown"

// BinOpType is an enum identifying the category a binary operation belongs
// to, so that similar operations can be handled together.
type BinOpType int

// Recognized values of BinOpType.
const (
	// BinOpTypeInvalid is an invalid or uninitialized binary operation.
	BinOpTypeInvalid BinOpType = iota
	// BinOpTypeMath covers mathematical operations (+, -, *, /, %).
	BinOpTypeMath
	// BinOpTypeCmp covers comparison operations (==, !=, <, <=, >, >=).
	BinOpTypeCmp
	// BinOpTypeSet covers set operations (AND, OR, NOT, XOR).
	BinOpTypeSet
)

// String returns a human-readable name for the binary operation type.
// BinOpTypeInvalid and any unrecognized value render as UNKNOWN.
func (t BinOpType) String() string {
	names := [...]string{
		BinOpTypeMath: "math",
		BinOpTypeCmp:  "cmp",
		BinOpTypeSet:  "set",
	}
	if t <= BinOpTypeInvalid || int(t) >= len(names) {
		return UNKNOWN
	}
	return names[t]
}
||||
|
||||
// BinOpExpr represents a binary operation expression in the query plan.
|
||||
// It combines two expressions with an operation to produce a result.
|
||||
type BinOpExpr struct { |
||||
// name is the identifier for this binary operation
|
||||
name string |
||||
// ty is the type of binary operation (e.g. math, cmp, set)
|
||||
ty BinOpType |
||||
// op is the actual operation (e.g. +, -, ==, !=, &&, ||)
|
||||
op int |
||||
// l is the left expression
|
||||
l Expr |
||||
// r is the right expression
|
||||
r Expr |
||||
} |
||||
|
||||
// ToField converts the binary operation to a column schema.
|
||||
// The name of the column is the name of the binary operation,
|
||||
// and the type is derived from the left operand.
|
||||
func (b BinOpExpr) ToField(p Plan) schema.ColumnSchema { |
||||
return schema.ColumnSchema{ |
||||
Name: b.name, |
||||
Type: b.l.ToField(p).Type, |
||||
} |
||||
} |
||||
|
||||
// Type returns the type of the binary operation.
|
||||
func (b BinOpExpr) Type() BinOpType { |
||||
return b.ty |
||||
} |
||||
|
||||
// Name returns the name of the binary operation.
|
||||
func (b BinOpExpr) Name() string { |
||||
return b.name |
||||
} |
||||
|
||||
// Left returns the left operand of the binary operation.
|
||||
func (b BinOpExpr) Left() Expr { |
||||
return b.l |
||||
} |
||||
|
||||
// Right returns the right operand of the binary operation.
|
||||
func (b BinOpExpr) Right() Expr { |
||||
return b.r |
||||
} |
||||
|
||||
// Op returns a string representation of the binary operation.
|
||||
// It delegates to the appropriate type-specific operation based on the operation type.
|
||||
func (b BinOpExpr) Op() fmt.Stringer { |
||||
switch b.ty { |
||||
case BinOpTypeMath: |
||||
return BinaryOpMath(b.op) |
||||
case BinOpTypeCmp: |
||||
return BinaryOpCmp(b.op) |
||||
case BinOpTypeSet: |
||||
return BinaryOpSet(b.op) |
||||
default: |
||||
panic(fmt.Sprintf("unknown binary operation type: %d", b.ty)) |
||||
} |
||||
} |
||||
|
||||
// BinaryOpMath enumerates the mathematical binary operations.
type BinaryOpMath int

// Recognized values of BinaryOpMath.
const (
	BinaryOpMathInvalid BinaryOpMath = iota // BinaryOpMathInvalid is the zero value and not a valid operation.
	BinaryOpAdd                             // BinaryOpAdd is addition (+).
	BinaryOpSubtract                        // BinaryOpSubtract is subtraction (-).
	BinaryOpMultiply                        // BinaryOpMultiply is multiplication (*).
	BinaryOpDivide                          // BinaryOpDivide is division (/).
	BinaryOpModulo                          // BinaryOpModulo is modulo (%).
)

// String returns the operator symbol for the mathematical operation, or
// "unknown" for BinaryOpMathInvalid and any unrecognized value.
func (b BinaryOpMath) String() string {
	symbols := [...]string{
		BinaryOpAdd:      "+",
		BinaryOpSubtract: "-",
		BinaryOpMultiply: "*",
		BinaryOpDivide:   "/",
		BinaryOpModulo:   "%",
	}
	if b <= BinaryOpMathInvalid || int(b) >= len(symbols) {
		return "unknown"
	}
	return symbols[b]
}
||||
|
||||
// BinaryOpCmp represents comparison binary operations
|
||||
type BinaryOpCmp int |
||||
|
||||
const ( |
||||
BinaryOpCmpInvalid BinaryOpCmp = iota |
||||
// BinaryOpEq represents equality comparison (==)
|
||||
BinaryOpEq |
||||
// BinaryOpNeq represents inequality comparison (!=)
|
||||
BinaryOpNeq |
||||
// BinaryOpLt represents less than comparison (<)
|
||||
BinaryOpLt |
||||
// BinaryOpLte represents less than or equal comparison (<=)
|
||||
BinaryOpLte |
||||
// BinaryOpGt represents greater than comparison (>)
|
||||
BinaryOpGt |
||||
// BinaryOpGte represents greater than or equal comparison (>=)
|
||||
BinaryOpGte |
||||
) |
||||
|
||||
// String returns a human-readable representation of the comparison operation.
|
||||
func (b BinaryOpCmp) String() string { |
||||
switch b { |
||||
case BinaryOpEq: |
||||
return "==" |
||||
case BinaryOpNeq: |
||||
return "!=" |
||||
case BinaryOpLt: |
||||
return "<" |
||||
case BinaryOpLte: |
||||
return "<=" |
||||
case BinaryOpGt: |
||||
return ">" |
||||
case BinaryOpGte: |
||||
return ">=" |
||||
default: |
||||
return UNKNOWN |
||||
} |
||||
} |
||||
|
||||
// BinaryOpSet represents set operations between boolean expressions
|
||||
type BinaryOpSet int |
||||
|
||||
const ( |
||||
BinaryOpSetInvalid BinaryOpSet = iota |
||||
// BinaryOpAnd represents logical AND operation
|
||||
BinaryOpAnd |
||||
// BinaryOpOr represents logical OR operation
|
||||
BinaryOpOr |
||||
// BinaryOpNot represents logical NOT operation (also known as "unless")
|
||||
BinaryOpNot |
||||
// BinaryOpXor represents logical XOR operation
|
||||
BinaryOpXor |
||||
) |
||||
|
||||
// String returns a human-readable representation of the set operation.
|
||||
func (b BinaryOpSet) String() string { |
||||
switch b { |
||||
case BinaryOpAnd: |
||||
return "and" |
||||
case BinaryOpOr: |
||||
return "or" |
||||
case BinaryOpNot: |
||||
return "not" |
||||
case BinaryOpXor: |
||||
return "xor" |
||||
default: |
||||
return UNKNOWN |
||||
} |
||||
} |
||||
|
||||
// newBinOpConstructor creates a constructor function for binary operations of a specific type.
|
||||
// This is a higher-order function that returns a function for creating binary operations.
|
||||
func newBinOpConstructor(t BinOpType, op int) func(name string, l Expr, r Expr) Expr { |
||||
return func(name string, l Expr, r Expr) Expr { |
||||
binop := BinOpExpr{ |
||||
name: name, |
||||
ty: t, |
||||
op: op, |
||||
l: l, |
||||
r: r, |
||||
} |
||||
return NewBinOpExpr(binop) |
||||
} |
||||
} |
||||
|
||||
// newBinOpSetConstructor creates a constructor function for set operations.
|
||||
func newBinOpSetConstructor(op BinaryOpSet) func(name string, l Expr, r Expr) Expr { |
||||
return newBinOpConstructor(BinOpTypeSet, int(op)) |
||||
} |
||||
|
||||
// newBinOpCmpConstructor creates a constructor function for comparison operations.
|
||||
func newBinOpCmpConstructor(op BinaryOpCmp) func(name string, l Expr, r Expr) Expr { |
||||
return newBinOpConstructor(BinOpTypeCmp, int(op)) |
||||
} |
||||
|
||||
// newBinOpMathConstructor creates a constructor function for mathematical operations.
|
||||
func newBinOpMathConstructor(op BinaryOpMath) func(name string, l Expr, r Expr) Expr { |
||||
return newBinOpConstructor(BinOpTypeMath, int(op)) |
||||
} |
||||
|
||||
var ( |
||||
// And creates a logical AND expression
|
||||
And = newBinOpSetConstructor(BinaryOpAnd) |
||||
// Or creates a logical OR expression
|
||||
Or = newBinOpSetConstructor(BinaryOpOr) |
||||
// Not creates a logical NOT expression
|
||||
Not = newBinOpSetConstructor(BinaryOpNot) |
||||
// Xor creates a logical XOR expression
|
||||
Xor = newBinOpSetConstructor(BinaryOpXor) |
||||
) |
||||
|
||||
var ( |
||||
// Eq creates an equality comparison expression
|
||||
Eq = newBinOpCmpConstructor(BinaryOpEq) |
||||
// Neq creates an inequality comparison expression
|
||||
Neq = newBinOpCmpConstructor(BinaryOpNeq) |
||||
// Lt creates a less than comparison expression
|
||||
Lt = newBinOpCmpConstructor(BinaryOpLt) |
||||
// Lte creates a less than or equal comparison expression
|
||||
Lte = newBinOpCmpConstructor(BinaryOpLte) |
||||
// Gt creates a greater than comparison expression
|
||||
Gt = newBinOpCmpConstructor(BinaryOpGt) |
||||
// Gte creates a greater than or equal comparison expression
|
||||
Gte = newBinOpCmpConstructor(BinaryOpGte) |
||||
) |
||||
|
||||
var ( |
||||
// Add creates a binary addition expression
|
||||
Add = newBinOpMathConstructor(BinaryOpAdd) |
||||
// Subtract creates a binary subtraction expression
|
||||
Subtract = newBinOpMathConstructor(BinaryOpSubtract) |
||||
// Multiply creates a binary multiplication expression
|
||||
Multiply = newBinOpMathConstructor(BinaryOpMultiply) |
||||
// Divide creates a binary division expression
|
||||
Divide = newBinOpMathConstructor(BinaryOpDivide) |
||||
// Modulo creates a binary modulo expression
|
||||
Modulo = newBinOpMathConstructor(BinaryOpModulo) |
||||
) |
||||
@ -0,0 +1,65 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// Builder provides an ergonomic interface for constructing a [Plan].
|
||||
type Builder struct { |
||||
val Value |
||||
} |
||||
|
||||
// NewBuilder creates a new Builder from a Value, where the starting Value is
|
||||
// usually a [MakeTable].
|
||||
func NewBuilder(val Value) *Builder { |
||||
return &Builder{val: val} |
||||
} |
||||
|
||||
// Select applies a [Select] operation to the Builder.
|
||||
func (b *Builder) Select(predicate Value) *Builder { |
||||
return &Builder{ |
||||
val: &Select{ |
||||
Table: b.val, |
||||
Predicate: predicate, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
// Limit applies a [Limit] operation to the Builder.
|
||||
func (b *Builder) Limit(skip uint64, fetch uint64) *Builder { |
||||
return &Builder{ |
||||
val: &Limit{ |
||||
Table: b.val, |
||||
Skip: skip, |
||||
Fetch: fetch, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
// Sort applies a [Sort] operation to the Builder.
|
||||
func (b *Builder) Sort(column ColumnRef, ascending, nullsFirst bool) *Builder { |
||||
return &Builder{ |
||||
val: &Sort{ |
||||
Table: b.val, |
||||
|
||||
Column: column, |
||||
Ascending: ascending, |
||||
NullsFirst: nullsFirst, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
// Schema returns the schema of the data that will be produced by this Builder.
|
||||
func (b *Builder) Schema() *schema.Schema { |
||||
return b.val.Schema() |
||||
} |
||||
|
||||
// Value returns the underlying [Value]. This is useful when you need to access
|
||||
// the value directly, such as when passing it to a function that operates on
|
||||
// values rather than a Builder.
|
||||
func (b *Builder) Value() Value { return b.val } |
||||
|
||||
// ToPlan converts the Builder to a Plan.
|
||||
func (b *Builder) ToPlan() (*Plan, error) { |
||||
return convertToPlan(b.val) |
||||
} |
||||
@ -0,0 +1,134 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
) |
||||
|
||||
// convertToPlan converts a [Value] into a [Plan]. The value becomes the last
|
||||
// instruction returned by [Return].
|
||||
func convertToPlan(value Value) (*Plan, error) { |
||||
var builder ssaBuilder |
||||
value, err := builder.process(value) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("error converting plan to SSA: %w", err) |
||||
} |
||||
|
||||
// Add the final Return instruction based on the last value.
|
||||
builder.instructions = append(builder.instructions, &Return{Value: value}) |
||||
|
||||
return &Plan{Instructions: builder.instructions}, nil |
||||
} |
||||
|
||||
// ssaBuilder is a helper type for building SSA forms
|
||||
type ssaBuilder struct { |
||||
instructions []Instruction |
||||
nextID int |
||||
} |
||||
|
||||
func (b *ssaBuilder) getID() int { |
||||
b.nextID++ |
||||
return b.nextID |
||||
} |
||||
|
||||
// processPlan processes a logical plan and returns the resulting Value.
|
||||
func (b *ssaBuilder) process(value Value) (Value, error) { |
||||
switch value := value.(type) { |
||||
case *MakeTable: |
||||
return b.processMakeTablePlan(value) |
||||
case *Select: |
||||
return b.processSelectPlan(value) |
||||
case *Limit: |
||||
return b.processLimitPlan(value) |
||||
case *Sort: |
||||
return b.processSortPlan(value) |
||||
|
||||
case *UnaryOp: |
||||
return b.processUnaryOp(value) |
||||
case *BinOp: |
||||
return b.processBinOp(value) |
||||
case *ColumnRef: |
||||
return b.processColumnRef(value) |
||||
case *Literal: |
||||
return b.processLiteral(value) |
||||
|
||||
default: |
||||
return nil, fmt.Errorf("unsupported value type %T", value) |
||||
} |
||||
} |
||||
|
||||
func (b *ssaBuilder) processMakeTablePlan(plan *MakeTable) (Value, error) { |
||||
if _, err := b.process(plan.Selector); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
plan.id = fmt.Sprintf("%%%d", b.getID()) |
||||
b.instructions = append(b.instructions, plan) |
||||
return plan, nil |
||||
} |
||||
|
||||
func (b *ssaBuilder) processSelectPlan(plan *Select) (Value, error) { |
||||
// Process the child plan first
|
||||
if _, err := b.process(plan.Table); err != nil { |
||||
return nil, err |
||||
} else if _, err := b.process(plan.Predicate); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// Create a node for the select
|
||||
plan.id = fmt.Sprintf("%%%d", b.getID()) |
||||
b.instructions = append(b.instructions, plan) |
||||
return plan, nil |
||||
} |
||||
|
||||
func (b *ssaBuilder) processLimitPlan(plan *Limit) (Value, error) { |
||||
if _, err := b.process(plan.Table); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
plan.id = fmt.Sprintf("%%%d", b.getID()) |
||||
b.instructions = append(b.instructions, plan) |
||||
return plan, nil |
||||
} |
||||
|
||||
func (b *ssaBuilder) processSortPlan(plan *Sort) (Value, error) { |
||||
if _, err := b.process(plan.Table); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
plan.id = fmt.Sprintf("%%%d", b.getID()) |
||||
b.instructions = append(b.instructions, plan) |
||||
return plan, nil |
||||
} |
||||
|
||||
func (b *ssaBuilder) processUnaryOp(value *UnaryOp) (Value, error) { |
||||
if _, err := b.process(value.Value); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// Create a node for the unary operation
|
||||
value.id = fmt.Sprintf("%%%d", b.getID()) |
||||
b.instructions = append(b.instructions, value) |
||||
return value, nil |
||||
} |
||||
|
||||
func (b *ssaBuilder) processBinOp(expr *BinOp) (Value, error) { |
||||
if _, err := b.process(expr.Left); err != nil { |
||||
return nil, err |
||||
} else if _, err := b.process(expr.Right); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
expr.id = fmt.Sprintf("%%%d", b.getID()) |
||||
b.instructions = append(b.instructions, expr) |
||||
return expr, nil |
||||
} |
||||
|
||||
func (b *ssaBuilder) processColumnRef(value *ColumnRef) (Value, error) { |
||||
// Nothing to do.
|
||||
return value, nil |
||||
} |
||||
|
||||
func (b *ssaBuilder) processLiteral(expr *Literal) (Value, error) { |
||||
// Nothing to do.
|
||||
return expr, nil |
||||
} |
||||
@ -1,32 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// ColumnExpr represents a reference to a column in the input data
|
||||
type ColumnExpr struct { |
||||
// name is the identifier of the referenced column
|
||||
name string |
||||
} |
||||
|
||||
// Col creates a column reference expression
|
||||
func Col(name string) Expr { |
||||
return NewColumnExpr(ColumnExpr{name: name}) |
||||
} |
||||
|
||||
// ToField looks up and returns the schema for the referenced column
|
||||
func (c ColumnExpr) ToField(p Plan) schema.ColumnSchema { |
||||
for _, col := range p.Schema().Columns { |
||||
if col.Name == c.name { |
||||
return col |
||||
} |
||||
} |
||||
panic(fmt.Sprintf("column %s not found", c.name)) |
||||
} |
||||
|
||||
func (c ColumnExpr) ColumnName() string { |
||||
return c.name |
||||
} |
||||
@ -0,0 +1,65 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// ColumnType denotes the column type for a [ColumnRef].
type ColumnType int

// Recognized values of [ColumnType].
const (
	// ColumnTypeInvalid indicates an invalid column type.
	ColumnTypeInvalid ColumnType = iota
	// ColumnTypeBuiltin represents a builtin column (such as timestamp).
	ColumnTypeBuiltin
	// ColumnTypeLabel represents a column from a stream label.
	ColumnTypeLabel
	// ColumnTypeMetadata represents a column from a log metadata.
	ColumnTypeMetadata
)

// String returns a human-readable representation of the column type.
// Unrecognized values are rendered as "ColumnType(N)".
func (ct ColumnType) String() string {
	names := [...]string{
		ColumnTypeInvalid:  "invalid",
		ColumnTypeBuiltin:  "builtin",
		ColumnTypeLabel:    "label",
		ColumnTypeMetadata: "metadata",
	}
	if ct < 0 || int(ct) >= len(names) {
		return fmt.Sprintf("ColumnType(%d)", ct)
	}
	return names[ct]
}
||||
|
||||
// A ColumnRef referenes a column within a table relation. ColumnRef only
|
||||
// implements [Value].
|
||||
type ColumnRef struct { |
||||
Column string // Name of the column being referenced.
|
||||
Type ColumnType // Type of the column being referenced.
|
||||
} |
||||
|
||||
var ( |
||||
_ Value = (*ColumnRef)(nil) |
||||
) |
||||
|
||||
// Name returns the identifier of the ColumnRef, which combines the column type
|
||||
// and column name being referenced.
|
||||
func (c *ColumnRef) Name() string { |
||||
return fmt.Sprintf("%s.%s", c.Type, c.Column) |
||||
} |
||||
|
||||
// String returns [ColumnRef.Name].
|
||||
func (c *ColumnRef) String() string { return c.Name() } |
||||
|
||||
// Schema returns the schema of the column being referenced.
|
||||
func (c *ColumnRef) Schema() *schema.Schema { |
||||
// TODO(rfratto): Update *schema.Schema to allow representing a single
|
||||
// column.
|
||||
return nil |
||||
} |
||||
|
||||
func (c *ColumnRef) isValue() {} |
||||
@ -1,126 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// DataFrame provides an ergonomic interface for building logical query plans.
|
||||
// It wraps a logical Plan and provides fluent methods for common operations
|
||||
// like projection, filtering, and aggregation. This makes it easier to build
|
||||
// complex query plans in a readable and maintainable way.
|
||||
type DataFrame struct { |
||||
plan Plan |
||||
} |
||||
|
||||
// NewDataFrame creates a new DataFrame from a logical plan.
|
||||
// This is typically used to wrap a table scan plan as the starting point
|
||||
// for building a more complex query.
|
||||
func NewDataFrame(plan Plan) *DataFrame { |
||||
return &DataFrame{plan: plan} |
||||
} |
||||
|
||||
// Project applies a projection to the DataFrame.
|
||||
// It creates a new DataFrame with a projection plan that selects or computes
|
||||
// the specified expressions from the input DataFrame.
|
||||
// This corresponds to the SELECT clause in SQL.
|
||||
func (df *DataFrame) Project(exprs []Expr) *DataFrame { |
||||
return &DataFrame{ |
||||
plan: NewProjection(df.plan, exprs), |
||||
} |
||||
} |
||||
|
||||
// Filter applies a filter to the DataFrame.
|
||||
// It creates a new DataFrame with a filter plan that selects rows from the
|
||||
// input DataFrame based on the specified boolean expression.
|
||||
// This corresponds to the WHERE clause in SQL.
|
||||
func (df *DataFrame) Filter(expr Expr) *DataFrame { |
||||
return &DataFrame{ |
||||
plan: NewFilter(df.plan, expr), |
||||
} |
||||
} |
||||
|
||||
// Aggregate applies grouping and aggregation to the DataFrame.
|
||||
// It creates a new DataFrame with an aggregate plan that groups rows by the
|
||||
// specified expressions and computes the specified aggregate expressions.
|
||||
// This corresponds to the GROUP BY clause in SQL.
|
||||
func (df *DataFrame) Aggregate(groupBy []Expr, aggExprs []AggregateExpr) *DataFrame { |
||||
return &DataFrame{ |
||||
plan: NewAggregate(df.plan, groupBy, aggExprs), |
||||
} |
||||
} |
||||
|
||||
// Limit applies a row limit to the DataFrame.
|
||||
// It creates a new DataFrame with a limit plan that restricts the number of rows
|
||||
// returned, optionally with an offset to skip initial rows.
|
||||
// This corresponds to the LIMIT and OFFSET clauses in SQL.
|
||||
//
|
||||
// Parameters:
|
||||
// - skip: Number of rows to skip before returning results (OFFSET in SQL).
|
||||
// Use 0 to start from the first row.
|
||||
// - fetch: Maximum number of rows to return after skipping (LIMIT in SQL).
|
||||
// Use 0 to return all remaining rows.
|
||||
//
|
||||
// Example usage:
|
||||
//
|
||||
// // Return the first 10 rows
|
||||
// df = df.Limit(0, 10)
|
||||
//
|
||||
// // Skip the first 20 rows and return the next 10
|
||||
// df = df.Limit(20, 10)
|
||||
//
|
||||
// // Skip the first 100 rows and return all remaining rows
|
||||
// df = df.Limit(100, 0)
|
||||
//
|
||||
// The Limit operation is typically applied as the final step in a query,
|
||||
// after filtering, projection, and aggregation.
|
||||
func (df *DataFrame) Limit(skip uint64, fetch uint64) *DataFrame { |
||||
return &DataFrame{ |
||||
plan: NewLimit(df.plan, skip, fetch), |
||||
} |
||||
} |
||||
|
||||
// Schema returns the schema of the data that will be produced by this DataFrame.
|
||||
// This is useful for understanding the structure of the data that will result
|
||||
// from executing the query plan.
|
||||
func (df *DataFrame) Schema() schema.Schema { |
||||
return df.plan.Schema() |
||||
} |
||||
|
||||
// LogicalPlan returns the underlying logical plan.
|
||||
// This is useful when you need to access the plan directly, such as when
|
||||
// passing it to a function that operates on plans rather than DataFrames.
|
||||
func (df *DataFrame) LogicalPlan() Plan { |
||||
return df.plan |
||||
} |
||||
|
||||
// ToSSA converts the DataFrame to SSA form.
|
||||
// This is useful for optimizing and executing the query plan, as the SSA form
|
||||
// is easier to analyze and transform than the tree-based logical plan.
|
||||
func (df *DataFrame) ToSSA() (*SSAForm, error) { |
||||
return ConvertToSSA(df.plan) |
||||
} |
||||
|
||||
// Sort applies a sort operation to the DataFrame.
|
||||
// It creates a new DataFrame with a sort plan that orders rows from the
|
||||
// input DataFrame based on the specified sort expression.
|
||||
// This corresponds to the ORDER BY clause in SQL.
|
||||
//
|
||||
// Parameters:
|
||||
// - expr: The sort expression specifying the column to sort by, sort direction,
|
||||
// and NULL handling.
|
||||
//
|
||||
// Example usage:
|
||||
//
|
||||
// // Sort by age in ascending order, NULLs last
|
||||
// df = df.Sort(NewSortExpr("sort_by_age", Col("age"), true, false))
|
||||
//
|
||||
// // Sort by name in descending order, NULLs first
|
||||
// df = df.Sort(NewSortExpr("sort_by_name", Col("name"), false, true))
|
||||
//
|
||||
// The Sort operation is typically applied after filtering and projection,
|
||||
// but before limiting the results.
|
||||
func (df *DataFrame) Sort(expr SortExpr) *DataFrame { |
||||
return &DataFrame{ |
||||
plan: NewSort(df.plan, expr), |
||||
} |
||||
} |
||||
@ -1,129 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// ExprType enumerates the kinds of expression an Expr can hold. Consumers use
// it to discover the concrete type behind an Expr before calling the matching
// accessor.
type ExprType int

const (
	// ExprTypeInvalid is the zero value and marks an uninitialized expression.
	ExprTypeInvalid ExprType = iota
	// ExprTypeColumn is a reference to a column in the input.
	ExprTypeColumn
	// ExprTypeLiteral is a literal value.
	ExprTypeLiteral
	// ExprTypeBinaryOp is a binary operation (e.g., a + b).
	ExprTypeBinaryOp
	// ExprTypeAggregate is an aggregate function (e.g., SUM(a)).
	ExprTypeAggregate
	// ExprTypeSort is a sort expression.
	ExprTypeSort
)

// String returns the display name of t. ExprTypeInvalid and any value outside
// the declared constants are reported as "Unknown".
func (t ExprType) String() string {
	// Index 0 (ExprTypeInvalid) is deliberately left empty and excluded by the
	// range check below.
	names := [...]string{
		ExprTypeColumn:    "Column",
		ExprTypeLiteral:   "Literal",
		ExprTypeBinaryOp:  "BinaryOp",
		ExprTypeAggregate: "Aggregate",
		ExprTypeSort:      "Sort",
	}
	if t > ExprTypeInvalid && int(t) < len(names) {
		return names[t]
	}
	return "Unknown"
}
||||
|
||||
type Expr struct { |
||||
ty ExprType |
||||
val any |
||||
} |
||||
|
||||
func (e Expr) Type() ExprType { |
||||
return e.ty |
||||
} |
||||
|
||||
func (e Expr) ToField(p Plan) schema.ColumnSchema { |
||||
switch e.ty { |
||||
case ExprTypeColumn: |
||||
return e.val.(*ColumnExpr).ToField(p) |
||||
case ExprTypeLiteral: |
||||
return e.val.(*LiteralExpr).ToField(p) |
||||
case ExprTypeBinaryOp: |
||||
return e.val.(*BinOpExpr).ToField(p) |
||||
case ExprTypeAggregate: |
||||
return e.val.(*AggregateExpr).ToField(p) |
||||
default: |
||||
panic(fmt.Sprintf("unsupported expression type: %d", e.ty)) |
||||
} |
||||
} |
||||
|
||||
// shortcut: must be checked elsewhere
|
||||
func (e Expr) Column() *ColumnExpr { |
||||
if e.ty != ExprTypeColumn { |
||||
panic(fmt.Sprintf("expression is not a column: %d", e.ty)) |
||||
} |
||||
return e.val.(*ColumnExpr) |
||||
} |
||||
|
||||
func NewColumnExpr(expr ColumnExpr) Expr { |
||||
return Expr{ |
||||
ty: ExprTypeColumn, |
||||
val: &expr, |
||||
} |
||||
} |
||||
|
||||
func NewLiteralExpr(expr LiteralExpr) Expr { |
||||
return Expr{ |
||||
ty: ExprTypeLiteral, |
||||
val: &expr, |
||||
} |
||||
} |
||||
|
||||
func NewBinOpExpr(expr BinOpExpr) Expr { |
||||
return Expr{ |
||||
ty: ExprTypeBinaryOp, |
||||
val: &expr, |
||||
} |
||||
} |
||||
|
||||
func NewAggregateExpr(expr AggregateExpr) Expr { |
||||
return Expr{ |
||||
ty: ExprTypeAggregate, |
||||
val: &expr, |
||||
} |
||||
} |
||||
|
||||
// shortcut: must be checked elsewhere
|
||||
func (e Expr) Literal() *LiteralExpr { |
||||
if e.ty != ExprTypeLiteral { |
||||
panic(fmt.Sprintf("expression is not a literal: %d", e.ty)) |
||||
} |
||||
return e.val.(*LiteralExpr) |
||||
} |
||||
|
||||
// shortcut: must be checked elsewhere
|
||||
func (e Expr) BinaryOp() *BinOpExpr { |
||||
if e.ty != ExprTypeBinaryOp { |
||||
panic(fmt.Sprintf("expression is not a binary operation: %d", e.ty)) |
||||
} |
||||
return e.val.(*BinOpExpr) |
||||
} |
||||
|
||||
// shortcut: must be checked elsewhere
|
||||
func (e Expr) Aggregate() *AggregateExpr { |
||||
if e.ty != ExprTypeAggregate { |
||||
panic(fmt.Sprintf("expression is not an aggregate: %d", e.ty)) |
||||
} |
||||
return e.val.(*AggregateExpr) |
||||
} |
||||
|
||||
func (e Expr) Sort() *SortExpr { |
||||
if e.ty != ExprTypeSort { |
||||
panic(fmt.Sprintf("expression is not a sort: %d", e.ty)) |
||||
} |
||||
return e.val.(*SortExpr) |
||||
} |
||||
@ -1,51 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// Filter represents a plan node that filters rows based on a boolean expression.
|
||||
// It corresponds to the WHERE clause in SQL and is used to select a subset of rows
|
||||
// from the input plan based on a predicate expression.
|
||||
type Filter struct { |
||||
// input is the child plan node providing data to filter
|
||||
input Plan |
||||
// expr is the boolean expression used to filter rows
|
||||
expr Expr |
||||
} |
||||
|
||||
// newFilter creates a new Filter plan node.
|
||||
// It takes an input plan and a boolean expression that determines which rows
|
||||
// should be selected (included) in its output. This is represented by the WHERE
|
||||
// clause in SQL. A simple example would be SELECT * FROM foo WHERE a > 5.
|
||||
// The filter expression needs to evaluate to a Boolean result.
|
||||
func newFilter(input Plan, expr Expr) *Filter { |
||||
return &Filter{ |
||||
input: input, |
||||
expr: expr, |
||||
} |
||||
} |
||||
|
||||
// Schema returns the schema of the filter plan.
|
||||
// The schema of a filter is the same as the schema of its input,
|
||||
// as filtering only removes rows and doesn't modify the structure.
|
||||
func (f *Filter) Schema() schema.Schema { |
||||
return f.input.Schema() |
||||
} |
||||
|
||||
// Child returns the input plan.
|
||||
// This is a convenience method for accessing the child plan.
|
||||
func (f *Filter) Child() Plan { |
||||
return f.input |
||||
} |
||||
|
||||
// FilterExpr returns the filter expression.
|
||||
// This is the boolean expression used to determine which rows to include.
|
||||
func (f *Filter) FilterExpr() Expr { |
||||
return f.expr |
||||
} |
||||
|
||||
// Type implements the Plan interface
|
||||
func (f *Filter) Type() PlanType { |
||||
return PlanTypeFilter |
||||
} |
||||
@ -1,85 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// Limit represents a plan node that limits the number of rows returned.
|
||||
// It corresponds to the LIMIT clause in SQL and is used to restrict the
|
||||
// number of rows returned by a query, optionally with an offset.
|
||||
//
|
||||
// The Limit plan is typically the final operation in a query plan, applied
|
||||
// after filtering, projection, and aggregation. It's useful for pagination
|
||||
// and for reducing the amount of data returned to the client.
|
||||
type Limit struct { |
||||
// input is the child plan node providing data to limit
|
||||
input Plan |
||||
// skip is the number of rows to skip before returning results (OFFSET)
|
||||
// A value of 0 means no rows are skipped
|
||||
skip uint64 |
||||
// fetch is the maximum number of rows to return (LIMIT)
|
||||
// A value of 0 means all rows are returned (after applying skip)
|
||||
fetch uint64 |
||||
} |
||||
|
||||
// Special values for skip and fetch
|
||||
const ( |
||||
// NoSkip indicates that no rows should be skipped (OFFSET 0)
|
||||
NoSkip uint64 = 0 |
||||
// NoLimit indicates that all rows should be returned (no LIMIT clause)
|
||||
NoLimit uint64 = 0 |
||||
) |
||||
|
||||
// newLimit creates a new Limit plan node.
|
||||
// The Limit logical plan restricts the number of rows returned by a query.
|
||||
// It takes an input plan, a skip value (for OFFSET), and a fetch value (for LIMIT).
|
||||
// If skip is 0, no rows are skipped. If fetch is 0, all rows are returned after applying skip.
|
||||
//
|
||||
// Example usage:
|
||||
//
|
||||
// // Return the first 10 rows
|
||||
// limit := newLimit(inputPlan, 0, 10)
|
||||
//
|
||||
// // Skip the first 20 rows and return the next 10
|
||||
// limit := newLimit(inputPlan, 20, 10)
|
||||
//
|
||||
// // Skip the first 100 rows and return all remaining rows
|
||||
// limit := newLimit(inputPlan, 100, 0)
|
||||
func newLimit(input Plan, skip uint64, fetch uint64) *Limit { |
||||
return &Limit{ |
||||
input: input, |
||||
skip: skip, |
||||
fetch: fetch, |
||||
} |
||||
} |
||||
|
||||
// Schema returns the schema of the limit operation.
|
||||
// The schema is the same as the input plan's schema since limiting
|
||||
// only affects the number of rows, not their structure.
|
||||
func (l *Limit) Schema() schema.Schema { |
||||
return l.input.Schema() |
||||
} |
||||
|
||||
// Type returns the plan type for this node.
|
||||
func (l *Limit) Type() PlanType { |
||||
return PlanTypeLimit |
||||
} |
||||
|
||||
// Child returns the input plan.
|
||||
func (l *Limit) Child() Plan { |
||||
return l.input |
||||
} |
||||
|
||||
// Skip returns the number of rows to skip.
|
||||
// This is used for implementing the OFFSET clause in SQL.
|
||||
// A value of 0 means no rows are skipped.
|
||||
func (l *Limit) Skip() uint64 { |
||||
return l.skip |
||||
} |
||||
|
||||
// Fetch returns the maximum number of rows to return.
|
||||
// This is used for implementing the LIMIT clause in SQL.
|
||||
// A value of 0 means all rows are returned (after applying skip).
|
||||
func (l *Limit) Fetch() uint64 { |
||||
return l.fetch |
||||
} |
||||
@ -1,99 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// LiteralType enumerates the kinds of literal value a LiteralExpr can carry,
// allowing type-safe handling of each kind.
type LiteralType int

// Supported literal types. These loosely match the datasetmd.ValueType enum;
// consider using that directly in the future.
const (
	// LiteralTypeInvalid marks an invalid or uninitialized literal.
	LiteralTypeInvalid LiteralType = iota
	// LiteralTypeString is a string literal.
	LiteralTypeString
	// LiteralTypeInt64 is a 64-bit integer literal.
	LiteralTypeInt64
)

// String returns a human-readable name for t; anything other than the string
// and int64 types is reported as "unknown".
func (t LiteralType) String() string {
	if t == LiteralTypeString {
		return "string"
	}
	if t == LiteralTypeInt64 {
		return "int64"
	}
	return "unknown"
}
||||
|
||||
// LiteralExpr represents a literal value in the query plan.
|
||||
// It can hold different types of values, such as strings or integers.
|
||||
type LiteralExpr struct { |
||||
ty LiteralType // The type of the literal value
|
||||
val any // The actual literal value
|
||||
} |
||||
|
||||
// ValueString returns a string representation of the literal value.
|
||||
// This is used for display and debugging purposes.
|
||||
func (l LiteralExpr) ValueString() string { |
||||
switch l.ty { |
||||
case LiteralTypeString: |
||||
return l.val.(string) |
||||
} |
||||
return fmt.Sprintf("%v", l.val) |
||||
} |
||||
|
||||
// ToField converts the literal to a column schema.
|
||||
// The name of the column is derived from the string representation of the value.
|
||||
func (l LiteralExpr) ToField(_ Plan) schema.ColumnSchema { |
||||
switch l.ty { |
||||
case LiteralTypeString: |
||||
return schema.ColumnSchema{ |
||||
Name: l.val.(string), |
||||
Type: l.ValueType(), |
||||
} |
||||
case LiteralTypeInt64: |
||||
return schema.ColumnSchema{ |
||||
Name: fmt.Sprint(l.val.(int64)), |
||||
Type: l.ValueType(), |
||||
} |
||||
default: |
||||
panic(fmt.Sprintf("unsupported literal type: %d", l.ty)) |
||||
} |
||||
} |
||||
|
||||
// ValueType returns the schema.ValueType corresponding to this literal type.
|
||||
// This is used to determine the type of the column in the output schema.
|
||||
func (l LiteralExpr) ValueType() schema.ValueType { |
||||
switch l.ty { |
||||
case LiteralTypeString: |
||||
return schema.ValueTypeString |
||||
case LiteralTypeInt64: |
||||
return schema.ValueTypeInt64 |
||||
default: |
||||
panic(fmt.Sprintf("unsupported literal type: %d", l.ty)) |
||||
} |
||||
} |
||||
|
||||
// LitStr creates a string literal expression with the given value.
|
||||
// Example: LitStr("hello") creates a string literal with value "hello".
|
||||
func LitStr(v string) Expr { |
||||
return NewLiteralExpr(LiteralExpr{ |
||||
ty: LiteralTypeString, |
||||
val: v, |
||||
}) |
||||
} |
||||
|
||||
// LitI64 creates a 64-bit integer literal expression with the given value.
|
||||
// Example: LitI64(42) creates an integer literal with value 42.
|
||||
func LitI64(v int64) Expr { |
||||
return NewLiteralExpr(LiteralExpr{ |
||||
ty: LiteralTypeInt64, |
||||
val: v, |
||||
}) |
||||
} |
||||
@ -0,0 +1,94 @@ |
||||
// Package logical provides a logical query plan representation for data
|
||||
// processing operations.
|
||||
//
|
||||
// The logical plan is represented using a static single-assignment (SSA) form of
|
||||
// intermediate representation (IR) for the operations performed on log data.
|
||||
//
|
||||
// For an introduction to SSA form, see
|
||||
// https://en.wikipedia.org/wiki/Static_single_assignment_form.
|
||||
//
|
||||
// The primary interfaces of this package are:
|
||||
//
|
||||
// - [Value], an expression that yields a value.
|
||||
// - [Instruction], a statement that consumes values and performs computation.
|
||||
// - [Plan], a sequence of instructions that produces a result.
|
||||
//
|
||||
// A computation that also yields a result implements both the [Value] and
|
||||
// [Instruction] interfaces. See the documentation comments on each type for
|
||||
// which of those interfaces it implements.
|
||||
//
|
||||
// Values are representable as either:
|
||||
//
|
||||
// - A column value (such as in [ColumnRef]),
|
||||
// - a relation (such as in [Select]), or
|
||||
// - a value literal (such as in [Literal]).
|
||||
//
|
||||
// The SSA form forms a graph: each [Value] may appear as an operand of one or
|
||||
// more [Instruction]s.
|
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
"strings" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// Instruction is a single SSA instruction: it either computes a new [Value]
// or has some other effect.
//
// An instruction that defines a value (e.g., BinOp) also implements the Value
// interface, whereas one that only has an effect (e.g., Return) does not.
type Instruction interface {
	// String returns the disassembled SSA form of the Instruction. When the
	// Instruction also implements [Value], the value's name is not part of
	// the returned text.
	String() string

	// isInstruction is an unexported marker method; it keeps types outside
	// this package from implementing Instruction.
	isInstruction()
}
||||
|
||||
// A Value is an SSA value that can be referenced by an [Instruction].
|
||||
type Value interface { |
||||
// Name returns an identifier for this Value (such as "%1"), which is used
|
||||
// when this Value appears as an operand of an Instruction.
|
||||
//
|
||||
// If the Value was not created by the logical planner, Name instead returns
|
||||
// the pointer address of the Value.
|
||||
Name() string |
||||
|
||||
// String returns human-readable information about the Value. If Value also
|
||||
// implements [Instruction], String returns the disassembled form of the
|
||||
// Instruction as documented by [Instruction.String].
|
||||
String() string |
||||
|
||||
// Schema returns the type of this Value.
|
||||
Schema() *schema.Schema |
||||
|
||||
// isValue is a marker method to prevent external implementations.
|
||||
isValue() |
||||
} |
||||
|
||||
// A Plan represents a sequence of [Instruction]s that ultimately produce a
|
||||
// [Value].
|
||||
//
|
||||
// The first [Return] instruction in the plan denotes the final output.
|
||||
type Plan struct { |
||||
Instructions []Instruction // Instructions of the plan in order.
|
||||
} |
||||
|
||||
// String prints out the entire plan SSA.
|
||||
func (p Plan) String() string { |
||||
var sb strings.Builder |
||||
|
||||
for _, inst := range p.Instructions { |
||||
switch inst := inst.(type) { |
||||
case Value: |
||||
fmt.Fprintf(&sb, "%s = %s\n", inst.Name(), inst.String()) |
||||
case Instruction: |
||||
fmt.Fprintf(&sb, "%s\n", inst.String()) |
||||
} |
||||
} |
||||
|
||||
return sb.String() |
||||
} |
||||
@ -0,0 +1,59 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
"strings" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestPlan_String(t *testing.T) { |
||||
// Build a query plan for this query sorted by `age` in ascending order:
|
||||
//
|
||||
// { app="users" } | age > 21
|
||||
b := NewBuilder( |
||||
&MakeTable{ |
||||
Selector: &BinOp{ |
||||
Left: &ColumnRef{Column: "app", Type: ColumnTypeLabel}, |
||||
Right: LiteralString("users"), |
||||
Op: BinOpKindEq, |
||||
}, |
||||
}, |
||||
).Select( |
||||
&BinOp{ |
||||
Left: &ColumnRef{Column: "age", Type: ColumnTypeMetadata}, |
||||
Right: LiteralInt64(21), |
||||
Op: BinOpKindGt, |
||||
}, |
||||
).Sort(ColumnRef{Column: "age", Type: ColumnTypeMetadata}, true, false) |
||||
|
||||
// Convert to SSA
|
||||
ssaForm, err := b.ToPlan() |
||||
require.NoError(t, err) |
||||
require.NotNil(t, ssaForm) |
||||
|
||||
t.Logf("SSA Form:\n%s", ssaForm.String()) |
||||
|
||||
// Define expected output
|
||||
exp := ` |
||||
%1 = EQ label.app, "users"
|
||||
%2 = MAKE_TABLE [selector=%1]
|
||||
%3 = GT metadata.age, 21
|
||||
%4 = SELECT %2 [predicate=%3]
|
||||
%5 = SORT %4 [column=metadata.age, asc=true, nulls_first=false] |
||||
RETURN %5
|
||||
` |
||||
exp = strings.TrimSpace(exp) |
||||
|
||||
// Get the actual output without the RETURN statement
|
||||
ssaOutput := ssaForm.String() |
||||
ssaLines := strings.Split(strings.TrimSpace(ssaOutput), "\n") |
||||
|
||||
expLines := strings.Split(exp, "\n") |
||||
require.Equal(t, len(expLines), len(ssaLines), "Expected and actual SSA output line counts do not match") |
||||
|
||||
for i, line := range expLines { |
||||
require.Equal(t, strings.TrimSpace(line), strings.TrimSpace(ssaLines[i]), fmt.Sprintf("Mismatch at line %d", i+1)) |
||||
} |
||||
} |
||||
@ -0,0 +1,109 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// BinOpKind identifies which operation a [BinOp] performs.
type BinOpKind int

// Recognized values of [BinOpKind].
const (
	// BinOpKindInvalid indicates an invalid binary operation.
	BinOpKindInvalid BinOpKind = iota

	// Comparison and logical operations.
	BinOpKindEq  // Equality comparison (==).
	BinOpKindNeq // Inequality comparison (!=).
	BinOpKindGt  // Greater than comparison (>).
	BinOpKindGte // Greater than or equal comparison (>=).
	BinOpKindLt  // Less than comparison (<).
	BinOpKindLte // Less than or equal comparison (<=).
	BinOpKindAnd // Logical AND operation (&&).
	BinOpKindOr  // Logical OR operation (||).
	BinOpKindXor // Logical XOR operation (^).
	BinOpKindNot // Logical NOT operation (!).

	// Arithmetic operations.
	BinOpKindAdd // Addition operation (+).
	BinOpKindSub // Subtraction operation (-).
	BinOpKindMul // Multiplication operation (*).
	BinOpKindDiv // Division operation (/).
	BinOpKindMod // Modulo operation (%).

	// String and regular expression matching operations.
	BinOpKindMatchStr    // String matching operation.
	BinOpKindNotMatchStr // String non-matching operation.
	BinOpKindMatchRe     // Regular expression matching operation.
	BinOpKindNotMatchRe  // Regular expression non-matching operation.
)

// binOpKindStrings maps every recognized BinOpKind to the mnemonic used in the
// disassembled SSA output.
var binOpKindStrings = map[BinOpKind]string{
	BinOpKindInvalid: "invalid",

	BinOpKindEq:  "EQ",
	BinOpKindNeq: "NEQ",
	BinOpKindGt:  "GT",
	BinOpKindGte: "GTE",
	BinOpKindLt:  "LT",
	BinOpKindLte: "LTE",
	BinOpKindAnd: "AND",
	BinOpKindOr:  "OR",
	BinOpKindXor: "XOR",
	BinOpKindNot: "NOT",

	BinOpKindAdd: "ADD",
	BinOpKindSub: "SUB",
	BinOpKindMul: "MUL",
	BinOpKindDiv: "DIV",
	BinOpKindMod: "MOD",

	BinOpKindMatchStr:    "MATCH_STR",
	BinOpKindNotMatchStr: "NOT_MATCH_STR",
	BinOpKindMatchRe:     "MATCH_RE",
	BinOpKindNotMatchRe:  "NOT_MATCH_RE",
}

// String returns the mnemonic for k, or "BinOpKind(<n>)" when k is not one of
// the recognized values.
func (k BinOpKind) String() string {
	name, known := binOpKindStrings[k]
	if !known {
		return fmt.Sprintf("BinOpKind(%d)", k)
	}
	return name
}
||||
|
||||
// The BinOp instruction yields the result of binary operation Left Op Right.
|
||||
// BinOp implements both [Instruction] and [Value].
|
||||
type BinOp struct { |
||||
id string |
||||
|
||||
Left, Right Value |
||||
Op BinOpKind |
||||
} |
||||
|
||||
var ( |
||||
_ Value = (*BinOp)(nil) |
||||
_ Instruction = (*BinOp)(nil) |
||||
) |
||||
|
||||
// Name returns an identifier for the BinOp operation.
|
||||
func (b *BinOp) Name() string { |
||||
if b.id != "" { |
||||
return b.id |
||||
} |
||||
return fmt.Sprintf("<%p>", b) |
||||
} |
||||
|
||||
// String returns the disassembled SSA form of the BinOp instruction.
|
||||
func (b *BinOp) String() string { |
||||
return fmt.Sprintf("%s %s, %s", b.Op, b.Left.Name(), b.Right.Name()) |
||||
} |
||||
|
||||
// Schema returns the schema of the BinOp operation.
|
||||
func (b *BinOp) Schema() *schema.Schema { |
||||
// TODO(rfratto): What should be returned here? Should the schema of BinOp
|
||||
// take on the schema of its LHS or RHS? Does it depend on the operation?
|
||||
return nil |
||||
} |
||||
|
||||
func (b *BinOp) isValue() {} |
||||
func (b *BinOp) isInstruction() {} |
||||
@ -0,0 +1,53 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// The Limit instruction limits the number of rows from a table relation. Limit
|
||||
// implements [Instruction] and [Value].
|
||||
type Limit struct { |
||||
id string |
||||
|
||||
Table Value // Table relation to limit.
|
||||
|
||||
// Skip is the number of rows to skip before returning results. A value of 0
|
||||
// means no rows are skipped.
|
||||
Skip uint64 |
||||
|
||||
// Fetch is the maximum number of rows to return. A value of 0 means all rows
|
||||
// are returned (after applying Skip).
|
||||
Fetch uint64 |
||||
} |
||||
|
||||
var ( |
||||
_ Value = (*Limit)(nil) |
||||
_ Instruction = (*Limit)(nil) |
||||
) |
||||
|
||||
// Name returns an identifier for the Limit operation.
|
||||
func (l *Limit) Name() string { |
||||
if l.id != "" { |
||||
return l.id |
||||
} |
||||
return fmt.Sprintf("<%p>", l) |
||||
} |
||||
|
||||
// String returns the disassembled SSA form of the Limit instruction.
|
||||
func (l *Limit) String() string { |
||||
// TODO(rfratto): change the type of l.Input to [Value] so we can use
|
||||
// s.Value.Name here.
|
||||
return fmt.Sprintf("limit %v [skip=%d, fetch=%d]", l.Table.Name(), l.Skip, l.Fetch) |
||||
} |
||||
|
||||
// Schema returns the schema of the limit operation.
|
||||
func (l *Limit) Schema() *schema.Schema { |
||||
// The schema is the same as the input plan's schema since limiting
|
||||
// only affects the number of rows, not their structure.
|
||||
return l.Table.Schema() |
||||
} |
||||
|
||||
func (l *Limit) isInstruction() {} |
||||
func (l *Limit) isValue() {} |
||||
@ -0,0 +1,145 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
"strconv" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// LiteralKind identifies what kind of value a [Literal] holds.
type LiteralKind int

// Recognized values of [LiteralKind].
const (
	// LiteralKindInvalid indicates an invalid literal value.
	LiteralKindInvalid LiteralKind = iota

	// LiteralKindNull is the NULL literal value.
	LiteralKindNull
	// LiteralKindString is a string literal value.
	LiteralKindString
	// LiteralKindInt64 is a 64-bit integer literal value.
	LiteralKindInt64
	// LiteralKindUint64 is a 64-bit unsigned integer literal value.
	LiteralKindUint64
	// LiteralKindByteArray is a byte array literal value.
	LiteralKindByteArray
)

// literalKindStrings maps every recognized LiteralKind to its display name.
var literalKindStrings = map[LiteralKind]string{
	LiteralKindInvalid: "invalid",

	LiteralKindNull:      "null",
	LiteralKindString:    "string",
	LiteralKindInt64:     "int64",
	LiteralKindUint64:    "uint64",
	LiteralKindByteArray: "[]byte",
}

// String returns the display name of k, or "LiteralKind(<n>)" when k is not
// one of the recognized values.
func (k LiteralKind) String() string {
	name, known := literalKindStrings[k]
	if !known {
		return fmt.Sprintf("LiteralKind(%d)", k)
	}
	return name
}
||||
|
||||
// A Literal represents a literal value known at plan time. Literal only
|
||||
// implements [Value].
|
||||
//
|
||||
// The zero value of a Literal is a NULL value.
|
||||
type Literal struct { |
||||
val any |
||||
} |
||||
|
||||
var _ Value = (*Literal)(nil) |
||||
|
||||
// LiteralString creates a new Literal value from a string.
|
||||
func LiteralString(v string) *Literal { return &Literal{val: v} } |
||||
|
||||
// LiteralInt64 creates a new Literal value from a 64-bit integer.
|
||||
func LiteralInt64(v int64) *Literal { return &Literal{val: v} } |
||||
|
||||
// LiteralUint64 creates a new Literal value from a 64-bit unsigned integer.
|
||||
func LiteralUint64(v uint64) *Literal { return &Literal{val: v} } |
||||
|
||||
// LiteralByteArray creates a new Literal value from a byte slice.
|
||||
func LiteralByteArray(v []byte) *Literal { return &Literal{val: v} } |
||||
|
||||
// Kind returns the kind of value represented by the literal.
|
||||
func (lit Literal) Kind() LiteralKind { |
||||
switch lit.val.(type) { |
||||
case nil: |
||||
return LiteralKindNull |
||||
case string: |
||||
return LiteralKindString |
||||
case int64: |
||||
return LiteralKindInt64 |
||||
case uint64: |
||||
return LiteralKindUint64 |
||||
case []byte: |
||||
return LiteralKindByteArray |
||||
default: |
||||
return LiteralKindInvalid |
||||
} |
||||
} |
||||
|
||||
// Name returns the string form of the literal.
|
||||
func (lit Literal) Name() string { |
||||
return lit.String() |
||||
} |
||||
|
||||
// String returns a printable form of the literal, even if lit is not a
|
||||
// [LiteralKindString].
|
||||
func (lit Literal) String() string { |
||||
switch lit.Kind() { |
||||
case LiteralKindNull: |
||||
return "NULL" |
||||
case LiteralKindString: |
||||
return strconv.Quote(lit.val.(string)) |
||||
case LiteralKindInt64: |
||||
return strconv.FormatInt(lit.Int64(), 10) |
||||
case LiteralKindUint64: |
||||
return strconv.FormatUint(lit.Uint64(), 10) |
||||
case LiteralKindByteArray: |
||||
return fmt.Sprintf("%v", lit.val) |
||||
default: |
||||
return fmt.Sprintf("Literal(%s)", lit.Kind()) |
||||
} |
||||
} |
||||
|
||||
// IsNull returns true if lit is a [LiteralKindNull] value.
|
||||
func (lit Literal) IsNull() bool { |
||||
return lit.Kind() == LiteralKindNull |
||||
} |
||||
|
||||
// Int64 returns lit's value as an int64. It panics if lit is not a
|
||||
// [LiteralKindInt64].
|
||||
func (lit Literal) Int64() int64 { |
||||
if expect, actual := LiteralKindInt64, lit.Kind(); expect != actual { |
||||
panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) |
||||
} |
||||
return lit.val.(int64) |
||||
} |
||||
|
||||
// Uint64 returns lit's value as a uint64. It panics if lit is not a
|
||||
// [LiteralKindUint64].
|
||||
func (lit Literal) Uint64() uint64 { |
||||
if expect, actual := LiteralKindUint64, lit.Kind(); expect != actual { |
||||
panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) |
||||
} |
||||
return lit.val.(uint64) |
||||
} |
||||
|
||||
// ByteArray returns lit's value as a byte slice. It panics if lit is not a
|
||||
// [LiteralKindByteArray].
|
||||
func (lit Literal) ByteArray() []byte { |
||||
if expect, actual := LiteralKindByteArray, lit.Kind(); expect != actual { |
||||
panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) |
||||
} |
||||
return lit.val.([]byte) |
||||
} |
||||
|
||||
func (lit *Literal) Schema() *schema.Schema { |
||||
// TODO(rfratto): schema.Schema needs to be updated to be a more general
|
||||
// "type" instead.
|
||||
return nil |
||||
} |
||||
|
||||
func (lit *Literal) isValue() {} |
||||
@ -0,0 +1,49 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// The MakeTable instruction yields a table relation from an identifier.
|
||||
// MakeTable implements both [Instruction] and [Value].
|
||||
type MakeTable struct { |
||||
id string |
||||
|
||||
// Selector is used to generate a table relation. All streams for which the
|
||||
// selector passes are included in the resulting table.
|
||||
//
|
||||
// It is invalid for Selector to include a [ColumnRef] that is not
|
||||
// [ColumnTypeBuiltin] or [ColumnTypeLabel].
|
||||
Selector Value |
||||
} |
||||
|
||||
var ( |
||||
_ Value = (*MakeTable)(nil) |
||||
_ Instruction = (*MakeTable)(nil) |
||||
) |
||||
|
||||
// Name returns an identifier for the MakeTable operation.
|
||||
func (t *MakeTable) Name() string { |
||||
if t.id != "" { |
||||
return t.id |
||||
} |
||||
return fmt.Sprintf("<%p>", t) |
||||
} |
||||
|
||||
// String returns the disassembled SSA form of the MakeTable instruction.
|
||||
func (t *MakeTable) String() string { |
||||
return fmt.Sprintf("MAKE_TABLE [selector=%s]", t.Selector.Name()) |
||||
} |
||||
|
||||
// Schema returns the schema of the table.
|
||||
// This implements part of the Plan interface.
|
||||
func (t *MakeTable) Schema() *schema.Schema { |
||||
// TODO(rfratto): What should we return here? What's possible for the logical
|
||||
// planner to know about the selector at planning time?
|
||||
return nil |
||||
} |
||||
|
||||
func (t *MakeTable) isInstruction() {} |
||||
func (t *MakeTable) isValue() {} |
||||
@ -0,0 +1,14 @@ |
||||
package logical |
||||
|
||||
// The Return instruction yields a value to return from a plan. Return
|
||||
// implements [Instruction].
|
||||
type Return struct { |
||||
Value Value // The value to return.
|
||||
} |
||||
|
||||
// String returns the disassembled SSA form of r.
|
||||
func (r *Return) String() string { |
||||
return "RETURN " + r.Value.Name() |
||||
} |
||||
|
||||
func (r *Return) isInstruction() {} |
||||
@ -0,0 +1,48 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// The Select instruction filters rows from a table relation. Select implements
|
||||
// both [Instruction] and [Value].
|
||||
type Select struct { |
||||
id string |
||||
|
||||
Table Value // The table relation to filter.
|
||||
|
||||
// Predicate is used to filter rows from Table. Each row is checked against
|
||||
// the given Predicate, and only rows for which the Predicate is true are
|
||||
// returned.
|
||||
Predicate Value |
||||
} |
||||
|
||||
var ( |
||||
_ Value = (*Select)(nil) |
||||
_ Instruction = (*Select)(nil) |
||||
) |
||||
|
||||
// Name returns an identifier for the Select operation.
|
||||
func (s *Select) Name() string { |
||||
if s.id != "" { |
||||
return s.id |
||||
} |
||||
return fmt.Sprintf("<%p>", s) |
||||
} |
||||
|
||||
// String returns the disassembled SSA form of the Select instruction.
|
||||
func (s *Select) String() string { |
||||
return fmt.Sprintf("SELECT %s [predicate=%s]", s.Table.Name(), s.Predicate.Name()) |
||||
} |
||||
|
||||
// Schema returns the schema of the Select plan.
|
||||
func (s *Select) Schema() *schema.Schema { |
||||
// Since Select only filters rows from a table, the schema is the same as the
|
||||
// input table relation.
|
||||
return s.Table.Schema() |
||||
} |
||||
|
||||
func (s *Select) isInstruction() {} |
||||
func (s *Select) isValue() {} |
||||
@ -0,0 +1,57 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// Sort represents a plan node that sorts rows based on sort expressions.
|
||||
// It corresponds to the ORDER BY clause in SQL and is used to order
|
||||
// the results of a query based on one or more sort expressions.
|
||||
|
||||
// The Sort instruction sorts rows from a table relation. Sort implements both
|
||||
// [Instruction] and [Value].
|
||||
type Sort struct { |
||||
id string |
||||
|
||||
Table Value // The table relation to sort.
|
||||
|
||||
Column ColumnRef // The column to sort by.
|
||||
Ascending bool // Whether to sort in ascending order.
|
||||
NullsFirst bool // Controls whether NULLs appear first (true) or last (false).
|
||||
} |
||||
|
||||
var ( |
||||
_ Value = (*Sort)(nil) |
||||
_ Instruction = (*Sort)(nil) |
||||
) |
||||
|
||||
// Name returns an identifier for the Sort operation.
|
||||
func (s *Sort) Name() string { |
||||
if s.id != "" { |
||||
return s.id |
||||
} |
||||
return fmt.Sprintf("<%p>", s) |
||||
} |
||||
|
||||
// String returns the disassembled SSA form of the Sort instruction.
|
||||
func (s *Sort) String() string { |
||||
return fmt.Sprintf( |
||||
"SORT %s [column=%s, asc=%t, nulls_first=%t]", |
||||
s.Table.Name(), |
||||
s.Column.String(), |
||||
s.Ascending, |
||||
s.NullsFirst, |
||||
) |
||||
} |
||||
|
||||
// Schema returns the schema of the sort plan.
|
||||
func (s *Sort) Schema() *schema.Schema { |
||||
// The schema is the same as the input plan's schema since sorting only
|
||||
// affects the order of rows, not their structure.
|
||||
return s.Table.Schema() |
||||
} |
||||
|
||||
func (s *Sort) isInstruction() {} |
||||
func (s *Sort) isValue() {} |
||||
@ -0,0 +1,69 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// UnaryOpKind identifies which operation a [UnaryOp] performs.
type UnaryOpKind int

// Recognized values of [UnaryOpKind].
const (
	UnaryOpKindInvalid UnaryOpKind = iota // Invalid unary operation.
	UnaryOpKindNot                        // Logical NOT operation (!).
)

// unaryOpKindStrings maps each recognized kind to its display name.
var unaryOpKindStrings = map[UnaryOpKind]string{
	UnaryOpKindInvalid: "invalid",
	UnaryOpKindNot:     "NOT",
}

// String returns the display name of k. Kinds without an entry in
// unaryOpKindStrings are rendered as "UnaryOpKind(N)".
func (k UnaryOpKind) String() string {
	name, known := unaryOpKindStrings[k]
	if !known {
		return fmt.Sprintf("UnaryOpKind(%d)", k)
	}
	return name
}
||||
|
||||
// The UnaryOp instruction yields the result of unary operation Op Value.
|
||||
// UnaryOp implements both [Instruction] and [Value].
|
||||
type UnaryOp struct { |
||||
id string |
||||
|
||||
Op UnaryOpKind |
||||
Value Value |
||||
} |
||||
|
||||
var ( |
||||
_ Value = (*UnaryOp)(nil) |
||||
_ Instruction = (*UnaryOp)(nil) |
||||
) |
||||
|
||||
// Name returns an identifier for the UnaryOp operation.
|
||||
func (u *UnaryOp) Name() string { |
||||
if u.id != "" { |
||||
return u.id |
||||
} |
||||
return fmt.Sprintf("<%p>", u) |
||||
} |
||||
|
||||
// String returns the disassembled SSA form of the UnaryOp instruction.
|
||||
func (u *UnaryOp) String() string { |
||||
return fmt.Sprintf("%s %s", u.Op, u.Value.Name()) |
||||
} |
||||
|
||||
// Schema returns the schema of the UnaryOp plan.
|
||||
func (u *UnaryOp) Schema() *schema.Schema { |
||||
// TODO(rfratto): What should be returned here? Should the schema of BinOp
|
||||
// take on the schema of its Value? Does it depend on the operation?
|
||||
return nil |
||||
} |
||||
|
||||
func (u *UnaryOp) isValue() {} |
||||
func (u *UnaryOp) isInstruction() {} |
||||
@ -1,187 +0,0 @@ |
||||
// Package logical provides a logical query plan representation for data processing operations.
|
||||
// It defines a type system for expressions and plan nodes that can be used to build and
|
||||
// manipulate query plans in a structured way.
|
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// PlanType enumerates the kinds of plan node. Consumers use it to find out
// which concrete type a Plan wraps before accessing it.
type PlanType int

// Recognized values of PlanType.
const (
	// PlanTypeInvalid is an invalid or uninitialized plan.
	PlanTypeInvalid PlanType = iota
	// PlanTypeTable represents a table scan operation.
	PlanTypeTable
	// PlanTypeFilter represents a filter operation.
	PlanTypeFilter
	// PlanTypeProjection represents a projection operation.
	PlanTypeProjection
	// PlanTypeAggregate represents an aggregation operation.
	PlanTypeAggregate
	// PlanTypeLimit represents a limit operation.
	PlanTypeLimit
	// PlanTypeSort represents a sort operation.
	PlanTypeSort
)

// String returns the display name of the plan type, or "Unknown" for values
// outside the recognized range. It is intended for debugging and error
// messages.
func (t PlanType) String() string {
	names := [...]string{
		PlanTypeInvalid:    "Invalid",
		PlanTypeTable:      "Table",
		PlanTypeFilter:     "Filter",
		PlanTypeProjection: "Projection",
		PlanTypeAggregate:  "Aggregate",
		PlanTypeLimit:      "Limit",
		PlanTypeSort:       "Sort",
	}
	if t < PlanTypeInvalid || int(t) >= len(names) {
		return "Unknown"
	}
	return names[t]
}
||||
|
||||
// Plan is the core plan type in the logical package.
|
||||
// It wraps a concrete plan type (MakeTable, Filter, etc.)
|
||||
// and provides methods to safely access the underlying value.
|
||||
// This approach replaces the previous interface-based design to reduce
|
||||
// indirection and improve code clarity.
|
||||
type Plan struct { |
||||
ty PlanType // The type of plan
|
||||
val any // The concrete plan value
|
||||
} |
||||
|
||||
// Type returns the type of the plan.
|
||||
// This allows consumers to determine the concrete type of the plan
|
||||
// and safely cast to the appropriate interface.
|
||||
func (p Plan) Type() PlanType { |
||||
return p.ty |
||||
} |
||||
|
||||
// Schema returns the schema of the data produced by this plan node.
|
||||
// It delegates to the appropriate concrete plan type based on the plan type.
|
||||
func (p Plan) Schema() schema.Schema { |
||||
switch p.ty { |
||||
case PlanTypeTable: |
||||
return p.val.(*MakeTable).Schema() |
||||
case PlanTypeFilter: |
||||
return p.val.(*Filter).Schema() |
||||
case PlanTypeProjection: |
||||
return p.val.(*Projection).Schema() |
||||
case PlanTypeAggregate: |
||||
return p.val.(*Aggregate).Schema() |
||||
case PlanTypeLimit: |
||||
return p.val.(*Limit).Schema() |
||||
case PlanTypeSort: |
||||
return p.val.(*Sort).Schema() |
||||
default: |
||||
panic(fmt.Sprintf("unknown plan type: %v", p.ty)) |
||||
} |
||||
} |
||||
|
||||
// Table returns the concrete table plan if this is a table plan.
|
||||
// Panics if this is not a table plan.
|
||||
func (p Plan) Table() *MakeTable { |
||||
if p.ty != PlanTypeTable { |
||||
panic(fmt.Sprintf("not a table plan: %v", p.ty)) |
||||
} |
||||
return p.val.(*MakeTable) |
||||
} |
||||
|
||||
// Filter returns the concrete filter plan if this is a filter plan.
|
||||
// Panics if this is not a filter plan.
|
||||
func (p Plan) Filter() *Filter { |
||||
if p.ty != PlanTypeFilter { |
||||
panic(fmt.Sprintf("not a filter plan: %v", p.ty)) |
||||
} |
||||
return p.val.(*Filter) |
||||
} |
||||
|
||||
// Projection returns the concrete projection plan if this is a projection plan.
|
||||
// Panics if this is not a projection plan.
|
||||
func (p Plan) Projection() *Projection { |
||||
if p.ty != PlanTypeProjection { |
||||
panic(fmt.Sprintf("not a projection plan: %v", p.ty)) |
||||
} |
||||
return p.val.(*Projection) |
||||
} |
||||
|
||||
// Aggregate returns the concrete aggregate plan if this is an aggregate plan.
|
||||
// Panics if this is not an aggregate plan.
|
||||
func (p Plan) Aggregate() *Aggregate { |
||||
if p.ty != PlanTypeAggregate { |
||||
panic(fmt.Sprintf("not an aggregate plan: %v", p.ty)) |
||||
} |
||||
return p.val.(*Aggregate) |
||||
} |
||||
|
||||
// Limit returns the concrete limit plan if this is a limit plan.
|
||||
// Panics if this is not a limit plan.
|
||||
func (p Plan) Limit() *Limit { |
||||
if p.ty != PlanTypeLimit { |
||||
panic(fmt.Sprintf("not a limit plan: %v", p.ty)) |
||||
} |
||||
return p.val.(*Limit) |
||||
} |
||||
|
||||
// Sort returns the concrete sort plan if this is a sort plan.
|
||||
// Panics if this is not a sort plan.
|
||||
func (p Plan) Sort() *Sort { |
||||
if p.ty != PlanTypeSort { |
||||
panic(fmt.Sprintf("not a sort plan: %v", p.ty)) |
||||
} |
||||
return p.val.(*Sort) |
||||
} |
||||
|
||||
// newPlan creates a new plan with the given type and value.
|
||||
// This is a helper function for creating plans of different types.
|
||||
func newPlan(ty PlanType, val any) Plan { |
||||
return Plan{ |
||||
ty: ty, |
||||
val: val, |
||||
} |
||||
} |
||||
|
||||
// NewScan creates a new table scan plan.
|
||||
// This is the entry point for building a query plan, as all queries
|
||||
// start with scanning a table.
|
||||
func NewScan(name string, schema schema.Schema) Plan { |
||||
return newPlan(PlanTypeTable, makeTable(name, schema)) |
||||
} |
||||
|
||||
// NewFilter creates a new filter plan.
|
||||
// This applies a boolean expression to filter rows from the input plan.
|
||||
func NewFilter(input Plan, expr Expr) Plan { |
||||
return newPlan(PlanTypeFilter, newFilter(input, expr)) |
||||
} |
||||
|
||||
// NewProjection creates a new projection plan.
|
||||
// This applies a list of expressions to project columns from the input plan.
|
||||
func NewProjection(input Plan, exprs []Expr) Plan { |
||||
return newPlan(PlanTypeProjection, newProjection(input, exprs)) |
||||
} |
||||
|
||||
// NewAggregate creates a new aggregate plan.
|
||||
// This applies grouping and aggregation to the input plan.
|
||||
func NewAggregate(input Plan, groupExprs []Expr, aggExprs []AggregateExpr) Plan { |
||||
return newPlan(PlanTypeAggregate, newAggregate(input, groupExprs, aggExprs)) |
||||
} |
||||
|
||||
// NewLimit creates a new limit plan.
|
||||
// This limits the number of rows returned by the input plan.
|
||||
// If skip is 0, no rows are skipped. If fetch is 0, all rows are returned after applying skip.
|
||||
func NewLimit(input Plan, skip uint64, fetch uint64) Plan { |
||||
return newPlan(PlanTypeLimit, newLimit(input, skip, fetch)) |
||||
} |
||||
|
||||
// NewSort creates a new sort plan.
|
||||
// This sorts the rows from the input plan based on the provided sort expression.
|
||||
func NewSort(input Plan, expr SortExpr) Plan { |
||||
return newPlan(PlanTypeSort, newSort(input, expr)) |
||||
} |
||||
@ -1,50 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// Projection represents a plan node that projects expressions from its input.
|
||||
// It corresponds to the SELECT clause in SQL and is used to select, transform,
|
||||
// or compute columns from the input plan.
|
||||
type Projection struct { |
||||
// input is the child plan node providing data to project
|
||||
input Plan |
||||
// exprs is the list of expressions to project
|
||||
exprs []Expr |
||||
} |
||||
|
||||
// newProjection creates a new Projection plan node.
|
||||
// The Projection logical plan applies a list of expressions to the input data,
|
||||
// producing a new set of columns. This is represented by the SELECT clause in SQL.
|
||||
// Sometimes this is as simple as a list of columns, such as SELECT a, b, c FROM foo,
|
||||
// but it could also include any other type of expression that is supported.
|
||||
// A more complex example would be SELECT (CAST(a AS float) * 3.141592)) AS my_float FROM foo.
|
||||
func newProjection(input Plan, exprs []Expr) *Projection { |
||||
return &Projection{ |
||||
input: input, |
||||
exprs: exprs, |
||||
} |
||||
} |
||||
|
||||
// Schema returns the schema of the projection plan.
|
||||
// The schema is derived from the projected expressions.
|
||||
func (p *Projection) Schema() schema.Schema { |
||||
cols := make([]schema.ColumnSchema, len(p.exprs)) |
||||
for i, expr := range p.exprs { |
||||
cols[i] = expr.ToField(p.input) |
||||
} |
||||
return schema.Schema{Columns: cols} |
||||
} |
||||
|
||||
// Child returns the input plan.
|
||||
// This is a convenience method for accessing the child plan.
|
||||
func (p *Projection) Child() Plan { |
||||
return p.input |
||||
} |
||||
|
||||
// ProjectExprs returns the list of projection expressions.
|
||||
// These are the expressions that define the output columns.
|
||||
func (p *Projection) ProjectExprs() []Expr { |
||||
return p.exprs |
||||
} |
||||
@ -1,59 +0,0 @@ |
||||
package logical |
||||
|
||||
// SortExpr represents a sort expression in a query plan.
|
||||
// It encapsulates an expression to sort on, a sort direction (ascending or descending),
|
||||
// and how NULL values should be handled (first or last).
|
||||
type SortExpr struct { |
||||
// name is a descriptive name for the sort expression
|
||||
name string |
||||
// expr is the expression to sort on
|
||||
expr Expr |
||||
// asc indicates whether to sort in ascending order (true) or descending order (false)
|
||||
asc bool |
||||
// nullsFirst indicates whether NULL values should appear first (true) or last (false)
|
||||
nullsFirst bool |
||||
} |
||||
|
||||
// NewSortExpr creates a new sort expression.
|
||||
//
|
||||
// Parameters:
|
||||
// - name: A descriptive name for the sort expression
|
||||
// - expr: The expression to sort on
|
||||
// - asc: Whether to sort in ascending order (true) or descending order (false)
|
||||
// - nullsFirst: Whether NULL values should appear first (true) or last (false)
|
||||
//
|
||||
// Example usage:
|
||||
//
|
||||
// // Sort by age in ascending order, NULLs last
|
||||
// sortExpr := NewSortExpr("sort_by_age", Col("age"), true, false)
|
||||
//
|
||||
// // Sort by name in descending order, NULLs first
|
||||
// sortExpr := NewSortExpr("sort_by_name", Col("name"), false, true)
|
||||
func NewSortExpr(name string, expr Expr, asc bool, nullsFirst bool) SortExpr { |
||||
return SortExpr{ |
||||
name: name, |
||||
expr: expr, |
||||
asc: asc, |
||||
nullsFirst: nullsFirst, |
||||
} |
||||
} |
||||
|
||||
// Name returns the name of the sort expression.
|
||||
func (s SortExpr) Name() string { |
||||
return s.name |
||||
} |
||||
|
||||
// Expr returns the expression to sort on.
|
||||
func (s SortExpr) Expr() Expr { |
||||
return s.expr |
||||
} |
||||
|
||||
// Asc returns whether the sort is in ascending order.
|
||||
func (s SortExpr) Asc() bool { |
||||
return s.asc |
||||
} |
||||
|
||||
// NullsFirst returns whether NULL values should appear first.
|
||||
func (s SortExpr) NullsFirst() bool { |
||||
return s.nullsFirst |
||||
} |
||||
@ -1,54 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// Sort represents a plan node that sorts rows based on sort expressions.
|
||||
// It corresponds to the ORDER BY clause in SQL and is used to order
|
||||
// the results of a query based on one or more sort expressions.
|
||||
type Sort struct { |
||||
// input is the child plan node providing data to sort
|
||||
input Plan |
||||
// expr is the sort expression to apply
|
||||
expr SortExpr |
||||
} |
||||
|
||||
// newSort creates a new Sort plan node.
|
||||
// It takes an input plan and a vector of sort expressions to apply.
|
||||
//
|
||||
// Example usage:
|
||||
//
|
||||
// // Sort by age in ascending order, NULLs last, then by name in descending order, NULLs first
|
||||
// sort := newSort(inputPlan, []SortExpr{
|
||||
// NewSortExpr("sort_by_age", Col("age"), true, false),
|
||||
// NewSortExpr("sort_by_name", Col("name"), false, true),
|
||||
// })
|
||||
func newSort(input Plan, expr SortExpr) *Sort { |
||||
return &Sort{ |
||||
input: input, |
||||
expr: expr, |
||||
} |
||||
} |
||||
|
||||
// Schema returns the schema of the sort plan.
|
||||
// The schema is the same as the input plan's schema since sorting
|
||||
// only affects the order of rows, not their structure.
|
||||
func (s *Sort) Schema() schema.Schema { |
||||
return s.input.Schema() |
||||
} |
||||
|
||||
// Type returns the plan type for this node.
|
||||
func (s *Sort) Type() PlanType { |
||||
return PlanTypeSort |
||||
} |
||||
|
||||
// Child returns the input plan.
|
||||
func (s *Sort) Child() Plan { |
||||
return s.input |
||||
} |
||||
|
||||
// SortExpr returns the sort expression.
|
||||
func (s *Sort) Expr() SortExpr { |
||||
return s.expr |
||||
} |
||||
@ -1,512 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
"strings" |
||||
) |
||||
|
||||
// SSANode represents a single node in the SSA (Static Single Assignment) form.
// Each node has a unique ID, a type, ordered properties, and references to
// other nodes.
type SSANode struct {
	// ID is the unique identifier for this node.
	ID int
	// NodeType is the type of this node (e.g., "MakeTable", "ColumnRef", etc.).
	NodeType string
	// Tuples holds the ordered properties of this node.
	Tuples []nodeProperty
	// References holds the IDs of other nodes in the SSA form that this node
	// refers to.
	References []int
}

// nodeProperty represents a key-value property of an SSA node.
type nodeProperty struct {
	Key   string
	Value string
}

// String returns a string representation of this node in the format
//
//	%ID = NodeType [prop1=value1, prop2=value2, ...]
//
// The bracketed property list is omitted when the node has no properties.
// References are not part of the output.
func (n *SSANode) String() string {
	var sb strings.Builder

	// Write straight into the builder rather than formatting an intermediate
	// string and copying it over.
	fmt.Fprintf(&sb, "%%%d = %s", n.ID, n.NodeType)

	if len(n.Tuples) > 0 {
		sb.WriteString(" [")

		// Properties are emitted in the order they are stored.
		for i, prop := range n.Tuples {
			if i > 0 {
				sb.WriteString(", ")
			}
			sb.WriteString(prop.Key)
			sb.WriteByte('=')
			sb.WriteString(prop.Value)
		}

		sb.WriteByte(']')
	}

	return sb.String()
}
||||
|
||||
// SSAForm represents a full query plan in SSA form
|
||||
// It contains a list of nodes and the ID of the root node
|
||||
type SSAForm struct { |
||||
// nodes is an ordered list of SSA nodes, where each node's dependencies
|
||||
// are guaranteed to appear earlier in the list
|
||||
nodes []SSANode |
||||
} |
||||
|
||||
// ConvertToSSA converts a logical plan to SSA form
|
||||
// It performs a post-order traversal of the plan, adding nodes as it goes
|
||||
func ConvertToSSA(plan Plan) (*SSAForm, error) { |
||||
// Initialize the builder with an empty node at index 0
|
||||
builder := &ssaBuilder{ |
||||
nodes: []SSANode{{}}, // Start with an empty node at index 0
|
||||
nodeMap: make(map[string]int), |
||||
nextID: 1, |
||||
exprTypes: make(map[Expr]string), |
||||
} |
||||
|
||||
_, err := builder.processPlan(plan) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("error converting plan to SSA: %w", err) |
||||
} |
||||
|
||||
return &SSAForm{ |
||||
nodes: builder.nodes, |
||||
}, nil |
||||
} |
||||
|
||||
// ssaBuilder is a helper type for building SSA forms
|
||||
type ssaBuilder struct { |
||||
nodes []SSANode |
||||
nodeMap map[string]int // Maps node key to its ID
|
||||
nextID int |
||||
exprTypes map[Expr]string // Cache for expression types
|
||||
} |
||||
|
||||
// getID generates a unique ID for a node
|
||||
func (b *ssaBuilder) getID() int { |
||||
id := b.nextID |
||||
b.nextID++ |
||||
return id |
||||
} |
||||
|
||||
// processPlan processes a logical plan and returns the ID of the resulting SSA node.
|
||||
func (b *ssaBuilder) processPlan(plan Plan) (int, error) { |
||||
switch plan.Type() { |
||||
case PlanTypeTable: |
||||
return b.processTablePlan(plan.Table()) |
||||
case PlanTypeFilter: |
||||
return b.processFilterPlan(plan.Filter()) |
||||
case PlanTypeProjection: |
||||
return b.processProjectionPlan(plan.Projection()) |
||||
case PlanTypeAggregate: |
||||
return b.processAggregatePlan(plan.Aggregate()) |
||||
case PlanTypeLimit: |
||||
return b.processLimitPlan(plan.Limit()) |
||||
case PlanTypeSort: |
||||
return b.processSortPlan(plan.Sort()) |
||||
default: |
||||
return 0, fmt.Errorf("unsupported plan type: %v", plan.Type()) |
||||
} |
||||
} |
||||
|
||||
// processTablePlan processes a table plan node
|
||||
// It creates a MakeTable node with the table name
|
||||
func (b *ssaBuilder) processTablePlan(plan *MakeTable) (int, error) { |
||||
// Create a node for the table
|
||||
id := b.getID() |
||||
node := SSANode{ |
||||
ID: id, |
||||
NodeType: "MakeTable", |
||||
Tuples: []nodeProperty{ |
||||
{Key: "name", Value: plan.TableName()}, |
||||
}, |
||||
} |
||||
|
||||
b.nodes = append(b.nodes, node) |
||||
return id, nil |
||||
} |
||||
|
||||
// processFilterPlan processes a filter plan node
|
||||
// It processes the child plan and filter expression, then creates a Filter node
|
||||
func (b *ssaBuilder) processFilterPlan(plan *Filter) (int, error) { |
||||
// Process the child plan first
|
||||
childID, err := b.processPlan(plan.Child()) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
// Process the filter expression
|
||||
exprID, err := b.processExpr(plan.FilterExpr(), plan.Child()) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
// Get the name of the expression
|
||||
var exprName string |
||||
|
||||
if plan.FilterExpr().Type() == ExprTypeBinaryOp { |
||||
exprName = plan.FilterExpr().BinaryOp().Name() |
||||
} else { |
||||
exprName = plan.FilterExpr().ToField(plan.Child()).Name |
||||
} |
||||
|
||||
// Create a node for the filter
|
||||
id := b.getID() |
||||
node := SSANode{ |
||||
ID: id, |
||||
NodeType: "Filter", |
||||
Tuples: []nodeProperty{ |
||||
{Key: "name", Value: exprName}, |
||||
{Key: "predicate", Value: fmt.Sprintf("%%%d", exprID)}, |
||||
{Key: "plan", Value: fmt.Sprintf("%%%d", childID)}, |
||||
}, |
||||
References: []int{exprID, childID}, |
||||
} |
||||
|
||||
b.nodes = append(b.nodes, node) |
||||
return id, nil |
||||
} |
||||
|
||||
// processProjectionPlan processes a projection plan node
|
||||
// It processes the child plan and all projection expressions, then creates a Project node
|
||||
func (b *ssaBuilder) processProjectionPlan(plan *Projection) (int, error) { |
||||
// Process the child plan first
|
||||
childID, err := b.processPlan(plan.Child()) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
// Process all projection expressions
|
||||
var props []nodeProperty |
||||
var references []int |
||||
|
||||
// Process expressions and build properties in a stable order
|
||||
// determined by the order of expressions in the plan
|
||||
for _, expr := range plan.ProjectExprs() { |
||||
exprID, err := b.processExpr(expr, plan.Child()) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
field := expr.ToField(plan.Child()) |
||||
props = append(props, nodeProperty{ |
||||
Key: field.Name, |
||||
Value: fmt.Sprintf("%%%d", exprID), |
||||
}) |
||||
references = append(references, exprID) |
||||
} |
||||
|
||||
// Create a node for the projection
|
||||
id := b.getID() |
||||
node := SSANode{ |
||||
ID: id, |
||||
NodeType: "Project", |
||||
Tuples: props, |
||||
References: append(references, childID), // Add childID to references
|
||||
} |
||||
|
||||
b.nodes = append(b.nodes, node) |
||||
return id, nil |
||||
} |
||||
|
||||
// processAggregatePlan processes an aggregate plan node
|
||||
// It processes the child plan, group expressions, and aggregate expressions,
|
||||
// then creates an AggregatePlan node
|
||||
func (b *ssaBuilder) processAggregatePlan(plan *Aggregate) (int, error) { |
||||
// Process the child plan first
|
||||
_, err := b.processPlan(plan.Child()) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
// Process group expressions
|
||||
var groupingRefs []string |
||||
var groupingIDs []int |
||||
|
||||
for _, expr := range plan.GroupExprs() { |
||||
exprID, err := b.processExpr(expr, plan.Child()) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
groupingRefs = append(groupingRefs, fmt.Sprintf("%%%d", exprID)) |
||||
groupingIDs = append(groupingIDs, exprID) |
||||
} |
||||
|
||||
// Process aggregate expressions
|
||||
var aggregationRefs []string |
||||
var aggregationIDs []int |
||||
|
||||
for _, expr := range plan.AggregateExprs() { |
||||
exprID, err := b.processAggregateExpr(&expr, plan.Child()) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
aggregationRefs = append(aggregationRefs, fmt.Sprintf("%%%d", exprID)) |
||||
aggregationIDs = append(aggregationIDs, exprID) |
||||
} |
||||
|
||||
// Create a node for the aggregate plan
|
||||
id := b.getID() |
||||
node := SSANode{ |
||||
ID: id, |
||||
NodeType: "AggregatePlan", |
||||
Tuples: []nodeProperty{ |
||||
{Key: "aggregations", Value: fmt.Sprintf("[%s]", strings.Join(aggregationRefs, ", "))}, |
||||
{Key: "groupings", Value: fmt.Sprintf("[%s]", strings.Join(groupingRefs, ", "))}, |
||||
}, |
||||
References: append(groupingIDs, aggregationIDs...), |
||||
} |
||||
|
||||
b.nodes = append(b.nodes, node) |
||||
return id, nil |
||||
} |
||||
|
||||
// processExpr processes an expression and returns its ID
|
||||
// It handles different expression types by delegating to specific processing methods
|
||||
func (b *ssaBuilder) processExpr(expr Expr, parent Plan) (int, error) { |
||||
switch expr.Type() { |
||||
case ExprTypeColumn: |
||||
return b.processColumnExpr(expr.Column(), parent) |
||||
case ExprTypeLiteral: |
||||
return b.processLiteralExpr(expr.Literal()) |
||||
case ExprTypeBinaryOp: |
||||
return b.processBinaryOpExpr(expr.BinaryOp(), parent) |
||||
case ExprTypeAggregate: |
||||
return b.processAggregateExpr(expr.Aggregate(), parent) |
||||
default: |
||||
return 0, fmt.Errorf("unknown expression type: %v", expr.Type()) |
||||
} |
||||
} |
||||
|
||||
// processColumnExpr processes a column expression
|
||||
// It creates a ColumnRef node with the column name and type
|
||||
func (b *ssaBuilder) processColumnExpr(expr *ColumnExpr, parent Plan) (int, error) { |
||||
field := expr.ToField(parent) |
||||
|
||||
// Create a node for the column reference
|
||||
id := b.getID() |
||||
node := SSANode{ |
||||
ID: id, |
||||
NodeType: "ColumnRef", |
||||
Tuples: []nodeProperty{ |
||||
{Key: "name", Value: field.Name}, |
||||
{Key: "type", Value: field.Type.String()}, |
||||
}, |
||||
} |
||||
|
||||
b.nodes = append(b.nodes, node) |
||||
return id, nil |
||||
} |
||||
|
||||
// processLiteralExpr processes a literal expression
|
||||
// It creates a Literal node with the value and type
|
||||
func (b *ssaBuilder) processLiteralExpr(expr *LiteralExpr) (int, error) { |
||||
// Create a node for the literal
|
||||
id := b.getID() |
||||
node := SSANode{ |
||||
ID: id, |
||||
NodeType: "Literal", |
||||
Tuples: []nodeProperty{ |
||||
{Key: "val", Value: expr.ValueString()}, |
||||
{Key: "type", Value: expr.ValueType().String()}, |
||||
}, |
||||
} |
||||
|
||||
b.nodes = append(b.nodes, node) |
||||
return id, nil |
||||
} |
||||
|
||||
// processBinaryOpExpr processes a binary operation expression
|
||||
// It processes the left and right operands, then creates a BinaryOp node
|
||||
func (b *ssaBuilder) processBinaryOpExpr(expr *BinOpExpr, parent Plan) (int, error) { |
||||
// Process the left and right operands first
|
||||
leftID, err := b.processExpr(expr.Left(), parent) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
rightID, err := b.processExpr(expr.Right(), parent) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
// Create a node for the binary operation
|
||||
id := b.getID() |
||||
node := SSANode{ |
||||
ID: id, |
||||
NodeType: "BinaryOp", |
||||
Tuples: []nodeProperty{ |
||||
{Key: "op", Value: fmt.Sprintf("(%s)", expr.Op().String())}, |
||||
{Key: "name", Value: expr.Name()}, |
||||
{Key: "left", Value: fmt.Sprintf("%%%d", leftID)}, |
||||
{Key: "right", Value: fmt.Sprintf("%%%d", rightID)}, |
||||
}, |
||||
References: []int{leftID, rightID}, |
||||
} |
||||
|
||||
b.nodes = append(b.nodes, node) |
||||
return id, nil |
||||
} |
||||
|
||||
// processAggregateExpr processes an aggregate expression
|
||||
// It processes the input expression first if it exists, then creates an AggregationExpr node
|
||||
func (b *ssaBuilder) processAggregateExpr(expr *AggregateExpr, parent Plan) (int, error) { |
||||
// Process the input expression first if it exists
|
||||
exprID, err := b.processExpr(expr.SubExpr(), parent) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
// Create a node for the aggregate expression
|
||||
id := b.getID() |
||||
node := SSANode{ |
||||
ID: id, |
||||
NodeType: "AggregationExpr", |
||||
Tuples: []nodeProperty{ |
||||
{Key: "name", Value: expr.Name()}, |
||||
{Key: "op", Value: string(expr.Op())}, |
||||
}, |
||||
References: []int{exprID}, |
||||
} |
||||
|
||||
b.nodes = append(b.nodes, node) |
||||
return id, nil |
||||
} |
||||
|
||||
// processLimitPlan processes a limit plan and returns the ID of the resulting SSA node.
|
||||
// This converts a Limit logical plan node to its SSA representation.
|
||||
//
|
||||
// The SSA node for a Limit plan has the following format:
|
||||
//
|
||||
// %ID = Limit [Skip=X, Fetch=Y]
|
||||
//
|
||||
// Where X is the number of rows to skip and Y is the maximum number of rows to return.
|
||||
// The Limit node always includes both Skip and Fetch properties, even when they are zero.
|
||||
// This ensures consistent representation and allows for optimization in a separate step.
|
||||
//
|
||||
// The Limit node references its input plan as a dependency.
|
||||
func (b *ssaBuilder) processLimitPlan(plan *Limit) (int, error) { |
||||
// Process the input plan
|
||||
inputID, err := b.processPlan(plan.Child()) |
||||
if err != nil { |
||||
return 0, fmt.Errorf("failed to process limit input plan: %w", err) |
||||
} |
||||
|
||||
// Create properties for the limit node
|
||||
tuples := []nodeProperty{ |
||||
{ |
||||
Key: "Skip", |
||||
Value: fmt.Sprintf("%d", plan.Skip()), |
||||
}, |
||||
{ |
||||
Key: "Fetch", |
||||
Value: fmt.Sprintf("%d", plan.Fetch()), |
||||
}, |
||||
} |
||||
|
||||
// Create the limit node
|
||||
id := b.getID() |
||||
b.nodes = append(b.nodes, SSANode{ |
||||
ID: id, |
||||
NodeType: "Limit", |
||||
Tuples: tuples, |
||||
References: []int{inputID}, |
||||
}) |
||||
return id, nil |
||||
} |
||||
|
||||
// processSortPlan processes a sort plan and returns the ID of the resulting SSA node.
|
||||
// This converts a Sort logical plan node to its SSA representation.
|
||||
//
|
||||
// The SSA node for a Sort plan has the following format:
|
||||
//
|
||||
// %ID = Sort [expr=X, direction=Y, nulls=Z]
|
||||
//
|
||||
// Where X is the name of the sort expression, Y is the sort direction, and Z is the nulls position.
|
||||
// The Sort node references its input plan and sort expression as dependencies.
|
||||
func (b *ssaBuilder) processSortPlan(plan *Sort) (int, error) { |
||||
// Process the child plan first
|
||||
childID, err := b.processPlan(plan.Child()) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
// Process the sort expression
|
||||
exprID, err := b.processExpr(plan.Expr().Expr(), plan.Child()) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
// Create direction and nulls position properties
|
||||
direction := "asc" |
||||
if !plan.Expr().Asc() { |
||||
direction = "desc" |
||||
} |
||||
|
||||
nullsPosition := "last" |
||||
if plan.Expr().NullsFirst() { |
||||
nullsPosition = "first" |
||||
} |
||||
|
||||
// Create the Sort node
|
||||
id := b.getID() |
||||
node := SSANode{ |
||||
ID: id, |
||||
NodeType: "Sort", |
||||
Tuples: []nodeProperty{ |
||||
{Key: "expr", Value: plan.Expr().Name()}, |
||||
{Key: "direction", Value: direction}, |
||||
{Key: "nulls", Value: nullsPosition}, |
||||
}, |
||||
References: []int{exprID, childID}, |
||||
} |
||||
|
||||
b.nodes = append(b.nodes, node) |
||||
return id, nil |
||||
} |
||||
|
||||
// String returns a string representation of the SSA form with the RETURN statement
|
||||
func (f *SSAForm) String() string { |
||||
if len(f.nodes) <= 1 { |
||||
return "" |
||||
} |
||||
|
||||
// The root is the last node added
|
||||
lastNodeID := f.nodes[len(f.nodes)-1].ID |
||||
return f.Format() + fmt.Sprintf("\nRETURN %%%d", lastNodeID) |
||||
} |
||||
|
||||
// Format returns a formatted string representation of the SSA form
|
||||
// It includes all nodes but not the RETURN statement
|
||||
func (f *SSAForm) Format() string { |
||||
var sb strings.Builder |
||||
|
||||
// Add each node (skip node 0 if it exists)
|
||||
for i := 1; i < len(f.nodes); i++ { |
||||
if i > 1 { |
||||
sb.WriteString("\n") |
||||
} |
||||
sb.WriteString(f.nodes[i].String()) |
||||
} |
||||
|
||||
return sb.String() |
||||
} |
||||
|
||||
// Nodes returns a map of node IDs to their string representations
|
||||
// This is primarily used for testing
|
||||
func (f *SSAForm) Nodes() map[int]string { |
||||
result := make(map[int]string) |
||||
for _, node := range f.nodes { |
||||
if node.ID > 0 { |
||||
result[node.ID] = node.String() |
||||
} |
||||
} |
||||
return result |
||||
} |
||||
@ -1,189 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"fmt" |
||||
"strings" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
func TestConvertSimpleQueryToSSA(t *testing.T) { |
||||
// Build a simple query plan:
|
||||
// SELECT id, name FROM users WHERE age > 21
|
||||
ds := &testDataSource{ |
||||
name: "users", |
||||
schema: schema.Schema{ |
||||
Columns: []schema.ColumnSchema{ |
||||
{Name: "id", Type: schema.ValueTypeUint64}, |
||||
{Name: "name", Type: schema.ValueTypeString}, |
||||
{Name: "age", Type: schema.ValueTypeUint64}, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
scan := NewScan(ds.Name(), ds.Schema()) |
||||
filter := NewFilter(scan, Gt("age_gt_21", Col("age"), LitI64(21))) |
||||
proj := NewProjection(filter, []Expr{Col("id"), Col("name")}) |
||||
|
||||
// Convert to SSA
|
||||
ssaForm, err := ConvertToSSA(proj) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, ssaForm) |
||||
|
||||
// Get the string representation for debugging
|
||||
ssaString := ssaForm.String() |
||||
t.Logf("SSA Form:\n%s", ssaString) |
||||
|
||||
// Define expected output
|
||||
exp := ` |
||||
%1 = MakeTable [name=users] |
||||
%2 = ColumnRef [name=age, type=VALUE_TYPE_UINT64] |
||||
%3 = Literal [val=21, type=VALUE_TYPE_INT64] |
||||
%4 = BinaryOp [op=(>), name=age_gt_21, left=%2, right=%3] |
||||
%5 = Filter [name=age_gt_21, predicate=%4, plan=%1] |
||||
%6 = ColumnRef [name=id, type=VALUE_TYPE_UINT64] |
||||
%7 = ColumnRef [name=name, type=VALUE_TYPE_STRING] |
||||
%8 = Project [id=%6, name=%7] |
||||
` |
||||
exp = strings.TrimSpace(exp) |
||||
|
||||
// Get the actual output without the RETURN statement
|
||||
ssaOutput := ssaForm.Format() |
||||
ssaLines := strings.Split(strings.TrimSpace(ssaOutput), "\n") |
||||
|
||||
expLines := strings.Split(exp, "\n") |
||||
require.Equal(t, len(expLines), len(ssaLines), "Expected and actual SSA output line counts do not match") |
||||
|
||||
for i, line := range expLines { |
||||
require.Equal(t, strings.TrimSpace(line), strings.TrimSpace(ssaLines[i]), fmt.Sprintf("Mismatch at line %d", i+1)) |
||||
} |
||||
} |
||||
|
||||
func TestConvertComplexQueryToSSA(t *testing.T) { |
||||
// Calculate the sum of sales per region for the year 2020
|
||||
ds := &testDataSource{ |
||||
name: "orders", |
||||
schema: schema.Schema{ |
||||
Columns: []schema.ColumnSchema{ |
||||
{Name: "region", Type: schema.ValueTypeString}, |
||||
{Name: "sales", Type: schema.ValueTypeUint64}, |
||||
{Name: "year", Type: schema.ValueTypeUint64}, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
df := NewDataFrame( |
||||
NewScan(ds.Name(), ds.Schema()), |
||||
).Filter( |
||||
Eq("year_2020", Col("year"), LitI64(2020)), |
||||
).Project( |
||||
[]Expr{ |
||||
Col("region"), |
||||
Col("sales"), |
||||
Col("year"), |
||||
}, |
||||
).Aggregate( |
||||
[]Expr{Col("region")}, |
||||
[]AggregateExpr{ |
||||
Sum("total_sales", Col("sales")), |
||||
}, |
||||
).Limit( |
||||
0, |
||||
10, |
||||
) |
||||
|
||||
// Convert to SSA
|
||||
ssaForm, err := ConvertToSSA(df.LogicalPlan()) |
||||
require.NoError(t, err) |
||||
require.NotNil(t, ssaForm) |
||||
|
||||
// Get the string representation for debugging
|
||||
ssaString := ssaForm.String() |
||||
t.Logf("SSA Form:\n%s", ssaString) |
||||
|
||||
// Define expected output
|
||||
exp := ` |
||||
%1 = MakeTable [name=orders] |
||||
%2 = ColumnRef [name=year, type=VALUE_TYPE_UINT64] |
||||
%3 = Literal [val=2020, type=VALUE_TYPE_INT64] |
||||
%4 = BinaryOp [op=(==), name=year_2020, left=%2, right=%3] |
||||
%5 = Filter [name=year_2020, predicate=%4, plan=%1] |
||||
%6 = ColumnRef [name=region, type=VALUE_TYPE_STRING] |
||||
%7 = ColumnRef [name=sales, type=VALUE_TYPE_UINT64] |
||||
%8 = ColumnRef [name=year, type=VALUE_TYPE_UINT64] |
||||
%9 = Project [region=%6, sales=%7, year=%8] |
||||
%10 = ColumnRef [name=region, type=VALUE_TYPE_STRING] |
||||
%11 = ColumnRef [name=sales, type=VALUE_TYPE_UINT64] |
||||
%12 = AggregationExpr [name=total_sales, op=sum] |
||||
%13 = AggregatePlan [aggregations=[%12], groupings=[%10]] |
||||
%14 = Limit [Skip=0, Fetch=10] |
||||
` |
||||
exp = strings.TrimSpace(exp) |
||||
|
||||
// Get the actual output without the RETURN statement
|
||||
ssaOutput := ssaForm.Format() |
||||
ssaLines := strings.Split(strings.TrimSpace(ssaOutput), "\n") |
||||
|
||||
expLines := strings.Split(exp, "\n") |
||||
require.Equal(t, len(expLines), len(ssaLines), "Expected and actual SSA output line counts do not match") |
||||
|
||||
for i, line := range expLines { |
||||
require.Equal(t, strings.TrimSpace(line), strings.TrimSpace(ssaLines[i]), fmt.Sprintf("Mismatch at line %d", i+1)) |
||||
} |
||||
} |
||||
|
||||
func TestConvertSortQueryToSSA(t *testing.T) { |
||||
// Build a query plan with sorting:
|
||||
// SELECT id, name, age FROM users WHERE age > 21 ORDER BY age ASC, name DESC
|
||||
ds := &testDataSource{ |
||||
name: "users", |
||||
schema: schema.Schema{ |
||||
Columns: []schema.ColumnSchema{ |
||||
{Name: "id", Type: schema.ValueTypeUint64}, |
||||
{Name: "name", Type: schema.ValueTypeString}, |
||||
{Name: "age", Type: schema.ValueTypeUint64}, |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
scan := NewScan(ds.Name(), ds.Schema()) |
||||
filter := NewFilter(scan, Gt("age_gt_21", Col("age"), LitI64(21))) |
||||
proj := NewProjection(filter, []Expr{Col("id"), Col("name"), Col("age")}) |
||||
|
||||
// Sort by age ascending, nulls last
|
||||
sortByAge := NewSort(proj, NewSortExpr("sort_by_age", Col("age"), true, false)) |
||||
|
||||
ssa, err := ConvertToSSA(sortByAge) |
||||
require.NoError(t, err) |
||||
|
||||
t.Logf("SSA Form:\n%s", ssa.Format()) |
||||
|
||||
// Verify the structure of the SSA form
|
||||
nodes := ssa.Nodes() |
||||
require.NotEmpty(t, nodes) |
||||
|
||||
// The last node should be the Sort node
|
||||
lastNodeID := len(ssa.nodes) - 1 |
||||
lastNode := ssa.nodes[lastNodeID] |
||||
require.Equal(t, "Sort", lastNode.NodeType) |
||||
|
||||
// Verify the properties of the Sort node
|
||||
var exprName, direction, nulls string |
||||
for _, tuple := range lastNode.Tuples { |
||||
switch tuple.Key { |
||||
case "expr": |
||||
exprName = tuple.Value |
||||
case "direction": |
||||
direction = tuple.Value |
||||
case "nulls": |
||||
nulls = tuple.Value |
||||
} |
||||
} |
||||
|
||||
require.Equal(t, "sort_by_age", exprName) |
||||
require.Equal(t, "asc", direction) |
||||
require.Equal(t, "last", nulls) |
||||
} |
||||
@ -1,43 +0,0 @@ |
||||
package logical |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/v3/pkg/engine/planner/schema" |
||||
) |
||||
|
||||
// MakeTable represents a plan node that scans input data.
|
||||
// It is the leaf node in the query tree, representing the initial data source
|
||||
// from which all other operations will read. This is equivalent to a table scan
|
||||
// operation in a relational database.
|
||||
type MakeTable struct { |
||||
// name is the identifier of the table to scan
|
||||
name string |
||||
// schema defines the structure of the data in the table
|
||||
schema schema.Schema |
||||
} |
||||
|
||||
// makeTable creates a new MakeTable plan node with the given name and schema.
|
||||
// This is an internal constructor used by the public NewScan function.
|
||||
func makeTable(name string, schema schema.Schema) *MakeTable { |
||||
return &MakeTable{ |
||||
name: name, |
||||
schema: schema, |
||||
} |
||||
} |
||||
|
||||
// Schema returns the schema of the table.
|
||||
// This implements part of the Plan interface.
|
||||
func (t *MakeTable) Schema() schema.Schema { |
||||
return t.schema |
||||
} |
||||
|
||||
// TableSchema returns the schema of the table.
|
||||
// This is a convenience method that returns the same value as Schema().
|
||||
func (t *MakeTable) TableSchema() schema.Schema { |
||||
return t.schema |
||||
} |
||||
|
||||
// TableName returns the name of the table.
|
||||
// This is used for identifying the data source in the query plan.
|
||||
func (t *MakeTable) TableName() string { |
||||
return t.name |
||||
} |
||||
Loading…
Reference in new issue