mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
373 lines
12 KiB
373 lines
12 KiB
package executor
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"regexp"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"github.com/apache/arrow-go/v18/arrow/scalar"
|
|
|
|
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/types"
|
|
)
|
|
|
|
// buildLogsPredicate builds a [logs.Predicate] from an expr. The columns slice
|
|
// determines available columns that can be referenced by the expression.
|
|
//
|
|
// The returned predicate performs no filtering on stream ID; callers must use
|
|
// [logs.AndPredicate] and manually include filters for stream IDs if desired.
|
|
//
|
|
// References to columns that do not exist in the columns slice map to a
|
|
// [logs.FalsePredicate].
|
|
//
|
|
// buildLogsPredicate returns an error if:
|
|
//
|
|
// - Expressions cannot be represented as a boolean value
|
|
// - An expression is not supported (such as comparing two columns or comparing
|
|
// two literals).
|
|
func buildLogsPredicate(expr physical.Expression, columns []*logs.Column) (logs.Predicate, error) {
|
|
switch expr := expr.(type) {
|
|
case physical.UnaryExpression:
|
|
return buildLogsUnaryPredicate(expr, columns)
|
|
|
|
case physical.BinaryExpression:
|
|
return buildLogsBinaryPredicate(expr, columns)
|
|
|
|
case *physical.LiteralExpr:
|
|
if expr.ValueType() == types.Loki.Bool {
|
|
val := expr.Value().(bool)
|
|
if val {
|
|
return logs.TruePredicate{}, nil
|
|
}
|
|
return logs.FalsePredicate{}, nil
|
|
}
|
|
|
|
case *physical.ColumnExpr:
|
|
// TODO(rfratto): This would add support for statements like
|
|
//
|
|
// SELECT * WHERE boolean_column
|
|
//
|
|
// (where boolean_column is a column containing boolean values). No such
|
|
// column type exists now, so I'm leaving it unsupported here for the time
|
|
// being.
|
|
return nil, fmt.Errorf("plain column references (%s) are unsupported for logs predicates", expr)
|
|
|
|
default:
|
|
// TODO(rfratto): This doesn't yet cover two potential future cases:
|
|
//
|
|
// * A plain boolean literal
|
|
// * A reference to a column containing boolean values
|
|
return nil, fmt.Errorf("expression %[1]s (type %[1]T) cannot be interpreted as a boolean", expr)
|
|
}
|
|
|
|
return nil, fmt.Errorf("expression %[1]s (type %[1]T) cannot be interpreted as a boolean", expr)
|
|
}
|
|
|
|
func buildLogsUnaryPredicate(expr physical.UnaryExpression, columns []*logs.Column) (logs.Predicate, error) {
|
|
unaryExpr, ok := expr.(*physical.UnaryExpr)
|
|
if !ok {
|
|
return nil, fmt.Errorf("expected physical.UnaryExpr, got %[1]T", expr)
|
|
}
|
|
|
|
inner, err := buildLogsPredicate(unaryExpr.Left, columns)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("building unary predicate: %w", err)
|
|
}
|
|
|
|
switch unaryExpr.Op {
|
|
case types.UnaryOpNot:
|
|
return logs.NotPredicate{Inner: inner}, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("unsupported unary operator %s in logs predicate", unaryExpr.Op)
|
|
}
|
|
|
|
var comparisonBinaryOps = map[types.BinaryOp]struct{}{
|
|
types.BinaryOpEq: {},
|
|
types.BinaryOpNeq: {},
|
|
types.BinaryOpGt: {},
|
|
types.BinaryOpGte: {},
|
|
types.BinaryOpLt: {},
|
|
types.BinaryOpLte: {},
|
|
types.BinaryOpMatchSubstr: {},
|
|
types.BinaryOpNotMatchSubstr: {},
|
|
types.BinaryOpMatchRe: {},
|
|
types.BinaryOpNotMatchRe: {},
|
|
types.BinaryOpMatchPattern: {},
|
|
types.BinaryOpNotMatchPattern: {},
|
|
}
|
|
|
|
func buildLogsBinaryPredicate(expr physical.BinaryExpression, columns []*logs.Column) (logs.Predicate, error) {
|
|
binaryExpr, ok := expr.(*physical.BinaryExpr)
|
|
if !ok {
|
|
return nil, fmt.Errorf("expected physical.BinaryExpr, got %[1]T", expr)
|
|
}
|
|
|
|
switch binaryExpr.Op {
|
|
case types.BinaryOpAnd:
|
|
left, err := buildLogsPredicate(binaryExpr.Left, columns)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("building left binary predicate: %w", err)
|
|
}
|
|
right, err := buildLogsPredicate(binaryExpr.Right, columns)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("building right binary predicate: %w", err)
|
|
}
|
|
return logs.AndPredicate{Left: left, Right: right}, nil
|
|
|
|
case types.BinaryOpOr:
|
|
left, err := buildLogsPredicate(binaryExpr.Left, columns)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("building left binary predicate: %w", err)
|
|
}
|
|
right, err := buildLogsPredicate(binaryExpr.Right, columns)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("building right binary predicate: %w", err)
|
|
}
|
|
return logs.OrPredicate{Left: left, Right: right}, nil
|
|
}
|
|
|
|
if _, ok := comparisonBinaryOps[binaryExpr.Op]; ok {
|
|
return buildLogsComparison(binaryExpr, columns)
|
|
}
|
|
|
|
return nil, fmt.Errorf("expression %[1]s (type %[1]T) cannot be interpreted as a boolean", expr)
|
|
}
|
|
|
|
func buildLogsComparison(expr *physical.BinaryExpr, columns []*logs.Column) (logs.Predicate, error) {
|
|
// Currently, we only support comparisons where the left-hand side is a
|
|
// [physical.ColumnExpr] and the right-hand side is a [physical.LiteralExpr].
|
|
//
|
|
// Support for other cases could be added in the future:
|
|
//
|
|
// * LHS LiteralExpr, RHS ColumnExpr could be supported by inverting the
|
|
// operation.
|
|
// * LHS LiteralExpr, RHS LiteralExpr could be supported by evaluating the
|
|
// expresion immediately.
|
|
// * LHS ColumnExpr, RHS ColumnExpr could be supported in the future, but would need
|
|
// support down to the dataset level.
|
|
|
|
columnRef, leftValid := expr.Left.(*physical.ColumnExpr)
|
|
literalExpr, rightValid := expr.Right.(*physical.LiteralExpr)
|
|
|
|
if !leftValid || !rightValid {
|
|
return nil, fmt.Errorf("binary comparisons require the left-hand operation to reference a column (got %T) and the right-hand operation to be a literal (got %T)", expr.Left, expr.Right)
|
|
}
|
|
|
|
// findColumn may return nil for col if the referenced column doesn't exist;
|
|
// this is handled in the switch statement below and converts to either
|
|
// [logs.FalsePredicate] or [logs.TruePredicate] depending on the operation.
|
|
col, err := findColumn(columnRef.Ref, columns)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("finding column %s: %w", columnRef.Ref, err)
|
|
}
|
|
|
|
s, err := buildDataobjScalar(literalExpr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch expr.Op {
|
|
case types.BinaryOpEq:
|
|
if col == nil && s.IsValid() {
|
|
return logs.FalsePredicate{}, nil // Column(NULL) == non-null: always fails
|
|
} else if col == nil && !s.IsValid() {
|
|
return logs.TruePredicate{}, nil // Column(NULL) == NULL: always passes
|
|
}
|
|
return logs.EqualPredicate{Column: col, Value: s}, nil
|
|
|
|
case types.BinaryOpNeq:
|
|
if col == nil && s.IsValid() {
|
|
return logs.TruePredicate{}, nil // Column(NULL) != non-null: always passes
|
|
} else if col == nil && !s.IsValid() {
|
|
return logs.FalsePredicate{}, nil // Column(NULL) != NULL: always fails
|
|
}
|
|
return logs.NotPredicate{Inner: logs.EqualPredicate{Column: col, Value: s}}, nil
|
|
|
|
case types.BinaryOpGt:
|
|
if col == nil {
|
|
return logs.FalsePredicate{}, nil // Column(NULL) > value: always fails
|
|
}
|
|
return logs.GreaterThanPredicate{Column: col, Value: s}, nil
|
|
|
|
case types.BinaryOpGte:
|
|
if col == nil {
|
|
return logs.FalsePredicate{}, nil // Column(NULL) >= value: always fails
|
|
}
|
|
return logs.OrPredicate{
|
|
Left: logs.GreaterThanPredicate{Column: col, Value: s},
|
|
Right: logs.EqualPredicate{Column: col, Value: s},
|
|
}, nil
|
|
|
|
case types.BinaryOpLt:
|
|
if col == nil {
|
|
return logs.FalsePredicate{}, nil // Column(NULL) < value: always fails
|
|
}
|
|
return logs.LessThanPredicate{Column: col, Value: s}, nil
|
|
|
|
case types.BinaryOpLte:
|
|
if col == nil {
|
|
return logs.FalsePredicate{}, nil // Column(NULL) <= value: always fails
|
|
}
|
|
return logs.OrPredicate{
|
|
Left: logs.LessThanPredicate{Column: col, Value: s},
|
|
Right: logs.EqualPredicate{Column: col, Value: s},
|
|
}, nil
|
|
|
|
case types.BinaryOpMatchSubstr, types.BinaryOpMatchRe, types.BinaryOpMatchPattern:
|
|
if col == nil {
|
|
return logs.FalsePredicate{}, nil // Match operations against a non-existent column will always fail.
|
|
}
|
|
return buildLogsMatch(col, expr.Op, s)
|
|
|
|
case types.BinaryOpNotMatchSubstr, types.BinaryOpNotMatchRe, types.BinaryOpNotMatchPattern:
|
|
if col == nil {
|
|
return logs.TruePredicate{}, nil // Not match operations against a non-existent column will always pass.
|
|
}
|
|
return buildLogsMatch(col, expr.Op, s)
|
|
}
|
|
|
|
return nil, fmt.Errorf("unsupported binary operator %s in logs predicate", expr.Op)
|
|
}
|
|
|
|
// findColumn finds a column by ref in the slice of columns. If ref is invalid,
|
|
// findColumn returns an error. If the column does not exist, findColumn
|
|
// returns nil.
|
|
func findColumn(ref types.ColumnRef, columns []*logs.Column) (*logs.Column, error) {
|
|
if ref.Type != types.ColumnTypeBuiltin && ref.Type != types.ColumnTypeMetadata && ref.Type != types.ColumnTypeAmbiguous {
|
|
return nil, fmt.Errorf("invalid column ref %s, expected builtin or metadata", ref)
|
|
}
|
|
|
|
columnMatch := func(ref types.ColumnRef, column *logs.Column) bool {
|
|
switch {
|
|
case ref.Type == types.ColumnTypeBuiltin && ref.Column == types.ColumnNameBuiltinTimestamp:
|
|
return column.Type == logs.ColumnTypeTimestamp
|
|
case ref.Type == types.ColumnTypeBuiltin && ref.Column == types.ColumnNameBuiltinMessage:
|
|
return column.Type == logs.ColumnTypeMessage
|
|
case ref.Type == types.ColumnTypeMetadata || ref.Type == types.ColumnTypeAmbiguous:
|
|
return column.Name == ref.Column
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
for _, column := range columns {
|
|
if columnMatch(ref, column) {
|
|
return column, nil
|
|
}
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// buildDataobjScalar builds a dataobj-compatible [scalar.Scalar] from a
|
|
// [types.Literal].
|
|
func buildDataobjScalar(expr *physical.LiteralExpr) (scalar.Scalar, error) {
|
|
// [logs.ReaderOptions.Validate] specifies that all scalars must be one of
|
|
// the given types:
|
|
//
|
|
// * [scalar.Null] (of any types)
|
|
// * [scalar.Int64]
|
|
// * [scalar.Uint64]
|
|
// * [scalar.Timestamp] (nanosecond precision)
|
|
// * [scalar.Binary]
|
|
//
|
|
// All of our mappings below evaluate to one of the above types.
|
|
|
|
switch lit := expr.Literal().(type) {
|
|
case types.NullLiteral:
|
|
return scalar.ScalarNull, nil
|
|
case types.IntegerLiteral:
|
|
return scalar.NewInt64Scalar(lit.Value()), nil
|
|
case types.BytesLiteral:
|
|
// [types.BytesLiteral] refers to byte sizes, not binary data.
|
|
return scalar.NewInt64Scalar(int64(lit.Value())), nil
|
|
case types.TimestampLiteral:
|
|
ts := arrow.Timestamp(lit.Value())
|
|
tsType := arrow.FixedWidthTypes.Timestamp_ns
|
|
return scalar.NewTimestampScalar(ts, tsType), nil
|
|
case types.StringLiteral:
|
|
buf := memory.NewBufferBytes([]byte(lit.Value()))
|
|
return scalar.NewBinaryScalar(buf, arrow.BinaryTypes.Binary), nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("unsupported literal type %T", expr)
|
|
}
|
|
|
|
func buildLogsMatch(col *logs.Column, op types.BinaryOp, value scalar.Scalar) (logs.Predicate, error) {
|
|
// All the match operations require the value to be a string or a binary.
|
|
var find []byte
|
|
|
|
switch value := value.(type) {
|
|
case *scalar.Binary:
|
|
find = value.Data()
|
|
case *scalar.String:
|
|
find = value.Data()
|
|
default:
|
|
return nil, fmt.Errorf("unsupported scalar type %T for op %s, expected binary or string", value, op)
|
|
}
|
|
|
|
switch op {
|
|
case types.BinaryOpMatchSubstr:
|
|
return logs.FuncPredicate{
|
|
Column: col,
|
|
Keep: func(_ *logs.Column, value scalar.Scalar) bool {
|
|
return bytes.Contains(getBytes(value), find)
|
|
},
|
|
}, nil
|
|
|
|
case types.BinaryOpNotMatchSubstr:
|
|
return logs.FuncPredicate{
|
|
Column: col,
|
|
Keep: func(_ *logs.Column, value scalar.Scalar) bool {
|
|
return !bytes.Contains(getBytes(value), find)
|
|
},
|
|
}, nil
|
|
|
|
case types.BinaryOpMatchRe:
|
|
re, err := regexp.Compile(string(find))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return logs.FuncPredicate{
|
|
Column: col,
|
|
Keep: func(_ *logs.Column, value scalar.Scalar) bool {
|
|
return re.Match(getBytes(value))
|
|
},
|
|
}, nil
|
|
|
|
case types.BinaryOpNotMatchRe:
|
|
re, err := regexp.Compile(string(find))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return logs.FuncPredicate{
|
|
Column: col,
|
|
Keep: func(_ *logs.Column, value scalar.Scalar) bool {
|
|
return !re.Match(getBytes(value))
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// NOTE(rfratto): [types.BinaryOpMatchPattern] and [types.BinaryOpNotMatchPattern]
|
|
// are currently unsupported.
|
|
return nil, fmt.Errorf("unrecognized match operation %s", op)
|
|
}
|
|
|
|
func getBytes(value scalar.Scalar) []byte {
|
|
if !value.IsValid() {
|
|
return nil
|
|
}
|
|
|
|
switch value := value.(type) {
|
|
case *scalar.Binary:
|
|
return value.Data()
|
|
case *scalar.String:
|
|
return value.Data()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|