chore(engine): Filter by unknown column (#18131)

The new engine fails to execute a query when an `arrow.Record` (batch) does not contain the column used in a label filter, e.g. `| foo="bar"` where `foo` is neither a label nor structured metadata.

This change also fixes an incorrect conversion of the label filter operator `=`, which was converted to `MATCH_STR` but needs to be `EQ`. Likewise, `!=` is now converted to `BinaryOpNeq` instead of `BinaryOpNotMatchSubstr`.

The new engine now mirrors the current behaviour of a label filter `| foo=""` where `foo` is undefined: a non-existent column is evaluated as an empty string, so the filter matches all rows.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/18144/head
Christian Haudum 7 months ago committed by GitHub
parent ba9b93ba42
commit 16ccf9be1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 40
      pkg/engine/executor/dataobjscan.go
  2. 10
      pkg/engine/executor/dataobjscan_test.go
  3. 9
      pkg/engine/executor/expressions.go
  4. 11
      pkg/engine/executor/expressions_test.go
  5. 9
      pkg/engine/executor/filter.go
  6. 4
      pkg/engine/executor/sortmerge_test.go
  7. 4
      pkg/engine/planner/logical/planner.go
  8. 4
      pkg/engine/planner/logical/planner_test.go

@ -406,10 +406,6 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
return nil, fmt.Errorf("invalid column expression type %T", column)
}
md := arrow.MetadataFrom(map[string]string{
types.MetadataKeyColumnType: columnExpr.Ref.Type.String(),
})
switch columnExpr.Ref.Type {
case types.ColumnTypeLabel:
// TODO(rfratto): Switch to dictionary encoding for labels.
@ -424,9 +420,10 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
//
// We skipped dictionary encoding for now to get the initial prototype
// working.
ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Type: ty,
Nullable: true,
Metadata: md,
})
@ -436,15 +433,16 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
// has unconstrained cardinality. Using dictionary encoding would require
// tracking every encoded value in the record, which is likely to be too
// expensive.
ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Type: ty,
Nullable: true,
Metadata: md,
})
case types.ColumnTypeBuiltin:
ty, md := builtinColumnType(columnExpr.Ref)
ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: ty,
@ -472,16 +470,16 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: arrow.MetadataFrom(map[string]string{types.MetadataKeyColumnType: types.ColumnTypeLabel.String()}),
Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String),
})
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: arrow.MetadataFrom(map[string]string{types.MetadataKeyColumnType: types.ColumnTypeMetadata.String()}),
Metadata: datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String),
})
case types.ColumnTypeParsed:
case types.ColumnTypeParsed, types.ColumnTypeGenerated:
return nil, fmt.Errorf("parsed column type not supported: %s", columnExpr.Ref.Type)
}
}
@ -489,19 +487,19 @@ func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, erro
return arrow.NewSchema(fields, nil), nil
}
func builtinColumnType(ref types.ColumnRef) (arrow.DataType, arrow.Metadata) {
if ref.Type != types.ColumnTypeBuiltin {
panic("builtinColumnType called with a non-builtin column")
func arrowTypeFromColumnRef(ref types.ColumnRef) (arrow.DataType, arrow.Metadata) {
if ref.Type == types.ColumnTypeBuiltin {
switch ref.Column {
case types.ColumnNameBuiltinTimestamp:
return arrow.FixedWidthTypes.Timestamp_ns, datatype.ColumnMetadataBuiltinTimestamp
case types.ColumnNameBuiltinMessage:
return arrow.BinaryTypes.String, datatype.ColumnMetadataBuiltinMessage
default:
panic(fmt.Sprintf("unsupported builtin column type %s", ref))
}
}
switch ref.Column {
case types.ColumnNameBuiltinTimestamp:
return arrow.FixedWidthTypes.Timestamp_ns, datatype.ColumnMetadataBuiltinTimestamp
case types.ColumnNameBuiltinMessage:
return arrow.BinaryTypes.String, datatype.ColumnMetadataBuiltinMessage
default:
panic(fmt.Sprintf("unsupported builtin column type %s", ref))
}
return arrow.BinaryTypes.String, datatype.ColumnMetadata(ref.Type, datatype.String)
}
// appendToBuilder appends the provided field from record into the given

@ -20,16 +20,10 @@ import (
)
var (
labelMD = buildMetadata(types.ColumnTypeLabel)
metadataMD = buildMetadata(types.ColumnTypeMetadata)
labelMD = datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)
metadataMD = datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String)
)
func buildMetadata(ty types.ColumnType) arrow.Metadata {
return arrow.MetadataFrom(map[string]string{
types.MetadataKeyColumnType: ty.String(),
})
}
func Test_dataobjScan(t *testing.T) {
obj := buildDataobj(t, []logproto.Stream{
{

@ -8,7 +8,6 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/errors"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)
@ -46,7 +45,13 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
}, nil
}
}
return nil, fmt.Errorf("unknown column %s: %w", expr.Ref.String(), errors.ErrKey)
// A non-existent column is represented as a string scalar with zero-value.
// This reflects current behaviour, where a label filter `| foo=""` would match all if `foo` is not defined.
return &Scalar{
value: datatype.NewStringLiteral(""),
rows: input.NumRows(),
ct: types.ColumnTypeGenerated,
}, nil
case *physical.UnaryExpr:
lhr, err := e.eval(expr.Left, input)

@ -10,7 +10,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/errors"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)
@ -102,7 +101,7 @@ func TestEvaluateLiteralExpression(t *testing.T) {
func TestEvaluateColumnExpression(t *testing.T) {
e := expressionEvaluator{}
t.Run("invalid", func(t *testing.T) {
t.Run("unknown column", func(t *testing.T) {
colExpr := &physical.ColumnExpr{
Ref: types.ColumnRef{
Column: "does_not_exist",
@ -112,8 +111,12 @@ func TestEvaluateColumnExpression(t *testing.T) {
n := len(words)
rec := batch(n, time.Now())
_, err := e.eval(colExpr, rec)
require.ErrorContains(t, err, errors.ErrKey.Error())
colVec, err := e.eval(colExpr, rec)
require.NoError(t, err)
_, ok := colVec.(*Scalar)
require.True(t, ok, "expected column vector to be a *Scalar, got %T", colVec)
require.Equal(t, arrow.STRING, colVec.Type().ArrowType().ID())
})
t.Run("string(message)", func(t *testing.T) {

@ -86,6 +86,7 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
additions := make([]func(int), len(fields))
for i, field := range fields {
switch field.Type.ID() {
case arrow.BOOL:
builder := array.NewBooleanBuilder(mem)
@ -127,6 +128,14 @@ func filterBatch(batch arrow.Record, include func(int) bool) arrow.Record {
builder.Append(src.Value(offset))
}
case arrow.TIMESTAMP:
builder := array.NewTimestampBuilder(mem, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "UTC"})
builders[i] = builder
additions[i] = func(offset int) {
src := batch.Column(i).(*array.Timestamp)
builder.Append(src.Value(offset))
}
default:
panic(fmt.Sprintf("unimplemented type in filterBatch: %s", field.Type.Name()))
}

@ -24,7 +24,7 @@ func TestSortMerge(t *testing.T) {
merge := &physical.SortMerge{
Column: &physical.ColumnExpr{
Ref: types.ColumnRef{
Column: "invalid",
Column: "not_a_timestamp_column",
Type: types.ColumnTypeBuiltin,
},
},
@ -40,7 +40,7 @@ func TestSortMerge(t *testing.T) {
require.NoError(t, err)
err = pipeline.Read()
require.ErrorContains(t, err, "key error")
require.ErrorContains(t, err, "column is not a timestamp column")
})
t.Run("ascending timestamp", func(t *testing.T) {

@ -182,9 +182,9 @@ func lineColumnRef() *ColumnRef {
func convertLabelMatchType(op labels.MatchType) types.BinaryOp {
switch op {
case labels.MatchEqual:
return types.BinaryOpMatchSubstr
return types.BinaryOpEq
case labels.MatchNotEqual:
return types.BinaryOpNotMatchSubstr
return types.BinaryOpNeq
case labels.MatchRegexp:
return types.BinaryOpMatchRe
case labels.MatchNotRegexp:

@ -98,8 +98,8 @@ func TestConvertAST_Success(t *testing.T) {
%7 = SELECT %5 [predicate=%6]
%8 = LT builtin.timestamp 1970-01-01T02:00:00Z
%9 = SELECT %7 [predicate=%8]
%10 = MATCH_STR ambiguous.foo "bar"
%11 = MATCH_STR ambiguous.bar "baz"
%10 = EQ ambiguous.foo "bar"
%11 = EQ ambiguous.bar "baz"
%12 = OR %10 %11
%13 = SELECT %9 [predicate=%12]
%14 = MATCH_STR builtin.message "metric.go"

Loading…
Cancel
Save