Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/expressions.go

159 lines
5.0 KiB

package executor
import (
"fmt"
"github.com/apache/arrow-go/v18/arrow"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
// expressionEvaluator evaluates physical plan expressions against Arrow
// record batches. It holds no state, so the zero value is ready to use.
type expressionEvaluator struct{}

// newExpressionEvaluator returns a ready-to-use expressionEvaluator.
func newExpressionEvaluator() expressionEvaluator {
	var ev expressionEvaluator
	return ev
}
// eval evaluates expr against the record batch input and returns the result
// as an Arrow array.
//
// The behaviour depends on the concrete expression type:
//   - *physical.LiteralExpr: a scalar of the literal with one entry per input row.
//   - *physical.ColumnExpr: the matching column from input (see below).
//   - *physical.UnaryExpr: the operand is evaluated and passed to the unary
//     function registered for the operator and the operand's data type.
//   - *physical.BinaryExpr: both operands are evaluated and passed to the
//     binary function registered for the operator; both sides must have the
//     same Arrow type ID.
//   - *physical.VariadicExpr: all arguments are evaluated in order and passed
//     to the variadic function registered for the operator.
//
// Column resolution: a non-ambiguous reference returns the first schema field
// whose short name and column type both match. An ambiguous reference matches
// on short name only; a single match is returned as-is and multiple matches
// are combined via NewCoalesce. If nothing matches, an empty-string scalar is
// returned instead of an error.
//
// An error is returned if a schema field name cannot be parsed, if an
// ambiguous match is not of string data type, if a function lookup fails, if
// binary operand types differ, or if the expression type is not supported.
func (e expressionEvaluator) eval(expr physical.Expression, input arrow.RecordBatch) (arrow.Array, error) {
	switch expr := expr.(type) {
	case *physical.LiteralExpr:
		return NewScalar(expr.Literal(), int(input.NumRows())), nil

	case *physical.ColumnExpr:
		colIdent := semconv.NewIdentifier(expr.Ref.Column, expr.Ref.Type, types.Loki.String)

		if expr.Ref.Type != types.ColumnTypeAmbiguous {
			// For non-ambiguous columns, match on both the short name and the
			// column type and return the first hit.
			for idx, field := range input.Schema().Fields() {
				ident, err := semconv.ParseFQN(field.Name)
				if err != nil {
					return nil, fmt.Errorf("failed to parse column %s: %w", field.Name, err)
				}
				if ident.ShortName() == colIdent.ShortName() && ident.ColumnType() == colIdent.ColumnType() {
					return input.Column(idx), nil
				}
			}
		} else {
			// For ambiguous columns, match on the short name only and combine
			// all matching columns. The whole schema is parsed first so that a
			// malformed field name is reported before any data type mismatch.
			var fieldIndices []int
			var fieldIdents []*semconv.Identifier
			for idx, field := range input.Schema().Fields() {
				ident, err := semconv.ParseFQN(field.Name)
				if err != nil {
					return nil, fmt.Errorf("failed to parse column %s: %w", field.Name, err)
				}
				if ident.ShortName() == colIdent.ShortName() {
					fieldIndices = append(fieldIndices, idx)
					fieldIdents = append(fieldIdents, ident)
				}
			}

			// Collect all matching columns in schema order.
			// NOTE(review): no ordering by precedence happens here; presumably
			// NewCoalesce uses the column type to decide precedence — confirm.
			vecs := make([]*columnWithType, 0, len(fieldIndices))
			for i, idx := range fieldIndices {
				ident := fieldIdents[i]

				// TODO(ashwanth): Support other data types in CoalesceVector.
				// For now, ensure all vectors are strings to avoid type conflicts.
				if ident.DataType() != types.Loki.String {
					return nil, fmt.Errorf("column %s has datatype %s, but expression expects %s", ident.ShortName(), ident.DataType(), types.Loki.String)
				}
				vecs = append(vecs, &columnWithType{col: input.Column(idx), ct: ident.ColumnType()})
			}

			// Single column matches
			if len(vecs) == 1 {
				return vecs[0].col, nil
			}
			// Multiple columns match
			if len(vecs) > 1 {
				return NewCoalesce(vecs), nil
			}
		}

		// A non-existent column is represented as a string scalar with zero-value.
		// This reflects current behaviour, where a label filter `| foo=""` would match all if `foo` is not defined.
		return NewScalar(types.NewLiteral(""), int(input.NumRows())), nil

	case *physical.UnaryExpr:
		operand, err := e.eval(expr.Left, input)
		if err != nil {
			return nil, err
		}

		fn, err := unaryFunctions.GetForSignature(expr.Op, operand.DataType())
		if err != nil {
			return nil, fmt.Errorf("failed to lookup unary function: %w", err)
		}
		return fn.Evaluate(operand)

	case *physical.BinaryExpr:
		lhs, err := e.eval(expr.Left, input)
		if err != nil {
			return nil, err
		}
		rhs, err := e.eval(expr.Right, input)
		if err != nil {
			return nil, err
		}

		// At the moment we only support functions that accept the same input types.
		// TODO(chaudum): Compare Loki type, not Arrow type
		if lhs.DataType().ID() != rhs.DataType().ID() {
			return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): types do not match", expr.Op, lhs.DataType(), rhs.DataType())
		}

		// TODO(chaudum): Resolve function by Loki type
		fn, err := binaryFunctions.GetForSignature(expr.Op, lhs.DataType())
		if err != nil {
			return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): %w", expr.Op, lhs.DataType(), rhs.DataType(), err)
		}

		// Check if lhs and rhs are scalar vectors, because certain function types,
		// such as regexp functions, can optimize the evaluation per batch.
		_, lhsIsScalar := expr.Left.(*physical.LiteralExpr)
		_, rhsIsScalar := expr.Right.(*physical.LiteralExpr)
		return fn.Evaluate(lhs, rhs, lhsIsScalar, rhsIsScalar)

	case *physical.VariadicExpr:
		args := make([]arrow.Array, len(expr.Expressions))
		for i, arg := range expr.Expressions {
			evaluated, err := e.eval(arg, input)
			if err != nil {
				return nil, err
			}
			args[i] = evaluated
		}

		fn, err := variadicFunctions.GetForSignature(expr.Op)
		if err != nil {
			// Name the correct function family; this message previously
			// claimed a unary lookup had failed.
			return nil, fmt.Errorf("failed to lookup variadic function: %w", err)
		}
		return fn.Evaluate(args...)
	}

	return nil, fmt.Errorf("unknown expression: %v", expr)
}
// newFunc binds expr to this evaluator and returns an evalFunc that, when
// called with a record batch, evaluates the bound expression against it.
func (e expressionEvaluator) newFunc(expr physical.Expression) evalFunc {
	var bound evalFunc = func(batch arrow.RecordBatch) (arrow.Array, error) {
		return e.eval(expr, batch)
	}
	return bound
}
// evalFunc is the signature of an expression bound to an evaluator: it takes a
// record batch and returns the evaluated result as an Arrow array, or an error.
// Values of this type are produced by expressionEvaluator.newFunc.
type evalFunc func(input arrow.RecordBatch) (arrow.Array, error)
// columnWithType pairs an Arrow column with its column type. eval builds one
// for every schema field that matches an ambiguous column reference and hands
// the collected values to NewCoalesce.
type columnWithType struct {
// col is the column data taken from the input record batch.
col arrow.Array
// ct is the column type parsed from the field's fully qualified name.
ct types.ColumnType
}