chore(engine): provide basic implementation of DataObjScan (#17568)

Co-authored-by: Ashwanth Goli <iamashwanth@gmail.com>
pull/17578/head
Robert Fratto 8 months ago committed by GitHub
parent c129024d63
commit 4841b2abe2
  1. pkg/dataobj/querier/iter.go (134 lines changed)
  2. pkg/dataobj/querier/iter_test.go (12 lines changed)
  3. pkg/engine/executor/dataobjscan.go (554 lines changed)
  4. pkg/engine/executor/dataobjscan_predicate.go (47 lines changed)
  5. pkg/engine/executor/dataobjscan_test.go (301 lines changed)
  6. pkg/engine/executor/executor.go (28 lines changed)
  7. pkg/engine/executor/executor_test.go (9 lines changed)
  8. pkg/engine/executor/pipeline_utils_test.go (31 lines changed)
  9. pkg/util/topk/topk.go (132 lines changed)
  10. pkg/util/topk/topk_test.go (84 lines changed)

@ -1,11 +1,10 @@
package querier
import (
"container/heap"
"context"
"fmt"
"io"
"sort"
"slices"
"sync"
"github.com/grafana/loki/v3/pkg/dataobj"
@ -15,6 +14,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/util/topk"
)
var (
@ -70,8 +70,12 @@ func newEntryIterator(ctx context.Context,
prevStreamID int64 = -1
streamExtractor log.StreamPipeline
streamHash uint64
top = newTopK(int(req.Limit), req.Direction)
statistics = stats.FromContext(ctx)
top = topk.Heap[entryWithLabels]{
Limit: int(req.Limit),
Less: lessFn(req.Direction),
}
statistics = stats.FromContext(ctx)
)
for {
@ -102,7 +106,7 @@ func newEntryIterator(ctx context.Context,
}
statistics.AddPostFilterRows(1)
top.Add(entryWithLabels{
top.Push(entryWithLabels{
Labels: parsedLabels.String(),
StreamHash: streamHash,
Entry: logproto.Entry{
@ -114,49 +118,8 @@ func newEntryIterator(ctx context.Context,
})
}
}
return top.Iterator(), nil
}
// entryHeap implements a min-heap of entries based on a custom less function.
// The less function determines the ordering based on the direction (FORWARD/BACKWARD).
// For FORWARD direction:
// - When comparing timestamps: entry.Timestamp.After(b) means 'a' is "less" than 'b'
// - Example: [t3, t2, t1] where t3 is most recent, t3 will be at the root (index 0)
//
// For BACKWARD direction:
// - When comparing timestamps: entry.Timestamp.Before(b) means 'a' is "less" than 'b'
// - Example: [t1, t2, t3] where t1 is oldest, t1 will be at the root (index 0)
//
// In both cases:
// - When timestamps are equal, we use labels as a tiebreaker
// - The root of the heap (index 0) contains the entry we want to evict first
type entryHeap struct {
less func(a, b entryWithLabels) bool
entries []entryWithLabels
}
func (h *entryHeap) Push(x any) {
h.entries = append(h.entries, x.(entryWithLabels))
}
func (h *entryHeap) Pop() any {
old := h.entries
n := len(old)
x := old[n-1]
h.entries = old[:n-1]
return x
}
func (h *entryHeap) Len() int {
return len(h.entries)
}
func (h *entryHeap) Less(i, j int) bool {
return h.less(h.entries[i], h.entries[j])
}
func (h *entryHeap) Swap(i, j int) {
h.entries[i], h.entries[j] = h.entries[j], h.entries[i]
return heapIterator(&top), nil
}
func lessFn(direction logproto.Direction) func(a, b entryWithLabels) bool {
@ -180,55 +143,22 @@ func lessFn(direction logproto.Direction) func(a, b entryWithLabels) bool {
}
}
// topk maintains a min-heap of the k most relevant entries.
// The heap is ordered by timestamp (and labels as tiebreaker) based on the direction:
// - For FORWARD: keeps k oldest entries by evicting newest entries first
// Example with k=3: If entries arrive as [t1,t2,t3,t4,t5], heap will contain [t1,t2,t3]
// - For BACKWARD: keeps k newest entries by evicting oldest entries first
// Example with k=3: If entries arrive as [t1,t2,t3,t4,t5], heap will contain [t3,t4,t5]
type topk struct {
k int
minHeap entryHeap
}
func newTopK(k int, direction logproto.Direction) *topk {
if k <= 0 {
panic("k must be greater than 0")
}
entries := entryWithLabelsPool.Get().(*[]entryWithLabels)
return &topk{
k: k,
minHeap: entryHeap{
less: lessFn(direction),
entries: (*entries)[:0],
},
}
}
// Add adds a new entry to the topk heap.
// If the heap has less than k entries, the entry is added directly.
// Otherwise, if the new entry should be evicted before the root (index 0),
// it is discarded. If not, the root is popped (discarded) and the new entry is pushed.
//
// For FORWARD direction:
// - Root contains newest entry (to be evicted first)
// - New entries that are newer than root are discarded
// Example: With k=3 and heap=[t1,t2,t3], a new entry t4 is discarded
//
// For BACKWARD direction:
// - Root contains oldest entry (to be evicted first)
// - New entries that are older than root are discarded
// Example: With k=3 and heap=[t3,t4,t5], a new entry t2 is discarded
func (t *topk) Add(r entryWithLabels) {
if t.minHeap.Len() < t.k {
heap.Push(&t.minHeap, r)
return
}
if t.minHeap.less(t.minHeap.entries[0], r) {
_ = heap.Pop(&t.minHeap)
heap.Push(&t.minHeap, r)
}
// heapIterator creates a new EntryIterator for the given topk heap. After
// calling heapIterator, h is emptied.
func heapIterator(h *topk.Heap[entryWithLabels]) iter.EntryIterator {
elems := h.PopAll()
// We need to reverse the order of the entries in the slice to maintain the order of logs we
// want to return:
//
// For FORWARD direction, we want smallest timestamps first (but the heap is
// ordered by largest timestamps first due to lessFn).
//
// For BACKWARD direction, we want largest timestamps first (but the heap is
// ordered by smallest timestamps first due to lessFn).
slices.Reverse(elems)
return &sliceIterator{entries: elems}
}
type sliceIterator struct {
@ -236,18 +166,6 @@ type sliceIterator struct {
curr entryWithLabels
}
func (t *topk) Iterator() iter.EntryIterator {
// We swap i and j in the less comparison to reverse the ordering from the minHeap.
// The minHeap is ordered such that the entry to evict is at index 0.
// For FORWARD: newest entries are evicted first, so we want oldest entries first in the final slice
// For BACKWARD: oldest entries are evicted first, so we want newest entries first in the final slice
// By swapping i and j, we effectively reverse the minHeap ordering to get the desired final ordering
sort.Slice(t.minHeap.entries, func(i, j int) bool {
return t.minHeap.less(t.minHeap.entries[j], t.minHeap.entries[i])
})
return &sliceIterator{entries: t.minHeap.entries}
}
func (s *sliceIterator) Next() bool {
if len(s.entries) == 0 {
return false

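To make the heapIterator comments above concrete, here is an illustrative sketch (editorial, not part of this commit) of how topk.Heap, a FORWARD-style Less function, and slices.Reverse combine to produce oldest-first output. The entry type and timestamps are simplified stand-ins for entryWithLabels.

package main

import (
	"fmt"
	"slices"
	"time"

	"github.com/grafana/loki/v3/pkg/util/topk"
)

func main() {
	type entry struct{ ts time.Time }

	// FORWARD keeps the k oldest entries: Less treats "newer" as smaller, so
	// the newest kept entry sits at the heap root and is evicted first.
	h := topk.Heap[entry]{
		Limit: 3,
		Less:  func(a, b entry) bool { return a.ts.After(b.ts) },
	}
	for i := 1; i <= 5; i++ {
		h.Push(entry{ts: time.Unix(int64(i), 0)})
	}

	// PopAll returns entries smallest-first according to Less (newest first
	// here), so reversing yields oldest-first: the FORWARD output order.
	out := h.PopAll()
	slices.Reverse(out)
	for _, e := range out {
		fmt.Println(e.ts.Unix()) // prints 1, 2, 3
	}
}

For BACKWARD, the Less function uses Before instead of After, and the same reverse step yields newest-first output.
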
@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/util/topk"
)
// makeEntry is a helper function to create a log entry with given timestamp and line
@ -94,19 +95,20 @@ func TestTopKIterator(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create topk iterator
top := &topk{
k: tt.k,
minHeap: entryHeap{less: lessFn(tt.direction)},
top := topk.Heap[entryWithLabels]{
Limit: tt.k,
Less: lessFn(tt.direction),
}
// Add entries
for _, e := range tt.input {
top.Add(e)
top.Push(e)
}
// Collect results
var got []entryWithLabels
iter := top.Iterator()
iter := heapIterator(&top)
for iter.Next() {
got = append(got, entryWithLabels{
Entry: iter.At(),

@ -0,0 +1,554 @@
package executor
import (
"cmp"
"context"
"errors"
"fmt"
"io"
"slices"
"sync"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/prometheus/prometheus/model/labels"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
"github.com/grafana/loki/v3/pkg/util/topk"
)
type dataobjScan struct {
ctx context.Context
opts dataobjScanOptions
initialized bool
readers []*dataobj.LogsReader
streams map[int64]labels.Labels
state state
}
type dataobjScanOptions struct {
// TODO(rfratto): Limiting each DataObjScan to a single section is going to
// be critical for limiting memory overhead here; the section is intended to
// be the smallest unit of parallelization.
Object *dataobj.Object // Object to read from.
StreamIDs []int64 // Stream IDs to match from logs sections.
Predicates []dataobj.LogsPredicate // Predicate to apply to the logs.
Projections []physical.ColumnExpression // Columns to include. An empty slice means all columns.
Direction physical.Direction // Direction of timestamps to return.
Limit uint32 // A limit on the number of rows to return (0=unlimited).
}
var _ Pipeline = (*dataobjScan)(nil)
// newDataobjScanPipeline creates a new Pipeline which emits a single
// [arrow.Record] composed of all log sections in a data object. Rows in the
// returned record are ordered by timestamp in the direction specified by
// opts.Direction.
func newDataobjScanPipeline(ctx context.Context, opts dataobjScanOptions) *dataobjScan {
return &dataobjScan{ctx: ctx, opts: opts}
}
// Read retrieves the next [arrow.Record] from the dataobj.
func (s *dataobjScan) Read() error {
if err := s.init(); err != nil {
return err
}
rec, err := s.read()
s.state = newState(rec, err)
if err != nil {
return fmt.Errorf("reading data object: %w", err)
}
return nil
}
func (s *dataobjScan) init() error {
if s.initialized {
return nil
}
md, err := s.opts.Object.Metadata(s.ctx)
if err != nil {
return fmt.Errorf("reading metadata: %w", err)
}
if err := s.initStreams(md); err != nil {
return fmt.Errorf("initializing streams: %w", err)
}
s.readers = make([]*dataobj.LogsReader, 0, md.LogsSections)
for section := range md.LogsSections {
// TODO(rfratto): There are a few problems with using LogsReader as it is:
//
// 1. LogsReader doesn't support providing a subset of columns to read
// from, so we're applying projections after reading.
//
// 2. LogsReader is intended to be pooled to reduce memory, but we're
// creating a new one every time here.
//
// For the sake of the initial implementation I'm ignoring these issues,
// but we'll absolutely need to solve this prior to production use.
lr := dataobj.NewLogsReader(s.opts.Object, section)
// The calls below can't fail because we're always using a brand new logs
// reader.
_ = lr.MatchStreams(slices.Values(s.opts.StreamIDs))
_ = lr.SetPredicates(s.opts.Predicates)
s.readers = append(s.readers, lr)
}
s.initialized = true
return nil
}
// initStreams retrieves all requested stream records from streams sections so
// that emitted [arrow.Record]s can include stream labels in results.
func (s *dataobjScan) initStreams(md dataobj.Metadata) error {
var sr dataobj.StreamsReader
streams := make([]dataobj.Stream, 512)
// Initialize entries in the map so we can do a presence test in the loop
// below.
s.streams = make(map[int64]labels.Labels, len(s.opts.StreamIDs))
for _, id := range s.opts.StreamIDs {
s.streams[id] = nil
}
for section := range md.StreamsSections {
// TODO(rfratto): dataobj.StreamsPredicate is missing support for filtering
// on stream IDs when we already know them in advance; this can cause the
// Read here to take longer than it needs to since we're reading the
// entirety of every row.
sr.Reset(s.opts.Object, section)
for {
n, err := sr.Read(s.ctx, streams)
if n == 0 && errors.Is(err, io.EOF) {
return nil
} else if err != nil && !errors.Is(err, io.EOF) {
return err
}
for i, stream := range streams[:n] {
if _, found := s.streams[stream.ID]; !found {
continue
}
s.streams[stream.ID] = stream.Labels
// Zero out the stream entry from the slice so the next call to sr.Read
// doesn't overwrite any memory we just moved to s.streams.
streams[i] = dataobj.Stream{}
}
}
}
// Check that all streams were populated.
var errs []error
for id, labels := range s.streams {
if labels == nil {
errs = append(errs, fmt.Errorf("requested stream ID %d not found in any stream section", id))
}
}
return errors.Join(errs...)
}
// read reads the entire data object into memory and generates an arrow.Record
// from the data. It returns an error upon encountering an error while reading
// one of the sections.
func (s *dataobjScan) read() (arrow.Record, error) {
// Since [physical.DataObjScan] requires that:
//
// * Records are ordered by timestamp, and
// * Records from the same dataobjScan do not overlap in time
//
// we *must* read the entire data object before creating a record, as the
// sections in the dataobj itself are not already sorted by timestamp (though
// we only need to keep up to Limit rows in memory).
var (
heapMut sync.Mutex
heap = topk.Heap[dataobj.Record]{
Limit: int(s.opts.Limit),
Less: s.getLessFunc(s.opts.Direction),
}
)
g, ctx := errgroup.WithContext(s.ctx)
var gotData atomic.Bool
for _, reader := range s.readers {
g.Go(func() error {
buf := make([]dataobj.Record, 512)
for {
n, err := reader.Read(ctx, buf)
if n == 0 && errors.Is(err, io.EOF) {
return nil
} else if err != nil && !errors.Is(err, io.EOF) {
return err
}
gotData.Store(true)
heapMut.Lock()
for _, rec := range buf[:n] {
heap.Push(rec)
}
heapMut.Unlock()
}
})
}
if err := g.Wait(); err != nil {
return nil, err
} else if !gotData.Load() {
return nil, EOF
}
projections, err := s.effectiveProjections(&heap)
if err != nil {
return nil, fmt.Errorf("getting effective projections: %w", err)
}
schema, err := schemaFromColumns(projections)
if err != nil {
return nil, fmt.Errorf("creating schema: %w", err)
}
// TODO(rfratto): pass allocator to builder
rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
defer rb.Release()
records := heap.PopAll()
slices.Reverse(records)
for _, record := range records {
for i := 0; i < schema.NumFields(); i++ {
field, builder := rb.Schema().Field(i), rb.Field(i)
s.appendToBuilder(builder, &field, &record)
}
}
return rb.NewRecord(), nil
}
func (s *dataobjScan) getLessFunc(dir physical.Direction) func(a, b dataobj.Record) bool {
// compareStreams is used when two records have the same timestamp.
compareStreams := func(a, b dataobj.Record) bool {
aStream, ok := s.streams[a.StreamID]
if !ok {
return false
}
bStream, ok := s.streams[b.StreamID]
if !ok {
return true
}
return labels.Compare(aStream, bStream) < 0
}
switch dir {
case physical.Forward:
return func(a, b dataobj.Record) bool {
if a.Timestamp.Equal(b.Timestamp) {
return compareStreams(a, b)
}
return a.Timestamp.After(b.Timestamp)
}
case physical.Backwards:
return func(a, b dataobj.Record) bool {
if a.Timestamp.Equal(b.Timestamp) {
return compareStreams(a, b)
}
return a.Timestamp.Before(b.Timestamp)
}
default:
panic("invalid direction")
}
}
// effectiveProjections returns the effective projections to return for a
// record. If s.opts.Projections is non-empty, then its column expressions are
// used for the projections.
//
// Otherwise, the set of all columns found in the heap is used, in order of:
//
// * All stream labels (sorted by name)
// * All metadata columns (sorted by name)
// * Log timestamp
// * Log message
//
// effectiveProjections does not mutate h.
func (s *dataobjScan) effectiveProjections(h *topk.Heap[dataobj.Record]) ([]physical.ColumnExpression, error) {
if len(s.opts.Projections) > 0 {
return s.opts.Projections, nil
}
var (
columns []physical.ColumnExpression
foundStreams = map[int64]struct{}{}
found = map[physical.ColumnExpr]struct{}{}
)
addColumn := func(name string, ty types.ColumnType) {
expr := physical.ColumnExpr{
Ref: types.ColumnRef{Column: name, Type: ty},
}
if _, ok := found[expr]; !ok {
found[expr] = struct{}{}
columns = append(columns, &expr)
}
}
for rec := range h.Range() {
stream, ok := s.streams[rec.StreamID]
if !ok {
// If we hit this, there's a problem with either initStreams (we missed a
// requested stream) or the predicate application, where it returned a
// stream we didn't want.
return nil, fmt.Errorf("stream ID %d not found in stream cache", rec.StreamID)
}
if _, addedStream := foundStreams[rec.StreamID]; !addedStream {
for _, label := range stream {
addColumn(label.Name, types.ColumnTypeLabel)
}
foundStreams[rec.StreamID] = struct{}{}
}
for _, md := range rec.Metadata {
addColumn(md.Name, types.ColumnTypeMetadata)
}
}
// Sort existing columns by type (preferring labels) then name.
slices.SortFunc(columns, func(a, b physical.ColumnExpression) int {
aRef, bRef := a.(*physical.ColumnExpr).Ref, b.(*physical.ColumnExpr).Ref
if aRef.Type != bRef.Type {
if aRef.Type == types.ColumnTypeLabel {
return -1 // Labels first.
}
return 1
}
return cmp.Compare(aRef.Column, bRef.Column)
})
// Add fixed columns at the end.
addColumn("timestamp", types.ColumnTypeBuiltin)
addColumn("message", types.ColumnTypeBuiltin)
return columns, nil
}
func schemaFromColumns(columns []physical.ColumnExpression) (*arrow.Schema, error) {
var (
fields = make([]arrow.Field, 0, len(columns))
fingerprints = make(map[string]struct{}, len(columns))
)
addField := func(field arrow.Field) {
fp := field.Fingerprint()
if field.HasMetadata() {
// We differentiate column type using metadata, but the metadata isn't
// included in the fingerprint, so we need to manually include it here.
fp += field.Metadata.String()
}
if _, exist := fingerprints[fp]; exist {
return
}
fields = append(fields, field)
fingerprints[fp] = struct{}{}
}
for _, column := range columns {
columnExpr, ok := column.(*physical.ColumnExpr)
if !ok {
return nil, fmt.Errorf("invalid column expression type %T", column)
}
md := arrow.MetadataFrom(map[string]string{
types.ColumnTypeMetadataKey: columnExpr.Ref.Type.String(),
})
switch columnExpr.Ref.Type {
case types.ColumnTypeLabel:
// TODO(rfratto): Switch to dictionary encoding for labels.
//
// Since labels are more likely to repeat than metadata, we could cut
// down on the memory overhead of a record by dictionary encoding the
// labels.
//
// However, the csv package we use for testing DataObjScan currently
// (2025-05-02) doesn't support dictionary encoding, and we would need
// to find a solution there.
//
// We skipped dictionary encoding for now to get the initial prototype
// working.
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: md,
})
case types.ColumnTypeMetadata:
// Metadata is *not* encoded using dictionary encoding since metadata
// has unconstrained cardinality. Using dictionary encoding would require
// tracking every encoded value in the record, which is likely to be too
// expensive.
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: md,
})
case types.ColumnTypeBuiltin:
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: builtinColumnType(columnExpr.Ref),
Nullable: true,
Metadata: md,
})
case types.ColumnTypeAmbiguous:
// The best handling for ambiguous columns (in terms of the schema) is to
// explode them out into multiple columns, one for each type. (Except for
// parsed, which can't be emitted from DataObjScan right now).
//
// TODO(rfratto): should ambiguity be passed down like this? It's odd for
// the returned schema to be different than the set of columns you asked
// for.
//
// As an alternative, ambiguity could be handled by the planner, where it
// performs the explosion and propagates the ambiguity down into the
// predicates.
//
// If we're ok with the schema changing from what was requested, then we
// could update this to resolve the ambiguity at [dataobjScan.effectiveProjections]
// so we don't always explode out to the full set of columns.
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: arrow.MetadataFrom(map[string]string{types.ColumnTypeMetadataKey: types.ColumnTypeLabel.String()}),
})
addField(arrow.Field{
Name: columnExpr.Ref.Column,
Type: arrow.BinaryTypes.String,
Nullable: true,
Metadata: arrow.MetadataFrom(map[string]string{types.ColumnTypeMetadataKey: types.ColumnTypeMetadata.String()}),
})
case types.ColumnTypeParsed:
return nil, fmt.Errorf("parsed column type not supported: %s", columnExpr.Ref.Type)
}
}
return arrow.NewSchema(fields, nil), nil
}
func builtinColumnType(ref types.ColumnRef) arrow.DataType {
if ref.Type != types.ColumnTypeBuiltin {
panic("builtinColumnType called with a non-builtin column")
}
switch ref.Column {
case "timestamp":
return arrow.FixedWidthTypes.Timestamp_ns
case "message":
return arrow.BinaryTypes.String
default:
panic(fmt.Sprintf("unsupported builtin column type %s", ref))
}
}
// appendToBuilder appends the provided field from record into the given
// builder. The metadata of field is used to determine the category of column.
// appendToBuilder panics if the type of field does not match the datatype of
// builder.
func (s *dataobjScan) appendToBuilder(builder array.Builder, field *arrow.Field, record *dataobj.Record) {
columnType, ok := field.Metadata.GetValue(types.ColumnTypeMetadataKey)
if !ok {
// This shouldn't happen; we control the metadata here on the fields.
panic(fmt.Sprintf("missing column type in field %s", field.Name))
}
switch columnType {
case types.ColumnTypeLabel.String():
stream, ok := s.streams[record.StreamID]
if !ok {
panic(fmt.Sprintf("stream ID %d not found in stream cache", record.StreamID))
}
val := stream.Get(field.Name)
if val == "" {
builder.(*array.StringBuilder).AppendNull()
} else {
builder.(*array.StringBuilder).Append(val)
}
case types.ColumnTypeMetadata.String():
val := record.Metadata.Get(field.Name)
if val == "" {
builder.(*array.StringBuilder).AppendNull()
} else {
builder.(*array.StringBuilder).Append(val)
}
case types.ColumnTypeBuiltin.String():
if field.Name == "timestamp" {
ts, _ := arrow.TimestampFromTime(record.Timestamp, arrow.Nanosecond)
builder.(*array.TimestampBuilder).Append(ts)
} else if field.Name == "message" {
// Use the inner BinaryBuilder to avoid converting record.Line to a
// string and back.
builder.(*array.StringBuilder).BinaryBuilder.Append(record.Line)
} else {
panic(fmt.Sprintf("unsupported builtin column %s", field.Name))
}
default:
// This shouldn't happen; we control the metadata here on the fields.
panic(fmt.Sprintf("unsupported column type %s", columnType))
}
}
// Value returns the current [arrow.Record] retrieved by the previous call to
// [dataobjScan.Read], or an error if the record cannot be read.
func (s *dataobjScan) Value() (arrow.Record, error) { return s.state.batch, s.state.err }
// Close closes s and releases all resources.
func (s *dataobjScan) Close() {
for _, reader := range s.readers {
_ = reader.Close()
}
}
// Inputs implements Pipeline and returns nil, since DataObjScan accepts no
// pipelines as input.
func (s *dataobjScan) Inputs() []Pipeline { return nil }
// Transport implements Pipeline and returns [Local].
func (s *dataobjScan) Transport() Transport { return Local }

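As a hedged illustration (editorial, not part of this commit) of the Arrow assembly performed by read, schemaFromColumns, and appendToBuilder: fields carry a column-type metadata key, and values are appended row by row through a RecordBuilder. The "column_type" key and the column values below are made-up stand-ins for types.ColumnTypeMetadataKey and real dataobj records.

package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	// Two columns: a stream label and the builtin message, differentiated by
	// field metadata much like schemaFromColumns does.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "env", Type: arrow.BinaryTypes.String, Nullable: true,
			Metadata: arrow.MetadataFrom(map[string]string{"column_type": "label"})},
		{Name: "message", Type: arrow.BinaryTypes.String, Nullable: true,
			Metadata: arrow.MetadataFrom(map[string]string{"column_type": "builtin"})},
	}, nil)

	rb := array.NewRecordBuilder(memory.NewGoAllocator(), schema)
	defer rb.Release()

	// One append per field per row, mirroring the appendToBuilder loop; a
	// missing value becomes NULL.
	rb.Field(0).(*array.StringBuilder).Append("prod")
	rb.Field(1).(*array.StringBuilder).Append("hello world")

	rb.Field(0).(*array.StringBuilder).AppendNull()
	rb.Field(1).(*array.StringBuilder).Append("goodbye world")

	rec := rb.NewRecord()
	defer rec.Release()
	fmt.Println(rec.NumRows(), rec.NumCols()) // 2 2
}
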
@ -0,0 +1,47 @@
package executor
import (
"fmt"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)
// buildLogsPredicate builds a logs predicate from an expression.
func buildLogsPredicate(_ physical.Expression) (dataobj.LogsPredicate, error) {
// TODO(rfratto): implement converting expressions into logs predicates.
//
// There's a few challenges here:
//
// - Expressions do not cleanly map to [dataobj.LogsPredicate]s. For example,
// an expression may be simply a column reference, but a logs predicate is
// always some expression that can evaluate to true.
//
// - Mapping expressions into [dataobj.TimeRangePredicate] is a massive pain;
// since TimeRangePredicate specifies both bounds for the time range, we
// would need to find and collapse multiple physical.Expressions into a
// single TimeRangePredicate.
//
// - While [dataobj.MetadataMatcherPredicate] and
// [dataobj.LogMessageFilterPredicate] are catch-alls for function-based
// predicates, they are row-based and not column-based, so our
// expressionEvaluator cannot be used here.
//
// Long term, we likely want two things:
//
// 1. Use dataset.Reader and dataset.Predicate directly instead of
// dataobj.LogsReader.
//
// 2. Update dataset.Reader to be vector based instead of row-based.
//
// It's not clear if we should resolve the issues with LogsPredicate (or find
// hacks to make them work in the short term), or skip straight to using
// dataset.Reader instead.
//
// Implementing DataObjScan in the dataobj package would be a clean way to
// handle all of this, but that would cause cyclic dependencies. I also don't
// think we should start removing things from internal for this; we can probably
// find a way to remove the explicit dependency on the dataobj package from
// the physical planner instead.
return nil, fmt.Errorf("logs predicate conversion is not supported")
}

@ -0,0 +1,301 @@
package executor
import (
"bytes"
"math"
"testing"
"time"
"github.com/apache/arrow-go/v18/arrow"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/engine/internal/types"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/pkg/push"
)
var (
labelMD = buildMetadata(types.ColumnTypeLabel)
metadataMD = buildMetadata(types.ColumnTypeMetadata)
builtinMD = buildMetadata(types.ColumnTypeBuiltin)
)
func buildMetadata(ty types.ColumnType) arrow.Metadata {
return arrow.MetadataFrom(map[string]string{
types.ColumnTypeMetadataKey: ty.String(),
})
}
func Test_dataobjScan(t *testing.T) {
obj := buildDataobj(t, []logproto.Stream{
{
Labels: `{service="loki", env="prod"}`,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(5, 0),
Line: "hello world",
StructuredMetadata: []push.LabelAdapter{{Name: "guid", Value: "aaaa-bbbb-cccc-dddd"}},
},
{
Timestamp: time.Unix(10, 0),
Line: "goodbye world",
StructuredMetadata: []push.LabelAdapter{{Name: "guid", Value: "eeee-ffff-aaaa-bbbb"}},
},
},
},
{
Labels: `{service="notloki", env="prod"}`,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(2, 0),
Line: "hello world",
StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "notloki-pod-1"}},
},
{
Timestamp: time.Unix(3, 0),
Line: "goodbye world",
StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "notloki-pod-1"}},
},
},
},
})
t.Run("All columns", func(t *testing.T) {
pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{
Object: obj,
StreamIDs: []int64{1, 2}, // All streams
Projections: nil, // All columns
Direction: physical.Forward,
Limit: 0, // No limit
})
expectFields := []arrow.Field{
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "service", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "guid", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: builtinMD, Nullable: true},
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: builtinMD, Nullable: true},
}
expectCSV := `prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world
prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:03,goodbye world
prod,loki,aaaa-bbbb-cccc-dddd,NULL,1970-01-01 00:00:05,hello world
prod,loki,eeee-ffff-aaaa-bbbb,NULL,1970-01-01 00:00:10,goodbye world`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
defer expectRecord.Release()
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
t.Run("Column subset", func(t *testing.T) {
pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{
Object: obj,
StreamIDs: []int64{1, 2}, // All streams
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "timestamp", Type: types.ColumnTypeBuiltin}},
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeLabel}},
},
Direction: physical.Forward,
Limit: 0, // No limit
})
expectFields := []arrow.Field{
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: builtinMD, Nullable: true},
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
}
expectCSV := `1970-01-01 00:00:02,prod
1970-01-01 00:00:03,prod
1970-01-01 00:00:05,prod
1970-01-01 00:00:10,prod`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
defer expectRecord.Release()
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
t.Run("Unknown column", func(t *testing.T) {
// Here, we'll check for a column which only exists once in the dataobj but is
// ambiguous from the perspective of the caller.
pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{
Object: obj,
StreamIDs: []int64{1, 2}, // All streams
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeAmbiguous}},
},
Direction: physical.Forward,
Limit: 0, // No limit
})
expectFields := []arrow.Field{
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
}
expectCSV := `prod,NULL
prod,NULL
prod,NULL
prod,NULL`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
defer expectRecord.Release()
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
}
func Test_dataobjScan_DuplicateColumns(t *testing.T) {
obj := buildDataobj(t, []logproto.Stream{
// Case 1: A single row has a value for a label and metadata column with
// the same name.
{
Labels: `{service="loki", env="prod", pod="pod-1"}`,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "message 1",
StructuredMetadata: []push.LabelAdapter{{Name: "pod", Value: "override"}},
},
},
},
// Case 2: A label and metadata column share a name but have values in
// different rows.
{
Labels: `{service="loki", env="prod"}`,
Entries: []logproto.Entry{{
Timestamp: time.Unix(2, 0),
Line: "message 2",
StructuredMetadata: []push.LabelAdapter{{Name: "namespace", Value: "namespace-1"}},
}},
},
{
Labels: `{service="loki", env="prod", namespace="namespace-2"}`,
Entries: []logproto.Entry{{
Timestamp: time.Unix(3, 0),
Line: "message 3",
StructuredMetadata: nil,
}},
},
})
t.Run("All columns", func(t *testing.T) {
pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{
Object: obj,
StreamIDs: []int64{1, 2, 3}, // All streams
Projections: nil, // All columns
Direction: physical.Forward,
Limit: 0, // No limit
})
expectFields := []arrow.Field{
{Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "service", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp_ns, Metadata: builtinMD, Nullable: true},
{Name: "message", Type: arrow.BinaryTypes.String, Metadata: builtinMD, Nullable: true},
}
expectCSV := `prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1
prod,NULL,NULL,loki,namespace-1,NULL,1970-01-01 00:00:02,message 2
prod,namespace-2,NULL,loki,NULL,NULL,1970-01-01 00:00:03,message 3`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
defer expectRecord.Release()
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
t.Run("Ambiguous pod", func(t *testing.T) {
pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{
Object: obj,
StreamIDs: []int64{1, 2, 3}, // All streams
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "pod", Type: types.ColumnTypeAmbiguous}},
},
Direction: physical.Forward,
Limit: 0, // No limit
})
expectFields := []arrow.Field{
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "pod", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
}
expectCSV := `pod-1,override
NULL,NULL
NULL,NULL`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
defer expectRecord.Release()
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
t.Run("Ambiguous namespace", func(t *testing.T) {
pipeline := newDataobjScanPipeline(t.Context(), dataobjScanOptions{
Object: obj,
StreamIDs: []int64{1, 2, 3}, // All streams
Projections: []physical.ColumnExpression{
&physical.ColumnExpr{Ref: types.ColumnRef{Column: "namespace", Type: types.ColumnTypeAmbiguous}},
},
Direction: physical.Forward,
Limit: 0, // No limit
})
expectFields := []arrow.Field{
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true},
{Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: metadataMD, Nullable: true},
}
expectCSV := `NULL,NULL
NULL,namespace-1
namespace-2,NULL`
expectRecord, err := CSVToArrow(expectFields, expectCSV)
require.NoError(t, err)
defer expectRecord.Release()
AssertPipelinesEqual(t, pipeline, NewBufferedPipeline(expectRecord))
})
}
func buildDataobj(t testing.TB, streams []logproto.Stream) *dataobj.Object {
t.Helper()
builder, err := dataobj.NewBuilder(dataobj.BuilderConfig{
TargetPageSize: 8_000,
TargetObjectSize: math.MaxInt,
TargetSectionSize: 32_000,
BufferSize: 8_000,
SectionStripeMergeLimit: 2,
})
require.NoError(t, err)
for _, stream := range streams {
require.NoError(t, builder.Append(stream))
}
var buf bytes.Buffer
_, err = builder.Flush(&buf)
require.NoError(t, err)
r := bytes.NewReader(buf.Bytes())
return dataobj.FromReaderAt(r, r.Size())
}

@ -5,17 +5,22 @@ import (
"errors"
"fmt"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/engine/planner/physical"
)
type Config struct {
BatchSize int64 `yaml:"batch_size"`
Bucket objstore.Bucket
}
func Run(ctx context.Context, cfg Config, plan *physical.Plan) Pipeline {
c := &Context{
plan: plan,
batchSize: cfg.BatchSize,
bucket: cfg.Bucket,
}
if plan == nil {
return errorPipeline(errors.New("plan is nil"))
@ -32,6 +37,7 @@ type Context struct {
batchSize int64
plan *physical.Plan
evaluator expressionEvaluator
bucket objstore.Bucket
}
func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
@ -57,8 +63,26 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
}
}
func (c *Context) executeDataObjScan(_ context.Context, _ *physical.DataObjScan) Pipeline {
return errorPipeline(errNotImplemented)
func (c *Context) executeDataObjScan(ctx context.Context, node *physical.DataObjScan) Pipeline {
predicates := make([]dataobj.LogsPredicate, 0, len(node.Predicates))
for _, p := range node.Predicates {
conv, err := buildLogsPredicate(p)
if err != nil {
return errorPipeline(err)
}
predicates = append(predicates, conv)
}
return newDataobjScanPipeline(ctx, dataobjScanOptions{
Object: dataobj.FromBucket(c.bucket, string(node.Location)),
StreamIDs: node.StreamIDs,
Predicates: predicates,
Projections: node.Projections,
Direction: node.Direction,
Limit: node.Limit,
})
}
func (c *Context) executeSortMerge(_ context.Context, sortmerge *physical.SortMerge, inputs []Pipeline) Pipeline {

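For context on how the pipeline returned by executeDataObjScan is consumed, here is a hedged sketch of a drain loop following the Read/Value/Close contract and the EOF handling used by the test utilities. drainPipeline is a hypothetical helper (not part of this commit), assumed to sit in the executor package where Pipeline and EOF are defined.

package executor

import (
	"errors"

	"github.com/apache/arrow-go/v18/arrow"
)

// drainPipeline is a hypothetical helper showing the contract: Read advances
// to the next record or fails, Value exposes the current record, Close
// releases the underlying readers, and EOF (matched with errors.Is, since
// Read may wrap it) signals that the pipeline is exhausted.
func drainPipeline(p Pipeline) ([]arrow.Record, error) {
	defer p.Close()

	var records []arrow.Record
	for {
		if err := p.Read(); err != nil {
			if errors.Is(err, EOF) {
				return records, nil // pipeline exhausted
			}
			return nil, err
		}
		rec, err := p.Value()
		if err != nil {
			return nil, err
		}
		records = append(records, rec)
	}
}
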
@ -23,15 +23,6 @@ func TestExecutor(t *testing.T) {
})
}
func TestExecutor_DataObjScan(t *testing.T) {
t.Run("is not implemented", func(t *testing.T) {
c := &Context{}
pipeline := c.executeDataObjScan(context.TODO(), &physical.DataObjScan{})
err := pipeline.Read()
require.ErrorContains(t, err, errNotImplemented.Error())
})
}
func TestExecutor_SortMerge(t *testing.T) {
t.Run("no inputs result in empty pipeline", func(t *testing.T) {
c := &Context{}

@ -1,6 +1,7 @@
package executor
import (
"errors"
"testing"
"github.com/apache/arrow-go/v18/arrow"
@ -55,28 +56,28 @@ func AssertPipelinesEqual(t testing.TB, left, right Pipeline) {
}
}
// Check if both pipelines are complete
if leftErr == EOF && rightErr == EOF {
// Check conditions on our batches:
switch {
case errors.Is(leftErr, EOF) && errors.Is(rightErr, EOF):
// Both pipelines are finished; we can finish now.
return
}
// If one pipeline has an error but the other doesn't, they're not equal
if (leftErr == EOF && rightErr != EOF) || (leftErr != EOF && rightErr == EOF) {
require.Fail(t, "Pipelines have different number of rows",
case (errors.Is(leftErr, EOF) && rightErr == nil) || (errors.Is(rightErr, EOF) && leftErr == nil):
// One pipeline finished before the other (and the other didn't fail), so
// there's an unequal number of rows.
require.Fail(t, "Pipelines have a different number of rows",
"left error: %v, right error: %v", leftErr, rightErr)
}
// If both pipelines have errors that aren't EOF, they fail equally
if leftErr != nil && rightErr != nil && leftErr != EOF && rightErr != EOF {
case leftErr != nil && rightErr != nil && !errors.Is(leftErr, EOF) && !errors.Is(rightErr, EOF):
// Both pipelines failed with non-EOF errors.
require.Equal(t, leftErr, rightErr, "Pipelines failed with different errors")
return
}
// If one pipeline has an error that's not EOF, the pipelines are not equal
if leftErr != nil && leftErr != EOF {
case leftErr != nil && !errors.Is(leftErr, EOF):
// Left pipeline failed with a non-EOF error.
require.Fail(t, "Left pipeline failed", "error: %v", leftErr)
}
if rightErr != nil && rightErr != EOF {
case rightErr != nil && !errors.Is(rightErr, EOF):
// Right pipeline failed with a non-EOF error.
require.Fail(t, "Right pipeline failed", "error: %v", rightErr)
}

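The switch to errors.Is above matters because dataobjScan.Read wraps errors (including EOF) via fmt.Errorf("...: %w", err), so a direct == comparison against the sentinel no longer matches. A minimal sketch of the difference, using io.EOF as a stand-in for the executor package's EOF sentinel:

package main

import (
	"errors"
	"fmt"
	"io"
)

func main() {
	// The wrapped error is a different value, so direct comparison fails,
	// while errors.Is matches through the wrap chain.
	wrapped := fmt.Errorf("reading data object: %w", io.EOF)
	fmt.Println(wrapped == io.EOF)          // false
	fmt.Println(errors.Is(wrapped, io.EOF)) // true
}
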
@ -0,0 +1,132 @@
package topk
import (
"container/heap"
"iter"
"math/rand/v2"
"slices"
)
// Heap implements a heap of T. If Limit is specified, only the greatest
// elements (according to Less) up to Limit are kept.
//
// When removing elements, the smallest element (according to Less) is returned
// first. If using Heap as a max-heap, these elements need to be stored in reverse
// order.
type Heap[T any] struct {
Limit int // Maximum number of entries to keep (0 = unlimited). Optional.
Less func(a, b T) bool // Less returns true if a < b. Required.
values []T // Current values in the heap.
}
// Push adds v into the heap. If the heap is full, v is added only if it is
// larger than the smallest value in the heap.
func (h *Heap[T]) Push(v T) {
if h.Limit == 0 || len(h.values) < h.Limit {
heap.Push(h.impl(), v)
return
}
// h.values[0] is always the smallest value in the heap.
if h.Less(h.values[0], v) {
_ = heap.Pop(h.impl())
heap.Push(h.impl(), v)
}
}
// Pop removes and returns the minimum element from the heap. Pop returns the
// zero value for T and false if the heap is empty.
func (h *Heap[T]) Pop() (T, bool) {
if len(h.values) == 0 {
var zero T
return zero, false
}
return heap.Pop(heapImpl[T]{h}).(T), true
}
// Len returns the current number of elements in the heap.
func (h *Heap[T]) Len() int { return len(h.values) }
// PopAll removes and returns all elements from the heap in sorted order.
func (h *Heap[T]) PopAll() []T {
res := h.values
slices.SortFunc(res, func(a, b T) int {
if h.Less(a, b) {
return -1
}
return 1
})
// Reset h.values to nil to avoid changes to the heap modifying the returned
// slice.
h.values = nil
return res
}
// Range returns an iterator over elements in the heap in random order without
// modifying the heap. The iteration order is not consistent between calls to
// Range.
//
// To retrieve items in sorted order, use [Heap.Pop] or [Heap.PopAll].
func (h *Heap[T]) Range() iter.Seq[T] {
if len(h.values) == 0 {
return func(func(T) bool) {}
}
// Create a random start point in the heap to avoid relying on the return
// order.
//
// This is similar to how Go's range over maps works, but that creates a seed at
// the time the heap is created rather than when ranging begins.
start := rand.IntN(len(h.values))
return func(yield func(T) bool) {
curr := start
for {
if !yield(h.values[curr]) {
return
}
// Increment curr and stop once we've fully looped back to where we
// started.
curr = (curr + 1) % len(h.values)
if curr == start {
return
}
}
}
}
type heapImpl[T any] struct {
*Heap[T]
}
func (h *Heap[T]) impl() heap.Interface { return heapImpl[T]{h} }
var _ heap.Interface = (*heapImpl[int])(nil)
func (impl heapImpl[T]) Len() int { return impl.Heap.Len() }
func (impl heapImpl[T]) Less(i, j int) bool {
return impl.Heap.Less(impl.values[i], impl.values[j])
}
func (impl heapImpl[T]) Swap(i, j int) {
impl.values[i], impl.values[j] = impl.values[j], impl.values[i]
}
func (impl heapImpl[T]) Push(x any) {
impl.values = append(impl.values, x.(T))
}
func (impl heapImpl[T]) Pop() any {
old := impl.values
n := len(old)
x := old[n-1]
impl.values = old[:n-1]
return x
}

@ -0,0 +1,84 @@
package topk_test
import (
"fmt"
"slices"
"sort"
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/util/topk"
)
// ExampleHeap_greatest shows how to use a [topk.Heap] to get the top-k greatest
// elements in descending order.
func ExampleHeap_greatest() {
heap := &topk.Heap[int]{
Limit: 3,
Less: func(a, b int) bool { return a < b },
}
for i := range 10 {
heap.Push(i)
}
actual := heap.PopAll()
slices.Reverse(actual) // Reverse to get in greatest-descending order.
fmt.Println(actual)
// Output: [9 8 7]
}
// ExampleHeap_least shows how to use a [topk.Heap] to get the top-k least
// elements in ascending order.
func ExampleHeap_least() {
heap := &topk.Heap[int]{
Limit: 3,
Less: func(a, b int) bool { return a > b },
}
for i := range 10 {
heap.Push(i)
}
actual := heap.PopAll()
slices.Reverse(actual) // Reverse to get in least-ascending order.
fmt.Println(actual)
// Output: [0 1 2]
}
func TestHeap_Range(t *testing.T) {
heap := &topk.Heap[int]{
Limit: 3,
Less: func(a, b int) bool { return a < b },
}
for i := range 10 {
heap.Push(i)
}
var actual []int
for v := range heap.Range() {
actual = append(actual, v)
}
sort.Ints(actual)
expected := []int{7, 8, 9}
require.Equal(t, expected, actual)
}
func TestHeap_Range_Empty(t *testing.T) {
heap := &topk.Heap[int]{
Limit: 3,
Less: func(a, b int) bool { return a < b },
}
require.NotPanics(t, func() {
// Iterating over an empty heap should be a no-op.
for range heap.Range() {
t.Fatal("there should not be any values in the empty heap")
}
})
}