Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/engine/executor/topk_batch.go

272 lines
7.9 KiB

package executor
import (
"fmt"
"slices"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/scalar"
"github.com/grafana/loki/v3/pkg/engine/internal/arrowagg"
"github.com/grafana/loki/v3/pkg/util/topk"
)
// topkBatch calculates the top K rows from a stream of [arrow.Record]s, where
// rows are sorted by the specified Fields.
//
// Rows with equal values for all sort fields are sorted by the order in which
// they were appended.
//
// The exported fields are configuration; the unexported fields are working
// state that is created lazily on the first call to put.
type topkBatch struct {
	// Fields holds the list of fields to sort by, in order of precedence. If an
	// incoming record is missing one of these fields, the value for that field
	// is treated as null.
	Fields []arrow.Field
	// Ascending indicates whether to store the top K rows in ascending order. If
	// true, the smallest K rows will be retained.
	Ascending bool
	// NullsFirst determines how to sort null values in the top K rows. If
	// NullsFirst is true, null values will be treated as less than non-null
	// values.
	NullsFirst bool
	// K holds the number of top rows to compute. It is used as the limit of the
	// internal heap.
	K int
	// MaxUnused determines the maximum number of "unused" rows to retain. An
	// unused row is any row from a retained record that does not contribute to
	// the top K.
	//
	// After the number of unused rows exceeds MaxUnused, topkBatch will compact
	// retained records into a new record only containing the current top K rows.
	MaxUnused int
	ready bool // True once init has populated all of the fields below.
	// nextID is the ID handed to the next row reference. It is incremented for
	// every row passed to put and set back to zero by Reset.
	nextID int
	// mapper is built from Fields and resolves a sort-field index to the
	// matching column index of a given record schema.
	mapper *arrowagg.Mapper
	// heap holds references to the rows currently in the top K; its limit is K.
	heap *topk.Heap[*topkReference]
	// usedCount tracks, per retained record, how many heap references point at
	// it. Each counted reference is matched by one Retain on the record.
	usedCount map[arrow.Record]int
	// usedSchemas tracks, per schema, how many heap references belong to
	// records with that schema. A schema is removed from mapper when its count
	// reaches zero.
	usedSchemas map[*arrow.Schema]int
}
// topkReference is a reference to a row in a record that is part of the
// current set of top K rows.
type topkReference struct {
	// ID is a per-row unique ID across all records, used for comparing rows that
	// are otherwise equal in the sort order.
	ID int
	Record arrow.Record // Record contributing to the top K.
	// Row is the index of the referenced row within Record.
	Row int
}
// Put adds rows from rec into b. If rec contains at least one row that belongs
// in the current top K rows, rec is retained until a compaction occurs or it
// is pushed out of the top K.
//
// When rec is retained, the number of rows that do not contribute to the top K
// contribute towards the total unused rows in b. Once the number of unused
// rows exceeds MaxUnused, Put calls [topkBatch.Compact] to clean up record
// references and re-adds the compacted record. The compacted record is
// allocated with alloc.
func (b *topkBatch) Put(alloc memory.Allocator, rec arrow.Record) {
	b.put(rec)

	// Only compact if adding this record pushed us over the limit of unused
	// rows.
	if _, unused := b.Size(); unused <= b.MaxUnused {
		return
	}

	compacted := b.Compact(alloc)
	if compacted == nil {
		// Compact returns nil when no records are retained (reachable here when
		// MaxUnused is negative). There is nothing to re-add, and calling put or
		// Release on a nil record would panic.
		return
	}

	// put takes its own reference for every row it keeps, so the reference
	// returned by Compact can be dropped once the rows have been re-added.
	b.put(compacted)
	compacted.Release()
}
// put offers every row of rec to the heap. Unlike Put, it never checks the
// number of unused rows and never triggers a compaction.
func (b *topkBatch) put(rec arrow.Record) {
	if !b.ready {
		b.init()
	}

	// Rows are pushed one at a time. To keep the bookkeeping simple, every row
	// accepted by the heap takes its own reference on rec, and the per-record
	// and per-schema counters mirror those references so that the number of
	// unused rows can be derived later.
	numRows := int(rec.NumRows())
	for row := 0; row < numRows; row++ {
		candidate := &topkReference{ID: b.nextID, Record: rec, Row: row}
		b.nextID++

		outcome, evicted := b.heap.Push(candidate)
		if outcome != topk.PushResultPushed && outcome != topk.PushResultReplaced {
			// The heap did not keep this row; nothing to track.
			continue
		}

		// The row is now part of the top K: account for the new reference.
		b.usedCount[rec]++
		b.usedSchemas[rec.Schema()]++
		rec.Retain()

		if outcome == topk.PushResultPushed {
			continue
		}

		// The new row displaced an existing one. Undo the bookkeeping of the
		// displaced row and drop the reference it held on its record.
		old := evicted.Record
		b.usedCount[old]--
		b.usedSchemas[old.Schema()]--

		if b.usedCount[old] == 0 {
			delete(b.usedCount, old)
		}
		if b.usedSchemas[old.Schema()] == 0 {
			// No remaining row uses this schema, so the mapper can forget it.
			b.mapper.RemoveSchema(old.Schema())
			delete(b.usedSchemas, old.Schema())
		}

		old.Release()
	}
}
// init creates the working state of b (tracking maps, field mapper and heap)
// and marks b as ready. It is called lazily by put.
func (b *topkBatch) init() {
	b.usedCount = make(map[arrow.Record]int)
	b.usedSchemas = make(map[*arrow.Schema]int)
	b.mapper = arrowagg.NewMapper(b.Fields)

	// heapLess is the ordering handed to the heap. For an ascending top K the
	// heap has to behave as a max-heap, so the result of less is inverted and
	// the function reports true when left > right.
	heapLess := func(left, right *topkReference) bool {
		isLess := b.less(left, right)
		if b.Ascending {
			return !isLess
		}
		return isLess
	}

	b.heap = &topk.Heap[*topkReference]{
		Limit: b.K,
		Less:  heapLess,
	}

	b.ready = true
}
// less reports whether the row referenced by left sorts before the row
// referenced by right. Rows are compared field by field in the order of
// b.Fields; the first field whose values differ decides the result. If every
// field compares equal, the row IDs are used as a tiebreaker.
func (b *topkBatch) less(left, right *topkReference) bool {
	for idx := range b.Fields {
		// A scalar stays nil when the record has no column for this field. The
		// error from GetScalar is intentionally dropped; whatever scalar it
		// returned is passed on to compareScalars as-is.
		var lhs, rhs scalar.Scalar
		if arr := b.findRecordArray(left.Record, b.mapper, idx); arr != nil {
			lhs, _ = scalar.GetScalar(arr, left.Row)
		}
		if arr := b.findRecordArray(right.Record, b.mapper, idx); arr != nil {
			rhs, _ = scalar.GetScalar(arr, right.Row)
		}

		order, err := compareScalars(lhs, rhs, b.NullsFirst)
		if err != nil {
			// A failed comparison is treated as "equal" so that the sort order
			// stays consistent. This should only happen for invalid values, as
			// both scalars are expected to be of the same type.
			continue
		}

		if order == -1 { // left < right
			return true
		}
		if order == 1 { // left > right
			return false
		}
		// Equal for this field: the next field decides.
	}

	// All fields compared equal. Fall back to the row IDs so that no two rows
	// are ever considered equal. The direction flips for Ascending because the
	// heap's Less function negates the result of less in that mode.
	if b.Ascending {
		return left.ID > right.ID
	}
	return left.ID < right.ID
}
// findRecordArray looks up the column of rec that corresponds to the sort
// field at fieldIndex (an index into [b.Fields]), using mapper to translate
// the field index into a column index for rec's schema.
//
// findRecordArray returns nil if the field is not present in rec.
func (b *topkBatch) findRecordArray(rec arrow.Record, mapper *arrowagg.Mapper, fieldIndex int) arrow.Array {
	col := mapper.FieldIndex(rec.Schema(), fieldIndex)

	// Only use the index when it points at an actual column of the record.
	if col >= 0 && col < int(rec.NumCols()) {
		return rec.Column(col)
	}
	return nil // No such field in the record.
}
// Size reports how many rows are currently held in the top K (<= K) and how
// many unused rows are kept alive by retained records (<= MaxUnused).
//
// Size returns 0, 0 if b has not been initialized yet.
func (b *topkBatch) Size() (rows int, unused int) {
	if b.heap == nil {
		return 0, 0
	}

	// Each reference counted for a record corresponds to exactly one value in
	// the heap, so the rows of a record that are not referenced are unused.
	for rec, refs := range b.usedCount {
		unused += int(rec.NumRows())
		unused -= refs
	}

	return b.heap.Len(), unused
}
// Compact merges the rows currently in the top K into a single new record and
// resets b, dropping its references to all previously retained records.
//
// The returned record will have a combined schema from all of the input
// records. The sort order of fields in the returned record is not guaranteed.
// Rows that did not have one of the combined fields will be filled with null
// values for those fields.
//
// Compact returns nil if no rows are in the top K; b is left untouched in
// that case.
//
// The caller owns the returned record and must Release it when it is no
// longer needed.
func (b *topkBatch) Compact(alloc memory.Allocator) arrow.Record {
	if len(b.usedCount) == 0 {
		return nil
	}

	// Free all record references once compaction is done, including when the
	// panic below fires.
	defer b.Reset()

	// Drain the heap to collect every row that needs to be kept.
	// NOTE(review): PopAll presumably yields rows in pop order, so reversing
	// gives the final sort order; confirm against topk.Heap.
	refs := b.heap.PopAll()
	slices.Reverse(refs)

	// Copy each referenced row into the aggregator as a one-row slice.
	agg := arrowagg.NewRecords(alloc)
	for i := range refs {
		start := int64(refs[i].Row)
		agg.AppendSlice(refs[i].Record, start, start+1)
	}

	result, err := agg.Aggregate()
	if err == nil {
		return result
	}

	// Aggregate should only fail if nothing was appended, which the check at
	// the top of the function rules out.
	panic(fmt.Sprintf("topkBatch.Compact: unexpected error aggregating records: %s", err))
}
// Reset empties the topkBatch and releases every record reference it holds.
// It is a no-op if b was never initialized.
func (b *topkBatch) Reset() {
	if !b.ready {
		return
	}

	b.nextID = 0
	b.mapper.Reset()

	// Drop every row reference from the heap; the popped values are not needed.
	b.heap.PopAll()

	// Each counted reference was matched by one Retain when the row was added,
	// so release each record once per counted reference.
	for rec, refs := range b.usedCount {
		for remaining := refs; remaining > 0; remaining-- {
			rec.Release()
		}
	}

	clear(b.usedCount)
	clear(b.usedSchemas)
}