package dataset

import (
	"context"
	"errors"
	"fmt"
	"io"
	"iter"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/bitmask"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
	"github.com/grafana/loki/v3/pkg/xcap"
)

// ReaderOptions configures how a [Reader] will read [Row]s.
type ReaderOptions struct {
	Dataset Dataset // Dataset to read from.

	// Columns to read from the Dataset. It is invalid to provide a Column that
	// is not in Dataset.
	//
	// The set of Columns can include columns not used in Predicates; such
	// columns are considered non-predicate columns.
	Columns []Column

	// Predicates holds an optional list of predicates that are sequentially
	// applied to filter the data returned by a Reader. If nil, all rows from
	// Columns are returned.
	//
	// Expressions in Predicates may only reference columns in Columns.
	Predicates []Predicate

	// Prefetch enables bulk retrieval of pages from the dataset when reading
	// starts. To reduce read latency, this option should only be disabled when
	// the entire Dataset is already held in memory.
	Prefetch bool
}

// A Reader reads [Row]s from a [Dataset].
type Reader struct {
	opts  ReaderOptions
	ready bool // ready is true if the Reader has been initialized.

	origColumnLookup     map[Column]int // Find the index of a column in opts.Columns.
	primaryColumnIndexes []int          // Indexes of primary columns in opts.Columns.

	dl     *readerDownloader // Bulk page download manager.
	row    int64             // The current row being read.
	inner  *basicReader      // Underlying reader that reads from columns.
	ranges rowRanges         // Valid ranges to read across the entire dataset.

	region *xcap.Region // Region for recording statistics.
}

// NewReader creates a new Reader from the provided options.
func NewReader(opts ReaderOptions) *Reader {
	var r Reader
	r.Reset(opts)
	return &r
}

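// Editorial sketch (not part of the original source) of a typical read loop.
// It assumes ctx, ds, and cols are obtained elsewhere, and process is a
// hypothetical caller function. Note that Read may return (0, nil) when a
// predicate filters out every row in the current range, so callers should
// loop until io.EOF rather than stopping on a zero count:
//
//	r := NewReader(ReaderOptions{Dataset: ds, Columns: cols, Prefetch: true})
//	defer r.Close()
//
//	buf := make([]Row, 1024)
//	for {
//		n, err := r.Read(ctx, buf)
//		if errors.Is(err, io.EOF) {
//			break
//		} else if err != nil {
//			return err
//		}
//		process(buf[:n])
//	}
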
// Read reads up to the next len(s) rows from r and stores them into s. It
// returns the number of rows read and any error encountered. At the end of the
// Dataset, Read returns 0, [io.EOF].
func (r *Reader) Read(ctx context.Context, s []Row) (int, error) {
	if len(s) == 0 {
		return 0, nil
	}

	if !r.ready {
		err := r.init(ctx)
		if err != nil {
			return 0, fmt.Errorf("initializing reader: %w", err)
		}
	}

	r.region.Record(xcap.StatDatasetReadCalls.Observe(1))
	ctx = xcap.ContextWithRegion(ctx, r.region)

	// Our Read implementation works by:
	//
	// 1. Determining the next row to read (aligned to a valid range),
	// 2. reading rows from primary columns up to len(s) or to the end of the
	//    range (whichever is lower),
	// 3. filtering those read rows based on the predicate, and finally
	// 4. filling the remaining secondary columns in the rows that pass the
	//    predicate.
	//
	// If a predicate is defined, primary columns are those used in the
	// predicate, and secondary columns are those not used in the predicate. If
	// there isn't a predicate, all columns are primary and no columns are
	// secondary.
	//
	// This approach means that one call to Read may return 0, nil if the
	// predicate filters out all rows, even if there are more rows in the
	// dataset that may pass the predicate.
	//
	// This *could* be improved by running the above steps in a loop until
	// len(s) rows have been read or we hit EOF. However, this would cause the
	// remainder of s to shrink each loop iteration, leading to more inner
	// Reads (as the buffer gets smaller and smaller).
	//
	// Improvements on this process must consider the trade-off between fewer
	// calls to [Reader.Read] and keeping the buffer as large as possible on
	// each call.

	row, err := r.alignRow()
	if err != nil {
		return 0, err
	} else if _, err := r.inner.Seek(int64(row), io.SeekStart); err != nil {
		return 0, fmt.Errorf("failed to seek to row %d: %w", row, err)
	}

	currentRange, ok := r.ranges.Range(row)
	if !ok {
		// This should be unreachable; alignRow already ensures that we're in a
		// range, or it returns io.EOF.
		return 0, fmt.Errorf("failed to find range for row %d", row)
	}

	readSize := min(len(s), int(currentRange.End-row+1))

	readRange := rowRange{
		Start: row,
		End:   row + uint64(readSize) - 1,
	}
	r.dl.SetReadRange(readRange)

	var (
		rowsRead  int // tracks max rows accessed to move the [r.row] cursor
		passCount int // tracks how many rows passed the predicate
	)

	// If there are no predicates, read all columns in the dataset.
	if len(r.opts.Predicates) == 0 {
		count, err := r.inner.ReadColumns(ctx, r.primaryColumns(), s[:readSize])
		if err != nil && !errors.Is(err, io.EOF) {
			return count, err
		} else if count == 0 && errors.Is(err, io.EOF) {
			return 0, io.EOF
		}

		rowsRead = count
		passCount = count

		var primaryColumnBytes int64
		for i := range count {
			primaryColumnBytes += s[i].Size()
		}

		r.region.Record(xcap.StatDatasetPrimaryRowsRead.Observe(int64(rowsRead)))
		r.region.Record(xcap.StatDatasetPrimaryRowBytes.Observe(primaryColumnBytes))
	} else {
		rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize])
		if err != nil {
			return passCount, err
		}
	}

	if secondary := r.secondaryColumns(); len(secondary) > 0 && passCount > 0 {
		// Mask out any ranges that aren't in s[:passCount], so that filling in
		// secondary columns doesn't consider downloading pages not used for the
		// Fill.
		//
		// This mask is only needed for secondary filling in our read range, as
		// the next call to [Reader.Read] will move the read range forward and
		// these rows will never be considered.
		for maskedRange := range buildMask(readRange, s[:passCount]) {
			r.dl.Mask(maskedRange)
		}

		count, err := r.inner.Fill(ctx, secondary, s[:passCount])
		if err != nil && !errors.Is(err, io.EOF) {
			return count, err
		} else if count != passCount {
			return count, fmt.Errorf("failed to fill rows: expected %d, got %d", passCount, count)
		}

		var totalBytesFilled int64
		for i := range count {
			totalBytesFilled += s[i].Size() - s[i].SizeOfColumns(r.primaryColumnIndexes)
		}

		r.region.Record(xcap.StatDatasetSecondaryRowsRead.Observe(int64(count)))
		r.region.Record(xcap.StatDatasetSecondaryRowBytes.Observe(totalBytesFilled))
	}

	// We only advance r.row after we successfully read and filled rows. This
	// allows the caller to retry reading rows if a sporadic error occurs.
	r.row += int64(rowsRead)
	return passCount, nil
}

// readAndFilterPrimaryColumns reads the primary columns from the dataset and
// filters the rows by sequentially applying the predicates.
//
// For each predicate evaluation, only the required columns are loaded. Rows
// are filtered at each step, so subsequent predicates only have to fill their
// columns for the reduced row range.
//
// It returns the maximum number of rows read, the number of rows that passed
// all the predicates, and any error encountered.
func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int, s []Row) (int, int, error) {
	var (
		rowsRead           int // tracks max rows accessed to move the [r.row] cursor
		passCount          int // number of rows that passed the predicate
		primaryColumnBytes int64
		filledColumns      = make(map[Column]struct{}, len(r.primaryColumnIndexes))
	)

	// Sequentially apply the predicates.
	for i, p := range r.opts.Predicates {
		columns, idxs, err := r.predicateColumns(p, func(c Column) bool {
			// Keep only columns that haven't been filled yet.
			_, ok := filledColumns[c]
			return !ok
		})
		if err != nil {
			return rowsRead, 0, err
		}

		var count int
		// Read the requested number of rows for the first predicate.
		if i == 0 {
			count, err = r.inner.ReadColumns(ctx, columns, s[:readSize])
			if err != nil && !errors.Is(err, io.EOF) {
				return 0, 0, err
			} else if count == 0 && errors.Is(err, io.EOF) {
				return 0, 0, io.EOF
			}

			rowsRead = count
		} else if len(columns) > 0 {
			count, err = r.inner.Fill(ctx, columns, s[:readSize])
			if err != nil && !errors.Is(err, io.EOF) {
				return rowsRead, 0, err
			} else if count != readSize {
				return rowsRead, 0, fmt.Errorf("failed to fill rows: expected %d, got %d", readSize, count)
			}
		} else {
			count = readSize // required columns are already filled
		}

		passCount = 0
		for i := range count {
			size := s[i].SizeOfColumns(idxs)
			primaryColumnBytes += size

			if !checkPredicate(p, r.origColumnLookup, s[i]) {
				continue
			}
			// We move s[i] to s[passCount] by *swapping* the rows. Copying would
			// result in the Row.Values slice existing in two places in the
			// buffer, which causes memory corruption when filling in rows.
			s[passCount], s[i] = s[i], s[passCount]
			passCount++
		}

		if passCount == 0 {
			// No rows passed the predicate, so we can stop early.
			break
		}

		for _, c := range columns {
			filledColumns[c] = struct{}{}
		}

		readSize = passCount
	}

	r.region.Record(xcap.StatDatasetPrimaryRowsRead.Observe(int64(rowsRead)))
	r.region.Record(xcap.StatDatasetPrimaryRowBytes.Observe(primaryColumnBytes))

	return rowsRead, passCount, nil
}

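// Editorial worked example (not from the original source) of the sequential
// filtering above. Assume hypothetical columns colA and colB, Predicates =
// [p1 (references colA), p2 (references colB)], and readSize = 1000:
//
//	1. p1: ReadColumns loads colA for s[:1000]; suppose 120 rows pass. The
//	   survivors are swapped into s[:120] and readSize shrinks to 120.
//	2. p2: Fill loads colB only for s[:120]; suppose 30 rows pass.
//
// Only 1000 colA values and 120 colB values were materialized, instead of
// 1000 values for both columns.
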
// alignRow returns r.row if it is a valid row in ranges, or adjusts r.row to
// the next valid row in ranges.
func (r *Reader) alignRow() (uint64, error) {
	if r.ranges.Includes(uint64(r.row)) {
		return uint64(r.row), nil
	}

	nextRow, ok := r.ranges.Next(uint64(r.row))
	if !ok {
		return 0, io.EOF
	}
	r.row = int64(nextRow)
	return nextRow, nil
}

func checkPredicate(p Predicate, lookup map[Column]int, row Row) bool {
	if p == nil {
		return true
	}

	switch p := p.(type) {
	case AndPredicate:
		return checkPredicate(p.Left, lookup, row) && checkPredicate(p.Right, lookup, row)

	case OrPredicate:
		return checkPredicate(p.Left, lookup, row) || checkPredicate(p.Right, lookup, row)

	case NotPredicate:
		return !checkPredicate(p.Inner, lookup, row)

	case TruePredicate:
		return true

	case FalsePredicate:
		return false

	case EqualPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}
		return CompareValues(&row.Values[columnIndex], &p.Value) == 0

	case InPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}

		value := row.Values[columnIndex]
		if value.IsNil() || value.Type() != p.Column.ColumnDesc().Type.Physical {
			return false
		}
		return p.Values.Contains(value)

	case GreaterThanPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}
		return CompareValues(&row.Values[columnIndex], &p.Value) > 0

	case LessThanPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}
		return CompareValues(&row.Values[columnIndex], &p.Value) < 0

	case FuncPredicate:
		columnIndex, ok := lookup[p.Column]
		if !ok {
			panic("checkPredicate: column not found")
		}
		return p.Keep(p.Column, row.Values[columnIndex])

	default:
		panic(fmt.Sprintf("unsupported predicate type %T", p))
	}
}

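// Editorial sketch (not from the original source): evaluating the nested
// predicate And(a == 1, Not(b > 5)) against a row recurses as
//
//	checkPredicate(And, ...) = checkPredicate(a == 1, ...) && checkPredicate(Not(b > 5), ...)
//	                         = (CompareValues(row.a, 1) == 0) && !(CompareValues(row.b, 5) > 0)
//
// where row.a and row.b stand for the row values at the looked-up column
// indexes.
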
// buildMask returns an iterator that yields row ranges from full that are not
// present in s.
//
// buildMask will panic if any index in s is outside the range of full.
func buildMask(full rowRange, s []Row) iter.Seq[rowRange] {
	return func(yield func(rowRange) bool) {
		// Rows in s are in ascending order, but there may be gaps between rows.
		// We need to return ranges of rows in full that are not in s.
		if len(s) == 0 {
			yield(full)
			return
		}

		start := full.Start

		for _, row := range s {
			if !full.Contains(uint64(row.Index)) {
				panic("buildMask: row index out of range")
			}

			if uint64(row.Index) != start {
				// If start is 1 and row.Index is 5, then the excluded range is
				// (start, row.Index - 1), i.e. (1, 4).
				if !yield(rowRange{Start: start, End: uint64(row.Index) - 1}) {
					return
				}
			}

			start = uint64(row.Index) + 1
		}

		if start <= full.End {
			if !yield(rowRange{Start: start, End: full.End}) {
				return
			}
		}
	}
}

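// Editorial worked example (not from the original source): with
// full = {Start: 0, End: 9} and rows in s at indexes 2, 3, and 7, buildMask
// yields {0, 1}, {4, 6}, and {8, 9}: exactly the sub-ranges whose pages are
// not needed for the secondary Fill.
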
// Close closes the Reader. Closed Readers can be reused by calling
// [Reader.Reset].
func (r *Reader) Close() error {
	r.region.End()

	if r.inner != nil {
		return r.inner.Close()
	}

	return nil
}

// Reset discards any state and resets the Reader with a new set of options.
// This permits reusing a Reader rather than allocating a new one.
func (r *Reader) Reset(opts ReaderOptions) {
	r.opts = opts

	// There's not much work Reset can do without a context, since it needs to
	// retrieve page info. We'll defer this work to an init function. This also
	// unfortunately means that we might not reset page readers until the first
	// call to Read.
	if r.origColumnLookup == nil {
		r.origColumnLookup = make(map[Column]int, len(opts.Columns))
	}
	clear(r.origColumnLookup)
	for i, c := range opts.Columns {
		r.origColumnLookup[c] = i
	}

	r.row = 0
	r.ranges = sliceclear.Clear(r.ranges)
	r.primaryColumnIndexes = sliceclear.Clear(r.primaryColumnIndexes)
	r.ready = false
	r.region = nil
}

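// Editorial sketch (not from the original source) of the reuse pattern that
// Close and Reset enable, assuming a caller that processes many datasets and
// a hypothetical optsFor helper:
//
//	r := NewReader(optsFor(datasets[0]))
//	for _, ds := range datasets[1:] {
//		// ... read from r ...
//		_ = r.Close()
//		r.Reset(optsFor(ds)) // reuses r's allocations for the next dataset
//	}
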
func (r *Reader) init(ctx context.Context) error {
	// Reader.init is kept close to the definition of Reader.Reset to make it
	// easier to follow the correctness of resetting + initializing.

	_, r.region = xcap.StartRegion(ctx, "dataset.Reader")

	// r.validatePredicate must be called before initializing anything else; for
	// simplicity, other functions assume that the predicate is valid and can
	// panic if it isn't.
	if err := r.validatePredicate(); err != nil {
		return err
	}

	if err := r.initDownloader(ctx); err != nil {
		return err
	}

	if r.inner == nil {
		r.inner = newBasicReader(r.allColumns())
	} else {
		r.inner.Reset(r.allColumns())
	}

	r.ready = true
	return nil
}

// allColumns returns the full set of columns to read. If r was configured with
// prefetching, wrapped columns from [readerDownloader] are returned. Otherwise,
// the columns of the original dataset are returned.
func (r *Reader) allColumns() []Column {
	if r.opts.Prefetch {
		return r.dl.AllColumns()
	}
	return r.dl.OrigColumns()
}

// primaryColumns returns the primary columns to read. If r was configured with
// prefetching, wrapped columns from [readerDownloader] are returned. Otherwise,
// the primary columns of the original dataset are returned.
func (r *Reader) primaryColumns() []Column {
	if r.opts.Prefetch {
		return r.dl.PrimaryColumns()
	}
	return r.dl.OrigPrimaryColumns()
}

// secondaryColumns returns the secondary columns to read. If r was configured
// with prefetching, wrapped columns from [readerDownloader] are returned.
// Otherwise, the secondary columns of the original dataset are returned.
func (r *Reader) secondaryColumns() []Column {
	if r.opts.Prefetch {
		return r.dl.SecondaryColumns()
	}
	return r.dl.OrigSecondaryColumns()
}

// validatePredicate ensures that all columns used in a predicate have been
// provided in [ReaderOptions].
func (r *Reader) validatePredicate() error {
	process := func(c Column) error {
		_, ok := r.origColumnLookup[c]
		if !ok {
			return fmt.Errorf("predicate column %v not found in Reader columns", c)
		}
		return nil
	}

	var err error

	for _, pp := range r.opts.Predicates {
		WalkPredicate(pp, func(p Predicate) bool {
			if err != nil {
				return false
			}

			switch p := p.(type) {
			case EqualPredicate:
				err = process(p.Column)
			case InPredicate:
				err = process(p.Column)
			case GreaterThanPredicate:
				err = process(p.Column)
			case LessThanPredicate:
				err = process(p.Column)
			case FuncPredicate:
				err = process(p.Column)
			case AndPredicate, OrPredicate, NotPredicate, TruePredicate, FalsePredicate, nil:
				// No columns to process.
			default:
				panic(fmt.Sprintf("dataset.Reader.validatePredicate: unsupported predicate type %T", p))
			}

			return true // Continue walking the Predicate.
		})
	}

	return err
}

// initDownloader initializes the reader's [readerDownloader]. initDownloader
// is always used, even when prefetching is disabled, to reduce the number of
// conditional code paths.
func (r *Reader) initDownloader(ctx context.Context) error {
	// The downloader is initialized in three steps:
	//
	// 1. Give it the inner dataset.
	// 2. Add columns with a flag of whether a column is primary or secondary.
	// 3. Provide the overall dataset row ranges that will be valid to read.

	if r.dl == nil {
		r.dl = newReaderDownloader(r.opts.Dataset)
	} else {
		r.dl.Reset(r.opts.Dataset)
	}

	mask := bitmask.New(len(r.opts.Columns))
	r.fillPrimaryMask(mask)

	for i, column := range r.opts.Columns {
		primary := mask.Test(i)
		r.dl.AddColumn(column, primary)

		if primary {
			r.primaryColumnIndexes = append(r.primaryColumnIndexes, i)
			r.region.Record(xcap.StatDatasetPrimaryColumns.Observe(1))
			r.region.Record(xcap.StatDatasetPrimaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
		} else {
			r.region.Record(xcap.StatDatasetSecondaryColumns.Observe(1))
			r.region.Record(xcap.StatDatasetSecondaryColumnPages.Observe(int64(column.ColumnDesc().PagesCount)))
		}
	}

	var ranges rowRanges
	var err error
	if len(r.opts.Predicates) == 0 { // No predicates; build the full range.
		ranges, err = r.buildPredicateRanges(ctx, nil)
		if err != nil {
			return err
		}
	} else {
		for _, p := range r.opts.Predicates {
			rr, err := r.buildPredicateRanges(ctx, p)
			if err != nil {
				return err
			}

			if ranges == nil {
				ranges = rr
			} else {
				ranges = intersectRanges(nil, ranges, rr)
			}
		}
	}

	r.dl.SetDatasetRanges(ranges)
	r.ranges = ranges

	var rowsCount uint64
	for _, column := range r.allColumns() {
		rowsCount = max(rowsCount, uint64(column.ColumnDesc().RowsCount))
	}

	r.region.Record(xcap.StatDatasetMaxRows.Observe(int64(rowsCount)))
	r.region.Record(xcap.StatDatasetRowsAfterPruning.Observe(int64(ranges.TotalRowCount())))

	return nil
}

func (r *Reader) fillPrimaryMask(mask *bitmask.Mask) {
	process := func(c Column) {
		idx, ok := r.origColumnLookup[c]
		if !ok {
			// This shouldn't be reachable: before we initialize anything we
			// ensure that all columns in the predicate are available in
			// r.opts.Columns.
			panic("fillPrimaryMask: column not found")
		}
		mask.Set(idx)
	}

	// If there's no predicate, all columns are primary.
	if len(r.opts.Predicates) == 0 {
		for _, c := range r.opts.Columns {
			process(c)
		}
		return
	}

	for _, pp := range r.opts.Predicates {
		// If there is a predicate, primary columns are those used in the predicate.
		WalkPredicate(pp, func(p Predicate) bool {
			switch p := p.(type) {
			case EqualPredicate:
				process(p.Column)
			case InPredicate:
				process(p.Column)
			case GreaterThanPredicate:
				process(p.Column)
			case LessThanPredicate:
				process(p.Column)
			case FuncPredicate:
				process(p.Column)
			case AndPredicate, OrPredicate, NotPredicate, TruePredicate, FalsePredicate, nil:
				// No columns to process.
			default:
				panic(fmt.Sprintf("dataset.Reader.fillPrimaryMask: unsupported predicate type %T", p))
			}

			return true // Continue walking the Predicate.
		})
	}
}

// buildPredicateRanges returns a set of rowRanges that are valid to read based
// on the provided predicate. If p is nil or the predicate is unsupported, the
// entire dataset range is valid.
//
// r.dl must be initialized before calling buildPredicateRanges.
func (r *Reader) buildPredicateRanges(ctx context.Context, p Predicate) (rowRanges, error) {
	// TODO(rfratto): We could be reusing memory for building ranges here.

	switch p := p.(type) {
	case AndPredicate:
		left, err := r.buildPredicateRanges(ctx, p.Left)
		if err != nil {
			return nil, err
		}
		right, err := r.buildPredicateRanges(ctx, p.Right)
		if err != nil {
			return nil, err
		}
		return intersectRanges(nil, left, right), nil

	case OrPredicate:
		left, err := r.buildPredicateRanges(ctx, p.Left)
		if err != nil {
			return nil, err
		}
		right, err := r.buildPredicateRanges(ctx, p.Right)
		if err != nil {
			return nil, err
		}
		return unionRanges(nil, left, right), nil

	case NotPredicate:
		// De Morgan's laws must be applied to reduce the NotPredicate to a set
		// of predicates that can be applied to pages.
		//
		// See comment on [simplifyNotPredicate] for more information.
		simplified, err := simplifyNotPredicate(p)
		if err != nil {
			// Predicate can't be simplified, so we permit the full range.
			var rowsCount uint64
			for _, column := range r.allColumns() {
				rowsCount = max(rowsCount, uint64(column.ColumnDesc().RowsCount))
			}
			return rowRanges{{Start: 0, End: rowsCount - 1}}, nil
		}
		return r.buildPredicateRanges(ctx, simplified)

	case FalsePredicate:
		return nil, nil // No valid ranges.

	case EqualPredicate:
		return r.buildColumnPredicateRanges(ctx, p.Column, p)

	case InPredicate:
		return r.buildColumnPredicateRanges(ctx, p.Column, p)

	case GreaterThanPredicate:
		return r.buildColumnPredicateRanges(ctx, p.Column, p)

	case LessThanPredicate:
		return r.buildColumnPredicateRanges(ctx, p.Column, p)

	case TruePredicate, FuncPredicate, nil:
		// These predicates (and nil) don't support any filtering, so they map
		// to the full range being valid.
		//
		// We use r.allColumns instead of r.opts.Columns because the downloader
		// will cache metadata.
		var rowsCount uint64
		for _, column := range r.allColumns() {
			rowsCount = max(rowsCount, uint64(column.ColumnDesc().RowsCount))
		}
		return rowRanges{{Start: 0, End: rowsCount - 1}}, nil

	default:
		panic(fmt.Sprintf("unsupported predicate type %T", p))
	}
}

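// Editorial worked example (not from the original source) of how ranges
// combine. Suppose page pruning yields rows [0, 99] for the predicate
// "a > 10" and rows [50, 149] for "b == 5":
//
//	AndPredicate{a > 10, b == 5} -> intersectRanges -> [50, 99]
//	OrPredicate{a > 10, b == 5}  -> unionRanges     -> [0, 149]
//
// Only pages overlapping the resulting ranges need to be downloaded and read.
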
// simplifyNotPredicate applies De Morgan's laws (and the complements of
// comparisons) to a NotPredicate to permit page filtering.
//
// While during evaluation a NotPredicate inverts the result of the inner
// predicate, the same can't be done for page filtering. For example, imagine
// that a page is included from a rule "a > 10." If we inverted that inclusion,
// we may be incorrectly filtering out that page, as that page may also have
// values less than 10.
//
// To correctly apply page filtering to a NotPredicate, we reduce the
// NotPredicate to a set of predicates that can be applied to pages. This may
// result in other NotPredicates that also need to be simplified.
//
// If the NotPredicate can't be simplified, simplifyNotPredicate returns an
// error.
func simplifyNotPredicate(p NotPredicate) (Predicate, error) {
	switch inner := p.Inner.(type) {
	case AndPredicate: // De Morgan's law: !(A && B) == !A || !B
		return OrPredicate{
			Left:  NotPredicate{Inner: inner.Left},
			Right: NotPredicate{Inner: inner.Right},
		}, nil

	case OrPredicate: // De Morgan's law: !(A || B) == !A && !B
		return AndPredicate{
			Left:  NotPredicate{Inner: inner.Left},
			Right: NotPredicate{Inner: inner.Right},
		}, nil

	case NotPredicate: // Double negation: !!A == A
		return inner.Inner, nil

	case FalsePredicate:
		return TruePredicate{}, nil

	case EqualPredicate: // Complement of equality: !(A == B) == A < B || A > B
		return OrPredicate{
			Left:  LessThanPredicate(inner),
			Right: GreaterThanPredicate(inner),
		}, nil

	case GreaterThanPredicate: // Complement of comparison: !(A > B) == A <= B
		return OrPredicate{
			Left:  EqualPredicate(inner),
			Right: LessThanPredicate(inner),
		}, nil

	case LessThanPredicate: // Complement of comparison: !(A < B) == A >= B
		return OrPredicate{
			Left:  EqualPredicate(inner),
			Right: GreaterThanPredicate(inner),
		}, nil

	case InPredicate:
		// TODO: can be supported when we introduce NotInPredicate.
		return nil, fmt.Errorf("can't simplify InPredicate")

	case FuncPredicate:
		return nil, fmt.Errorf("can't simplify FuncPredicate")

	default:
		panic(fmt.Sprintf("unsupported predicate type %T", inner))
	}
}

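// Editorial worked example (not from the original source): simplifying
// NotPredicate{AndPredicate{a > 10, b == 5}} proceeds as
//
//	!(a > 10 && b == 5)
//	  == !(a > 10) || !(b == 5)                    (De Morgan)
//	  == (a == 10 || a < 10) || (b < 5 || b > 5)   (comparison complements)
//
// which buildPredicateRanges can then evaluate with per-page min/max stats.
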
// buildColumnPredicateRanges returns a set of rowRanges that are valid based
// on whether EqualPredicate, InPredicate, GreaterThanPredicate, or
// LessThanPredicate may be true for each page in a column.
func (r *Reader) buildColumnPredicateRanges(ctx context.Context, c Column, p Predicate) (rowRanges, error) {
	// Get the wrapped column so that the result of c.ListPages can be cached.
	if idx, ok := r.origColumnLookup[c]; ok {
		c = r.allColumns()[idx]
	} else {
		return nil, fmt.Errorf("column %v not found in Reader columns", c)
	}

	var ranges rowRanges

	var (
		pageStart    int
		lastPageSize int
	)

	for result := range c.ListPages(ctx) {
		pageStart += lastPageSize

		page, err := result.Value()
		if err != nil {
			return nil, err
		}
		pageInfo := page.PageDesc()
		lastPageSize = pageInfo.RowCount

		pageRange := rowRange{
			Start: uint64(pageStart),
			End:   uint64(pageStart + pageInfo.RowCount - 1),
		}

		minValue, maxValue, err := readMinMax(pageInfo.Stats)
		if err != nil {
			return nil, fmt.Errorf("failed to read page stats: %w", err)
		} else if minValue.IsNil() || maxValue.IsNil() {
			// No stats, so we add the whole range.
			ranges.Add(pageRange)
			continue
		}

		var include bool

		switch p := p.(type) {
		case EqualPredicate: // EqualPredicate may be true if p.Value is inside the range of the page.
			isEmpty := p.Value.Type() == datasetmd.PHYSICAL_TYPE_BINARY && p.Value.IsZero()
			include = isEmpty || (CompareValues(&p.Value, &minValue) >= 0 && CompareValues(&p.Value, &maxValue) <= 0)
		case GreaterThanPredicate: // GreaterThanPredicate may be true if maxValue of a page is greater than p.Value.
			include = CompareValues(&maxValue, &p.Value) > 0
		case LessThanPredicate: // LessThanPredicate may be true if minValue of a page is less than p.Value.
			include = CompareValues(&minValue, &p.Value) < 0
		case InPredicate:
			// Check if any value falls within the page's range.
			for v := range p.Values.Iter() {
				if CompareValues(&v, &minValue) >= 0 && CompareValues(&v, &maxValue) <= 0 {
					include = true
					break
				}
			}
		default:
			panic(fmt.Sprintf("unsupported predicate type %T", p))
		}

		if include {
			ranges.Add(pageRange)
		}
	}

	return ranges, nil
}

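// Editorial worked example (not from the original source): for a column whose
// first page covers rows [0, 999] with min=3 and max=9, the predicate
// "a > 10" excludes the page (max 9 is not greater than 10), while "a == 7"
// includes it (3 <= 7 <= 9). Inclusion is conservative: a page may be kept
// even if no row in it ultimately matches, and rows are then re-checked by
// checkPredicate.
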
// readMinMax reads the minimum and maximum values from the provided
// statistics. If either minValue or maxValue is NULL, the value is not present
// in the statistics.
func readMinMax(stats *datasetmd.Statistics) (minValue Value, maxValue Value, err error) {
	// TODO(rfratto): We should have a dataset-specific Statistics type that
	// already has the min and max values decoded, so callers don't need to
	// unmarshal them on every read.

	if stats == nil {
		return
	}

	if err := minValue.UnmarshalBinary(stats.MinValue); err != nil {
		return Value{}, Value{}, fmt.Errorf("failed to unmarshal min value: %w", err)
	} else if err := maxValue.UnmarshalBinary(stats.MaxValue); err != nil {
		return Value{}, Value{}, fmt.Errorf("failed to unmarshal max value: %w", err)
	}
	return
}

func (r *Reader) predicateColumns(p Predicate, keep func(c Column) bool) ([]Column, []int, error) {
	columns := make(map[Column]struct{})

	WalkPredicate(p, func(p Predicate) bool {
		switch p := p.(type) {
		case EqualPredicate:
			columns[p.Column] = struct{}{}
		case InPredicate:
			columns[p.Column] = struct{}{}
		case GreaterThanPredicate:
			columns[p.Column] = struct{}{}
		case LessThanPredicate:
			columns[p.Column] = struct{}{}
		case FuncPredicate:
			columns[p.Column] = struct{}{}
		case AndPredicate, OrPredicate, NotPredicate, TruePredicate, FalsePredicate, nil:
			// No columns to process.
		default:
			panic(fmt.Sprintf("predicateColumns: unsupported predicate type %T", p))
		}
		return true
	})

	ret := make([]Column, 0, len(columns))
	idxs := make([]int, 0, len(columns))
	for c := range columns {
		idx, ok := r.origColumnLookup[c]
		if !ok {
			panic(fmt.Errorf("predicateColumns: column %v not found in Reader columns", c))
		}

		c := r.allColumns()[idx]
		if !keep(c) {
			continue
		}

		idxs = append(idxs, idx)
		ret = append(ret, c)
	}

	return ret, idxs, nil
}