diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 20fdbd79e7..d5edc54485 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -167,7 +167,7 @@ func collectRow(rec arrow.Record, i int, result *resultBuilder) { // TODO(chaudum): We need to add metadata to columns to identify builtins, labels, metadata, and parsed. field := rec.Schema().Field(colIdx) - colType, ok := field.Metadata.GetValue(types.ColumnTypeMetadataKey) + colType, ok := field.Metadata.GetValue(types.MetadataKeyColumnType) // Ignore column values that are NULL or invalid or don't have a column typ if col.IsNull(i) || !col.IsValid(i) || !ok { diff --git a/pkg/engine/engine_test.go b/pkg/engine/engine_test.go index ba3fe17c84..68c381702a 100644 --- a/pkg/engine/engine_test.go +++ b/pkg/engine/engine_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/engine/executor" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logqlmodel" @@ -52,15 +53,14 @@ func createRecord(t *testing.T, schema *arrow.Schema, data [][]interface{}) arro } func TestConvertArrowRecordsToLokiResult(t *testing.T) { - mdTypeBuiltin := arrow.NewMetadata([]string{types.ColumnTypeMetadataKey}, []string{types.ColumnTypeBuiltin.String()}) - mdTypeLabel := arrow.NewMetadata([]string{types.ColumnTypeMetadataKey}, []string{types.ColumnTypeLabel.String()}) - mdTypeMetadata := arrow.NewMetadata([]string{types.ColumnTypeMetadataKey}, []string{types.ColumnTypeMetadata.String()}) + mdTypeLabel := datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String) + mdTypeMetadata := datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String) t.Run("rows without log line, timestamp, or labels are ignored", func(t *testing.T) { schema := arrow.NewSchema( []arrow.Field{ - {Name: types.ColumnNameBuiltinTimestamp, Type: arrow.PrimitiveTypes.Uint64, Metadata: mdTypeBuiltin}, - {Name: types.ColumnNameBuiltinLine, Type: arrow.BinaryTypes.String, Metadata: mdTypeBuiltin}, + {Name: types.ColumnNameBuiltinTimestamp, Type: arrow.PrimitiveTypes.Uint64, Metadata: datatype.ColumnMetadataBuiltinTimestamp}, + {Name: types.ColumnNameBuiltinLine, Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinLine}, {Name: "env", Type: arrow.BinaryTypes.String, Metadata: mdTypeLabel}, }, nil, @@ -117,8 +117,8 @@ func TestConvertArrowRecordsToLokiResult(t *testing.T) { t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) { schema := arrow.NewSchema( []arrow.Field{ - {Name: types.ColumnNameBuiltinTimestamp, Type: arrow.PrimitiveTypes.Uint64, Metadata: mdTypeBuiltin}, - {Name: types.ColumnNameBuiltinLine, Type: arrow.BinaryTypes.String, Metadata: mdTypeBuiltin}, + {Name: types.ColumnNameBuiltinTimestamp, Type: arrow.PrimitiveTypes.Uint64, Metadata: datatype.ColumnMetadataBuiltinTimestamp}, + {Name: types.ColumnNameBuiltinLine, Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinLine}, {Name: "env", Type: arrow.BinaryTypes.String, Metadata: mdTypeLabel}, {Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: mdTypeLabel}, {Name: "traceID", Type: arrow.BinaryTypes.String, Metadata: mdTypeMetadata}, diff --git a/pkg/engine/executor/expressions.go b/pkg/engine/executor/expressions.go index aea6da537c..2a76ad4da1 100644 --- a/pkg/engine/executor/expressions.go +++ b/pkg/engine/executor/expressions.go @@ -7,6 +7,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/errors" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/engine/planner/physical" @@ -19,15 +20,28 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) case *physical.LiteralExpr: return &Scalar{ - value: expr.Value, + value: expr.Literal, rows: input.NumRows(), + ct: types.ColumnTypeAmbiguous, }, nil case *physical.ColumnExpr: + schema := input.Schema() for i := range input.NumCols() { + md := schema.Field(int(i)).Metadata if input.ColumnName(int(i)) == expr.Ref.Column { + dt, ok := md.GetValue(types.MetadataKeyColumnDataType) + if !ok { + continue + } + ct, ok := md.GetValue(types.MetadataKeyColumnType) + if !ok { + ct = types.ColumnTypeAmbiguous.String() + } return &Array{ array: input.Column(int(i)), + dt: datatype.FromString(dt), + ct: types.ColumnTypeFromString(ct), rows: input.NumRows(), }, nil } @@ -40,7 +54,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) return nil, err } - fn, err := unaryFunctions.GetForSignature(expr.Op, lhr.Type()) + fn, err := unaryFunctions.GetForSignature(expr.Op, lhr.Type().ArrowType()) if err != nil { return nil, fmt.Errorf("failed to lookup unary function: %w", err) } @@ -57,13 +71,13 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record) } // At the moment we only support functions that accept the same input types. - if lhs.Type().ID() != rhs.Type().ID() { - return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): types do not match", expr.Op, lhs.Type(), rhs.Type()) + if lhs.Type().ArrowType().ID() != rhs.Type().ArrowType().ID() { + return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): types do not match", expr.Op, lhs.Type().ArrowType(), rhs.Type().ArrowType()) } - fn, err := binaryFunctions.GetForSignature(expr.Op, lhs.Type()) + fn, err := binaryFunctions.GetForSignature(expr.Op, lhs.Type().ArrowType()) if err != nil { - return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): %w", expr.Op, lhs.Type(), rhs.Type(), err) + return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): %w", expr.Op, lhs.Type().ArrowType(), rhs.Type().ArrowType(), err) } return fn.Evaluate(lhs, rhs) } @@ -86,16 +100,19 @@ type ColumnVector interface { ToArray() arrow.Array // Value returns the value at the specified index position in the column vector. Value(i int) any - // Type returns the Arrow data type of the column vector. - Type() arrow.DataType + // Type returns the Loki data type of the column vector. + Type() datatype.DataType + // ColumnType returns the type of column the vector originates from. + ColumnType() types.ColumnType // Len returns the length of the vector Len() int64 } // Scalar represents a single value repeated any number of times. type Scalar struct { - value types.Literal + value datatype.Literal rows int64 + ct types.ColumnType } var _ ColumnVector = (*Scalar)(nil) @@ -103,60 +120,63 @@ var _ ColumnVector = (*Scalar)(nil) // ToArray implements ColumnVector. func (v *Scalar) ToArray() arrow.Array { mem := memory.NewGoAllocator() - builder := array.NewBuilder(mem, v.Type()) + builder := array.NewBuilder(mem, v.Type().ArrowType()) defer builder.Release() - for i := int64(0); i < v.rows; i++ { - switch v.value.ValueType() { - case types.ValueTypeBool: - builder.(*array.BooleanBuilder).Append(v.value.Value.(bool)) - case types.ValueTypeStr: - builder.(*array.StringBuilder).Append(v.value.Value.(string)) - case types.ValueTypeInt: - builder.(*array.Int64Builder).Append(v.value.Value.(int64)) - case types.ValueTypeFloat: - builder.(*array.Float64Builder).Append(v.value.Value.(float64)) - case types.ValueTypeTimestamp: - builder.(*array.Uint64Builder).Append(v.value.Value.(uint64)) - default: + switch builder := builder.(type) { + case *array.NullBuilder: + for range v.rows { builder.AppendNull() } + case *array.BooleanBuilder: + value := v.value.Any().(bool) + for range v.rows { + builder.Append(value) + } + case *array.StringBuilder: + value := v.value.Any().(string) + for range v.rows { + builder.Append(value) + } + case *array.Int64Builder: + value := v.value.Any().(int64) + for range v.rows { + builder.Append(value) + } + case *array.Float64Builder: + value := v.value.Any().(float64) + for range v.rows { + builder.Append(value) + } } - return builder.NewArray() } // Value implements ColumnVector. func (v *Scalar) Value(_ int) any { - return v.value.Value + return v.value.Any() } // Type implements ColumnVector. -func (v Scalar) Type() arrow.DataType { - switch v.value.ValueType() { - case types.ValueTypeBool: - return arrow.FixedWidthTypes.Boolean - case types.ValueTypeStr: - return arrow.BinaryTypes.String - case types.ValueTypeInt: - return arrow.PrimitiveTypes.Int64 - case types.ValueTypeFloat: - return arrow.PrimitiveTypes.Float64 - case types.ValueTypeTimestamp: - return arrow.PrimitiveTypes.Uint64 - default: - return arrow.Null - } +func (v *Scalar) Type() datatype.DataType { + return v.value.Type() +} + +// ColumnType implements ColumnVector. +func (v *Scalar) ColumnType() types.ColumnType { + return v.ct } // Len implements ColumnVector. -func (v Scalar) Len() int64 { +func (v *Scalar) Len() int64 { return v.rows } // Array represents a column of data, stored as an [arrow.Array]. type Array struct { array arrow.Array + dt datatype.DataType + ct types.ColumnType rows int64 } @@ -190,8 +210,13 @@ func (a *Array) Value(i int) any { } // Type implements ColumnVector. -func (a *Array) Type() arrow.DataType { - return a.array.DataType() +func (a *Array) Type() datatype.DataType { + return a.dt +} + +// ColumnType implements ColumnVector. +func (a *Array) ColumnType() types.ColumnType { + return a.ct } // Len implements ColumnVector. diff --git a/pkg/engine/executor/expressions_test.go b/pkg/engine/executor/expressions_test.go index 8e7ea511f0..fbb60b4604 100644 --- a/pkg/engine/executor/expressions_test.go +++ b/pkg/engine/executor/expressions_test.go @@ -9,6 +9,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/memory" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/errors" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/engine/planner/physical" @@ -16,10 +17,10 @@ import ( var ( fields = []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String}, - {Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64}, - {Name: "value", Type: arrow.PrimitiveTypes.Float64}, - {Name: "valid", Type: arrow.FixedWidthTypes.Boolean}, + {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "timestamp", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Timestamp)}, + {Name: "value", Type: arrow.PrimitiveTypes.Float64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Float)}, + {Name: "valid", Type: arrow.FixedWidthTypes.Boolean, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Bool)}, } sampledata = `Alice,1745487598764058205,0.2586284611568047,false Bob,1745487598764058305,0.7823145698741236,true @@ -61,13 +62,23 @@ func TestEvaluateLiteralExpression(t *testing.T) { }, { name: "float", - value: float64(123.456789), + value: 123.456789, arrowType: arrow.FLOAT64, }, { name: "timestamp", - value: uint64(1744612881740032450), - arrowType: arrow.UINT64, + value: time.Unix(3600, 0).UTC(), + arrowType: arrow.INT64, + }, + { + name: "duration", + value: time.Hour, + arrowType: arrow.INT64, + }, + { + name: "bytes", + value: int64(1024), + arrowType: arrow.INT64, }, } { t.Run(tt.name, func(t *testing.T) { @@ -78,7 +89,7 @@ func TestEvaluateLiteralExpression(t *testing.T) { rec := batch(n, time.Now()) colVec, err := e.eval(literal, rec) require.NoError(t, err) - require.Equalf(t, tt.arrowType, colVec.Type().ID(), "expected: %v got: %v", tt.arrowType.String(), colVec.Type().ID().String()) + require.Equalf(t, tt.arrowType, colVec.Type().ArrowType().ID(), "expected: %v got: %v", tt.arrowType.String(), colVec.Type().ArrowType().ID().String()) for i := range n { val := colVec.Value(i) @@ -105,10 +116,10 @@ func TestEvaluateColumnExpression(t *testing.T) { require.ErrorContains(t, err, errors.ErrKey.Error()) }) - t.Run("string(log)", func(t *testing.T) { + t.Run("string(line)", func(t *testing.T) { colExpr := &physical.ColumnExpr{ Ref: types.ColumnRef{ - Column: "log", + Column: "line", Type: types.ColumnTypeBuiltin, }, } @@ -117,7 +128,7 @@ func TestEvaluateColumnExpression(t *testing.T) { rec := batch(n, time.Now()) colVec, err := e.eval(colExpr, rec) require.NoError(t, err) - require.Equal(t, arrow.STRING, colVec.Type().ID()) + require.Equal(t, arrow.STRING, colVec.Type().ArrowType().ID()) for i := range n { val := colVec.Value(i) @@ -145,7 +156,7 @@ func TestEvaluateBinaryExpression(t *testing.T) { } _, err := e.eval(expr, rec) - require.ErrorContains(t, err, "failed to lookup binary function for signature EQ(utf8,uint64): types do not match") + require.ErrorContains(t, err, "failed to lookup binary function for signature EQ(utf8,int64): types do not match") }) t.Run("error if function for signature is not registered", func(t *testing.T) { @@ -156,11 +167,11 @@ func TestEvaluateBinaryExpression(t *testing.T) { Right: &physical.ColumnExpr{ Ref: types.ColumnRef{Column: "name", Type: types.ColumnTypeBuiltin}, }, - Op: types.BinaryOpInvalid, + Op: types.BinaryOpXor, } _, err := e.eval(expr, rec) - require.ErrorContains(t, err, "failed to lookup binary function for signature invalid(utf8,utf8): not implemented") + require.ErrorContains(t, err, "failed to lookup binary function for signature XOR(utf8,utf8): not implemented") }) t.Run("EQ(string,string)", func(t *testing.T) { @@ -168,10 +179,8 @@ func TestEvaluateBinaryExpression(t *testing.T) { Left: &physical.ColumnExpr{ Ref: types.ColumnRef{Column: "name", Type: types.ColumnTypeBuiltin}, }, - Right: &physical.LiteralExpr{ - Value: types.Literal{Value: "Charlie"}, - }, - Op: types.BinaryOpEq, + Right: physical.NewLiteral("Charlie"), + Op: types.BinaryOpEq, } res, err := e.eval(expr, rec) @@ -185,10 +194,8 @@ func TestEvaluateBinaryExpression(t *testing.T) { Left: &physical.ColumnExpr{ Ref: types.ColumnRef{Column: "value", Type: types.ColumnTypeBuiltin}, }, - Right: &physical.LiteralExpr{ - Value: types.Literal{Value: 0.5}, - }, - Op: types.BinaryOpGt, + Right: physical.NewLiteral(0.5), + Op: types.BinaryOpGt, } res, err := e.eval(expr, rec) @@ -216,8 +223,8 @@ func batch(n int, now time.Time) arrow.Record { // 2. Define the schema schema := arrow.NewSchema( []arrow.Field{ - {Name: "log", Type: arrow.BinaryTypes.String}, - {Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64}, + {Name: "line", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinLine}, + {Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64, Metadata: datatype.ColumnMetadataBuiltinTimestamp}, }, nil, // No metadata ) diff --git a/pkg/engine/executor/filter_test.go b/pkg/engine/executor/filter_test.go index 6bcb22be3b..5f64aa2699 100644 --- a/pkg/engine/executor/filter_test.go +++ b/pkg/engine/executor/filter_test.go @@ -8,14 +8,15 @@ import ( "github.com/apache/arrow-go/v18/arrow/memory" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/engine/planner/physical" ) func TestNewFilterPipeline(t *testing.T) { fields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String}, - {Name: "valid", Type: arrow.FixedWidthTypes.Boolean}, + {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "valid", Type: arrow.FixedWidthTypes.Boolean, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Bool)}, } t.Run("filter with true literal predicate", func(t *testing.T) { @@ -29,9 +30,7 @@ func TestNewFilterPipeline(t *testing.T) { inputPipeline := NewBufferedPipeline(inputRecord) // Create a filter predicate that's always true - truePredicate := &physical.LiteralExpr{ - Value: createLiteral(true), - } + truePredicate := physical.NewLiteral(true) // Create a Filter node filter := &physical.Filter{ @@ -63,9 +62,7 @@ func TestNewFilterPipeline(t *testing.T) { inputPipeline := NewBufferedPipeline(inputRecord) // Create a filter predicate that's always false - falsePredicate := &physical.LiteralExpr{ - Value: createLiteral(false), - } + falsePredicate := physical.NewLiteral(false) // Create a Filter node filter := &physical.Filter{ @@ -149,12 +146,12 @@ func TestNewFilterPipeline(t *testing.T) { Predicates: []physical.Expression{ &physical.BinaryExpr{ Left: &physical.ColumnExpr{Ref: createColumnRef("name")}, - Right: &physical.LiteralExpr{Value: createLiteral("Bob")}, + Right: physical.NewLiteral("Bob"), Op: types.BinaryOpEq, }, &physical.BinaryExpr{ Left: &physical.ColumnExpr{Ref: createColumnRef("valid")}, - Right: &physical.LiteralExpr{Value: createLiteral(false)}, + Right: physical.NewLiteral(false), Op: types.BinaryOpNeq, }, }, @@ -198,9 +195,7 @@ func TestNewFilterPipeline(t *testing.T) { inputPipeline := NewBufferedPipeline(emptyRecord) // Create a simple filter - truePredicate := &physical.LiteralExpr{ - Value: createLiteral(true), - } + truePredicate := physical.NewLiteral(true) // Create a Filter node filter := &physical.Filter{ diff --git a/pkg/engine/executor/project.go b/pkg/engine/executor/project.go index be4660a0b9..58ac925db3 100644 --- a/pkg/engine/executor/project.go +++ b/pkg/engine/executor/project.go @@ -6,12 +6,14 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/planner/physical" ) func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, evaluator *expressionEvaluator) (*GenericPipeline, error) { // Get the column names from the projection expressions columnNames := make([]string, len(columns)) + for i, col := range columns { if colExpr, ok := col.(*physical.ColumnExpr); ok { columnNames[i] = colExpr.Ref.Column @@ -41,7 +43,7 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva if err != nil { return failureState(err) } - fields = append(fields, arrow.Field{Name: columnNames[i], Type: vec.Type()}) + fields = append(fields, arrow.Field{Name: columnNames[i], Type: vec.Type().ArrowType(), Metadata: datatype.ColumnMetadata(vec.ColumnType(), vec.Type())}) projected = append(projected, vec.ToArray()) } diff --git a/pkg/engine/executor/project_test.go b/pkg/engine/executor/project_test.go index 9f1bcec0a0..c3e48454ec 100644 --- a/pkg/engine/executor/project_test.go +++ b/pkg/engine/executor/project_test.go @@ -6,15 +6,16 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/engine/planner/physical" ) func TestNewProjectPipeline(t *testing.T) { fields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String}, - {Name: "age", Type: arrow.PrimitiveTypes.Int32}, - {Name: "city", Type: arrow.BinaryTypes.String}, + {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "age", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, + {Name: "city", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, } t.Run("project single column", func(t *testing.T) { @@ -41,7 +42,7 @@ func TestNewProjectPipeline(t *testing.T) { // Create expected output expectedCSV := "Alice\nBob\nCharlie" expectedFields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String}, + {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, } expectedRecord, err := CSVToArrow(expectedFields, expectedCSV) require.NoError(t, err) @@ -80,8 +81,8 @@ func TestNewProjectPipeline(t *testing.T) { // Create expected output expectedCSV := "Alice,New York\nBob,Boston\nCharlie,Seattle" expectedFields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String}, - {Name: "city", Type: arrow.BinaryTypes.String}, + {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "city", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, } expectedRecord, err := CSVToArrow(expectedFields, expectedCSV) require.NoError(t, err) @@ -123,9 +124,9 @@ func TestNewProjectPipeline(t *testing.T) { // Create expected output expectedCSV := "New York,30,Alice\nBoston,25,Bob\nSeattle,35,Charlie" expectedFields := []arrow.Field{ - {Name: "city", Type: arrow.BinaryTypes.String}, - {Name: "age", Type: arrow.PrimitiveTypes.Int32}, - {Name: "name", Type: arrow.BinaryTypes.String}, + {Name: "city", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "age", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, + {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, } expectedRecord, err := CSVToArrow(expectedFields, expectedCSV) require.NoError(t, err) @@ -169,8 +170,8 @@ func TestNewProjectPipeline(t *testing.T) { // Create expected output also split across multiple records expectedFields := []arrow.Field{ - {Name: "name", Type: arrow.BinaryTypes.String}, - {Name: "age", Type: arrow.PrimitiveTypes.Int32}, + {Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)}, + {Name: "age", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, } expected := ` @@ -199,8 +200,3 @@ func createColumnRef(name string) types.ColumnRef { Type: types.ColumnTypeBuiltin, } } - -// Helper to create a literal value -func createLiteral(val any) types.Literal { - return types.Literal{Value: val} -} diff --git a/pkg/engine/executor/sortmerge.go b/pkg/engine/executor/sortmerge.go index 1ec1ec5230..620b5eccff 100644 --- a/pkg/engine/executor/sortmerge.go +++ b/pkg/engine/executor/sortmerge.go @@ -13,12 +13,12 @@ import ( // NewSortMergePipeline returns a new pipeline that merges already sorted inputs into a single output. func NewSortMergePipeline(inputs []Pipeline, order physical.SortOrder, column physical.ColumnExpression, evaluator expressionEvaluator) (*KWayMerge, error) { - var compare func(a, b uint64) bool + var compare func(a, b int64) bool switch order { case physical.ASC: - compare = func(a, b uint64) bool { return a <= b } + compare = func(a, b int64) bool { return a <= b } case physical.DESC: - compare = func(a, b uint64) bool { return a >= b } + compare = func(a, b int64) bool { return a >= b } default: return nil, fmt.Errorf("invalid sort order %v", order) } @@ -42,7 +42,7 @@ type KWayMerge struct { exhausted []bool offsets []int64 columnEval evalFunc - compare func(a, b uint64) bool + compare func(a, b int64) bool } var _ Pipeline = (*KWayMerge)(nil) @@ -92,7 +92,7 @@ func (p *KWayMerge) init() { p.offsets = make([]int64, n) if p.compare == nil { - p.compare = func(a, b uint64) bool { return a <= b } + p.compare = func(a, b int64) bool { return a <= b } } } @@ -106,7 +106,7 @@ func (p *KWayMerge) read() error { p.state.batch.Release() } - timestamps := make([]uint64, 0, len(p.inputs)) + timestamps := make([]int64, 0, len(p.inputs)) batchIndexes := make([]int, 0, len(p.inputs)) for i := range len(p.inputs) { @@ -136,7 +136,7 @@ func (p *KWayMerge) read() error { if err != nil { return err } - tsCol, ok := col.ToArray().(*array.Uint64) + tsCol, ok := col.ToArray().(*array.Int64) if !ok { return errors.New("column is not a timestamp column") } @@ -182,7 +182,7 @@ func (p *KWayMerge) read() error { return err } // We assume the column is a Uint64 array - tsCol, ok := col.ToArray().(*array.Uint64) + tsCol, ok := col.ToArray().(*array.Int64) if !ok { return errors.New("column is not a timestamp column") } diff --git a/pkg/engine/executor/sortmerge_test.go b/pkg/engine/executor/sortmerge_test.go index 80f30eb3ff..8ee15f60fe 100644 --- a/pkg/engine/executor/sortmerge_test.go +++ b/pkg/engine/executor/sortmerge_test.go @@ -63,7 +63,7 @@ func TestSortMerge(t *testing.T) { pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{}) require.NoError(t, err) - var lastTs uint64 + var lastTs int64 var batches, rows int64 for { err := pipeline.Read() @@ -77,7 +77,7 @@ func TestSortMerge(t *testing.T) { tsCol, err := c.evaluator.eval(merge.Column, batch) require.NoError(t, err) - arr := tsCol.ToArray().(*array.Uint64) + arr := tsCol.ToArray().(*array.Int64) // Check if ts column is sorted for i := 0; i < arr.Len()-1; i++ { @@ -115,7 +115,7 @@ func TestSortMerge(t *testing.T) { pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{}) require.NoError(t, err) - var lastTs uint64 = math.MaxUint64 + var lastTs int64 = math.MaxInt64 var batches, rows int64 for { err := pipeline.Read() @@ -129,7 +129,7 @@ func TestSortMerge(t *testing.T) { tsCol, err := c.evaluator.eval(merge.Column, batch) require.NoError(t, err) - arr := tsCol.ToArray().(*array.Uint64) + arr := tsCol.ToArray().(*array.Int64) // Check if ts column is sorted for i := 0; i < arr.Len()-1; i++ { diff --git a/pkg/engine/executor/util_test.go b/pkg/engine/executor/util_test.go index c167383494..25e45e1579 100644 --- a/pkg/engine/executor/util_test.go +++ b/pkg/engine/executor/util_test.go @@ -7,12 +7,15 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/memory" + + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" + "github.com/grafana/loki/v3/pkg/engine/internal/types" ) var ( incrementingIntPipeline = newRecordGenerator( arrow.NewSchema([]arrow.Field{ - {Name: "id", Type: arrow.PrimitiveTypes.Int64}, + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, }, nil), func(offset, sz int64, schema *arrow.Schema) arrow.Record { @@ -48,20 +51,20 @@ const ( func timestampPipeline(start time.Time, order time.Duration) *recordGenerator { return newRecordGenerator( arrow.NewSchema([]arrow.Field{ - {Name: "id", Type: arrow.PrimitiveTypes.Int64}, - {Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64}, + {Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)}, + {Name: "timestamp", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Timestamp)}, }, nil), func(offset, sz int64, schema *arrow.Schema) arrow.Record { idColBuilder := array.NewInt64Builder(memory.DefaultAllocator) defer idColBuilder.Release() - tsColBuilder := array.NewUint64Builder(memory.DefaultAllocator) + tsColBuilder := array.NewInt64Builder(memory.DefaultAllocator) defer tsColBuilder.Release() for i := int64(0); i < sz; i++ { idColBuilder.Append(offset + i) - tsColBuilder.Append(uint64(start.Add(order * (time.Duration(offset)*time.Second + time.Duration(i)*time.Millisecond)).UnixNano())) + tsColBuilder.Append(start.Add(order * (time.Duration(offset)*time.Second + time.Duration(i)*time.Millisecond)).UnixNano()) } idData := idColBuilder.NewArray() diff --git a/pkg/engine/internal/datatype/arrow.go b/pkg/engine/internal/datatype/arrow.go new file mode 100644 index 0000000000..dd561745fa --- /dev/null +++ b/pkg/engine/internal/datatype/arrow.go @@ -0,0 +1,67 @@ +package datatype + +import "github.com/apache/arrow-go/v18/arrow" + +var ( + LokiType = struct { + Null DataType + Bool DataType + String DataType + Integer DataType + Float DataType + Timestamp DataType + Duration DataType + Bytes DataType + }{ + Null: Null, + Bool: Bool, + String: String, + Integer: Integer, + Float: Float, + Timestamp: Timestamp, + Duration: Duration, + Bytes: Bytes, + } + + ArrowType = struct { + Null arrow.DataType + Bool arrow.DataType + String arrow.DataType + Integer arrow.DataType + Float arrow.DataType + Timestamp arrow.DataType + Duration arrow.DataType + Bytes arrow.DataType + }{ + Null: arrow.Null, + Bool: arrow.FixedWidthTypes.Boolean, + String: arrow.BinaryTypes.String, + Integer: arrow.PrimitiveTypes.Int64, + Float: arrow.PrimitiveTypes.Float64, + Timestamp: arrow.PrimitiveTypes.Int64, + Duration: arrow.PrimitiveTypes.Int64, + Bytes: arrow.PrimitiveTypes.Int64, + } + + ToArrow = map[DataType]arrow.DataType{ + Null: ArrowType.Null, + Bool: ArrowType.Bool, + String: ArrowType.String, + Integer: ArrowType.Integer, + Float: ArrowType.Float, + Timestamp: ArrowType.Timestamp, + Duration: ArrowType.Duration, + Bytes: ArrowType.Bytes, + } + + ToLoki = map[arrow.DataType]DataType{ + ArrowType.Null: Null, + ArrowType.Bool: Bool, + ArrowType.String: String, + ArrowType.Integer: Integer, + ArrowType.Float: Float, + ArrowType.Timestamp: Timestamp, + ArrowType.Duration: Duration, + ArrowType.Bytes: Bytes, + } +) diff --git a/pkg/engine/internal/datatype/literal.go b/pkg/engine/internal/datatype/literal.go new file mode 100644 index 0000000000..2d5c80dbd9 --- /dev/null +++ b/pkg/engine/internal/datatype/literal.go @@ -0,0 +1,240 @@ +package datatype + +import ( + "fmt" + "strconv" + "time" +) + +type NullLiteral struct { +} + +// String implements Literal. +func (n *NullLiteral) String() string { + return "null" +} + +// Type implements Literal. +func (n *NullLiteral) Type() DataType { + return Null +} + +// Any implements Literal. +func (n *NullLiteral) Any() any { + return nil +} + +func (n *NullLiteral) Value() any { + return nil +} + +type BoolLiteral struct { + v bool +} + +// String implements Literal. +func (b *BoolLiteral) String() string { + return strconv.FormatBool(b.v) +} + +// Type implements Literal. +func (b *BoolLiteral) Type() DataType { + return Bool +} + +// Any implements Literal. +func (b *BoolLiteral) Any() any { + return b.v +} + +func (b *BoolLiteral) Value() bool { + return b.v +} + +type StringLiteral struct { + v string +} + +// String implements Literal. +func (s *StringLiteral) String() string { + return fmt.Sprintf(`"%s"`, s.v) +} + +// Type implements Literal. +func (s *StringLiteral) Type() DataType { + return String +} + +// Any implements Literal. +func (s *StringLiteral) Any() any { + return s.v +} + +func (s *StringLiteral) Value() string { + return s.v +} + +type IntegerLiteral struct { + v int64 +} + +// String implements Literal. +func (i *IntegerLiteral) String() string { + return strconv.FormatInt(i.v, 10) +} + +// Type implements Literal. +func (i *IntegerLiteral) Type() DataType { + return Integer +} + +// Any implements Literal. +func (i *IntegerLiteral) Any() any { + return i.v +} + +func (i *IntegerLiteral) Value() int64 { + return i.v +} + +type FloatLiteral struct { + v float64 +} + +// String implements Literal. +func (f *FloatLiteral) String() string { + return strconv.FormatFloat(f.v, 'f', -1, 64) +} + +// Type implements Literal. +func (f *FloatLiteral) Type() DataType { + return Float +} + +// Any implements Literal. +func (f *FloatLiteral) Any() any { + return f.v +} + +func (f *FloatLiteral) Value() float64 { + return f.v +} + +type TimestampLiteral struct { + v time.Time +} + +// String implements Literal. +func (t *TimestampLiteral) String() string { + return t.v.UTC().Format(time.RFC3339Nano) +} + +// Type implements Literal. +func (t *TimestampLiteral) Type() DataType { + return Timestamp +} + +// Any implements Literal. +func (t *TimestampLiteral) Any() any { + return t.v.UTC() +} + +func (t *TimestampLiteral) Value() time.Time { + return t.v +} + +type DurationLiteral struct { + v time.Duration +} + +// String implements Literal. +func (d *DurationLiteral) String() string { + return d.v.String() +} + +// Type implements Literal. +func (d *DurationLiteral) Type() DataType { + return Duration +} + +// Any implements Literal. +func (d *DurationLiteral) Any() any { + return d.v +} + +func (d *DurationLiteral) Value() time.Duration { + return d.v +} + +type BytesLiteral struct { + v int64 +} + +// String implements Literal. +func (b *BytesLiteral) String() string { + return fmt.Sprintf("%dB", b.v) +} + +// Type implements Literal. +func (b *BytesLiteral) Type() DataType { + return Bytes +} + +// Any implements Literal. +func (b *BytesLiteral) Any() any { + return b.v +} + +func (b *BytesLiteral) Value() int64 { + return b.v +} + +// Literal is holds a value of [any] typed as [DataType]. +type Literal interface { + fmt.Stringer + Any() any + Type() DataType +} + +var ( + _ Literal = (*NullLiteral)(nil) + _ Literal = (*BoolLiteral)(nil) + _ Literal = (*StringLiteral)(nil) + _ Literal = (*IntegerLiteral)(nil) + _ Literal = (*FloatLiteral)(nil) + _ Literal = (*TimestampLiteral)(nil) + _ Literal = (*DurationLiteral)(nil) + _ Literal = (*BytesLiteral)(nil) +) + +func NewNullLiteral() *NullLiteral { + return &NullLiteral{} +} + +func NewBoolLiteral(v bool) *BoolLiteral { + return &BoolLiteral{v: v} +} + +func NewStringLiteral(v string) *StringLiteral { + return &StringLiteral{v: v} +} + +func NewIntegerLiteral(v int64) *IntegerLiteral { + return &IntegerLiteral{v: v} +} + +func NewFloatLiteral(v float64) *FloatLiteral { + return &FloatLiteral{v: v} +} + +func NewTimestampLiteral(v time.Time) *TimestampLiteral { + return &TimestampLiteral{v: v} +} + +func NewDurationLiteral(v time.Duration) *DurationLiteral { + return &DurationLiteral{v: v} +} + +func NewBytesLiteral(v int64) *BytesLiteral { + return &BytesLiteral{v: v} +} diff --git a/pkg/engine/internal/datatype/types.go b/pkg/engine/internal/datatype/types.go new file mode 100644 index 0000000000..c9c6170dd4 --- /dev/null +++ b/pkg/engine/internal/datatype/types.go @@ -0,0 +1,120 @@ +package datatype + +import ( + "fmt" + + "github.com/apache/arrow-go/v18/arrow" +) + +type Type uint8 + +const ( + NULL = Type(arrow.NULL) + BOOL = Type(arrow.BOOL) + STRING = Type(arrow.STRING) + INT64 = Type(arrow.INT64) + FLOAT64 = Type(arrow.FLOAT64) +) + +func (t Type) String() string { + switch t { + case NULL: + return "NULL" + case BOOL: + return "BOOL" + case STRING: + return "STRING" + case INT64: + return "INT64" + case FLOAT64: + return "FLOAT64" + default: + return "INVALID" + } +} + +type DataType interface { + fmt.Stringer + ID() Type + ArrowType() arrow.DataType +} + +var ( + Null DataType = tNull{} + Bool DataType = tBool{} + String DataType = tString{} + Integer DataType = tInteger{} + Float DataType = tFloat{} + Timestamp DataType = tTimestamp{} + Duration DataType = tDuration{} + Bytes DataType = tBytes{} +) + +type tNull struct{} + +func (tNull) ID() Type { return NULL } +func (tNull) String() string { return "null" } +func (tNull) ArrowType() arrow.DataType { return ArrowType.Null } + +type tBool struct{} + +func (tBool) ID() Type { return BOOL } +func (tBool) String() string { return "bool" } +func (tBool) ArrowType() arrow.DataType { return ArrowType.Bool } + +type tString struct{} + +func (tString) ID() Type { return STRING } +func (tString) String() string { return "string" } +func (tString) ArrowType() arrow.DataType { return ArrowType.String } + +type tInteger struct{} + +func (tInteger) ID() Type { return INT64 } +func (tInteger) String() string { return "integer" } +func (tInteger) ArrowType() arrow.DataType { return ArrowType.Integer } + +type tFloat struct{} + +func (tFloat) ID() Type { return FLOAT64 } +func (tFloat) String() string { return "float" } +func (tFloat) ArrowType() arrow.DataType { return ArrowType.Float } + +type tTimestamp struct{} + +func (tTimestamp) ID() Type { return INT64 } +func (tTimestamp) String() string { return "timestamp" } +func (tTimestamp) ArrowType() arrow.DataType { return ArrowType.Integer } + +type tDuration struct{} + +func (tDuration) ID() Type { return INT64 } +func (tDuration) String() string { return "duration" } +func (tDuration) ArrowType() arrow.DataType { return ArrowType.Integer } + +type tBytes struct{} + +func (tBytes) ID() Type { return INT64 } +func (tBytes) String() string { return "bytes" } +func (tBytes) ArrowType() arrow.DataType { return ArrowType.Integer } + +var ( + names = map[string]DataType{ + Null.String(): Null, + Bool.String(): Bool, + String.String(): String, + Integer.String(): Integer, + Float.String(): Float, + Timestamp.String(): Timestamp, + Duration.String(): Duration, + Bytes.String(): Bytes, + } +) + +func FromString(dt string) DataType { + ty, ok := names[dt] + if !ok { + panic("invalid data type name") + } + return ty +} diff --git a/pkg/engine/internal/datatype/util.go b/pkg/engine/internal/datatype/util.go new file mode 100644 index 0000000000..9f8de0ecb1 --- /dev/null +++ b/pkg/engine/internal/datatype/util.go @@ -0,0 +1,19 @@ +package datatype + +import ( + "github.com/apache/arrow-go/v18/arrow" + + "github.com/grafana/loki/v3/pkg/engine/internal/types" +) + +var ( + ColumnMetadataBuiltinLine = ColumnMetadata(types.ColumnTypeBuiltin, String) + ColumnMetadataBuiltinTimestamp = ColumnMetadata(types.ColumnTypeBuiltin, Timestamp) +) + +func ColumnMetadata(ct types.ColumnType, dt DataType) arrow.Metadata { + return arrow.NewMetadata( + []string{types.MetadataKeyColumnType, types.MetadataKeyColumnDataType}, + []string{ct.String(), dt.String()}, + ) +} diff --git a/pkg/engine/internal/types/column.go b/pkg/engine/internal/types/column.go index 285d4abd7d..f77a9e6f6f 100644 --- a/pkg/engine/internal/types/column.go +++ b/pkg/engine/internal/types/column.go @@ -1,6 +1,8 @@ package types -import "fmt" +import ( + "fmt" +) // ColumnType denotes the column type for a [ColumnRef]. type ColumnType int @@ -21,26 +23,32 @@ const ( const ( ColumnNameBuiltinTimestamp = "timestamp" ColumnNameBuiltinLine = "line" - ColumnTypeMetadataKey = "column_type" + MetadataKeyColumnType = "column_type" + MetadataKeyColumnDataType = "column_datatype" ) +var ctNames = [6]string{"invalid", "builtin", "label", "metadata", "parsed", "ambiguous"} + // String returns a human-readable representation of the column type. func (ct ColumnType) String() string { + return ctNames[ct] +} + +// ColumnTypeFromString returns the [ColumnType] from its string representation. +func ColumnTypeFromString(ct string) ColumnType { switch ct { - case ColumnTypeInvalid: - return typeInvalid - case ColumnTypeBuiltin: - return "builtin" - case ColumnTypeLabel: - return "label" - case ColumnTypeMetadata: - return "metadata" - case ColumnTypeParsed: - return "parsed" - case ColumnTypeAmbiguous: - return "ambiguous" + case ctNames[1]: + return ColumnTypeBuiltin + case ctNames[2]: + return ColumnTypeLabel + case ctNames[3]: + return ColumnTypeMetadata + case ctNames[4]: + return ColumnTypeParsed + case ctNames[5]: + return ColumnTypeAmbiguous default: - return fmt.Sprintf("ColumnType(%d)", ct) + panic(fmt.Sprintf("invalid column type: %s", ct)) } } diff --git a/pkg/engine/internal/types/literal.go b/pkg/engine/internal/types/literal.go deleted file mode 100644 index b0f58022e1..0000000000 --- a/pkg/engine/internal/types/literal.go +++ /dev/null @@ -1,135 +0,0 @@ -package types - -import ( - "fmt" - "strconv" -) - -// Literal is holds a value of [ValueType]. -type Literal struct { - Value any -} - -// String returns the string representation of the literal value. -func (l *Literal) String() string { - switch v := l.Value.(type) { - case nil: - return "NULL" - case bool: - return strconv.FormatBool(v) - case string: - return fmt.Sprintf(`"%s"`, v) - case float64: - return strconv.FormatFloat(v, 'f', -1, 64) - case int64: - return strconv.FormatInt(v, 10) - case uint64: - return strconv.FormatUint(v, 10) - case []byte: - return fmt.Sprintf("%v", v) - default: - return "invalid" - } -} - -// ValueType returns the kind of value represented by the literal. -func (l *Literal) ValueType() ValueType { - switch l.Value.(type) { - case nil: - return ValueTypeNull - case bool: - return ValueTypeBool - case string: - return ValueTypeStr - case float64: - return ValueTypeFloat - case int64: - return ValueTypeInt - case uint64: - return ValueTypeTimestamp - case []byte: - return ValueTypeByteArray - default: - return ValueTypeInvalid - } -} - -// IsNull returns true if lit is a [ValueTypeNull] value. -func (l Literal) IsNull() bool { - return l.ValueType() == ValueTypeNull -} - -// Str returns the value as a string. It panics if lit is not a [ValueTypeString]. -func (l Literal) Str() string { - if expect, actual := ValueTypeStr, l.ValueType(); expect != actual { - panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) - } - return l.Value.(string) -} - -// Int64 returns the value as an int64. It panics if lit is not a [ValueTypeFloat]. -func (l Literal) Float() float64 { - if expect, actual := ValueTypeFloat, l.ValueType(); expect != actual { - panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) - } - return l.Value.(float64) -} - -// Int returns the value as an int64. It panics if lit is not a [ValueTypeInt]. -func (l Literal) Int() int64 { - if expect, actual := ValueTypeInt, l.ValueType(); expect != actual { - panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) - } - return l.Value.(int64) -} - -// Timestamp returns the value as a uint64. It panics if lit is not a [ValueTypeTimestamp]. -func (l Literal) Timestamp() uint64 { - if expect, actual := ValueTypeTimestamp, l.ValueType(); expect != actual { - panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) - } - return l.Value.(uint64) -} - -// ByteArray returns the value as a byte slice. It panics if lit is not a [ValueTypeByteArray]. -func (l Literal) ByteArray() []byte { - if expect, actual := ValueTypeByteArray, l.ValueType(); expect != actual { - panic(fmt.Sprintf("literal type is %s, not %s", actual, expect)) - } - return l.Value.([]byte) -} - -// Convenience function for creating a NULL literal. -func NullLiteral() Literal { - return Literal{Value: nil} -} - -// Convenience function for creating a bool literal. -func BoolLiteral(v bool) Literal { - return Literal{Value: v} -} - -// Convenience function for creating a string literal. -func StringLiteral(v string) Literal { - return Literal{Value: v} -} - -// Convenience function for creating a float literal. -func FloatLiteral(v float64) Literal { - return Literal{Value: v} -} - -// Convenience function for creating a timestamp literal. -func TimestampLiteral(v uint64) Literal { - return Literal{Value: v} -} - -// Convenience function for creating an integer literal. -func IntLiteral(v int64) Literal { - return Literal{Value: v} -} - -// Convenience function for creating a byte array literal. -func ByteArrayLiteral(v []byte) Literal { - return Literal{Value: v} -} diff --git a/pkg/engine/internal/types/literal_test.go b/pkg/engine/internal/types/literal_test.go deleted file mode 100644 index d783a5f16f..0000000000 --- a/pkg/engine/internal/types/literal_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package types - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestLiteralRepresentation(t *testing.T) { - testCases := []struct { - name string - literal Literal - expected string - }{ - { - name: "null literal", - literal: NullLiteral(), - expected: "NULL", - }, - { - name: "bool literal - true", - literal: BoolLiteral(true), - expected: "true", - }, - { - name: "bool literal - false", - literal: BoolLiteral(false), - expected: "false", - }, - { - name: "string literal", - literal: StringLiteral("test"), - expected: `"test"`, - }, - { - name: "string literal with quotes", - literal: StringLiteral(`test "quoted" string`), - expected: `"test "quoted" string"`, - }, - { - name: "float literal - integer value", - literal: FloatLiteral(42.0), - expected: "42", - }, - { - name: "float literal - decimal value", - literal: FloatLiteral(3.14159), - expected: "3.14159", - }, - { - name: "int literal - positive", - literal: IntLiteral(123), - expected: "123", - }, - { - name: "int literal - negative", - literal: IntLiteral(-456), - expected: "-456", - }, - { - name: "timestamp literal", - literal: TimestampLiteral(1625097600000), - expected: "1625097600000", - }, - { - name: "byte array literal", - literal: ByteArrayLiteral([]byte("hello")), - expected: "[104 101 108 108 111]", - }, - { - name: "invalid literal", - literal: Literal{Value: make(chan int)}, // channels are not supported types - expected: "invalid", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - result := tc.literal.String() - require.Equal(t, tc.expected, result) - }) - } -} diff --git a/pkg/engine/internal/types/operators.go b/pkg/engine/internal/types/operators.go index c0d477c9de..5964a3eb0d 100644 --- a/pkg/engine/internal/types/operators.go +++ b/pkg/engine/internal/types/operators.go @@ -17,8 +17,6 @@ const ( // String returns the string representation of the UnaryOp. func (t UnaryOp) String() string { switch t { - case UnaryOpInvalid: - return typeInvalid case UnaryOpNot: return "NOT" case UnaryOpAbs: @@ -65,8 +63,6 @@ const ( // String returns a human-readable representation of the binary operation kind. func (t BinaryOp) String() string { switch t { - case BinaryOpInvalid: - return typeInvalid case BinaryOpEq: return "EQ" case BinaryOpNeq: diff --git a/pkg/engine/internal/types/value.go b/pkg/engine/internal/types/value.go deleted file mode 100644 index 21ff8babf1..0000000000 --- a/pkg/engine/internal/types/value.go +++ /dev/null @@ -1,47 +0,0 @@ -package types - -const ( - typeInvalid = "invalid" -) - -// ValueType represents the type of a value, which can either be a literal value, or a column value. -type ValueType uint32 - -const ( - ValueTypeInvalid ValueType = iota // zero-value is an invalid type - - ValueTypeNull // NULL value. - ValueTypeBool // Boolean value - ValueTypeFloat // 64bit floating point value - ValueTypeInt // Signed 64bit integer value - ValueTypeTimestamp // Unsigned 64bit integer value (nanosecond timestamp) - ValueTypeStr // String value - ValueTypeByteArray // Byte-slice value - // ValueTypeBytes - // ValueTypeDate - // ValueTypeDuration -) - -// String returns the string representation of the LiteralKind. -func (t ValueType) String() string { - switch t { - case ValueTypeInvalid: - return typeInvalid - case ValueTypeNull: - return "null" - case ValueTypeBool: - return "bool" - case ValueTypeFloat: - return "float" - case ValueTypeInt: - return "int" - case ValueTypeTimestamp: - return "timestamp" - case ValueTypeStr: - return "string" - case ValueTypeByteArray: - return "[]byte" - default: - return typeInvalid - } -} diff --git a/pkg/engine/planner/logical/format_tree_test.go b/pkg/engine/planner/logical/format_tree_test.go index beb69b754f..5dcdc567e2 100644 --- a/pkg/engine/planner/logical/format_tree_test.go +++ b/pkg/engine/planner/logical/format_tree_test.go @@ -32,7 +32,7 @@ func TestFormatSimpleQuery(t *testing.T) { ).Select( &BinOp{ Left: NewColumnRef("age", types.ColumnTypeMetadata), - Right: NewLiteral[int64](21), + Right: NewLiteral(21), Op: types.BinaryOpGt, }, ) @@ -50,7 +50,7 @@ func TestFormatSimpleQuery(t *testing.T) { SELECT <%4> table=%2 predicate=%3 │ └── BinOp <%3> op=GT left=metadata.age right=21 │ ├── ColumnRef column=age type=metadata -│ └── Literal value=21 kind=int +│ └── Literal value=21 kind=integer └── MAKETABLE <%2> selector=EQ label.app "users" └── BinOp <%1> op=EQ left=label.app right="users" ├── ColumnRef column=app type=label @@ -75,7 +75,7 @@ func TestFormatSortQuery(t *testing.T) { ).Select( &BinOp{ Left: NewColumnRef("age", types.ColumnTypeMetadata), - Right: NewLiteral[int64](21), + Right: NewLiteral(21), Op: types.BinaryOpGt, }, ).Sort(*NewColumnRef("age", types.ColumnTypeMetadata), true, false) @@ -95,7 +95,7 @@ SORT <%5> table=%4 column=metadata.age direction=asc nulls=last └── SELECT <%4> table=%2 predicate=%3 │ └── BinOp <%3> op=GT left=metadata.age right=21 │ ├── ColumnRef column=age type=metadata - │ └── Literal value=21 kind=int + │ └── Literal value=21 kind=integer └── MAKETABLE <%2> selector=EQ label.app "users" └── BinOp <%1> op=EQ left=label.app right="users" ├── ColumnRef column=app type=label diff --git a/pkg/engine/planner/logical/logical_test.go b/pkg/engine/planner/logical/logical_test.go index 2760faf333..5c56ad0da6 100644 --- a/pkg/engine/planner/logical/logical_test.go +++ b/pkg/engine/planner/logical/logical_test.go @@ -25,7 +25,7 @@ func TestPlan_String(t *testing.T) { ).Select( &BinOp{ Left: NewColumnRef("age", types.ColumnTypeMetadata), - Right: NewLiteral[int64](21), + Right: NewLiteral(21), Op: types.BinaryOpGt, }, ).Sort(*NewColumnRef("age", types.ColumnTypeMetadata), true, false) diff --git a/pkg/engine/planner/logical/node_literal.go b/pkg/engine/planner/logical/node_literal.go index d25fefd872..cadb283f86 100644 --- a/pkg/engine/planner/logical/node_literal.go +++ b/pkg/engine/planner/logical/node_literal.go @@ -1,7 +1,9 @@ package logical import ( - "github.com/grafana/loki/v3/pkg/engine/internal/types" + "time" + + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/planner/schema" ) @@ -10,36 +12,56 @@ import ( // // The zero value of a Literal is a NULL value. type Literal struct { - val types.Literal + datatype.Literal } var _ Value = (*Literal)(nil) -func NewLiteral[T any](v T) *Literal { - return &Literal{ - val: types.Literal{Value: v}, +func NewLiteral(v any) *Literal { + if v == nil { + return &Literal{Literal: datatype.NewNullLiteral()} + } + + switch casted := v.(type) { + case bool: + return &Literal{Literal: datatype.NewBoolLiteral(casted)} + case string: + // TODO(chaudum): Try parsing bytes/timestamp/duration + return &Literal{Literal: datatype.NewStringLiteral(casted)} + case int: + return &Literal{Literal: datatype.NewIntegerLiteral(int64(casted))} + case int64: + return &Literal{Literal: datatype.NewIntegerLiteral(casted)} + case float64: + return &Literal{Literal: datatype.NewFloatLiteral(casted)} + case time.Time: + return &Literal{Literal: datatype.NewTimestampLiteral(casted)} + case time.Duration: + return &Literal{Literal: datatype.NewDurationLiteral(casted)} + default: + return &Literal{Literal: datatype.NewNullLiteral()} } } // Kind returns the kind of value represented by the literal. -func (l Literal) Kind() types.ValueType { - return l.val.ValueType() +func (l Literal) Kind() datatype.DataType { + return l.Literal.Type() } // Name returns the string form of the literal. func (l Literal) Name() string { - return l.val.String() + return l.Literal.String() } // String returns a printable form of the literal, even if lit is not a // [ValueTypeString]. func (l Literal) String() string { - return l.val.String() + return l.Literal.String() } // Value returns lit's value as untyped interface{}. func (l Literal) Value() any { - return l.val.Value + return l.Literal.Any() } func (l *Literal) Schema() *schema.Schema { diff --git a/pkg/engine/planner/logical/planner.go b/pkg/engine/planner/logical/planner.go index 3557ccd179..aa22ca5de6 100644 --- a/pkg/engine/planner/logical/planner.go +++ b/pkg/engine/planner/logical/planner.go @@ -3,6 +3,7 @@ package logical import ( "errors" "fmt" + "time" "github.com/prometheus/prometheus/model/labels" @@ -73,8 +74,8 @@ func BuildPlan(query logql.Params) (*Plan, error) { builder = builder.Sort(*timestampColumnRef(), ascending, false) // SELECT -> Filter - start := query.Start().UnixNano() - end := query.End().UnixNano() + start := query.Start() + end := query.End() for _, value := range convertQueryRangeToPredicates(start, end) { builder = builder.Select(value) } @@ -235,16 +236,16 @@ func convertLabelFilter(expr log.LabelFilterer) (Value, error) { return nil, fmt.Errorf("invalid label filter %T", expr) } -func convertQueryRangeToPredicates(start, end int64) []*BinOp { +func convertQueryRangeToPredicates(start, end time.Time) []*BinOp { return []*BinOp{ { Left: timestampColumnRef(), - Right: NewLiteral(uint64(start)), + Right: NewLiteral(start), Op: types.BinaryOpGte, }, { Left: timestampColumnRef(), - Right: NewLiteral(uint64(end)), + Right: NewLiteral(end), Op: types.BinaryOpLt, }, } diff --git a/pkg/engine/planner/logical/planner_test.go b/pkg/engine/planner/logical/planner_test.go index ee1cb82e77..35b12a1701 100644 --- a/pkg/engine/planner/logical/planner_test.go +++ b/pkg/engine/planner/logical/planner_test.go @@ -27,12 +27,12 @@ func (q *query) Direction() logproto.Direction { // End implements logql.Params. func (q *query) End() time.Time { - return time.Unix(0, q.end) + return time.Unix(q.end, 0) } // Start implements logql.Params. func (q *query) Start() time.Time { - return time.Unix(0, q.start) + return time.Unix(q.start, 0) } // Limit implements logql.Params. @@ -80,8 +80,8 @@ var _ logql.Params = (*query)(nil) func TestConvertAST_Success(t *testing.T) { q := &query{ statement: `{cluster="prod", namespace=~"loki-.*"} | foo="bar" or bar="baz" |= "metric.go" |= "foo" or "bar" !~ "(a|b|c)" `, - start: 1000, - end: 2000, + start: 3600, + end: 7200, direction: logproto.FORWARD, limit: 1000, } @@ -94,9 +94,9 @@ func TestConvertAST_Success(t *testing.T) { %3 = AND %1 %2 %4 = MAKETABLE [selector=%3] %5 = SORT %4 [column=builtin.timestamp, asc=true, nulls_first=false] -%6 = GTE builtin.timestamp 1000 +%6 = GTE builtin.timestamp 1970-01-01T01:00:00Z %7 = SELECT %5 [predicate=%6] -%8 = LT builtin.timestamp 2000 +%8 = LT builtin.timestamp 1970-01-01T02:00:00Z %9 = SELECT %7 [predicate=%8] %10 = MATCH_STR ambiguous.foo "bar" %11 = MATCH_STR ambiguous.bar "baz" diff --git a/pkg/engine/planner/physical/context.go b/pkg/engine/planner/physical/context.go index f07c223516..a110f65ce4 100644 --- a/pkg/engine/planner/physical/context.go +++ b/pkg/engine/planner/physical/context.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/dataobj/metastore" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" ) @@ -98,7 +99,7 @@ func expressionToMatchers(selector Expression) ([]*labels.Matcher, error) { if err != nil { return nil, err } - value, err := convertLiteral(expr.Right) + value, err := convertLiteralToString(expr.Right) if err != nil { return nil, err } @@ -115,15 +116,15 @@ func expressionToMatchers(selector Expression) ([]*labels.Matcher, error) { } } -func convertLiteral(expr Expression) (string, error) { +func convertLiteralToString(expr Expression) (string, error) { l, ok := expr.(*LiteralExpr) if !ok { return "", fmt.Errorf("expected literal expression, got %T", expr) } - if l.ValueType() != types.ValueTypeStr { + if l.ValueType() != datatype.String { return "", fmt.Errorf("literal type is not a string, got %v", l.ValueType()) } - return l.Value.Str(), nil + return l.Any().(string), nil } func convertColumnRef(expr Expression) (string, error) { diff --git a/pkg/engine/planner/physical/context_test.go b/pkg/engine/planner/physical/context_test.go index 22b565128a..3f929f0299 100644 --- a/pkg/engine/planner/physical/context_test.go +++ b/pkg/engine/planner/physical/context_test.go @@ -2,6 +2,7 @@ package physical import ( "testing" + "time" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -24,11 +25,15 @@ func TestContext_ConvertLiteral(t *testing.T) { wantErr: true, }, { - expr: NewLiteral(int64(123)), + expr: NewLiteral(123), wantErr: true, }, { - expr: NewLiteral(uint64(123456789)), + expr: NewLiteral(time.Now()), + wantErr: true, + }, + { + expr: NewLiteral(time.Hour), wantErr: true, }, { @@ -46,7 +51,7 @@ func TestContext_ConvertLiteral(t *testing.T) { } for _, tt := range tests { t.Run(tt.expr.String(), func(t *testing.T) { - got, err := convertLiteral(tt.expr) + got, err := convertLiteralToString(tt.expr) if tt.wantErr { require.Error(t, err) t.Log(err) diff --git a/pkg/engine/planner/physical/expressions.go b/pkg/engine/planner/physical/expressions.go index bbce03b613..eac7126b2a 100644 --- a/pkg/engine/planner/physical/expressions.go +++ b/pkg/engine/planner/physical/expressions.go @@ -2,7 +2,9 @@ package physical import ( "fmt" + "time" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" ) @@ -59,7 +61,7 @@ type BinaryExpression interface { // physical plan. type LiteralExpression interface { Expression - ValueType() types.ValueType + ValueType() datatype.DataType isLiteralExpr() } @@ -110,7 +112,7 @@ func (*BinaryExpr) Type() ExpressionType { // LiteralExpr is an expression that implements the [LiteralExpression] interface. type LiteralExpr struct { - Value types.Literal + datatype.Literal } func (*LiteralExpr) isExpr() {} @@ -118,7 +120,7 @@ func (*LiteralExpr) isLiteralExpr() {} // String returns the string representation of the literal value. func (e *LiteralExpr) String() string { - return e.Value.String() + return e.Literal.String() } // ID returns the type of the [LiteralExpr]. @@ -127,13 +129,33 @@ func (*LiteralExpr) Type() ExpressionType { } // ValueType returns the kind of value represented by the literal. -func (e *LiteralExpr) ValueType() types.ValueType { - return e.Value.ValueType() +func (e *LiteralExpr) ValueType() datatype.DataType { + return e.Literal.Type() } func NewLiteral(value any) *LiteralExpr { - return &LiteralExpr{ - Value: types.Literal{Value: value}, + if value == nil { + return &LiteralExpr{Literal: datatype.NewNullLiteral()} + } + + switch casted := value.(type) { + case bool: + return &LiteralExpr{Literal: datatype.NewBoolLiteral(casted)} + case string: + // TODO(chaudum): Try parsing bytes/timestamp/duration + return &LiteralExpr{Literal: datatype.NewStringLiteral(casted)} + case int: + return &LiteralExpr{Literal: datatype.NewIntegerLiteral(int64(casted))} + case int64: + return &LiteralExpr{Literal: datatype.NewIntegerLiteral(casted)} + case float64: + return &LiteralExpr{Literal: datatype.NewFloatLiteral(casted)} + case time.Time: + return &LiteralExpr{Literal: datatype.NewTimestampLiteral(casted)} + case time.Duration: + return &LiteralExpr{Literal: datatype.NewDurationLiteral(casted)} + default: + panic(fmt.Sprintf("invalid literal value type %T", value)) } } diff --git a/pkg/engine/planner/physical/expressions_test.go b/pkg/engine/planner/physical/expressions_test.go index 35272ace74..34d6555773 100644 --- a/pkg/engine/planner/physical/expressions_test.go +++ b/pkg/engine/planner/physical/expressions_test.go @@ -2,9 +2,11 @@ package physical import ( "testing" + "time" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/engine/internal/datatype" "github.com/grafana/loki/v3/pkg/engine/internal/types" ) @@ -18,7 +20,7 @@ func TestExpressionTypes(t *testing.T) { name: "UnaryExpression", expr: &UnaryExpr{ Op: types.UnaryOpNot, - Left: &LiteralExpr{Value: types.BoolLiteral(true)}, + Left: NewLiteral(true), }, expected: ExprTypeUnary, }, @@ -27,13 +29,13 @@ func TestExpressionTypes(t *testing.T) { expr: &BinaryExpr{ Op: types.BinaryOpEq, Left: &ColumnExpr{Ref: types.ColumnRef{Column: "col", Type: types.ColumnTypeBuiltin}}, - Right: &LiteralExpr{Value: types.StringLiteral("foo")}, + Right: NewLiteral("foo"), }, expected: ExprTypeBinary, }, { name: "LiteralExpression", - expr: &LiteralExpr{Value: types.StringLiteral("col")}, + expr: NewLiteral("col"), expected: ExprTypeLiteral, }, { @@ -58,7 +60,7 @@ func TestLiteralExpr(t *testing.T) { require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, types.ValueTypeBool, literal.ValueType()) + require.Equal(t, datatype.Bool, literal.ValueType()) }) t.Run("float", func(t *testing.T) { @@ -66,7 +68,7 @@ func TestLiteralExpr(t *testing.T) { require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, types.ValueTypeFloat, literal.ValueType()) + require.Equal(t, datatype.Float, literal.ValueType()) }) t.Run("integer", func(t *testing.T) { @@ -74,15 +76,23 @@ func TestLiteralExpr(t *testing.T) { require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, types.ValueTypeInt, literal.ValueType()) + require.Equal(t, datatype.Integer, literal.ValueType()) }) t.Run("timestamp", func(t *testing.T) { - var expr Expression = NewLiteral(uint64(1741882435000000000)) + var expr Expression = NewLiteral(time.Unix(0, 1741882435000000000)) require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, types.ValueTypeTimestamp, literal.ValueType()) + require.Equal(t, datatype.Timestamp, literal.ValueType()) + }) + + t.Run("duration", func(t *testing.T) { + var expr Expression = NewLiteral(time.Hour) + require.Equal(t, ExprTypeLiteral, expr.Type()) + literal, ok := expr.(LiteralExpression) + require.True(t, ok) + require.Equal(t, datatype.Duration, literal.ValueType()) }) t.Run("string", func(t *testing.T) { @@ -90,6 +100,6 @@ func TestLiteralExpr(t *testing.T) { require.Equal(t, ExprTypeLiteral, expr.Type()) literal, ok := expr.(LiteralExpression) require.True(t, ok) - require.Equal(t, types.ValueTypeStr, literal.ValueType()) + require.Equal(t, datatype.String, literal.ValueType()) }) } diff --git a/pkg/engine/planner/physical/optimizer_test.go b/pkg/engine/planner/physical/optimizer_test.go index 3b5647ecce..8cbf822078 100644 --- a/pkg/engine/planner/physical/optimizer_test.go +++ b/pkg/engine/planner/physical/optimizer_test.go @@ -2,6 +2,7 @@ package physical import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -28,7 +29,7 @@ func TestCanApplyPredicate(t *testing.T) { { predicate: &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(uint64(1743424636000000000)), + Right: NewLiteral(time.Now()), Op: types.BinaryOpGt, }, want: true, @@ -58,14 +59,14 @@ func dummyPlan() *Plan { filter1 := plan.addNode(&Filter{id: "filter1", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(uint64(1000000000)), + Right: NewLiteral(time.Unix(0, 1000000000)), Op: types.BinaryOpGt, }, }}) filter2 := plan.addNode(&Filter{id: "filter2", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(uint64(2000000000)), + Right: NewLiteral(time.Unix(0, 2000000000)), Op: types.BinaryOpLte, }, }}) @@ -111,24 +112,24 @@ func TestOptimizer(t *testing.T) { scan1 := optimized.addNode(&DataObjScan{id: "scan1", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(uint64(1000000000)), + Right: NewLiteral(time.Unix(0, 1000000000)), Op: types.BinaryOpGt, }, &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(uint64(2000000000)), + Right: NewLiteral(time.Unix(0, 2000000000)), Op: types.BinaryOpLte, }, }}) scan2 := optimized.addNode(&DataObjScan{id: "scan2", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(uint64(1000000000)), + Right: NewLiteral(time.Unix(0, 1000000000)), Op: types.BinaryOpGt, }, &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(uint64(2000000000)), + Right: NewLiteral(time.Unix(0, 2000000000)), Op: types.BinaryOpLte, }, }}) @@ -166,14 +167,14 @@ func TestOptimizer(t *testing.T) { filter1 := optimized.addNode(&Filter{id: "filter1", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(uint64(1000000000)), + Right: NewLiteral(time.Unix(0, 1000000000)), Op: types.BinaryOpGt, }, }}) filter2 := optimized.addNode(&Filter{id: "filter2", Predicates: []Expression{ &BinaryExpr{ Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin), - Right: NewLiteral(uint64(2000000000)), + Right: NewLiteral(time.Unix(0, 2000000000)), Op: types.BinaryOpLte, }, }}) diff --git a/pkg/engine/planner/physical/planner_test.go b/pkg/engine/planner/physical/planner_test.go index 0fd689a167..47a59309a9 100644 --- a/pkg/engine/planner/physical/planner_test.go +++ b/pkg/engine/planner/physical/planner_test.go @@ -2,6 +2,7 @@ package physical import ( "testing" + "time" "github.com/stretchr/testify/require" @@ -50,7 +51,7 @@ func TestPlanner_Convert(t *testing.T) { ).Select( &logical.BinOp{ Left: logical.NewColumnRef("timestamp", types.ColumnTypeBuiltin), - Right: logical.NewLiteral(uint64(1742826126000000000)), + Right: logical.NewLiteral(time.Unix(0, 1742826126000000000)), Op: types.BinaryOpLt, }, ).Limit(0, 1000)