Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/executor/sortmerge_test.go

144 lines
3.9 KiB

package executor
import (
"slices"
"testing"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)
func TestSortMerge(t *testing.T) {
now := time.Unix(1000000, 0)
var batchSize = int64(3)
c := &Context{
batchSize: batchSize,
}
t.Run("invalid column name", func(t *testing.T) {
merge := &physical.SortMerge{
Column: &physical.ColumnExpr{
Ref: types.ColumnRef{
Column: "not_a_timestamp_column",
Type: types.ColumnTypeBuiltin,
},
},
Order: physical.ASC,
}
inputs := []Pipeline{
ascendingTimestampPipeline(now.Add(1*time.Nanosecond)).Pipeline(batchSize, 10),
ascendingTimestampPipeline(now.Add(2*time.Nanosecond)).Pipeline(batchSize, 10),
ascendingTimestampPipeline(now.Add(3*time.Nanosecond)).Pipeline(batchSize, 10),
}
pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{})
require.NoError(t, err)
ctx := t.Context()
err = pipeline.Read(ctx)
require.ErrorContains(t, err, "column is not a timestamp column")
})
t.Run("ascending timestamp", func(t *testing.T) {
merge := &physical.SortMerge{
Column: &physical.ColumnExpr{
Ref: types.ColumnRef{
Column: types.ColumnNameBuiltinTimestamp,
Type: types.ColumnTypeBuiltin,
},
},
Order: physical.ASC,
}
inputs := []Pipeline{
ascendingTimestampPipeline(now.Add(1*time.Nanosecond)).Pipeline(batchSize, 10),
ascendingTimestampPipeline(now.Add(2*time.Millisecond)).Pipeline(batchSize, 10),
ascendingTimestampPipeline(now.Add(3*time.Second)).Pipeline(batchSize, 10),
}
pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{})
require.NoError(t, err)
ctx := t.Context()
timestamps := make([]arrow.Timestamp, 0, 30)
var batches, rows int64
for {
err := pipeline.Read(ctx)
if err == EOF {
break
}
if err != nil {
t.Fatalf("did not expect error, got %s", err.Error())
}
batch, _ := pipeline.Value()
tsCol, err := c.evaluator.eval(merge.Column, batch)
require.NoError(t, err)
arr := tsCol.ToArray().(*array.Timestamp)
timestamps = append(timestamps, arr.Values()...)
batches++
rows += batch.NumRows()
}
// Check if ts column is sorted
require.Truef(t,
slices.IsSortedFunc(timestamps, func(a, b arrow.Timestamp) int { return int(a - b) }),
"timestamps are not sorted in ASC order: %v", timestamps)
})
t.Run("descending timestamp", func(t *testing.T) {
merge := &physical.SortMerge{
Column: &physical.ColumnExpr{
Ref: types.ColumnRef{
Column: types.ColumnNameBuiltinTimestamp,
Type: types.ColumnTypeBuiltin,
},
},
Order: physical.DESC,
}
inputs := []Pipeline{
descendingTimestampPipeline(now.Add(1*time.Nanosecond)).Pipeline(batchSize, 10),
descendingTimestampPipeline(now.Add(2*time.Millisecond)).Pipeline(batchSize, 10),
descendingTimestampPipeline(now.Add(3*time.Second)).Pipeline(batchSize, 10),
}
pipeline, err := NewSortMergePipeline(inputs, merge.Order, merge.Column, expressionEvaluator{})
require.NoError(t, err)
ctx := t.Context()
timestamps := make([]arrow.Timestamp, 0, 30)
var batches, rows int64
for {
err := pipeline.Read(ctx)
if err == EOF {
break
}
if err != nil {
t.Fatalf("did not expect error, got %s", err.Error())
}
batch, _ := pipeline.Value()
tsCol, err := c.evaluator.eval(merge.Column, batch)
require.NoError(t, err)
arr := tsCol.ToArray().(*array.Timestamp)
timestamps = append(timestamps, arr.Values()...)
batches++
rows += batch.NumRows()
}
// Check if ts column is sorted
require.Truef(t,
slices.IsSortedFunc(timestamps, func(a, b arrow.Timestamp) int { return int(b - a) }),
"timestamps are not sorted in DESC order: %v", timestamps)
})
}