Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/internal/executor/topk_batch_test.go

259 lines
6.9 KiB

package executor
import (
"testing"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/util/arrowtest"
)
// records is the input fixture used by Test_topkBatch. Every element is
// converted into its own Arrow record before being put into the batch.
var records = []arrowtest.Rows{
	{
		{"ts": int64(10), "table": "A", "line": "line A"},
		{"ts": int64(15), "table": "A", "line": "line B"},
		{"ts": int64(5), "table": "A", "line": "line C"},
		{"ts": int64(20), "table": "A", "line": "line D"},
	},
	{
		{"ts": int64(1), "table": "A", "line": "line A"},
		{"ts": int64(50), "table": "A", "line": "line B"},
	},
	{
		// Only this record carries the extra "app" column, so it checks
		// that topkBatch merges differing schemas correctly.
		{"ts": int64(100), "table": "B", "app": "loki", "line": "line A"},
		{"ts": int64(75), "table": "B", "app": "loki", "line": "line B"},
		{"ts": int64(25), "table": "B", "app": "loki", "line": "line C"},
	},
	{
		{"ts": int64(13), "table": "C", "line": "line A"},
		{"ts": int64(15), "table": "C", "line": "line B"},
		{"ts": int64(17), "table": "C", "line": "line C"},
		{"ts": int64(19), "table": "C", "line": "line D"},
	},
	{
		// The sort key ("ts") is nil for this row, which covers the
		// behaviour of NullsFirst.
		{"table": "D", "line": "line A"},
	},
}
// Test_topkBatch puts every fixture record into a topkBatch with K=5 and
// verifies the rows left after compaction for each combination of the
// Ascending and NullsFirst options. The order of the resulting rows is not
// checked.
func Test_topkBatch(t *testing.T) {
	type testCase struct {
		name       string
		ascending  bool
		nullsFirst bool
		expect     arrowtest.Rows
	}

	// sortKey is the single column the batch is configured with.
	sortKey := arrow.Field{Name: "ts", Type: arrow.PrimitiveTypes.Int64, Nullable: true}

	cases := []testCase{
		{
			name: "Default",
			expect: arrowtest.Rows{
				{"ts": nil, "table": "D", "app": nil, "line": "line A"},
				{"ts": int64(100), "table": "B", "app": "loki", "line": "line A"},
				{"ts": int64(75), "table": "B", "app": "loki", "line": "line B"},
				{"ts": int64(50), "table": "A", "app": nil, "line": "line B"},
				{"ts": int64(25), "table": "B", "app": "loki", "line": "line C"},
			},
		},
		{
			name:      "Ascending",
			ascending: true,
			expect: arrowtest.Rows{
				// None of the selected rows has an app column, so it is
				// missing from the output.
				{"ts": int64(1), "table": "A", "line": "line A"},
				{"ts": int64(5), "table": "A", "line": "line C"},
				{"ts": int64(10), "table": "A", "line": "line A"},
				{"ts": int64(13), "table": "C", "line": "line A"},
				{"ts": int64(15), "table": "C", "line": "line B"},
			},
		},
		{
			name:       "NullsFirst",
			nullsFirst: true,
			expect: arrowtest.Rows{
				{"ts": int64(100), "table": "B", "app": "loki", "line": "line A"},
				{"ts": int64(75), "table": "B", "app": "loki", "line": "line B"},
				{"ts": int64(50), "table": "A", "app": nil, "line": "line B"},
				{"ts": int64(25), "table": "B", "app": "loki", "line": "line C"},
				{"ts": int64(20), "table": "A", "app": nil, "line": "line D"},
			},
		},
		{
			name:       "NullsFirst Ascending",
			nullsFirst: true,
			ascending:  true,
			expect: arrowtest.Rows{
				// None of the selected rows has an app column, so it is
				// missing from the output.
				{"ts": nil, "table": "D", "line": "line A"},
				{"ts": int64(1), "table": "A", "line": "line A"},
				{"ts": int64(5), "table": "A", "line": "line C"},
				{"ts": int64(10), "table": "A", "line": "line A"},
				{"ts": int64(13), "table": "C", "line": "line A"},
			},
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			batch := topkBatch{
				Fields:     []arrow.Field{sortKey},
				K:          5,
				MaxUnused:  5,
				Ascending:  c.ascending,
				NullsFirst: c.nullsFirst,
			}
			defer batch.Reset()

			// Feed each fixture as a separate record built from its own schema.
			for _, fixture := range records {
				batch.Put(fixture.Record(memory.DefaultAllocator, fixture.Schema()))
			}

			got, err := arrowtest.RecordRows(batch.Compact())
			require.NoError(t, err)
			require.Len(t, got, len(c.expect))
			require.ElementsMatch(t, c.expect, got, "rows should match (order may differ)")
		})
	}
}
// Test_topkBatch_MaxUnused ensures that compaction is automatically triggered
// upon appending a record when the number of unused rows gets too high.
func Test_topkBatch_MaxUnused(t *testing.T) {
// TopK 2, descending order.
b := topkBatch{
Fields: []arrow.Field{{Name: "ts", Type: arrow.PrimitiveTypes.Int64, Nullable: true}},
K: 2,
MaxUnused: 5,
}
defer b.Reset()
// seq tracks a sequence of Put operations into b. Each time we Put a record,
// the number of unused records should account for any record that still has
// rows contributing to the topk.
//
// Records that get fully pushed out of the topk do not contribute to this
// count.
seq := []struct {
rows arrowtest.Rows
expectUsed int
expectUnused int
}{
{
rows: arrowtest.Rows{
{"ts": int64(100)},
{"ts": int64(10)},
{"ts": int64(9)},
{"ts": int64(8)},
{"ts": int64(7)},
},
expectUsed: 2,
expectUnused: 3, // 3 unused from this initial record.
},
{
rows: arrowtest.Rows{
{"ts": int64(50)},
{"ts": int64(6)},
},
expectUsed: 2,
expectUnused: 5, // 4 from previous record, 1 from this record.
},
{
rows: arrowtest.Rows{
{"ts": int64(75)},
{"ts": int64(5)},
{"ts": int64(4)},
{"ts": int64(3)},
},
expectUsed: 2,
expectUnused: 0, // Expect compaction here, since 5/4/3 would be unused put us above the limit.
},
{
rows: arrowtest.Rows{
{"ts": int64(90)},
{"ts": int64(2)},
{"ts": int64(1)},
{"ts": int64(0)},
},
expectUsed: 2,
expectUnused: 4, // 3 from this record and 1 from the previous compacted record
},
}
for i, tc := range seq {
rec := tc.rows.Record(memory.DefaultAllocator, tc.rows.Schema())
b.Put(rec)
used, unused := b.Size()
assert.Equal(t, tc.expectUsed, used, "unexpected number of used rows after record %d", i)
assert.Equal(t, tc.expectUnused, unused, "unexpected number of unused rows after record %d", i)
}
}
// Test_iterContiguousRanges checks that iterContiguousRanges reports every run
// of consecutive row indices as a single half-open [start, end) range, in the
// order the rows are given.
func Test_iterContiguousRanges(t *testing.T) {
	// span is one range handed to the callback; end is exclusive.
	type span struct{ start, end int }

	cases := []struct {
		name   string
		rows   []int
		ranges []span
	}{
		{
			name: "empty slice",
			rows: []int{},
		},
		{
			name:   "single row",
			rows:   []int{5},
			ranges: []span{{5, 6}},
		},
		{
			name:   "single contiguous range",
			rows:   []int{1, 2, 3},
			ranges: []span{{1, 4}},
		},
		{
			name:   "multiple contiguous ranges",
			rows:   []int{1, 2, 3, 5, 6, 7},
			ranges: []span{{1, 4}, {5, 8}},
		},
		{
			name:   "non-contiguous single rows",
			rows:   []int{1, 3, 5},
			ranges: []span{{1, 2}, {3, 4}, {5, 6}},
		},
		{
			name:   "all rows",
			rows:   []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
			ranges: []span{{0, 10}},
		},
		{
			name:   "first and last row",
			rows:   []int{0, 9},
			ranges: []span{{0, 1}, {9, 10}},
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			// Collect every reported range; returning true from the callback
			// on each call, as the cases expect all ranges to be delivered.
			var collected []span
			iterContiguousRanges(c.rows, func(lo, hi int) bool {
				collected = append(collected, span{start: lo, end: hi})
				return true
			})

			require.Equal(t, c.ranges, collected, "ranges should match")
		})
	}
}