loki/pkg/engine/internal/executor/dataobjscan_predicate.go

package executor

import (
	"bytes"
	"fmt"
	"regexp"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/arrow/scalar"

"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
// buildLogsPredicate builds a [logs.Predicate] from an expr. The columns slice
// determines the available columns that can be referenced by the expression.
//
// The returned predicate performs no filtering on stream ID; callers must use
// [logs.AndPredicate] and manually include filters for stream IDs if desired.
//
// References to columns that do not exist in the columns slice resolve to a
// constant predicate ([logs.FalsePredicate] or [logs.TruePredicate], depending
// on the operation).
//
// buildLogsPredicate returns an error if:
//
//   - An expression cannot be represented as a boolean value.
//   - An expression is not supported (such as comparing two columns or
//     comparing two literals).
func buildLogsPredicate(expr physical.Expression, columns []*logs.Column) (logs.Predicate, error) {
	switch expr := expr.(type) {
	case physical.UnaryExpression:
		return buildLogsUnaryPredicate(expr, columns)
	case physical.BinaryExpression:
		return buildLogsBinaryPredicate(expr, columns)
	case *physical.LiteralExpr:
		if expr.ValueType() == types.Loki.Bool {
			val := expr.Value().(bool)
			if val {
				return logs.TruePredicate{}, nil
			}
			return logs.FalsePredicate{}, nil
		}
	case *physical.ColumnExpr:
		// TODO(rfratto): This would add support for statements like
		//
		//	SELECT * WHERE boolean_column
		//
		// (where boolean_column is a column containing boolean values). No such
		// column type exists now, so I'm leaving it unsupported here for the time
		// being.
		return nil, fmt.Errorf("plain column references (%s) are unsupported for logs predicates", expr)
	default:
		// TODO(rfratto): This doesn't yet cover two potential future cases:
		//
		//	* A plain boolean literal
		//	* A reference to a column containing boolean values
		return nil, fmt.Errorf("expression %[1]s (type %[1]T) cannot be interpreted as a boolean", expr)
	}

	return nil, fmt.Errorf("expression %[1]s (type %[1]T) cannot be interpreted as a boolean", expr)
}
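
// A minimal usage sketch (illustrative only): the predicate built by
// buildLogsPredicate is typically combined with a caller-supplied stream-ID
// filter via [logs.AndPredicate], as the doc comment above notes. Here expr,
// columns, and streamIDPredicate are hypothetical values owned by the caller.
//
//	pred, err := buildLogsPredicate(expr, columns)
//	if err != nil {
//		return nil, err
//	}
//	combined := logs.AndPredicate{Left: streamIDPredicate, Right: pred}
//	_ = combined // handed to the logs reader alongside the other read options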

func buildLogsUnaryPredicate(expr physical.UnaryExpression, columns []*logs.Column) (logs.Predicate, error) {
	unaryExpr, ok := expr.(*physical.UnaryExpr)
	if !ok {
		return nil, fmt.Errorf("expected physical.UnaryExpr, got %[1]T", expr)
	}

	inner, err := buildLogsPredicate(unaryExpr.Left, columns)
	if err != nil {
		return nil, fmt.Errorf("building unary predicate: %w", err)
	}

	switch unaryExpr.Op {
	case types.UnaryOpNot:
		return logs.NotPredicate{Inner: inner}, nil
	}

	return nil, fmt.Errorf("unsupported unary operator %s in logs predicate", unaryExpr.Op)
}

var comparisonBinaryOps = map[types.BinaryOp]struct{}{
	types.BinaryOpEq:              {},
	types.BinaryOpNeq:             {},
	types.BinaryOpGt:              {},
	types.BinaryOpGte:             {},
	types.BinaryOpLt:              {},
	types.BinaryOpLte:             {},
	types.BinaryOpMatchSubstr:     {},
	types.BinaryOpNotMatchSubstr:  {},
	types.BinaryOpMatchRe:         {},
	types.BinaryOpNotMatchRe:      {},
	types.BinaryOpMatchPattern:    {},
	types.BinaryOpNotMatchPattern: {},
}

func buildLogsBinaryPredicate(expr physical.BinaryExpression, columns []*logs.Column) (logs.Predicate, error) {
	binaryExpr, ok := expr.(*physical.BinaryExpr)
	if !ok {
		return nil, fmt.Errorf("expected physical.BinaryExpr, got %[1]T", expr)
	}

	switch binaryExpr.Op {
	case types.BinaryOpAnd:
		left, err := buildLogsPredicate(binaryExpr.Left, columns)
		if err != nil {
			return nil, fmt.Errorf("building left binary predicate: %w", err)
		}
		right, err := buildLogsPredicate(binaryExpr.Right, columns)
		if err != nil {
			return nil, fmt.Errorf("building right binary predicate: %w", err)
		}
		return logs.AndPredicate{Left: left, Right: right}, nil

	case types.BinaryOpOr:
		left, err := buildLogsPredicate(binaryExpr.Left, columns)
		if err != nil {
			return nil, fmt.Errorf("building left binary predicate: %w", err)
		}
		right, err := buildLogsPredicate(binaryExpr.Right, columns)
		if err != nil {
			return nil, fmt.Errorf("building right binary predicate: %w", err)
		}
		return logs.OrPredicate{Left: left, Right: right}, nil
	}

	if _, ok := comparisonBinaryOps[binaryExpr.Op]; ok {
		return buildLogsComparison(binaryExpr, columns)
	}

	return nil, fmt.Errorf("expression %[1]s (type %[1]T) cannot be interpreted as a boolean", expr)
}

func buildLogsComparison(expr *physical.BinaryExpr, columns []*logs.Column) (logs.Predicate, error) {
	// Currently, we only support comparisons where the left-hand side is a
	// [physical.ColumnExpr] and the right-hand side is a [physical.LiteralExpr].
	//
	// Support for other cases could be added in the future:
	//
	//   * LHS LiteralExpr, RHS ColumnExpr could be supported by inverting the
	//     operation.
	//   * LHS LiteralExpr, RHS LiteralExpr could be supported by evaluating the
	//     expression immediately.
	//   * LHS ColumnExpr, RHS ColumnExpr could be supported in the future, but
	//     would need support down to the dataset level.
	columnRef, leftValid := expr.Left.(*physical.ColumnExpr)
	literalExpr, rightValid := expr.Right.(*physical.LiteralExpr)
	if !leftValid || !rightValid {
		return nil, fmt.Errorf("binary comparisons require the left-hand operand to reference a column (got %T) and the right-hand operand to be a literal (got %T)", expr.Left, expr.Right)
	}

	// findColumn may return nil for col if the referenced column doesn't exist;
	// this is handled in the switch statement below and converts to either
	// [logs.FalsePredicate] or [logs.TruePredicate] depending on the operation.
	col, err := findColumn(columnRef.Ref, columns)
	if err != nil {
		return nil, fmt.Errorf("finding column %s: %w", columnRef.Ref, err)
	}

	s, err := buildDataobjScalar(literalExpr)
	if err != nil {
		return nil, err
	}

	switch expr.Op {
	case types.BinaryOpEq:
		if col == nil && s.IsValid() {
			return logs.FalsePredicate{}, nil // Column(NULL) == non-null: always fails
		} else if col == nil && !s.IsValid() {
			return logs.TruePredicate{}, nil // Column(NULL) == NULL: always passes
		}
		return logs.EqualPredicate{Column: col, Value: s}, nil

	case types.BinaryOpNeq:
		if col == nil && s.IsValid() {
			return logs.TruePredicate{}, nil // Column(NULL) != non-null: always passes
		} else if col == nil && !s.IsValid() {
			return logs.FalsePredicate{}, nil // Column(NULL) != NULL: always fails
		}
		return logs.NotPredicate{Inner: logs.EqualPredicate{Column: col, Value: s}}, nil

	case types.BinaryOpGt:
		if col == nil {
			return logs.FalsePredicate{}, nil // Column(NULL) > value: always fails
		}
		return logs.GreaterThanPredicate{Column: col, Value: s}, nil

	case types.BinaryOpGte:
		if col == nil {
			return logs.FalsePredicate{}, nil // Column(NULL) >= value: always fails
		}
		return logs.OrPredicate{
			Left:  logs.GreaterThanPredicate{Column: col, Value: s},
			Right: logs.EqualPredicate{Column: col, Value: s},
		}, nil

	case types.BinaryOpLt:
		if col == nil {
			return logs.FalsePredicate{}, nil // Column(NULL) < value: always fails
		}
		return logs.LessThanPredicate{Column: col, Value: s}, nil

	case types.BinaryOpLte:
		if col == nil {
			return logs.FalsePredicate{}, nil // Column(NULL) <= value: always fails
		}
		return logs.OrPredicate{
			Left:  logs.LessThanPredicate{Column: col, Value: s},
			Right: logs.EqualPredicate{Column: col, Value: s},
		}, nil

	case types.BinaryOpMatchSubstr, types.BinaryOpMatchRe, types.BinaryOpMatchPattern:
		if col == nil {
			return logs.FalsePredicate{}, nil // Match operations against a non-existent column will always fail.
		}
		return buildLogsMatch(col, expr.Op, s)

	case types.BinaryOpNotMatchSubstr, types.BinaryOpNotMatchRe, types.BinaryOpNotMatchPattern:
		if col == nil {
			return logs.TruePredicate{}, nil // Not match operations against a non-existent column will always pass.
		}
		return buildLogsMatch(col, expr.Op, s)
	}

	return nil, fmt.Errorf("unsupported binary operator %s in logs predicate", expr.Op)
}
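
// Illustrative behaviour of the nil-column fallbacks above, for a hypothetical
// metadata column "foo" that is absent from the section (and therefore reads
// as NULL):
//
//	foo == "bar" -> logs.FalsePredicate{} (NULL never equals a non-null value)
//	foo != "bar" -> logs.TruePredicate{}  (NULL always differs from a non-null value)
//	foo == NULL  -> logs.TruePredicate{}  (NULL compared against NULL passes)
//	foo >  "bar" -> logs.FalsePredicate{} (ordering against NULL never holds)
//	foo =~ "ba." -> logs.FalsePredicate{} (matches against NULL never hold)
//	foo !~ "ba." -> logs.TruePredicate{}  (negated matches against NULL always hold)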

// findColumn finds a column by ref in the slice of columns. If ref is invalid,
// findColumn returns an error. If the column does not exist, findColumn
// returns nil.
func findColumn(ref types.ColumnRef, columns []*logs.Column) (*logs.Column, error) {
	if ref.Type != types.ColumnTypeBuiltin && ref.Type != types.ColumnTypeMetadata && ref.Type != types.ColumnTypeAmbiguous {
		return nil, fmt.Errorf("invalid column ref %s, expected builtin or metadata", ref)
	}

	columnMatch := func(ref types.ColumnRef, column *logs.Column) bool {
		switch {
		case ref.Type == types.ColumnTypeBuiltin && ref.Column == types.ColumnNameBuiltinTimestamp:
			return column.Type == logs.ColumnTypeTimestamp
		case ref.Type == types.ColumnTypeBuiltin && ref.Column == types.ColumnNameBuiltinMessage:
			return column.Type == logs.ColumnTypeMessage
		case ref.Type == types.ColumnTypeMetadata || ref.Type == types.ColumnTypeAmbiguous:
			return column.Name == ref.Column
		}
		return false
	}

	for _, column := range columns {
		if columnMatch(ref, column) {
			return column, nil
		}
	}

	return nil, nil
}
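
// Illustrative resolution examples for the rules above; the metadata column
// name is hypothetical and not taken from a real plan:
//
//	builtin ref to types.ColumnNameBuiltinTimestamp -> the section's logs.ColumnTypeTimestamp column, whatever its name
//	builtin ref to types.ColumnNameBuiltinMessage   -> the section's logs.ColumnTypeMessage column
//	metadata or ambiguous ref named "pod"           -> the first column whose Name is exactly "pod"
//	no matching column                              -> (nil, nil), which buildLogsComparison folds into a constant predicate
//	any other ref type                              -> an error; only builtin, metadata, and ambiguous refs are accepted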

// buildDataobjScalar builds a dataobj-compatible [scalar.Scalar] from a
// [types.Literal].
func buildDataobjScalar(expr *physical.LiteralExpr) (scalar.Scalar, error) {
	// [logs.ReaderOptions.Validate] specifies that all scalars must be one of
	// the given types:
	//
	//   * [scalar.Null] (of any type)
	//   * [scalar.Int64]
	//   * [scalar.Uint64]
	//   * [scalar.Timestamp] (nanosecond precision)
	//   * [scalar.Binary]
	//
	// All of our mappings below evaluate to one of the above types.
	switch lit := expr.Literal().(type) {
	case types.NullLiteral:
		return scalar.ScalarNull, nil
	case types.IntegerLiteral:
		return scalar.NewInt64Scalar(lit.Value()), nil
	case types.BytesLiteral:
		// [types.BytesLiteral] refers to byte sizes, not binary data.
		return scalar.NewInt64Scalar(int64(lit.Value())), nil
	case types.TimestampLiteral:
		ts := arrow.Timestamp(lit.Value())
		tsType := arrow.FixedWidthTypes.Timestamp_ns
		return scalar.NewTimestampScalar(ts, tsType), nil
	case types.StringLiteral:
		buf := memory.NewBufferBytes([]byte(lit.Value()))
		return scalar.NewBinaryScalar(buf, arrow.BinaryTypes.Binary), nil
	}

	return nil, fmt.Errorf("unsupported literal type %T", expr.Literal())
}
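
// Illustrative sketch: with the mapping above, a string literal such as "api"
// (a hypothetical label value) becomes an Arrow binary scalar, built with the
// same constructors used in this function:
//
//	s := scalar.NewBinaryScalar(memory.NewBufferBytes([]byte("api")), arrow.BinaryTypes.Binary)
//	// s.IsValid() reports true; scalar.ScalarNull (the NullLiteral mapping) reports false.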

func buildLogsMatch(col *logs.Column, op types.BinaryOp, value scalar.Scalar) (logs.Predicate, error) {
	// All the match operations require the value to be a string or a binary.
	var find []byte
	switch value := value.(type) {
	case *scalar.Binary:
		find = value.Data()
	case *scalar.String:
		find = value.Data()
	default:
		return nil, fmt.Errorf("unsupported scalar type %T for op %s, expected binary or string", value, op)
	}

	switch op {
	case types.BinaryOpMatchSubstr:
		return logs.FuncPredicate{
			Column: col,
			Keep: func(_ *logs.Column, value scalar.Scalar) bool {
				return bytes.Contains(getBytes(value), find)
			},
		}, nil

	case types.BinaryOpNotMatchSubstr:
		return logs.FuncPredicate{
			Column: col,
			Keep: func(_ *logs.Column, value scalar.Scalar) bool {
				return !bytes.Contains(getBytes(value), find)
			},
		}, nil

	case types.BinaryOpMatchRe:
		re, err := regexp.Compile(string(find))
		if err != nil {
			return nil, err
		}
		return logs.FuncPredicate{
			Column: col,
			Keep: func(_ *logs.Column, value scalar.Scalar) bool {
				return re.Match(getBytes(value))
			},
		}, nil

	case types.BinaryOpNotMatchRe:
		re, err := regexp.Compile(string(find))
		if err != nil {
			return nil, err
		}
		return logs.FuncPredicate{
			Column: col,
			Keep: func(_ *logs.Column, value scalar.Scalar) bool {
				return !re.Match(getBytes(value))
			},
		}, nil
	}

	// NOTE(rfratto): [types.BinaryOpMatchPattern] and [types.BinaryOpNotMatchPattern]
	// are currently unsupported.
	return nil, fmt.Errorf("unrecognized match operation %s", op)
}
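
// Illustrative check of a substring predicate produced above (msgCol and the
// log line are hypothetical; Keep is invoked directly here only to show its
// behaviour):
//
//	needle := scalar.NewBinaryScalar(memory.NewBufferBytes([]byte("error")), arrow.BinaryTypes.Binary)
//	p, _ := buildLogsMatch(msgCol, types.BinaryOpMatchSubstr, needle)
//	line := scalar.NewBinaryScalar(memory.NewBufferBytes([]byte("level=error msg=boom")), arrow.BinaryTypes.Binary)
//	p.(logs.FuncPredicate).Keep(msgCol, line) // true: the line contains "error"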

func getBytes(value scalar.Scalar) []byte {
	if !value.IsValid() {
		return nil
	}

	switch value := value.(type) {
	case *scalar.Binary:
		return value.Data()
	case *scalar.String:
		return value.Data()
	}

	return nil
}