From 5961923836c3febe989dd1bd868634e2a04bfba4 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Wed, 30 Apr 2025 11:36:18 +0530 Subject: [PATCH] chore(dataobj): apply predicates sequentially (#17283) --- pkg/dataobj/internal/dataset/reader.go | 305 ++++++++++---- pkg/dataobj/internal/dataset/reader_test.go | 430 +++++++++++++++++++- pkg/dataobj/logs_reader.go | 16 +- pkg/dataobj/streams_reader.go | 11 +- 4 files changed, 654 insertions(+), 108 deletions(-) diff --git a/pkg/dataobj/internal/dataset/reader.go b/pkg/dataobj/internal/dataset/reader.go index 18d69bbfc3..abe6ab2ac4 100644 --- a/pkg/dataobj/internal/dataset/reader.go +++ b/pkg/dataobj/internal/dataset/reader.go @@ -24,11 +24,12 @@ type ReaderOptions struct { // are considered non-predicate columns. Columns []Column - // Predicate filters the data returned by a Reader. Predicate is optional; if + // Predicates filter the data returned by a Reader. Predicates are optional; if // nil, all rows from Columns are returned. // // Expressions in Predicate may only reference columns in Columns. - Predicate Predicate + // Holds a list of predicates that can be sequentially applied to the dataset. + Predicates []Predicate // TargetCacheSize configures the amount of memory to target for caching // pages in memory. The cache may exceed this size if the combined size of @@ -125,40 +126,37 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { } r.dl.SetReadRange(readRange) - count, err := r.inner.ReadColumns(ctx, r.dl.PrimaryColumns(), s[:readSize]) - if err != nil && !errors.Is(err, io.EOF) { - return n, err - } else if count == 0 && errors.Is(err, io.EOF) { - return 0, io.EOF - } + var ( + rowsRead int // tracks max rows accessed to move the [r.row] cursor + passCount int // tracks how many rows passed the predicate + statistics = stats.FromContext(ctx) + ) - var primaryColumnBytes int64 - var primaryColumnPostFilterBytes int64 - var totalBytesAfterFill int64 - var passCount int // passCount tracks how many rows pass the predicate. + // If there are no predicates, read all columns in the dataset + if len(r.opts.Predicates) == 0 { + count, err := r.inner.ReadColumns(ctx, r.dl.PrimaryColumns(), s[:readSize]) + if err != nil && !errors.Is(err, io.EOF) { + return n, err + } else if count == 0 && errors.Is(err, io.EOF) { + return 0, io.EOF + } - if r.opts.Predicate == nil { - // If there's no predicate, all rows are valid. + rowsRead = count passCount = count + + statistics.AddPrePredicateDecompressedRows(int64(rowsRead)) + var primaryColumnBytes int64 for i := range count { primaryColumnBytes += s[i].Size() } + statistics.AddPrePredicateDecompressedBytes(primaryColumnBytes) } else { - for i := range count { - size := s[i].SizeOfColumns(r.primaryColumnIndexes) - - primaryColumnBytes += size - if !checkPredicate(r.opts.Predicate, r.origColumnLookup, s[i]) { - continue - } - // We move s[i] to s[passCount] by *swapping* the rows. Copying would - // result in the Row.Values slice existing in two places in the buffer, - // which causes memory corruption when filling in rows. - s[passCount], s[i] = s[i], s[passCount] - passCount++ - primaryColumnPostFilterBytes += size + rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize], statistics) + if err != nil { + return n, err } } + statistics.AddPostPredicateRows(int64(passCount)) if secondary := r.dl.SecondaryColumns(); len(secondary) > 0 && passCount > 0 { // Mask out any ranges that aren't in s[:passCount], so that filling in @@ -178,28 +176,103 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { } else if count != passCount { return n, fmt.Errorf("failed to fill rows: expected %d, got %d", n, count) } + + var totalBytesFilled int64 for i := range count { - totalBytesAfterFill += s[i].Size() + totalBytesFilled += s[i].Size() - s[i].SizeOfColumns(r.primaryColumnIndexes) } + statistics.AddPostPredicateDecompressedBytes(totalBytesFilled) } n += passCount - statistics := stats.FromContext(ctx) - statistics.AddPrePredicateDecompressedRows(int64(count)) - statistics.AddPrePredicateDecompressedBytes(primaryColumnBytes) - statistics.AddPostPredicateRows(int64(passCount)) - // Fill is not called when there is no predicate - if totalBytesAfterFill > 0 { - statistics.AddPostPredicateDecompressedBytes(totalBytesAfterFill - primaryColumnPostFilterBytes) - } - // We only advance r.row after we successfully read and filled rows. This // allows the caller to retry reading rows if a sporadic error occurs. - r.row += int64(count) + r.row += int64(rowsRead) return n, nil } +// readAndFilterPrimaryColumns reads the primary columns from the dataset +// and filters the rows by sequentially applying the predicates. +// +// For each predicate evaluation, only the required columns are loaded. +// Rows are filtered at each step with subsequent predicates only having to fill +// the columns on the reduced row range. +// +// It returns the max rows read, rows that passed all the predicates, and any error +func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int, s []Row, stats *stats.Context) (int, int, error) { + var ( + rowsRead int // tracks max rows accessed to move the [r.row] cursor + passCount int // number of rows that passed the predicate + primaryColumnBytes int64 + filledColumns = make(map[Column]struct{}, len(r.primaryColumnIndexes)) + ) + + // sequentially apply the predicates. + for i, p := range r.opts.Predicates { + columns, idxs, err := r.predicateColumns(p, func(c Column) bool { + // keep only columns that haven't been filled yet. + _, ok := filledColumns[c] + return !ok + }) + if err != nil { + return rowsRead, 0, err + } + + var count int + // read the requested number of rows for the first predicate. + if i == 0 { + count, err = r.inner.ReadColumns(ctx, columns, s[:readSize]) + if err != nil && !errors.Is(err, io.EOF) { + return 0, 0, err + } else if count == 0 && errors.Is(err, io.EOF) { + return 0, 0, io.EOF + } + + rowsRead = count + } else if len(columns) > 0 { + count, err = r.inner.Fill(ctx, columns, s[:readSize]) + if err != nil && !errors.Is(err, io.EOF) { + return rowsRead, 0, err + } else if count != readSize { + return rowsRead, 0, fmt.Errorf("failed to fill rows: expected %d, got %d", len(s), count) + } + } else { + count = readSize // required columns are already filled + } + + passCount = 0 + for i := range count { + size := s[i].SizeOfColumns(idxs) + primaryColumnBytes += size + + if !checkPredicate(p, r.origColumnLookup, s[i]) { + continue + } + // We move s[i] to s[passCount] by *swapping* the rows. Copying would + // result in the Row.Values slice existing in two places in the buffer, + // which causes memory corruption when filling in rows. + s[passCount], s[i] = s[i], s[passCount] + passCount++ + } + + if passCount == 0 { + // No rows passed the predicate, so we can stop early. + break + } + + for _, c := range columns { + filledColumns[c] = struct{}{} + } + + readSize = passCount + } + + stats.AddPrePredicateDecompressedRows(int64(rowsRead)) + stats.AddPrePredicateDecompressedBytes(primaryColumnBytes) + return rowsRead, passCount, nil +} + // alignRow returns r.row if it is a valid row in ranges, or adjusts r.row to // the next valid row in ranges. func (r *Reader) alignRow() (uint64, error) { @@ -392,30 +465,32 @@ func (r *Reader) validatePredicate() error { var err error - WalkPredicate(r.opts.Predicate, func(p Predicate) bool { - if err != nil { - return false - } + for _, pp := range r.opts.Predicates { + WalkPredicate(pp, func(p Predicate) bool { + if err != nil { + return false + } - switch p := p.(type) { - case EqualPredicate: - err = process(p.Column) - case InPredicate: - err = process(p.Column) - case GreaterThanPredicate: - err = process(p.Column) - case LessThanPredicate: - err = process(p.Column) - case FuncPredicate: - err = process(p.Column) - case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil: - // No columns to process. - default: - panic(fmt.Sprintf("dataset.Reader.validatePredicate: unsupported predicate type %T", p)) - } + switch p := p.(type) { + case EqualPredicate: + err = process(p.Column) + case InPredicate: + err = process(p.Column) + case GreaterThanPredicate: + err = process(p.Column) + case LessThanPredicate: + err = process(p.Column) + case FuncPredicate: + err = process(p.Column) + case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil: + // No columns to process. + default: + panic(fmt.Sprintf("dataset.Reader.validatePredicate: unsupported predicate type %T", p)) + } - return true // Continue walking the Predicate. - }) + return true // Continue walking the Predicate. + }) + } return err } @@ -445,10 +520,28 @@ func (r *Reader) initDownloader(ctx context.Context) error { } } - ranges, err := r.buildPredicateRanges(ctx, r.opts.Predicate) - if err != nil { - return err + var ranges rowRanges + var err error + if len(r.opts.Predicates) == 0 { // no predicates, build full range + ranges, err = r.buildPredicateRanges(ctx, nil) + if err != nil { + return err + } + } else { + for _, p := range r.opts.Predicates { + rr, err := r.buildPredicateRanges(ctx, p) + if err != nil { + return err + } + + if ranges == nil { + ranges = rr + } else { + ranges = intersectRanges(nil, ranges, rr) + } + } } + r.dl.SetDatasetRanges(ranges) r.ranges = ranges @@ -474,34 +567,36 @@ func (r *Reader) fillPrimaryMask(mask *bitmask.Mask) { } // If there's no predicate, all columns are primary. - if r.opts.Predicate == nil { + if len(r.opts.Predicates) == 0 { for _, c := range r.opts.Columns { process(c) } return } - // If there is a predicate, primary columns are those used in the predicate. - WalkPredicate(r.opts.Predicate, func(p Predicate) bool { - switch p := p.(type) { - case EqualPredicate: - process(p.Column) - case InPredicate: - process(p.Column) - case GreaterThanPredicate: - process(p.Column) - case LessThanPredicate: - process(p.Column) - case FuncPredicate: - process(p.Column) - case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil: - // No columns to process. - default: - panic(fmt.Sprintf("dataset.Reader.fillPrimaryMask: unsupported predicate type %T", p)) - } + for _, pp := range r.opts.Predicates { + // If there is a predicate, primary columns are those used in the predicate. + WalkPredicate(pp, func(p Predicate) bool { + switch p := p.(type) { + case EqualPredicate: + process(p.Column) + case InPredicate: + process(p.Column) + case GreaterThanPredicate: + process(p.Column) + case LessThanPredicate: + process(p.Column) + case FuncPredicate: + process(p.Column) + case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil: + // No columns to process. + default: + panic(fmt.Sprintf("dataset.Reader.fillPrimaryMask: unsupported predicate type %T", p)) + } - return true // Continue walking the Predicate. - }) + return true // Continue walking the Predicate. + }) + } } // buildPredicateRanges returns a set of rowRanges that are valid to read based @@ -738,3 +833,45 @@ func readMinMax(stats *datasetmd.Statistics) (minValue Value, maxValue Value, er } return } + +func (r *Reader) predicateColumns(p Predicate, keep func(c Column) bool) ([]Column, []int, error) { + columns := make(map[Column]struct{}) + + WalkPredicate(p, func(p Predicate) bool { + switch p := p.(type) { + case EqualPredicate: + columns[p.Column] = struct{}{} + case InPredicate: + columns[p.Column] = struct{}{} + case GreaterThanPredicate: + columns[p.Column] = struct{}{} + case LessThanPredicate: + columns[p.Column] = struct{}{} + case FuncPredicate: + columns[p.Column] = struct{}{} + case AndPredicate, OrPredicate, NotPredicate, FalsePredicate, nil: + // No columns to process. + default: + panic(fmt.Sprintf("predicateColumns: unsupported predicate type %T", p)) + } + return true + }) + + ret := make([]Column, 0, len(columns)) + idxs := make([]int, 0, len(columns)) + for c := range columns { + if !keep(c) { + continue + } + + idx, ok := r.origColumnLookup[c] + if !ok { + panic(fmt.Errorf("predicateColumns: column %v not found in Reader columns", c)) + } + + idxs = append(idxs, idx) + ret = append(ret, r.dl.AllColumns()[idx]) + } + + return ret, idxs, nil +} diff --git a/pkg/dataobj/internal/dataset/reader_test.go b/pkg/dataobj/internal/dataset/reader_test.go index 207250ba78..81746a0e7c 100644 --- a/pkg/dataobj/internal/dataset/reader_test.go +++ b/pkg/dataobj/internal/dataset/reader_test.go @@ -3,10 +3,15 @@ package dataset import ( "context" "errors" + "fmt" "io" + "iter" + "math/rand" "slices" + "strconv" "testing" + "github.com/dustin/go-humanize" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" @@ -31,9 +36,11 @@ func Test_Reader_ReadWithPredicate(t *testing.T) { r := NewReader(ReaderOptions{ Dataset: dset, Columns: columns, - Predicate: GreaterThanPredicate{ - Column: columns[3], // birth_year column - Value: Int64Value(1985), + Predicates: []Predicate{ + GreaterThanPredicate{ + Column: columns[3], // birth_year column + Value: Int64Value(1985), + }, }, }) defer r.Close() @@ -65,9 +72,11 @@ func Test_Reader_ReadWithPageFiltering(t *testing.T) { // // TODO(rfratto): make it easier to prove that a predicate includes a value // which is out of range of at least one page. - Predicate: EqualPredicate{ - Column: columns[0], // first_name column - Value: ByteArrayValue([]byte("Henry")), + Predicates: []Predicate{ + EqualPredicate{ + Column: columns[0], // first_name column + Value: ByteArrayValue([]byte("Henry")), + }, }, }) defer r.Close() @@ -92,9 +101,11 @@ func Test_Reader_ReadWithPredicate_NoSecondary(t *testing.T) { r := NewReader(ReaderOptions{ Dataset: dset, Columns: []Column{columns[3]}, - Predicate: GreaterThanPredicate{ - Column: columns[3], // birth_year column - Value: Int64Value(1985), + Predicates: []Predicate{ + GreaterThanPredicate{ + Column: columns[3], // birth_year column + Value: Int64Value(1985), + }, }, }) defer r.Close() @@ -141,10 +152,10 @@ func Test_Reader_Stats(t *testing.T) { r := NewReader(ReaderOptions{ Dataset: dset, Columns: columns, - Predicate: GreaterThanPredicate{ + Predicates: []Predicate{GreaterThanPredicate{ Column: columns[3], // birth_year column Value: Int64Value(1985), - }, + }}, }) defer r.Close() @@ -361,9 +372,9 @@ func Test_BuildPredicateRanges(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { r := NewReader(ReaderOptions{ - Dataset: ds, - Columns: cols, - Predicate: tc.predicate, + Dataset: ds, + Columns: cols, + Predicates: []Predicate{tc.predicate}, }) defer r.Close() @@ -462,3 +473,394 @@ func encodeInt64Value(t *testing.T, v int64) []byte { require.NoError(t, err) return data } + +func BenchmarkReader(b *testing.B) { + generator := DatasetGenerator{ + RowCount: 1_000_000, + PageSizeHint: 2 * 1024 * 1024, // 2MB + Columns: []generatorColumnConfig{ + { + Name: "stream", + ValueType: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + CardinalityTarget: 1000, + }, + { + Name: "timestamp", + ValueType: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + CardinalityTarget: 100_000, + }, + { + Name: "log", + ValueType: datasetmd.VALUE_TYPE_BYTE_ARRAY, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + AvgSize: 1024, + CardinalityTarget: 100_000, + }, + }, + } + + readPatterns := []struct { + name string + batchSize int + }{ + { + name: "batch=100", + batchSize: 100, + }, + { + name: "batch=10k", + batchSize: 10_000, + }, + } + + // Generate dataset once per case + ds, cols := generator.Build(b, rand.Int63()) + opts := ReaderOptions{ + Dataset: ds, + Columns: cols, + } + + for _, rp := range readPatterns { + b.Run(rp.name, func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + batch := make([]Row, rp.batchSize) + for b.Loop() { + reader := NewReader(opts) + var rowsRead int + for { + n, err := reader.Read(context.Background(), batch) + if err == io.EOF { + break + } + if err != nil { + b.Fatal(err) + } + rowsRead += n + } + reader.Close() + + b.ReportMetric(float64(rowsRead)/float64(b.N), "rows/op") + } + }) + } +} + +func BenchmarkPredicateExecution(b *testing.B) { + // Generate dataset with two columns, one with high cardinality and one with low cardinality + // higher the cardinality, more selective the predicate + generator := DatasetGenerator{ + RowCount: 1_000_000, + // set large page size to not realise benefits from page pruning since the goal + // of this benchmark is to measure the gains from sequential predicate evaluation alone. + PageSizeHint: 100 * 1024 * 1024, + Columns: []generatorColumnConfig{ + { + Name: "more_selective", + ValueType: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + CardinalityTarget: 500_000, + }, + { + Name: "less_selective", + ValueType: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + CardinalityTarget: 100, + }, + }, + } + + ds, cols := generator.Build(b, rand.Int63()) + + var col1Value, col2Value int64 + idx := rand.Intn(generator.RowCount) // Randomly select a row index to use for the predicate values + + currentPos := 0 + batch := make([]Row, 1000) + // read the dataset once to pick a random row for predicate generation + reader := NewReader(ReaderOptions{ + Dataset: ds, + Columns: cols, + }) + + for { + n, err := reader.Read(context.Background(), batch) + if err == io.EOF { + break + } + if err != nil { + b.Fatal(err) + } + + // Check if our target index is in this batch + if idx >= currentPos && idx < currentPos+n { + selectedRow := batch[idx-currentPos] + col1Value = selectedRow.Values[0].Int64() + col2Value = selectedRow.Values[1].Int64() + break + } + + currentPos += n + } + reader.Close() + + predicatePatterns := []struct { + name string + predicates []Predicate + }{ + { + name: "combined", + predicates: []Predicate{ + AndPredicate{ + Left: EqualPredicate{ + Column: cols[0], + Value: Int64Value(col1Value), + }, + Right: EqualPredicate{ + Column: cols[1], + Value: Int64Value(col2Value), + }, + }, + }, + }, + { + name: "high", + predicates: []Predicate{ + EqualPredicate{ + Column: cols[0], + Value: Int64Value(col1Value), + }, + EqualPredicate{ + Column: cols[1], + Value: Int64Value(col2Value), + }, + }, + }, + { + name: "low", + predicates: []Predicate{ + EqualPredicate{ + Column: cols[1], + Value: Int64Value(col2Value), + }, + EqualPredicate{ + Column: cols[0], + Value: Int64Value(col1Value), + }, + }, + }, + } + + for _, pp := range predicatePatterns { + b.Run("selectivity="+pp.name, func(b *testing.B) { + b.ResetTimer() + b.ReportAllocs() + + for b.Loop() { + reader := NewReader(ReaderOptions{ + Dataset: ds, + Columns: cols, + Predicates: pp.predicates, + }) + + batch := make([]Row, 10000) + + for { + _, err := reader.Read(context.Background(), batch) + if err == io.EOF { + break + } + if err != nil { + b.Fatal(err) + } + } + reader.Close() + } + }) + } +} + +type generatorColumnConfig struct { + Name string + ValueType datasetmd.ValueType + Encoding datasetmd.EncodingType + Compression datasetmd.CompressionType + + AvgSize int64 // Average size in bytes for variable-length types + CardinalityTarget int64 // Target number of unique values + SparsityRate float64 // 0.0-1.0, where 1.0 means all values are null +} + +func columnValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] { + switch cfg.ValueType { + case datasetmd.VALUE_TYPE_INT64, datasetmd.VALUE_TYPE_UINT64: + return numberValues(rng, cfg) + case datasetmd.VALUE_TYPE_BYTE_ARRAY: + return stringValues(rng, cfg) + default: + panic(fmt.Sprintf("unsupported type for generation: %v", cfg.ValueType)) + } +} + +func stringValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] { + // Pre-generate the set of unique values we'll cycle through + uniqueValues := make([]Value, cfg.CardinalityTarget) + for i := range int(cfg.CardinalityTarget) { + // Generate size between 0.5x and 1.5x of average size + size := int(float64(cfg.AvgSize) * (0.5 + rng.Float64())) + + // Convert number to string and create padded result + str := make([]byte, size) + num := []byte(strconv.Itoa(i)) + copy(str, num) + for j := len(num); j < size; j++ { + str[j] = 'x' + } + uniqueValues[i] = ByteArrayValue(str) + } + + return func(yield func(Value) bool) { + for { + if !yield(uniqueValues[rng.Intn(len(uniqueValues))]) { + return + } + } + } +} + +func numberValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] { + return func(yield func(Value) bool) { + for { + v := rng.Int63n(cfg.CardinalityTarget) + switch cfg.ValueType { + case datasetmd.VALUE_TYPE_INT64: + if !yield(Int64Value(v)) { + return + } + case datasetmd.VALUE_TYPE_UINT64: + if !yield(Uint64Value(uint64(v))) { + return + } + } + } + } +} + +type DatasetGenerator struct { + RowCount int + PageSizeHint int + Columns []generatorColumnConfig +} + +func (g *DatasetGenerator) Build(t testing.TB, seed int64) (Dataset, []Column) { + t.Helper() + + memColumns := make([]*MemColumn, 0, len(g.Columns)) + rng := rand.New(rand.NewSource(seed)) + + for _, colCfg := range g.Columns { + next, stop := iter.Pull(columnValues(rng, colCfg)) + defer stop() + + opts := BuilderOptions{ + PageSizeHint: g.PageSizeHint, + Value: colCfg.ValueType, + Encoding: colCfg.Encoding, + Compression: colCfg.Compression, + Statistics: StatisticsOptions{ + StoreCardinalityStats: true, + }, + } + + if colCfg.ValueType == datasetmd.VALUE_TYPE_INT64 || colCfg.ValueType == datasetmd.VALUE_TYPE_UINT64 { + opts.Statistics.StoreRangeStats = true + } + + // Create a builder for this column + builder, err := NewColumnBuilder(colCfg.Name, opts) + require.NoError(t, err) + + // Add values to the builder + for i := range g.RowCount { + if rng.Float64() < colCfg.SparsityRate { + continue + } + + val, ok := next() + require.True(t, ok, "generator should yield values") + require.NoError(t, builder.Append(i, val)) + } + + col, err := builder.Flush() + require.NoError(t, err) + + memColumns = append(memColumns, col) + } + + ds := FromMemory(memColumns) + cols, err := result.Collect(ds.ListColumns(context.Background())) + require.NoError(t, err) + + return ds, cols +} + +// Test_DatasetGenerator is a helper to debug the dataset generation +func Test_DatasetGenerator(t *testing.T) { + g := DatasetGenerator{ + RowCount: 1_000_000, + PageSizeHint: 2 * 1024 * 1024, // 2MB + Columns: []generatorColumnConfig{ + { + Name: "timestamp", + ValueType: datasetmd.VALUE_TYPE_INT64, + Encoding: datasetmd.ENCODING_TYPE_DELTA, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + CardinalityTarget: 100_000, + SparsityRate: 0.0, + }, + { + Name: "label", + ValueType: datasetmd.VALUE_TYPE_BYTE_ARRAY, + Encoding: datasetmd.ENCODING_TYPE_PLAIN, + Compression: datasetmd.COMPRESSION_TYPE_NONE, + AvgSize: 32, + CardinalityTarget: 100, + SparsityRate: 0.3, + }, + }, + } + + _, cols := g.Build(t, rand.Int63()) + require.Equal(t, 2, len(cols)) + require.Equal(t, g.RowCount, cols[0].ColumnInfo().RowsCount) + // TODO: Row count is < expected. Must be a result of null values at the end. + // Remove this comment once the issue is fixed. + // require.Equal(t, g.RowCount, cols[1].ColumnInfo().RowsCount) + + require.NotNil(t, cols[0].ColumnInfo().Statistics.CardinalityCount) + require.NotNil(t, cols[1].ColumnInfo().Statistics.CardinalityCount) + + t.Logf("timestamp column cardinality: %d", cols[0].ColumnInfo().Statistics.CardinalityCount) + t.Logf("label column cardinality: %d", cols[1].ColumnInfo().Statistics.CardinalityCount) + + require.NotNil(t, cols[0].ColumnInfo().Statistics.MinValue) + require.NotNil(t, cols[0].ColumnInfo().Statistics.MaxValue) + + var minValue, maxValue Value + require.NoError(t, minValue.UnmarshalBinary(cols[0].ColumnInfo().Statistics.MinValue)) + require.NoError(t, maxValue.UnmarshalBinary(cols[0].ColumnInfo().Statistics.MaxValue)) + + t.Logf("timestamp column min: %d", minValue.Int64()) + t.Logf("timestamp column max: %d", maxValue.Int64()) + + t.Logf("timestamp column size: %s", humanize.Bytes(uint64(cols[0].ColumnInfo().UncompressedSize))) + t.Logf("label column size: %s", humanize.Bytes(uint64(cols[1].ColumnInfo().UncompressedSize))) +} diff --git a/pkg/dataobj/logs_reader.go b/pkg/dataobj/logs_reader.go index e99cba18f2..c9eb362a6b 100644 --- a/pkg/dataobj/logs_reader.go +++ b/pkg/dataobj/logs_reader.go @@ -177,18 +177,20 @@ func (r *LogsReader) initReader(ctx context.Context) error { // r.predicate doesn't contain mappings of stream IDs; we need to build // that as a separate predicate and AND them together. - predicate := streamIDPredicate(maps.Keys(r.matchIDs), columns, columnDescs) + var predicates []dataset.Predicate + if p := streamIDPredicate(maps.Keys(r.matchIDs), columns, columnDescs); p != nil { + predicates = append(predicates, p) + } if r.predicate != nil { - predicate = dataset.AndPredicate{ - Left: predicate, - Right: translateLogsPredicate(r.predicate, columns, columnDescs), + if p := translateLogsPredicate(r.predicate, columns, columnDescs); p != nil { + predicates = append(predicates, p) } } readerOpts := dataset.ReaderOptions{ - Dataset: dset, - Columns: columns, - Predicate: predicate, + Dataset: dset, + Columns: columns, + Predicates: predicates, TargetCacheSize: 16_000_000, // Permit up to 16MB of cache pages. } diff --git a/pkg/dataobj/streams_reader.go b/pkg/dataobj/streams_reader.go index 9cc3b2ab6f..23b03177dc 100644 --- a/pkg/dataobj/streams_reader.go +++ b/pkg/dataobj/streams_reader.go @@ -144,10 +144,15 @@ func (r *StreamsReader) initReader(ctx context.Context) error { return fmt.Errorf("reading columns: %w", err) } + var predicates []dataset.Predicate + if p := translateStreamsPredicate(r.predicate, columns, columnDescs); p != nil { + predicates = append(predicates, p) + } + readerOpts := dataset.ReaderOptions{ - Dataset: dset, - Columns: columns, - Predicate: translateStreamsPredicate(r.predicate, columns, columnDescs), + Dataset: dset, + Columns: columns, + Predicates: predicates, TargetCacheSize: 16_000_000, // Permit up to 16MB of cache pages. }