chore(dataobj): apply predicates sequentially (#17283)

pull/17522/head
Ashwanth 1 year ago committed by GitHub
parent 3c72560c71
commit 5961923836
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 305
      pkg/dataobj/internal/dataset/reader.go
  2. 430
      pkg/dataobj/internal/dataset/reader_test.go
  3. 16
      pkg/dataobj/logs_reader.go
  4. 11
      pkg/dataobj/streams_reader.go

@ -24,11 +24,12 @@ type ReaderOptions struct {
// are considered non-predicate columns.
Columns []Column
// Predicate filters the data returned by a Reader. Predicate is optional; if
// Predicates filter the data returned by a Reader. Predicates are optional; if
// nil, all rows from Columns are returned.
//
// Expressions in Predicate may only reference columns in Columns.
Predicate Predicate
// Holds a list of predicates that can be sequentially applied to the dataset.
Predicates []Predicate
// TargetCacheSize configures the amount of memory to target for caching
// pages in memory. The cache may exceed this size if the combined size of
@ -125,40 +126,37 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) {
}
r.dl.SetReadRange(readRange)
count, err := r.inner.ReadColumns(ctx, r.dl.PrimaryColumns(), s[:readSize])
if err != nil && !errors.Is(err, io.EOF) {
return n, err
} else if count == 0 && errors.Is(err, io.EOF) {
return 0, io.EOF
}
var (
rowsRead int // tracks max rows accessed to move the [r.row] cursor
passCount int // tracks how many rows passed the predicate
statistics = stats.FromContext(ctx)
)
var primaryColumnBytes int64
var primaryColumnPostFilterBytes int64
var totalBytesAfterFill int64
var passCount int // passCount tracks how many rows pass the predicate.
// If there are no predicates, read all columns in the dataset
if len(r.opts.Predicates) == 0 {
count, err := r.inner.ReadColumns(ctx, r.dl.PrimaryColumns(), s[:readSize])
if err != nil && !errors.Is(err, io.EOF) {
return n, err
} else if count == 0 && errors.Is(err, io.EOF) {
return 0, io.EOF
}
if r.opts.Predicate == nil {
// If there's no predicate, all rows are valid.
rowsRead = count
passCount = count
statistics.AddPrePredicateDecompressedRows(int64(rowsRead))
var primaryColumnBytes int64
for i := range count {
primaryColumnBytes += s[i].Size()
}
statistics.AddPrePredicateDecompressedBytes(primaryColumnBytes)
} else {
for i := range count {
size := s[i].SizeOfColumns(r.primaryColumnIndexes)
primaryColumnBytes += size
if !checkPredicate(r.opts.Predicate, r.origColumnLookup, s[i]) {
continue
}
// We move s[i] to s[passCount] by *swapping* the rows. Copying would
// result in the Row.Values slice existing in two places in the buffer,
// which causes memory corruption when filling in rows.
s[passCount], s[i] = s[i], s[passCount]
passCount++
primaryColumnPostFilterBytes += size
rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize], statistics)
if err != nil {
return n, err
}
}
statistics.AddPostPredicateRows(int64(passCount))
if secondary := r.dl.SecondaryColumns(); len(secondary) > 0 && passCount > 0 {
// Mask out any ranges that aren't in s[:passCount], so that filling in
@ -178,28 +176,103 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) {
} else if count != passCount {
return n, fmt.Errorf("failed to fill rows: expected %d, got %d", n, count)
}
var totalBytesFilled int64
for i := range count {
totalBytesAfterFill += s[i].Size()
totalBytesFilled += s[i].Size() - s[i].SizeOfColumns(r.primaryColumnIndexes)
}
statistics.AddPostPredicateDecompressedBytes(totalBytesFilled)
}
n += passCount
statistics := stats.FromContext(ctx)
statistics.AddPrePredicateDecompressedRows(int64(count))
statistics.AddPrePredicateDecompressedBytes(primaryColumnBytes)
statistics.AddPostPredicateRows(int64(passCount))
// Fill is not called when there is no predicate
if totalBytesAfterFill > 0 {
statistics.AddPostPredicateDecompressedBytes(totalBytesAfterFill - primaryColumnPostFilterBytes)
}
// We only advance r.row after we successfully read and filled rows. This
// allows the caller to retry reading rows if a sporadic error occurs.
r.row += int64(count)
r.row += int64(rowsRead)
return n, nil
}
// readAndFilterPrimaryColumns reads the primary columns from the dataset
// and filters the rows by sequentially applying the predicates.
//
// For each predicate evaluation, only the required columns are loaded.
// Rows are filtered at each step with subsequent predicates only having to fill
// the columns on the reduced row range.
//
// It returns the max rows read, rows that passed all the predicates, and any error
func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int, s []Row, stats *stats.Context) (int, int, error) {
var (
rowsRead int // tracks max rows accessed to move the [r.row] cursor
passCount int // number of rows that passed the predicate
primaryColumnBytes int64
filledColumns = make(map[Column]struct{}, len(r.primaryColumnIndexes))
)
// sequentially apply the predicates.
for i, p := range r.opts.Predicates {
columns, idxs, err := r.predicateColumns(p, func(c Column) bool {
// keep only columns that haven't been filled yet.
_, ok := filledColumns[c]
return !ok
})
if err != nil {
return rowsRead, 0, err
}
var count int
// read the requested number of rows for the first predicate.
if i == 0 {
count, err = r.inner.ReadColumns(ctx, columns, s[:readSize])
if err != nil && !errors.Is(err, io.EOF) {
return 0, 0, err
} else if count == 0 && errors.Is(err, io.EOF) {
return 0, 0, io.EOF
}
rowsRead = count
} else if len(columns) > 0 {
count, err = r.inner.Fill(ctx, columns, s[:readSize])
if err != nil && !errors.Is(err, io.EOF) {
return rowsRead, 0, err
} else if count != readSize {
return rowsRead, 0, fmt.Errorf("failed to fill rows: expected %d, got %d", len(s), count)
}
} else {
count = readSize // required columns are already filled
}
passCount = 0
for i := range count {
size := s[i].SizeOfColumns(idxs)
primaryColumnBytes += size
if !checkPredicate(p, r.origColumnLookup, s[i]) {
continue
}
// We move s[i] to s[passCount] by *swapping* the rows. Copying would
// result in the Row.Values slice existing in two places in the buffer,
// which causes memory corruption when filling in rows.
s[passCount], s[i] = s[i], s[passCount]
passCount++
}
if passCount == 0 {
// No rows passed the predicate, so we can stop early.
break
}
for _, c := range columns {
filledColumns[c] = struct{}{}
}
readSize = passCount
}
stats.AddPrePredicateDecompressedRows(int64(rowsRead))
stats.AddPrePredicateDecompressedBytes(primaryColumnBytes)
return rowsRead, passCount, nil
}
// alignRow returns r.row if it is a valid row in ranges, or adjusts r.row to
// the next valid row in ranges.
func (r *Reader) alignRow() (uint64, error) {
@ -392,30 +465,32 @@ func (r *Reader) validatePredicate() error {
var err error
WalkPredicate(r.opts.Predicate, func(p Predicate) bool {
if err != nil {
return false
}
for _, pp := range r.opts.Predicates {
WalkPredicate(pp, func(p Predicate) bool {
if err != nil {
return false
}
switch p := p.(type) {
case EqualPredicate:
err = process(p.Column)
case InPredicate:
err = process(p.Column)
case GreaterThanPredicate:
err = process(p.Column)
case LessThanPredicate:
err = process(p.Column)
case FuncPredicate:
err = process(p.Column)
case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil:
// No columns to process.
default:
panic(fmt.Sprintf("dataset.Reader.validatePredicate: unsupported predicate type %T", p))
}
switch p := p.(type) {
case EqualPredicate:
err = process(p.Column)
case InPredicate:
err = process(p.Column)
case GreaterThanPredicate:
err = process(p.Column)
case LessThanPredicate:
err = process(p.Column)
case FuncPredicate:
err = process(p.Column)
case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil:
// No columns to process.
default:
panic(fmt.Sprintf("dataset.Reader.validatePredicate: unsupported predicate type %T", p))
}
return true // Continue walking the Predicate.
})
return true // Continue walking the Predicate.
})
}
return err
}
@ -445,10 +520,28 @@ func (r *Reader) initDownloader(ctx context.Context) error {
}
}
ranges, err := r.buildPredicateRanges(ctx, r.opts.Predicate)
if err != nil {
return err
var ranges rowRanges
var err error
if len(r.opts.Predicates) == 0 { // no predicates, build full range
ranges, err = r.buildPredicateRanges(ctx, nil)
if err != nil {
return err
}
} else {
for _, p := range r.opts.Predicates {
rr, err := r.buildPredicateRanges(ctx, p)
if err != nil {
return err
}
if ranges == nil {
ranges = rr
} else {
ranges = intersectRanges(nil, ranges, rr)
}
}
}
r.dl.SetDatasetRanges(ranges)
r.ranges = ranges
@ -474,34 +567,36 @@ func (r *Reader) fillPrimaryMask(mask *bitmask.Mask) {
}
// If there's no predicate, all columns are primary.
if r.opts.Predicate == nil {
if len(r.opts.Predicates) == 0 {
for _, c := range r.opts.Columns {
process(c)
}
return
}
// If there is a predicate, primary columns are those used in the predicate.
WalkPredicate(r.opts.Predicate, func(p Predicate) bool {
switch p := p.(type) {
case EqualPredicate:
process(p.Column)
case InPredicate:
process(p.Column)
case GreaterThanPredicate:
process(p.Column)
case LessThanPredicate:
process(p.Column)
case FuncPredicate:
process(p.Column)
case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil:
// No columns to process.
default:
panic(fmt.Sprintf("dataset.Reader.fillPrimaryMask: unsupported predicate type %T", p))
}
for _, pp := range r.opts.Predicates {
// If there is a predicate, primary columns are those used in the predicate.
WalkPredicate(pp, func(p Predicate) bool {
switch p := p.(type) {
case EqualPredicate:
process(p.Column)
case InPredicate:
process(p.Column)
case GreaterThanPredicate:
process(p.Column)
case LessThanPredicate:
process(p.Column)
case FuncPredicate:
process(p.Column)
case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil:
// No columns to process.
default:
panic(fmt.Sprintf("dataset.Reader.fillPrimaryMask: unsupported predicate type %T", p))
}
return true // Continue walking the Predicate.
})
return true // Continue walking the Predicate.
})
}
}
// buildPredicateRanges returns a set of rowRanges that are valid to read based
@ -738,3 +833,45 @@ func readMinMax(stats *datasetmd.Statistics) (minValue Value, maxValue Value, er
}
return
}
func (r *Reader) predicateColumns(p Predicate, keep func(c Column) bool) ([]Column, []int, error) {
columns := make(map[Column]struct{})
WalkPredicate(p, func(p Predicate) bool {
switch p := p.(type) {
case EqualPredicate:
columns[p.Column] = struct{}{}
case InPredicate:
columns[p.Column] = struct{}{}
case GreaterThanPredicate:
columns[p.Column] = struct{}{}
case LessThanPredicate:
columns[p.Column] = struct{}{}
case FuncPredicate:
columns[p.Column] = struct{}{}
case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil:
// No columns to process.
default:
panic(fmt.Sprintf("predicateColumns: unsupported predicate type %T", p))
}
return true
})
ret := make([]Column, 0, len(columns))
idxs := make([]int, 0, len(columns))
for c := range columns {
if !keep(c) {
continue
}
idx, ok := r.origColumnLookup[c]
if !ok {
panic(fmt.Errorf("predicateColumns: column %v not found in Reader columns", c))
}
idxs = append(idxs, idx)
ret = append(ret, r.dl.AllColumns()[idx])
}
return ret, idxs, nil
}

@ -3,10 +3,15 @@ package dataset
import (
"context"
"errors"
"fmt"
"io"
"iter"
"math/rand"
"slices"
"strconv"
"testing"
"github.com/dustin/go-humanize"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
@ -31,9 +36,11 @@ func Test_Reader_ReadWithPredicate(t *testing.T) {
r := NewReader(ReaderOptions{
Dataset: dset,
Columns: columns,
Predicate: GreaterThanPredicate{
Column: columns[3], // birth_year column
Value: Int64Value(1985),
Predicates: []Predicate{
GreaterThanPredicate{
Column: columns[3], // birth_year column
Value: Int64Value(1985),
},
},
})
defer r.Close()
@ -65,9 +72,11 @@ func Test_Reader_ReadWithPageFiltering(t *testing.T) {
//
// TODO(rfratto): make it easier to prove that a predicate includes a value
// which is out of range of at least one page.
Predicate: EqualPredicate{
Column: columns[0], // first_name column
Value: ByteArrayValue([]byte("Henry")),
Predicates: []Predicate{
EqualPredicate{
Column: columns[0], // first_name column
Value: ByteArrayValue([]byte("Henry")),
},
},
})
defer r.Close()
@ -92,9 +101,11 @@ func Test_Reader_ReadWithPredicate_NoSecondary(t *testing.T) {
r := NewReader(ReaderOptions{
Dataset: dset,
Columns: []Column{columns[3]},
Predicate: GreaterThanPredicate{
Column: columns[3], // birth_year column
Value: Int64Value(1985),
Predicates: []Predicate{
GreaterThanPredicate{
Column: columns[3], // birth_year column
Value: Int64Value(1985),
},
},
})
defer r.Close()
@ -141,10 +152,10 @@ func Test_Reader_Stats(t *testing.T) {
r := NewReader(ReaderOptions{
Dataset: dset,
Columns: columns,
Predicate: GreaterThanPredicate{
Predicates: []Predicate{GreaterThanPredicate{
Column: columns[3], // birth_year column
Value: Int64Value(1985),
},
}},
})
defer r.Close()
@ -361,9 +372,9 @@ func Test_BuildPredicateRanges(t *testing.T) {
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
r := NewReader(ReaderOptions{
Dataset: ds,
Columns: cols,
Predicate: tc.predicate,
Dataset: ds,
Columns: cols,
Predicates: []Predicate{tc.predicate},
})
defer r.Close()
@ -462,3 +473,394 @@ func encodeInt64Value(t *testing.T, v int64) []byte {
require.NoError(t, err)
return data
}
func BenchmarkReader(b *testing.B) {
generator := DatasetGenerator{
RowCount: 1_000_000,
PageSizeHint: 2 * 1024 * 1024, // 2MB
Columns: []generatorColumnConfig{
{
Name: "stream",
ValueType: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
CardinalityTarget: 1000,
},
{
Name: "timestamp",
ValueType: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
CardinalityTarget: 100_000,
},
{
Name: "log",
ValueType: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
AvgSize: 1024,
CardinalityTarget: 100_000,
},
},
}
readPatterns := []struct {
name string
batchSize int
}{
{
name: "batch=100",
batchSize: 100,
},
{
name: "batch=10k",
batchSize: 10_000,
},
}
// Generate dataset once per case
ds, cols := generator.Build(b, rand.Int63())
opts := ReaderOptions{
Dataset: ds,
Columns: cols,
}
for _, rp := range readPatterns {
b.Run(rp.name, func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
batch := make([]Row, rp.batchSize)
for b.Loop() {
reader := NewReader(opts)
var rowsRead int
for {
n, err := reader.Read(context.Background(), batch)
if err == io.EOF {
break
}
if err != nil {
b.Fatal(err)
}
rowsRead += n
}
reader.Close()
b.ReportMetric(float64(rowsRead)/float64(b.N), "rows/op")
}
})
}
}
func BenchmarkPredicateExecution(b *testing.B) {
// Generate dataset with two columns, one with high cardinality and one with low cardinality
// higher the cardinality, more selective the predicate
generator := DatasetGenerator{
RowCount: 1_000_000,
// set large page size to not realise benefits from page pruning since the goal
// of this benchmark is to measure the gains from sequential predicate evaluation alone.
PageSizeHint: 100 * 1024 * 1024,
Columns: []generatorColumnConfig{
{
Name: "more_selective",
ValueType: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
CardinalityTarget: 500_000,
},
{
Name: "less_selective",
ValueType: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
CardinalityTarget: 100,
},
},
}
ds, cols := generator.Build(b, rand.Int63())
var col1Value, col2Value int64
idx := rand.Intn(generator.RowCount) // Randomly select a row index to use for the predicate values
currentPos := 0
batch := make([]Row, 1000)
// read the dataset once to pick a random row for predicate generation
reader := NewReader(ReaderOptions{
Dataset: ds,
Columns: cols,
})
for {
n, err := reader.Read(context.Background(), batch)
if err == io.EOF {
break
}
if err != nil {
b.Fatal(err)
}
// Check if our target index is in this batch
if idx >= currentPos && idx < currentPos+n {
selectedRow := batch[idx-currentPos]
col1Value = selectedRow.Values[0].Int64()
col2Value = selectedRow.Values[1].Int64()
break
}
currentPos += n
}
reader.Close()
predicatePatterns := []struct {
name string
predicates []Predicate
}{
{
name: "combined",
predicates: []Predicate{
AndPredicate{
Left: EqualPredicate{
Column: cols[0],
Value: Int64Value(col1Value),
},
Right: EqualPredicate{
Column: cols[1],
Value: Int64Value(col2Value),
},
},
},
},
{
name: "high",
predicates: []Predicate{
EqualPredicate{
Column: cols[0],
Value: Int64Value(col1Value),
},
EqualPredicate{
Column: cols[1],
Value: Int64Value(col2Value),
},
},
},
{
name: "low",
predicates: []Predicate{
EqualPredicate{
Column: cols[1],
Value: Int64Value(col2Value),
},
EqualPredicate{
Column: cols[0],
Value: Int64Value(col1Value),
},
},
},
}
for _, pp := range predicatePatterns {
b.Run("selectivity="+pp.name, func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for b.Loop() {
reader := NewReader(ReaderOptions{
Dataset: ds,
Columns: cols,
Predicates: pp.predicates,
})
batch := make([]Row, 10000)
for {
_, err := reader.Read(context.Background(), batch)
if err == io.EOF {
break
}
if err != nil {
b.Fatal(err)
}
}
reader.Close()
}
})
}
}
type generatorColumnConfig struct {
Name string
ValueType datasetmd.ValueType
Encoding datasetmd.EncodingType
Compression datasetmd.CompressionType
AvgSize int64 // Average size in bytes for variable-length types
CardinalityTarget int64 // Target number of unique values
SparsityRate float64 // 0.0-1.0, where 1.0 means all values are null
}
func columnValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] {
switch cfg.ValueType {
case datasetmd.VALUE_TYPE_INT64, datasetmd.VALUE_TYPE_UINT64:
return numberValues(rng, cfg)
case datasetmd.VALUE_TYPE_BYTE_ARRAY:
return stringValues(rng, cfg)
default:
panic(fmt.Sprintf("unsupported type for generation: %v", cfg.ValueType))
}
}
func stringValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] {
// Pre-generate the set of unique values we'll cycle through
uniqueValues := make([]Value, cfg.CardinalityTarget)
for i := range int(cfg.CardinalityTarget) {
// Generate size between 0.5x and 1.5x of average size
size := int(float64(cfg.AvgSize) * (0.5 + rng.Float64()))
// Convert number to string and create padded result
str := make([]byte, size)
num := []byte(strconv.Itoa(i))
copy(str, num)
for j := len(num); j < size; j++ {
str[j] = 'x'
}
uniqueValues[i] = ByteArrayValue(str)
}
return func(yield func(Value) bool) {
for {
if !yield(uniqueValues[rng.Intn(len(uniqueValues))]) {
return
}
}
}
}
func numberValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] {
return func(yield func(Value) bool) {
for {
v := rng.Int63n(cfg.CardinalityTarget)
switch cfg.ValueType {
case datasetmd.VALUE_TYPE_INT64:
if !yield(Int64Value(v)) {
return
}
case datasetmd.VALUE_TYPE_UINT64:
if !yield(Uint64Value(uint64(v))) {
return
}
}
}
}
}
type DatasetGenerator struct {
RowCount int
PageSizeHint int
Columns []generatorColumnConfig
}
func (g *DatasetGenerator) Build(t testing.TB, seed int64) (Dataset, []Column) {
t.Helper()
memColumns := make([]*MemColumn, 0, len(g.Columns))
rng := rand.New(rand.NewSource(seed))
for _, colCfg := range g.Columns {
next, stop := iter.Pull(columnValues(rng, colCfg))
defer stop()
opts := BuilderOptions{
PageSizeHint: g.PageSizeHint,
Value: colCfg.ValueType,
Encoding: colCfg.Encoding,
Compression: colCfg.Compression,
Statistics: StatisticsOptions{
StoreCardinalityStats: true,
},
}
if colCfg.ValueType == datasetmd.VALUE_TYPE_INT64 || colCfg.ValueType == datasetmd.VALUE_TYPE_UINT64 {
opts.Statistics.StoreRangeStats = true
}
// Create a builder for this column
builder, err := NewColumnBuilder(colCfg.Name, opts)
require.NoError(t, err)
// Add values to the builder
for i := range g.RowCount {
if rng.Float64() < colCfg.SparsityRate {
continue
}
val, ok := next()
require.True(t, ok, "generator should yield values")
require.NoError(t, builder.Append(i, val))
}
col, err := builder.Flush()
require.NoError(t, err)
memColumns = append(memColumns, col)
}
ds := FromMemory(memColumns)
cols, err := result.Collect(ds.ListColumns(context.Background()))
require.NoError(t, err)
return ds, cols
}
// Test_DatasetGenerator is a helper to debug the dataset generation
func Test_DatasetGenerator(t *testing.T) {
g := DatasetGenerator{
RowCount: 1_000_000,
PageSizeHint: 2 * 1024 * 1024, // 2MB
Columns: []generatorColumnConfig{
{
Name: "timestamp",
ValueType: datasetmd.VALUE_TYPE_INT64,
Encoding: datasetmd.ENCODING_TYPE_DELTA,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
CardinalityTarget: 100_000,
SparsityRate: 0.0,
},
{
Name: "label",
ValueType: datasetmd.VALUE_TYPE_BYTE_ARRAY,
Encoding: datasetmd.ENCODING_TYPE_PLAIN,
Compression: datasetmd.COMPRESSION_TYPE_NONE,
AvgSize: 32,
CardinalityTarget: 100,
SparsityRate: 0.3,
},
},
}
_, cols := g.Build(t, rand.Int63())
require.Equal(t, 2, len(cols))
require.Equal(t, g.RowCount, cols[0].ColumnInfo().RowsCount)
// TODO: Row count is < expected. Must be a result of null values at the end.
// Remove this comment once the issue is fixed.
// require.Equal(t, g.RowCount, cols[1].ColumnInfo().RowsCount)
require.NotNil(t, cols[0].ColumnInfo().Statistics.CardinalityCount)
require.NotNil(t, cols[1].ColumnInfo().Statistics.CardinalityCount)
t.Logf("timestamp column cardinality: %d", cols[0].ColumnInfo().Statistics.CardinalityCount)
t.Logf("label column cardinality: %d", cols[1].ColumnInfo().Statistics.CardinalityCount)
require.NotNil(t, cols[0].ColumnInfo().Statistics.MinValue)
require.NotNil(t, cols[0].ColumnInfo().Statistics.MaxValue)
var minValue, maxValue Value
require.NoError(t, minValue.UnmarshalBinary(cols[0].ColumnInfo().Statistics.MinValue))
require.NoError(t, maxValue.UnmarshalBinary(cols[0].ColumnInfo().Statistics.MaxValue))
t.Logf("timestamp column min: %d", minValue.Int64())
t.Logf("timestamp column max: %d", maxValue.Int64())
t.Logf("timestamp column size: %s", humanize.Bytes(uint64(cols[0].ColumnInfo().UncompressedSize)))
t.Logf("label column size: %s", humanize.Bytes(uint64(cols[1].ColumnInfo().UncompressedSize)))
}

@ -177,18 +177,20 @@ func (r *LogsReader) initReader(ctx context.Context) error {
// r.predicate doesn't contain mappings of stream IDs; we need to build
// that as a separate predicate and AND them together.
predicate := streamIDPredicate(maps.Keys(r.matchIDs), columns, columnDescs)
var predicates []dataset.Predicate
if p := streamIDPredicate(maps.Keys(r.matchIDs), columns, columnDescs); p != nil {
predicates = append(predicates, p)
}
if r.predicate != nil {
predicate = dataset.AndPredicate{
Left: predicate,
Right: translateLogsPredicate(r.predicate, columns, columnDescs),
if p := translateLogsPredicate(r.predicate, columns, columnDescs); p != nil {
predicates = append(predicates, p)
}
}
readerOpts := dataset.ReaderOptions{
Dataset: dset,
Columns: columns,
Predicate: predicate,
Dataset: dset,
Columns: columns,
Predicates: predicates,
TargetCacheSize: 16_000_000, // Permit up to 16MB of cache pages.
}

@ -144,10 +144,15 @@ func (r *StreamsReader) initReader(ctx context.Context) error {
return fmt.Errorf("reading columns: %w", err)
}
var predicates []dataset.Predicate
if p := translateStreamsPredicate(r.predicate, columns, columnDescs); p != nil {
predicates = append(predicates, p)
}
readerOpts := dataset.ReaderOptions{
Dataset: dset,
Columns: columns,
Predicate: translateStreamsPredicate(r.predicate, columns, columnDescs),
Dataset: dset,
Columns: columns,
Predicates: predicates,
TargetCacheSize: 16_000_000, // Permit up to 16MB of cache pages.
}

Loading…
Cancel
Save