loki/pkg/engine/internal/executor/topk.go

package executor

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"

	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
	"github.com/grafana/loki/v3/pkg/engine/internal/semconv"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/xcap"
)

type topkOptions struct {
	// Inputs are the input pipelines to compute the top K from.
	Inputs []Pipeline

	// SortBy is the list of columns to sort by, in order of precedence.
	SortBy []physical.ColumnExpression

	Ascending  bool // Sorts lines in ascending order if true.
	NullsFirst bool // When true, considers NULLs < non-NULLs when sorting.
	K          int  // Number of top rows to compute.

	// MaxUnused determines the maximum number of unused rows to retain. An
	// unused row is any row from a retained record that does not contribute to
	// the current top K.
	//
	// Once the number of unused rows exceeds this value, retained records are
	// compacted into a new record containing only the currently used rows.
	MaxUnused int

	// Region is the xcap region for this node.
	Region *xcap.Region
}

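// Illustration (arbitrary values, not defaults): with K = 100 and
// MaxUnused = 1000, retained input records are kept whole while at most 1000
// of their rows sit outside the current top 100; once more rows than that
// become unused, the retained records are compacted into a new record holding
// only the contributing rows.
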
// topkPipeline performs a topk (SORT + LIMIT) operation across several input
// pipelines.
type topkPipeline struct {
	inputs []Pipeline

	// topk can be sorted by any set of columns, but sorting by timestamp only
	// is a special case because it allows short-circuiting.
	sortByTime bool

	callbacks []ContributingTimeRangeChangedHandler

	batch    *topkBatch
	region   *xcap.Region
	computed bool
}

var _ Pipeline = (*topkPipeline)(nil)

// newTopkPipeline creates a new topkPipeline with the given options.
func newTopkPipeline(opts topkOptions) (*topkPipeline, error) {
	fields, err := exprsToFields(opts.SortBy)
	if err != nil {
		return nil, err
	}

	sortByTime := false
	if len(fields) == 1 {
		fieldIdent, err := semconv.ParseFQN(fields[0].Name)
		if err != nil {
			return nil, err
		}
		sortByTime = semconv.ColumnIdentTimestamp.Equal(fieldIdent)
	}

	return &topkPipeline{
		inputs:     opts.Inputs,
		sortByTime: sortByTime,
		batch: &topkBatch{
			Fields:     fields,
			Ascending:  opts.Ascending,
			NullsFirst: opts.NullsFirst,
			K:          opts.K,
			MaxUnused:  opts.MaxUnused,
		},
		region: opts.Region,
	}, nil
}

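// Example (illustrative sketch only; real construction is wired up by the
// executor from the physical plan, and the exact expressions and values
// differ):
//
//	pipeline, err := newTopkPipeline(topkOptions{
//		Inputs: inputs,
//		SortBy: []physical.ColumnExpression{
//			&physical.ColumnExpr{Ref: types.ColumnRef{
//				Column: types.ColumnNameBuiltinTimestamp,
//				Type:   types.ColumnTypeBuiltin,
//			}},
//		},
//		Ascending: false, // newest entries first
//		K:         1000,
//		MaxUnused: 4096,
//	})
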
// exprsToFields converts the sort-by column expressions into Arrow fields,
// which topkBatch uses to identify the sort columns in incoming records.
func exprsToFields(exprs []physical.ColumnExpression) ([]arrow.Field, error) {
	fields := make([]arrow.Field, 0, len(exprs))

	for _, expr := range exprs {
		expr, ok := expr.(*physical.ColumnExpr)
		if !ok {
			panic("topkPipeline only supports ColumnExpr expressions")
		}

		dt, err := guessLokiType(expr.Ref)
		if err != nil {
			return nil, err
		}

		ident := semconv.NewIdentifier(expr.Ref.Column, expr.Ref.Type, dt)
		fields = append(fields, semconv.FieldFromIdent(ident, true))
	}

	return fields, nil
}

// guessLokiType infers the Loki data type for a column reference from its
// column type, so the sort field can be given an appropriate datatype.
func guessLokiType(ref types.ColumnRef) (types.DataType, error) {
	switch ref.Type {
	case types.ColumnTypeBuiltin:
		switch ref.Column {
		case types.ColumnNameBuiltinTimestamp:
			return types.Loki.Timestamp, nil
		case types.ColumnNameBuiltinMessage:
			return types.Loki.String, nil
		default:
			panic(fmt.Sprintf("unsupported builtin column type %s", ref))
		}

	case types.ColumnTypeGenerated:
		return types.Loki.Float, nil

	case types.ColumnTypeAmbiguous:
		// TODO(rfratto): It's not clear how topk should sort when there's an
		// ambiguous column reference, since ambiguous column references can
		// refer to multiple columns.
		return nil, fmt.Errorf("topkPipeline does not support ambiguous column types")

	default:
		return types.Loki.String, nil
	}
}

// Read computes the topk as the next record. Read blocks until all input
// pipelines have been fully read and the top K rows have been computed.
func (p *topkPipeline) Read(ctx context.Context) (arrow.RecordBatch, error) {
	if !p.computed {
		rec, err := p.compute(ctx)
		p.computed = true
		return rec, err
	}

	return nil, EOF
}

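// Example (illustrative only): a consumer sees exactly one record followed by
// EOF, since the entire top K is materialized on the first call:
//
//	rec, err := p.Read(ctx)
//	if err != nil {
//		// handle error
//	}
//	// use rec; any subsequent Read returns EOF.
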
// compute drains all input pipelines into the topk batch and returns the
// compacted record holding the final top K rows, or EOF if there are no rows
// to return.
func (p *topkPipeline) compute(ctx context.Context) (arrow.RecordBatch, error) {
	var currentHeapMin time.Time

NextInput:
	for _, in := range p.inputs {
		for {
			rec, err := in.Read(ctx)
			if err != nil && errors.Is(err, EOF) {
				continue NextInput
			} else if err != nil {
				return nil, err
			}

			p.batch.Put(rec)

			// Short-circuiting is only possible when the heap is full and rows are
			// sorted by timestamp.
			if p.sortByTime && p.batch.IsFull() {
				// We can safely assume there is one timestamp column and one row.
				heapMin := p.batch.Peek().Column(0).(*array.Timestamp).Value(0).ToTime(arrow.Nanosecond)

				if p.batch.Ascending {
					// bottom k: notify when the boundary timestamp moves earlier.
					if currentHeapMin.IsZero() || heapMin.Before(currentHeapMin) {
						currentHeapMin = heapMin
						p.notifyAll(currentHeapMin, true)
					}
				} else {
					// top k: notify when the boundary timestamp moves later.
					if currentHeapMin.IsZero() || heapMin.After(currentHeapMin) {
						currentHeapMin = heapMin
						p.notifyAll(currentHeapMin, false)
					}
				}
			}
		}
	}

	compacted := p.batch.Compact()
	if compacted == nil {
		return nil, EOF
	}

	return compacted, nil
}

// Close closes the resources of the pipeline.
func (p *topkPipeline) Close() {
	if p.region != nil {
		p.region.End()
	}

	p.batch.Reset()

	for _, in := range p.inputs {
		in.Close()
	}
}

// Region implements RegionProvider.
func (p *topkPipeline) Region() *xcap.Region {
	return p.region
}

// SubscribeToTimeRangeChanges implements ContributingTimeRangeChangedNotifier.
func (p *topkPipeline) SubscribeToTimeRangeChanges(callback ContributingTimeRangeChangedHandler) {
	p.callbacks = append(p.callbacks, callback)
}

// notifyAll invokes every subscribed callback with the new boundary timestamp.
// lessThan is true when the pipeline sorts ascending (bottom k) and false when
// it sorts descending (top k).
func (p *topkPipeline) notifyAll(ts time.Time, lessThan bool) {
	for _, callback := range p.callbacks {
		callback(ts, lessThan)
	}
}
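
// Example (illustrative sketch; the handler signature is inferred from
// notifyAll, and the pruning behavior described is hypothetical):
//
//	p.SubscribeToTimeRangeChanges(func(ts time.Time, lessThan bool) {
//		// Once the heap is full, only rows on one side of ts can still enter
//		// the top K, so upstream readers may skip data outside that range.
//	})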