package executor

import (
	"cmp"
	"context"
	"errors"
	"fmt"
	"io"
	"runtime"
	"slices"
	"sync"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/prometheus/prometheus/model/labels"
	"go.uber.org/atomic"
	"golang.org/x/sync/errgroup"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
	"github.com/grafana/loki/v3/pkg/engine/internal/datatype"
	"github.com/grafana/loki/v3/pkg/engine/internal/types"
	"github.com/grafana/loki/v3/pkg/engine/planner/physical"
	"github.com/grafana/loki/v3/pkg/util/topk"
)

type dataobjScan struct {
	ctx  context.Context
	opts dataobjScanOptions

	initialized bool                    // Whether init has completed.
	readers     []*logs.RowReader       // One row reader per logs section in this shard.
	streams     map[int64]labels.Labels // Stream ID -> stream labels, populated by initStreams.

	state state
}

type dataobjScanOptions struct {
	// TODO(rfratto): Limiting each DataObjScan to a single section is going to
	// be critical for limiting memory overhead here; the section is intended to
	// be the smallest unit of parallelization.

	Object      *dataobj.Object             // Object to read from.
	StreamIDs   []int64                     // Stream IDs to match from logs sections.
	Sections    []int                       // Logs sections to fetch.
	Predicates  []logs.RowPredicate         // Predicates to apply to the logs.
	Projections []physical.ColumnExpression // Columns to include. An empty slice means all columns.

	Direction physical.SortOrder // Order of timestamps to return (ASC=forward, DESC=backward).
	Limit     uint32             // A limit on the number of rows to return (0=unlimited).
}

var _ Pipeline = (*dataobjScan)(nil)

// newDataobjScanPipeline creates a new Pipeline which emits a single
// [arrow.Record] composed of all log sections in a data object. Rows in the
// returned record are ordered by timestamp in the direction specified by
// opts.Direction.
func newDataobjScanPipeline(ctx context.Context, opts dataobjScanOptions) *dataobjScan {
	return &dataobjScan{ctx: ctx, opts: opts}
}
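
// A minimal usage sketch for the pipeline above (the option values shown are
// placeholders, not real values):
//
//	scan := newDataobjScanPipeline(ctx, dataobjScanOptions{
//		Object:    obj, // *dataobj.Object opened elsewhere
//		StreamIDs: []int64{1, 2},
//		Sections:  []int{0},
//		Direction: physical.ASC,
//	})
//	defer scan.Close()
//	for {
//		if err := scan.Read(); err != nil {
//			break // includes a (possibly wrapped) EOF once all rows are emitted
//		}
//		record, _ := scan.Value()
//		_ = record // consume the batch
//	}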

// Read retrieves the next [arrow.Record] from the dataobj.
func (s *dataobjScan) Read() error {
	if err := s.init(); err != nil {
		return err
	}

	rec, err := s.read()
	s.state = newState(rec, err)

	if err != nil {
		return fmt.Errorf("reading data object: %w", err)
	}
	return nil
}

func (s *dataobjScan) init() error {
	if s.initialized {
		return nil
	}

	if err := s.initStreams(); err != nil {
		return fmt.Errorf("initializing streams: %w", err)
	}

	s.readers = nil

	for idx, section := range s.opts.Object.Sections().Filter(logs.CheckSection) {
		// Filter out sections that are not part of this shard.
		if !slices.Contains(s.opts.Sections, idx) {
			continue
		}

		sec, err := logs.Open(s.ctx, section)
		if err != nil {
			return fmt.Errorf("opening logs section: %w", err)
		}

		// TODO(rfratto): There are a few problems with using LogsReader as it is:
		//
		// 1. LogsReader doesn't support providing a subset of columns to read
		//    from, so we're applying projections after reading.
		//
		// 2. LogsReader is intended to be pooled to reduce memory, but we're
		//    creating a new one every time here.
		//
		// For the sake of the initial implementation I'm ignoring these issues,
		// but we'll absolutely need to solve this prior to production use.
		lr := logs.NewRowReader(sec)

		// The calls below can't fail because we're always using a brand new logs
		// reader.
		_ = lr.MatchStreams(slices.Values(s.opts.StreamIDs))
		_ = lr.SetPredicates(s.opts.Predicates)

		s.readers = append(s.readers, lr)
	}

	s.initialized = true
	return nil
}

// initStreams retrieves all requested stream records from streams sections so
// that emitted [arrow.Record]s can include stream labels in results.
func (s *dataobjScan) initStreams() error {
	var sr streams.RowReader
	defer sr.Close()

	streamsBuf := make([]streams.Stream, 512)

	// Initialize entries in the map so we can do a presence test in the loop
	// below.
	s.streams = make(map[int64]labels.Labels, len(s.opts.StreamIDs))
	for _, id := range s.opts.StreamIDs {
		s.streams[id] = labels.EmptyLabels()
	}

	for _, section := range s.opts.Object.Sections().Filter(streams.CheckSection) {
		sec, err := streams.Open(s.ctx, section)
		if err != nil {
			return fmt.Errorf("opening streams section: %w", err)
		}

		// TODO(rfratto): dataobj.StreamsPredicate is missing support for filtering
		// on stream IDs when we already know them in advance; this can cause the
		// Read here to take longer than it needs to since we're reading the
		// entirety of every row.
		sr.Reset(sec)

		for {
			n, err := sr.Read(s.ctx, streamsBuf)
			if n == 0 && errors.Is(err, io.EOF) {
				// Section exhausted; move on to the next one rather than returning,
				// so the populated-streams check below still runs.
				break
			} else if err != nil && !errors.Is(err, io.EOF) {
				return err
			}

			for i, stream := range streamsBuf[:n] {
				if _, found := s.streams[stream.ID]; !found {
					continue
				}

				s.streams[stream.ID] = stream.Labels

				// Zero out the stream entry from the slice so the next call to sr.Read
				// doesn't overwrite any memory we just moved to s.streams.
				streamsBuf[i] = streams.Stream{}
			}
		}
	}

	// Check that all streams were populated.
	var errs []error
	for id, lbls := range s.streams {
		if lbls.IsEmpty() {
			errs = append(errs, fmt.Errorf("requested stream ID %d not found in any streams section", id))
		}
	}
	return errors.Join(errs...)
}
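
// For example (illustrative values): with StreamIDs = []int64{7, 9},
// initStreams leaves s.streams holding 7 -> {app="api"} and 9 -> {app="web"}
// when both IDs are present in the object, and returns a joined error naming
// any ID whose labels were never found.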

// read reads the entire data object into memory and generates an arrow.Record
// from the data. It returns an error upon encountering an error while reading
// one of the sections.
func (s *dataobjScan) read() (arrow.Record, error) {
	// Since [physical.DataObjScan] requires that:
	//
	// * Records are ordered by timestamp, and
	// * Records from the same dataobjScan do not overlap in time
	//
	// we *must* read the entire data object before creating a record, as the
	// sections in the dataobj itself are not already sorted by timestamp (though
	// we only need to keep up to Limit rows in memory).

	var (
		heapMut sync.Mutex
		heap    = topk.Heap[logs.Record]{
			Limit: int(s.opts.Limit),
			Less:  s.getLessFunc(s.opts.Direction),
		}
	)

	g, ctx := errgroup.WithContext(s.ctx)
	g.SetLimit(max(runtime.GOMAXPROCS(0)/2, 1))

	var gotData atomic.Bool

	for _, reader := range s.readers {
		g.Go(func() error {
			for {
				// Allocate a fresh buffer each iteration; pushed records keep
				// references into it, so it must not be reused.
				buf := make([]logs.Record, 1024)
				n, err := reader.Read(ctx, buf)
				if n == 0 && errors.Is(err, io.EOF) {
					return nil
				} else if err != nil && !errors.Is(err, io.EOF) {
					return err
				}

				gotData.Store(true)

				heapMut.Lock()
				for _, rec := range buf[:n] {
					heap.Push(rec)
				}
				heapMut.Unlock()
			}
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	} else if !gotData.Load() {
		return nil, EOF
	}

	projections, err := s.effectiveProjections(&heap)
	if err != nil {
		return nil, fmt.Errorf("getting effective projections: %w", err)
	}

	schema, err := schemaFromColumns(projections)
	if err != nil {
		return nil, fmt.Errorf("creating schema: %w", err)
	}

	// TODO(rfratto): pass allocator to builder.
	rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
	defer rb.Release()

	// Heap elements pop in reverse of the desired output order, so drain the
	// heap and reverse the result.
	records := heap.PopAll()
	slices.Reverse(records)

	for _, record := range records {
		for i := 0; i < schema.NumFields(); i++ {
			field, builder := rb.Schema().Field(i), rb.Field(i)
			s.appendToBuilder(builder, &field, &record)
		}
	}

	return rb.NewRecord(), nil
}
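
// The drain above relies on topk.Heap popping the "least" element (per Less)
// first, so PopAll returns rows in reverse of the final output order. A
// minimal sketch of the same pattern with plain integers (assuming the same
// topk API as used above; illustrative only):
//
//	h := topk.Heap[int]{Limit: 3, Less: func(a, b int) bool { return a > b }}
//	for _, v := range []int{5, 1, 4, 2, 3} {
//		h.Push(v) // keeps the 3 smallest ints, since "greater" sorts as least
//	}
//	vals := h.PopAll()   // [3 2 1]: popped least-first under Less
//	slices.Reverse(vals) // [1 2 3]: final ascending order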

// getLessFunc returns a "less" comparison function for records in the sort
// heap. direction determines the search order:
//
// ASC is a forward search starting at the beginning of the time range.
// DESC is a backward search starting at the end of the time range.
//
// If two records have the same timestamp, compareStreams is used to break the
// tie by stream labels.
func (s *dataobjScan) getLessFunc(direction physical.SortOrder) func(a, b logs.Record) bool {
	compareStreams := func(a, b logs.Record) bool {
		aStream, ok := s.streams[a.StreamID]
		if !ok {
			return false
		}

		bStream, ok := s.streams[b.StreamID]
		if !ok {
			return true
		}

		return labels.Compare(aStream, bStream) < 0
	}

	switch direction {
	case physical.ASC:
		return func(a, b logs.Record) bool {
			if a.Timestamp.Equal(b.Timestamp) {
				return compareStreams(a, b)
			}
			return a.Timestamp.After(b.Timestamp)
		}
	case physical.DESC:
		return func(a, b logs.Record) bool {
			if a.Timestamp.Equal(b.Timestamp) {
				return compareStreams(a, b)
			}
			return a.Timestamp.Before(b.Timestamp)
		}
	default:
		panic("invalid direction")
	}
}
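
// For example, with ASC and Limit = 2, pushing records at t=1, t=2, and t=3
// leaves the heap holding t=1 and t=2: under the ASC comparator above, the
// t=3 record is "least" (After returns true against the others) and so is
// the first to be evicted (assuming topk.Heap evicts the least element under
// Less once it exceeds Limit).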

// effectiveProjections returns the effective projections to use for a
// record. If s.opts.Projections is non-empty, its column expressions are
// used as the projections.
//
// Otherwise, the set of all columns found in the heap is used, in order of:
//
// * All stream labels (sorted by name)
// * All metadata columns (sorted by name)
// * Log timestamp
// * Log message
//
// effectiveProjections does not mutate h.
func (s *dataobjScan) effectiveProjections(h *topk.Heap[logs.Record]) ([]physical.ColumnExpression, error) {
	if len(s.opts.Projections) > 0 {
		return s.opts.Projections, nil
	}

	var (
		columns      []physical.ColumnExpression
		foundStreams = map[int64]struct{}{}
		found        = map[physical.ColumnExpr]struct{}{}
	)

	addColumn := func(name string, ty types.ColumnType) {
		expr := physical.ColumnExpr{
			Ref: types.ColumnRef{Column: name, Type: ty},
		}

		if _, ok := found[expr]; !ok {
			found[expr] = struct{}{}
			columns = append(columns, &expr)
		}
	}

	for rec := range h.Range() {
		stream, ok := s.streams[rec.StreamID]
		if !ok {
			// If we hit this, there's a problem with either initStreams (we missed a
			// requested stream) or the predicate application, where it returned a
			// stream we didn't want.
			return nil, fmt.Errorf("stream ID %d not found in stream cache", rec.StreamID)
		}

		if _, addedStream := foundStreams[rec.StreamID]; !addedStream {
			stream.Range(func(label labels.Label) {
				addColumn(label.Name, types.ColumnTypeLabel)
			})
			foundStreams[rec.StreamID] = struct{}{}
		}

		rec.Metadata.Range(func(label labels.Label) {
			addColumn(label.Name, types.ColumnTypeMetadata)
		})
	}

	// Sort discovered columns by type (preferring labels) then by name.
	slices.SortFunc(columns, func(a, b physical.ColumnExpression) int {
		aRef, bRef := a.(*physical.ColumnExpr).Ref, b.(*physical.ColumnExpr).Ref

		if aRef.Type != bRef.Type {
			if aRef.Type == types.ColumnTypeLabel {
				return -1 // Labels first.
			}
			return 1
		}

		return cmp.Compare(aRef.Column, bRef.Column)
	})

	// Add fixed columns at the end.
	addColumn(types.ColumnNameBuiltinTimestamp, types.ColumnTypeBuiltin)
	addColumn(types.ColumnNameBuiltinMessage, types.ColumnTypeBuiltin)

	return columns, nil
}
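
// For example (illustrative): records from a stream labelled
// {app="api", env="prod"} carrying metadata {trace_id="abc"} produce the
// effective projections [app, env, trace_id, timestamp, message].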

func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, error) {
	var (
		fields       = make([]arrow.Field, 0, len(columns))
		fingerprints = make(map[string]struct{}, len(columns))
	)

	addField := func(field arrow.Field) {
		fp := field.Fingerprint()
		if field.HasMetadata() {
			// We differentiate column type using metadata, but the metadata isn't
			// included in the fingerprint, so we need to manually include it here.
			fp += field.Metadata.String()
		}

		if _, exist := fingerprints[fp]; exist {
			return
		}

		fields = append(fields, field)
		fingerprints[fp] = struct{}{}
	}

	for _, column := range columns {
		columnExpr, ok := column.(*physical.ColumnExpr)
		if !ok {
			return nil, fmt.Errorf("invalid column expression type %T", column)
		}

		switch columnExpr.Ref.Type {
		case types.ColumnTypeLabel:
			// TODO(rfratto): Switch to dictionary encoding for labels.
			//
			// Since labels are more likely to repeat than metadata, we could cut
			// down on the memory overhead of a record by dictionary encoding the
			// labels.
			//
			// However, the csv package we use for testing DataObjScan currently
			// (2025-05-02) doesn't support dictionary encoding, and we would need
			// to find a solution there.
			//
			// We skipped dictionary encoding for now to get the initial prototype
			// working.
			ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
			addField(arrow.Field{
				Name:     columnExpr.Ref.Column,
				Type:     ty,
				Nullable: true,
				Metadata: md,
			})

		case types.ColumnTypeMetadata:
			// Metadata is *not* encoded using dictionary encoding since metadata
			// has unconstrained cardinality. Using dictionary encoding would require
			// tracking every encoded value in the record, which is likely to be too
			// expensive.
			ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
			addField(arrow.Field{
				Name:     columnExpr.Ref.Column,
				Type:     ty,
				Nullable: true,
				Metadata: md,
			})

		case types.ColumnTypeBuiltin:
			ty, md := arrowTypeFromColumnRef(columnExpr.Ref)
			addField(arrow.Field{
				Name:     columnExpr.Ref.Column,
				Type:     ty,
				Nullable: true,
				Metadata: md,
			})

		case types.ColumnTypeAmbiguous:
			// The best handling for ambiguous columns (in terms of the schema) is to
			// explode them out into multiple columns, one for each type. (Except for
			// parsed, which can't be emitted from DataObjScan right now.)
			//
			// TODO(rfratto): should ambiguity be passed down like this? It's odd for
			// the returned schema to be different than the set of columns you asked
			// for.
			//
			// As an alternative, ambiguity could be handled by the planner, where it
			// performs the explosion and propagates the ambiguity down into the
			// predicates.
			//
			// If we're ok with the schema changing from what was requested, then we
			// could update this to resolve the ambiguity at
			// [dataobjScan.effectiveProjections] so we don't always explode out to
			// the full set of columns.
			addField(arrow.Field{
				Name:     columnExpr.Ref.Column,
				Type:     arrow.BinaryTypes.String,
				Nullable: true,
				Metadata: datatype.ColumnMetadata(types.ColumnTypeLabel, datatype.String),
			})
			addField(arrow.Field{
				Name:     columnExpr.Ref.Column,
				Type:     arrow.BinaryTypes.String,
				Nullable: true,
				Metadata: datatype.ColumnMetadata(types.ColumnTypeMetadata, datatype.String),
			})

		case types.ColumnTypeParsed, types.ColumnTypeGenerated:
			return nil, fmt.Errorf("column type not supported: %s", columnExpr.Ref.Type)
		}
	}

	return arrow.NewSchema(fields, nil), nil
}
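
// For example, an ambiguous column "foo" yields two string fields in the
// schema, both named "foo": one whose metadata marks it as a label column
// and one marked as a metadata column. Consumers must use the field
// metadata, not just the name, to tell the two apart.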

func arrowTypeFromColumnRef(ref types.ColumnRef) (arrow.DataType, arrow.Metadata) {
	if ref.Type == types.ColumnTypeBuiltin {
		switch ref.Column {
		case types.ColumnNameBuiltinTimestamp:
			return arrow.FixedWidthTypes.Timestamp_ns, datatype.ColumnMetadataBuiltinTimestamp
		case types.ColumnNameBuiltinMessage:
			return arrow.BinaryTypes.String, datatype.ColumnMetadataBuiltinMessage
		default:
			panic(fmt.Sprintf("unsupported builtin column type %s", ref))
		}
	}

	return arrow.BinaryTypes.String, datatype.ColumnMetadata(ref.Type, datatype.String)
}

// appendToBuilder appends the provided field from record into the given
// builder. The metadata of field is used to determine the category of column.
// appendToBuilder panics if the type of field does not match the datatype of
// builder.
func (s *dataobjScan) appendToBuilder(builder array.Builder, field *arrow.Field, record *logs.Record) {
	columnType, ok := field.Metadata.GetValue(types.MetadataKeyColumnType)
	if !ok {
		// This shouldn't happen; we control the metadata here on the fields.
		panic(fmt.Sprintf("missing column type in field %s", field.Name))
	}

	switch columnType {
	case types.ColumnTypeLabel.String():
		stream, ok := s.streams[record.StreamID]
		if !ok {
			panic(fmt.Sprintf("stream ID %d not found in stream cache", record.StreamID))
		}

		val := stream.Get(field.Name)
		if val == "" {
			builder.(*array.StringBuilder).AppendNull()
		} else {
			builder.(*array.StringBuilder).Append(val)
		}

	case types.ColumnTypeMetadata.String():
		val := record.Metadata.Get(field.Name)
		if val == "" {
			builder.(*array.StringBuilder).AppendNull()
		} else {
			builder.(*array.StringBuilder).Append(val)
		}

	case types.ColumnTypeBuiltin.String():
		if field.Name == types.ColumnNameBuiltinTimestamp {
			ts, _ := arrow.TimestampFromTime(record.Timestamp, arrow.Nanosecond)
			builder.(*array.TimestampBuilder).Append(ts)
		} else if field.Name == types.ColumnNameBuiltinMessage {
			// Use the inner BinaryBuilder to avoid converting record.Line to a
			// string and back.
			builder.(*array.StringBuilder).BinaryBuilder.Append(record.Line)
		} else {
			panic(fmt.Sprintf("unsupported builtin column %s", field.Name))
		}

	default:
		// This shouldn't happen; we control the metadata here on the fields.
		panic(fmt.Sprintf("unsupported column type %s", columnType))
	}
}

// Value returns the current [arrow.Record] retrieved by the previous call to
// [dataobjScan.Read], or an error if the record cannot be read.
func (s *dataobjScan) Value() (arrow.Record, error) { return s.state.batch, s.state.err }

// Close closes s and releases all resources.
func (s *dataobjScan) Close() {
	for _, reader := range s.readers {
		_ = reader.Close()
	}
}

// Inputs implements Pipeline and returns nil, since DataObjScan accepts no
// pipelines as input.
func (s *dataobjScan) Inputs() []Pipeline { return nil }

// Transport implements Pipeline and returns [Local].
func (s *dataobjScan) Transport() Transport { return Local }