package executor

import (
	"fmt"
	"slices"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"

	"github.com/grafana/loki/v3/pkg/engine/internal/arrowagg"
	"github.com/grafana/loki/v3/pkg/util/topk"
)

// topkBatch calculates the top K rows from a stream of [arrow.RecordBatch]s,
// where rows are ranked by the specified Fields.
//
// Rows with equal values for all the fields are ranked by the order in which
// they were appended.
//
// topkBatch only identifies which rows belong in the top K; it does not
// guarantee any specific ordering of those rows in the compacted output.
// Callers should sort the result if a specific order is required.
type topkBatch struct {
	// Fields holds the list of fields to sort by, in order of precedence. If an
	// incoming record is missing one of these fields, the value for that field
	// is treated as null.
	Fields []arrow.Field

	// Ascending indicates whether to store the top K rows in ascending order.
	// If true, the smallest K rows will be retained.
	Ascending bool

	// NullsFirst determines how to sort null values in the top K rows. If
	// NullsFirst is true, null values will be treated as less than non-null
	// values.
	NullsFirst bool

	// K holds the number of top rows to compute.
	K int

	// MaxUnused determines the maximum number of "unused" rows to retain. An
	// unused row is any row from a retained record that does not contribute to
	// the top K.
	//
	// After the number of unused rows exceeds MaxUnused, topkBatch will compact
	// retained records into a new record containing only the current top K
	// rows.
	MaxUnused int

	ready bool // True if all fields below are initialized.

	nextID      int
	mapper      *arrowagg.Mapper
	heap        *topk.Heap[*topkReference]
	usedCount   map[arrow.RecordBatch]int
	usedSchemas map[*arrow.Schema]int
}

// topkReference is a reference to a row in a record that is part of the
// current set of top K rows.
type topkReference struct {
	// ID is a per-row unique ID across all records, used for comparing rows
	// that are otherwise equal in the sort order.
	ID int

	Record arrow.RecordBatch // Record contributing to the top K.
	Row    int
}

// Put adds rows from rec into b. If rec contains at least one row that belongs
// in the current top K rows, rec is retained until a compaction occurs or it
// is pushed out of the top K.
//
// When rec is retained, its rows that do not contribute to the top K count
// towards the total unused rows in b. Once the number of unused rows exceeds
// MaxUnused, Put calls [topkBatch.Compact] to clean up record references.
func (b *topkBatch) Put(rec arrow.RecordBatch) {
	b.put(rec)

	// Compact if adding this record pushed us over the limit of unused rows.
	if _, unused := b.Size(); unused > b.MaxUnused {
		compacted := b.Compact()
		b.put(compacted)
	}
}
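// As a usage sketch (illustrative, not part of the original file): a caller
// that wants the 100 smallest rows by an int64 "value" column might drive the
// type as below, where makeRecord is an assumed helper producing an
// arrow.RecordBatch that includes a matching "value" field:
//
//	batch := &topkBatch{
//		Fields:    []arrow.Field{{Name: "value", Type: arrow.PrimitiveTypes.Int64}},
//		Ascending: true, // retain the K smallest rows
//		K:         100,
//		MaxUnused: 1024,
//	}
//	batch.Put(makeRecord()) // repeat for each incoming record
//	result := batch.Compact() // top K rows, in no guaranteed order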
// IsFull reports whether b has reached its limit of K retained rows.
func (b *topkBatch) IsFull() bool {
	return b.ready && b.K > 0 && b.heap.Len() >= b.K
}

// Peek returns a single-row record for the row at the root of the heap, or
// nil if no rows have been retained.
func (b *topkBatch) Peek() arrow.RecordBatch {
	ref, ok := b.heap.Peek()
	if !ok {
		return nil
	}
	return b.refToRecordBatch(ref)
}

// refToRecordBatch builds a single-row record holding the values of b.Fields
// for the referenced row.
func (b *topkBatch) refToRecordBatch(ref *topkReference) arrow.RecordBatch {
	schema := arrow.NewSchema(b.Fields, nil)
	rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)

	for fieldIndex := range b.Fields {
		switch arr := b.findRecordArray(ref.Record, b.mapper, fieldIndex).(type) {
		case nil:
			// Field missing from the record; treat its value as null, per the
			// contract documented on [topkBatch.Fields].
			rb.Field(fieldIndex).AppendNull()
		case *array.Binary:
			rb.Field(fieldIndex).(*array.BinaryBuilder).Append(arr.Value(ref.Row))
		case *array.Duration:
			rb.Field(fieldIndex).(*array.DurationBuilder).Append(arr.Value(ref.Row))
		case *array.Float64:
			rb.Field(fieldIndex).(*array.Float64Builder).Append(arr.Value(ref.Row))
		case *array.Uint64:
			rb.Field(fieldIndex).(*array.Uint64Builder).Append(arr.Value(ref.Row))
		case *array.Int64:
			rb.Field(fieldIndex).(*array.Int64Builder).Append(arr.Value(ref.Row))
		case *array.String:
			rb.Field(fieldIndex).(*array.StringBuilder).Append(arr.Value(ref.Row))
		case *array.Timestamp:
			rb.Field(fieldIndex).(*array.TimestampBuilder).Append(arr.Value(ref.Row))
		default:
			panic(fmt.Errorf("unknown array type: %T", arr))
		}
	}

	return rb.NewRecordBatch()
}

// put adds rows from rec into b without checking the number of unused rows.
func (b *topkBatch) put(rec arrow.RecordBatch) {
	if !b.ready {
		b.init()
	}

	// Iterate over the rows in the record and attempt to push each of them
	// onto the heap. For simplicity, we retain the record once per row in the
	// heap, and track that count in a map so we can compute the number of
	// unused rows.
	for i := range int(rec.NumRows()) {
		ref := &topkReference{
			ID:     b.nextID,
			Record: rec,
			Row:    i,
		}
		b.nextID++

		res, prev := b.heap.Push(ref)
		switch res {
		case topk.PushResultPushed:
			b.usedCount[rec]++
			b.usedSchemas[rec.Schema()]++

		case topk.PushResultReplaced:
			b.usedCount[rec]++
			b.usedSchemas[rec.Schema()]++

			b.usedCount[prev.Record]--
			b.usedSchemas[prev.Record.Schema()]--

			if b.usedCount[prev.Record] == 0 {
				delete(b.usedCount, prev.Record)
			}
			if b.usedSchemas[prev.Record.Schema()] == 0 {
				b.mapper.RemoveSchema(prev.Record.Schema())
				delete(b.usedSchemas, prev.Record.Schema())
			}
		}
	}
}

// init initializes the internal state of b.
func (b *topkBatch) init() {
	b.heap = &topk.Heap[*topkReference]{
		Limit: b.K,
		Less: func(left, right *topkReference) bool {
			if b.Ascending {
				// If we're looking for the top K in ascending order, we need
				// to switch over to a max-heap and return true if left > right.
				return !b.less(left, right)
			}
			return b.less(left, right)
		},
	}
	b.mapper = arrowagg.NewMapper(b.Fields)
	b.usedCount = make(map[arrow.RecordBatch]int)
	b.usedSchemas = make(map[*arrow.Schema]int)

	b.ready = true
}

// less reports whether the row referenced by left sorts before the row
// referenced by right.
func (b *topkBatch) less(left, right *topkReference) bool {
	for fieldIndex := range b.Fields {
		leftArray := b.findRecordArray(left.Record, b.mapper, fieldIndex)
		rightArray := b.findRecordArray(right.Record, b.mapper, fieldIndex)

		// Compare directly from the arrays without creating scalars, to avoid
		// allocations.
		res, err := compareArrays(leftArray, rightArray, left.Row, right.Row, b.NullsFirst)
		if err != nil {
			// Treat failure to compare as equal, so that the sort order is
			// consistent. This should only happen when given invalid values to
			// compare, as we know leftArray and rightArray are of the same type.
			continue
		}

		switch res {
		case 0: // left == right
			// Continue to the next field if the two values are equal.
			continue
		case -1: // left < right
			return true
		case 1: // left > right
			return false
		}
	}

	// Fall back to sorting by ID for consistent ordering, so that no two rows
	// are ever equal.
	switch {
	case b.Ascending:
		return left.ID > right.ID
	default:
		return left.ID < right.ID
	}
}
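// To make the heap inversion in init concrete (an illustrative trace,
// assuming topk.Heap keeps its worst-ranked row at the root and replaces it
// when a better row arrives): with K = 2 and Ascending = true, the inverted
// Less turns the heap into a max-heap. Pushing the values 5 and 1 fills the
// heap with {5, 1} (root 5); pushing 3 then replaces the root, leaving
// {3, 1}: the two smallest values seen, as required.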
// findRecordArray finds the array for the given [topkBatch.Fields] field
// index from the mapper cache. findRecordArray returns nil if the field is
// not present in rec.
func (b *topkBatch) findRecordArray(rec arrow.RecordBatch, mapper *arrowagg.Mapper, fieldIndex int) arrow.Array {
	columnIndex := mapper.FieldIndex(rec.Schema(), fieldIndex)
	if columnIndex < 0 || columnIndex >= int(rec.NumCols()) {
		return nil // No such field in the record.
	}
	return rec.Column(columnIndex)
}

// Size returns the current number of rows in the top K (<= K) and the number
// of unused rows retained from records (<= MaxUnused).
func (b *topkBatch) Size() (rows int, unused int) {
	if b.heap == nil {
		return 0, 0
	}

	rows = b.heap.Len()
	for rec, used := range b.usedCount {
		// The number of unused rows per record is its total number of rows
		// minus the number of references to it, as each reference corresponds
		// to one value in the heap.
		unused += int(rec.NumRows()) - used
	}
	return rows, unused
}

// Compact compacts all retained records into a single record containing just
// the current top K rows.
//
// The returned record will have a combined schema from all of the input
// records. Neither the order of fields nor the order of rows in the returned
// record is guaranteed. Rows that did not have one of the combined fields
// will be filled with null values for those fields.
//
// Compact returns nil if no rows are in the top K.
func (b *topkBatch) Compact() arrow.RecordBatch {
	if len(b.usedCount) == 0 {
		return nil
	}
	defer b.Reset() // Reset the batch after compaction to free references.

	// Get all row references to compact.
	rowRefs := b.heap.PopAll()

	recordRows := make(map[arrow.RecordBatch][]int, len(b.usedCount))
	for _, ref := range rowRefs {
		recordRows[ref.Record] = append(recordRows[ref.Record], ref.Row)
	}

	// Rows are grouped by their source record and appended in contiguous
	// ranges for efficiency.
	compactor := arrowagg.NewRecords(memory.DefaultAllocator)
	for rec, rows := range recordRows {
		slices.Sort(rows)
		iterContiguousRanges(rows, func(start, end int) bool {
			compactor.AppendSlice(rec, int64(start), int64(end))
			return true
		})
	}

	compacted, err := compactor.Aggregate()
	if err != nil {
		// Aggregate should only fail if nothing was appended, and we know rows
		// were appended above.
		panic(fmt.Sprintf("topkBatch.Compact: unexpected error aggregating records: %s", err))
	}
	return compacted
}

// Reset releases all record references held by the topkBatch.
func (b *topkBatch) Reset() {
	if !b.ready {
		return
	}

	b.nextID = 0
	b.mapper.Reset()
	b.heap.PopAll()
	clear(b.usedCount)
	clear(b.usedSchemas)
}
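// As a concrete sketch of the unused-row accounting (illustrative numbers):
// with K = 3 and MaxUnused = 5, putting a 10-row record of which 3 rows enter
// the top K leaves Size reporting 7 unused rows. Since 7 > 5, Put immediately
// compacts, producing a 3-row record with no unused rows and releasing the
// reference to the original 10-row record.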
// iterContiguousRanges iterates over contiguous ranges of row indices from a
// sorted slice. Rows must be sorted in ascending order.
//
// For example, if rows is [1, 2, 3, 5, 6, 7], it yields two half-open ranges,
// [1, 4) and [5, 8), representing the contiguous sequences.
//
// iterContiguousRanges calls yield for each contiguous range found. If yield
// returns false, iteration stops.
func iterContiguousRanges(rows []int, yield func(start, end int) bool) {
	if len(rows) == 0 {
		return
	}

	startRow := rows[0]
	for i := 1; i < len(rows); i++ {
		// If the current row is not contiguous with the previous one, yield
		// the completed range.
		if rows[i] != rows[i-1]+1 {
			if !yield(startRow, rows[i-1]+1) {
				return
			}
			startRow = rows[i]
		}
	}

	// Yield the final contiguous range.
	yield(startRow, rows[len(rows)-1]+1)
}
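// As a runnable sketch of iterContiguousRanges (illustrative only):
// collecting the half-open ranges produced for the doc comment's example
// input.
//
//	var ranges [][2]int
//	iterContiguousRanges([]int{1, 2, 3, 5, 6, 7}, func(start, end int) bool {
//		ranges = append(ranges, [2]int{start, end})
//		return true
//	})
//	// ranges == [[1 4] [5 8]]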