package dataset

import (
	"context"
	"errors"
	"fmt"
	"io"
	"iter"
	"math/rand"
	"slices"
	"strconv"
	"testing"

	"github.com/dustin/go-humanize"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
	"github.com/grafana/loki/v3/pkg/xcap"
)
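
// Test_Reader_ReadAll asserts that a Reader with no predicates returns every
// row in the dataset.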
func Test_Reader_ReadAll(t *testing.T) {
	dset, columns := buildTestDataset(t)
	r := NewReader(ReaderOptions{Dataset: dset, Columns: columns})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)
	require.Equal(t, basicReaderTestData, convertToTestPersons(actualRows))
}
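
// Test_Reader_ReadWithPredicate asserts that a Reader only returns the rows
// matching its predicate.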
func Test_Reader_ReadWithPredicate(t *testing.T) {
	dset, columns := buildTestDataset(t)

	// Create a predicate that only returns people born after 1985.
	r := NewReader(ReaderOptions{
		Dataset: dset,
		Columns: columns,
		Predicates: []Predicate{
			GreaterThanPredicate{
				Column: columns[3], // birth_year column
				Value:  Int64Value(1985),
			},
		},
	})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)

	// Filter expected data manually to verify.
	var expected []testPerson
	for _, p := range basicReaderTestData {
		if p.birthYear > 1985 {
			expected = append(expected, p)
		}
	}
	require.Equal(t, expected, convertToTestPersons(actualRows))
}

// Test_Reader_ReadWithPageFiltering tests that a Reader can filter rows based
// on a predicate that has filtered pages out.
func Test_Reader_ReadWithPageFiltering(t *testing.T) {
	dset, columns := buildTestDataset(t)

	r := NewReader(ReaderOptions{
		Dataset: dset,
		Columns: columns,

		// Henry only appears in the first and last pages; the pages in between
		// are out of range and get filtered out, exercising page-level filtering.
		//
		// TODO(rfratto): make it easier to prove that a predicate includes a value
		// which is out of range of at least one page.
		Predicates: []Predicate{
			EqualPredicate{
				Column: columns[0], // first_name column
				Value:  BinaryValue([]byte("Henry")),
			},
		},
	})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)

	// Filter expected data manually to verify.
	var expected []testPerson
	for _, p := range basicReaderTestData {
		if p.firstName == "Henry" {
			expected = append(expected, p)
		}
	}
	require.Equal(t, expected, convertToTestPersons(actualRows))
}

// Test_Reader_ReadWithPageFilteringOnEmptyPredicate tests that a Reader
// filters rows with empty predicate values. A predicate for an explicitly
// empty value also matches null values, so those rows must not be excluded by
// page skipping.
func Test_Reader_ReadWithPageFilteringOnEmptyPredicate(t *testing.T) {
	// Create builders for each column.
	firstNameBuilder := buildStringColumn(t, "first_name")
	lastNameBuilder := buildStringColumn(t, "last_name")

	// Row with both values.
	firstNameBuilder.append(0, BinaryValue([]byte("John")))
	lastNameBuilder.append(0, BinaryValue([]byte("Doe")))

	// Row with a null value.
	firstNameBuilder.append(2, BinaryValue([]byte("Jim")))
	lastNameBuilder.append(2, Value{})

	firstName, err := firstNameBuilder.Flush()
	require.NoError(t, err)
	lastName, err := lastNameBuilder.Flush()
	require.NoError(t, err)

	dset := FromMemory([]*MemColumn{firstName, lastName})
	cols, err := result.Collect(dset.ListColumns(context.Background()))
	require.NoError(t, err)

	r := NewReader(ReaderOptions{
		Dataset: dset,
		Columns: cols,

		Predicates: []Predicate{
			// Imitate a LogQL predicate: {last_name=""}
			EqualPredicate{
				Column: cols[1], // last_name column
				Value:  BinaryValue([]byte("")),
			},
		},
	})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)

	actualFirstNames := make([]string, 0, len(actualRows))
	actualLastNames := make([]string, 0, len(actualRows))
	for _, row := range actualRows {
		if row.Values[0].IsNil() && row.Values[1].IsNil() {
			continue
		}
		actualFirstNames = append(actualFirstNames, string(row.Values[0].Binary()))
		if !row.Values[1].IsNil() {
			actualLastNames = append(actualLastNames, string(row.Values[1].Binary()))
		} else {
			actualLastNames = append(actualLastNames, "[nil]")
		}
	}

	// Filter expected data manually to verify.
	expectedFirstNames := []string{"Jim"}
	expectedLastNames := []string{"[nil]"}
	require.Equal(t, expectedFirstNames, actualFirstNames)
	require.Equal(t, expectedLastNames, actualLastNames)
}
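
// Test_Reader_ReadWithPredicate_NoSecondary asserts that a predicate is still
// applied when the predicate column is the only column being read.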
func Test_Reader_ReadWithPredicate_NoSecondary(t *testing.T) {
	dset, columns := buildTestDataset(t)

	// Create a predicate that only returns people born after 1985.
	r := NewReader(ReaderOptions{
		Dataset: dset,
		Columns: []Column{columns[3]},
		Predicates: []Predicate{
			GreaterThanPredicate{
				Column: columns[3], // birth_year column
				Value:  Int64Value(1985),
			},
		},
	})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)

	// Filter expected data manually to verify.
	var expected []int
	for _, p := range basicReaderTestData {
		if p.birthYear > 1985 {
			expected = append(expected, int(p.birthYear))
		}
	}

	var actual []int
	for _, row := range actualRows {
		actual = append(actual, int(row.Values[0].Int64()))
	}
	require.Equal(t, expected, actual)
}
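
// Test_Reader_Reset asserts that a Reader can be reset and then reused to
// read the same dataset from the beginning.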
func Test_Reader_Reset(t *testing.T) {
	dset, columns := buildTestDataset(t)
	r := NewReader(ReaderOptions{Dataset: dset, Columns: columns})
	defer r.Close()

	// First read everything.
	_, err := readDataset(r, 3)
	require.NoError(t, err)

	// Reset and read again.
	r.Reset(ReaderOptions{Dataset: dset, Columns: columns})

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)
	require.Equal(t, basicReaderTestData, convertToTestPersons(actualRows))
}
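
// Test_buildMask asserts that buildMask yields the sub-ranges of fullRange
// that are not covered by the given rows.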
func Test_buildMask(t *testing.T) {
	tt := []struct {
		name      string
		fullRange rowRange
		rows      []Row
		expect    []rowRange
	}{
		{
			name:      "no rows",
			fullRange: rowRange{1, 10},
			rows:      nil,
			expect:    []rowRange{{1, 10}},
		},
		{
			name:      "full coverage",
			fullRange: rowRange{1, 10},
			rows:      makeRows(1, 10, 1),
			expect:    nil,
		},
		{
			name:      "full coverage - split",
			fullRange: rowRange{1, 10},
			rows:      mergeRows(makeRows(1, 5, 1), makeRows(6, 10, 1)),
			expect:    nil,
		},
		{
			name:      "partial coverage - front",
			fullRange: rowRange{1, 10},
			rows:      makeRows(1, 5, 1),
			expect:    []rowRange{{6, 10}},
		},
		{
			name:      "partial coverage - middle",
			fullRange: rowRange{1, 10},
			rows:      makeRows(5, 7, 1),
			expect:    []rowRange{{1, 4}, {8, 10}},
		},
		{
			name:      "partial coverage - end",
			fullRange: rowRange{1, 10},
			rows:      makeRows(6, 10, 1),
			expect:    []rowRange{{1, 5}},
		},
		{
			name:      "partial coverage - gaps",
			fullRange: rowRange{1, 10},
			rows:      []Row{{Index: 3}, {Index: 5}, {Index: 7}, {Index: 9}},
			expect:    []rowRange{{1, 2}, {4, 4}, {6, 6}, {8, 8}, {10, 10}},
		},
	}

	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			actual := slices.Collect(buildMask(tc.fullRange, tc.rows))
			require.Equal(t, tc.expect, actual)
		})
	}
}
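
// makeRows returns a slice of Rows with indices from "from" to "to"
// (inclusive), stepping by inc.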
func makeRows(from, to, inc int) []Row {
	var rows []Row
	for i := from; i <= to; i += inc {
		rows = append(rows, Row{Index: i})
	}
	return rows
}
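
// mergeRows concatenates multiple slices of Rows into a single slice.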
func mergeRows(rows ...[]Row) []Row {
	var res []Row
	for _, r := range rows {
		res = append(res, r...)
	}
	return res
}

// readDataset reads all rows from a Reader using the given batch size.
func readDataset(br *Reader, batchSize int) ([]Row, error) {
	return readDatasetWithContext(context.Background(), br, batchSize)
}

// readDatasetWithContext reads all rows from a Reader using the given batch size and context.
func readDatasetWithContext(ctx context.Context, br *Reader, batchSize int) ([]Row, error) {
	var (
		all []Row

		batch = make([]Row, batchSize)
	)

	for {
		// Clear the batch for each read to ensure that any memory in Row and
		// Value doesn't get reused. See the comment in the implementation of
		// [readBasicReader] for more information.
		clear(batch)

		n, err := br.Read(ctx, batch)
		all = append(all, batch[:n]...)
		if errors.Is(err, io.EOF) {
			return all, nil
		} else if err != nil {
			return all, err
		}
	}
}
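
// Test_BuildPredicateRanges asserts that buildPredicateRanges converts a
// predicate into the row ranges of pages whose statistics may contain
// matching values.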
func Test_BuildPredicateRanges(t *testing.T) {
	ds, cols := buildMemDatasetWithStats(t)
	tt := []struct {
		name      string
		predicate Predicate
		want      rowRanges
	}{
		{
			name:      "nil predicate returns full range",
			predicate: nil,
			want:      rowRanges{{Start: 0, End: 999}}, // Full dataset range
		},
		{
			name:      "equal predicate in range",
			predicate: EqualPredicate{Column: cols[1], Value: Int64Value(50)},
			want:      rowRanges{{Start: 0, End: 249}}, // Page 1 of timestamp column
		},
		{
			name:      "equal predicate not in any range",
			predicate: EqualPredicate{Column: cols[1], Value: Int64Value(1500)},
			want:      nil, // No ranges should match
		},
		{
			name:      "greater than predicate",
			predicate: GreaterThanPredicate{Column: cols[1], Value: Int64Value(400)},
			want:      rowRanges{{Start: 250, End: 749}, {Start: 750, End: 999}}, // Pages 2 and 3 of timestamp column
		},
		{
			name:      "less than predicate",
			predicate: LessThanPredicate{Column: cols[1], Value: Int64Value(300)},
			want:      rowRanges{{Start: 0, End: 249}, {Start: 250, End: 749}}, // Pages 1 and 2 of timestamp column
		},
		{
			name: "and predicate",
			predicate: AndPredicate{
				Left:  EqualPredicate{Column: cols[0], Value: Int64Value(1)},      // Rows 0 - 299 of stream column
				Right: LessThanPredicate{Column: cols[1], Value: Int64Value(600)}, // Rows 0 - 249, 250 - 749 of timestamp column
			},
			want: rowRanges{{Start: 0, End: 249}, {Start: 250, End: 299}},
		},
		{
			name: "or predicate",
			predicate: OrPredicate{
				Left:  EqualPredicate{Column: cols[0], Value: Int64Value(1)},         // Rows 0 - 299 of stream column
				Right: GreaterThanPredicate{Column: cols[1], Value: Int64Value(800)}, // Rows 750 - 999 of timestamp column
			},
			want: rowRanges{{Start: 0, End: 299}, {Start: 750, End: 999}}, // Rows 0 - 299, 750 - 999
		},
		{
			name: "InPredicate with values inside and outside page ranges",
			predicate: InPredicate{
				Column: cols[1], // timestamp column
				Values: NewInt64ValueSet([]Value{
					Int64Value(50),
					Int64Value(300),
					Int64Value(150),
					Int64Value(600),
				}), // 2 values in range, ~200 matching rows
			},
			want: rowRanges{
				{Start: 0, End: 249},   // Page 1: contains 50
				{Start: 250, End: 749}, // Page 2: contains 300
			},
		},
		{
			name: "InPredicate with values all outside page ranges",
			predicate: InPredicate{
				Column: cols[1], // timestamp column
				Values: NewInt64ValueSet([]Value{
					Int64Value(150), // Outside all pages
					Int64Value(600), // Outside all pages
				}),
			},
			want: nil, // No pages should be included
		},
	}

	ctx := context.Background()
	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			r := NewReader(ReaderOptions{
				Dataset:    ds,
				Columns:    cols,
				Predicates: []Predicate{tc.predicate},
			})
			defer r.Close()

			// Initialize the downloader.
			require.NoError(t, r.initDownloader(ctx))

			got, err := r.buildPredicateRanges(ctx, tc.predicate)
			require.NoError(t, err)

			require.Equal(t, tc.want, got, "row ranges should match expected ranges")
		})
	}
}

// buildMemDatasetWithStats creates a test dataset with only column and page stats.
func buildMemDatasetWithStats(t *testing.T) (Dataset, []Column) {
	t.Helper()

	dset := FromMemory([]*MemColumn{
		{
			Desc: ColumnDesc{
				Tag:       "stream",
				Type:      ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "number"},
				RowsCount: 1000, // 0 - 999
			},
			Pages: []*MemPage{
				{
					Desc: PageDesc{
						RowCount: 300, // 0 - 299
						Stats: &datasetmd.Statistics{
							MinValue: encodeInt64Value(t, 1),
							MaxValue: encodeInt64Value(t, 2),
						},
					},
				},
				{
					Desc: PageDesc{
						RowCount: 700, // 300 - 999
						Stats: &datasetmd.Statistics{
							MinValue: encodeInt64Value(t, 2),
							MaxValue: encodeInt64Value(t, 2),
						},
					},
				},
			},
		},
		{
			Desc: ColumnDesc{
				Tag:       "timestamp",
				Type:      ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "number"},
				RowsCount: 1000, // 0 - 999
			},
			Pages: []*MemPage{
				{
					Desc: PageDesc{
						RowCount: 250, // 0 - 249
						Stats: &datasetmd.Statistics{
							MinValue: encodeInt64Value(t, 0),
							MaxValue: encodeInt64Value(t, 100),
						},
					},
				},
				{
					Desc: PageDesc{
						RowCount: 500, // 250 - 749
						Stats: &datasetmd.Statistics{
							MinValue: encodeInt64Value(t, 200),
							MaxValue: encodeInt64Value(t, 500),
						},
					},
				},
				{
					Desc: PageDesc{
						RowCount: 250, // 750 - 999
						Stats: &datasetmd.Statistics{
							MinValue: encodeInt64Value(t, 800),
							MaxValue: encodeInt64Value(t, 1000),
						},
					},
				},
			},
		},
	})

	cols, err := result.Collect(dset.ListColumns(context.Background()))
	require.NoError(t, err)

	return dset, cols
}

// encodeInt64Value encodes an int64 value for use in statistics.
func encodeInt64Value(t *testing.T, v int64) []byte {
	t.Helper()

	data, err := Int64Value(v).MarshalBinary()
	require.NoError(t, err)
	return data
}
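
// BenchmarkReader measures read throughput over a generated dataset for
// different batch sizes.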
func BenchmarkReader(b *testing.B) {
	generator := DatasetGenerator{
		RowCount:     1_000_000,
		PageSizeHint: 2 * 1024 * 1024, // 2MB
		Columns: []generatorColumnConfig{
			{
				Tag:               "stream",
				Type:              ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "number"},
				Encoding:          datasetmd.ENCODING_TYPE_DELTA,
				Compression:       datasetmd.COMPRESSION_TYPE_NONE,
				CardinalityTarget: 1000,
			},
			{
				Tag:               "timestamp",
				Type:              ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "number"},
				Encoding:          datasetmd.ENCODING_TYPE_DELTA,
				Compression:       datasetmd.COMPRESSION_TYPE_NONE,
				CardinalityTarget: 100_000,
			},
			{
				Tag:               "log",
				Type:              ColumnType{Physical: datasetmd.PHYSICAL_TYPE_BINARY, Logical: "string"},
				Encoding:          datasetmd.ENCODING_TYPE_PLAIN,
				Compression:       datasetmd.COMPRESSION_TYPE_NONE,
				AvgSize:           1024,
				CardinalityTarget: 100_000,
			},
		},
	}

	readPatterns := []struct {
		name      string
		batchSize int
	}{
		{
			name:      "batch=100",
			batchSize: 100,
		},
		{
			name:      "batch=10k",
			batchSize: 10_000,
		},
	}

	// Generate the dataset once for all cases.
	ds, cols := generator.Build(b, rand.Int63())
	opts := ReaderOptions{
		Dataset: ds,
		Columns: cols,
	}

	for _, rp := range readPatterns {
		b.Run(rp.name, func(b *testing.B) {
			b.ResetTimer()
			b.ReportAllocs()

			batch := make([]Row, rp.batchSize)
			for b.Loop() {
				reader := NewReader(opts)
				var rowsRead int
				for {
					n, err := reader.Read(context.Background(), batch)
					if err == io.EOF {
						break
					}
					if err != nil {
						b.Fatal(err)
					}
					rowsRead += n
				}
				reader.Close()

				b.ReportMetric(float64(rowsRead)/float64(b.N), "rows/op")
			}
		})
	}
}
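
// BenchmarkPredicateExecution measures how the ordering and selectivity of
// predicates affect read performance.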
func BenchmarkPredicateExecution(b *testing.B) {
	// Generate a dataset with two columns: one with high cardinality and one
	// with low cardinality. The higher the cardinality, the more selective an
	// equality predicate on that column is.
	generator := DatasetGenerator{
		RowCount: 1_000_000,
		// Set a large page size so page pruning has no effect; the goal of this
		// benchmark is to measure the gains from sequential predicate evaluation
		// alone.
		PageSizeHint: 100 * 1024 * 1024,
		Columns: []generatorColumnConfig{
			{
				Tag:               "more_selective",
				Type:              ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "int64"},
				Encoding:          datasetmd.ENCODING_TYPE_DELTA,
				Compression:       datasetmd.COMPRESSION_TYPE_NONE,
				CardinalityTarget: 500_000,
			},
			{
				Tag:               "less_selective",
				Type:              ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "int64"},
				Encoding:          datasetmd.ENCODING_TYPE_DELTA,
				Compression:       datasetmd.COMPRESSION_TYPE_NONE,
				CardinalityTarget: 100,
			},
		},
	}

	ds, cols := generator.Build(b, rand.Int63())

	var col1Value, col2Value int64
	idx := rand.Intn(generator.RowCount) // Randomly select a row index to use for the predicate values

	currentPos := 0
	batch := make([]Row, 1000)
	// Read the dataset once to pick a random row for predicate generation.
	reader := NewReader(ReaderOptions{
		Dataset: ds,
		Columns: cols,
	})

	for {
		n, err := reader.Read(context.Background(), batch)
		if err == io.EOF {
			break
		}
		if err != nil {
			b.Fatal(err)
		}

		// Check if our target index is in this batch.
		if idx >= currentPos && idx < currentPos+n {
			selectedRow := batch[idx-currentPos]
			col1Value = selectedRow.Values[0].Int64()
			col2Value = selectedRow.Values[1].Int64()
			break
		}

		currentPos += n
	}
	reader.Close()

	predicatePatterns := []struct {
		name       string
		predicates []Predicate
	}{
		{
			name: "combined",
			predicates: []Predicate{
				AndPredicate{
					Left: EqualPredicate{
						Column: cols[0],
						Value:  Int64Value(col1Value),
					},
					Right: EqualPredicate{
						Column: cols[1],
						Value:  Int64Value(col2Value),
					},
				},
			},
		},
		{
			name: "high",
			predicates: []Predicate{
				EqualPredicate{
					Column: cols[0],
					Value:  Int64Value(col1Value),
				},
				EqualPredicate{
					Column: cols[1],
					Value:  Int64Value(col2Value),
				},
			},
		},
		{
			name: "low",
			predicates: []Predicate{
				EqualPredicate{
					Column: cols[1],
					Value:  Int64Value(col2Value),
				},
				EqualPredicate{
					Column: cols[0],
					Value:  Int64Value(col1Value),
				},
			},
		},
	}

	for _, pp := range predicatePatterns {
		b.Run("selectivity="+pp.name, func(b *testing.B) {
			b.ResetTimer()
			b.ReportAllocs()

			for b.Loop() {
				reader := NewReader(ReaderOptions{
					Dataset:    ds,
					Columns:    cols,
					Predicates: pp.predicates,
				})

				batch := make([]Row, 10000)

				for {
					_, err := reader.Read(context.Background(), batch)
					if err == io.EOF {
						break
					}
					if err != nil {
						b.Fatal(err)
					}
				}
				reader.Close()
			}
		})
	}
}
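
// generatorColumnConfig configures a single column generated by
// DatasetGenerator.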
type generatorColumnConfig struct {
	Tag         string
	Type        ColumnType
	Encoding    datasetmd.EncodingType
	Compression datasetmd.CompressionType

	AvgSize           int64   // Average size in bytes for variable-length types
	CardinalityTarget int64   // Target number of unique values
	SparsityRate      float64 // 0.0-1.0, where 1.0 means all values are null
}
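
// columnValues returns an infinite sequence of random values matching the
// physical type of cfg.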
func columnValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] {
	switch cfg.Type.Physical {
	case datasetmd.PHYSICAL_TYPE_INT64, datasetmd.PHYSICAL_TYPE_UINT64:
		return numberValues(rng, cfg)
	case datasetmd.PHYSICAL_TYPE_BINARY:
		return stringValues(rng, cfg)
	default:
		panic(fmt.Sprintf("unsupported type for generation: %v", cfg.Type.Physical))
	}
}
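
// stringValues returns an infinite sequence of binary values drawn from a
// pre-generated set of cfg.CardinalityTarget unique strings.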
func stringValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] {
	// Pre-generate the set of unique values we'll cycle through.
	uniqueValues := make([]Value, cfg.CardinalityTarget)
	for i := range int(cfg.CardinalityTarget) {
		// Generate a size between 0.5x and 1.5x of the average size.
		size := int(float64(cfg.AvgSize) * (0.5 + rng.Float64()))

		// Convert the number to a string and pad the result with 'x' bytes.
		str := make([]byte, size)
		num := []byte(strconv.Itoa(i))
		copy(str, num)
		for j := len(num); j < size; j++ {
			str[j] = 'x'
		}
		uniqueValues[i] = BinaryValue(str)
	}

	return func(yield func(Value) bool) {
		for {
			if !yield(uniqueValues[rng.Intn(len(uniqueValues))]) {
				return
			}
		}
	}
}
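
// numberValues returns an infinite sequence of random integer values in the
// range [0, cfg.CardinalityTarget).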
func numberValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] {
	return func(yield func(Value) bool) {
		for {
			v := rng.Int63n(cfg.CardinalityTarget)
			switch cfg.Type.Physical {
			case datasetmd.PHYSICAL_TYPE_INT64:
				if !yield(Int64Value(v)) {
					return
				}
			case datasetmd.PHYSICAL_TYPE_UINT64:
				if !yield(Uint64Value(uint64(v))) {
					return
				}
			}
		}
	}
}
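
// DatasetGenerator builds in-memory datasets with randomly generated columns
// for use in tests and benchmarks.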
type DatasetGenerator struct {
	RowCount     int
	PageSizeHint int
	Columns      []generatorColumnConfig
}
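
// Build generates an in-memory dataset from the generator's configuration
// using the given seed, returning the dataset and its columns.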
func (g *DatasetGenerator) Build(t testing.TB, seed int64) (Dataset, []Column) {
	t.Helper()

	memColumns := make([]*MemColumn, 0, len(g.Columns))
	rng := rand.New(rand.NewSource(seed))

	for _, colCfg := range g.Columns {
		next, stop := iter.Pull(columnValues(rng, colCfg))
		defer stop()

		opts := BuilderOptions{
			PageSizeHint: g.PageSizeHint,
			Type:         colCfg.Type,
			Encoding:     colCfg.Encoding,
			Compression:  colCfg.Compression,
			Statistics: StatisticsOptions{
				StoreCardinalityStats: true,
			},
		}

		if colCfg.Type.Physical == datasetmd.PHYSICAL_TYPE_INT64 || colCfg.Type.Physical == datasetmd.PHYSICAL_TYPE_UINT64 {
			opts.Statistics.StoreRangeStats = true
		}

		// Create a builder for this column.
		builder, err := NewColumnBuilder(colCfg.Tag, opts)
		require.NoError(t, err)

		// Add values to the builder.
		for i := range g.RowCount {
			if rng.Float64() < colCfg.SparsityRate {
				continue
			}

			val, ok := next()
			require.True(t, ok, "generator should yield values")
			require.NoError(t, builder.Append(i, val))
		}

		col, err := builder.Flush()
		require.NoError(t, err)

		memColumns = append(memColumns, col)
	}

	ds := FromMemory(memColumns)
	cols, err := result.Collect(ds.ListColumns(context.Background()))
	require.NoError(t, err)

	return ds, cols
}

// Test_DatasetGenerator is a helper to debug dataset generation.
func Test_DatasetGenerator(t *testing.T) {
	g := DatasetGenerator{
		RowCount:     1_000_000,
		PageSizeHint: 2 * 1024 * 1024, // 2MB
		Columns: []generatorColumnConfig{
			{
				Tag:               "timestamp",
				Type:              ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "timestamp"},
				Encoding:          datasetmd.ENCODING_TYPE_DELTA,
				Compression:       datasetmd.COMPRESSION_TYPE_NONE,
				CardinalityTarget: 100_000,
				SparsityRate:      0.0,
			},
			{
				Tag:               "label",
				Type:              ColumnType{Physical: datasetmd.PHYSICAL_TYPE_BINARY, Logical: "label"},
				Encoding:          datasetmd.ENCODING_TYPE_PLAIN,
				Compression:       datasetmd.COMPRESSION_TYPE_NONE,
				AvgSize:           32,
				CardinalityTarget: 100,
				SparsityRate:      0.3,
			},
		},
	}

	_, cols := g.Build(t, rand.Int63())
	require.Equal(t, 2, len(cols))
	require.Equal(t, g.RowCount, cols[0].ColumnDesc().RowsCount)
	// TODO: The row count is less than expected, likely a result of null values
	// at the end of the column. Remove this comment once the issue is fixed.
	// require.Equal(t, g.RowCount, cols[1].ColumnDesc().RowsCount)

	require.NotNil(t, cols[0].ColumnDesc().Statistics.CardinalityCount)
	require.NotNil(t, cols[1].ColumnDesc().Statistics.CardinalityCount)

	t.Logf("timestamp column cardinality: %d", cols[0].ColumnDesc().Statistics.CardinalityCount)
	t.Logf("label column cardinality: %d", cols[1].ColumnDesc().Statistics.CardinalityCount)

	require.NotNil(t, cols[0].ColumnDesc().Statistics.MinValue)
	require.NotNil(t, cols[0].ColumnDesc().Statistics.MaxValue)

	var minValue, maxValue Value
	require.NoError(t, minValue.UnmarshalBinary(cols[0].ColumnDesc().Statistics.MinValue))
	require.NoError(t, maxValue.UnmarshalBinary(cols[0].ColumnDesc().Statistics.MaxValue))

	t.Logf("timestamp column min: %d", minValue.Int64())
	t.Logf("timestamp column max: %d", maxValue.Int64())

	t.Logf("timestamp column size: %s", humanize.Bytes(uint64(cols[0].ColumnDesc().UncompressedSize)))
	t.Logf("label column size: %s", humanize.Bytes(uint64(cols[1].ColumnDesc().UncompressedSize)))
}

// Test_Reader_Stats tests that the reader properly tracks statistics via xcap regions.
func Test_Reader_Stats(t *testing.T) {
	dset, columns := buildTestDataset(t)

	r := NewReader(ReaderOptions{
		Dataset: dset,
		Columns: columns,
		Predicates: []Predicate{
			GreaterThanPredicate{
				Column: columns[3], // birth_year
				Value:  Int64Value(1985),
			},
			EqualPredicate{
				Column: columns[0], // first_name
				Value:  BinaryValue([]byte("Alice")),
			},
		},
	})
	defer r.Close()

	ctx, _ := xcap.NewCapture(context.Background(), nil)
	_, err := readDatasetWithContext(ctx, r, 3)
	require.NoError(t, err)

	require.NotNil(t, r.region, "region should be available after reading")

	observations := r.region.Observations()
	obsMap := make(map[string]int64)
	for _, obs := range observations {
		obsMap[obs.Statistic.Name()] = obs.Value.(int64)
	}

	require.Equal(t, int64(2), obsMap[xcap.StatDatasetReadCalls.Name()])
	require.Equal(t, int64(2), obsMap[xcap.StatDatasetPrimaryColumns.Name()])
	require.Equal(t, int64(2), obsMap[xcap.StatDatasetSecondaryColumns.Name()])
	require.Equal(t, int64(5), obsMap[xcap.StatDatasetPrimaryColumnPages.Name()])
	require.Equal(t, int64(8), obsMap[xcap.StatDatasetSecondaryColumnPages.Name()])

	require.Equal(t, int64(len(basicReaderTestData)), obsMap[xcap.StatDatasetMaxRows.Name()])
	require.Equal(t, int64(3), obsMap[xcap.StatDatasetRowsAfterPruning.Name()])
	require.Equal(t, int64(3), obsMap[xcap.StatDatasetPrimaryRowsRead.Name()])
	require.Equal(t, int64(1), obsMap[xcap.StatDatasetSecondaryRowsRead.Name()])
}