diff --git a/pkg/dataobj/querier/iter.go b/pkg/dataobj/querier/iter.go index 64bed55b97..7949aefaed 100644 --- a/pkg/dataobj/querier/iter.go +++ b/pkg/dataobj/querier/iter.go @@ -1,11 +1,10 @@ package querier import ( - "container/heap" "context" "fmt" "io" - "sort" + "slices" "sync" "github.com/grafana/loki/v3/pkg/dataobj" @@ -15,6 +14,7 @@ import ( "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" + "github.com/grafana/loki/v3/pkg/util/topk" ) var ( @@ -70,8 +70,12 @@ func newEntryIterator(ctx context.Context, prevStreamID int64 = -1 streamExtractor log.StreamPipeline streamHash uint64 - top = newTopK(int(req.Limit), req.Direction) - statistics = stats.FromContext(ctx) + top = topk.Heap[entryWithLabels]{ + Limit: int(req.Limit), + Less: lessFn(req.Direction), + } + + statistics = stats.FromContext(ctx) ) for { @@ -102,7 +106,7 @@ func newEntryIterator(ctx context.Context, } statistics.AddPostFilterRows(1) - top.Add(entryWithLabels{ + top.Push(entryWithLabels{ Labels: parsedLabels.String(), StreamHash: streamHash, Entry: logproto.Entry{ @@ -114,49 +118,8 @@ func newEntryIterator(ctx context.Context, }) } } - return top.Iterator(), nil -} - -// entryHeap implements a min-heap of entries based on a custom less function. -// The less function determines the ordering based on the direction (FORWARD/BACKWARD). -// For FORWARD direction: -// - When comparing timestamps: entry.Timestamp.After(b) means 'a' is "less" than 'b' -// - Example: [t3, t2, t1] where t3 is most recent, t3 will be at the root (index 0) -// -// For BACKWARD direction: -// - When comparing timestamps: entry.Timestamp.Before(b) means 'a' is "less" than 'b' -// - Example: [t1, t2, t3] where t1 is oldest, t1 will be at the root (index 0) -// -// In both cases: -// - When timestamps are equal, we use labels as a tiebreaker -// - The root of the heap (index 0) contains the entry we want to evict first -type entryHeap struct { - less func(a, b entryWithLabels) bool - entries []entryWithLabels -} - -func (h *entryHeap) Push(x any) { - h.entries = append(h.entries, x.(entryWithLabels)) -} - -func (h *entryHeap) Pop() any { - old := h.entries - n := len(old) - x := old[n-1] - h.entries = old[:n-1] - return x -} - -func (h *entryHeap) Len() int { - return len(h.entries) -} - -func (h *entryHeap) Less(i, j int) bool { - return h.less(h.entries[i], h.entries[j]) -} -func (h *entryHeap) Swap(i, j int) { - h.entries[i], h.entries[j] = h.entries[j], h.entries[i] + return heapIterator(&top), nil } func lessFn(direction logproto.Direction) func(a, b entryWithLabels) bool { @@ -180,55 +143,22 @@ func lessFn(direction logproto.Direction) func(a, b entryWithLabels) bool { } } -// topk maintains a min-heap of the k most relevant entries. 
-// The heap is ordered by timestamp (and labels as tiebreaker) based on the direction: -// - For FORWARD: keeps k oldest entries by evicting newest entries first -// Example with k=3: If entries arrive as [t1,t2,t3,t4,t5], heap will contain [t1,t2,t3] -// - For BACKWARD: keeps k newest entries by evicting oldest entries first -// Example with k=3: If entries arrive as [t1,t2,t3,t4,t5], heap will contain [t3,t4,t5] -type topk struct { - k int - minHeap entryHeap -} - -func newTopK(k int, direction logproto.Direction) *topk { - if k <= 0 { - panic("k must be greater than 0") - } - entries := entryWithLabelsPool.Get().(*[]entryWithLabels) - - return &topk{ - k: k, - minHeap: entryHeap{ - less: lessFn(direction), - entries: (*entries)[:0], - }, - } -} - -// Add adds a new entry to the topk heap. -// If the heap has less than k entries, the entry is added directly. -// Otherwise, if the new entry should be evicted before the root (index 0), -// it is discarded. If not, the root is popped (discarded) and the new entry is pushed. -// -// For FORWARD direction: -// - Root contains newest entry (to be evicted first) -// - New entries that are newer than root are discarded -// Example: With k=3 and heap=[t1,t2,t3], a new entry t4 is discarded -// -// For BACKWARD direction: -// - Root contains oldest entry (to be evicted first) -// - New entries that are older than root are discarded -// Example: With k=3 and heap=[t3,t4,t5], a new entry t2 is discarded -func (t *topk) Add(r entryWithLabels) { - if t.minHeap.Len() < t.k { - heap.Push(&t.minHeap, r) - return - } - if t.minHeap.less(t.minHeap.entries[0], r) { - _ = heap.Pop(&t.minHeap) - heap.Push(&t.minHeap, r) - } +// heapIterator creates a new EntryIterator for the given topk heap. After +// calling heapIterator, h is emptied. +func heapIterator(h *topk.Heap[entryWithLabels]) iter.EntryIterator { + elems := h.PopAll() + + // We need to reverse the order of the entries in the slice to maintain the order of logs we + // want to return: + // + // For FORWARD direction, we want smallest timestamps first (but the heap is + // ordered by largest timestamps first due to lessFn). + // + // For BACKWARD direction, we want largest timestamps first (but the heap is + // ordered by smallest timestamps first due to lessFn). + slices.Reverse(elems) + + return &sliceIterator{entries: elems} } type sliceIterator struct { @@ -236,18 +166,6 @@ type sliceIterator struct { curr entryWithLabels } -func (t *topk) Iterator() iter.EntryIterator { - // We swap i and j in the less comparison to reverse the ordering from the minHeap. - // The minHeap is ordered such that the entry to evict is at index 0. 
- // For FORWARD: newest entries are evicted first, so we want oldest entries first in the final slice - // For BACKWARD: oldest entries are evicted first, so we want newest entries first in the final slice - // By swapping i and j, we effectively reverse the minHeap ordering to get the desired final ordering - sort.Slice(t.minHeap.entries, func(i, j int) bool { - return t.minHeap.less(t.minHeap.entries[j], t.minHeap.entries[i]) - }) - return &sliceIterator{entries: t.minHeap.entries} -} - func (s *sliceIterator) Next() bool { if len(s.entries) == 0 { return false diff --git a/pkg/dataobj/querier/iter_test.go b/pkg/dataobj/querier/iter_test.go index cff55b727b..addeffd21c 100644 --- a/pkg/dataobj/querier/iter_test.go +++ b/pkg/dataobj/querier/iter_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/util/topk" ) // makeEntry is a helper function to create a log entry with given timestamp and line @@ -94,19 +95,20 @@ func TestTopKIterator(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // Create topk iterator - top := &topk{ - k: tt.k, - minHeap: entryHeap{less: lessFn(tt.direction)}, + top := topk.Heap[entryWithLabels]{ + Limit: tt.k, + Less: lessFn(tt.direction), } // Add entries for _, e := range tt.input { - top.Add(e) + top.Push(e) } // Collect results var got []entryWithLabels - iter := top.Iterator() + + iter := heapIterator(&top) for iter.Next() { got = append(got, entryWithLabels{ Entry: iter.At(), diff --git a/pkg/engine/executor/dataobjscan.go b/pkg/engine/executor/dataobjscan.go new file mode 100644 index 0000000000..738b3a873a --- /dev/null +++ b/pkg/engine/executor/dataobjscan.go @@ -0,0 +1,554 @@ +package executor + +import ( + "cmp" + "context" + "errors" + "fmt" + "io" + "slices" + "sync" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/prometheus/prometheus/model/labels" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/engine/internal/types" + "github.com/grafana/loki/v3/pkg/engine/planner/physical" + "github.com/grafana/loki/v3/pkg/util/topk" +) + +type dataobjScan struct { + ctx context.Context + opts dataobjScanOptions + + initialized bool + readers []*dataobj.LogsReader + streams map[int64]labels.Labels + + state state +} + +type dataobjScanOptions struct { + // TODO(rfratto): Limiting each DataObjScan to a single section is going to + // be critical for limiting memory overhead here; the section is intended to + // be the smallest unit of parallelization. + + Object *dataobj.Object // Object to read from. + StreamIDs []int64 // Stream IDs to match from logs sections. + Predicates []dataobj.LogsPredicate // Predicate to apply to the logs. + Projections []physical.ColumnExpression // Columns to include. An empty slice means all columns. + + Direction physical.Direction // Direction of timestamps to return. + Limit uint32 // A limit on the number of rows to return (0=unlimited). +} + +var _ Pipeline = (*dataobjScan)(nil) + +// newDataobjScanPipeline creates a new Pipeline which emits a single +// [arrow.Record] composed of all log sections in a data object. Rows in the +// returned record are ordered by timestamp in the direction specified by +// opts.Direction. 
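As a rough usage sketch of the constructor that follows (a hypothetical helper named scanOnce; it assumes a populated *dataobj.Object and known stream IDs are already at hand, and is not drawn verbatim from this patch), the returned pipeline is driven like any other executor Pipeline — Read, then Value — with a wrapped EOF from Read signalling that the object held no matching rows:

	// scanOnce drives a dataobjScan pipeline to completion and returns the
	// single arrow.Record it emits. obj and ids are assumed to be available.
	func scanOnce(ctx context.Context, obj *dataobj.Object, ids []int64) (arrow.Record, error) {
		p := newDataobjScanPipeline(ctx, dataobjScanOptions{
			Object:    obj,
			StreamIDs: ids,
			Direction: physical.Forward,
			Limit:     0, // 0 = unlimited
		})
		defer p.Close()

		if err := p.Read(); err != nil {
			return nil, err // errors.Is(err, EOF) reports that no matching rows were found
		}
		return p.Value()
	}
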
+func newDataobjScanPipeline(ctx context.Context, opts dataobjScanOptions) *dataobjScan { + return &dataobjScan{ctx: ctx, opts: opts} +} + +// Read retrieves the next [arrow.Record] from the dataobj. +func (s *dataobjScan) Read() error { + if err := s.init(); err != nil { + return err + } + + rec, err := s.read() + s.state = newState(rec, err) + + if err != nil { + return fmt.Errorf("reading data object: %w", err) + } + return nil +} + +func (s *dataobjScan) init() error { + if s.initialized { + return nil + } + + md, err := s.opts.Object.Metadata(s.ctx) + if err != nil { + return fmt.Errorf("reading metadata: %w", err) + } + + if err := s.initStreams(md); err != nil { + return fmt.Errorf("initializing streams: %w", err) + } + + s.readers = make([]*dataobj.LogsReader, 0, md.LogsSections) + + for section := range md.LogsSections { + // TODO(rfratto): There's a few problems with using LogsReader as it is: + // + // 1. LogsReader doesn't support providing a subset of columns to read + // from, so we're applying projections after reading. + // + // 2. LogsReader is intended to be pooled to reduce memory, but we're + // creating a new one every time here. + // + // For the sake of the initial implementation I'm ignoring these issues, + // but we'll absolutely need to solve this prior to production use. + lr := dataobj.NewLogsReader(s.opts.Object, section) + + // The calls below can't fail because we're always using a brand new logs + // reader. + _ = lr.MatchStreams(slices.Values(s.opts.StreamIDs)) + _ = lr.SetPredicates(s.opts.Predicates) + + s.readers = append(s.readers, lr) + } + + s.initialized = true + return nil +} + +// initStreams retrieves all requested stream records from streams sections so +// that emitted [arrow.Record]s can include stream labels in results. +func (s *dataobjScan) initStreams(md dataobj.Metadata) error { + var sr dataobj.StreamsReader + + streams := make([]dataobj.Stream, 512) + + // Initialize entries in the map so we can do a presence test in the loop + // below. + s.streams = make(map[int64]labels.Labels, len(s.opts.StreamIDs)) + for _, id := range s.opts.StreamIDs { + s.streams[id] = nil + } + + for section := range md.StreamsSections { + // TODO(rfratto): dataobj.StreamsPredicate is missing support for filtering + // on stream IDs when we already know them in advance; this can cause the + // Read here to take longer than it needs to since we're reading the + // entirety of every row. + sr.Reset(s.opts.Object, section) + + for { + n, err := sr.Read(s.ctx, streams) + if n == 0 && errors.Is(err, io.EOF) { + return nil + } else if err != nil && !errors.Is(err, io.EOF) { + return err + } + + for i, stream := range streams[:n] { + if _, found := s.streams[stream.ID]; !found { + continue + } + + s.streams[stream.ID] = stream.Labels + + // Zero out the stream entry from the slice so the next call to sr.Read + // doesn't overwrite any memory we just moved to s.streams. + streams[i] = dataobj.Stream{} + } + } + } + + // Check that all streams were populated. + var errs []error + for id, labels := range s.streams { + if labels == nil { + errs = append(errs, fmt.Errorf("requested stream ID %d not found in any stream section", id)) + } + } + return errors.Join(errs...) +} + +// read reads the entire data object into memory and generates an arrow.Record +// from the data. It returns an error upon encountering an error while reading +// one of the sections. 
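The read method that follows keeps at most Limit rows in memory by pushing every record into a bounded topk.Heap whose Less function is inverted relative to the requested direction. That inversion is easy to miss, so here is a minimal self-contained sketch with plain integers standing in for timestamps (Forward direction, limit 3; it relies only on the topk package added in this change): by declaring later values "less", the heap evicts the latest first and retains the earliest ones, and reversing PopAll's output yields them in ascending order.

	package main

	import (
		"fmt"
		"slices"

		"github.com/grafana/loki/v3/pkg/util/topk"
	)

	func main() {
		h := topk.Heap[int]{
			Limit: 3,
			// Forward direction: treat later (larger) values as "less" so they
			// are evicted first and the earliest values survive.
			Less: func(a, b int) bool { return a > b },
		}
		for _, ts := range []int{5, 1, 4, 2, 3} {
			h.Push(ts)
		}
		kept := h.PopAll()   // [3 2 1]: ordered by Less, i.e. latest first
		slices.Reverse(kept) // earliest first, as Forward requires
		fmt.Println(kept)    // [1 2 3]
	}
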
+func (s *dataobjScan) read() (arrow.Record, error) { + // Since [physical.DataObjScan] requires that: + // + // * Records are ordered by timestamp, and + // * Records from the same dataobjScan do not overlap in time + // + // we *must* read the entire data object before creating a record, as the + // sections in the dataobj itself are not already sorted by timestamp (though + // we only need to keep up to Limit rows in memory). + + var ( + heapMut sync.Mutex + heap = topk.Heap[dataobj.Record]{ + Limit: int(s.opts.Limit), + Less: s.getLessFunc(s.opts.Direction), + } + ) + + g, ctx := errgroup.WithContext(s.ctx) + + var gotData atomic.Bool + + for _, reader := range s.readers { + g.Go(func() error { + buf := make([]dataobj.Record, 512) + + for { + n, err := reader.Read(ctx, buf) + if n == 0 && errors.Is(err, io.EOF) { + return nil + } else if err != nil && !errors.Is(err, io.EOF) { + return err + } + + gotData.Store(true) + + heapMut.Lock() + for _, rec := range buf[:n] { + heap.Push(rec) + } + heapMut.Unlock() + } + }) + } + + if err := g.Wait(); err != nil { + return nil, err + } else if !gotData.Load() { + return nil, EOF + } + + projections, err := s.effectiveProjections(&heap) + if err != nil { + return nil, fmt.Errorf("getting effective projections: %w", err) + } + + schema, err := schemaFromColumns(projections) + if err != nil { + return nil, fmt.Errorf("creating schema: %w", err) + } + + // TODO(rfratto): pass allocator to builder + rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema) + defer rb.Release() + + records := heap.PopAll() + slices.Reverse(records) + + for _, record := range records { + for i := 0; i < schema.NumFields(); i++ { + field, builder := rb.Schema().Field(i), rb.Field(i) + s.appendToBuilder(builder, &field, &record) + } + } + + return rb.NewRecord(), nil +} + +func (s *dataobjScan) getLessFunc(dir physical.Direction) func(a, b dataobj.Record) bool { + // compareStreams is used when two records have the same timestamp. + compareStreams := func(a, b dataobj.Record) bool { + aStream, ok := s.streams[a.StreamID] + if !ok { + return false + } + + bStream, ok := s.streams[b.StreamID] + if !ok { + return true + } + + return labels.Compare(aStream, bStream) < 0 + } + + switch dir { + case physical.Forward: + return func(a, b dataobj.Record) bool { + if a.Timestamp.Equal(b.Timestamp) { + compareStreams(a, b) + } + return a.Timestamp.After(b.Timestamp) + } + case physical.Backwards: + return func(a, b dataobj.Record) bool { + if a.Timestamp.Equal(b.Timestamp) { + compareStreams(a, b) + } + return a.Timestamp.Before(b.Timestamp) + } + default: + panic("invalid direction") + } +} + +// effectiveProjections returns the effective projections to return for a +// record. If s.opts.Projections is non-empty, then its column expressions are +// used for the projections. +// +// Otherwise, the set of all columns found in the heap are used, in order of: +// +// * All stream labels (sorted by name) +// * All metadata columns (sorted by name) +// * Log timestamp +// * Log message +// +// effectiveProjections does not mutate h. 
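For a concrete feel of the ordering described above, here is a small stand-alone sketch of the same sort rule applied to plain (name, type) pairs; the column names mirror the fixture used in dataobjscan_test.go further down, and the col type is a hypothetical stand-in for physical.ColumnExpr rather than the engine's own type:

	package main

	import (
		"cmp"
		"fmt"
		"slices"
	)

	// col is a hypothetical stand-in for physical.ColumnExpr.
	type col struct{ name, typ string }

	func main() {
		cols := []col{
			{"service", "label"}, {"guid", "metadata"},
			{"env", "label"}, {"pod", "metadata"},
		}
		// Labels before metadata, ties broken by name — the same rule
		// effectiveProjections applies before appending the builtin columns.
		slices.SortFunc(cols, func(a, b col) int {
			if a.typ != b.typ {
				if a.typ == "label" {
					return -1
				}
				return 1
			}
			return cmp.Compare(a.name, b.name)
		})
		cols = append(cols, col{"timestamp", "builtin"}, col{"message", "builtin"})
		fmt.Println(cols)
		// [{env label} {service label} {guid metadata} {pod metadata} {timestamp builtin} {message builtin}]
	}
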
+func (s *dataobjScan) effectiveProjections(h *topk.Heap[dataobj.Record]) ([]physical.ColumnExpression, error) { + if len(s.opts.Projections) > 0 { + return s.opts.Projections, nil + } + + var ( + columns []physical.ColumnExpression + foundStreams = map[int64]struct{}{} + found = map[physical.ColumnExpr]struct{}{} + ) + + addColumn := func(name string, ty types.ColumnType) { + expr := physical.ColumnExpr{ + Ref: types.ColumnRef{Column: name, Type: ty}, + } + + if _, ok := found[expr]; !ok { + found[expr] = struct{}{} + columns = append(columns, &expr) + } + } + + for rec := range h.Range() { + stream, ok := s.streams[rec.StreamID] + if !ok { + // If we hit this, there's a problem with either initStreams (we missed a + // requested stream) or the predicate application, where it returned a + // stream we didn't want. + return nil, fmt.Errorf("stream ID %d not found in stream cache", rec.StreamID) + } + + if _, addedStream := foundStreams[rec.StreamID]; !addedStream { + for _, label := range stream { + addColumn(label.Name, types.ColumnTypeLabel) + } + foundStreams[rec.StreamID] = struct{}{} + } + + for _, md := range rec.Metadata { + addColumn(md.Name, types.ColumnTypeMetadata) + } + } + + // Sort existing columns by type (preferring labels) then name. + slices.SortFunc(columns, func(a, b physical.ColumnExpression) int { + aRef, bRef := a.(*physical.ColumnExpr).Ref, b.(*physical.ColumnExpr).Ref + + if aRef.Type != bRef.Type { + if aRef.Type == types.ColumnTypeLabel { + return -1 // Labels first. + } + return 1 + } + + return cmp.Compare(aRef.Column, bRef.Column) + }) + + // Add fixed columns at the end. + addColumn("timestamp", types.ColumnTypeBuiltin) + addColumn("message", types.ColumnTypeBuiltin) + + return columns, nil +} + +func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, error) { + var ( + fields = make([]arrow.Field, 0, len(columns)) + fingerprints = make(map[string]struct{}, len(columns)) + ) + + addField := func(field arrow.Field) { + fp := field.Fingerprint() + if field.HasMetadata() { + // We differentiate column type using metadata, but the metadata isn't + // included in the fingerprint, so we need to manually include it here. + fp += field.Metadata.String() + } + + if _, exist := fingerprints[fp]; exist { + return + } + + fields = append(fields, field) + fingerprints[fp] = struct{}{} + } + + for _, column := range columns { + columnExpr, ok := column.(*physical.ColumnExpr) + if !ok { + return nil, fmt.Errorf("invalid column expression type %T", column) + } + + md := arrow.MetadataFrom(map[string]string{ + types.ColumnTypeMetadataKey: columnExpr.Ref.Type.String(), + }) + + switch columnExpr.Ref.Type { + case types.ColumnTypeLabel: + // TODO(rfratto): Switch to dictionary encoding for labels. + // + // Since labels are more likely to repeat than metadata, we could cut + // down on the memory overhead of a record by dictionary encoding the + // labels. + // + // However, the csv package we use for testing DataObjScan currently + // (2025-05-02) doesn't support dictionary encoding, and we would need + // to find a solution there. + // + // We skipped dictionary encoding for now to get the initial prototype + // working. + addField(arrow.Field{ + Name: columnExpr.Ref.Column, + Type: arrow.BinaryTypes.String, + Nullable: true, + Metadata: md, + }) + + case types.ColumnTypeMetadata: + // Metadata is *not* encoded using dictionary encoding since metadata is + // has unconstrained cardinality. 
Using dictionary encoding would require + // tracking every encoded value in the record, which is likely to be too + // expensive. + addField(arrow.Field{ + Name: columnExpr.Ref.Column, + Type: arrow.BinaryTypes.String, + Nullable: true, + Metadata: md, + }) + + case types.ColumnTypeBuiltin: + addField(arrow.Field{ + Name: columnExpr.Ref.Column, + Type: builtinColumnType(columnExpr.Ref), + Nullable: true, + Metadata: md, + }) + + case types.ColumnTypeAmbiguous: + // The best handling for ambiguous columns (in terms of the schema) is to + // explode it out into multiple columns, one for each type. (Except for + // parsed, which can't be emitted from DataObjScan right now). + // + // TODO(rfratto): should ambiguity be passed down like this? It's odd for + // the returned schema to be different than the set of columns you asked + // for. + // + // As an alternative, ambiguity could be handled by the planner, where it + // performs the explosion and propagates the ambiguity down into the + // predicates. + // + // If we're ok with the schema changing from what was requested, then we + // could update this to resolve the ambiguity at [dataobjScan.effectiveProjections] + // so we don't always explode out to the full set of columns. + addField(arrow.Field{ + Name: columnExpr.Ref.Column, + Type: arrow.BinaryTypes.String, + Nullable: true, + Metadata: arrow.MetadataFrom(map[string]string{types.ColumnTypeMetadataKey: types.ColumnTypeLabel.String()}), + }) + addField(arrow.Field{ + Name: columnExpr.Ref.Column, + Type: arrow.BinaryTypes.String, + Nullable: true, + Metadata: arrow.MetadataFrom(map[string]string{types.ColumnTypeMetadataKey: types.ColumnTypeMetadata.String()}), + }) + + case types.ColumnTypeParsed: + return nil, fmt.Errorf("parsed column type not supported: %s", columnExpr.Ref.Type) + } + } + + return arrow.NewSchema(fields, nil), nil +} + +func builtinColumnType(ref types.ColumnRef) arrow.DataType { + if ref.Type != types.ColumnTypeBuiltin { + panic("builtinColumnType called with a non-builtin column") + } + + switch ref.Column { + case "timestamp": + return arrow.FixedWidthTypes.Timestamp_ns + case "message": + return arrow.BinaryTypes.String + default: + panic(fmt.Sprintf("unsupported builtin column type %s", ref)) + } +} + +// appendToBuilder appends a the provided field from record into the given +// builder. The metadata of field is used to determine the category of column. +// appendToBuilder panics if the type of field does not match the datatype of +// builder. +func (s *dataobjScan) appendToBuilder(builder array.Builder, field *arrow.Field, record *dataobj.Record) { + columnType, ok := field.Metadata.GetValue(types.ColumnTypeMetadataKey) + if !ok { + // This shouldn't happen; we control the metadata here on the fields. 
+ panic(fmt.Sprintf("missing column type in field %s", field.Name)) + } + + switch columnType { + case types.ColumnTypeLabel.String(): + stream, ok := s.streams[record.StreamID] + if !ok { + panic(fmt.Sprintf("stream ID %d not found in stream cache", record.StreamID)) + } + + val := stream.Get(field.Name) + if val == "" { + builder.(*array.StringBuilder).AppendNull() + } else { + builder.(*array.StringBuilder).Append(val) + } + + case types.ColumnTypeMetadata.String(): + val := record.Metadata.Get(field.Name) + if val == "" { + builder.(*array.StringBuilder).AppendNull() + } else { + builder.(*array.StringBuilder).Append(val) + } + + case types.ColumnTypeBuiltin.String(): + if field.Name == "timestamp" { + ts, _ := arrow.TimestampFromTime(record.Timestamp, arrow.Nanosecond) + builder.(*array.TimestampBuilder).Append(ts) + } else if field.Name == "message" { + // Use the inner BinaryBuilder to avoid converting record.Line to a + // string and back. + builder.(*array.StringBuilder).BinaryBuilder.Append(record.Line) + } else { + panic(fmt.Sprintf("unsupported builtin column %s", field.Name)) + } + + default: + // This shouldn't happen; we control the metadata here on the fields. + panic(fmt.Sprintf("unsupported column type %s", columnType)) + } +} + +// Value returns the current [arrow.Record] retrieved by the previous call to +// [dataobjScan.Read], or an error if the record cannot be read. +func (s *dataobjScan) Value() (arrow.Record, error) { return s.state.batch, s.state.err } + +// Close closes s and releases all resources. +func (s *dataobjScan) Close() { + for _, reader := range s.readers { + _ = reader.Close() + } +} + +// Inputs implements Pipeline and returns nil, since DataObjScan accepts no +// pipelines as input. +func (s *dataobjScan) Inputs() []Pipeline { return nil } + +// Transport implements Pipeline and returns [Local]. +func (s *dataobjScan) Transport() Transport { return Local } diff --git a/pkg/engine/executor/dataobjscan_predicate.go b/pkg/engine/executor/dataobjscan_predicate.go new file mode 100644 index 0000000000..990c6786d6 --- /dev/null +++ b/pkg/engine/executor/dataobjscan_predicate.go @@ -0,0 +1,47 @@ +package executor + +import ( + "fmt" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/engine/planner/physical" +) + +// buildLogsPredicate builds a logs predicate from an expression. +func buildLogsPredicate(_ physical.Expression) (dataobj.LogsPredicate, error) { + // TODO(rfratto): implement converting expressions into logs predicates. + // + // There's a few challenges here: + // + // - Expressions do not cleanly map to [dataobj.LogsPredicate]s. For example, + // an expression may be simply a column reference, but a logs predicate is + // always some expression that can evaluate to true. + // + // - Mapping expressions into [dataobj.TimeRangePredicate] is a massive pain; + // since TimeRangePredicate specifies both bounds for the time range, we + // would need to find and collapse multiple physical.Expressions into a + // single TimeRangePredicate. + // + // - While [dataobj.MetadataMatcherPredicate] and + // [dataobj.LogMessageFilterPredicate] are catch-alls for function-based + // predicates, they are row-based and not column-based, so our + // expressionEvaluator cannot be used here. + // + // Long term, we likely want two things: + // + // 1. Use dataset.Reader and dataset.Predicate directly instead of + // dataobj.LogsReader. + // + // 2. Update dataset.Reader to be vector based instead of row-based. 
+ // + // It's not clear if we should resolve the issues with LogsPredicate (or find + // hacks to make them work in the short term), or skip straight to using + // dataset.Reader instead. + // + // Implementing DataObjScan in the dataobj package would be a clean way to + // handle all of this, but that would cause cyclic dependencies. I also don't + // think we should start removing things from internal for this; we can probably + // find a way to remove the explicit dependency from the dataobj package from + // the physical planner instead. + return nil, fmt.Errorf("logs predicate conversion is not supported") +} diff --git a/pkg/engine/executor/dataobjscan_test.go b/pkg/engine/executor/dataobjscan_test.go new file mode 100644 index 0000000000..845e074a8d --- /dev/null +++ b/pkg/engine/executor/dataobjscan_test.go @@ -0,0 +1,301 @@ +package executor + +import ( + "bytes" + "math" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/dataobj" + "github.com/grafana/loki/v3/pkg/engine/internal/types" + "github.com/grafana/loki/v3/pkg/engine/planner/physical" + "github.com/grafana/loki/v3/pkg/logproto" + + "github.com/grafana/loki/pkg/push" +) + +var ( + labelMD = buildMetadata(types.ColumnTypeLabel) + metadataMD = buildMetadata(types.ColumnTypeMetadata) + builtinMD = buildMetadata(types.ColumnTypeBuiltin) +) + +func buildMetadata(ty types.ColumnType) arrow.Metadata { + return arrow.MetadataFrom(map[string]string{ + types.ColumnTypeMetadataKey: ty.String(), + }) +} + +func Test_dataobjScan(t *testing.T) { + obj := buildDataobj(t, []logproto.Stream{ + { + Labels: `{service="loki", env="prod"}`, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(5, 0), + Line: "hello world", + StructuredMetadata: []push.LabelAdapter{{Name: "guid", Value: "aaaa-bbbb-cccc-dddd"}}, + }, + { + Timestamp: time.Unix(10, 0), + Line: "goodbye world", + StructuredMetadata: []push.LabelAdapter{{Name: "guid", Value: "eeee-ffff-aaaa-bbbb"}}, + }, + }, + }, + { + Labels: `{service="notloki", env="prod"}`, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(2, 0), + Line: "hello world", + StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "notloki-pod-1"}}, + }, + { + Timestamp: time.Unix(3, 0), + Line: "goodbye world", + StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "notloki-pod-1"}}, + }, + }, + }, + }) + + t.Run("All columns", func(t *testing.T) { + pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{ + Object: obj, + StreamIDs: []int64{1, 2}, // All streams + Projections: nil, // All columns + Direction: physical.Forward, + Limit: 0, // No limit + }) + + expectFields := []arrow.Field{ + {Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + {Name: "service", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + {Name: "guid", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true}, + {Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true}, + {Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: builtinMD, Nullable: true}, + {Name: "message", Type: arrow.BinaryTypes.String, Metadata: builtinMD, Nullable: true}, + } + + expectCSV := `prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world +prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:03,goodbye world +prod,loki,aaaa-bbbb-cccc-dddd,NULL,1970-01-01 00:00:05,hello world +prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 
00:00:10,goodbye world` + + expectRecord, err := CSVToArrow(expectFields, expectCSV) + require.NoError(t, err) + defer expectRecord.Release() + + AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord)) + }) + + t.Run("Column subset", func(t *testing.T) { + pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{ + Object: obj, + StreamIDs: []int64{1, 2}, // All streams + Projections: []physical.ColumnExpression{ + &physical.ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}}, + &physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeLabel}}, + }, + Direction: physical.Forward, + Limit: 0, // No limit + }) + + expectFields := []arrow.Field{ + {Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: builtinMD, Nullable: true}, + {Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + } + + expectCSV := `1970-01-01 00:00:02,prod +1970-01-01 00:00:03,prod +1970-01-01 00:00:05,prod +1970-01-01 00:00:10,prod` + + expectRecord, err := CSVToArrow(expectFields, expectCSV) + require.NoError(t, err) + defer expectRecord.Release() + + AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord)) + }) + + t.Run("Unknown column", func(t *testing.T) { + // Here, we'll check for a column which only exists once in the dataobj but is + // ambiguous from the perspective of the caller. + pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{ + Object: obj, + StreamIDs: []int64{1, 2}, // All streams + Projections: []physical.ColumnExpression{ + &physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeAmbiguous}}, + }, + Direction: physical.Forward, + Limit: 0, // No limit + }) + + expectFields := []arrow.Field{ + {Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + {Name: "env", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true}, + } + + expectCSV := `prod,NULL +prod,NULL +prod,NULL +prod,NULL` + + expectRecord, err := CSVToArrow(expectFields, expectCSV) + require.NoError(t, err) + defer expectRecord.Release() + + AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord)) + }) +} + +func Test_dataobjScan_DuplicateColumns(t *testing.T) { + obj := buildDataobj(t, []logproto.Stream{ + // Case 1: A single row has a value for a label and metadata column with + // the same name. + { + Labels: `{service="loki", env="prod", pod="pod-1"}`, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: "message 1", + StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "override"}}, + }, + }, + }, + + // Case 2: A label and metadata column share a name but have values in + // different rows. 
+ { + Labels: `{service="loki", env="prod"}`, + Entries: []logproto.Entry{{ + Timestamp: time.Unix(2, 0), + Line: "message 2", + StructuredMetadata: []push.LabelAdapter{{Name: "namespace", Value: "namespace-1"}}, + }}, + }, + { + Labels: `{service="loki", env="prod", namespace="namespace-2"}`, + Entries: []logproto.Entry{{ + Timestamp: time.Unix(3, 0), + Line: "message 3", + StructuredMetadata: nil, + }}, + }, + }) + + t.Run("All columns", func(t *testing.T) { + pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{ + Object: obj, + StreamIDs: []int64{1, 2, 3}, // All streams + Projections: nil, // All columns + Direction: physical.Forward, + Limit: 0, // No limit + }) + + expectFields := []arrow.Field{ + {Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + {Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + {Name: "pod", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + {Name: "service", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + + {Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true}, + {Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true}, + + {Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: builtinMD, Nullable: true}, + {Name: "message", Type: arrow.BinaryTypes.String, Metadata: builtinMD, Nullable: true}, + } + + expectCSV := `prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1 +prod,NULL,NULL,loki,namespace-1,NULL,1970-01-01 00:00:02,message 2 +prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3` + + expectRecord, err := CSVToArrow(expectFields, expectCSV) + require.NoError(t, err) + defer expectRecord.Release() + + AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord)) + }) + + t.Run("Ambiguous pod", func(t *testing.T) { + pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{ + Object: obj, + StreamIDs: []int64{1, 2, 3}, // All streams + Projections: []physical.ColumnExpression{ + &physical.ColumnExpr{Ref: types.ColumnRef{Column: "pod", Type: types.ColumnTypeAmbiguous}}, + }, + Direction: physical.Forward, + Limit: 0, // No limit + }) + + expectFields := []arrow.Field{ + {Name: "pod", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + {Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true}, + } + + expectCSV := `pod-1,override +NULL,NULL +NULL,NULL` + + expectRecord, err := CSVToArrow(expectFields, expectCSV) + require.NoError(t, err) + defer expectRecord.Release() + + AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord)) + }) + + t.Run("Ambiguous namespace", func(t *testing.T) { + pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{ + Object: obj, + StreamIDs: []int64{1, 2, 3}, // All streams + Projections: []physical.ColumnExpression{ + &physical.ColumnExpr{Ref: types.ColumnRef{Column: "namespace", Type: types.ColumnTypeAmbiguous}}, + }, + Direction: physical.Forward, + Limit: 0, // No limit + }) + + expectFields := []arrow.Field{ + {Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, + {Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true}, + } + + expectCSV := `NULL,NULL +NULL,namespace-1 +namespace-2,NULL` + + expectRecord, err := CSVToArrow(expectFields, expectCSV) + require.NoError(t, err) + defer expectRecord.Release() + + AssertPipelinesEqual(t, pipeline, 
NewBufferedPipeline(expectRecord)) + }) +} + +func buildDataobj(t testing.TB, streams []logproto.Stream) *dataobj.Object { + t.Helper() + + builder, err := dataobj.NewBuilder(dataobj.BuilderConfig{ + TargetPageSize: 8_000, + TargetObjectSize: math.MaxInt, + TargetSectionSize: 32_000, + BufferSize: 8_000, + SectionStripeMergeLimit: 2, + }) + require.NoError(t, err) + + for _, stream := range streams { + require.NoError(t, builder.Append(stream)) + } + + var buf bytes.Buffer + _, err = builder.Flush(&buf) + require.NoError(t, err) + + r := bytes.NewReader(buf.Bytes()) + return dataobj.FromReaderAt(r, r.Size()) +} diff --git a/pkg/engine/executor/executor.go b/pkg/engine/executor/executor.go index 1f55350ad8..fb048a287f 100644 --- a/pkg/engine/executor/executor.go +++ b/pkg/engine/executor/executor.go @@ -5,17 +5,22 @@ import ( "errors" "fmt" + "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/engine/planner/physical" ) type Config struct { BatchSize int64 `yaml:"batch_size"` + Bucket objstore.Bucket } func Run(ctx context.Context, cfg Config, plan *physical.Plan) Pipeline { c := &Context{ plan: plan, batchSize: cfg.BatchSize, + bucket: cfg.Bucket, } if plan == nil { return errorPipeline(errors.New("plan is nil")) @@ -32,6 +37,7 @@ type Context struct { batchSize int64 plan *physical.Plan evaluator expressionEvaluator + bucket objstore.Bucket } func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline { @@ -57,8 +63,26 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline { } } -func (c *Context) executeDataObjScan(_ context.Context, _ *physical.DataObjScan) Pipeline { - return errorPipeline(errNotImplemented) +func (c *Context) executeDataObjScan(ctx context.Context, node *physical.DataObjScan) Pipeline { + predicates := make([]dataobj.LogsPredicate, 0, len(node.Predicates)) + + for _, p := range node.Predicates { + conv, err := buildLogsPredicate(p) + if err != nil { + return errorPipeline(err) + } + predicates = append(predicates, conv) + } + + return newDataobjScanPipeline(ctx, dataobjScanOptions{ + Object: dataobj.FromBucket(c.bucket, string(node.Location)), + StreamIDs: node.StreamIDs, + Predicates: predicates, + Projections: node.Projections, + + Direction: node.Direction, + Limit: node.Limit, + }) } func (c *Context) executeSortMerge(_ context.Context, sortmerge *physical.SortMerge, inputs []Pipeline) Pipeline { diff --git a/pkg/engine/executor/executor_test.go b/pkg/engine/executor/executor_test.go index b2198613c5..83bf5e83b5 100644 --- a/pkg/engine/executor/executor_test.go +++ b/pkg/engine/executor/executor_test.go @@ -23,15 +23,6 @@ func TestExecutor(t *testing.T) { }) } -func TestExecutor_DataObjScan(t *testing.T) { - t.Run("is not implemented", func(t *testing.T) { - c := &Context{} - pipeline := c.executeDataObjScan(context.TODO(), &physical.DataObjScan{}) - err := pipeline.Read() - require.ErrorContains(t, err, errNotImplemented.Error()) - }) -} - func TestExecutor_SortMerge(t *testing.T) { t.Run("no inputs result in empty pipeline", func(t *testing.T) { c := &Context{} diff --git a/pkg/engine/executor/pipeline_utils_test.go b/pkg/engine/executor/pipeline_utils_test.go index 58eb1c3e37..02b8233d9f 100644 --- a/pkg/engine/executor/pipeline_utils_test.go +++ b/pkg/engine/executor/pipeline_utils_test.go @@ -1,6 +1,7 @@ package executor import ( + "errors" "testing" "github.com/apache/arrow-go/v18/arrow" @@ -55,28 +56,28 @@ func AssertPipelinesEqual(t testing.TB, left, right 
Pipeline) { } } - // Check if both pipelines are complete - if leftErr == EOF && rightErr == EOF { + // Check conditions on our batches: + switch { + case errors.Is(leftErr, EOF) && errors.Is(rightErr, EOF): + // Both pipelines are finished; we can finish now. return - } - // If one pipeline has an error but the other doesn't, they're not equal - if (leftErr == EOF && rightErr != EOF) || (leftErr != EOF && rightErr == EOF) { - require.Fail(t, "Pipelines have different number of rows", + case (errors.Is(leftErr, EOF) && rightErr == nil) || (errors.Is(rightErr, EOF) && leftErr == nil): + // One pipeline finished before the other (and the other didn't fail), then + // there's an inequal number of rows. + require.Fail(t, "Pipelines have a different number of rows", "left error: %v, right error: %v", leftErr, rightErr) - } - // If both pipelines have errors that aren't EOF, they fail equally - if leftErr != nil && rightErr != nil && leftErr != EOF && rightErr != EOF { + case leftErr != nil && rightErr != nil && !errors.Is(leftErr, EOF) && !errors.Is(rightErr, EOF): + // Both pipelines failed with non-EOF errors. require.Equal(t, leftErr, rightErr, "Pipelines failed with different errors") - return - } - // If one pipeline has an error that's not EOF, the pipelines are not equal - if leftErr != nil && leftErr != EOF { + case leftErr != nil && !errors.Is(leftErr, EOF): + // Left pipeline failed with a non-EOF error. require.Fail(t, "Left pipeline failed", "error: %v", leftErr) - } - if rightErr != nil && rightErr != EOF { + + case rightErr != nil && !errors.Is(rightErr, EOF): + // Right pipeline failed with a non-EOF error. require.Fail(t, "Right pipeline failed", "error: %v", rightErr) } diff --git a/pkg/util/topk/topk.go b/pkg/util/topk/topk.go new file mode 100644 index 0000000000..813402b93e --- /dev/null +++ b/pkg/util/topk/topk.go @@ -0,0 +1,132 @@ +package topk + +import ( + "container/heap" + "iter" + "math/rand/v2" + "slices" +) + +// Heap implements a heap of T. If Limit is specified, only the greatest +// elements (according to Less) up to Limit are kept. +// +// When removing elements, the smallest element (according to Less) is returned +// first. If using Heap as a max-heap, these elements need to stored in reverse +// order. +type Heap[T any] struct { + Limit int // Maximum number of entries to keep (0 = unlimited). Optional. + Less func(a, b T) bool // Less returns true if a < b. Required. + + values []T // Current values in the heap. +} + +// Push adds v into the heap. If the heap is full, v is added only if it is +// larger than the smallest value in the heap. +func (h *Heap[T]) Push(v T) { + if h.Limit == 0 || len(h.values) < h.Limit { + heap.Push(h.impl(), v) + return + } + + // h.values[0] is always the smallest value in the heap. + if h.Less(h.values[0], v) { + _ = heap.Pop(h.impl()) + heap.Push(h.impl(), v) + } +} + +// Pop removes and returns the minimum element from the heap. Pop returns the +// zero value for T and false if the heap is empty. +func (h *Heap[T]) Pop() (T, bool) { + if len(h.values) == 0 { + var zero T + return zero, false + } + + return heap.Pop(heapImpl[T]{h}).(T), true +} + +// Len returns the current number of elements in the heap. +func (h *Heap[T]) Len() int { return len(h.values) } + +// PopAll removes and returns all elements from the heap in sorted order. 
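"Sorted order" here is relative to Less, and the heap is left empty afterwards. A minimal fragment illustrating both points (plain ints, Limit 3, natural ordering, fmt assumed to be imported — nothing specific to this package's callers):

	h := topk.Heap[int]{Limit: 3, Less: func(a, b int) bool { return a < b }}
	for _, v := range []int{4, 9, 1, 7} {
		h.Push(v)
	}
	fmt.Println(h.PopAll()) // [4 7 9]: ascending per Less; 1 was evicted by the limit
	fmt.Println(h.Len())    // 0: PopAll drains the heap
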
+func (h *Heap[T]) PopAll() []T { + res := h.values + + slices.SortFunc(res, func(a, b T) int { + if h.Less(a, b) { + return -1 + } + return 1 + }) + + // Reset h.values to nil to avoid changes to the heap modifying the returned + // slice. + h.values = nil + return res +} + +// Range returns an iterator over elements in the heap in random order without +// modifying the heap. The iteration order is not consistent between calls to +// Range. +// +// To retrieve items in sorted order, use [Heap.Pop] or [Heap.PopAll]. +func (h *Heap[T]) Range() iter.Seq[T] { + if len(h.values) == 0 { + return func(func(T) bool) {} + } + + // Create a random start point in the heap to avoid relying on the return + // order. + // + // This is similar to how Go range over maps work, but that creates a seed at + // the time the heap is created rather than when ranging begins. + start := rand.IntN(len(h.values)) + + return func(yield func(T) bool) { + curr := start + + for { + if !yield(h.values[curr]) { + return + } + + // Increment curr and stop once we've fully looped back to where we + // started. + curr = (curr + 1) % len(h.values) + if curr == start { + return + } + } + } +} + +type heapImpl[T any] struct { + *Heap[T] +} + +func (h *Heap[T]) impl() heap.Interface { return heapImpl[T]{h} } + +var _ heap.Interface = (*heapImpl[int])(nil) + +func (impl heapImpl[T]) Len() int { return impl.Heap.Len() } + +func (impl heapImpl[T]) Less(i, j int) bool { + return impl.Heap.Less(impl.values[i], impl.values[j]) +} + +func (impl heapImpl[T]) Swap(i, j int) { + impl.values[i], impl.values[j] = impl.values[j], impl.values[i] +} + +func (impl heapImpl[T]) Push(x any) { + impl.values = append(impl.values, x.(T)) +} + +func (impl heapImpl[T]) Pop() any { + old := impl.values + n := len(old) + x := old[n-1] + impl.values = old[:n-1] + return x +} diff --git a/pkg/util/topk/topk_test.go b/pkg/util/topk/topk_test.go new file mode 100644 index 0000000000..afd1a4a815 --- /dev/null +++ b/pkg/util/topk/topk_test.go @@ -0,0 +1,84 @@ +package topk_test + +import ( + "fmt" + "slices" + "sort" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/util/topk" +) + +// ExampleHeap_greatest shows how to use a [topk.Heap] to get the top-k greatest +// elements in descending order. +func ExampleHeap_greatest() { + heap := &topk.Heap[int]{ + Limit: 3, + Less: func(a, b int) bool { return a < b }, + } + + for i := range 10 { + heap.Push(i) + } + + actual := heap.PopAll() + slices.Reverse(actual) // Reverse to get in greatest-descending order. + + fmt.Println(actual) + // Output: [9 8 7] +} + +// ExampleHeap_least shows how to use a [topk.Heap] to get the top-k least +// elements in ascending order. +func ExampleHeap_least() { + heap := &topk.Heap[int]{ + Limit: 3, + Less: func(a, b int) bool { return a > b }, + } + + for i := range 10 { + heap.Push(i) + } + + actual := heap.PopAll() + slices.Reverse(actual) // Reverse to get in least-ascending order. 
+ + fmt.Println(actual) + // Output: [0 1 2] +} + +func TestHeap_Range(t *testing.T) { + heap := &topk.Heap[int]{ + Limit: 3, + Less: func(a, b int) bool { return a < b }, + } + + for i := range 10 { + heap.Push(i) + } + + var actual []int + for v := range heap.Range() { + actual = append(actual, v) + } + sort.Ints(actual) + + expected := []int{7, 8, 9} + require.Equal(t, expected, actual) +} + +func TestHeap_Range_Empty(t *testing.T) { + heap := &topk.Heap[int]{ + Limit: 3, + Less: func(a, b int) bool { return a < b }, + } + + require.NotPanics(t, func() { + // Iterating over an empty heap should be a no-op. + for range heap.Range() { + t.Fatal("there should not be any values in the empty heap") + } + }) +}
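
One part of the Heap API the tests above do not exercise is single-element Pop. As an illustrative, example-style sketch (a hypothetical ExampleHeap_pop written against the same topk package), elements come out Less-minimum first, and ok reports false once the heap is drained:

	// ExampleHeap_pop is a hypothetical example shown for illustration.
	func ExampleHeap_pop() {
		h := &topk.Heap[int]{Less: func(a, b int) bool { return a < b }} // Limit 0 = unlimited

		h.Push(2)
		h.Push(1)

		for {
			v, ok := h.Pop()
			if !ok {
				break // zero value and false once the heap is empty
			}
			fmt.Println(v)
		}
		// Output:
		// 1
		// 2
	}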