mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
518 lines
20 KiB
518 lines
20 KiB
package executor
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"math"
|
|
"regexp"
|
|
"time"
|
|
|
|
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/types"
|
|
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
|
|
)
|
|
|
|
var matchAllFilter = logs.LogMessageFilterRowPredicate{
|
|
Keep: func(_ []byte) bool { return true },
|
|
}
|
|
|
|
// buildLogsPredicate builds a logs predicate from an expression.
|
|
func buildLogsPredicate(expr physical.Expression) (logs.RowPredicate, error) {
|
|
// TODO(rfratto): implement converting expressions into logs predicates.
|
|
//
|
|
// There's a few challenges here:
|
|
//
|
|
// - Expressions do not cleanly map to [logs.RowPredicate]s. For example,
|
|
// an expression may be simply a column reference, but a logs predicate is
|
|
// always some expression that can evaluate to true.
|
|
//
|
|
// - Mapping expressions into [dataobj.TimeRangePredicate] is a massive pain;
|
|
// since TimeRangePredicate specifies both bounds for the time range, we
|
|
// would need to find and collapse multiple physical.Expressions into a
|
|
// single TimeRangePredicate.
|
|
//
|
|
// - While [dataobj.MetadataMatcherPredicate] and
|
|
// [dataobj.LogMessageFilterPredicate] are catch-alls for function-based
|
|
// predicates, they are row-based and not column-based, so our
|
|
// expressionEvaluator cannot be used here.
|
|
//
|
|
// Long term, we likely want two things:
|
|
//
|
|
// 1. Use dataset.Reader and dataset.Predicate directly instead of
|
|
// dataobj.LogsReader.
|
|
//
|
|
// 2. Update dataset.Reader to be vector based instead of row-based.
|
|
//
|
|
// It's not clear if we should resolve the issues with LogsPredicate (or find
|
|
// hacks to make them work in the short term), or skip straight to using
|
|
// dataset.Reader instead.
|
|
//
|
|
// Implementing DataObjScan in the dataobj package would be a clean way to
|
|
// handle all of this, but that would cause cyclic dependencies. I also don't
|
|
// think we should start removing things from internal for this; we can probably
|
|
// find a way to remove the explicit dependency from the dataobj package from
|
|
// the physical planner instead.
|
|
return mapInitiallySupportedPredicates(expr)
|
|
}
|
|
|
|
// Support for timestamp and metadata predicates has been implemented.
|
|
// TODO(owen-d): this can go away when we use dataset.Reader & dataset.Predicate directly
|
|
func mapInitiallySupportedPredicates(expr physical.Expression) (logs.RowPredicate, error) {
|
|
return mapPredicates(expr)
|
|
}
|
|
|
|
func mapPredicates(expr physical.Expression) (logs.RowPredicate, error) {
|
|
switch e := expr.(type) {
|
|
case *physical.BinaryExpr:
|
|
|
|
// Special case: both sides of the binary op are again binary expressions (where LHS is expected to be a column expression)
|
|
if e.Left.Type() == physical.ExprTypeBinary && e.Right.Type() == physical.ExprTypeBinary {
|
|
left, err := mapPredicates(e.Left)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
right, err := mapPredicates(e.Right)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
switch e.Op {
|
|
case types.BinaryOpAnd:
|
|
return logs.AndRowPredicate{
|
|
Left: left,
|
|
Right: right,
|
|
}, nil
|
|
case types.BinaryOpOr:
|
|
return logs.OrRowPredicate{
|
|
Left: left,
|
|
Right: right,
|
|
}, nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupported operator in predicate: %s", e.Op)
|
|
}
|
|
}
|
|
|
|
if e.Left.Type() != physical.ExprTypeColumn {
|
|
return nil, fmt.Errorf("unsupported predicate, expected column ref on LHS: %s", expr.String())
|
|
}
|
|
|
|
left := e.Left.(*physical.ColumnExpr)
|
|
switch left.Ref.Type {
|
|
case types.ColumnTypeBuiltin:
|
|
if left.Ref.Column == types.ColumnNameBuiltinTimestamp {
|
|
return mapTimestampPredicate(e)
|
|
}
|
|
if left.Ref.Column == types.ColumnNameBuiltinMessage {
|
|
return mapMessagePredicate(e)
|
|
}
|
|
return nil, fmt.Errorf("unsupported builtin column in predicate (only timestamp is supported for now): %s", left.Ref.Column)
|
|
case types.ColumnTypeMetadata:
|
|
return mapMetadataPredicate(e)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported column ref type (%T) in predicate: %s", left.Ref, left.Ref.String())
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("unsupported expression type (%T) in predicate: %s", expr, expr.String())
|
|
}
|
|
}
|
|
|
|
func mapTimestampPredicate(expr physical.Expression) (logs.TimeRangeRowPredicate, error) {
|
|
m := newTimestampPredicateMapper()
|
|
if err := m.verify(expr); err != nil {
|
|
return logs.TimeRangeRowPredicate{}, err
|
|
}
|
|
|
|
if err := m.processExpr(expr); err != nil {
|
|
return logs.TimeRangeRowPredicate{}, err
|
|
}
|
|
|
|
// Check for impossible ranges that might have been formed.
|
|
if m.res.StartTime.After(m.res.EndTime) {
|
|
return logs.TimeRangeRowPredicate{},
|
|
fmt.Errorf("impossible time range: start_time (%v) is after end_time (%v)", m.res.StartTime, m.res.EndTime)
|
|
}
|
|
if m.res.StartTime.Equal(m.res.EndTime) && (!m.res.IncludeStart || !m.res.IncludeEnd) {
|
|
return logs.TimeRangeRowPredicate{},
|
|
fmt.Errorf("impossible time range: start_time (%v) equals end_time (%v) but the range is exclusive", m.res.StartTime, m.res.EndTime)
|
|
}
|
|
|
|
return m.res, nil
|
|
}
|
|
|
|
func newTimestampPredicateMapper() *timestampPredicateMapper {
|
|
open := logs.TimeRangeRowPredicate{
|
|
StartTime: time.Unix(0, math.MinInt64).UTC(),
|
|
EndTime: time.Unix(0, math.MaxInt64).UTC(),
|
|
IncludeStart: true,
|
|
IncludeEnd: true,
|
|
}
|
|
return ×tampPredicateMapper{
|
|
res: open,
|
|
}
|
|
}
|
|
|
|
type timestampPredicateMapper struct {
|
|
res logs.TimeRangeRowPredicate
|
|
}
|
|
|
|
// ensures the LHS is a timestamp column reference
|
|
// and the RHS is either a literal or a binary expression
|
|
func (m *timestampPredicateMapper) verify(expr physical.Expression) error {
|
|
binop, ok := expr.(*physical.BinaryExpr)
|
|
if !ok {
|
|
return fmt.Errorf("unsupported expression type for timestamp predicate: %T, expected *physical.BinaryExpr", expr)
|
|
}
|
|
|
|
switch binop.Op {
|
|
case types.BinaryOpEq, types.BinaryOpGt, types.BinaryOpGte, types.BinaryOpLt, types.BinaryOpLte:
|
|
lhs, okLHS := binop.Left.(*physical.ColumnExpr)
|
|
if !okLHS || lhs.Ref.Type != types.ColumnTypeBuiltin || lhs.Ref.Column != types.ColumnNameBuiltinTimestamp {
|
|
return fmt.Errorf("invalid LHS for comparison: expected timestamp column, got %s", binop.Left.String())
|
|
}
|
|
|
|
// RHS must be a literal timestamp for simple comparisons.
|
|
rhsLit, okRHS := binop.Right.(*physical.LiteralExpr)
|
|
if !okRHS {
|
|
return fmt.Errorf("invalid RHS for comparison: expected literal timestamp, got %T", binop.Right)
|
|
}
|
|
if rhsLit.ValueType() != datatype.Timestamp {
|
|
return fmt.Errorf("unsupported literal type for RHS: %s, expected timestamp", rhsLit.ValueType())
|
|
}
|
|
return nil
|
|
|
|
case types.BinaryOpAnd:
|
|
if err := m.verify(binop.Left); err != nil {
|
|
return fmt.Errorf("invalid left operand for AND: %w", err)
|
|
}
|
|
if err := m.verify(binop.Right); err != nil {
|
|
return fmt.Errorf("invalid right operand for AND: %w", err)
|
|
}
|
|
return nil
|
|
|
|
default:
|
|
return fmt.Errorf("unsupported operator for timestamp predicate: %s", binop.Op)
|
|
}
|
|
}
|
|
|
|
// processExpr recursively processes the expression tree to update the time range.
|
|
func (m *timestampPredicateMapper) processExpr(expr physical.Expression) error {
|
|
// verify should have already ensured expr is *physical.BinaryExpr
|
|
binExp := expr.(*physical.BinaryExpr)
|
|
|
|
switch binExp.Op {
|
|
case types.BinaryOpAnd:
|
|
if err := m.processExpr(binExp.Left); err != nil {
|
|
return err
|
|
}
|
|
return m.processExpr(binExp.Right)
|
|
case types.BinaryOpEq, types.BinaryOpGt, types.BinaryOpGte, types.BinaryOpLt, types.BinaryOpLte:
|
|
// This is a comparison operation, rebound m.res
|
|
// The 'right' here is binExp.Right, which could be a literal or a nested BinaryExpr.
|
|
// rebound will extract the actual literal value.
|
|
return m.rebound(binExp.Op, binExp.Right)
|
|
default:
|
|
// This case should ideally not be reached if verify is correct.
|
|
return fmt.Errorf("unexpected operator in processExpr: %s", binExp.Op)
|
|
}
|
|
}
|
|
|
|
// rebound updates the time range (m.res) based on a single comparison operation.
|
|
// The `op` is the comparison operator (e.g., Gt, Lte).
|
|
// The `rightExpr` is the RHS of the comparison, which might be a literal
|
|
// or a nested binary expression (e.g., `timestamp > (timestamp = X)`).
|
|
// This function will traverse `rightExpr` to find the innermost literal value.
|
|
func (m *timestampPredicateMapper) rebound(op types.BinaryOp, rightExpr physical.Expression) error {
|
|
// `verify` (called by processExpr before this) now ensures that for comparison ops,
|
|
// the original expression's RHS was a literal, or if it was an AND, its constituent parts were.
|
|
// `processExpr` will pass the direct LiteralExpr from a comparison to rebound.
|
|
literalExpr, ok := rightExpr.(*physical.LiteralExpr)
|
|
if !ok {
|
|
// This should not happen if verify and processExpr are correct.
|
|
return fmt.Errorf("internal error: rebound expected LiteralExpr, got %T for: %s", rightExpr, rightExpr.String())
|
|
}
|
|
|
|
if literalExpr.ValueType() != datatype.Timestamp {
|
|
// Also should be caught by verify.
|
|
return fmt.Errorf("internal error: unsupported literal type in rebound: %s, expected timestamp", literalExpr.ValueType())
|
|
}
|
|
val := literalExpr.Literal.(*datatype.TimestampLiteral).Value()
|
|
|
|
switch op {
|
|
case types.BinaryOpEq: // ts == val
|
|
m.updateLowerBound(val, true)
|
|
m.updateUpperBound(val, true)
|
|
case types.BinaryOpGt: // ts > val
|
|
m.updateLowerBound(val, false)
|
|
case types.BinaryOpGte: // ts >= val
|
|
m.updateLowerBound(val, true)
|
|
case types.BinaryOpLt: // ts < val
|
|
m.updateUpperBound(val, false)
|
|
case types.BinaryOpLte: // ts <= val
|
|
m.updateUpperBound(val, true)
|
|
default:
|
|
// Should not be reached if processExpr filters operators correctly.
|
|
return fmt.Errorf("unsupported operator in rebound: %s", op)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateLowerBound updates the start of the time range (m.res.StartTime, m.res.IncludeStart).
|
|
// `val` is the new potential start time from the condition.
|
|
// `includeVal` indicates if this new start time is inclusive (e.g., from '>=' or '==').
|
|
func (m *timestampPredicateMapper) updateLowerBound(val time.Time, includeVal bool) {
|
|
if val.After(m.res.StartTime) {
|
|
// The new value is strictly greater than the current start, so it becomes the new start.
|
|
m.res.StartTime = val
|
|
m.res.IncludeStart = includeVal
|
|
} else if val.Equal(m.res.StartTime) {
|
|
// The new value is equal to the current start.
|
|
// The range becomes more restrictive if the current start was exclusive and the new one is also exclusive or inclusive.
|
|
// Or if the current was inclusive and the new one is exclusive.
|
|
// Effectively, IncludeStart becomes true only if *both* the existing and new condition allow/imply inclusion.
|
|
// No, this should be: if new condition makes it more restrictive (i.e. current is [T and new is (T ), then new is (T )
|
|
// m.res.IncludeStart = m.res.IncludeStart && includeVal (This is for intersection: [a,b] AND (a,c] -> (a, min(b,c)] )
|
|
// m.res.IncludeStart = m.res.IncludeStart && includeVal
|
|
if !includeVal { // if new bound is exclusive (e.g. from `> val`)
|
|
m.res.IncludeStart = false // existing [val,... or (val,... combined with (> val) becomes (val,...
|
|
}
|
|
// If includeVal is true (e.g. from `>= val`), and m.res.IncludeStart was already true, it stays true.
|
|
// If includeVal is true, and m.res.IncludeStart was false, it stays false ( (val,...) AND [val,...] is (val,...) )
|
|
// This logic is subtle. Let's restate:
|
|
// Current Start: S, Is (m.res.StartTime, m.res.IncludeStart)
|
|
// New Condition: val, includeVal
|
|
// if val > S: new start is val, includeVal
|
|
// if val == S: new start is val. IncludeStart becomes m.res.IncludeStart AND includeVal.
|
|
// Example: current (S, ...), new condition S >= S. Combined: (S, ...). So if m.res.IncludeStart=false, new includeVal=true -> false.
|
|
// Example: current [S, ...), new condition S > S. Combined: (S, ...). So if m.res.IncludeStart=true, new includeVal=false -> false.
|
|
// This is correct for intersection.
|
|
m.res.IncludeStart = m.res.IncludeStart && includeVal
|
|
}
|
|
// If val is before m.res.StartTime, the current StartTime is already more restrictive, so no change.
|
|
}
|
|
|
|
// updateUpperBound updates the end of the time range (m.res.EndTime, m.res.IncludeEnd).
|
|
// `val` is the new potential end time from the condition.
|
|
// `includeVal` indicates if this new end time is inclusive (e.g., from '<=' or '==').
|
|
func (m *timestampPredicateMapper) updateUpperBound(val time.Time, includeVal bool) {
|
|
if val.Before(m.res.EndTime) {
|
|
// The new value is strictly less than the current end, so it becomes the new end.
|
|
m.res.EndTime = val
|
|
m.res.IncludeEnd = includeVal
|
|
} else if val.Equal(m.res.EndTime) {
|
|
// The new value is equal to the current end.
|
|
// Similar to updateLowerBound, the inclusiveness is an AND condition.
|
|
// Example: current (..., S), new condition ts <= S. Combined: (..., S). m.res.IncludeEnd=false, includeVal=true -> false.
|
|
// Example: current (..., S], new condition ts < S. Combined: (..., S). m.res.IncludeEnd=true, includeVal=false -> false.
|
|
m.res.IncludeEnd = m.res.IncludeEnd && includeVal
|
|
}
|
|
// If val is after m.res.EndTime, the current EndTime is already more restrictive, so no change.
|
|
}
|
|
|
|
// mapMetadataPredicate converts a physical.Expression into a dataobj.Predicate for metadata filtering.
|
|
// It supports MetadataMatcherPredicate for equality checks on metadata fields,
|
|
// and can recursively handle AndPredicate, OrPredicate, and NotPredicate.
|
|
func mapMetadataPredicate(expr physical.Expression) (logs.RowPredicate, error) {
|
|
switch e := expr.(type) {
|
|
case *physical.BinaryExpr:
|
|
switch e.Op {
|
|
case types.BinaryOpEq:
|
|
if e.Left.Type() != physical.ExprTypeColumn {
|
|
return nil, fmt.Errorf("unsupported LHS type (%v) for EQ metadata predicate, expected ColumnExpr", e.Left.Type())
|
|
}
|
|
leftColumn, ok := e.Left.(*physical.ColumnExpr)
|
|
if !ok { // Should not happen due to Type() check but defensive
|
|
return nil, fmt.Errorf("LHS of EQ metadata predicate failed to cast to ColumnExpr")
|
|
}
|
|
if leftColumn.Ref.Type != types.ColumnTypeMetadata {
|
|
return nil, fmt.Errorf("unsupported LHS column type (%v) for EQ metadata predicate, expected ColumnTypeMetadata", leftColumn.Ref.Type)
|
|
}
|
|
|
|
if e.Right.Type() != physical.ExprTypeLiteral {
|
|
return nil, fmt.Errorf("unsupported RHS type (%v) for EQ metadata predicate, expected LiteralExpr", e.Right.Type())
|
|
}
|
|
rightLiteral, ok := e.Right.(*physical.LiteralExpr)
|
|
if !ok { // Should not happen
|
|
return nil, fmt.Errorf("RHS of EQ metadata predicate failed to cast to LiteralExpr")
|
|
}
|
|
if rightLiteral.ValueType() != datatype.String {
|
|
return nil, fmt.Errorf("unsupported RHS literal type (%v) for EQ metadata predicate, expected ValueTypeStr", rightLiteral.ValueType())
|
|
}
|
|
val := rightLiteral.Literal.(*datatype.StringLiteral).Value()
|
|
|
|
return logs.MetadataMatcherRowPredicate{
|
|
Key: leftColumn.Ref.Column,
|
|
Value: val,
|
|
}, nil
|
|
case types.BinaryOpAnd:
|
|
leftPredicate, err := mapMetadataPredicate(e.Left)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map left operand of AND: %w", err)
|
|
}
|
|
rightPredicate, err := mapMetadataPredicate(e.Right)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map right operand of AND: %w", err)
|
|
}
|
|
return logs.AndRowPredicate{
|
|
Left: leftPredicate,
|
|
Right: rightPredicate,
|
|
}, nil
|
|
case types.BinaryOpOr:
|
|
leftPredicate, err := mapMetadataPredicate(e.Left)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map left operand of OR: %w", err)
|
|
}
|
|
rightPredicate, err := mapMetadataPredicate(e.Right)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map right operand of OR: %w", err)
|
|
}
|
|
return logs.OrRowPredicate{
|
|
Left: leftPredicate,
|
|
Right: rightPredicate,
|
|
}, nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupported binary operator (%s) for metadata predicate, expected EQ, AND, or OR", e.Op)
|
|
}
|
|
case *physical.UnaryExpr:
|
|
if e.Op != types.UnaryOpNot {
|
|
return nil, fmt.Errorf("unsupported unary operator (%s) for metadata predicate, expected NOT", e.Op)
|
|
}
|
|
innerPredicate, err := mapMetadataPredicate(e.Left)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map inner expression of NOT: %w", err)
|
|
}
|
|
return logs.NotRowPredicate{
|
|
Inner: innerPredicate,
|
|
}, nil
|
|
default:
|
|
return nil, fmt.Errorf("unsupported expression type (%T) for metadata predicate, expected BinaryExpr or UnaryExpr", expr)
|
|
}
|
|
}
|
|
|
|
func mapMessagePredicate(expr physical.Expression) (logs.RowPredicate, error) {
|
|
switch e := expr.(type) {
|
|
case *physical.BinaryExpr:
|
|
switch e.Op {
|
|
|
|
case types.BinaryOpMatchSubstr, types.BinaryOpNotMatchSubstr, types.BinaryOpMatchRe, types.BinaryOpNotMatchRe:
|
|
return match(e)
|
|
|
|
case types.BinaryOpMatchPattern, types.BinaryOpNotMatchPattern:
|
|
return nil, fmt.Errorf("unsupported binary operator (%s) for log message predicate", e.Op)
|
|
|
|
case types.BinaryOpAnd:
|
|
leftPredicate, err := mapMessagePredicate(e.Left)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map left operand of AND: %w", err)
|
|
}
|
|
rightPredicate, err := mapMessagePredicate(e.Right)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map right operand of AND: %w", err)
|
|
}
|
|
return logs.AndRowPredicate{
|
|
Left: leftPredicate,
|
|
Right: rightPredicate,
|
|
}, nil
|
|
|
|
case types.BinaryOpOr:
|
|
leftPredicate, err := mapMessagePredicate(e.Left)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map left operand of OR: %w", err)
|
|
}
|
|
rightPredicate, err := mapMessagePredicate(e.Right)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map right operand of OR: %w", err)
|
|
}
|
|
return logs.OrRowPredicate{
|
|
Left: leftPredicate,
|
|
Right: rightPredicate,
|
|
}, nil
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unsupported binary operator (%s) for log message predicate, expected MATCH_STR, NOT_MATCH_STR, MATCH_RE, NOT_MATCH_RE, AND, or OR", e.Op)
|
|
}
|
|
|
|
case *physical.UnaryExpr:
|
|
if e.Op != types.UnaryOpNot {
|
|
return nil, fmt.Errorf("unsupported unary operator (%s) for log message predicate, expected NOT", e.Op)
|
|
}
|
|
innerPredicate, err := mapMessagePredicate(e.Left)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to map inner expression of NOT: %w", err)
|
|
}
|
|
return logs.NotRowPredicate{
|
|
Inner: innerPredicate,
|
|
}, nil
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unsupported expression type (%T) for log message predicate, expected BinaryExpr or UnaryExpr", expr)
|
|
}
|
|
}
|
|
|
|
func match(e *physical.BinaryExpr) (logs.RowPredicate, error) {
|
|
val, err := rhsValue(e)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch e.Op {
|
|
|
|
case types.BinaryOpMatchSubstr:
|
|
return logs.LogMessageFilterRowPredicate{
|
|
Keep: func(line []byte) bool { return bytes.Contains(line, []byte(val)) },
|
|
}, nil
|
|
|
|
case types.BinaryOpNotMatchSubstr:
|
|
return logs.LogMessageFilterRowPredicate{
|
|
Keep: func(line []byte) bool { return !bytes.Contains(line, []byte(val)) },
|
|
}, nil
|
|
|
|
case types.BinaryOpMatchRe:
|
|
re, err := regexp.Compile(val)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return logs.LogMessageFilterRowPredicate{
|
|
Keep: func(line []byte) bool { return re.Match(line) },
|
|
}, nil
|
|
|
|
case types.BinaryOpNotMatchRe:
|
|
re, err := regexp.Compile(val)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return logs.LogMessageFilterRowPredicate{
|
|
Keep: func(line []byte) bool { return !re.Match(line) },
|
|
}, nil
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unsupported binary operator (%s) for log message predicate", e.Op)
|
|
}
|
|
}
|
|
|
|
func rhsValue(e *physical.BinaryExpr) (string, error) {
|
|
op := e.Op.String()
|
|
|
|
if e.Left.Type() != physical.ExprTypeColumn {
|
|
return "", fmt.Errorf("unsupported LHS type (%v) for %s message predicate, expected ColumnExpr", e.Left.Type(), op)
|
|
}
|
|
leftColumn, ok := e.Left.(*physical.ColumnExpr)
|
|
if !ok { // Should not happen due to Type() check but defensive
|
|
return "", fmt.Errorf("LHS of %s message predicate failed to cast to ColumnExpr", op)
|
|
}
|
|
if leftColumn.Ref.Type != types.ColumnTypeBuiltin {
|
|
return "", fmt.Errorf("unsupported LHS column type (%v) for %s message predicate, expected ColumnTypeBuiltin", leftColumn.Ref.Type, op)
|
|
}
|
|
|
|
if e.Right.Type() != physical.ExprTypeLiteral {
|
|
return "", fmt.Errorf("unsupported RHS type (%v) for %s message predicate, expected LiteralExpr", e.Right.Type(), op)
|
|
}
|
|
rightLiteral, ok := e.Right.(*physical.LiteralExpr)
|
|
if !ok { // Should not happen
|
|
return "", fmt.Errorf("RHS of %s message predicate failed to cast to LiteralExpr", op)
|
|
}
|
|
if rightLiteral.ValueType() != datatype.String {
|
|
return "", fmt.Errorf("unsupported RHS literal type (%v) for %s message predicate, expected ValueTypeStr", rightLiteral.ValueType(), op)
|
|
}
|
|
|
|
return rightLiteral.Literal.(*datatype.StringLiteral).Value(), nil
|
|
}
|
|
|