From e40777ea3640f6b4bc577c99bc3bc2e7ae5b3627 Mon Sep 17 00:00:00 2001 From: benclive Date: Fri, 5 Jun 2026 18:42:18 +0100 Subject: [PATCH] chore: Apply label hash sorting to topk when timestamps match (#22282) --- pkg/dataobj/sections/logs/table_test.go | 3 +- pkg/engine/internal/executor/topk.go | 12 +- pkg/engine/internal/executor/topk_batch.go | 53 ++++++-- .../internal/executor/topk_batch_test.go | 128 +++++++++++++----- 4 files changed, 144 insertions(+), 52 deletions(-) diff --git a/pkg/dataobj/sections/logs/table_test.go b/pkg/dataobj/sections/logs/table_test.go index 56ddb3d25e..dffb512247 100644 --- a/pkg/dataobj/sections/logs/table_test.go +++ b/pkg/dataobj/sections/logs/table_test.go @@ -146,6 +146,7 @@ func TestSortRecords_SortSchemaASC(t *testing.T) { records := []Record{ {StreamID: 10, Timestamp: t1, SortKey: "app-b", Line: []byte("b-old")}, + {StreamID: 10, Timestamp: t1, SortKey: "app-b", Line: []byte("b-old-2")}, // Stable sort: Ensure a second record with the same sort key is emitted in the order it was presented. 
{StreamID: 20, Timestamp: t3, SortKey: "app-a", Line: []byte("a-new")}, {StreamID: 21, Timestamp: t1, SortKey: "app-a", Line: []byte("a-old")}, {StreamID: 11, Timestamp: t2, SortKey: "app-b", Line: []byte("b-mid")}, @@ -161,7 +162,7 @@ func TestSortRecords_SortSchemaASC(t *testing.T) { } // app-a records first (sort key ASC), timestamp DESC within group - require.Equal(t, []string{"a-new", "a-mid", "a-old", "b-new", "b-mid", "b-old"}, lines) + require.Equal(t, []string{"a-new", "a-mid", "a-old", "b-new", "b-mid", "b-old", "b-old-2"}, lines) for i := 0; i < len(records)-1; i++ { a, b := records[i], records[i+1] diff --git a/pkg/engine/internal/executor/topk.go b/pkg/engine/internal/executor/topk.go index 342e11c1f2..edc2a760ad 100644 --- a/pkg/engine/internal/executor/topk.go +++ b/pkg/engine/internal/executor/topk.go @@ -8,6 +8,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" + "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/engine/internal/assertions" "github.com/grafana/loki/v3/pkg/engine/internal/planner/physical" @@ -71,11 +72,12 @@ func newTopkPipeline(opts topkOptions) (*topkPipeline, error) { inputs: opts.Inputs, sortByTime: sortByTime, batch: &topkBatch{ - Fields: fields, - Ascending: opts.Ascending, - NullsFirst: opts.NullsFirst, - K: opts.K, - MaxUnused: opts.MaxUnused, + Fields: fields, + Ascending: opts.Ascending, + NullsFirst: opts.NullsFirst, + K: opts.K, + MaxUnused: opts.MaxUnused, + labelsBuilder: labels.NewScratchBuilder(1), }, }, nil } diff --git a/pkg/engine/internal/executor/topk_batch.go b/pkg/engine/internal/executor/topk_batch.go index 7ac137d30a..5ccd5e869f 100644 --- a/pkg/engine/internal/executor/topk_batch.go +++ b/pkg/engine/internal/executor/topk_batch.go @@ -7,8 +7,11 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/memory" + 
"github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/engine/internal/arrowagg" + "github.com/grafana/loki/v3/pkg/engine/internal/semconv" + "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/util/topk" ) @@ -48,22 +51,20 @@ type topkBatch struct { MaxUnused int ready bool // True if all fields below are initialized. - nextID int mapper *arrowagg.Mapper heap *topk.Heap[*topkReference] usedCount map[arrow.RecordBatch]int usedSchemas map[*arrow.Schema]int + + labelsBuilder labels.ScratchBuilder } // topkReference is a reference to a row in a record that is part of the // current set of top K rows. type topkReference struct { - // ID is a per-row unique ID across all records, used for comparing rows that - // are otherwise equal in the sort order. - ID int - - Record arrow.RecordBatch // Record contributing to the top K. - Row int + Record arrow.RecordBatch // Record contributing to the top K. + Row int + LabelHash uint64 } // Put adds rows from rec into b. If rec contains at least one row that belongs @@ -138,11 +139,9 @@ func (b *topkBatch) put(rec arrow.RecordBatch) { // rows. for i := range int(rec.NumRows()) { ref := &topkReference{ - ID: b.nextID, Record: rec, Row: i, } - b.nextID++ res, prev := b.heap.Push(ref) switch res { @@ -209,14 +208,40 @@ func (b *topkBatch) less(left, right *topkReference) bool { } } - // Fall back to sorting by ID to have consistent ordering, so that no two - // rows are ever equal. + // Fall back to sorting by label hash to have consistent ordering with the classic engine, so that no two + // rows are ever equal. Hashes are computed lazily as they aren't needed for every row and stored to avoid recomputing. 
+ if left.LabelHash == 0 { + left.LabelHash = b.labelHash(left.Record, left.Row) + } + if right.LabelHash == 0 { + right.LabelHash = b.labelHash(right.Record, right.Row) + } + switch { case b.Ascending: - return left.ID > right.ID + return left.LabelHash < right.LabelHash default: - return left.ID < right.ID + return left.LabelHash > right.LabelHash + } +} + +func (b *topkBatch) labelHash(rec arrow.RecordBatch, row int) uint64 { + b.labelsBuilder.Reset() + for fieldIndex := range int(rec.NumCols()) { + field := rec.Schema().Field(fieldIndex) + ident, err := semconv.ParseFQN(field.Name) + if err != nil { + continue + } + if ident.ColumnType() == types.ColumnTypeLabel { + if rec.Column(fieldIndex).IsNull(row) || !rec.Column(fieldIndex).IsValid(row) { + continue + } + b.labelsBuilder.Add(ident.ShortName(), rec.Column(fieldIndex).ValueStr(row)) + } } + b.labelsBuilder.Sort() + return b.labelsBuilder.Labels().Hash() } // findRecordArray finds the array for the given [b.Fields] field index from @@ -298,9 +323,9 @@ func (b *topkBatch) Reset() { return } - b.nextID = 0 b.mapper.Reset() b.heap.PopAll() + b.labelsBuilder.Reset() clear(b.usedCount) clear(b.usedSchemas) diff --git a/pkg/engine/internal/executor/topk_batch_test.go b/pkg/engine/internal/executor/topk_batch_test.go index 3c9e3a9179..8171f115bf 100644 --- a/pkg/engine/internal/executor/topk_batch_test.go +++ b/pkg/engine/internal/executor/topk_batch_test.go @@ -5,44 +5,45 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/engine/internal/semconv" + "github.com/grafana/loki/v3/pkg/engine/internal/types" "github.com/grafana/loki/v3/pkg/util/arrowtest" ) -var ( - records = []arrowtest.Rows{ - { - {"ts": int64(10), "table": "A", "line": "line A"}, - {"ts": int64(15), "table": "A", "line": "line B"}, - {"ts": 
int64(5), "table": "A", "line": "line C"}, - {"ts": int64(20), "table": "A", "line": "line D"}, - }, - { - {"ts": int64(1), "table": "A", "line": "line A"}, - {"ts": int64(50), "table": "A", "line": "line B"}, - }, - { - // This record contains an additional column not found in the other - // records; this tests to make sure topkBatch properly merges schemas. - {"ts": int64(100), "table": "B", "app": "loki", "line": "line A"}, - {"ts": int64(75), "table": "B", "app": "loki", "line": "line B"}, - {"ts": int64(25), "table": "B", "app": "loki", "line": "line C"}, - }, - { - {"ts": int64(13), "table": "C", "line": "line A"}, - {"ts": int64(15), "table": "C", "line": "line B"}, - {"ts": int64(17), "table": "C", "line": "line C"}, - {"ts": int64(19), "table": "C", "line": "line D"}, - }, - { - // This record contains a nil sort key to test the behaviour of - // NullsFirst. - {"table": "D", "line": "line A"}, - }, - } -) +var records = []arrowtest.Rows{ + { + {"ts": int64(10), "table": "A", "line": "line A"}, + {"ts": int64(15), "table": "A", "line": "line B"}, + {"ts": int64(5), "table": "A", "line": "line C"}, + {"ts": int64(20), "table": "A", "line": "line D"}, + }, + { + {"ts": int64(1), "table": "A", "line": "line A"}, + {"ts": int64(50), "table": "A", "line": "line B"}, + }, + { + // This record contains an additional column not found in the other + // records; this tests to make sure topkBatch properly merges schemas. + {"ts": int64(100), "table": "B", "app": "loki", "line": "line A"}, + {"ts": int64(75), "table": "B", "app": "loki", "line": "line B"}, + {"ts": int64(25), "table": "B", "app": "loki", "line": "line C"}, + }, + { + {"ts": int64(13), "table": "C", "line": "line A"}, + {"ts": int64(15), "table": "C", "line": "line B"}, + {"ts": int64(17), "table": "C", "line": "line C"}, + {"ts": int64(19), "table": "C", "line": "line D"}, + }, + { + // This record contains a nil sort key to test the behaviour of + // NullsFirst. 
+		{"table": "D", "line": "line A"},
+	},
+}
 
 func Test_topkBatch(t *testing.T) {
 	tt := []struct {
@@ -128,6 +129,73 @@ func Test_topkBatch(t *testing.T) {
 	}
 }
 
+// Test_topkBatch_stability ensures that the same top K rows are retained when
+// sort keys tie, regardless of the order in which records are added.
+func Test_topkBatch_stability(t *testing.T) {
+	lineIdent := semconv.NewIdentifier("line", types.ColumnTypeLabel, types.Loki.String)
+	// Use a local fixture: assigning to the package-level records variable here
+	// would leak this test's data into any other test that reads it.
+	batches := []arrowtest.Rows{
+		{
+			{"ts": int64(10), lineIdent.FQN(): "line A"},
+			{"ts": int64(15), lineIdent.FQN(): "line B"},
+			{"ts": int64(5), lineIdent.FQN(): "line C"},
+		},
+		{
+			{"ts": int64(5), lineIdent.FQN(): "line D"},
+		},
+	}
+
+	expect := arrowtest.Rows{
+		{"ts": int64(15), lineIdent.FQN(): "line B"},
+		{"ts": int64(10), lineIdent.FQN(): "line A"},
+		{"ts": int64(5), lineIdent.FQN(): "line D"},
+	}
+
+	schema := arrow.NewSchema([]arrow.Field{
+		{Name: "ts", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+		{Name: lineIdent.FQN(), Type: arrow.BinaryTypes.String, Nullable: true},
+	}, nil)
+
+	b := topkBatch{
+		Fields:        []arrow.Field{{Name: "ts", Type: arrow.PrimitiveTypes.Int64, Nullable: true}},
+		K:             3,
+		MaxUnused:     3,
+		Ascending:     false,
+		NullsFirst:    false,
+		labelsBuilder: labels.NewScratchBuilder(1),
+	}
+	defer b.Reset()
+
+	for _, rows := range batches {
+		rec := rows.Record(memory.DefaultAllocator, schema)
+		b.Put(rec)
+	}
+
+	output := b.Compact()
+
+	actual, err := arrowtest.RecordRows(output)
+	require.NoError(t, err)
+	require.Len(t, actual, 3)
+	require.ElementsMatch(t, expect, actual, "rows should match (order may differ)")
+
+	b.Reset()
+
+	// Now apply batches in reverse order to test stability.
+	for i := len(batches) - 1; i >= 0; i-- {
+		rows := batches[i]
+		rec := rows.Record(memory.DefaultAllocator, schema)
+		b.Put(rec)
+	}
+
+	output = b.Compact()
+
+	actual, err = arrowtest.RecordRows(output)
+	require.NoError(t, err)
+	require.Len(t, actual, 3)
+	require.ElementsMatch(t, expect, actual, "rows should match when added in reverse order (order may differ)")
+}
+
 // Test_topkBatch_MaxUnused ensures that compaction is automatically triggered
 // upon appending a record when the number of unused rows gets too high.
 func Test_topkBatch_MaxUnused(t *testing.T) {