package dataset

import (
	"context"
	"errors"
	"fmt"
	"io"
	"iter"
	"math/rand"
	"slices"
	"strconv"
	"testing"

	"github.com/dustin/go-humanize"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
	"github.com/grafana/loki/v3/pkg/xcap"
)

// Test_Reader_ReadAll tests that a Reader without predicates returns every
// row of the test dataset.
func Test_Reader_ReadAll(t *testing.T) {
	dset, columns := buildTestDataset(t)

	r := NewReader(ReaderOptions{Dataset: dset, Columns: columns})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)
	require.Equal(t, basicReaderTestData, convertToTestPersons(actualRows))
}

// Test_Reader_ReadWithPredicate tests that a Reader only returns rows matching
// a predicate when all columns are requested.
func Test_Reader_ReadWithPredicate(t *testing.T) {
	dset, columns := buildTestDataset(t)

	// Create a predicate that only returns people born after 1985
	r := NewReader(ReaderOptions{
		Dataset: dset,
		Columns: columns,
		Predicates: []Predicate{
			GreaterThanPredicate{
				Column: columns[3], // birth_year column
				Value:  Int64Value(1985),
			},
		},
	})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)

	// Filter expected data manually to verify
	var expected []testPerson
	for _, p := range basicReaderTestData {
		if p.birthYear > 1985 {
			expected = append(expected, p)
		}
	}
	require.Equal(t, expected, convertToTestPersons(actualRows))
}

// Test_Reader_ReadWithPageFiltering tests that a Reader can filter rows based
// on a predicate that has filtered pages out.
func Test_Reader_ReadWithPageFiltering(t *testing.T) {
	dset, columns := buildTestDataset(t)

	r := NewReader(ReaderOptions{
		Dataset: dset,
		Columns: columns,

		// Henry is out of range of most pages except for the first and last, so
		// other pages would be filtered out of testing.
		//
		// TODO(rfratto): make it easier to prove that a predicate includes a value
		// which is out of range of at least one page.
		Predicates: []Predicate{
			EqualPredicate{
				Column: columns[0], // first_name column
				Value:  BinaryValue([]byte("Henry")),
			},
		},
	})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)

	// Filter expected data manually to verify
	var expected []testPerson
	for _, p := range basicReaderTestData {
		if p.firstName == "Henry" {
			expected = append(expected, p)
		}
	}
	require.Equal(t, expected, convertToTestPersons(actualRows))
}

// Test_Reader_ReadWithPageFilteringOnEmptyPredicate tests that a Reader filters
// rows with empty predicate values. Filtering for an explicitly empty value
// also includes Null values and these rows should not be excluded by page
// skipping.
func Test_Reader_ReadWithPageFilteringOnEmptyPredicate(t *testing.T) {
	// Create builders for each column
	firstNameBuilder := buildStringColumn(t, "first_name")
	// NOTE(review): the tag is "middle_name" although the column is treated as
	// the last name throughout this test; columns are addressed by index, so
	// the tag does not affect the assertions. Confirm whether it should be
	// "last_name".
	lastNameBuilder := buildStringColumn(t, "middle_name")

	// Row with both values.
	firstNameBuilder.append(0, BinaryValue([]byte("John")))
	lastNameBuilder.append(0, BinaryValue([]byte("Doe")))

	// Row with null value. Row 1 is intentionally never appended to either
	// column.
	firstNameBuilder.append(2, BinaryValue([]byte("Jim")))
	lastNameBuilder.append(2, Value{})

	firstName, err := firstNameBuilder.Flush()
	require.NoError(t, err)
	lastName, err := lastNameBuilder.Flush()
	require.NoError(t, err)

	dset := FromMemory([]*MemColumn{firstName, lastName})
	cols, err := result.Collect(dset.ListColumns(context.Background()))
	require.NoError(t, err)

	r := NewReader(ReaderOptions{
		Dataset: dset,
		Columns: cols,
		Predicates: []Predicate{
			// Imitate predicate from logql: {last_name=""}
			EqualPredicate{
				Column: cols[1], // last_name column
				Value:  BinaryValue([]byte("")),
			},
		},
	})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)

	actualFirstNames := make([]string, 0, len(actualRows))
	actualLastNames := make([]string, 0, len(actualRows))
	for _, row := range actualRows {
		// Rows where every column is null carry no information for this test.
		if row.Values[0].IsNil() && row.Values[1].IsNil() {
			continue
		}

		actualFirstNames = append(actualFirstNames, string(row.Values[0].Binary()))
		if row.Values[1].IsNil() {
			actualLastNames = append(actualLastNames, "[nil]")
		} else {
			actualLastNames = append(actualLastNames, string(row.Values[1].Binary()))
		}
	}

	// Filter expected data manually to verify
	expectedFirstNames := []string{"Jim"}
	expectedLastNames := []string{"[nil]"}
	require.Equal(t, expectedFirstNames, actualFirstNames)
	require.Equal(t, expectedLastNames, actualLastNames)
}

// Test_Reader_ReadWithPredicate_NoSecondary tests predicate evaluation when
// the only requested column is the one used by the predicate.
func Test_Reader_ReadWithPredicate_NoSecondary(t *testing.T) {
	dset, columns := buildTestDataset(t)

	// Create a predicate that only returns people born after 1985
	r := NewReader(ReaderOptions{
		Dataset: dset,
		Columns: []Column{columns[3]},
		Predicates: []Predicate{
			GreaterThanPredicate{
				Column: columns[3], // birth_year column
				Value:  Int64Value(1985),
			},
		},
	})
	defer r.Close()

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)

	// Filter expected data manually to verify
	var expected []int
	for _, p := range basicReaderTestData {
		if p.birthYear > 1985 {
			expected = append(expected, int(p.birthYear))
		}
	}

	var actual []int
	for _, row := range actualRows {
		actual = append(actual, int(row.Values[0].Int64()))
	}
	require.Equal(t, expected, actual)
}

// Test_Reader_Reset tests that a Reader returns the full dataset again after
// being reset with the same options.
func Test_Reader_Reset(t *testing.T) {
	dset, columns := buildTestDataset(t)

	r := NewReader(ReaderOptions{Dataset: dset, Columns: columns})
	defer r.Close()

	// First read everything
	_, err := readDataset(r, 3)
	require.NoError(t, err)

	// Reset and read again
	r.Reset(ReaderOptions{Dataset: dset, Columns: columns})

	actualRows, err := readDataset(r, 3)
	require.NoError(t, err)
	require.Equal(t, basicReaderTestData, convertToTestPersons(actualRows))
}

// Test_buildMask tests that buildMask yields the sub-ranges of fullRange which
// are not covered by the provided rows.
func Test_buildMask(t *testing.T) {
	tt := []struct {
		name      string
		fullRange rowRange
		rows      []Row
		expect    []rowRange
	}{
		{
			name:      "no rows",
			fullRange: rowRange{1, 10},
			rows:      nil,
			expect:    []rowRange{{1, 10}},
		},
		{
			name:      "full coverage",
			fullRange: rowRange{1, 10},
			rows:      makeRows(1, 10, 1),
			expect:    nil,
		},
		{
			name:      "full coverage - split",
			fullRange: rowRange{1, 10},
			rows:      mergeRows(makeRows(1, 5, 1), makeRows(6, 10, 1)),
			expect:    nil,
		},
		{
			name:      "partial coverage - front",
			fullRange: rowRange{1, 10},
			rows:      makeRows(1, 5, 1),
			expect:    []rowRange{{6, 10}},
		},
		{
			name:      "partial coverage - middle",
			fullRange: rowRange{1, 10},
			rows:      makeRows(5, 7, 1),
			expect:    []rowRange{{1, 4}, {8, 10}},
		},
		{
			name:      "partial coverage - end",
			fullRange: rowRange{1, 10},
			rows:      makeRows(6, 10, 1),
			expect:    []rowRange{{1, 5}},
		},
		{
			name:      "partial coverage - gaps",
			fullRange: rowRange{1, 10},
			rows:      []Row{{Index: 3}, {Index: 5}, {Index: 7}, {Index: 9}},
			expect:    []rowRange{{1, 2}, {4, 4}, {6, 6}, {8, 8}, {10, 10}},
		},
	}

	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			actual := slices.Collect(buildMask(tc.fullRange, tc.rows))
			require.Equal(t, tc.expect, actual)
		})
	}
}

// makeRows returns rows with indices from, from+inc, ... up to and including
// to. It returns nil when from > to.
//
// makeRows panics if inc is not positive, since the loop would otherwise never
// terminate.
func makeRows(from, to, inc int) []Row {
	if inc <= 0 {
		panic(fmt.Sprintf("makeRows: inc must be positive, got %d", inc))
	}

	var rows []Row
	if to >= from {
		rows = make([]Row, 0, (to-from)/inc+1)
	}
	for i := from; i <= to; i += inc {
		rows = append(rows, Row{Index: i})
	}
	return rows
}

// mergeRows concatenates the given row slices, in order, into a new slice.
func mergeRows(rows ...[]Row) []Row {
	return slices.Concat(rows...)
}

// readDataset reads all rows from a Reader using the given batch size.
func readDataset(br *Reader, batchSize int) ([]Row, error) {
	return readDatasetWithContext(context.Background(), br, batchSize)
}

// readDatasetWithContext reads all rows from a Reader using the given batch
// size and context.
//
// It returns the rows read so far together with any error other than io.EOF.
// A non-positive batchSize is rejected with an error rather than panicking
// while allocating the batch.
func readDatasetWithContext(ctx context.Context, br *Reader, batchSize int) ([]Row, error) {
	if batchSize <= 0 {
		return nil, fmt.Errorf("batch size must be positive, got %d", batchSize)
	}

	var (
		all   []Row
		batch = make([]Row, batchSize)
	)

	for {
		// Clear the batch for each read, to ensure that any memory in Row and
		// Value doesn't get reused. See comment in implementation of
		// [readBasicReader] for more information.
		clear(batch)

		n, err := br.Read(ctx, batch)
		// Keep rows before looking at err: the final rows may arrive together
		// with io.EOF.
		all = append(all, batch[:n]...)
		switch {
		case errors.Is(err, io.EOF):
			return all, nil
		case err != nil:
			return all, err
		}
	}
}

// Test_BuildPredicateRanges tests that buildPredicateRanges uses page
// statistics to select the row ranges that may match a predicate.
func Test_BuildPredicateRanges(t *testing.T) {
	ds, cols := buildMemDatasetWithStats(t)
	tt := []struct {
		name      string
		predicate Predicate
		want      rowRanges
	}{
		{
			name:      "nil predicate returns full range",
			predicate: nil,
			want:      rowRanges{{Start: 0, End: 999}}, // Full dataset range
		},
		{
			name:      "equal predicate in range",
			predicate: EqualPredicate{Column: cols[1], Value: Int64Value(50)},
			want:      rowRanges{{Start: 0, End: 249}}, // Page 1 of Timestamp column
		},
		{
			name:      "equal predicate not in any range",
			predicate: EqualPredicate{Column: cols[1], Value: Int64Value(1500)},
			want:      nil, // No ranges should match
		},
		{
			name:      "greater than predicate",
			predicate: GreaterThanPredicate{Column: cols[1], Value: Int64Value(400)},
			want:      rowRanges{{Start: 250, End: 749}, {Start: 750, End: 999}}, // Pages 2 and 3 of Timestamp column
		},
		{
			name:      "less than predicate",
			predicate: LessThanPredicate{Column: cols[1], Value: Int64Value(300)},
			want:      rowRanges{{Start: 0, End: 249}, {Start: 250, End: 749}}, // Pages 1 and 2 of Timestamp column
		},
		{
			name: "and predicate",
			predicate: AndPredicate{
				Left:  EqualPredicate{Column: cols[0], Value: Int64Value(1)},      // Rows 0 - 299 of stream column
				Right: LessThanPredicate{Column: cols[1], Value: Int64Value(600)}, // Rows 0 - 249, 250 - 749 of timestamp column
			},
			want: rowRanges{{Start: 0, End: 249}, {Start: 250, End: 299}},
		},
		{
			name: "or predicate",
			predicate: OrPredicate{
				Left:  EqualPredicate{Column: cols[0], Value: Int64Value(1)},         // Rows 0 - 299 of stream column
				Right: GreaterThanPredicate{Column: cols[1], Value: Int64Value(800)}, // Rows 750 - 999 of timestamp column
			},
			want: rowRanges{{Start: 0, End: 299}, {Start: 750, End: 999}}, // Rows 0 - 299, 750 - 999
		},
		{
			name: "InPredicate with values inside and outside page ranges",
			predicate: InPredicate{
				Column: cols[1], // timestamp column
				Values: NewInt64ValueSet([]Value{
					Int64Value(50),
					Int64Value(300),
					Int64Value(150),
					Int64Value(600),
				}), // 2 values in range. ~200 matching rows
			},
			want: rowRanges{
				{Start: 0, End: 249},   // Page 1: contains 50
				{Start: 250, End: 749}, // Page 2: contains 300
			},
		},
		{
			name: "InPredicate with values all outside page ranges",
			predicate: InPredicate{
				Column: cols[1], // timestamp column
				Values: NewInt64ValueSet([]Value{
					Int64Value(150), // Outside all pages
					Int64Value(600), // Outside all pages
				}),
			},
			want: nil, // No pages should be included
		},
	}

	ctx := context.Background()
	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			r := NewReader(ReaderOptions{
				Dataset:    ds,
				Columns:    cols,
				Predicates: []Predicate{tc.predicate},
			})
			defer r.Close()

			// Initialize downloader
			require.NoError(t, r.initDownloader(ctx))

			got, err := r.buildPredicateRanges(ctx, tc.predicate)
			require.NoError(t, err)

			require.Equal(t, tc.want, got, "row ranges should match expected ranges")
		})
	}
}

// buildMemDatasetWithStats creates a test dataset with only column and page stats.
func buildMemDatasetWithStats(t *testing.T) (Dataset, []Column) { t.Helper() dset := FromMemory([]*MemColumn{ { Desc: ColumnDesc{ Tag: "stream", Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "number"}, RowsCount: 1000, // 0 - 999 }, Pages: []*MemPage{ { Desc: PageDesc{ RowCount: 300, // 0 - 299 Stats: &datasetmd.Statistics{ MinValue: encodeInt64Value(t, 1), MaxValue: encodeInt64Value(t, 2), }, }, }, { Desc: PageDesc{ RowCount: 700, // 300 - 999 Stats: &datasetmd.Statistics{ MinValue: encodeInt64Value(t, 2), MaxValue: encodeInt64Value(t, 2), }, }, }, }, }, { Desc: ColumnDesc{ Tag: "timestamp", Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "number"}, RowsCount: 1000, // 0 - 999 }, Pages: []*MemPage{ { Desc: PageDesc{ RowCount: 250, // 0 - 249 Stats: &datasetmd.Statistics{ MinValue: encodeInt64Value(t, 0), MaxValue: encodeInt64Value(t, 100), }, }, }, { Desc: PageDesc{ RowCount: 500, // 249 - 749 Stats: &datasetmd.Statistics{ MinValue: encodeInt64Value(t, 200), MaxValue: encodeInt64Value(t, 500), }, }, }, { Desc: PageDesc{ RowCount: 250, // 750 - 999 Stats: &datasetmd.Statistics{ MinValue: encodeInt64Value(t, 800), MaxValue: encodeInt64Value(t, 1000), }, }, }, }, }, }) cols, err := result.Collect(dset.ListColumns(context.Background())) require.NoError(t, err) return dset, cols } // Helper function to encode an integer value for statistics func encodeInt64Value(t *testing.T, v int64) []byte { t.Helper() data, err := Int64Value(v).MarshalBinary() require.NoError(t, err) return data } func BenchmarkReader(b *testing.B) { generator := DatasetGenerator{ RowCount: 1_000_000, PageSizeHint: 2 * 1024 * 1024, // 2MB Columns: []generatorColumnConfig{ { Tag: "stream", Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "number"}, Encoding: datasetmd.ENCODING_TYPE_DELTA, Compression: datasetmd.COMPRESSION_TYPE_NONE, CardinalityTarget: 1000, }, { Tag: "timestamp", Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, 
Logical: "number"}, Encoding: datasetmd.ENCODING_TYPE_DELTA, Compression: datasetmd.COMPRESSION_TYPE_NONE, CardinalityTarget: 100_000, }, { Tag: "log", Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_BINARY, Logical: "string"}, Encoding: datasetmd.ENCODING_TYPE_PLAIN, Compression: datasetmd.COMPRESSION_TYPE_NONE, AvgSize: 1024, CardinalityTarget: 100_000, }, }, } readPatterns := []struct { name string batchSize int }{ { name: "batch=100", batchSize: 100, }, { name: "batch=10k", batchSize: 10_000, }, } // Generate dataset once per case ds, cols := generator.Build(b, rand.Int63()) opts := ReaderOptions{ Dataset: ds, Columns: cols, } for _, rp := range readPatterns { b.Run(rp.name, func(b *testing.B) { b.ResetTimer() b.ReportAllocs() batch := make([]Row, rp.batchSize) for b.Loop() { reader := NewReader(opts) var rowsRead int for { n, err := reader.Read(context.Background(), batch) if err == io.EOF { break } if err != nil { b.Fatal(err) } rowsRead += n } reader.Close() b.ReportMetric(float64(rowsRead)/float64(b.N), "rows/op") } }) } } func BenchmarkPredicateExecution(b *testing.B) { // Generate dataset with two columns, one with high cardinality and one with low cardinality // higher the cardinality, more selective the predicate generator := DatasetGenerator{ RowCount: 1_000_000, // set large page size to not realise benefits from page pruning since the goal // of this benchmark is to measure the gains from sequential predicate evaluation alone. 
PageSizeHint: 100 * 1024 * 1024, Columns: []generatorColumnConfig{ { Tag: "more_selective", Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "int64"}, Encoding: datasetmd.ENCODING_TYPE_DELTA, Compression: datasetmd.COMPRESSION_TYPE_NONE, CardinalityTarget: 500_000, }, { Tag: "less_selective", Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "int64"}, Encoding: datasetmd.ENCODING_TYPE_DELTA, Compression: datasetmd.COMPRESSION_TYPE_NONE, CardinalityTarget: 100, }, }, } ds, cols := generator.Build(b, rand.Int63()) var col1Value, col2Value int64 idx := rand.Intn(generator.RowCount) // Randomly select a row index to use for the predicate values currentPos := 0 batch := make([]Row, 1000) // read the dataset once to pick a random row for predicate generation reader := NewReader(ReaderOptions{ Dataset: ds, Columns: cols, }) for { n, err := reader.Read(context.Background(), batch) if err == io.EOF { break } if err != nil { b.Fatal(err) } // Check if our target index is in this batch if idx >= currentPos && idx < currentPos+n { selectedRow := batch[idx-currentPos] col1Value = selectedRow.Values[0].Int64() col2Value = selectedRow.Values[1].Int64() break } currentPos += n } reader.Close() predicatePatterns := []struct { name string predicates []Predicate }{ { name: "combined", predicates: []Predicate{ AndPredicate{ Left: EqualPredicate{ Column: cols[0], Value: Int64Value(col1Value), }, Right: EqualPredicate{ Column: cols[1], Value: Int64Value(col2Value), }, }, }, }, { name: "high", predicates: []Predicate{ EqualPredicate{ Column: cols[0], Value: Int64Value(col1Value), }, EqualPredicate{ Column: cols[1], Value: Int64Value(col2Value), }, }, }, { name: "low", predicates: []Predicate{ EqualPredicate{ Column: cols[1], Value: Int64Value(col2Value), }, EqualPredicate{ Column: cols[0], Value: Int64Value(col1Value), }, }, }, } for _, pp := range predicatePatterns { b.Run("selectivity="+pp.name, func(b *testing.B) { b.ResetTimer() b.ReportAllocs() for 
b.Loop() { reader := NewReader(ReaderOptions{ Dataset: ds, Columns: cols, Predicates: pp.predicates, }) batch := make([]Row, 10000) for { _, err := reader.Read(context.Background(), batch) if err == io.EOF { break } if err != nil { b.Fatal(err) } } reader.Close() } }) } } type generatorColumnConfig struct { Tag string Type ColumnType Encoding datasetmd.EncodingType Compression datasetmd.CompressionType AvgSize int64 // Average size in bytes for variable-length types CardinalityTarget int64 // Target number of unique values SparsityRate float64 // 0.0-1.0, where 1.0 means all values are null } func columnValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] { switch cfg.Type.Physical { case datasetmd.PHYSICAL_TYPE_INT64, datasetmd.PHYSICAL_TYPE_UINT64: return numberValues(rng, cfg) case datasetmd.PHYSICAL_TYPE_BINARY: return stringValues(rng, cfg) default: panic(fmt.Sprintf("unsupported type for generation: %v", cfg.Type.Physical)) } } func stringValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] { // Pre-generate the set of unique values we'll cycle through uniqueValues := make([]Value, cfg.CardinalityTarget) for i := range int(cfg.CardinalityTarget) { // Generate size between 0.5x and 1.5x of average size size := int(float64(cfg.AvgSize) * (0.5 + rng.Float64())) // Convert number to string and create padded result str := make([]byte, size) num := []byte(strconv.Itoa(i)) copy(str, num) for j := len(num); j < size; j++ { str[j] = 'x' } uniqueValues[i] = BinaryValue(str) } return func(yield func(Value) bool) { for { if !yield(uniqueValues[rng.Intn(len(uniqueValues))]) { return } } } } func numberValues(rng *rand.Rand, cfg generatorColumnConfig) iter.Seq[Value] { return func(yield func(Value) bool) { for { v := rng.Int63n(cfg.CardinalityTarget) switch cfg.Type.Physical { case datasetmd.PHYSICAL_TYPE_INT64: if !yield(Int64Value(v)) { return } case datasetmd.PHYSICAL_TYPE_UINT64: if !yield(Uint64Value(uint64(v))) { return } } } } } type 
DatasetGenerator struct { RowCount int PageSizeHint int Columns []generatorColumnConfig } func (g *DatasetGenerator) Build(t testing.TB, seed int64) (Dataset, []Column) { t.Helper() memColumns := make([]*MemColumn, 0, len(g.Columns)) rng := rand.New(rand.NewSource(seed)) for _, colCfg := range g.Columns { next, stop := iter.Pull(columnValues(rng, colCfg)) defer stop() opts := BuilderOptions{ PageSizeHint: g.PageSizeHint, Type: colCfg.Type, Encoding: colCfg.Encoding, Compression: colCfg.Compression, Statistics: StatisticsOptions{ StoreCardinalityStats: true, }, } if colCfg.Type.Physical == datasetmd.PHYSICAL_TYPE_INT64 || colCfg.Type.Physical == datasetmd.PHYSICAL_TYPE_UINT64 { opts.Statistics.StoreRangeStats = true } // Create a builder for this column builder, err := NewColumnBuilder(colCfg.Tag, opts) require.NoError(t, err) // Add values to the builder for i := range g.RowCount { if rng.Float64() < colCfg.SparsityRate { continue } val, ok := next() require.True(t, ok, "generator should yield values") require.NoError(t, builder.Append(i, val)) } col, err := builder.Flush() require.NoError(t, err) memColumns = append(memColumns, col) } ds := FromMemory(memColumns) cols, err := result.Collect(ds.ListColumns(context.Background())) require.NoError(t, err) return ds, cols } // Test_DatasetGenerator is a helper to debug the dataset generation func Test_DatasetGenerator(t *testing.T) { g := DatasetGenerator{ RowCount: 1_000_000, PageSizeHint: 2 * 1024 * 1024, // 2MB Columns: []generatorColumnConfig{ { Tag: "timestamp", Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_INT64, Logical: "timestamp"}, Encoding: datasetmd.ENCODING_TYPE_DELTA, Compression: datasetmd.COMPRESSION_TYPE_NONE, CardinalityTarget: 100_000, SparsityRate: 0.0, }, { Tag: "label", Type: ColumnType{Physical: datasetmd.PHYSICAL_TYPE_BINARY, Logical: "label"}, Encoding: datasetmd.ENCODING_TYPE_PLAIN, Compression: datasetmd.COMPRESSION_TYPE_NONE, AvgSize: 32, CardinalityTarget: 100, SparsityRate: 0.3, }, }, 
} _, cols := g.Build(t, rand.Int63()) require.Equal(t, 2, len(cols)) require.Equal(t, g.RowCount, cols[0].ColumnDesc().RowsCount) // TODO: Row count is < expected. Must be a result of null values at the end. // Remove this comment once the issue is fixed. // require.Equal(t, g.RowCount, cols[1].ColumnInfo().RowsCount) require.NotNil(t, cols[0].ColumnDesc().Statistics.CardinalityCount) require.NotNil(t, cols[1].ColumnDesc().Statistics.CardinalityCount) t.Logf("timestamp column cardinality: %d", cols[0].ColumnDesc().Statistics.CardinalityCount) t.Logf("label column cardinality: %d", cols[1].ColumnDesc().Statistics.CardinalityCount) require.NotNil(t, cols[0].ColumnDesc().Statistics.MinValue) require.NotNil(t, cols[0].ColumnDesc().Statistics.MaxValue) var minValue, maxValue Value require.NoError(t, minValue.UnmarshalBinary(cols[0].ColumnDesc().Statistics.MinValue)) require.NoError(t, maxValue.UnmarshalBinary(cols[0].ColumnDesc().Statistics.MaxValue)) t.Logf("timestamp column min: %d", minValue.Int64()) t.Logf("timestamp column max: %d", maxValue.Int64()) t.Logf("timestamp column size: %s", humanize.Bytes(uint64(cols[0].ColumnDesc().UncompressedSize))) t.Logf("label column size: %s", humanize.Bytes(uint64(cols[1].ColumnDesc().UncompressedSize))) } // Test_Reader_Stats tests that the reader properly tracks statistics via xcap regions. 
func Test_Reader_Stats(t *testing.T) { dset, columns := buildTestDataset(t) r := NewReader(ReaderOptions{ Dataset: dset, Columns: columns, Predicates: []Predicate{ GreaterThanPredicate{ Column: columns[3], // birth_year Value: Int64Value(1985), }, EqualPredicate{ Column: columns[0], // first_name Value: BinaryValue([]byte("Alice")), }, }, }) defer r.Close() ctx, _ := xcap.NewCapture(context.Background(), nil) _, err := readDatasetWithContext(ctx, r, 3) require.NoError(t, err) require.NotNil(t, r.region, "region should be available after reading") observations := r.region.Observations() obsMap := make(map[string]int64) for _, obs := range observations { obsMap[obs.Statistic.Name()] = obs.Value.(int64) } require.Equal(t, int64(2), obsMap[xcap.StatDatasetReadCalls.Name()]) require.Equal(t, int64(2), obsMap[xcap.StatDatasetPrimaryColumns.Name()]) require.Equal(t, int64(2), obsMap[xcap.StatDatasetSecondaryColumns.Name()]) require.Equal(t, int64(5), obsMap[xcap.StatDatasetPrimaryColumnPages.Name()]) require.Equal(t, int64(8), obsMap[xcap.StatDatasetSecondaryColumnPages.Name()]) require.Equal(t, int64(len(basicReaderTestData)), obsMap[xcap.StatDatasetMaxRows.Name()]) require.Equal(t, int64(3), obsMap[xcap.StatDatasetRowsAfterPruning.Name()]) require.Equal(t, int64(3), obsMap[xcap.StatDatasetPrimaryRowsRead.Name()]) require.Equal(t, int64(1), obsMap[xcap.StatDatasetSecondaryRowsRead.Name()]) }