chore(dataobj): Wire dataset.Reader into dataobj.LogsReader and dataobj.StreamsReader (#16701)

Co-authored-by: Ashwanth Goli <iamashwanth@gmail.com>
pull/16709/head
Robert Fratto 1 year ago committed by GitHub
parent 8c4eb428c3
commit cd11367148
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. pkg/dataobj/internal/dataset/predicate.go (13 lines changed)
  2. pkg/dataobj/internal/dataset/reader.go (100 lines changed)
  3. pkg/dataobj/internal/dataset/reader_basic.go (2 lines changed)
  4. pkg/dataobj/internal/dataset/reader_test.go (159 lines changed)
  5. pkg/dataobj/internal/sections/logs/iter.go (7 lines changed)
  6. pkg/dataobj/internal/sections/streams/iter.go (7 lines changed)
  7. pkg/dataobj/logs_reader.go (321 lines changed)
  8. pkg/dataobj/logs_reader_test.go (2 lines changed)
  9. pkg/dataobj/predicate_test.go (63 lines changed)
  10. pkg/dataobj/streams_reader.go (304 lines changed)
  11. pkg/dataobj/streams_reader_test.go (2 lines changed)

@ -15,6 +15,13 @@ type (
// included if either the Left or Right Predicate are true.
OrPredicate struct{ Left, Right Predicate }
// A NotPredicate is a [Predicate] which asserts that a row may only be
// included if the inner Predicate is false.
NotPredicate struct{ Inner Predicate }
// FalsePredicate is a [Predicate] which always returns false.
FalsePredicate struct{}
// An EqualPredicate is a [Predicate] which asserts that a row may only be
// included if the Value of the Column is equal to the Value.
EqualPredicate struct {
@ -55,6 +62,8 @@ type (
func (AndPredicate) isPredicate() {}
func (OrPredicate) isPredicate() {}
func (NotPredicate) isPredicate() {}
func (FalsePredicate) isPredicate() {}
func (EqualPredicate) isPredicate() {}
func (GreaterThanPredicate) isPredicate() {}
func (LessThanPredicate) isPredicate() {}
@ -78,6 +87,10 @@ func WalkPredicate(p Predicate, fn func(p Predicate) bool) {
WalkPredicate(p.Left, fn)
WalkPredicate(p.Right, fn)
case NotPredicate:
WalkPredicate(p.Inner, fn)
case FalsePredicate: // No children.
case EqualPredicate: // No children.
case GreaterThanPredicate: // No children.
case LessThanPredicate: // No children.

@ -126,6 +126,8 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) {
count, err := r.inner.ReadColumns(ctx, r.dl.PrimaryColumns(), s[:readSize])
if err != nil && !errors.Is(err, io.EOF) {
return n, err
} else if count == 0 && errors.Is(err, io.EOF) {
return 0, io.EOF
}
var passCount int // passCount tracks how many rows pass the predicate.
@ -196,6 +198,12 @@ func checkPredicate(p Predicate, lookup map[Column]int, row Row) bool {
case OrPredicate:
return checkPredicate(p.Left, lookup, row) || checkPredicate(p.Right, lookup, row)
case NotPredicate:
return !checkPredicate(p.Inner, lookup, row)
case FalsePredicate:
return false
case EqualPredicate:
columnIndex, ok := lookup[p.Column]
if !ok {
@ -350,7 +358,7 @@ func (r *Reader) validatePredicate() error {
err = process(p.Column)
case FuncPredicate:
err = process(p.Column)
case AndPredicate, OrPredicate, nil:
case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil:
// No columns to process.
default:
panic(fmt.Sprintf("dataset.Reader.validatePredicate: unsupported predicate type %T", p))
@ -422,7 +430,7 @@ func (r *Reader) fillPrimaryMask(mask *bitmask.Mask) {
process(p.Column)
case FuncPredicate:
process(p.Column)
case AndPredicate, OrPredicate, nil:
case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil:
// No columns to process.
default:
panic(fmt.Sprintf("dataset.Reader.fillPrimaryMask: unsupported predicate type %T", p))
@ -463,6 +471,25 @@ func (r *Reader) buildPredicateRanges(ctx context.Context, p Predicate) (rowRang
}
return unionRanges(nil, left, right), nil
case NotPredicate:
// De Morgan's laws must be applied to reduce the NotPredicate to a set of
// predicates that can be applied to pages.
//
// See comment on [simplifyNotPredicate] for more information.
simplified, err := simplifyNotPredicate(p)
if err != nil {
// Predicate can't be simplified, so we permit the full range.
var rowsCount uint64
for _, column := range r.dl.AllColumns() {
rowsCount = max(rowsCount, uint64(column.ColumnInfo().RowsCount))
}
return rowRanges{{Start: 0, End: rowsCount - 1}}, nil
}
return r.buildPredicateRanges(ctx, simplified)
case FalsePredicate:
return nil, nil // No valid ranges.
case EqualPredicate:
return r.buildColumnPredicateRanges(ctx, p.Column, p)
@ -489,6 +516,67 @@ func (r *Reader) buildPredicateRanges(ctx context.Context, p Predicate) (rowRang
}
}
// simplifyNotPredicate applies De Morgan's laws to a NotPredicate to permit
// page filtering.
//
// During evaluation, a NotPredicate inverts the result of the inner
// predicate, but the same can't be done for page filtering. For example,
// imagine a page is included by the rule "a > 10." If we inverted that
// inclusion, we would incorrectly filter out the page whenever it also
// contains values that are not greater than 10.
//
// To correctly apply page filtering to a NotPredicate, we reduce the
// NotPredicate to a set of predicates that can be applied to pages. This may
// result in other NotPredicates that also need to be simplified.
//
// If the NotPredicate can't be simplified, simplifyNotPredicate returns an
// error.
func simplifyNotPredicate(p NotPredicate) (Predicate, error) {
switch inner := p.Inner.(type) {
case AndPredicate: // De Morgan's law: !(A && B) == !A || !B
return OrPredicate{
Left: NotPredicate{Inner: inner.Left},
Right: NotPredicate{Inner: inner.Right},
}, nil
case OrPredicate: // De Morgan's law: !(A || B) == !A && !B
return AndPredicate{
Left: NotPredicate{Inner: inner.Left},
Right: NotPredicate{Inner: inner.Right},
}, nil
case NotPredicate: // De Morgan's law: !!A == A
return inner.Inner, nil
case FalsePredicate:
return nil, fmt.Errorf("can't simplify FalsePredicate")
case EqualPredicate: // Negation of equality: !(A == B) == (A < B || A > B)
return OrPredicate{
Left: LessThanPredicate(inner),
Right: GreaterThanPredicate(inner),
}, nil
case GreaterThanPredicate: // Negation of ordering: !(A > B) == A <= B == (A == B || A < B)
return OrPredicate{
Left: EqualPredicate(inner),
Right: LessThanPredicate(inner),
}, nil
case LessThanPredicate: // Negation of ordering: !(A < B) == A >= B == (A == B || A > B)
return OrPredicate{
Left: EqualPredicate(inner),
Right: GreaterThanPredicate(inner),
}, nil
case FuncPredicate:
return nil, fmt.Errorf("can't simplify FuncPredicate")
default:
panic(fmt.Sprintf("unsupported predicate type %T", inner))
}
}
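Aside: a minimal, runnable sketch of the negation pushdown that simplifyNotPredicate drives, using stand-in predicate types rather than the real dataset ones.

package main

import "fmt"

// Stand-ins for the dataset predicate types; illustration only.
type (
    Pred interface{ isPred() }
    And  struct{ L, R Pred }
    Or   struct{ L, R Pred }
    Not  struct{ Inner Pred }
    Leaf struct{ Expr string }
)

func (And) isPred()  {}
func (Or) isPred()   {}
func (Not) isPred()  {}
func (Leaf) isPred() {}

// pushDownNot mirrors how buildPredicateRanges repeatedly applies
// simplifyNotPredicate: Not over And/Or is rewritten via De Morgan's laws,
// double negation cancels, and Nots end up only on leaves.
func pushDownNot(p Pred) Pred {
    switch p := p.(type) {
    case And:
        return And{pushDownNot(p.L), pushDownNot(p.R)}
    case Or:
        return Or{pushDownNot(p.L), pushDownNot(p.R)}
    case Not:
        switch inner := p.Inner.(type) {
        case And: // !(A && B) == !A || !B
            return pushDownNot(Or{Not{inner.L}, Not{inner.R}})
        case Or: // !(A || B) == !A && !B
            return pushDownNot(And{Not{inner.L}, Not{inner.R}})
        case Not: // !!A == A
            return pushDownNot(inner.Inner)
        }
    }
    return p
}

func main() {
    p := Not{Inner: And{L: Leaf{"a > 10"}, R: Not{Inner: Leaf{"b == 2"}}}}
    fmt.Printf("%#v\n", pushDownNot(p))
    // Or{L: Not{Inner: Leaf{"a > 10"}}, R: Leaf{"b == 2"}}
}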
// buildColumnPredicateRanges returns a set of rowRanges that are valid based
// on whether EqualPredicate, GreaterThanPredicate, or LessThanPredicate may be
// true for each page in a column.
@ -536,10 +624,10 @@ func (r *Reader) buildColumnPredicateRanges(ctx context.Context, c Column, p Pre
switch p := p.(type) {
case EqualPredicate: // EqualPredicate may be true if p.Value is inside the range of the page.
include = CompareValues(p.Value, minValue) >= 0 && CompareValues(p.Value, maxValue) <= 0
case GreaterThanPredicate: // GreaterThanPredicate may be true if p.Value is greater than the min value.
include = CompareValues(p.Value, minValue) > 0
case LessThanPredicate: // LessThanPredicate may be true if p.Value is less than the max value.
include = CompareValues(p.Value, maxValue) < 0
case GreaterThanPredicate: // GreaterThanPredicate may be true if maxValue of a page is greater than p.Value
include = CompareValues(maxValue, p.Value) > 0
case LessThanPredicate: // LessThanPredicate may be true if minValue of a page is less than p.Value
include = CompareValues(minValue, p.Value) < 0
default:
panic(fmt.Sprintf("unsupported predicate type %T", p))
}
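Aside: the inclusion rules above in miniature, with plain int64 page bounds standing in for dataset.Value and CompareValues. A page is skipped only when its min/max statistics prove the predicate false for every row it holds.

package main

import "fmt"

// mayMatch reports whether a page whose values span [pageMin, pageMax]
// can contain a row satisfying the comparison against v.
func mayMatch(op string, pageMin, pageMax, v int64) bool {
    switch op {
    case "==": // v must fall inside the page's range.
        return v >= pageMin && v <= pageMax
    case ">": // Some row can exceed v only if the page maximum does.
        return pageMax > v
    case "<": // Some row can be below v only if the page minimum is.
        return pageMin < v
    default:
        panic("unsupported op")
    }
}

func main() {
    // A page with values in [200, 500] can satisfy "> 400" but not "< 100".
    fmt.Println(mayMatch(">", 200, 500, 400)) // true
    fmt.Println(mayMatch("<", 200, 500, 100)) // false
}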

@ -90,7 +90,7 @@ func (pr *basicReader) ReadColumns(ctx context.Context, columns []Column, s []Ro
// Fill does not advance the offset of the basicReader.
func (pr *basicReader) Fill(ctx context.Context, columns []Column, s []Row) (n int, err error) {
if len(columns) == 0 {
return 0, fmt.Errorf("no columns to read")
return 0, fmt.Errorf("no columns to fill")
}
for partition := range partitionRows(s) {

@ -8,6 +8,9 @@ import (
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)
func Test_Reader_ReadAll(t *testing.T) {
@ -228,3 +231,159 @@ func readDataset(br *Reader, batchSize int) ([]Row, error) {
}
}
}
func Test_BuildPredicateRanges(t *testing.T) {
ds, cols := buildMemDatasetWithStats(t)
tt := []struct {
name string
predicate Predicate
want rowRanges
}{
{
name: "nil predicate returns full range",
predicate: nil,
want: rowRanges{{Start: 0, End: 999}}, // Full dataset range
},
{
name: "equal predicate in range",
predicate: EqualPredicate{Column: cols[1], Value: Int64Value(50)},
want: rowRanges{{Start: 0, End: 249}}, // Page 1 of Timestamp column
},
{
name: "equal predicate not in any range",
predicate: EqualPredicate{Column: cols[1], Value: Int64Value(1500)},
want: nil, // No ranges should match
},
{
name: "greater than predicate",
predicate: GreaterThanPredicate{Column: cols[1], Value: Int64Value(400)},
want: rowRanges{{Start: 250, End: 749}, {Start: 750, End: 999}}, // Pages 2 and 3 of Timestamp column
},
{
name: "less than predicate",
predicate: LessThanPredicate{Column: cols[1], Value: Int64Value(300)},
want: rowRanges{{Start: 0, End: 249}, {Start: 250, End: 749}}, // Pages 1 and 2 of Timestamp column
},
{
name: "and predicate",
predicate: AndPredicate{
Left: EqualPredicate{Column: cols[0], Value: Int64Value(1)}, // Rows 0 - 299 of stream column
Right: LessThanPredicate{Column: cols[1], Value: Int64Value(600)}, // Rows 0 - 249, 250 - 749 of timestamp column
},
want: rowRanges{{Start: 0, End: 249}, {Start: 250, End: 299}},
},
{
name: "or predicate",
predicate: OrPredicate{
Left: EqualPredicate{Column: cols[0], Value: Int64Value(1)}, // Rows 0 - 299 of stream column
Right: GreaterThanPredicate{Column: cols[1], Value: Int64Value(800)}, // Rows 750 - 999 of timestamp column
},
want: rowRanges{{Start: 0, End: 299}, {Start: 750, End: 999}}, // Rows 0 - 299, 750 - 999
},
}
ctx := context.Background()
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
r := NewReader(ReaderOptions{
Dataset: ds,
Columns: cols,
Predicate: tc.predicate,
})
defer r.Close()
// Initialize downloader
require.NoError(t, r.initDownloader(ctx))
got, err := r.buildPredicateRanges(ctx, tc.predicate)
require.NoError(t, err)
require.Equal(t, tc.want, got, "row ranges should match expected ranges")
})
}
}
// buildMemDatasetWithStats creates a test dataset with only column and page stats.
func buildMemDatasetWithStats(t *testing.T) (Dataset, []Column) {
t.Helper()
dset := FromMemory([]*MemColumn{
{
Info: ColumnInfo{
Name: "stream",
Type: datasetmd.VALUE_TYPE_INT64,
RowsCount: 1000, // 0 - 999
},
Pages: []*MemPage{
{
Info: PageInfo{
RowCount: 300, // 0 - 299
Stats: &datasetmd.Statistics{
MinValue: encodeInt64Value(t, 1),
MaxValue: encodeInt64Value(t, 2),
},
},
},
{
Info: PageInfo{
RowCount: 700, // 300 - 999
Stats: &datasetmd.Statistics{
MinValue: encodeInt64Value(t, 2),
MaxValue: encodeInt64Value(t, 2),
},
},
},
},
},
{
Info: ColumnInfo{
Name: "timestamp",
Type: datasetmd.VALUE_TYPE_INT64,
RowsCount: 1000, // 0 - 999
},
Pages: []*MemPage{
{
Info: PageInfo{
RowCount: 250, // 0 - 249
Stats: &datasetmd.Statistics{
MinValue: encodeInt64Value(t, 0),
MaxValue: encodeInt64Value(t, 100),
},
},
},
{
Info: PageInfo{
RowCount: 500, // 250 - 749
Stats: &datasetmd.Statistics{
MinValue: encodeInt64Value(t, 200),
MaxValue: encodeInt64Value(t, 500),
},
},
},
{
Info: PageInfo{
RowCount: 250, // 750 - 999
Stats: &datasetmd.Statistics{
MinValue: encodeInt64Value(t, 800),
MaxValue: encodeInt64Value(t, 1000),
},
},
},
},
},
})
cols, err := result.Collect(dset.ListColumns(context.Background()))
require.NoError(t, err)
return dset, cols
}
// Helper function to encode an integer value for statistics
func encodeInt64Value(t *testing.T, v int64) []byte {
t.Helper()
data, err := Int64Value(v).MarshalBinary()
require.NoError(t, err)
return data
}

@ -80,7 +80,7 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.
}
for _, row := range rows[:n] {
record, err := decodeRecord(streamsColumns, row)
record, err := Decode(streamsColumns, row)
if err != nil || !yield(record) {
return err
}
@ -89,7 +89,10 @@ func IterSection(ctx context.Context, dec encoding.LogsDecoder, section *filemd.
})
}
func decodeRecord(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) {
// Decode decodes a record from a [dataset.Row], using the provided columns to
// determine the column type. The list of columns must match the columns used
// to create the row.
func Decode(columns []*logsmd.ColumnDesc, row dataset.Row) (Record, error) {
record := Record{
// Preallocate metadata to exact number of metadata columns to avoid
// oversizing.

@ -78,7 +78,7 @@ func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *file
}
for _, row := range rows[:n] {
stream, err := decodeRow(streamsColumns, row)
stream, err := Decode(streamsColumns, row)
if err != nil || !yield(stream) {
return err
}
@ -87,7 +87,10 @@ func IterSection(ctx context.Context, dec encoding.StreamsDecoder, section *file
})
}
func decodeRow(columns []*streamsmd.ColumnDesc, row dataset.Row) (Stream, error) {
// Decode decodes a stream from a [dataset.Row], using the provided columns to
// determine the column type. The list of columns must match the columns used
// to create the row.
func Decode(columns []*streamsmd.ColumnDesc, row dataset.Row) (Stream, error) {
var stream Stream
for columnIndex, columnValue := range row.Values {

@ -2,17 +2,25 @@ package dataobj
import (
"context"
"errors"
"fmt"
"io"
"iter"
"maps"
"slices"
"sort"
"strconv"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/logs"
)
@ -27,14 +35,18 @@ type Record struct {
// LogsReader reads the set of logs from an [Object].
type LogsReader struct {
obj *Object
idx int
obj *Object
idx int
ready bool
matchIDs map[int64]struct{}
predicate LogsPredicate
next func() (result.Result[logs.Record], bool)
stop func()
buf []dataset.Row
reader *dataset.Reader
columns []dataset.Column
columnDesc []*logsmd.ColumnDesc
}
// NewLogsReader creates a new LogsReader that reads from the logs section of
@ -53,7 +65,7 @@ func NewLogsReader(obj *Object, sectionIndex int) *LogsReader {
// MatchStreams may only be called before reading begins or after a call to
// [LogsReader.Reset].
func (r *LogsReader) MatchStreams(ids iter.Seq[int64]) error {
if r.next != nil {
if r.ready {
return fmt.Errorf("cannot change matched streams after reading has started")
}
@ -72,7 +84,7 @@ func (r *LogsReader) MatchStreams(ids iter.Seq[int64]) error {
// A predicate may only be set before reading begins or after a call to
// [LogsReader.Reset].
func (r *LogsReader) SetPredicate(p LogsPredicate) error {
if r.next != nil {
if r.ready {
return fmt.Errorf("cannot change predicate after reading has started")
}
@ -84,72 +96,91 @@ func (r *LogsReader) SetPredicate(p LogsPredicate) error {
// into s. It returns the number of records read and any error encountered. At
// the end of the logs section, Read returns 0, io.EOF.
func (r *LogsReader) Read(ctx context.Context, s []Record) (int, error) {
// TODO(rfratto): The implementation below is the initial, naive approach. It
// lacks a few features that will be needed at scale:
//
// * Read columns/pages in batches of len(s), rather than one row at a time,
//
// * Add page-level filtering based on min/max page values to quickly filter
// out batches of rows without needing to download or decode them.
//
// * Download pages in batches, rather than one at a time.
//
// * Only download/decode non-predicate columns following finding rows that
// match all predicate columns.
//
// * Reuse as much memory as possible from a combination of s and the state
// of LogsReader.
//
// These details can change internally without changing the API exposed by
// LogsReader, which is designed to permit efficient use in the future.
if r.obj == nil {
return 0, io.EOF
} else if r.idx < 0 {
return 0, fmt.Errorf("invalid section index %d", r.idx)
}
if r.next == nil {
err := r.initIter(ctx)
if !r.ready {
err := r.initReader(ctx)
if err != nil {
return 0, err
}
}
for i := range s {
res, ok := r.nextMatching()
if !ok {
return i, io.EOF
}
r.buf = slices.Grow(r.buf, len(s))
r.buf = r.buf[:len(s)]
n, err := r.reader.Read(ctx, r.buf)
if err != nil && !errors.Is(err, io.EOF) {
return 0, fmt.Errorf("reading rows: %w", err)
} else if n == 0 && errors.Is(err, io.EOF) {
return 0, io.EOF
}
record, err := res.Value()
for i := range r.buf[:n] {
readRecord, err := logs.Decode(r.columnDesc, r.buf[i])
if err != nil {
return i, fmt.Errorf("reading record: %w", err)
return i, fmt.Errorf("decoding record: %w", err)
}
s[i] = Record{
StreamID: record.StreamID,
Timestamp: record.Timestamp,
Metadata: convertMetadata(record.Metadata),
Line: record.Line,
StreamID: readRecord.StreamID,
Timestamp: readRecord.Timestamp,
Metadata: convertMetadata(readRecord.Metadata),
Line: readRecord.Line,
}
}
return len(s), nil
return n, nil
}
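Aside: a minimal sketch of driving this Read loop from outside the package; the *Object is assumed to be opened elsewhere, and error handling is abbreviated.

package example

import (
    "context"
    "errors"
    "io"

    "github.com/grafana/loki/v3/pkg/dataobj"
)

// drainLogs reads every record from the first logs section in batches,
// stopping when Read reports io.EOF.
func drainLogs(ctx context.Context, obj *dataobj.Object) ([]dataobj.Record, error) {
    r := dataobj.NewLogsReader(obj, 0)
    defer r.Reset(nil, -1) // Clear state so r can be reused later.

    var out []dataobj.Record
    buf := make([]dataobj.Record, 128)
    for {
        n, err := r.Read(ctx, buf)
        out = append(out, buf[:n]...)
        if errors.Is(err, io.EOF) {
            return out, nil
        } else if err != nil {
            return out, err
        }
    }
}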
func (r *LogsReader) initIter(ctx context.Context) error {
func (r *LogsReader) initReader(ctx context.Context) error {
dec := r.obj.dec.LogsDecoder()
sec, err := r.findSection(ctx)
if err != nil {
return fmt.Errorf("finding section: %w", err)
}
if r.stop != nil {
r.stop()
columnDescs, err := dec.Columns(ctx, sec)
if err != nil {
return fmt.Errorf("reading columns: %w", err)
}
dset := encoding.LogsDataset(dec, sec)
columns, err := result.Collect(dset.ListColumns(ctx))
if err != nil {
return fmt.Errorf("reading columns: %w", err)
}
seq := logs.IterSection(ctx, r.obj.dec.LogsDecoder(), sec)
r.next, r.stop = result.Pull(seq)
// r.predicate doesn't contain mappings of stream IDs; we need to build
// that as a separate predicate and AND them together.
predicate := streamIDPredicate(maps.Keys(r.matchIDs), columns, columnDescs)
if r.predicate != nil {
predicate = dataset.AndPredicate{
Left: predicate,
Right: translateLogsPredicate(r.predicate, columns, columnDescs),
}
}
readerOpts := dataset.ReaderOptions{
Dataset: dset,
Columns: columns,
Predicate: predicate,
TargetCacheSize: 16_000_000, // Permit up to 16MB of cache pages.
}
if r.reader == nil {
r.reader = dataset.NewReader(readerOpts)
} else {
r.reader.Reset(readerOpts)
}
r.columnDesc = columnDescs
r.columns = columns
r.ready = true
return nil
}
@ -173,96 +204,184 @@ func (r *LogsReader) findSection(ctx context.Context) (*filemd.SectionInfo, erro
return nil, fmt.Errorf("section index %d not found", r.idx)
}
func (r *LogsReader) nextMatching() (result.Result[logs.Record], bool) {
if r.next == nil {
return result.Result[logs.Record]{}, false
}
NextRow:
res, ok := r.next()
if !ok {
return res, ok
}
record, err := res.Value()
if err != nil {
return res, true
}
if r.matchIDs != nil {
if _, ok := r.matchIDs[record.StreamID]; !ok {
goto NextRow
}
}
if !matchLogsPredicate(r.predicate, record) {
goto NextRow
}
return res, true
}
func convertMetadata(md push.LabelsAdapter) labels.Labels {
l := make(labels.Labels, 0, len(md))
for _, label := range md {
l = append(l, labels.Label{Name: label.Name, Value: label.Value})
}
sort.Sort(l)
return l
}
// Reset resets the LogsReader with a new object and section index to read
// from. Reset allows reusing a LogsReader without allocating a new one.
//
// Any set predicate is cleared when Reset is called.
//
// Reset may be called with a nil object and a negative section index to clear
// the LogsReader without needing a new object.
func (r *LogsReader) Reset(obj *Object, sectionIndex int) {
r.obj = obj
r.idx = sectionIndex
r.ready = false
clear(r.matchIDs)
r.predicate = nil
r.columns = nil
r.columnDesc = nil
// We leave r.reader as-is to avoid reallocating; it'll be reset on the first
// call to Read.
}
func streamIDPredicate(ids iter.Seq[int64], columns []dataset.Column, columnDesc []*logsmd.ColumnDesc) dataset.Predicate {
var res dataset.Predicate
streamIDColumn := findColumnFromDesc(columns, columnDesc, func(desc *logsmd.ColumnDesc) bool {
return desc.Type == logsmd.COLUMN_TYPE_STREAM_ID
})
if streamIDColumn == nil {
return dataset.FalsePredicate{}
}
for id := range ids {
p := dataset.EqualPredicate{
Column: streamIDColumn,
Value: dataset.Int64Value(id),
}
if res == nil {
res = p
} else {
res = dataset.OrPredicate{
Left: res,
Right: p,
}
}
}
return res
}
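Aside: for matched IDs {1, 2, 3}, streamIDPredicate yields a left-leaning OR chain. A sketch of that shape, assuming it sits alongside the dataset package's tests and borrowing the fakeColumn trick from predicate_test.go:

package dataset_test

import (
    "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
)

// fakeColumn embeds the Column interface so a bare value satisfies it.
type fakeColumn struct{ dataset.Column }

func exampleStreamIDChain() dataset.Predicate {
    col := &fakeColumn{}
    var res dataset.Predicate
    for _, id := range []int64{1, 2, 3} {
        p := dataset.EqualPredicate{Column: col, Value: dataset.Int64Value(id)}
        if res == nil {
            res = p
        } else {
            res = dataset.OrPredicate{Left: res, Right: p}
        }
    }
    return res // OrPredicate{OrPredicate{Eq(1), Eq(2)}, Eq(3)}
}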
func matchLogsPredicate(p Predicate, record logs.Record) bool {
func translateLogsPredicate(p LogsPredicate, columns []dataset.Column, columnDesc []*logsmd.ColumnDesc) dataset.Predicate {
if p == nil {
return true
return nil
}
switch p := p.(type) {
case AndPredicate[LogsPredicate]:
return matchLogsPredicate(p.Left, record) && matchLogsPredicate(p.Right, record)
return dataset.AndPredicate{
Left: translateLogsPredicate(p.Left, columns, columnDesc),
Right: translateLogsPredicate(p.Right, columns, columnDesc),
}
case OrPredicate[LogsPredicate]:
return matchLogsPredicate(p.Left, record) || matchLogsPredicate(p.Right, record)
return dataset.OrPredicate{
Left: translateLogsPredicate(p.Left, columns, columnDesc),
Right: translateLogsPredicate(p.Right, columns, columnDesc),
}
case NotPredicate[LogsPredicate]:
return !matchLogsPredicate(p.Inner, record)
return dataset.NotPredicate{
Inner: translateLogsPredicate(p.Inner, columns, columnDesc),
}
case TimeRangePredicate[LogsPredicate]:
return matchTimestamp(p, record.Timestamp)
timeColumn := findColumnFromDesc(columns, columnDesc, func(desc *logsmd.ColumnDesc) bool {
return desc.Type == logsmd.COLUMN_TYPE_TIMESTAMP
})
if timeColumn == nil {
return dataset.FalsePredicate{}
}
return convertLogsTimePredicate(p, timeColumn)
case MetadataMatcherPredicate:
return getMetadata(record.Metadata, p.Key) == p.Value
metadataColumn := findColumnFromDesc(columns, columnDesc, func(desc *logsmd.ColumnDesc) bool {
return desc.Type == logsmd.COLUMN_TYPE_METADATA && desc.Info.Name == p.Key
})
if metadataColumn == nil {
return dataset.FalsePredicate{}
}
return dataset.EqualPredicate{
Column: metadataColumn,
Value: dataset.StringValue(p.Value),
}
case MetadataFilterPredicate:
return p.Keep(p.Key, getMetadata(record.Metadata, p.Key))
metadataColumn := findColumnFromDesc(columns, columnDesc, func(desc *logsmd.ColumnDesc) bool {
return desc.Type == logsmd.COLUMN_TYPE_METADATA && desc.Info.Name == p.Key
})
if metadataColumn == nil {
return dataset.FalsePredicate{}
}
return dataset.FuncPredicate{
Column: metadataColumn,
Keep: func(_ dataset.Column, value dataset.Value) bool {
return p.Keep(p.Key, valueToString(value))
},
}
default:
// Unsupported predicates should already be caught by
// [LogsReader.SetPredicate].
panic(fmt.Sprintf("unsupported predicate type %T", p))
}
}
func getMetadata(md push.LabelsAdapter, key string) string {
for _, l := range md {
if l.Name == key {
return l.Value
}
}
return ""
}
func convertMetadata(md push.LabelsAdapter) labels.Labels {
l := make(labels.Labels, 0, len(md))
for _, label := range md {
l = append(l, labels.Label{Name: label.Name, Value: label.Value})
}
sort.Sort(l)
return l
}
func convertLogsTimePredicate(p TimeRangePredicate[LogsPredicate], column dataset.Column) dataset.Predicate {
var start dataset.Predicate = dataset.GreaterThanPredicate{
Column: column,
Value: dataset.Int64Value(p.StartTime.UnixNano()),
}
if p.IncludeStart {
start = dataset.OrPredicate{
Left: start,
Right: dataset.EqualPredicate{
Column: column,
Value: dataset.Int64Value(p.StartTime.UnixNano()),
},
}
}
var end dataset.Predicate = dataset.LessThanPredicate{
Column: column,
Value: dataset.Int64Value(p.EndTime.UnixNano()),
}
if p.IncludeEnd {
end = dataset.OrPredicate{
Left: end,
Right: dataset.EqualPredicate{
Column: column,
Value: dataset.Int64Value(p.EndTime.UnixNano()),
},
}
}
return dataset.AndPredicate{
Left: start,
Right: end,
}
}
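Aside: for an inclusive start and exclusive end, the tree built above evaluates as (ts > start || ts == start) && ts < end. The same logic in plain time comparisons:

package main

import (
    "fmt"
    "time"
)

// inHalfOpenRange mirrors the predicate convertLogsTimePredicate builds when
// IncludeStart is true and IncludeEnd is false.
func inHalfOpenRange(ts, start, end time.Time) bool {
    return (ts.After(start) || ts.Equal(start)) && ts.Before(end)
}

func main() {
    start, end := time.Unix(0, 100), time.Unix(0, 200)
    for _, ns := range []int64{99, 100, 150, 200} {
        fmt.Println(ns, inHalfOpenRange(time.Unix(0, ns), start, end))
    }
    // 99 false, 100 true, 150 true, 200 false
}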
// Reset resets the LogsReader with a new object and section index to read
// from. Reset allows reusing a LogsReader without allocating a new one.
//
// Any set predicate is cleared when Reset is called.
//
// Reset may be called with a nil object and a negative section index to clear
// the LogsReader without needing a new object.
func (r *LogsReader) Reset(obj *Object, sectionIndex int) {
if r.stop != nil {
r.stop()
}
r.obj = obj
r.idx = sectionIndex
r.next = nil
r.stop = nil
clear(r.matchIDs)
r.predicate = nil
}
func findColumnFromDesc[Desc any](columns []dataset.Column, descs []Desc, check func(Desc) bool) dataset.Column {
for i, desc := range descs {
if check(desc) {
return columns[i]
}
}
return nil
}
func valueToString(value dataset.Value) string {
switch value.Type() {
case datasetmd.VALUE_TYPE_UNSPECIFIED:
return ""
case datasetmd.VALUE_TYPE_INT64:
return strconv.FormatInt(value.Int64(), 10)
case datasetmd.VALUE_TYPE_UINT64:
return strconv.FormatUint(value.Uint64(), 10)
case datasetmd.VALUE_TYPE_STRING:
return value.String()
default:
panic(fmt.Sprintf("unsupported value type %s", value.Type()))
}
}

@ -181,6 +181,6 @@ func readAllRecords(ctx context.Context, r *dataobj.LogsReader) ([]dataobj.Recor
return res, err
}
buf = buf[:0]
clear(buf)
}
}

@ -6,9 +6,17 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
)
type fakeColumn struct{ dataset.Column }
var (
fakeMinColumn = &fakeColumn{}
fakeMaxColumn = &fakeColumn{}
)
func TestMatchStreamsTimeRangePredicate(t *testing.T) {
now := time.Now()
@ -274,12 +282,42 @@ func TestMatchStreamsTimeRangePredicate(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := matchStreamsPredicate(tt.pred, tt.stream)
predicate := convertStreamsTimePredicate(tt.pred.(TimeRangePredicate[StreamsPredicate]), fakeMinColumn, fakeMaxColumn)
result := evaluateStreamsPredicate(predicate, tt.stream)
require.Equal(t, tt.expected, result, "matchStreamsPredicate returned unexpected result")
})
}
}
func evaluateStreamsPredicate(p dataset.Predicate, s streams.Stream) bool {
switch p := p.(type) {
case dataset.AndPredicate:
return evaluateStreamsPredicate(p.Left, s) && evaluateStreamsPredicate(p.Right, s)
case dataset.OrPredicate:
return evaluateStreamsPredicate(p.Left, s) || evaluateStreamsPredicate(p.Right, s)
case dataset.NotPredicate:
return !evaluateStreamsPredicate(p.Inner, s)
case dataset.GreaterThanPredicate:
if p.Column == fakeMinColumn {
return s.MinTimestamp.After(time.Unix(0, p.Value.Int64()).UTC())
} else if p.Column == fakeMaxColumn {
return s.MaxTimestamp.After(time.Unix(0, p.Value.Int64()).UTC())
}
panic("unexpected column")
case dataset.LessThanPredicate:
if p.Column == fakeMinColumn {
return s.MinTimestamp.Before(time.Unix(0, p.Value.Int64()).UTC())
} else if p.Column == fakeMaxColumn {
return s.MaxTimestamp.Before(time.Unix(0, p.Value.Int64()).UTC())
}
panic("unexpected column")
default:
panic("unexpected predicate")
}
}
func TestMatchTimestamp(t *testing.T) {
now := time.Now()
@ -425,8 +463,29 @@ func TestMatchTimestamp(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := matchTimestamp(tt.pred, tt.ts)
predicate := convertLogsTimePredicate(tt.pred, nil)
result := evaluateRecordPredicate(predicate, tt.ts)
require.Equal(t, tt.expected, result, "matchTimestamp returned unexpected result")
})
}
}
func evaluateRecordPredicate(p dataset.Predicate, ts time.Time) bool {
switch p := p.(type) {
case dataset.AndPredicate:
return evaluateRecordPredicate(p.Left, ts) && evaluateRecordPredicate(p.Right, ts)
case dataset.OrPredicate:
return evaluateRecordPredicate(p.Left, ts) || evaluateRecordPredicate(p.Right, ts)
case dataset.NotPredicate:
return !evaluateRecordPredicate(p.Inner, ts)
case dataset.GreaterThanPredicate:
return ts.After(time.Unix(0, p.Value.Int64()))
case dataset.LessThanPredicate:
return ts.Before(time.Unix(0, p.Value.Int64()))
case dataset.EqualPredicate:
return ts.Equal(time.Unix(0, p.Value.Int64()))
default:
panic("unexpected predicate")
}
}

@ -2,13 +2,18 @@ package dataobj
import (
"context"
"errors"
"fmt"
"io"
"slices"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
"github.com/grafana/loki/v3/pkg/dataobj/internal/encoding"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/filemd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd"
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
"github.com/grafana/loki/v3/pkg/dataobj/internal/sections/streams"
)
@ -32,13 +37,17 @@ type Stream struct {
// StreamsReader reads the set of streams from an [Object].
type StreamsReader struct {
obj *Object
idx int
obj *Object
idx int
ready bool
predicate Predicate
predicate StreamsPredicate
next func() (result.Result[streams.Stream], bool)
stop func()
buf []dataset.Row
reader *dataset.Reader
columns []dataset.Column
columnDesc []*streamsmd.ColumnDesc
}
// NewStreamsReader creates a new StreamsReader that reads from the streams
@ -58,7 +67,7 @@ func NewStreamsReader(obj *Object, sectionIndex int) *StreamsReader {
// A predicate may only be set before reading begins or after a call to
// [StreamsReader.Reset].
func (r *StreamsReader) SetPredicate(p StreamsPredicate) error {
if r.next != nil {
if r.ready {
return fmt.Errorf("cannot change predicate after reading has started")
}
@ -70,73 +79,82 @@ func (r *StreamsReader) SetPredicate(p StreamsPredicate) error {
// into s. It returns the number of streams read and any error encountered. At
// the end of the stream section, Read returns 0, io.EOF.
func (r *StreamsReader) Read(ctx context.Context, s []Stream) (int, error) {
// TODO(rfratto): The implementation below is the initial, naive approach. It
// lacks a few features that will be needed at scale:
//
// * Read columns/pages in batches of len(s), rather than one row at a time,
//
// * Add page-level filtering based on min/max page values to quickly filter
// out batches of rows without needing to download or decode them.
//
// * Download pages in batches, rather than one at a time.
//
// * Only download/decode non-predicate columns following finding rows that
// match all predicate columns.
//
// * Reuse as much memory as possible from a combination of s and the state
// of StreamsReader.
//
// These details can change internally without changing the API exposed by
// StreamsReader, which is designed to permit efficient use in the future.
if r.obj == nil {
return 0, io.EOF
} else if r.idx < 0 {
return 0, fmt.Errorf("invalid section index %d", r.idx)
}
if r.next == nil {
err := r.initIter(ctx)
if !r.ready {
err := r.initReader(ctx)
if err != nil {
return 0, err
}
}
for i := range s {
res, ok := r.nextMatching()
if !ok {
return i, io.EOF
}
r.buf = slices.Grow(r.buf, len(s))
r.buf = r.buf[:len(s)]
n, err := r.reader.Read(ctx, r.buf)
if err != nil && !errors.Is(err, io.EOF) {
return 0, fmt.Errorf("reading rows: %w", err)
} else if n == 0 && errors.Is(err, io.EOF) {
return 0, io.EOF
}
stream, err := res.Value()
for i := range r.buf[:n] {
readStream, err := streams.Decode(r.columnDesc, r.buf[i])
if err != nil {
return i, fmt.Errorf("reading stream: %w", err)
return i, fmt.Errorf("decoding stream: %w", err)
}
s[i] = Stream{
ID: stream.ID,
MinTime: stream.MinTimestamp,
MaxTime: stream.MaxTimestamp,
UncompressedSize: stream.UncompressedSize,
Labels: stream.Labels,
ID: readStream.ID,
MinTime: readStream.MinTimestamp,
MaxTime: readStream.MaxTimestamp,
UncompressedSize: readStream.UncompressedSize,
Labels: readStream.Labels,
}
}
return len(s), nil
return n, nil
}
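Aside: a sketch of the matching loop on the streams side, setting a time-range predicate before the first Read; the *Object is assumed to come from elsewhere and the predicate fields match those used in this diff.

package example

import (
    "context"
    "errors"
    "io"
    "time"

    "github.com/grafana/loki/v3/pkg/dataobj"
)

// streamsInRange returns the streams from section 0 whose time range
// overlaps the inclusive query range [start, end].
func streamsInRange(ctx context.Context, obj *dataobj.Object, start, end time.Time) ([]dataobj.Stream, error) {
    r := dataobj.NewStreamsReader(obj, 0)
    err := r.SetPredicate(dataobj.TimeRangePredicate[dataobj.StreamsPredicate]{
        StartTime:    start,
        EndTime:      end,
        IncludeStart: true,
        IncludeEnd:   true,
    })
    if err != nil {
        return nil, err
    }

    var out []dataobj.Stream
    buf := make([]dataobj.Stream, 64)
    for {
        n, err := r.Read(ctx, buf)
        out = append(out, buf[:n]...)
        if errors.Is(err, io.EOF) {
            return out, nil
        } else if err != nil {
            return out, err
        }
    }
}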
func (r *StreamsReader) initIter(ctx context.Context) error {
func (r *StreamsReader) initReader(ctx context.Context) error {
dec := r.obj.dec.StreamsDecoder()
sec, err := r.findSection(ctx)
if err != nil {
return fmt.Errorf("finding section: %w", err)
}
if r.stop != nil {
r.stop()
columnDescs, err := dec.Columns(ctx, sec)
if err != nil {
return fmt.Errorf("reading columns: %w", err)
}
dset := encoding.StreamsDataset(dec, sec)
columns, err := result.Collect(dset.ListColumns(ctx))
if err != nil {
return fmt.Errorf("reading columns: %w", err)
}
seq := streams.IterSection(ctx, r.obj.dec.StreamsDecoder(), sec)
r.next, r.stop = result.Pull(seq)
readerOpts := dataset.ReaderOptions{
Dataset: dset,
Columns: columns,
Predicate: translateStreamsPredicate(r.predicate, columns, columnDescs),
TargetCacheSize: 16_000_000, // Permit up to 16MB of cache pages.
}
if r.reader == nil {
r.reader = dataset.NewReader(readerOpts)
} else {
r.reader.Reset(readerOpts)
}
r.columnDesc = columnDescs
r.columns = columns
r.ready = true
return nil
}
@ -160,100 +178,150 @@ func (r *StreamsReader) findSection(ctx context.Context) (*filemd.SectionInfo, e
return nil, fmt.Errorf("section index %d not found", r.idx)
}
func (r *StreamsReader) nextMatching() (result.Result[streams.Stream], bool) {
if r.next == nil {
return result.Result[streams.Stream]{}, false
}
NextRow:
res, ok := r.next()
if !ok {
return res, ok
}
stream, err := res.Value()
if err != nil {
return res, true
}
if !matchStreamsPredicate(r.predicate, stream) {
goto NextRow
}
return res, true
}
// Reset resets the StreamsReader with a new object and section index to read
// from. Reset allows reusing a StreamsReader without allocating a new one.
//
// Any set predicate is cleared when Reset is called.
//
// Reset may be called with a nil object and a negative section index to clear
// the StreamsReader without needing a new object.
func (r *StreamsReader) Reset(obj *Object, sectionIndex int) {
r.obj = obj
r.idx = sectionIndex
r.predicate = nil
r.ready = false
r.columns = nil
r.columnDesc = nil
// We leave r.reader as-is to avoid reallocating; it'll be reset on the first
// call to Read.
}
func matchStreamsPredicate(p Predicate, stream streams.Stream) bool {
func translateStreamsPredicate(p StreamsPredicate, columns []dataset.Column, columnDesc []*streamsmd.ColumnDesc) dataset.Predicate {
if p == nil {
return true
return nil
}
switch p := p.(type) {
case AndPredicate[StreamsPredicate]:
return matchStreamsPredicate(p.Left, stream) && matchStreamsPredicate(p.Right, stream)
return dataset.AndPredicate{
Left: translateStreamsPredicate(p.Left, columns, columnDesc),
Right: translateStreamsPredicate(p.Right, columns, columnDesc),
}
case OrPredicate[StreamsPredicate]:
return matchStreamsPredicate(p.Left, stream) || matchStreamsPredicate(p.Right, stream)
return dataset.OrPredicate{
Left: translateStreamsPredicate(p.Left, columns, columnDesc),
Right: translateStreamsPredicate(p.Right, columns, columnDesc),
}
case NotPredicate[StreamsPredicate]:
return !matchStreamsPredicate(p.Inner, stream)
return dataset.NotPredicate{
Inner: translateStreamsPredicate(p.Inner, columns, columnDesc),
}
case TimeRangePredicate[StreamsPredicate]:
// A stream matches if its time range overlaps with the query range
return overlapsTimeRange(p, stream.MinTimestamp, stream.MaxTimestamp)
minTimestamp := findColumnFromDesc(columns, columnDesc, func(desc *streamsmd.ColumnDesc) bool {
return desc.Type == streamsmd.COLUMN_TYPE_MIN_TIMESTAMP
})
maxTimestamp := findColumnFromDesc(columns, columnDesc, func(desc *streamsmd.ColumnDesc) bool {
return desc.Type == streamsmd.COLUMN_TYPE_MAX_TIMESTAMP
})
if minTimestamp == nil || maxTimestamp == nil {
return dataset.FalsePredicate{}
}
return convertStreamsTimePredicate(p, minTimestamp, maxTimestamp)
case LabelMatcherPredicate:
return stream.Labels.Get(p.Name) == p.Value
metadataColumn := findColumnFromDesc(columns, columnDesc, func(desc *streamsmd.ColumnDesc) bool {
return desc.Type == streamsmd.COLUMN_TYPE_LABEL && desc.Info.Name == p.Name
})
if metadataColumn == nil {
return dataset.FalsePredicate{}
}
return dataset.EqualPredicate{
Column: metadataColumn,
Value: dataset.StringValue(p.Value),
}
case LabelFilterPredicate:
return p.Keep(p.Name, stream.Labels.Get(p.Name))
metadataColumn := findColumnFromDesc(columns, columnDesc, func(desc *streamsmd.ColumnDesc) bool {
return desc.Type == streamsmd.COLUMN_TYPE_LABEL && desc.Info.Name == p.Name
})
if metadataColumn == nil {
return dataset.FalsePredicate{}
}
return dataset.FuncPredicate{
Column: metadataColumn,
Keep: func(_ dataset.Column, value dataset.Value) bool {
return p.Keep(p.Name, valueToString(value))
},
}
default:
// Unsupported predicates should already be caught by
// [StreamsReader.SetPredicate].
panic(fmt.Sprintf("unsupported predicate type %T", p))
}
}
func overlapsTimeRange[P Predicate](p TimeRangePredicate[P], start, end time.Time) bool {
switch {
case p.IncludeStart && p.IncludeEnd:
return !end.Before(p.StartTime) && !start.After(p.EndTime)
case p.IncludeStart && !p.IncludeEnd:
return !end.Before(p.StartTime) && start.Before(p.EndTime)
case !p.IncludeStart && p.IncludeEnd:
return end.After(p.StartTime) && !start.After(p.EndTime)
case !p.IncludeStart && !p.IncludeEnd:
return end.After(p.StartTime) && start.Before(p.EndTime)
default:
panic("unreachable")
}
}
func matchTimestamp[P Predicate](p TimeRangePredicate[P], ts time.Time) bool {
switch {
case p.IncludeStart && p.IncludeEnd:
return !ts.Before(p.StartTime) && !ts.After(p.EndTime) // ts >= start && ts <= end
case p.IncludeStart && !p.IncludeEnd:
return !ts.Before(p.StartTime) && ts.Before(p.EndTime) // ts >= start && ts < end
case !p.IncludeStart && p.IncludeEnd:
return ts.After(p.StartTime) && !ts.After(p.EndTime) // ts > start && ts <= end
case !p.IncludeStart && !p.IncludeEnd:
return ts.After(p.StartTime) && ts.Before(p.EndTime) // ts > start && ts < end
default:
panic("unreachable")
}
}
// Reset resets the StreamsReader with a new object and section index to read
// from. Reset allows reusing a StreamsReader without allocating a new one.
//
// Any set predicate is cleared when Reset is called.
//
// Reset may be called with a nil object and a negative section index to clear
// the StreamsReader without needing a new object.
func (r *StreamsReader) Reset(obj *Object, sectionIndex int) {
if r.stop != nil {
r.stop()
}
r.obj = obj
r.idx = sectionIndex
r.next = nil
r.stop = nil
r.predicate = nil
}
func convertStreamsTimePredicate(p TimeRangePredicate[StreamsPredicate], minColumn, maxColumn dataset.Column) dataset.Predicate {
switch {
case p.IncludeStart && p.IncludeEnd: // !max.Before(p.StartTime) && !min.After(p.EndTime)
return dataset.AndPredicate{
Left: dataset.NotPredicate{
Inner: dataset.LessThanPredicate{
Column: maxColumn,
Value: dataset.Int64Value(p.StartTime.UnixNano()),
},
},
Right: dataset.NotPredicate{
Inner: dataset.GreaterThanPredicate{
Column: minColumn,
Value: dataset.Int64Value(p.EndTime.UnixNano()),
},
},
}
case p.IncludeStart && !p.IncludeEnd: // !max.Before(p.StartTime) && min.Before(p.EndTime)
return dataset.AndPredicate{
Left: dataset.NotPredicate{
Inner: dataset.LessThanPredicate{
Column: maxColumn,
Value: dataset.Int64Value(p.StartTime.UnixNano()),
},
},
Right: dataset.LessThanPredicate{
Column: minColumn,
Value: dataset.Int64Value(p.EndTime.UnixNano()),
},
}
case !p.IncludeStart && p.IncludeEnd: // max.After(p.StartTime) && !min.After(p.EndTime)
return dataset.AndPredicate{
Left: dataset.GreaterThanPredicate{
Column: maxColumn,
Value: dataset.Int64Value(p.StartTime.UnixNano()),
},
Right: dataset.NotPredicate{
Inner: dataset.GreaterThanPredicate{
Column: minColumn,
Value: dataset.Int64Value(p.EndTime.UnixNano()),
},
},
}
case !p.IncludeStart && !p.IncludeEnd: // max.After(p.StartTime) && min.Before(p.EndTime)
return dataset.AndPredicate{
Left: dataset.GreaterThanPredicate{
Column: maxColumn,
Value: dataset.Int64Value(p.StartTime.UnixNano()),
},
Right: dataset.LessThanPredicate{
Column: minColumn,
Value: dataset.Int64Value(p.EndTime.UnixNano()),
},
}
default:
panic("unreachable")
}
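Aside: every case above encodes the overlap test that overlapsTimeRange performed per row; for the fully inclusive case it is !(max < start) && !(min > end), which is where the NotPredicate wrappers come from. In plain comparisons:

package main

import (
    "fmt"
    "time"
)

// overlapsInclusive mirrors the IncludeStart && IncludeEnd case of
// convertStreamsTimePredicate: a stream's [min, max] span overlaps the
// query range [start, end] iff !(max < start) && !(min > end).
func overlapsInclusive(min, max, start, end time.Time) bool {
    return !max.Before(start) && !min.After(end)
}

func main() {
    start, end := time.Unix(0, 100), time.Unix(0, 200)
    fmt.Println(overlapsInclusive(time.Unix(0, 150), time.Unix(0, 300), start, end)) // true: spans overlap
    fmt.Println(overlapsInclusive(time.Unix(0, 201), time.Unix(0, 300), start, end)) // false: stream starts after end
}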

@ -130,6 +130,6 @@ func readAllStreams(ctx context.Context, r *dataobj.StreamsReader) ([]dataobj.St
return res, err
}
buf = buf[:0]
clear(buf)
}
}
