mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
256 lines
6.6 KiB
256 lines
6.6 KiB
package executor
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/apache/arrow-go/v18/arrow"
|
|
"github.com/apache/arrow-go/v18/arrow/array"
|
|
"github.com/apache/arrow-go/v18/arrow/memory"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/errors"
|
|
"github.com/grafana/loki/v3/pkg/engine/internal/types"
|
|
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
|
|
)
|
|
|
|
var (
|
|
fields = []arrow.Field{
|
|
{Name: "name", Type: arrow.BinaryTypes.String},
|
|
{Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64},
|
|
{Name: "value", Type: arrow.PrimitiveTypes.Float64},
|
|
{Name: "valid", Type: arrow.FixedWidthTypes.Boolean},
|
|
}
|
|
sampledata = `Alice,1745487598764058205,0.2586284611568047,false
|
|
Bob,1745487598764058305,0.7823145698741236,true
|
|
Charlie,1745487598764058405,0.3451289756123478,false
|
|
David,1745487598764058505,0.9217834561278945,true
|
|
Eve,1745487598764058605,0.1245789632145789,false
|
|
Frank,1745487598764058705,0.5678912345678912,true
|
|
Grace,1745487598764058805,0.8912345678912345,false
|
|
Hannah,1745487598764058905,0.2345678912345678,true
|
|
Ian,1745487598764059005,0.6789123456789123,false
|
|
Julia,1745487598764059105,0.4123456789123456,true`
|
|
)
|
|
|
|
func TestEvaluateLiteralExpression(t *testing.T) {
|
|
for _, tt := range []struct {
|
|
name string
|
|
value any
|
|
arrowType arrow.Type
|
|
}{
|
|
{
|
|
name: "null",
|
|
value: nil,
|
|
arrowType: arrow.NULL,
|
|
},
|
|
{
|
|
name: "bool",
|
|
value: true,
|
|
arrowType: arrow.BOOL,
|
|
},
|
|
{
|
|
name: "str",
|
|
value: "loki",
|
|
arrowType: arrow.STRING,
|
|
},
|
|
{
|
|
name: "int",
|
|
value: int64(123456789),
|
|
arrowType: arrow.INT64,
|
|
},
|
|
{
|
|
name: "float",
|
|
value: float64(123.456789),
|
|
arrowType: arrow.FLOAT64,
|
|
},
|
|
{
|
|
name: "timestamp",
|
|
value: uint64(1744612881740032450),
|
|
arrowType: arrow.UINT64,
|
|
},
|
|
} {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
literal := physical.NewLiteral(tt.value)
|
|
e := expressionEvaluator{}
|
|
|
|
n := len(words)
|
|
rec := batch(n, time.Now())
|
|
colVec, err := e.eval(literal, rec)
|
|
require.NoError(t, err)
|
|
require.Equalf(t, tt.arrowType, colVec.Type().ID(), "expected: %v got: %v", tt.arrowType.String(), colVec.Type().ID().String())
|
|
|
|
for i := range n {
|
|
val := colVec.Value(i)
|
|
require.Equal(t, tt.value, val)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestEvaluateColumnExpression(t *testing.T) {
|
|
e := expressionEvaluator{}
|
|
|
|
t.Run("invalid", func(t *testing.T) {
|
|
colExpr := &physical.ColumnExpr{
|
|
Ref: types.ColumnRef{
|
|
Column: "does_not_exist",
|
|
Type: types.ColumnTypeBuiltin,
|
|
},
|
|
}
|
|
|
|
n := len(words)
|
|
rec := batch(n, time.Now())
|
|
_, err := e.eval(colExpr, rec)
|
|
require.ErrorContains(t, err, errors.ErrKey.Error())
|
|
})
|
|
|
|
t.Run("string(log)", func(t *testing.T) {
|
|
colExpr := &physical.ColumnExpr{
|
|
Ref: types.ColumnRef{
|
|
Column: "log",
|
|
Type: types.ColumnTypeBuiltin,
|
|
},
|
|
}
|
|
|
|
n := len(words)
|
|
rec := batch(n, time.Now())
|
|
colVec, err := e.eval(colExpr, rec)
|
|
require.NoError(t, err)
|
|
require.Equal(t, arrow.STRING, colVec.Type().ID())
|
|
|
|
for i := range n {
|
|
val := colVec.Value(i)
|
|
require.Equal(t, words[i%len(words)], val)
|
|
}
|
|
})
|
|
}
|
|
|
|
func TestEvaluateBinaryExpression(t *testing.T) {
|
|
rec, err := CSVToArrow(fields, sampledata)
|
|
require.NoError(t, err)
|
|
defer rec.Release()
|
|
|
|
e := expressionEvaluator{}
|
|
|
|
t.Run("error if types do not match", func(t *testing.T) {
|
|
expr := &physical.BinaryExpr{
|
|
Left: &physical.ColumnExpr{
|
|
Ref: types.ColumnRef{Column: "name", Type: types.ColumnTypeBuiltin},
|
|
},
|
|
Right: &physical.ColumnExpr{
|
|
Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin},
|
|
},
|
|
Op: types.BinaryOpEq,
|
|
}
|
|
|
|
_, err := e.eval(expr, rec)
|
|
require.ErrorContains(t, err, "failed to lookup binary function for signature EQ(utf8,uint64): types do not match")
|
|
})
|
|
|
|
t.Run("error if function for signature is not registered", func(t *testing.T) {
|
|
expr := &physical.BinaryExpr{
|
|
Left: &physical.ColumnExpr{
|
|
Ref: types.ColumnRef{Column: "name", Type: types.ColumnTypeBuiltin},
|
|
},
|
|
Right: &physical.ColumnExpr{
|
|
Ref: types.ColumnRef{Column: "name", Type: types.ColumnTypeBuiltin},
|
|
},
|
|
Op: types.BinaryOpInvalid,
|
|
}
|
|
|
|
_, err := e.eval(expr, rec)
|
|
require.ErrorContains(t, err, "failed to lookup binary function for signature invalid(utf8,utf8): not implemented")
|
|
})
|
|
|
|
t.Run("EQ(string,string)", func(t *testing.T) {
|
|
expr := &physical.BinaryExpr{
|
|
Left: &physical.ColumnExpr{
|
|
Ref: types.ColumnRef{Column: "name", Type: types.ColumnTypeBuiltin},
|
|
},
|
|
Right: &physical.LiteralExpr{
|
|
Value: types.Literal{Value: "Charlie"},
|
|
},
|
|
Op: types.BinaryOpEq,
|
|
}
|
|
|
|
res, err := e.eval(expr, rec)
|
|
require.NoError(t, err)
|
|
result := collectBooleanColumnVector(res)
|
|
require.Equal(t, []bool{false, false, true, false, false, false, false, false, false, false}, result)
|
|
})
|
|
|
|
t.Run("GT(float,float)", func(t *testing.T) {
|
|
expr := &physical.BinaryExpr{
|
|
Left: &physical.ColumnExpr{
|
|
Ref: types.ColumnRef{Column: "value", Type: types.ColumnTypeBuiltin},
|
|
},
|
|
Right: &physical.LiteralExpr{
|
|
Value: types.Literal{Value: 0.5},
|
|
},
|
|
Op: types.BinaryOpGt,
|
|
}
|
|
|
|
res, err := e.eval(expr, rec)
|
|
require.NoError(t, err)
|
|
result := collectBooleanColumnVector(res)
|
|
require.Equal(t, []bool{false, true, false, true, false, true, true, false, true, false}, result)
|
|
})
|
|
}
|
|
|
|
func collectBooleanColumnVector(vec ColumnVector) []bool {
|
|
res := make([]bool, 0, vec.Len())
|
|
arr := vec.ToArray().(*array.Boolean)
|
|
for i := range int(vec.Len()) {
|
|
res = append(res, arr.Value(i))
|
|
}
|
|
return res
|
|
}
|
|
|
|
var words = []string{"one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten"}
|
|
|
|
func batch(n int, now time.Time) arrow.Record {
|
|
// 1. Create a memory allocator
|
|
mem := memory.NewGoAllocator()
|
|
|
|
// 2. Define the schema
|
|
schema := arrow.NewSchema(
|
|
[]arrow.Field{
|
|
{Name: "log", Type: arrow.BinaryTypes.String},
|
|
{Name: "timestamp", Type: arrow.PrimitiveTypes.Uint64},
|
|
},
|
|
nil, // No metadata
|
|
)
|
|
|
|
// 3. Create builders for each column
|
|
logBuilder := array.NewStringBuilder(mem)
|
|
defer logBuilder.Release()
|
|
|
|
tsBuilder := array.NewUint64Builder(mem)
|
|
defer tsBuilder.Release()
|
|
|
|
// 4. Append data to the builders
|
|
logs := make([]string, n)
|
|
ts := make([]uint64, n)
|
|
|
|
for i := range n {
|
|
logs[i] = words[i%len(words)]
|
|
ts[i] = uint64(now.Add(time.Duration(i) * time.Second).UnixNano())
|
|
}
|
|
|
|
tsBuilder.AppendValues(ts, nil)
|
|
logBuilder.AppendValues(logs, nil)
|
|
|
|
// 5. Build the arrays
|
|
logArray := logBuilder.NewArray()
|
|
defer logArray.Release()
|
|
|
|
tsArray := tsBuilder.NewArray()
|
|
defer tsArray.Release()
|
|
|
|
// 6. Create the record
|
|
columns := []arrow.Array{logArray, tsArray}
|
|
record := array.NewRecord(schema, columns, int64(n))
|
|
|
|
return record
|
|
}
|
|
|