chore(engine): Improve type system of the new query engine (#17456)

**What this PR does / why we need it**:

The original type system for the new engine was introduced before adding Apache Arrow and with that an additional type system.


One aspect of the new type system of this PR is that the logical types of the engine are aligned/interchangeable with the arrow logical types.

```go
type Type uint8

const (
	NULL    = Type(arrow.NULL)
	BOOL    = Type(arrow.BOOL)
	STRING  = Type(arrow.STRING)
	INT64   = Type(arrow.INT64)
	FLOAT64 = Type(arrow.FLOAT64)
)
```

The actual data types of Loki (`Boolean`, `String`, `Integer`, `Float`, `Timestamp`, `Duration`, and `Bytes`) are implemented as wrapper around a logical type. Different data types can use the same logical type as representation, such as `Integer`, `Timestamp` and `Duration` both use `INT64`.

Since Arrow array types are bound to logical types, and one logical type can have multiple Loki data types, it is required to "annotate" Fields of an arrow.Record using its Metadata, similar to how it is done for distinguishing builtin columns from label/metadata/parsed columns.

Additionally, the PR contains a mapping between Loki types and Arrow types.

The `Literal` has been updated to use the new types.

The old system also used UINT64 as representation of the `Timestamp` type, which is incorrect, since both LogQL and the DataObjects allow timestamps before unix epoch.

---

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/17578/head
Christian Haudum 1 year ago committed by GitHub
parent 4841b2abe2
commit 94b1d2d5ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      pkg/engine/engine.go
  2. 14
      pkg/engine/engine_test.go
  3. 111
      pkg/engine/executor/expressions.go
  4. 55
      pkg/engine/executor/expressions_test.go
  5. 21
      pkg/engine/executor/filter_test.go
  6. 4
      pkg/engine/executor/project.go
  7. 28
      pkg/engine/executor/project_test.go
  8. 16
      pkg/engine/executor/sortmerge.go
  9. 8
      pkg/engine/executor/sortmerge_test.go
  10. 13
      pkg/engine/executor/util_test.go
  11. 67
      pkg/engine/internal/datatype/arrow.go
  12. 240
      pkg/engine/internal/datatype/literal.go
  13. 120
      pkg/engine/internal/datatype/types.go
  14. 19
      pkg/engine/internal/datatype/util.go
  15. 38
      pkg/engine/internal/types/column.go
  16. 135
      pkg/engine/internal/types/literal.go
  17. 83
      pkg/engine/internal/types/literal_test.go
  18. 4
      pkg/engine/internal/types/operators.go
  19. 47
      pkg/engine/internal/types/value.go
  20. 8
      pkg/engine/planner/logical/format_tree_test.go
  21. 2
      pkg/engine/planner/logical/logical_test.go
  22. 42
      pkg/engine/planner/logical/node_literal.go
  23. 11
      pkg/engine/planner/logical/planner.go
  24. 12
      pkg/engine/planner/logical/planner_test.go
  25. 9
      pkg/engine/planner/physical/context.go
  26. 11
      pkg/engine/planner/physical/context_test.go
  27. 36
      pkg/engine/planner/physical/expressions.go
  28. 28
      pkg/engine/planner/physical/expressions_test.go
  29. 19
      pkg/engine/planner/physical/optimizer_test.go
  30. 3
      pkg/engine/planner/physical/planner_test.go

@ -167,7 +167,7 @@ func collectRow(rec arrow.Record, i int, result *resultBuilder) {
// TODO(chaudum): We need to add metadata to columns to identify builtins, labels, metadata, and parsed.
field := rec.Schema().Field(colIdx)
colType, ok := field.Metadata.GetValue(types.ColumnTypeMetadataKey)
colType, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
// Ignore column values that are NULL or invalid or don't have a column typ
if col.IsNull(i) || !col.IsValid(i) || !ok {

@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel"
@ -52,15 +53,14 @@ func createRecord(t *testing.T, schema *arrow.Schema, data [][]interface{}) arro
}
func TestConvertArrowRecordsToLokiResult(t *testing.T) {
mdTypeBuiltin := arrow.NewMetadata([]string{types.ColumnTypeMetadataKey}, []string{types.ColumnTypeBuiltin.String()})
mdTypeLabel := arrow.NewMetadata([]string{types.ColumnTypeMetadataKey}, []string{types.ColumnTypeLabel.String()})
mdTypeMetadata := arrow.NewMetadata([]string{types.ColumnTypeMetadataKey}, []string{types.ColumnTypeMetadata.String()})
mdTypeLabel := datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String)
mdTypeMetadata := datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String)
t.Run("rows without log line, timestamp, or labels are ignored", func(t *testing.T) {
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.PrimitiveTypes.Uint64, Metadata: mdTypeBuiltin},
{Name: types.ColumnNameBuiltinLine, Type: arrow.BinaryTypes.String, Metadata: mdTypeBuiltin},
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.PrimitiveTypes.Uint64, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameBuiltinLine, Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinLine},
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: mdTypeLabel},
},
nil,
@ -117,8 +117,8 @@ func TestConvertArrowRecordsToLokiResult(t *testing.T) {
t.Run("successful conversion of labels, log line, timestamp, and structured metadata ", func(t *testing.T) {
schema := arrow.NewSchema(
[]arrow.Field{
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.PrimitiveTypes.Uint64, Metadata: mdTypeBuiltin},
{Name: types.ColumnNameBuiltinLine, Type: arrow.BinaryTypes.String, Metadata: mdTypeBuiltin},
{Name: types.ColumnNameBuiltinTimestamp, Type: arrow.PrimitiveTypes.Uint64, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
{Name: types.ColumnNameBuiltinLine, Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinLine},
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: mdTypeLabel},
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: mdTypeLabel},
{Name: "traceID", Type: arrow.BinaryTypes.String, Metadata: mdTypeMetadata},

@ -7,6 +7,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/errors"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
@ -19,15 +20,28 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
case *physical.LiteralExpr:
return &Scalar{
value: expr.Value,
value: expr.Literal,
rows: input.NumRows(),
ct: types.ColumnTypeAmbiguous,
}, nil
case *physical.ColumnExpr:
schema := input.Schema()
for i := range input.NumCols() {
md := schema.Field(int(i)).Metadata
if input.ColumnName(int(i)) == expr.Ref.Column {
dt, ok := md.GetValue(types.MetadataKeyColumnDataType)
if !ok {
continue
}
ct, ok := md.GetValue(types.MetadataKeyColumnType)
if !ok {
ct = types.ColumnTypeAmbiguous.String()
}
return &Array{
array: input.Column(int(i)),
dt: datatype.FromString(dt),
ct: types.ColumnTypeFromString(ct),
rows: input.NumRows(),
}, nil
}
@ -40,7 +54,7 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
return nil, err
}
fn, err := unaryFunctions.GetForSignature(expr.Op, lhr.Type())
fn, err := unaryFunctions.GetForSignature(expr.Op, lhr.Type().ArrowType())
if err != nil {
return nil, fmt.Errorf("failed to lookup unary function: %w", err)
}
@ -57,13 +71,13 @@ func (e expressionEvaluator) eval(expr physical.Expression, input arrow.Record)
}
// At the moment we only support functions that accept the same input types.
if lhs.Type().ID() != rhs.Type().ID() {
return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): types do not match", expr.Op, lhs.Type(), rhs.Type())
if lhs.Type().ArrowType().ID() != rhs.Type().ArrowType().ID() {
return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): types do not match", expr.Op, lhs.Type().ArrowType(), rhs.Type().ArrowType())
}
fn, err := binaryFunctions.GetForSignature(expr.Op, lhs.Type())
fn, err := binaryFunctions.GetForSignature(expr.Op, lhs.Type().ArrowType())
if err != nil {
return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): %w", expr.Op, lhs.Type(), rhs.Type(), err)
return nil, fmt.Errorf("failed to lookup binary function for signature %v(%v,%v): %w", expr.Op, lhs.Type().ArrowType(), rhs.Type().ArrowType(), err)
}
return fn.Evaluate(lhs, rhs)
}
@ -86,16 +100,19 @@ type ColumnVector interface {
ToArray() arrow.Array
// Value returns the value at the specified index position in the column vector.
Value(i int) any
// Type returns the Arrow data type of the column vector.
Type() arrow.DataType
// Type returns the Loki data type of the column vector.
Type() datatype.DataType
// ColumnType returns the type of column the vector originates from.
ColumnType() types.ColumnType
// Len returns the length of the vector
Len() int64
}
// Scalar represents a single value repeated any number of times.
type Scalar struct {
value types.Literal
value datatype.Literal
rows int64
ct types.ColumnType
}
var _ ColumnVector = (*Scalar)(nil)
@ -103,60 +120,63 @@ var _ ColumnVector = (*Scalar)(nil)
// ToArray implements ColumnVector.
func (v *Scalar) ToArray() arrow.Array {
mem := memory.NewGoAllocator()
builder := array.NewBuilder(mem, v.Type())
builder := array.NewBuilder(mem, v.Type().ArrowType())
defer builder.Release()
for i := int64(0); i < v.rows; i++ {
switch v.value.ValueType() {
case types.ValueTypeBool:
builder.(*array.BooleanBuilder).Append(v.value.Value.(bool))
case types.ValueTypeStr:
builder.(*array.StringBuilder).Append(v.value.Value.(string))
case types.ValueTypeInt:
builder.(*array.Int64Builder).Append(v.value.Value.(int64))
case types.ValueTypeFloat:
builder.(*array.Float64Builder).Append(v.value.Value.(float64))
case types.ValueTypeTimestamp:
builder.(*array.Uint64Builder).Append(v.value.Value.(uint64))
default:
switch builder := builder.(type) {
case *array.NullBuilder:
for range v.rows {
builder.AppendNull()
}
case *array.BooleanBuilder:
value := v.value.Any().(bool)
for range v.rows {
builder.Append(value)
}
case *array.StringBuilder:
value := v.value.Any().(string)
for range v.rows {
builder.Append(value)
}
case *array.Int64Builder:
value := v.value.Any().(int64)
for range v.rows {
builder.Append(value)
}
case *array.Float64Builder:
value := v.value.Any().(float64)
for range v.rows {
builder.Append(value)
}
}
return builder.NewArray()
}
// Value implements ColumnVector.
func (v *Scalar) Value(_ int) any {
return v.value.Value
return v.value.Any()
}
// Type implements ColumnVector.
func (v Scalar) Type() arrow.DataType {
switch v.value.ValueType() {
case types.ValueTypeBool:
return arrow.FixedWidthTypes.Boolean
case types.ValueTypeStr:
return arrow.BinaryTypes.String
case types.ValueTypeInt:
return arrow.PrimitiveTypes.Int64
case types.ValueTypeFloat:
return arrow.PrimitiveTypes.Float64
case types.ValueTypeTimestamp:
return arrow.PrimitiveTypes.Uint64
default:
return arrow.Null
}
func (v *Scalar) Type() datatype.DataType {
return v.value.Type()
}
// ColumnType implements ColumnVector.
func (v *Scalar) ColumnType() types.ColumnType {
return v.ct
}
// Len implements ColumnVector.
func (v Scalar) Len() int64 {
func (v *Scalar) Len() int64 {
return v.rows
}
// Array represents a column of data, stored as an [arrow.Array].
type Array struct {
array arrow.Array
dt datatype.DataType
ct types.ColumnType
rows int64
}
@ -190,8 +210,13 @@ func (a *Array) Value(i int) any {
}
// Type implements ColumnVector.
func (a *Array) Type() arrow.DataType {
return a.array.DataType()
func (a *Array) Type() datatype.DataType {
return a.dt
}
// ColumnType implements ColumnVector.
func (a *Array) ColumnType() types.ColumnType {
return a.ct
}
// Len implements ColumnVector.

@ -9,6 +9,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/errors"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
@ -16,10 +17,10 @@ import (
var (
fields = []arrow.Field{
{Name: "name", Type: arrow.BinaryTypes.String},
{Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64},
{Name: "value", Type: arrow.PrimitiveTypes.Float64},
{Name: "valid", Type: arrow.FixedWidthTypes.Boolean},
{Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
{Name: "timestamp", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Timestamp)},
{Name: "value", Type: arrow.PrimitiveTypes.Float64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Float)},
{Name: "valid", Type: arrow.FixedWidthTypes.Boolean, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Bool)},
}
sampledata = `Alice,1745487598764058205,0.2586284611568047,false
Bob,1745487598764058305,0.7823145698741236,true
@ -61,13 +62,23 @@ func TestEvaluateLiteralExpression(t *testing.T) {
},
{
name: "float",
value: float64(123.456789),
value: 123.456789,
arrowType: arrow.FLOAT64,
},
{
name: "timestamp",
value: uint64(1744612881740032450),
arrowType: arrow.UINT64,
value: time.Unix(3600, 0).UTC(),
arrowType: arrow.INT64,
},
{
name: "duration",
value: time.Hour,
arrowType: arrow.INT64,
},
{
name: "bytes",
value: int64(1024),
arrowType: arrow.INT64,
},
} {
t.Run(tt.name, func(t *testing.T) {
@ -78,7 +89,7 @@ func TestEvaluateLiteralExpression(t *testing.T) {
rec := batch(n, time.Now())
colVec, err := e.eval(literal, rec)
require.NoError(t, err)
require.Equalf(t, tt.arrowType, colVec.Type().ID(), "expected: %v got: %v", tt.arrowType.String(), colVec.Type().ID().String())
require.Equalf(t, tt.arrowType, colVec.Type().ArrowType().ID(), "expected: %v got: %v", tt.arrowType.String(), colVec.Type().ArrowType().ID().String())
for i := range n {
val := colVec.Value(i)
@ -105,10 +116,10 @@ func TestEvaluateColumnExpression(t *testing.T) {
require.ErrorContains(t, err, errors.ErrKey.Error())
})
t.Run("string(log)", func(t *testing.T) {
t.Run("string(line)", func(t *testing.T) {
colExpr := &physical.ColumnExpr{
Ref: types.ColumnRef{
Column: "log",
Column: "line",
Type: types.ColumnTypeBuiltin,
},
}
@ -117,7 +128,7 @@ func TestEvaluateColumnExpression(t *testing.T) {
rec := batch(n, time.Now())
colVec, err := e.eval(colExpr, rec)
require.NoError(t, err)
require.Equal(t, arrow.STRING, colVec.Type().ID())
require.Equal(t, arrow.STRING, colVec.Type().ArrowType().ID())
for i := range n {
val := colVec.Value(i)
@ -145,7 +156,7 @@ func TestEvaluateBinaryExpression(t *testing.T) {
}
_, err := e.eval(expr, rec)
require.ErrorContains(t, err, "failed to lookup binary function for signature EQ(utf8,uint64): types do not match")
require.ErrorContains(t, err, "failed to lookup binary function for signature EQ(utf8,int64): types do not match")
})
t.Run("error if function for signature is not registered", func(t *testing.T) {
@ -156,11 +167,11 @@ func TestEvaluateBinaryExpression(t *testing.T) {
Right: &physical.ColumnExpr{
Ref: types.ColumnRef{Column: "name", Type: types.ColumnTypeBuiltin},
},
Op: types.BinaryOpInvalid,
Op: types.BinaryOpXor,
}
_, err := e.eval(expr, rec)
require.ErrorContains(t, err, "failed to lookup binary function for signature invalid(utf8,utf8): not implemented")
require.ErrorContains(t, err, "failed to lookup binary function for signature XOR(utf8,utf8): not implemented")
})
t.Run("EQ(string,string)", func(t *testing.T) {
@ -168,10 +179,8 @@ func TestEvaluateBinaryExpression(t *testing.T) {
Left: &physical.ColumnExpr{
Ref: types.ColumnRef{Column: "name", Type: types.ColumnTypeBuiltin},
},
Right: &physical.LiteralExpr{
Value: types.Literal{Value: "Charlie"},
},
Op: types.BinaryOpEq,
Right: physical.NewLiteral("Charlie"),
Op: types.BinaryOpEq,
}
res, err := e.eval(expr, rec)
@ -185,10 +194,8 @@ func TestEvaluateBinaryExpression(t *testing.T) {
Left: &physical.ColumnExpr{
Ref: types.ColumnRef{Column: "value", Type: types.ColumnTypeBuiltin},
},
Right: &physical.LiteralExpr{
Value: types.Literal{Value: 0.5},
},
Op: types.BinaryOpGt,
Right: physical.NewLiteral(0.5),
Op: types.BinaryOpGt,
}
res, err := e.eval(expr, rec)
@ -216,8 +223,8 @@ func batch(n int, now time.Time) arrow.Record {
// 2. Define the schema
schema := arrow.NewSchema(
[]arrow.Field{
{Name: "log", Type: arrow.BinaryTypes.String},
{Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64},
{Name: "line", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadataBuiltinLine},
{Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64, Metadata: datatype.ColumnMetadataBuiltinTimestamp},
},
nil, // No metadata
)

@ -8,14 +8,15 @@ import (
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)
func TestNewFilterPipeline(t *testing.T) {
fields := []arrow.Field{
{Name: "name", Type: arrow.BinaryTypes.String},
{Name: "valid", Type: arrow.FixedWidthTypes.Boolean},
{Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
{Name: "valid", Type: arrow.FixedWidthTypes.Boolean, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Bool)},
}
t.Run("filter with true literal predicate", func(t *testing.T) {
@ -29,9 +30,7 @@ func TestNewFilterPipeline(t *testing.T) {
inputPipeline := NewBufferedPipeline(inputRecord)
// Create a filter predicate that's always true
truePredicate := &physical.LiteralExpr{
Value: createLiteral(true),
}
truePredicate := physical.NewLiteral(true)
// Create a Filter node
filter := &physical.Filter{
@ -63,9 +62,7 @@ func TestNewFilterPipeline(t *testing.T) {
inputPipeline := NewBufferedPipeline(inputRecord)
// Create a filter predicate that's always false
falsePredicate := &physical.LiteralExpr{
Value: createLiteral(false),
}
falsePredicate := physical.NewLiteral(false)
// Create a Filter node
filter := &physical.Filter{
@ -149,12 +146,12 @@ func TestNewFilterPipeline(t *testing.T) {
Predicates: []physical.Expression{
&physical.BinaryExpr{
Left: &physical.ColumnExpr{Ref: createColumnRef("name")},
Right: &physical.LiteralExpr{Value: createLiteral("Bob")},
Right: physical.NewLiteral("Bob"),
Op: types.BinaryOpEq,
},
&physical.BinaryExpr{
Left: &physical.ColumnExpr{Ref: createColumnRef("valid")},
Right: &physical.LiteralExpr{Value: createLiteral(false)},
Right: physical.NewLiteral(false),
Op: types.BinaryOpNeq,
},
},
@ -198,9 +195,7 @@ func TestNewFilterPipeline(t *testing.T) {
inputPipeline := NewBufferedPipeline(emptyRecord)
// Create a simple filter
truePredicate := &physical.LiteralExpr{
Value: createLiteral(true),
}
truePredicate := physical.NewLiteral(true)
// Create a Filter node
filter := &physical.Filter{

@ -6,12 +6,14 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)
func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, evaluator *expressionEvaluator) (*GenericPipeline, error) {
// Get the column names from the projection expressions
columnNames := make([]string, len(columns))
for i, col := range columns {
if colExpr, ok := col.(*physical.ColumnExpr); ok {
columnNames[i] = colExpr.Ref.Column
@ -41,7 +43,7 @@ func NewProjectPipeline(input Pipeline, columns []physical.ColumnExpression, eva
if err != nil {
return failureState(err)
}
fields = append(fields, arrow.Field{Name: columnNames[i], Type: vec.Type()})
fields = append(fields, arrow.Field{Name: columnNames[i], Type: vec.Type().ArrowType(), Metadata: datatype.ColumnMetadata(vec.ColumnType(), vec.Type())})
projected = append(projected, vec.ToArray())
}

@ -6,15 +6,16 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)
func TestNewProjectPipeline(t *testing.T) {
fields := []arrow.Field{
{Name: "name", Type: arrow.BinaryTypes.String},
{Name: "age", Type: arrow.PrimitiveTypes.Int32},
{Name: "city", Type: arrow.BinaryTypes.String},
{Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
{Name: "age", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)},
{Name: "city", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
}
t.Run("project single column", func(t *testing.T) {
@ -41,7 +42,7 @@ func TestNewProjectPipeline(t *testing.T) {
// Create expected output
expectedCSV := "Alice\nBob\nCharlie"
expectedFields := []arrow.Field{
{Name: "name", Type: arrow.BinaryTypes.String},
{Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
}
expectedRecord, err := CSVToArrow(expectedFields, expectedCSV)
require.NoError(t, err)
@ -80,8 +81,8 @@ func TestNewProjectPipeline(t *testing.T) {
// Create expected output
expectedCSV := "Alice,New York\nBob,Boston\nCharlie,Seattle"
expectedFields := []arrow.Field{
{Name: "name", Type: arrow.BinaryTypes.String},
{Name: "city", Type: arrow.BinaryTypes.String},
{Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
{Name: "city", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
}
expectedRecord, err := CSVToArrow(expectedFields, expectedCSV)
require.NoError(t, err)
@ -123,9 +124,9 @@ func TestNewProjectPipeline(t *testing.T) {
// Create expected output
expectedCSV := "New York,30,Alice\nBoston,25,Bob\nSeattle,35,Charlie"
expectedFields := []arrow.Field{
{Name: "city", Type: arrow.BinaryTypes.String},
{Name: "age", Type: arrow.PrimitiveTypes.Int32},
{Name: "name", Type: arrow.BinaryTypes.String},
{Name: "city", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
{Name: "age", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)},
{Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
}
expectedRecord, err := CSVToArrow(expectedFields, expectedCSV)
require.NoError(t, err)
@ -169,8 +170,8 @@ func TestNewProjectPipeline(t *testing.T) {
// Create expected output also split across multiple records
expectedFields := []arrow.Field{
{Name: "name", Type: arrow.BinaryTypes.String},
{Name: "age", Type: arrow.PrimitiveTypes.Int32},
{Name: "name", Type: arrow.BinaryTypes.String, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.String)},
{Name: "age", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)},
}
expected := `
@ -199,8 +200,3 @@ func createColumnRef(name string) types.ColumnRef {
Type: types.ColumnTypeBuiltin,
}
}
// Helper to create a literal value
func createLiteral(val any) types.Literal {
return types.Literal{Value: val}
}

@ -13,12 +13,12 @@ import (
// NewSortMergePipeline returns a new pipeline that merges already sorted inputs into a single output.
func NewSortMergePipeline(inputs []Pipeline, order physical.SortOrder, column physical.ColumnExpression, evaluator expressionEvaluator) (*KWayMerge, error) {
var compare func(a, b uint64) bool
var compare func(a, b int64) bool
switch order {
case physical.ASC:
compare = func(a, b uint64) bool { return a <= b }
compare = func(a, b int64) bool { return a <= b }
case physical.DESC:
compare = func(a, b uint64) bool { return a >= b }
compare = func(a, b int64) bool { return a >= b }
default:
return nil, fmt.Errorf("invalid sort order %v", order)
}
@ -42,7 +42,7 @@ type KWayMerge struct {
exhausted []bool
offsets []int64
columnEval evalFunc
compare func(a, b uint64) bool
compare func(a, b int64) bool
}
var _ Pipeline = (*KWayMerge)(nil)
@ -92,7 +92,7 @@ func (p *KWayMerge) init() {
p.offsets = make([]int64, n)
if p.compare == nil {
p.compare = func(a, b uint64) bool { return a <= b }
p.compare = func(a, b int64) bool { return a <= b }
}
}
@ -106,7 +106,7 @@ func (p *KWayMerge) read() error {
p.state.batch.Release()
}
timestamps := make([]uint64, 0, len(p.inputs))
timestamps := make([]int64, 0, len(p.inputs))
batchIndexes := make([]int, 0, len(p.inputs))
for i := range len(p.inputs) {
@ -136,7 +136,7 @@ func (p *KWayMerge) read() error {
if err != nil {
return err
}
tsCol, ok := col.ToArray().(*array.Uint64)
tsCol, ok := col.ToArray().(*array.Int64)
if !ok {
return errors.New("column is not a timestamp column")
}
@ -182,7 +182,7 @@ func (p *KWayMerge) read() error {
return err
}
// We assume the column is a Uint64 array
tsCol, ok := col.ToArray().(*array.Uint64)
tsCol, ok := col.ToArray().(*array.Int64)
if !ok {
return errors.New("column is not a timestamp column")
}

@ -63,7 +63,7 @@ func TestSortMerge(t *testing.T) {
pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{})
require.NoError(t, err)
var lastTs uint64
var lastTs int64
var batches, rows int64
for {
err := pipeline.Read()
@ -77,7 +77,7 @@ func TestSortMerge(t *testing.T) {
tsCol, err := c.evaluator.eval(merge.Column, batch)
require.NoError(t, err)
arr := tsCol.ToArray().(*array.Uint64)
arr := tsCol.ToArray().(*array.Int64)
// Check if ts column is sorted
for i := 0; i < arr.Len()-1; i++ {
@ -115,7 +115,7 @@ func TestSortMerge(t *testing.T) {
pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{})
require.NoError(t, err)
var lastTs uint64 = math.MaxUint64
var lastTs int64 = math.MaxInt64
var batches, rows int64
for {
err := pipeline.Read()
@ -129,7 +129,7 @@ func TestSortMerge(t *testing.T) {
tsCol, err := c.evaluator.eval(merge.Column, batch)
require.NoError(t, err)
arr := tsCol.ToArray().(*array.Uint64)
arr := tsCol.ToArray().(*array.Int64)
// Check if ts column is sorted
for i := 0; i < arr.Len()-1; i++ {

@ -7,12 +7,15 @@ import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
var (
incrementingIntPipeline = newRecordGenerator(
arrow.NewSchema([]arrow.Field{
{Name: "id", Type: arrow.PrimitiveTypes.Int64},
{Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)},
}, nil),
func(offset, sz int64, schema *arrow.Schema) arrow.Record {
@ -48,20 +51,20 @@ const (
func timestampPipeline(start time.Time, order time.Duration) *recordGenerator {
return newRecordGenerator(
arrow.NewSchema([]arrow.Field{
{Name: "id", Type: arrow.PrimitiveTypes.Int64},
{Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64},
{Name: "id", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Integer)},
{Name: "timestamp", Type: arrow.PrimitiveTypes.Int64, Metadata: datatype.ColumnMetadata(types.ColumnTypeBuiltin, datatype.Timestamp)},
}, nil),
func(offset, sz int64, schema *arrow.Schema) arrow.Record {
idColBuilder := array.NewInt64Builder(memory.DefaultAllocator)
defer idColBuilder.Release()
tsColBuilder := array.NewUint64Builder(memory.DefaultAllocator)
tsColBuilder := array.NewInt64Builder(memory.DefaultAllocator)
defer tsColBuilder.Release()
for i := int64(0); i < sz; i++ {
idColBuilder.Append(offset + i)
tsColBuilder.Append(uint64(start.Add(order * (time.Duration(offset)*time.Second + time.Duration(i)*time.Millisecond)).UnixNano()))
tsColBuilder.Append(start.Add(order * (time.Duration(offset)*time.Second + time.Duration(i)*time.Millisecond)).UnixNano())
}
idData := idColBuilder.NewArray()

@ -0,0 +1,67 @@
package datatype
import "github.com/apache/arrow-go/v18/arrow"
var (
LokiType = struct {
Null DataType
Bool DataType
String DataType
Integer DataType
Float DataType
Timestamp DataType
Duration DataType
Bytes DataType
}{
Null: Null,
Bool: Bool,
String: String,
Integer: Integer,
Float: Float,
Timestamp: Timestamp,
Duration: Duration,
Bytes: Bytes,
}
ArrowType = struct {
Null arrow.DataType
Bool arrow.DataType
String arrow.DataType
Integer arrow.DataType
Float arrow.DataType
Timestamp arrow.DataType
Duration arrow.DataType
Bytes arrow.DataType
}{
Null: arrow.Null,
Bool: arrow.FixedWidthTypes.Boolean,
String: arrow.BinaryTypes.String,
Integer: arrow.PrimitiveTypes.Int64,
Float: arrow.PrimitiveTypes.Float64,
Timestamp: arrow.PrimitiveTypes.Int64,
Duration: arrow.PrimitiveTypes.Int64,
Bytes: arrow.PrimitiveTypes.Int64,
}
ToArrow = map[DataType]arrow.DataType{
Null: ArrowType.Null,
Bool: ArrowType.Bool,
String: ArrowType.String,
Integer: ArrowType.Integer,
Float: ArrowType.Float,
Timestamp: ArrowType.Timestamp,
Duration: ArrowType.Duration,
Bytes: ArrowType.Bytes,
}
ToLoki = map[arrow.DataType]DataType{
ArrowType.Null: Null,
ArrowType.Bool: Bool,
ArrowType.String: String,
ArrowType.Integer: Integer,
ArrowType.Float: Float,
ArrowType.Timestamp: Timestamp,
ArrowType.Duration: Duration,
ArrowType.Bytes: Bytes,
}
)

@ -0,0 +1,240 @@
package datatype
import (
"fmt"
"strconv"
"time"
)
type NullLiteral struct {
}
// String implements Literal.
func (n *NullLiteral) String() string {
return "null"
}
// Type implements Literal.
func (n *NullLiteral) Type() DataType {
return Null
}
// Any implements Literal.
func (n *NullLiteral) Any() any {
return nil
}
func (n *NullLiteral) Value() any {
return nil
}
type BoolLiteral struct {
v bool
}
// String implements Literal.
func (b *BoolLiteral) String() string {
return strconv.FormatBool(b.v)
}
// Type implements Literal.
func (b *BoolLiteral) Type() DataType {
return Bool
}
// Any implements Literal.
func (b *BoolLiteral) Any() any {
return b.v
}
func (b *BoolLiteral) Value() bool {
return b.v
}
type StringLiteral struct {
v string
}
// String implements Literal.
func (s *StringLiteral) String() string {
return fmt.Sprintf(`"%s"`, s.v)
}
// Type implements Literal.
func (s *StringLiteral) Type() DataType {
return String
}
// Any implements Literal.
func (s *StringLiteral) Any() any {
return s.v
}
func (s *StringLiteral) Value() string {
return s.v
}
type IntegerLiteral struct {
v int64
}
// String implements Literal.
func (i *IntegerLiteral) String() string {
return strconv.FormatInt(i.v, 10)
}
// Type implements Literal.
func (i *IntegerLiteral) Type() DataType {
return Integer
}
// Any implements Literal.
func (i *IntegerLiteral) Any() any {
return i.v
}
func (i *IntegerLiteral) Value() int64 {
return i.v
}
type FloatLiteral struct {
v float64
}
// String implements Literal.
func (f *FloatLiteral) String() string {
return strconv.FormatFloat(f.v, 'f', -1, 64)
}
// Type implements Literal.
func (f *FloatLiteral) Type() DataType {
return Float
}
// Any implements Literal.
func (f *FloatLiteral) Any() any {
return f.v
}
func (f *FloatLiteral) Value() float64 {
return f.v
}
type TimestampLiteral struct {
v time.Time
}
// String implements Literal.
func (t *TimestampLiteral) String() string {
return t.v.UTC().Format(time.RFC3339Nano)
}
// Type implements Literal.
func (t *TimestampLiteral) Type() DataType {
return Timestamp
}
// Any implements Literal.
func (t *TimestampLiteral) Any() any {
return t.v.UTC()
}
func (t *TimestampLiteral) Value() time.Time {
return t.v
}
type DurationLiteral struct {
v time.Duration
}
// String implements Literal.
func (d *DurationLiteral) String() string {
return d.v.String()
}
// Type implements Literal.
func (d *DurationLiteral) Type() DataType {
return Duration
}
// Any implements Literal.
func (d *DurationLiteral) Any() any {
return d.v
}
func (d *DurationLiteral) Value() time.Duration {
return d.v
}
type BytesLiteral struct {
v int64
}
// String implements Literal.
func (b *BytesLiteral) String() string {
return fmt.Sprintf("%dB", b.v)
}
// Type implements Literal.
func (b *BytesLiteral) Type() DataType {
return Bytes
}
// Any implements Literal.
func (b *BytesLiteral) Any() any {
return b.v
}
func (b *BytesLiteral) Value() int64 {
return b.v
}
// Literal is holds a value of [any] typed as [DataType].
type Literal interface {
fmt.Stringer
Any() any
Type() DataType
}
var (
_ Literal = (*NullLiteral)(nil)
_ Literal = (*BoolLiteral)(nil)
_ Literal = (*StringLiteral)(nil)
_ Literal = (*IntegerLiteral)(nil)
_ Literal = (*FloatLiteral)(nil)
_ Literal = (*TimestampLiteral)(nil)
_ Literal = (*DurationLiteral)(nil)
_ Literal = (*BytesLiteral)(nil)
)
func NewNullLiteral() *NullLiteral {
return &NullLiteral{}
}
func NewBoolLiteral(v bool) *BoolLiteral {
return &BoolLiteral{v: v}
}
func NewStringLiteral(v string) *StringLiteral {
return &StringLiteral{v: v}
}
func NewIntegerLiteral(v int64) *IntegerLiteral {
return &IntegerLiteral{v: v}
}
func NewFloatLiteral(v float64) *FloatLiteral {
return &FloatLiteral{v: v}
}
func NewTimestampLiteral(v time.Time) *TimestampLiteral {
return &TimestampLiteral{v: v}
}
func NewDurationLiteral(v time.Duration) *DurationLiteral {
return &DurationLiteral{v: v}
}
func NewBytesLiteral(v int64) *BytesLiteral {
return &BytesLiteral{v: v}
}

@ -0,0 +1,120 @@
package datatype
import (
"fmt"
"github.com/apache/arrow-go/v18/arrow"
)
type Type uint8
const (
NULL = Type(arrow.NULL)
BOOL = Type(arrow.BOOL)
STRING = Type(arrow.STRING)
INT64 = Type(arrow.INT64)
FLOAT64 = Type(arrow.FLOAT64)
)
func (t Type) String() string {
switch t {
case NULL:
return "NULL"
case BOOL:
return "BOOL"
case STRING:
return "STRING"
case INT64:
return "INT64"
case FLOAT64:
return "FLOAT64"
default:
return "INVALID"
}
}
type DataType interface {
fmt.Stringer
ID() Type
ArrowType() arrow.DataType
}
var (
Null DataType = tNull{}
Bool DataType = tBool{}
String DataType = tString{}
Integer DataType = tInteger{}
Float DataType = tFloat{}
Timestamp DataType = tTimestamp{}
Duration DataType = tDuration{}
Bytes DataType = tBytes{}
)
type tNull struct{}
func (tNull) ID() Type { return NULL }
func (tNull) String() string { return "null" }
func (tNull) ArrowType() arrow.DataType { return ArrowType.Null }
type tBool struct{}
func (tBool) ID() Type { return BOOL }
func (tBool) String() string { return "bool" }
func (tBool) ArrowType() arrow.DataType { return ArrowType.Bool }
type tString struct{}
func (tString) ID() Type { return STRING }
func (tString) String() string { return "string" }
func (tString) ArrowType() arrow.DataType { return ArrowType.String }
type tInteger struct{}
func (tInteger) ID() Type { return INT64 }
func (tInteger) String() string { return "integer" }
func (tInteger) ArrowType() arrow.DataType { return ArrowType.Integer }
type tFloat struct{}
func (tFloat) ID() Type { return FLOAT64 }
func (tFloat) String() string { return "float" }
func (tFloat) ArrowType() arrow.DataType { return ArrowType.Float }
type tTimestamp struct{}
func (tTimestamp) ID() Type { return INT64 }
func (tTimestamp) String() string { return "timestamp" }
func (tTimestamp) ArrowType() arrow.DataType { return ArrowType.Integer }
type tDuration struct{}
func (tDuration) ID() Type { return INT64 }
func (tDuration) String() string { return "duration" }
func (tDuration) ArrowType() arrow.DataType { return ArrowType.Integer }
type tBytes struct{}
func (tBytes) ID() Type { return INT64 }
func (tBytes) String() string { return "bytes" }
func (tBytes) ArrowType() arrow.DataType { return ArrowType.Integer }
var (
names = map[string]DataType{
Null.String(): Null,
Bool.String(): Bool,
String.String(): String,
Integer.String(): Integer,
Float.String(): Float,
Timestamp.String(): Timestamp,
Duration.String(): Duration,
Bytes.String(): Bytes,
}
)
func FromString(dt string) DataType {
ty, ok := names[dt]
if !ok {
panic("invalid data type name")
}
return ty
}

@ -0,0 +1,19 @@
package datatype
import (
"github.com/apache/arrow-go/v18/arrow"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
var (
ColumnMetadataBuiltinLine = ColumnMetadata(types.ColumnTypeBuiltin, String)
ColumnMetadataBuiltinTimestamp = ColumnMetadata(types.ColumnTypeBuiltin, Timestamp)
)
func ColumnMetadata(ct types.ColumnType, dt DataType) arrow.Metadata {
return arrow.NewMetadata(
[]string{types.MetadataKeyColumnType, types.MetadataKeyColumnDataType},
[]string{ct.String(), dt.String()},
)
}

@ -1,6 +1,8 @@
package types
import "fmt"
import (
"fmt"
)
// ColumnType denotes the column type for a [ColumnRef].
type ColumnType int
@ -21,26 +23,32 @@ const (
const (
ColumnNameBuiltinTimestamp = "timestamp"
ColumnNameBuiltinLine = "line"
ColumnTypeMetadataKey = "column_type"
MetadataKeyColumnType = "column_type"
MetadataKeyColumnDataType = "column_datatype"
)
var ctNames = [6]string{"invalid", "builtin", "label", "metadata", "parsed", "ambiguous"}
// String returns a human-readable representation of the column type.
func (ct ColumnType) String() string {
return ctNames[ct]
}
// ColumnTypeFromString returns the [ColumnType] from its string representation.
func ColumnTypeFromString(ct string) ColumnType {
switch ct {
case ColumnTypeInvalid:
return typeInvalid
case ColumnTypeBuiltin:
return "builtin"
case ColumnTypeLabel:
return "label"
case ColumnTypeMetadata:
return "metadata"
case ColumnTypeParsed:
return "parsed"
case ColumnTypeAmbiguous:
return "ambiguous"
case ctNames[1]:
return ColumnTypeBuiltin
case ctNames[2]:
return ColumnTypeLabel
case ctNames[3]:
return ColumnTypeMetadata
case ctNames[4]:
return ColumnTypeParsed
case ctNames[5]:
return ColumnTypeAmbiguous
default:
return fmt.Sprintf("ColumnType(%d)", ct)
panic(fmt.Sprintf("invalid column type: %s", ct))
}
}

@ -1,135 +0,0 @@
package types
import (
"fmt"
"strconv"
)
// Literal is holds a value of [ValueType].
type Literal struct {
Value any
}
// String returns the string representation of the literal value.
func (l *Literal) String() string {
switch v := l.Value.(type) {
case nil:
return "NULL"
case bool:
return strconv.FormatBool(v)
case string:
return fmt.Sprintf(`"%s"`, v)
case float64:
return strconv.FormatFloat(v, 'f', -1, 64)
case int64:
return strconv.FormatInt(v, 10)
case uint64:
return strconv.FormatUint(v, 10)
case []byte:
return fmt.Sprintf("%v", v)
default:
return "invalid"
}
}
// ValueType returns the kind of value represented by the literal.
func (l *Literal) ValueType() ValueType {
switch l.Value.(type) {
case nil:
return ValueTypeNull
case bool:
return ValueTypeBool
case string:
return ValueTypeStr
case float64:
return ValueTypeFloat
case int64:
return ValueTypeInt
case uint64:
return ValueTypeTimestamp
case []byte:
return ValueTypeByteArray
default:
return ValueTypeInvalid
}
}
// IsNull returns true if lit is a [ValueTypeNull] value.
func (l Literal) IsNull() bool {
return l.ValueType() == ValueTypeNull
}
// Str returns the value as a string. It panics if lit is not a [ValueTypeString].
func (l Literal) Str() string {
if expect, actual := ValueTypeStr, l.ValueType(); expect != actual {
panic(fmt.Sprintf("literal type is %s, not %s", actual, expect))
}
return l.Value.(string)
}
// Int64 returns the value as an int64. It panics if lit is not a [ValueTypeFloat].
func (l Literal) Float() float64 {
if expect, actual := ValueTypeFloat, l.ValueType(); expect != actual {
panic(fmt.Sprintf("literal type is %s, not %s", actual, expect))
}
return l.Value.(float64)
}
// Int returns the value as an int64. It panics if lit is not a [ValueTypeInt].
func (l Literal) Int() int64 {
if expect, actual := ValueTypeInt, l.ValueType(); expect != actual {
panic(fmt.Sprintf("literal type is %s, not %s", actual, expect))
}
return l.Value.(int64)
}
// Timestamp returns the value as a uint64. It panics if lit is not a [ValueTypeTimestamp].
func (l Literal) Timestamp() uint64 {
if expect, actual := ValueTypeTimestamp, l.ValueType(); expect != actual {
panic(fmt.Sprintf("literal type is %s, not %s", actual, expect))
}
return l.Value.(uint64)
}
// ByteArray returns the value as a byte slice. It panics if lit is not a [ValueTypeByteArray].
func (l Literal) ByteArray() []byte {
if expect, actual := ValueTypeByteArray, l.ValueType(); expect != actual {
panic(fmt.Sprintf("literal type is %s, not %s", actual, expect))
}
return l.Value.([]byte)
}
// Convenience function for creating a NULL literal.
func NullLiteral() Literal {
return Literal{Value: nil}
}
// Convenience function for creating a bool literal.
func BoolLiteral(v bool) Literal {
return Literal{Value: v}
}
// Convenience function for creating a string literal.
func StringLiteral(v string) Literal {
return Literal{Value: v}
}
// Convenience function for creating a float literal.
func FloatLiteral(v float64) Literal {
return Literal{Value: v}
}
// Convenience function for creating a timestamp literal.
func TimestampLiteral(v uint64) Literal {
return Literal{Value: v}
}
// Convenience function for creating an integer literal.
func IntLiteral(v int64) Literal {
return Literal{Value: v}
}
// Convenience function for creating a byte array literal.
func ByteArrayLiteral(v []byte) Literal {
return Literal{Value: v}
}

@ -1,83 +0,0 @@
package types
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestLiteralRepresentation(t *testing.T) {
testCases := []struct {
name string
literal Literal
expected string
}{
{
name: "null literal",
literal: NullLiteral(),
expected: "NULL",
},
{
name: "bool literal - true",
literal: BoolLiteral(true),
expected: "true",
},
{
name: "bool literal - false",
literal: BoolLiteral(false),
expected: "false",
},
{
name: "string literal",
literal: StringLiteral("test"),
expected: `"test"`,
},
{
name: "string literal with quotes",
literal: StringLiteral(`test "quoted" string`),
expected: `"test "quoted" string"`,
},
{
name: "float literal - integer value",
literal: FloatLiteral(42.0),
expected: "42",
},
{
name: "float literal - decimal value",
literal: FloatLiteral(3.14159),
expected: "3.14159",
},
{
name: "int literal - positive",
literal: IntLiteral(123),
expected: "123",
},
{
name: "int literal - negative",
literal: IntLiteral(-456),
expected: "-456",
},
{
name: "timestamp literal",
literal: TimestampLiteral(1625097600000),
expected: "1625097600000",
},
{
name: "byte array literal",
literal: ByteArrayLiteral([]byte("hello")),
expected: "[104 101 108 108 111]",
},
{
name: "invalid literal",
literal: Literal{Value: make(chan int)}, // channels are not supported types
expected: "invalid",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := tc.literal.String()
require.Equal(t, tc.expected, result)
})
}
}

@ -17,8 +17,6 @@ const (
// String returns the string representation of the UnaryOp.
func (t UnaryOp) String() string {
switch t {
case UnaryOpInvalid:
return typeInvalid
case UnaryOpNot:
return "NOT"
case UnaryOpAbs:
@ -65,8 +63,6 @@ const (
// String returns a human-readable representation of the binary operation kind.
func (t BinaryOp) String() string {
switch t {
case BinaryOpInvalid:
return typeInvalid
case BinaryOpEq:
return "EQ"
case BinaryOpNeq:

@ -1,47 +0,0 @@
package types
const (
typeInvalid = "invalid"
)
// ValueType represents the type of a value, which can either be a literal value, or a column value.
type ValueType uint32
const (
ValueTypeInvalid ValueType = iota // zero-value is an invalid type
ValueTypeNull // NULL value.
ValueTypeBool // Boolean value
ValueTypeFloat // 64bit floating point value
ValueTypeInt // Signed 64bit integer value
ValueTypeTimestamp // Unsigned 64bit integer value (nanosecond timestamp)
ValueTypeStr // String value
ValueTypeByteArray // Byte-slice value
// ValueTypeBytes
// ValueTypeDate
// ValueTypeDuration
)
// String returns the string representation of the LiteralKind.
func (t ValueType) String() string {
switch t {
case ValueTypeInvalid:
return typeInvalid
case ValueTypeNull:
return "null"
case ValueTypeBool:
return "bool"
case ValueTypeFloat:
return "float"
case ValueTypeInt:
return "int"
case ValueTypeTimestamp:
return "timestamp"
case ValueTypeStr:
return "string"
case ValueTypeByteArray:
return "[]byte"
default:
return typeInvalid
}
}

@ -32,7 +32,7 @@ func TestFormatSimpleQuery(t *testing.T) {
).Select(
&BinOp{
Left: NewColumnRef("age", types.ColumnTypeMetadata),
Right: NewLiteral[int64](21),
Right: NewLiteral(21),
Op: types.BinaryOpGt,
},
)
@ -50,7 +50,7 @@ func TestFormatSimpleQuery(t *testing.T) {
SELECT <%4> table=%2 predicate=%3
BinOp <%3> op=GT left=metadata.age right=21
ColumnRef column=age type=metadata
Literal value=21 kind=int
Literal value=21 kind=integer
MAKETABLE <%2> selector=EQ label.app "users"
BinOp <%1> op=EQ left=label.app right="users"
ColumnRef column=app type=label
@ -75,7 +75,7 @@ func TestFormatSortQuery(t *testing.T) {
).Select(
&BinOp{
Left: NewColumnRef("age", types.ColumnTypeMetadata),
Right: NewLiteral[int64](21),
Right: NewLiteral(21),
Op: types.BinaryOpGt,
},
).Sort(*NewColumnRef("age", types.ColumnTypeMetadata), true, false)
@ -95,7 +95,7 @@ SORT <%5> table=%4 column=metadata.age direction=asc nulls=last
SELECT <%4> table=%2 predicate=%3
BinOp <%3> op=GT left=metadata.age right=21
ColumnRef column=age type=metadata
Literal value=21 kind=int
Literal value=21 kind=integer
MAKETABLE <%2> selector=EQ label.app "users"
BinOp <%1> op=EQ left=label.app right="users"
ColumnRef column=app type=label

@ -25,7 +25,7 @@ func TestPlan_String(t *testing.T) {
).Select(
&BinOp{
Left: NewColumnRef("age", types.ColumnTypeMetadata),
Right: NewLiteral[int64](21),
Right: NewLiteral(21),
Op: types.BinaryOpGt,
},
).Sort(*NewColumnRef("age", types.ColumnTypeMetadata), true, false)

@ -1,7 +1,9 @@
package logical
import (
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"time"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/planner/schema"
)
@ -10,36 +12,56 @@ import (
//
// The zero value of a Literal is a NULL value.
type Literal struct {
val types.Literal
datatype.Literal
}
var _ Value = (*Literal)(nil)
func NewLiteral[T any](v T) *Literal {
return &Literal{
val: types.Literal{Value: v},
func NewLiteral(v any) *Literal {
if v == nil {
return &Literal{Literal: datatype.NewNullLiteral()}
}
switch casted := v.(type) {
case bool:
return &Literal{Literal: datatype.NewBoolLiteral(casted)}
case string:
// TODO(chaudum): Try parsing bytes/timestamp/duration
return &Literal{Literal: datatype.NewStringLiteral(casted)}
case int:
return &Literal{Literal: datatype.NewIntegerLiteral(int64(casted))}
case int64:
return &Literal{Literal: datatype.NewIntegerLiteral(casted)}
case float64:
return &Literal{Literal: datatype.NewFloatLiteral(casted)}
case time.Time:
return &Literal{Literal: datatype.NewTimestampLiteral(casted)}
case time.Duration:
return &Literal{Literal: datatype.NewDurationLiteral(casted)}
default:
return &Literal{Literal: datatype.NewNullLiteral()}
}
}
// Kind returns the kind of value represented by the literal.
func (l Literal) Kind() types.ValueType {
return l.val.ValueType()
func (l Literal) Kind() datatype.DataType {
return l.Literal.Type()
}
// Name returns the string form of the literal.
func (l Literal) Name() string {
return l.val.String()
return l.Literal.String()
}
// String returns a printable form of the literal, even if lit is not a
// [ValueTypeString].
func (l Literal) String() string {
return l.val.String()
return l.Literal.String()
}
// Value returns lit's value as untyped interface{}.
func (l Literal) Value() any {
return l.val.Value
return l.Literal.Any()
}
func (l *Literal) Schema() *schema.Schema {

@ -3,6 +3,7 @@ package logical
import (
"errors"
"fmt"
"time"
"github.com/prometheus/prometheus/model/labels"
@ -73,8 +74,8 @@ func BuildPlan(query logql.Params) (*Plan, error) {
builder = builder.Sort(*timestampColumnRef(), ascending, false)
// SELECT -> Filter
start := query.Start().UnixNano()
end := query.End().UnixNano()
start := query.Start()
end := query.End()
for _, value := range convertQueryRangeToPredicates(start, end) {
builder = builder.Select(value)
}
@ -235,16 +236,16 @@ func convertLabelFilter(expr log.LabelFilterer) (Value, error) {
return nil, fmt.Errorf("invalid label filter %T", expr)
}
func convertQueryRangeToPredicates(start, end int64) []*BinOp {
func convertQueryRangeToPredicates(start, end time.Time) []*BinOp {
return []*BinOp{
{
Left: timestampColumnRef(),
Right: NewLiteral(uint64(start)),
Right: NewLiteral(start),
Op: types.BinaryOpGte,
},
{
Left: timestampColumnRef(),
Right: NewLiteral(uint64(end)),
Right: NewLiteral(end),
Op: types.BinaryOpLt,
},
}

@ -27,12 +27,12 @@ func (q *query) Direction() logproto.Direction {
// End implements logql.Params.
func (q *query) End() time.Time {
return time.Unix(0, q.end)
return time.Unix(q.end, 0)
}
// Start implements logql.Params.
func (q *query) Start() time.Time {
return time.Unix(0, q.start)
return time.Unix(q.start, 0)
}
// Limit implements logql.Params.
@ -80,8 +80,8 @@ var _ logql.Params = (*query)(nil)
func TestConvertAST_Success(t *testing.T) {
q := &query{
statement: `{cluster="prod", namespace=~"loki-.*"} | foo="bar" or bar="baz" |= "metric.go" |= "foo" or "bar" !~ "(a|b|c)" `,
start: 1000,
end: 2000,
start: 3600,
end: 7200,
direction: logproto.FORWARD,
limit: 1000,
}
@ -94,9 +94,9 @@ func TestConvertAST_Success(t *testing.T) {
%3 = AND %1 %2
%4 = MAKETABLE [selector=%3]
%5 = SORT %4 [column=builtin.timestamp, asc=true, nulls_first=false]
%6 = GTE builtin.timestamp 1000
%6 = GTE builtin.timestamp 1970-01-01T01:00:00Z
%7 = SELECT %5 [predicate=%6]
%8 = LT builtin.timestamp 2000
%8 = LT builtin.timestamp 1970-01-01T02:00:00Z
%9 = SELECT %7 [predicate=%8]
%10 = MATCH_STR ambiguous.foo "bar"
%11 = MATCH_STR ambiguous.bar "baz"

@ -9,6 +9,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -98,7 +99,7 @@ func expressionToMatchers(selector Expression) ([]*labels.Matcher, error) {
if err != nil {
return nil, err
}
value, err := convertLiteral(expr.Right)
value, err := convertLiteralToString(expr.Right)
if err != nil {
return nil, err
}
@ -115,15 +116,15 @@ func expressionToMatchers(selector Expression) ([]*labels.Matcher, error) {
}
}
func convertLiteral(expr Expression) (string, error) {
func convertLiteralToString(expr Expression) (string, error) {
l, ok := expr.(*LiteralExpr)
if !ok {
return "", fmt.Errorf("expected literal expression, got %T", expr)
}
if l.ValueType() != types.ValueTypeStr {
if l.ValueType() != datatype.String {
return "", fmt.Errorf("literal type is not a string, got %v", l.ValueType())
}
return l.Value.Str(), nil
return l.Any().(string), nil
}
func convertColumnRef(expr Expression) (string, error) {

@ -2,6 +2,7 @@ package physical
import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
@ -24,11 +25,15 @@ func TestContext_ConvertLiteral(t *testing.T) {
wantErr: true,
},
{
expr: NewLiteral(int64(123)),
expr: NewLiteral(123),
wantErr: true,
},
{
expr: NewLiteral(uint64(123456789)),
expr: NewLiteral(time.Now()),
wantErr: true,
},
{
expr: NewLiteral(time.Hour),
wantErr: true,
},
{
@ -46,7 +51,7 @@ func TestContext_ConvertLiteral(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.expr.String(), func(t *testing.T) {
got, err := convertLiteral(tt.expr)
got, err := convertLiteralToString(tt.expr)
if tt.wantErr {
require.Error(t, err)
t.Log(err)

@ -2,7 +2,9 @@ package physical
import (
"fmt"
"time"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -59,7 +61,7 @@ type BinaryExpression interface {
// physical plan.
type LiteralExpression interface {
Expression
ValueType() types.ValueType
ValueType() datatype.DataType
isLiteralExpr()
}
@ -110,7 +112,7 @@ func (*BinaryExpr) Type() ExpressionType {
// LiteralExpr is an expression that implements the [LiteralExpression] interface.
type LiteralExpr struct {
Value types.Literal
datatype.Literal
}
func (*LiteralExpr) isExpr() {}
@ -118,7 +120,7 @@ func (*LiteralExpr) isLiteralExpr() {}
// String returns the string representation of the literal value.
func (e *LiteralExpr) String() string {
return e.Value.String()
return e.Literal.String()
}
// ID returns the type of the [LiteralExpr].
@ -127,13 +129,33 @@ func (*LiteralExpr) Type() ExpressionType {
}
// ValueType returns the kind of value represented by the literal.
func (e *LiteralExpr) ValueType() types.ValueType {
return e.Value.ValueType()
func (e *LiteralExpr) ValueType() datatype.DataType {
return e.Literal.Type()
}
func NewLiteral(value any) *LiteralExpr {
return &LiteralExpr{
Value: types.Literal{Value: value},
if value == nil {
return &LiteralExpr{Literal: datatype.NewNullLiteral()}
}
switch casted := value.(type) {
case bool:
return &LiteralExpr{Literal: datatype.NewBoolLiteral(casted)}
case string:
// TODO(chaudum): Try parsing bytes/timestamp/duration
return &LiteralExpr{Literal: datatype.NewStringLiteral(casted)}
case int:
return &LiteralExpr{Literal: datatype.NewIntegerLiteral(int64(casted))}
case int64:
return &LiteralExpr{Literal: datatype.NewIntegerLiteral(casted)}
case float64:
return &LiteralExpr{Literal: datatype.NewFloatLiteral(casted)}
case time.Time:
return &LiteralExpr{Literal: datatype.NewTimestampLiteral(casted)}
case time.Duration:
return &LiteralExpr{Literal: datatype.NewDurationLiteral(casted)}
default:
panic(fmt.Sprintf("invalid literal value type %T", value))
}
}

@ -2,9 +2,11 @@ package physical
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
)
@ -18,7 +20,7 @@ func TestExpressionTypes(t *testing.T) {
name: "UnaryExpression",
expr: &UnaryExpr{
Op: types.UnaryOpNot,
Left: &LiteralExpr{Value: types.BoolLiteral(true)},
Left: NewLiteral(true),
},
expected: ExprTypeUnary,
},
@ -27,13 +29,13 @@ func TestExpressionTypes(t *testing.T) {
expr: &BinaryExpr{
Op: types.BinaryOpEq,
Left: &ColumnExpr{Ref: types.ColumnRef{Column: "col", Type: types.ColumnTypeBuiltin}},
Right: &LiteralExpr{Value: types.StringLiteral("foo")},
Right: NewLiteral("foo"),
},
expected: ExprTypeBinary,
},
{
name: "LiteralExpression",
expr: &LiteralExpr{Value: types.StringLiteral("col")},
expr: NewLiteral("col"),
expected: ExprTypeLiteral,
},
{
@ -58,7 +60,7 @@ func TestLiteralExpr(t *testing.T) {
require.Equal(t, ExprTypeLiteral, expr.Type())
literal, ok := expr.(LiteralExpression)
require.True(t, ok)
require.Equal(t, types.ValueTypeBool, literal.ValueType())
require.Equal(t, datatype.Bool, literal.ValueType())
})
t.Run("float", func(t *testing.T) {
@ -66,7 +68,7 @@ func TestLiteralExpr(t *testing.T) {
require.Equal(t, ExprTypeLiteral, expr.Type())
literal, ok := expr.(LiteralExpression)
require.True(t, ok)
require.Equal(t, types.ValueTypeFloat, literal.ValueType())
require.Equal(t, datatype.Float, literal.ValueType())
})
t.Run("integer", func(t *testing.T) {
@ -74,15 +76,23 @@ func TestLiteralExpr(t *testing.T) {
require.Equal(t, ExprTypeLiteral, expr.Type())
literal, ok := expr.(LiteralExpression)
require.True(t, ok)
require.Equal(t, types.ValueTypeInt, literal.ValueType())
require.Equal(t, datatype.Integer, literal.ValueType())
})
t.Run("timestamp", func(t *testing.T) {
var expr Expression = NewLiteral(uint64(1741882435000000000))
var expr Expression = NewLiteral(time.Unix(0, 1741882435000000000))
require.Equal(t, ExprTypeLiteral, expr.Type())
literal, ok := expr.(LiteralExpression)
require.True(t, ok)
require.Equal(t, types.ValueTypeTimestamp, literal.ValueType())
require.Equal(t, datatype.Timestamp, literal.ValueType())
})
t.Run("duration", func(t *testing.T) {
var expr Expression = NewLiteral(time.Hour)
require.Equal(t, ExprTypeLiteral, expr.Type())
literal, ok := expr.(LiteralExpression)
require.True(t, ok)
require.Equal(t, datatype.Duration, literal.ValueType())
})
t.Run("string", func(t *testing.T) {
@ -90,6 +100,6 @@ func TestLiteralExpr(t *testing.T) {
require.Equal(t, ExprTypeLiteral, expr.Type())
literal, ok := expr.(LiteralExpression)
require.True(t, ok)
require.Equal(t, types.ValueTypeStr, literal.ValueType())
require.Equal(t, datatype.String, literal.ValueType())
})
}

@ -2,6 +2,7 @@ package physical
import (
"testing"
"time"
"github.com/stretchr/testify/require"
@ -28,7 +29,7 @@ func TestCanApplyPredicate(t *testing.T) {
{
predicate: &BinaryExpr{
Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin),
Right: NewLiteral(uint64(1743424636000000000)),
Right: NewLiteral(time.Now()),
Op: types.BinaryOpGt,
},
want: true,
@ -58,14 +59,14 @@ func dummyPlan() *Plan {
filter1 := plan.addNode(&Filter{id: "filter1", Predicates: []Expression{
&BinaryExpr{
Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin),
Right: NewLiteral(uint64(1000000000)),
Right: NewLiteral(time.Unix(0, 1000000000)),
Op: types.BinaryOpGt,
},
}})
filter2 := plan.addNode(&Filter{id: "filter2", Predicates: []Expression{
&BinaryExpr{
Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin),
Right: NewLiteral(uint64(2000000000)),
Right: NewLiteral(time.Unix(0, 2000000000)),
Op: types.BinaryOpLte,
},
}})
@ -111,24 +112,24 @@ func TestOptimizer(t *testing.T) {
scan1 := optimized.addNode(&DataObjScan{id: "scan1", Predicates: []Expression{
&BinaryExpr{
Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin),
Right: NewLiteral(uint64(1000000000)),
Right: NewLiteral(time.Unix(0, 1000000000)),
Op: types.BinaryOpGt,
},
&BinaryExpr{
Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin),
Right: NewLiteral(uint64(2000000000)),
Right: NewLiteral(time.Unix(0, 2000000000)),
Op: types.BinaryOpLte,
},
}})
scan2 := optimized.addNode(&DataObjScan{id: "scan2", Predicates: []Expression{
&BinaryExpr{
Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin),
Right: NewLiteral(uint64(1000000000)),
Right: NewLiteral(time.Unix(0, 1000000000)),
Op: types.BinaryOpGt,
},
&BinaryExpr{
Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin),
Right: NewLiteral(uint64(2000000000)),
Right: NewLiteral(time.Unix(0, 2000000000)),
Op: types.BinaryOpLte,
},
}})
@ -166,14 +167,14 @@ func TestOptimizer(t *testing.T) {
filter1 := optimized.addNode(&Filter{id: "filter1", Predicates: []Expression{
&BinaryExpr{
Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin),
Right: NewLiteral(uint64(1000000000)),
Right: NewLiteral(time.Unix(0, 1000000000)),
Op: types.BinaryOpGt,
},
}})
filter2 := optimized.addNode(&Filter{id: "filter2", Predicates: []Expression{
&BinaryExpr{
Left: newColumnExpr("timestamp", types.ColumnTypeBuiltin),
Right: NewLiteral(uint64(2000000000)),
Right: NewLiteral(time.Unix(0, 2000000000)),
Op: types.BinaryOpLte,
},
}})

@ -2,6 +2,7 @@ package physical
import (
"testing"
"time"
"github.com/stretchr/testify/require"
@ -50,7 +51,7 @@ func TestPlanner_Convert(t *testing.T) {
).Select(
&logical.BinOp{
Left: logical.NewColumnRef("timestamp", types.ColumnTypeBuiltin),
Right: logical.NewLiteral(uint64(1742826126000000000)),
Right: logical.NewLiteral(time.Unix(0, 1742826126000000000)),
Op: types.BinaryOpLt,
},
).Limit(0, 1000)

Loading…
Cancel
Save