// loki/pkg/dataobj/internal/dataset/reader.go

package dataset

import (
	"context"
	"errors"
	"fmt"
	"io"
	"iter"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bitmask"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
	"github.com/grafana/loki/v3/pkg/xcap"
)

// ReaderOptions configures how a [Reader] will read [Row]s.
type ReaderOptions struct {
	Dataset Dataset // Dataset to read from.

	// Columns to read from the Dataset. It is invalid to provide a Column
	// that is not in Dataset.
	//
	// The set of Columns can include columns not used in Predicates; such
	// columns are considered non-predicate columns.
	Columns []Column

	// Predicates holds an optional list of predicates that are applied
	// sequentially to filter the data returned by a Reader. If nil, all rows
	// from Columns are returned.
	//
	// Predicates may only reference columns in Columns.
	Predicates []Predicate

	// Prefetch enables bulk retrieval of pages from the dataset when reading
	// starts, reducing read latency. Prefetch should only be disabled when
	// the entire Dataset is already held in memory.
	Prefetch bool
}

// A Reader reads [Row]s from a [Dataset].
type Reader struct {
	opts  ReaderOptions
	ready bool // ready is true if the Reader has been initialized.

	origColumnLookup     map[Column]int // Find the index of a column in opts.Columns.
	primaryColumnIndexes []int          // Indexes of primary columns in opts.Columns.

	dl     *readerDownloader // Bulk page download manager.
	row    int64             // The current row being read.
	inner  *basicReader      // Underlying reader that reads from columns.
	ranges rowRanges         // Valid ranges to read across the entire dataset.
	region *xcap.Region      // Region for recording statistics.
}

// NewReader creates a new Reader from the provided options.
func NewReader(opts ReaderOptions) *Reader {
	var r Reader
	r.Reset(opts)
	return &r
}

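// A minimal usage sketch. The dataset ds and column slice cols are
// hypothetical stand-ins (they are not defined in this package), and error
// handling is abbreviated:
//
//	r := NewReader(ReaderOptions{
//		Dataset:  ds,   // hypothetical Dataset
//		Columns:  cols, // hypothetical []Column from ds
//		Prefetch: true,
//	})
//	defer r.Close()
//
//	buf := make([]Row, 1024)
//	for {
//		n, err := r.Read(ctx, buf)
//		for _, row := range buf[:n] {
//			_ = row // process row
//		}
//		if errors.Is(err, io.EOF) {
//			break
//		} else if err != nil {
//			return err
//		}
//		// n == 0 with a nil error is possible when a predicate filters
//		// out an entire batch; keep reading until io.EOF.
//	}
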
// Read reads up to the next len(s) rows from r and stores them into s. It
// returns the number of rows read and any error encountered. At the end of
// the Dataset, Read returns 0, [io.EOF].
//
// Read may return 0 with a nil error when a predicate filters out every row
// in a batch; a zero count without io.EOF does not indicate the end of the
// Dataset.
func (r *Reader) Read(ctx context.Context, s []Row) (int, error) {
	if len(s) == 0 {
		return 0, nil
	}

	if !r.ready {
		err := r.init(ctx)
		if err != nil {
			return 0, fmt.Errorf("initializing reader: %w", err)
		}
	}

	r.region.Record(xcap.StatDatasetReadCalls.Observe(1))
	ctx = xcap.ContextWithRegion(ctx, r.region)

	// Our Read implementation works by:
	//
	//  1. Determining the next row to read (aligned to a valid range),
	//  2. reading rows from primary columns up to len(s) or to the end of the
	//     range (whichever is lower),
	//  3. filtering those read rows based on the predicate, and finally
	//  4. filling the remaining secondary columns in the rows that pass the
	//     predicate.
	//
	// If a predicate is defined, primary columns are those used in the
	// predicate, and secondary columns are those not used in the predicate. If
	// there isn't a predicate, all columns are primary and no columns are
	// secondary.
	//
	// This approach means that one call to Read may return 0, nil if the
	// predicate filters out all rows, even if there are more rows in the
	// dataset that may pass the predicate.
	//
	// This *could* be improved by running the above steps in a loop until
	// len(s) rows have been read or we hit EOF. However, this would cause the
	// remainder of s to shrink on each loop iteration, leading to more inner
	// Reads (as the buffer gets smaller and smaller).
	//
	// Improvements on this process must weigh the trade-off between making
	// fewer calls to [Reader.Read] and permitting the buffer to be as big as
	// possible on each call.
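	//
	// As an illustration: with Columns = [A, B] and a single predicate on A,
	// A is primary and B is secondary. A batch of 1,000 rows reads 1,000
	// values from A, and if only 10 rows pass the predicate, B is filled for
	// just those 10 rows.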
	row, err := r.alignRow()
	if err != nil {
		return 0, err
	} else if _, err := r.inner.Seek(int64(row), io.SeekStart); err != nil {
		return 0, fmt.Errorf("failed to seek to row %d: %w", row, err)
	}

	currentRange, ok := r.ranges.Range(row)
	if !ok {
		// This should be unreachable; alignRow already ensures that we're in a
		// range, or it returns io.EOF.
		return 0, fmt.Errorf("failed to find range for row %d", row)
	}

	readSize := min(len(s), int(currentRange.End-row+1))
	readRange := rowRange{
		Start: row,
		End:   row + uint64(readSize) - 1,
	}
	r.dl.SetReadRange(readRange)

	var (
		rowsRead  int // tracks max rows accessed to move the [r.row] cursor
		passCount int // tracks how many rows passed the predicate
	)

	// If there are no predicates, read all columns in the dataset.
	if len(r.opts.Predicates) == 0 {
		count, err := r.inner.ReadColumns(ctx, r.primaryColumns(), s[:readSize])
		if err != nil && !errors.Is(err, io.EOF) {
			return count, err
		} else if count == 0 && errors.Is(err, io.EOF) {
			return 0, io.EOF
		}
		rowsRead = count
		passCount = count

		var primaryColumnBytes int64
		for i := range count {
			primaryColumnBytes += s[i].Size()
		}
		r.region.Record(xcap.StatDatasetPrimaryRowsRead.Observe(int64(rowsRead)))
		r.region.Record(xcap.StatDatasetPrimaryRowBytes.Observe(primaryColumnBytes))
	} else {
		rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize])
		if err != nil {
			return passCount, err
		}
	}

	if secondary := r.secondaryColumns(); len(secondary) > 0 && passCount > 0 {
		// Mask out any ranges that aren't in s[:passCount], so that filling in
		// secondary columns doesn't consider downloading pages not used for
		// the Fill.
		//
		// This mask is only needed for secondary filling in our read range, as
		// the next call to [Reader.Read] will move the read range forward and
		// these rows will never be considered.
		for maskedRange := range buildMask(readRange, s[:passCount]) {
			r.dl.Mask(maskedRange)
		}

		count, err := r.inner.Fill(ctx, secondary, s[:passCount])
		if err != nil && !errors.Is(err, io.EOF) {
			return count, err
		} else if count != passCount {
			return count, fmt.Errorf("failed to fill rows: expected %d, got %d", passCount, count)
		}

		var totalBytesFilled int64
		for i := range count {
			totalBytesFilled += s[i].Size() - s[i].SizeOfColumns(r.primaryColumnIndexes)
		}
		r.region.Record(xcap.StatDatasetSecondaryRowsRead.Observe(int64(count)))
		r.region.Record(xcap.StatDatasetSecondaryRowBytes.Observe(totalBytesFilled))
	}

	// We only advance r.row after we successfully read and filled rows. This
	// allows the caller to retry reading rows if a sporadic error occurs.
	r.row += int64(rowsRead)
	return passCount, nil
}

// readAndFilterPrimaryColumns reads the primary columns from the dataset and
// filters the rows by sequentially applying the predicates.
//
// For each predicate evaluation, only the columns required by that predicate
// are loaded. Rows are filtered at each step, so subsequent predicates only
// need to fill their columns for the reduced set of rows.
//
// It returns the maximum number of rows read, the number of rows that passed
// all predicates, and any error encountered.
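//
// As a sketch, with Predicates = [A > 10, B == "x"] over Columns [A, B, C]:
// the first pass reads column A for the whole batch and keeps rows where
// A > 10; the second pass fills column B only for the surviving rows and
// keeps those where B == "x". Column C is not touched here; it is a
// secondary column filled later by [Reader.Read].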
func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int, s []Row) (int, int, error) {
	var (
		rowsRead           int // tracks max rows accessed to move the [r.row] cursor
		passCount          int // number of rows that passed the predicate
		primaryColumnBytes int64

		filledColumns = make(map[Column]struct{}, len(r.primaryColumnIndexes))
	)

	// Sequentially apply the predicates.
	for i, p := range r.opts.Predicates {
		columns, idxs, err := r.predicateColumns(p, func(c Column) bool {
			// Keep only columns that haven't been filled yet.
			_, ok := filledColumns[c]
			return !ok
		})
		if err != nil {
			return rowsRead, 0, err
		}

		var count int
		// Read the requested number of rows for the first predicate.
		if i == 0 {
			count, err = r.inner.ReadColumns(ctx, columns, s[:readSize])
			if err != nil && !errors.Is(err, io.EOF) {
				return 0, 0, err
			} else if count == 0 && errors.Is(err, io.EOF) {
				return 0, 0, io.EOF
			}
			rowsRead = count
		} else if len(columns) > 0 {
			count, err = r.inner.Fill(ctx, columns, s[:readSize])
			if err != nil && !errors.Is(err, io.EOF) {
				return rowsRead, 0, err
			} else if count != readSize {
				return rowsRead, 0, fmt.Errorf("failed to fill rows: expected %d, got %d", readSize, count)
			}
		} else {
			count = readSize // required columns are already filled
		}

		passCount = 0
		for i := range count {
			size := s[i].SizeOfColumns(idxs)
			primaryColumnBytes += size

			if !checkPredicate(p, r.origColumnLookup, s[i]) {
				continue
			}

			// We move s[i] to s[passCount] by *swapping* the rows. Copying
			// would result in the Row.Values slice existing in two places in
			// the buffer, which causes memory corruption when filling in rows.
			s[passCount], s[i] = s[i], s[passCount]
			passCount++
		}
		if passCount == 0 {
			// No rows passed the predicate, so we can stop early.
			break
		}

		for _, c := range columns {
			filledColumns[c] = struct{}{}
		}
		readSize = passCount
	}

	r.region.Record(xcap.StatDatasetPrimaryRowsRead.Observe(int64(rowsRead)))
	r.region.Record(xcap.StatDatasetPrimaryRowBytes.Observe(primaryColumnBytes))
	return rowsRead, passCount, nil
}

// alignRow returns r.row if it is a valid row in ranges, or adjusts r.row to
// the next valid row in ranges.
func (r *Reader) alignRow() (uint64, error) {
	if r.ranges.Includes(uint64(r.row)) {
		return uint64(r.row), nil
	}

	nextRow, ok := r.ranges.Next(uint64(r.row))
	if !ok {
		return 0, io.EOF
	}
	r.row = int64(nextRow)
	return nextRow, nil
}

func checkPredicate(p Predicate, lookup map[Column]int, row Row) bool {
	if p == nil {
		return true
	}

	switch p := p.(type) {
	case AndPredicate:
		return checkPredicate(p.Left, lookup, row) && checkPredicate(p.Right, lookup, row)
	case OrPredicate:
		return checkPredicate(p.Left, lookup, row) || checkPredicate(p.Right, lookup, row)
	case NotPredicate:
		return !checkPredicate(p.Inner, lookup, row)
	case TruePredicate:
		return true
	case FalsePredicate:
		return false
	case EqualPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}
		return CompareValues(&row.Values[columnIndex], &p.Value) == 0
	case InPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}
		value := row.Values[columnIndex]
		if value.IsNil() || value.Type() != p.Column.ColumnDesc().Type.Physical {
			return false
		}
		return p.Values.Contains(value)
	case GreaterThanPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}
		return CompareValues(&row.Values[columnIndex], &p.Value) > 0
	case LessThanPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}
		return CompareValues(&row.Values[columnIndex], &p.Value) < 0
	case FuncPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}
		return p.Keep(p.Column, row.Values[columnIndex])
	default:
		panic(fmt.Sprintf("unsupported predicate type %T", p))
	}
}

// buildMask returns an iterator that yields row ranges from full that are not
// present in s.
//
// buildMask will panic if any index in s is outside the range of full.
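//
// For example, if full is rows 0-9 and s holds rows with indexes 0, 1, 5,
// and 6, buildMask yields the ranges [2, 4] and [7, 9].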
func buildMask(full rowRange, s []Row) iter.Seq[rowRange] {
	return func(yield func(rowRange) bool) {
		// Rows in s are in ascending order, but there may be gaps between
		// rows. We need to return ranges of rows in full that are not in s.
		if len(s) == 0 {
			yield(full)
			return
		}

		start := full.Start
		for _, row := range s {
			if !full.Contains(uint64(row.Index)) {
				panic("buildMask: row index out of range")
			}

			if uint64(row.Index) != start {
				// The excluded range is (start, row.Index - 1); for example,
				// if start is 1 and row.Index is 5, the excluded range is
				// (1, 4).
				if !yield(rowRange{Start: start, End: uint64(row.Index) - 1}) {
					return
				}
			}
			start = uint64(row.Index) + 1
		}

		if start <= full.End {
			if !yield(rowRange{Start: start, End: full.End}) {
				return
			}
		}
	}
}

// Close closes the Reader. Closed Readers can be reused by calling
// [Reader.Reset].
func (r *Reader) Close() error {
	r.region.End()
	if r.inner != nil {
		return r.inner.Close()
	}
	return nil
}

// Reset discards any state and resets the Reader with a new set of options.
// This permits reusing a Reader rather than allocating a new one.
func (r *Reader) Reset(opts ReaderOptions) {
	r.opts = opts

	// There's not much work Reset can do without a context, since it needs to
	// retrieve page info. We'll defer this work to an init function. This also
	// unfortunately means that we might not reset page readers until the first
	// call to Read.

	if r.origColumnLookup == nil {
		r.origColumnLookup = make(map[Column]int, len(opts.Columns))
	}
	clear(r.origColumnLookup)
	for i, c := range opts.Columns {
		r.origColumnLookup[c] = i
	}

	r.row = 0
	r.ranges = sliceclear.Clear(r.ranges)
	r.primaryColumnIndexes = sliceclear.Clear(r.primaryColumnIndexes)
	r.ready = false
	r.region = nil
}

func (r *Reader) init(ctx context.Context) error {
	// Reader.init is kept close to the definition of Reader.Reset to make it
	// easier to follow the correctness of resetting + initializing.

	_, r.region = xcap.StartRegion(ctx, "dataset.Reader")

	// r.validatePredicate must be called before initializing anything else;
	// for simplicity, other functions assume that the predicate is valid and
	// can panic if it isn't.
	if err := r.validatePredicate(); err != nil {
		return err
	}

	if err := r.initDownloader(ctx); err != nil {
		return err
	}

	if r.inner == nil {
		r.inner = newBasicReader(r.allColumns())
	} else {
		r.inner.Reset(r.allColumns())
	}

	r.ready = true
	return nil
}

// allColumns returns the full set of columns to read. If r was configured
// with prefetching, wrapped columns from [readerDownloader] are returned.
// Otherwise, the columns of the original dataset are returned.
func (r *Reader) allColumns() []Column {
	if r.opts.Prefetch {
		return r.dl.AllColumns()
	}
	return r.dl.OrigColumns()
}

// primaryColumns returns the primary columns to read. If r was configured
// with prefetching, wrapped columns from [readerDownloader] are returned.
// Otherwise, the primary columns of the original dataset are returned.
func (r *Reader) primaryColumns() []Column {
	if r.opts.Prefetch {
		return r.dl.PrimaryColumns()
	}
	return r.dl.OrigPrimaryColumns()
}

// secondaryColumns returns the secondary columns to read. If r was configured
// with prefetching, wrapped columns from [readerDownloader] are returned.
// Otherwise, the secondary columns of the original dataset are returned.
func (r *Reader) secondaryColumns() []Column {
	if r.opts.Prefetch {
		return r.dl.SecondaryColumns()
	}
	return r.dl.OrigSecondaryColumns()
}

// validatePredicate ensures that all columns used in a predicate have been
// provided in [ReaderOptions].
func (r *Reader) validatePredicate() error {
	process := func(c Column) error {
		_, ok := r.origColumnLookup[c]
		if !ok {
			return fmt.Errorf("predicate column %v not found in Reader columns", c)
		}
		return nil
	}

	var err error
	for _, pp := range r.opts.Predicates {
		WalkPredicate(pp, func(p Predicate) bool {
			if err != nil {
				return false
			}

			switch p := p.(type) {
			case EqualPredicate:
				err = process(p.Column)
			case InPredicate:
				err = process(p.Column)
			case GreaterThanPredicate:
				err = process(p.Column)
			case LessThanPredicate:
				err = process(p.Column)
			case FuncPredicate:
				err = process(p.Column)
			case AndPredicate, OrPredicate, NotPredicate, TruePredicate, FalsePredicate, nil:
				// No columns to process.
			default:
				panic(fmt.Sprintf("dataset.Reader.validatePredicate: unsupported predicate type %T", p))
			}

			return true // Continue walking the Predicate.
		})
	}
	return err
}

// initDownloader initializes the reader's [readerDownloader]. initDownloader
// is always used, regardless of the Prefetch option, to reduce the number of
// conditional code paths.
func (r *Reader) initDownloader(ctx context.Context) error {
	// The downloader is initialized in three steps:
	//
	//  1. Give it the inner dataset.
	//  2. Add columns with a flag of whether a column is primary or secondary.
	//  3. Provide the overall dataset row ranges that will be valid to read.
	if r.dl == nil {
		r.dl = newReaderDownloader(r.opts.Dataset)
	} else {
		r.dl.Reset(r.opts.Dataset)
	}

	mask := bitmask.New(len(r.opts.Columns))
	r.fillPrimaryMask(mask)

	for i, column := range r.opts.Columns {
		primary := mask.Test(i)
		r.dl.AddColumn(column, primary)
		if primary {
			r.primaryColumnIndexes = append(r.primaryColumnIndexes, i)
			r.region.Record(xcap.StatDatasetPrimaryColumns.Observe(1))
			r.region.Record(xcap.StatDatasetPrimaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
		} else {
			r.region.Record(xcap.StatDatasetSecondaryColumns.Observe(1))
			r.region.Record(xcap.StatDatasetSecondaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
		}
	}

	var ranges rowRanges
	var err error
	if len(r.opts.Predicates) == 0 { // No predicates; build the full range.
		ranges, err = r.buildPredicateRanges(ctx, nil)
		if err != nil {
			return err
		}
	} else {
		for _, p := range r.opts.Predicates {
			rr, err := r.buildPredicateRanges(ctx, p)
			if err != nil {
				return err
			}
			if ranges == nil {
				ranges = rr
			} else {
				ranges = intersectRanges(nil, ranges, rr)
			}
		}
	}

	r.dl.SetDatasetRanges(ranges)
	r.ranges = ranges

	var rowsCount uint64
	for _, column := range r.allColumns() {
		rowsCount = max(rowsCount, uint64(column.ColumnDesc().RowsCount))
	}
	r.region.Record(xcap.StatDatasetMaxRows.Observe(int64(rowsCount)))
	r.region.Record(xcap.StatDatasetRowsAfterPruning.Observe(int64(ranges.TotalRowCount())))
	return nil
}

func (r *Reader) fillPrimaryMask(mask *bitmask.Mask) {
	process := func(c Column) {
		idx, ok := r.origColumnLookup[c]
		if !ok {
			// This shouldn't be reachable: before we initialize anything we
			// ensure that all columns in the predicate are available in
			// r.opts.Columns.
			panic("fillPrimaryMask: column not found")
		}
		mask.Set(idx)
	}

	// If there's no predicate, all columns are primary.
	if len(r.opts.Predicates) == 0 {
		for _, c := range r.opts.Columns {
			process(c)
		}
		return
	}

	// If there is a predicate, primary columns are those used in the predicate.
	for _, pp := range r.opts.Predicates {
		WalkPredicate(pp, func(p Predicate) bool {
			switch p := p.(type) {
			case EqualPredicate:
				process(p.Column)
			case InPredicate:
				process(p.Column)
			case GreaterThanPredicate:
				process(p.Column)
			case LessThanPredicate:
				process(p.Column)
			case FuncPredicate:
				process(p.Column)
			case AndPredicate, OrPredicate, NotPredicate, TruePredicate, FalsePredicate, nil:
				// No columns to process.
			default:
				panic(fmt.Sprintf("dataset.Reader.fillPrimaryMask: unsupported predicate type %T", p))
			}

			return true // Continue walking the Predicate.
		})
	}
}

// buildPredicateRanges returns a set of rowRanges that are valid to read based
// on the provided predicate. If p is nil or the predicate is unsupported, the
// entire dataset range is valid.
//
// r.dl must be initialized before calling buildPredicateRanges.
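//
// As an illustration of how predicates combine: if the left side of an
// AndPredicate is satisfiable in rows [0, 9] and the right side in rows
// [5, 14], the AndPredicate yields the intersection [5, 9], while an
// OrPredicate over the same operands yields the union [0, 14].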
func (r *Reader) buildPredicateRanges(ctx context.Context, p Predicate) (rowRanges, error) {
	// TODO(rfratto): We could be reusing memory for building ranges here.
	switch p := p.(type) {
	case AndPredicate:
		left, err := r.buildPredicateRanges(ctx, p.Left)
		if err != nil {
			return nil, err
		}
		right, err := r.buildPredicateRanges(ctx, p.Right)
		if err != nil {
			return nil, err
		}
		return intersectRanges(nil, left, right), nil
	case OrPredicate:
		left, err := r.buildPredicateRanges(ctx, p.Left)
		if err != nil {
			return nil, err
		}
		right, err := r.buildPredicateRanges(ctx, p.Right)
		if err != nil {
			return nil, err
		}
		return unionRanges(nil, left, right), nil
	case NotPredicate:
		// De Morgan's laws must be applied to reduce the NotPredicate to a set
		// of predicates that can be applied to pages.
		//
		// See comment on [simplifyNotPredicate] for more information.
		simplified, err := simplifyNotPredicate(p)
		if err != nil {
			// Predicate can't be simplified, so we permit the full range.
			var rowsCount uint64
			for _, column := range r.allColumns() {
				rowsCount = max(rowsCount, uint64(column.ColumnDesc().RowsCount))
			}
			return rowRanges{{Start: 0, End: rowsCount - 1}}, nil
		}
		return r.buildPredicateRanges(ctx, simplified)
	case FalsePredicate:
		return nil, nil // No valid ranges.
	case EqualPredicate:
		return r.buildColumnPredicateRanges(ctx, p.Column, p)
	case InPredicate:
		return r.buildColumnPredicateRanges(ctx, p.Column, p)
	case GreaterThanPredicate:
		return r.buildColumnPredicateRanges(ctx, p.Column, p)
	case LessThanPredicate:
		return r.buildColumnPredicateRanges(ctx, p.Column, p)
	case TruePredicate, FuncPredicate, nil:
		// These predicates (and nil) don't support any filtering, so they map
		// to the full range being valid.
		//
		// We use r.allColumns instead of r.opts.Columns because the downloader
		// will cache metadata.
		var rowsCount uint64
		for _, column := range r.allColumns() {
			rowsCount = max(rowsCount, uint64(column.ColumnDesc().RowsCount))
		}
		return rowRanges{{Start: 0, End: rowsCount - 1}}, nil
	default:
		panic(fmt.Sprintf("unsupported predicate type %T", p))
	}
}

// simplifyNotPredicate applies De Morgan's laws (and related negation rules)
// to a NotPredicate to permit page filtering.
//
// While a NotPredicate inverts the result of its inner predicate during
// evaluation, the same can't be done for page filtering. For example, imagine
// that a page is included from a rule "a > 10." If we inverted that inclusion,
// we may incorrectly filter out that page, as the page may also contain
// values less than 10.
//
// To correctly apply page filtering to a NotPredicate, we reduce the
// NotPredicate to a set of predicates that can be applied to pages. This may
// result in other NotPredicates that also need to be simplified.
//
// If the NotPredicate can't be simplified, simplifyNotPredicate returns an
// error.
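//
// For example, !(A == 10 && B > 3) is rewritten as !(A == 10) || !(B > 3),
// whose halves simplify further to (A < 10 || A > 10) and
// (B == 3 || B < 3) respectively.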
func simplifyNotPredicate(p NotPredicate) (Predicate, error) {
	switch inner := p.Inner.(type) {
	case AndPredicate: // De Morgan's law: !(A && B) == !A || !B
		return OrPredicate{
			Left:  NotPredicate{Inner: inner.Left},
			Right: NotPredicate{Inner: inner.Right},
		}, nil
	case OrPredicate: // De Morgan's law: !(A || B) == !A && !B
		return AndPredicate{
			Left:  NotPredicate{Inner: inner.Left},
			Right: NotPredicate{Inner: inner.Right},
		}, nil
	case NotPredicate: // Double negation: !!A == A
		return inner.Inner, nil
	case FalsePredicate:
		return TruePredicate{}, nil
	case EqualPredicate: // Negated equality: !(A == B) == A != B == A < B || A > B
		return OrPredicate{
			Left:  LessThanPredicate(inner),
			Right: GreaterThanPredicate(inner),
		}, nil
	case GreaterThanPredicate: // Negated ordering: !(A > B) == A <= B == A == B || A < B
		return OrPredicate{
			Left:  EqualPredicate(inner),
			Right: LessThanPredicate(inner),
		}, nil
	case LessThanPredicate: // Negated ordering: !(A < B) == A >= B == A == B || A > B
		return OrPredicate{
			Left:  EqualPredicate(inner),
			Right: GreaterThanPredicate(inner),
		}, nil
	case InPredicate:
		// TODO: can be supported when we introduce NotInPredicate.
		return nil, fmt.Errorf("can't simplify InPredicate")
	case FuncPredicate:
		return nil, fmt.Errorf("can't simplify FuncPredicate")
	default:
		panic(fmt.Sprintf("unsupported predicate type %T", inner))
	}
}

// buildColumnPredicateRanges returns a set of rowRanges that are valid based
// on whether an EqualPredicate, InPredicate, GreaterThanPredicate, or
// LessThanPredicate may be true for each page in a column.
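//
// For example, a page whose statistics report minValue = 3 and maxValue = 9
// is excluded for the predicate column > 10 (no value in the page can be
// greater than 10), but included for column == 5. Pages without statistics
// are always included.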
func (r *Reader) buildColumnPredicateRanges(ctx context.Context, c Column, p Predicate) (rowRanges, error) {
	// Get the wrapped column so that the result of c.ListPages can be cached.
	if idx, ok := r.origColumnLookup[c]; ok {
		c = r.allColumns()[idx]
	} else {
		return nil, fmt.Errorf("column %v not found in Reader columns", c)
	}

	var ranges rowRanges

	var (
		pageStart    int
		lastPageSize int
	)
	for result := range c.ListPages(ctx) {
		pageStart += lastPageSize

		page, err := result.Value()
		if err != nil {
			return nil, err
		}

		pageInfo := page.PageDesc()
		lastPageSize = pageInfo.RowCount

		pageRange := rowRange{
			Start: uint64(pageStart),
			End:   uint64(pageStart + pageInfo.RowCount - 1),
		}

		minValue, maxValue, err := readMinMax(pageInfo.Stats)
		if err != nil {
			return nil, fmt.Errorf("failed to read page stats: %w", err)
		} else if minValue.IsNil() || maxValue.IsNil() {
			// No stats, so we add the whole range.
			ranges.Add(pageRange)
			continue
		}

		var include bool
		switch p := p.(type) {
		case EqualPredicate: // EqualPredicate may be true if p.Value is inside the range of the page.
			isEmpty := p.Value.Type() == datasetmd.PHYSICAL_TYPE_BINARY && p.Value.IsZero()
			include = isEmpty || (CompareValues(&p.Value, &minValue) >= 0 && CompareValues(&p.Value, &maxValue) <= 0)
		case GreaterThanPredicate: // GreaterThanPredicate may be true if the page's maxValue is greater than p.Value.
			include = CompareValues(&maxValue, &p.Value) > 0
		case LessThanPredicate: // LessThanPredicate may be true if the page's minValue is less than p.Value.
			include = CompareValues(&minValue, &p.Value) < 0
		case InPredicate:
			// Check if any value falls within the page's range.
			for v := range p.Values.Iter() {
				if CompareValues(&v, &minValue) >= 0 && CompareValues(&v, &maxValue) <= 0 {
					include = true
					break
				}
			}
		default:
			panic(fmt.Sprintf("unsupported predicate type %T", p))
		}

		if include {
			ranges.Add(pageRange)
		}
	}
	return ranges, nil
}

// readMinMax reads the minimum and maximum values from the provided
// statistics. If either minValue or maxValue is NULL, the value is not
// present in the statistics.
func readMinMax(stats *datasetmd.Statistics) (minValue Value, maxValue Value, err error) {
	// TODO(rfratto): We should have a dataset-specific Statistics type that
	// already has the min and max values decoded, so the decoders only need
	// to read them.
	if stats == nil {
		return
	}

	if err := minValue.UnmarshalBinary(stats.MinValue); err != nil {
		return Value{}, Value{}, fmt.Errorf("failed to unmarshal min value: %w", err)
	} else if err := maxValue.UnmarshalBinary(stats.MaxValue); err != nil {
		return Value{}, Value{}, fmt.Errorf("failed to unmarshal max value: %w", err)
	}
	return
}

func (r *Reader) predicateColumns(p Predicate, keep func(c Column) bool) ([]Column, []int, error) {
	columns := make(map[Column]struct{})
	WalkPredicate(p, func(p Predicate) bool {
		switch p := p.(type) {
		case EqualPredicate:
			columns[p.Column] = struct{}{}
		case InPredicate:
			columns[p.Column] = struct{}{}
		case GreaterThanPredicate:
			columns[p.Column] = struct{}{}
		case LessThanPredicate:
			columns[p.Column] = struct{}{}
		case FuncPredicate:
			columns[p.Column] = struct{}{}
		case AndPredicate, OrPredicate, NotPredicate, TruePredicate, FalsePredicate, nil:
			// No columns to process.
		default:
			panic(fmt.Sprintf("predicateColumns: unsupported predicate type %T", p))
		}
		return true
	})

	ret := make([]Column, 0, len(columns))
	idxs := make([]int, 0, len(columns))
	for c := range columns {
		idx, ok := r.origColumnLookup[c]
		if !ok {
			panic(fmt.Errorf("predicateColumns: column %v not found in Reader columns", c))
		}
		c := r.allColumns()[idx]
		if !keep(c) {
			continue
		}
		idxs = append(idxs, idx)
		ret = append(ret, c)
	}
	return ret, idxs, nil
}