From 5203a0c9adf1efcd69e9ae7cd5cdbdd0fc78e9d1 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Thu, 14 Aug 2025 18:29:02 +0530 Subject: [PATCH] chore(dataobj): add reader stats (#18694) --- pkg/dataobj/internal/dataset/column.go | 1 + .../internal/dataset/column_builder.go | 1 + pkg/dataobj/internal/dataset/column_reader.go | 5 - pkg/dataobj/internal/dataset/read_stats.go | 282 ++++++++++++++++++ pkg/dataobj/internal/dataset/reader.go | 40 +-- .../internal/dataset/reader_downloader.go | 22 +- pkg/dataobj/internal/dataset/reader_test.go | 43 --- pkg/dataobj/internal/dataset/row_ranges.go | 8 + .../metadata/datasetmd/datasetmd.pb.go | 148 +++++---- .../metadata/datasetmd/datasetmd.proto | 3 + pkg/dataobj/querier/store_test.go | 64 ++-- pkg/dataobj/sections/indexpointers/dataset.go | 1 + pkg/dataobj/sections/indexpointers/encoder.go | 1 + pkg/dataobj/sections/logs/dataset.go | 1 + pkg/dataobj/sections/logs/decoder.go | 14 + pkg/dataobj/sections/logs/encoder.go | 1 + pkg/dataobj/sections/logs/reader.go | 16 +- pkg/dataobj/sections/logs/reader_test.go | 81 +++++ pkg/dataobj/sections/pointers/dataset.go | 1 + pkg/dataobj/sections/pointers/encoder.go | 1 + pkg/dataobj/sections/streams/dataset.go | 1 + pkg/dataobj/sections/streams/decoder.go | 14 + pkg/dataobj/sections/streams/encoder.go | 1 + pkg/engine/executor/dataobjscan.go | 15 +- pkg/engine/executor/dataobjscan_test.go | 15 +- pkg/engine/executor/executor.go | 2 +- pkg/logqlmodel/stats/context.go | 5 - 27 files changed, 613 insertions(+), 174 deletions(-) create mode 100644 pkg/dataobj/internal/dataset/read_stats.go diff --git a/pkg/dataobj/internal/dataset/column.go b/pkg/dataobj/internal/dataset/column.go index 717bd02c20..3299f45960 100644 --- a/pkg/dataobj/internal/dataset/column.go +++ b/pkg/dataobj/internal/dataset/column.go @@ -15,6 +15,7 @@ type ( Type datasetmd.ValueType // Type of values in the column. Compression datasetmd.CompressionType // Compression used for the column. + PagesCount int // Total number of pages in the column. RowsCount int // Total number of rows in the column. ValuesCount int // Total number of non-NULL values in the column. CompressedSize int // Total size of all pages in the column after compression. diff --git a/pkg/dataobj/internal/dataset/column_builder.go b/pkg/dataobj/internal/dataset/column_builder.go index b6cd10825f..eeaa85cfb0 100644 --- a/pkg/dataobj/internal/dataset/column_builder.go +++ b/pkg/dataobj/internal/dataset/column_builder.go @@ -176,6 +176,7 @@ func (cb *ColumnBuilder) Flush() (*MemColumn, error) { Name: cb.name, Type: cb.opts.Value, + PagesCount: len(cb.pages), Compression: cb.opts.Compression, Statistics: cb.statsBuilder.Flush(cb.pages), } diff --git a/pkg/dataobj/internal/dataset/column_reader.go b/pkg/dataobj/internal/dataset/column_reader.go index 383f8d56e8..5bb5ebec36 100644 --- a/pkg/dataobj/internal/dataset/column_reader.go +++ b/pkg/dataobj/internal/dataset/column_reader.go @@ -8,7 +8,6 @@ import ( "sort" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear" - "github.com/grafana/loki/v3/pkg/logqlmodel/stats" ) type columnReader struct { @@ -42,7 +41,6 @@ func (cr *columnReader) Read(ctx context.Context, v []Value) (n int, err error) return 0, err } } - statistics := stats.FromContext(ctx) for n < len(v) { // Make sure our reader is initialized to the right page for the row we @@ -54,9 +52,6 @@ func (cr *columnReader) Read(ctx context.Context, v []Value) (n int, err error) if err != nil { return n, err } - if pageIndex != cr.pageIndex { - statistics.AddPagesScanned(1) - } switch cr.reader { case nil: diff --git a/pkg/dataobj/internal/dataset/read_stats.go b/pkg/dataobj/internal/dataset/read_stats.go new file mode 100644 index 0000000000..74710d5101 --- /dev/null +++ b/pkg/dataobj/internal/dataset/read_stats.go @@ -0,0 +1,282 @@ +package dataset + +import ( + "context" + "time" + + "github.com/dustin/go-humanize" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + + "github.com/grafana/loki/v3/pkg/logqlmodel/stats" +) + +// DownloadStats holds statistics about the download operations performed by the dataset reader. +type DownloadStats struct { + // Use the following three stats together. + // 1. PagesScanned is the total number of ReadPage calls made to the downloader. + // 2. PagesFoundInCache is the number of pages that were found in cache and + // did not require a download. + // 3. BatchDownloadRequests is the number of batch download requests made by + // the downloader when a page is not found in cache. + PagesScanned uint64 + PagesFoundInCache uint64 + BatchDownloadRequests uint64 + + PrimaryColumnPages uint64 // Number of pages downloaded for primary columns + SecondaryColumnPages uint64 // Number of pages downloaded for secondary columns + PrimaryColumnBytes uint64 // Total bytes downloaded for primary columns + SecondaryColumnBytes uint64 // Total bytes downloaded for secondary columns + PrimaryColumnUncompressedBytes uint64 // Total uncompressed bytes for primary columns + SecondaryColumnUncompressedBytes uint64 // Total uncompressed bytes for secondary columns +} + +func (ds *DownloadStats) Reset() { + ds.PagesScanned = 0 + ds.PagesFoundInCache = 0 + ds.BatchDownloadRequests = 0 + + ds.PrimaryColumnPages = 0 + ds.SecondaryColumnPages = 0 + ds.PrimaryColumnBytes = 0 + ds.SecondaryColumnBytes = 0 + ds.PrimaryColumnUncompressedBytes = 0 + ds.SecondaryColumnUncompressedBytes = 0 +} + +// ReaderStats tracks statistics about dataset read operations. +type ReaderStats struct { + // TODO(ashwanth): global stats from [stats.Context] can be updated by the + // engine at the end of the query execution once we have a stats collection + // framework integrated with the execution pipeline. + // Below reference is a temporary stop gap. + // + // Reference to [stats.Context] to update relevant global statistics. + globalStats *stats.Context + + // Column statistics + PrimaryColumns uint64 // Number of primary columns to read from the dataset + SecondaryColumns uint64 // Number of secondary columns to read from the dataset + + // Page statistics + PrimaryColumnPages uint64 // Total pages in primary columns + SecondaryColumnPages uint64 // Total pages in secondary columns + DownloadStats DownloadStats // Download statistics for primary and secondary columns + + ReadCalls int64 // Total number of read calls made to the reader + + // Row statistics + MaxRows uint64 // Maximum number of rows across all columns + RowsToReadAfterPruning uint64 // Total number of primary rows to read after page pruning + + PrimaryRowsRead uint64 // Actual number of primary rows read. + SecondaryRowsRead uint64 // Actual number of secondary rows read. + + PrimaryRowBytes uint64 // Total bytes read for primary rows + SecondaryRowBytes uint64 // Total bytes read for secondary rows +} + +type ctxKeyType string + +const ( + readerStatsKey ctxKeyType = "reader_stats" +) + +func WithStats(ctx context.Context, stats *ReaderStats) context.Context { + if stats == nil { + return ctx + } + + return context.WithValue(ctx, readerStatsKey, stats) +} + +// StatsFromContext returns the reader stats from context. +func StatsFromContext(ctx context.Context) *ReaderStats { + v, ok := ctx.Value(readerStatsKey).(*ReaderStats) + if !ok { + return &ReaderStats{} + } + return v +} + +func (s *ReaderStats) LinkGlobalStats(stats *stats.Context) { + // If the global stats are already set, we don't override it. + if s.globalStats == nil { + s.globalStats = stats + } +} + +func (s *ReaderStats) AddReadCalls(count int) { + s.ReadCalls += int64(count) +} + +func (s *ReaderStats) AddPrimaryColumns(count uint64) { + s.PrimaryColumns += count +} + +func (s *ReaderStats) AddSecondaryColumns(count uint64) { + s.SecondaryColumns += count +} + +func (s *ReaderStats) AddPrimaryColumnPages(count uint64) { + s.PrimaryColumnPages += count +} + +func (s *ReaderStats) AddSecondaryColumnPages(count uint64) { + s.SecondaryColumnPages += count +} + +func (s *ReaderStats) AddPrimaryRowsRead(count uint64) { + s.PrimaryRowsRead += count + if s.globalStats != nil { + s.globalStats.AddPrePredicateDecompressedRows(int64(count)) + } +} + +func (s *ReaderStats) AddSecondaryRowsRead(count uint64) { + s.SecondaryRowsRead += count + if s.globalStats != nil { + s.globalStats.AddPostPredicateRows(int64(count)) + } +} + +func (s *ReaderStats) AddPrimaryRowBytes(count uint64) { + s.PrimaryRowBytes += count + if s.globalStats != nil { + s.globalStats.AddPrePredicateDecompressedBytes(int64(count)) + } +} + +func (s *ReaderStats) AddSecondaryRowBytes(count uint64) { + s.SecondaryRowBytes += count + if s.globalStats != nil { + s.globalStats.AddPostPredicateDecompressedBytes(int64(count)) + } +} + +func (s *ReaderStats) AddMaxRows(count uint64) { + s.MaxRows += count +} + +func (s *ReaderStats) AddRowsToReadAfterPruning(count uint64) { + s.RowsToReadAfterPruning += count +} + +func (s *ReaderStats) AddTotalRowsAvailable(count int64) { + s.MaxRows += uint64(count) + if s.globalStats != nil { + s.globalStats.AddTotalRowsAvailable(count) + } +} + +func (s *ReaderStats) AddPagesScanned(count uint64) { + s.DownloadStats.PagesScanned += count + if s.globalStats != nil { + s.globalStats.AddPagesScanned(int64(count)) + } +} + +func (s *ReaderStats) AddPagesFoundInCache(count uint64) { + s.DownloadStats.PagesFoundInCache += count +} + +func (s *ReaderStats) AddBatchDownloadRequests(count uint64) { + s.DownloadStats.BatchDownloadRequests += count + + if s.globalStats != nil { + s.globalStats.AddPageBatches(int64(count)) + } +} + +func (s *ReaderStats) AddPrimaryColumnPagesDownloaded(count uint64) { + s.DownloadStats.PrimaryColumnPages += count + if s.globalStats != nil { + s.globalStats.AddPagesDownloaded(int64(count)) + + } +} + +func (s *ReaderStats) AddSecondaryColumnPagesDownloaded(count uint64) { + s.DownloadStats.SecondaryColumnPages += count + if s.globalStats != nil { + s.globalStats.AddPagesDownloaded(int64(count)) + + } +} + +func (s *ReaderStats) AddPrimaryColumnBytesDownloaded(bytes uint64) { + s.DownloadStats.PrimaryColumnBytes += bytes + if s.globalStats != nil { + s.globalStats.AddPagesDownloadedBytes(int64(bytes)) + } +} + +func (s *ReaderStats) AddSecondaryColumnBytesDownloaded(bytes uint64) { + s.DownloadStats.SecondaryColumnBytes += bytes + if s.globalStats != nil { + s.globalStats.AddPagesDownloadedBytes(int64(bytes)) + } +} + +func (s *ReaderStats) AddPrimaryColumnUncompressedBytes(count uint64) { + s.DownloadStats.PrimaryColumnUncompressedBytes += count +} + +func (s *ReaderStats) AddSecondaryColumnUncompressedBytes(count uint64) { + s.DownloadStats.SecondaryColumnUncompressedBytes += count +} + +func (s *ReaderStats) Reset() { + s.PrimaryColumns = 0 + s.SecondaryColumns = 0 + + s.PrimaryColumnPages = 0 + s.SecondaryColumnPages = 0 + s.DownloadStats.Reset() + + s.ReadCalls = 0 + + s.MaxRows = 0 + s.RowsToReadAfterPruning = 0 + + s.PrimaryRowsRead = 0 + s.SecondaryRowsRead = 0 + + s.PrimaryRowBytes = 0 + s.SecondaryRowBytes = 0 + + s.globalStats = nil // Reset the global stats reference +} + +// LogSummary logs a summary of the read statistics to the provided logger. +func (s *ReaderStats) LogSummary(logger log.Logger, execDuration time.Duration) { + logValues := make([]any, 0, 50) + logValues = append(logValues, "msg", "dataset reader stats", + "execution_duration", execDuration, + "read_calls", s.ReadCalls, + "max_rows", s.MaxRows, + "rows_to_read_after_pruning", s.RowsToReadAfterPruning, + "primary_column_rows_read", s.PrimaryRowsRead, + "secondary_column_rows_read", s.SecondaryRowsRead, + "primary_column_bytes_read", humanize.Bytes(s.PrimaryRowBytes), + "secondary_column_bytes_read", humanize.Bytes(s.SecondaryRowBytes), + + "primary_columns", s.PrimaryColumns, + "secondary_columns", s.SecondaryColumns, + "primary_column_pages", s.PrimaryColumnPages, + "secondary_column_pages", s.SecondaryColumnPages, + + "total_pages_read", s.DownloadStats.PagesScanned, + "pages_found_in_cache", s.DownloadStats.PagesFoundInCache, + "batch_download_requests", s.DownloadStats.BatchDownloadRequests, + + "primary_pages_downloaded", s.DownloadStats.PrimaryColumnPages, + "secondary_pages_downloaded", s.DownloadStats.SecondaryColumnPages, + "primary_page_bytes_downloaded", humanize.Bytes(s.DownloadStats.PrimaryColumnBytes), + "secondary_page_bytes_downloaded", humanize.Bytes(s.DownloadStats.SecondaryColumnBytes), + "primary_page_uncompressed_bytes", humanize.Bytes(s.DownloadStats.PrimaryColumnUncompressedBytes), + "secondary_page_uncompressed_bytes", humanize.Bytes(s.DownloadStats.SecondaryColumnUncompressedBytes), + ) + + level.Debug(logger).Log(logValues...) +} diff --git a/pkg/dataobj/internal/dataset/reader.go b/pkg/dataobj/internal/dataset/reader.go index 61582883a8..e3e7e5fe57 100644 --- a/pkg/dataobj/internal/dataset/reader.go +++ b/pkg/dataobj/internal/dataset/reader.go @@ -10,7 +10,6 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/bitmask" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear" - "github.com/grafana/loki/v3/pkg/logqlmodel/stats" ) // ReaderOptions configures how a [Reader] will read [Row]s. @@ -53,6 +52,7 @@ type Reader struct { row int64 // The current row being read. inner *basicReader // Underlying reader that reads from columns. ranges rowRanges // Valid ranges to read across the entire dataset. + stats ReaderStats // Stats about the read operation. } // NewReader creates a new Reader from the provided options. @@ -66,16 +66,12 @@ func NewReader(opts ReaderOptions) *Reader { // returns the number of rows read and any error encountered. At the end of the // Dataset, Read returns 0, [io.EOF]. func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { + stats := StatsFromContext(ctx) + stats.AddReadCalls(1) + if len(s) == 0 { return 0, nil } - // Init stats object and use the context, otherwise we create a new one every time we increment a stat. - var statistics *stats.Context - if stats.IsPresent(ctx) { - statistics = stats.FromContext(ctx) - } else { - statistics, ctx = stats.NewContext(ctx) - } if !r.ready { err := r.init(ctx) @@ -150,19 +146,18 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { rowsRead = count passCount = count - statistics.AddPrePredicateDecompressedRows(int64(rowsRead)) + stats.AddPrimaryRowsRead(uint64(rowsRead)) var primaryColumnBytes int64 for i := range count { primaryColumnBytes += s[i].Size() } - statistics.AddPrePredicateDecompressedBytes(primaryColumnBytes) + stats.AddPrimaryRowBytes(uint64(primaryColumnBytes)) } else { - rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize], statistics) + rowsRead, passCount, err = r.readAndFilterPrimaryColumns(ctx, readSize, s[:readSize], stats) if err != nil { return n, err } } - statistics.AddPostPredicateRows(int64(passCount)) if secondary := r.dl.SecondaryColumns(); len(secondary) > 0 && passCount > 0 { // Mask out any ranges that aren't in s[:passCount], so that filling in @@ -187,7 +182,9 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { for i := range count { totalBytesFilled += s[i].Size() - s[i].SizeOfColumns(r.primaryColumnIndexes) } - statistics.AddPostPredicateDecompressedBytes(totalBytesFilled) + + stats.AddSecondaryRowsRead(uint64(count)) + stats.AddSecondaryRowBytes(uint64(totalBytesFilled)) } n += passCount @@ -206,7 +203,7 @@ func (r *Reader) Read(ctx context.Context, s []Row) (n int, err error) { // the columns on the reduced row range. // // It returns the max rows read, rows that passed all the predicates, and any error -func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int, s []Row, stats *stats.Context) (int, int, error) { +func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int, s []Row, stats *ReaderStats) (int, int, error) { var ( rowsRead int // tracks max rows accessed to move the [r.row] cursor passCount int // number of rows that passed the predicate @@ -274,8 +271,8 @@ func (r *Reader) readAndFilterPrimaryColumns(ctx context.Context, readSize int, readSize = passCount } - stats.AddPrePredicateDecompressedRows(int64(rowsRead)) - stats.AddPrePredicateDecompressedBytes(primaryColumnBytes) + stats.AddPrimaryRowsRead(uint64(rowsRead)) + stats.AddPrimaryRowBytes(uint64(primaryColumnBytes)) return rowsRead, passCount, nil } @@ -516,12 +513,18 @@ func (r *Reader) initDownloader(ctx context.Context) error { mask := bitmask.New(len(r.opts.Columns)) r.fillPrimaryMask(mask) + stats := StatsFromContext(ctx) for i, column := range r.opts.Columns { primary := mask.Test(i) r.dl.AddColumn(column, primary) if primary { r.primaryColumnIndexes = append(r.primaryColumnIndexes, i) + stats.AddPrimaryColumns(1) + stats.AddPrimaryColumnPages(uint64(column.ColumnInfo().PagesCount)) + } else { + stats.AddSecondaryColumns(1) + stats.AddSecondaryColumnPages(uint64(column.ColumnInfo().PagesCount)) } } @@ -554,8 +557,9 @@ func (r *Reader) initDownloader(ctx context.Context) error { for _, column := range r.dl.AllColumns() { rowsCount = max(rowsCount, uint64(column.ColumnInfo().RowsCount)) } - statistics := stats.FromContext(ctx) - statistics.AddTotalRowsAvailable(int64(rowsCount)) + + stats.AddTotalRowsAvailable(int64(rowsCount)) + stats.AddRowsToReadAfterPruning(ranges.TotalRowCount()) return nil } diff --git a/pkg/dataobj/internal/dataset/reader_downloader.go b/pkg/dataobj/internal/dataset/reader_downloader.go index 21ff050f21..6e0a8e7f06 100644 --- a/pkg/dataobj/internal/dataset/reader_downloader.go +++ b/pkg/dataobj/internal/dataset/reader_downloader.go @@ -5,7 +5,6 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/result" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear" - "github.com/grafana/loki/v3/pkg/logqlmodel/stats" ) // readerDownloader is a utility for downloading pages in bulk from a @@ -216,6 +215,19 @@ func (dl *readerDownloader) downloadBatch(ctx context.Context, requestor *reader return err } + stats := StatsFromContext(ctx) + for _, page := range batch { + if page.column.primary { + stats.AddPrimaryColumnPagesDownloaded(1) + stats.AddPrimaryColumnBytesDownloaded(uint64(page.inner.PageInfo().CompressedSize)) + stats.AddPrimaryColumnUncompressedBytes(uint64(page.inner.PageInfo().UncompressedSize)) + } else { + stats.AddSecondaryColumnPagesDownloaded(1) + stats.AddSecondaryColumnBytesDownloaded(uint64(page.inner.PageInfo().CompressedSize)) + stats.AddSecondaryColumnUncompressedBytes(uint64(page.inner.PageInfo().UncompressedSize)) + } + } + // Build the set of inner pages that will be passed to the inner Dataset for // downloading. innerPages := make([]Page, len(batch)) @@ -325,10 +337,6 @@ func (dl *readerDownloader) buildDownloadBatch(ctx context.Context, requestor *r batchSize += pageSize } - statistics := stats.FromContext(ctx) - statistics.AddPageBatches(1) - statistics.AddPagesDownloaded(int64(len(pageBatch))) - statistics.AddPagesDownloadedBytes(int64(batchSize)) return pageBatch, nil } @@ -580,10 +588,14 @@ func (page *readerPage) PageInfo() *PageInfo { } func (page *readerPage) ReadPage(ctx context.Context) (PageData, error) { + stats := StatsFromContext(ctx) + stats.AddPagesScanned(1) if page.data != nil { + stats.AddPagesFoundInCache(1) return page.data, nil } + stats.AddBatchDownloadRequests(1) if err := page.column.dl.downloadBatch(ctx, page); err != nil { return nil, err } diff --git a/pkg/dataobj/internal/dataset/reader_test.go b/pkg/dataobj/internal/dataset/reader_test.go index c630750f0c..277cbc3acd 100644 --- a/pkg/dataobj/internal/dataset/reader_test.go +++ b/pkg/dataobj/internal/dataset/reader_test.go @@ -16,7 +16,6 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/result" - "github.com/grafana/loki/v3/pkg/logqlmodel/stats" ) func Test_Reader_ReadAll(t *testing.T) { @@ -145,48 +144,6 @@ func Test_Reader_Reset(t *testing.T) { require.Equal(t, basicReaderTestData, convertToTestPersons(actualRows)) } -func Test_Reader_Stats(t *testing.T) { - dset, columns := buildTestDataset(t) - - // Create a predicate that only returns people born after 1985 - r := NewReader(ReaderOptions{ - Dataset: dset, - Columns: columns, - Predicates: []Predicate{GreaterThanPredicate{ - Column: columns[3], // birth_year column - Value: Int64Value(1985), - }}, - }) - defer r.Close() - - statsCtx, ctx := stats.NewContext(context.Background()) - actualRows, err := readDatasetWithContext(ctx, r, 3) - require.NoError(t, err) - - // Filter expected data manually to verify - var expected []testPerson - for _, p := range basicReaderTestData { - if p.birthYear > 1985 { - expected = append(expected, p) - } - } - require.Equal(t, expected, convertToTestPersons(actualRows)) - - primaryColumnBytes := int64(Int64Value(0).Size()) * int64(len(basicReaderTestData)) // Size of Int64Value * all rows - var totalBytestoFill int64 - for _, row := range actualRows { - totalBytestoFill += row.Size() - } - totalBytestoFill -= int64(Int64Value(0).Size()) * int64(len(expected)) // remove already filled primary columns - - // verify statistics - result := statsCtx.Result(0, 0, len(actualRows)) - require.Equal(t, int64(len(basicReaderTestData)), result.Querier.Store.Dataobj.PrePredicateDecompressedRows) - require.Equal(t, int64(len(expected)), result.Querier.Store.Dataobj.PostPredicateRows) - require.Equal(t, primaryColumnBytes, result.Querier.Store.Dataobj.PrePredicateDecompressedBytes) - require.Equal(t, totalBytestoFill, result.Querier.Store.Dataobj.PostPredicateDecompressedBytes) -} - func Test_buildMask(t *testing.T) { tt := []struct { name string diff --git a/pkg/dataobj/internal/dataset/row_ranges.go b/pkg/dataobj/internal/dataset/row_ranges.go index e2c0d1422a..f2c665a26b 100644 --- a/pkg/dataobj/internal/dataset/row_ranges.go +++ b/pkg/dataobj/internal/dataset/row_ranges.go @@ -160,6 +160,14 @@ func (rr *rowRanges) Next(row uint64) (uint64, bool) { return nextRow, true } +// TotalRowCount returns the total number of rows in the set. +func (rr *rowRanges) TotalRowCount() (count uint64) { + for _, r := range *rr { + count += r.End - r.Start + 1 + } + return +} + // intersectRanges appends the intersection of two sets of ranges into dst, // returning the result. // diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go index 8bae8b727b..dc7ed13471 100644 --- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go +++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.pb.go @@ -181,6 +181,8 @@ type ColumnInfo struct { Statistics *Statistics `protobuf:"bytes,9,opt,name=statistics,proto3" json:"statistics,omitempty"` // Total number of non-NULL values in the entire column. ValuesCount uint64 `protobuf:"varint,10,opt,name=values_count,json=valuesCount,proto3" json:"values_count,omitempty"` + // Total number of pages in the column. + PagesCount uint64 `protobuf:"varint,11,opt,name=pages_count,json=pagesCount,proto3" json:"pages_count,omitempty"` } func (m *ColumnInfo) Reset() { *m = ColumnInfo{} } @@ -285,6 +287,13 @@ func (m *ColumnInfo) GetValuesCount() uint64 { return 0 } +func (m *ColumnInfo) GetPagesCount() uint64 { + if m != nil { + return m.PagesCount + } + return 0 +} + // Statistics about a column or a page. All statistics are optional and are // conditionally set depending on the column type. type Statistics struct { @@ -601,61 +610,62 @@ func init() { } var fileDescriptor_7ab9d5b21b743868 = []byte{ - // 860 bytes of a gzipped FileDescriptorProto + // 872 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0x4d, 0x73, 0xda, 0x46, 0x18, 0x66, 0x81, 0xa4, 0xe8, 0x05, 0xdb, 0xf2, 0xd6, 0x6e, 0x64, 0x3b, 0x51, 0xa9, 0x3b, 0xd3, 0x50, 0xd2, 0x81, 0x29, 0xee, 0x34, 0x67, 0x0c, 0x6a, 0xaa, 0x19, 0x47, 0x30, 0x92, 0x92, 0x19, - 0xfb, 0xa2, 0x91, 0x85, 0x4c, 0xd5, 0xa0, 0x5d, 0x46, 0x5a, 0x5c, 0x93, 0x53, 0x4f, 0x39, 0xf7, - 0x67, 0xf4, 0xa7, 0xf4, 0xe8, 0x63, 0x8e, 0x35, 0x9e, 0xe9, 0xe4, 0x98, 0x9f, 0xd0, 0xd1, 0x0a, - 0x81, 0xf8, 0x28, 0x93, 0x43, 0x6f, 0xbb, 0xef, 0xf3, 0xbc, 0x1f, 0xec, 0xf3, 0xf0, 0x0a, 0x9e, - 0x0f, 0xdf, 0xf4, 0xeb, 0x3d, 0x9b, 0xd9, 0xf4, 0xf2, 0xd7, 0xba, 0x47, 0x98, 0x1b, 0x10, 0x7b, - 0x50, 0xf7, 0x5d, 0x66, 0x47, 0x41, 0x8e, 0x84, 0x2e, 0xf3, 0x7b, 0xf3, 0x53, 0x6d, 0x18, 0x50, - 0x46, 0xf1, 0xd1, 0x34, 0xa9, 0x96, 0x70, 0x6b, 0x53, 0x46, 0xed, 0xfa, 0xfb, 0xe3, 0x7f, 0x72, - 0x00, 0x2d, 0x3a, 0x18, 0xf9, 0x44, 0x25, 0x57, 0x14, 0x63, 0xc8, 0x13, 0xdb, 0x77, 0x25, 0x54, - 0x46, 0x15, 0x41, 0xe7, 0x67, 0xac, 0x00, 0x5c, 0xdb, 0x83, 0x91, 0x6b, 0xb1, 0xf1, 0xd0, 0x95, - 0xb2, 0x65, 0x54, 0xd9, 0x6e, 0x7c, 0x53, 0xdb, 0x50, 0xb4, 0xf6, 0x3a, 0xa2, 0x9b, 0xe3, 0xa1, - 0xab, 0x0b, 0xd7, 0xc9, 0x11, 0x3f, 0x01, 0x08, 0xe8, 0x6f, 0xa1, 0xe5, 0xd0, 0x11, 0x61, 0x52, - 0xae, 0x8c, 0x2a, 0x79, 0x5d, 0x88, 0x22, 0xad, 0x28, 0x80, 0x35, 0x28, 0x3a, 0xd4, 0x1f, 0x06, - 0x6e, 0x18, 0x7a, 0x94, 0x48, 0x79, 0xde, 0xe6, 0xbb, 0x8d, 0x6d, 0x5a, 0x73, 0x3e, 0x6f, 0x96, - 0x2e, 0x80, 0x9f, 0xc1, 0xee, 0x88, 0x24, 0x01, 0xb7, 0x67, 0x85, 0xde, 0x5b, 0x57, 0x7a, 0xc0, - 0xbb, 0x8a, 0x69, 0xc0, 0xf0, 0xde, 0xba, 0xf8, 0x29, 0xec, 0x2c, 0x53, 0x1f, 0x72, 0xea, 0xf6, - 0x2a, 0x31, 0x99, 0xc4, 0xa2, 0x57, 0x57, 0xa1, 0xcb, 0xa4, 0xcf, 0x62, 0x62, 0x12, 0xee, 0xf0, - 0x28, 0xfe, 0x1a, 0xb6, 0x66, 0x44, 0x5e, 0xaf, 0xc0, 0x69, 0xa5, 0x24, 0xc8, 0xab, 0xbd, 0x00, - 0x08, 0x99, 0xcd, 0xbc, 0x90, 0x79, 0x4e, 0x28, 0x09, 0x65, 0x54, 0x29, 0x36, 0x9e, 0x6e, 0xfc, - 0xc9, 0xc6, 0x8c, 0xae, 0xa7, 0x52, 0xf1, 0x57, 0x50, 0xe2, 0x0f, 0x9d, 0xbc, 0x2e, 0xf0, 0x66, - 0xc5, 0x38, 0xc6, 0xdf, 0xf7, 0x38, 0x04, 0x98, 0x27, 0xe3, 0x23, 0x10, 0x7c, 0x8f, 0x58, 0x9c, - 0xc0, 0xc5, 0x2e, 0xe9, 0x05, 0xdf, 0x23, 0x5c, 0x38, 0x0e, 0xda, 0x37, 0x53, 0x30, 0x3b, 0x05, - 0xed, 0x9b, 0x18, 0x7c, 0x06, 0xbb, 0x8e, 0x1d, 0xf4, 0x3c, 0x62, 0x0f, 0x3c, 0x36, 0x5e, 0x50, - 0x53, 0x4c, 0x01, 0x71, 0xd3, 0x77, 0x39, 0x28, 0x74, 0xed, 0xbe, 0xcb, 0xbd, 0xb5, 0x56, 0x11, - 0xf4, 0xe9, 0x8a, 0x64, 0xd7, 0x2a, 0xb2, 0x07, 0x0f, 0x9c, 0xc0, 0x39, 0x69, 0xf0, 0x19, 0xb6, - 0xf4, 0xf8, 0xb2, 0x64, 0xb6, 0xfc, 0xb2, 0xd9, 0x14, 0x28, 0xb8, 0xc4, 0xa1, 0x3d, 0x8f, 0xf4, - 0xb9, 0x27, 0xb6, 0x1b, 0xdf, 0x6e, 0x7c, 0x76, 0x65, 0x4a, 0xe6, 0x36, 0x9b, 0xa5, 0xe2, 0x2f, - 0xa1, 0x98, 0x76, 0x42, 0x6c, 0x19, 0x48, 0xb9, 0xe0, 0x08, 0x84, 0xb9, 0x03, 0x62, 0xa3, 0x14, - 0xfe, 0x43, 0xfd, 0xc2, 0xff, 0xa7, 0xbe, 0xb0, 0xaa, 0xfe, 0x07, 0x04, 0x3b, 0x86, 0xeb, 0x30, - 0x8f, 0x12, 0x83, 0x06, 0x8c, 0xeb, 0x71, 0x01, 0x25, 0x87, 0xff, 0xf3, 0xad, 0x90, 0x06, 0x2c, - 0x94, 0x50, 0x39, 0x57, 0x29, 0x36, 0x9e, 0x6f, 0x9e, 0x60, 0xb1, 0x46, 0x2d, 0x5e, 0x1d, 0xd1, - 0x35, 0xfa, 0xf7, 0x25, 0xe7, 0xf0, 0x70, 0x9c, 0x6c, 0x95, 0xe8, 0x1a, 0x0d, 0x38, 0xed, 0xe4, - 0x91, 0x9e, 0x7b, 0xc3, 0x45, 0xdf, 0x4a, 0x12, 0xd4, 0x28, 0x84, 0x7f, 0x06, 0xa1, 0xe7, 0x05, - 0x71, 0xf5, 0xe9, 0x8e, 0xa9, 0x6e, 0x9e, 0x84, 0x06, 0xac, 0x9d, 0x64, 0xe8, 0xf3, 0xe4, 0xea, - 0x08, 0x84, 0xd9, 0xfe, 0xc1, 0x87, 0xf0, 0xc5, 0xeb, 0xe6, 0xd9, 0x2b, 0xc5, 0x32, 0xcf, 0xbb, - 0x8a, 0xf5, 0x4a, 0x33, 0xba, 0x4a, 0x4b, 0xfd, 0x49, 0x55, 0xda, 0x62, 0x06, 0xef, 0x81, 0x98, - 0xc2, 0x54, 0xcd, 0xfc, 0xf1, 0x07, 0x11, 0xe1, 0x7d, 0xd8, 0x4d, 0x67, 0xc4, 0xe1, 0x2c, 0x3e, - 0x80, 0xfd, 0x54, 0xf8, 0xf4, 0xdc, 0x54, 0xac, 0xa6, 0xae, 0x37, 0xcf, 0xc5, 0xfc, 0x71, 0xbe, - 0x90, 0x13, 0x73, 0xd5, 0x77, 0x08, 0x76, 0x96, 0x16, 0x12, 0x2e, 0xc3, 0xe3, 0x56, 0xe7, 0x65, - 0x57, 0x57, 0x0c, 0x43, 0xed, 0x68, 0xeb, 0x66, 0x38, 0x80, 0xfd, 0x15, 0x86, 0xd6, 0xd1, 0x14, - 0x11, 0xe1, 0x23, 0x78, 0xb4, 0x02, 0x19, 0x5a, 0xb3, 0xdb, 0x3d, 0x8f, 0xc7, 0x59, 0x01, 0x2f, - 0x0c, 0xb3, 0x2d, 0xe6, 0xaa, 0x63, 0x28, 0xa5, 0xed, 0x8a, 0x9f, 0xc0, 0x81, 0xa2, 0xb5, 0x3a, - 0x6d, 0x55, 0x7b, 0xb1, 0x6e, 0x82, 0x47, 0xf0, 0xf9, 0x22, 0xdc, 0x3d, 0x6b, 0xaa, 0x9a, 0x88, - 0x56, 0x81, 0xb6, 0x72, 0x66, 0x36, 0xc5, 0x2c, 0x96, 0x60, 0x6f, 0x11, 0x38, 0x55, 0xcd, 0x97, - 0xcd, 0xae, 0x98, 0xab, 0x0e, 0x60, 0x6b, 0x41, 0x16, 0x2c, 0xc3, 0xa1, 0xd1, 0xd1, 0x4d, 0xab, - 0xad, 0xea, 0x4a, 0xcb, 0x8c, 0x26, 0x5d, 0x6c, 0xfe, 0x18, 0xa4, 0x25, 0xbc, 0x69, 0xb4, 0x14, - 0x2d, 0x2a, 0x2d, 0xa2, 0x68, 0xf2, 0x25, 0xb4, 0xad, 0xcc, 0xe0, 0xec, 0xe9, 0xcd, 0xed, 0x9d, - 0x9c, 0x79, 0x7f, 0x27, 0x67, 0x3e, 0xde, 0xc9, 0xe8, 0xf7, 0x89, 0x8c, 0xfe, 0x9c, 0xc8, 0xe8, - 0xaf, 0x89, 0x8c, 0x6e, 0x27, 0x32, 0xfa, 0x7b, 0x22, 0xa3, 0x0f, 0x13, 0x39, 0xf3, 0x71, 0x22, - 0xa3, 0x3f, 0xee, 0xe5, 0xcc, 0xed, 0xbd, 0x9c, 0x79, 0x7f, 0x2f, 0x67, 0x2e, 0x4e, 0xfb, 0x1e, - 0xfb, 0x65, 0x74, 0x59, 0x73, 0xa8, 0x5f, 0xef, 0x07, 0xf6, 0x95, 0x4d, 0xec, 0xfa, 0x80, 0xbe, - 0xf1, 0xea, 0xd7, 0x27, 0xf5, 0x4f, 0xfc, 0xc2, 0x5e, 0x3e, 0xe4, 0x1f, 0xd6, 0x93, 0x7f, 0x03, - 0x00, 0x00, 0xff, 0xff, 0x2a, 0x2b, 0x5b, 0xb1, 0x93, 0x07, 0x00, 0x00, + 0xfb, 0xa2, 0x91, 0x85, 0x4c, 0xd5, 0xa0, 0x5d, 0x46, 0x5a, 0x5c, 0x93, 0x53, 0x4f, 0xe9, 0xb5, + 0x3f, 0xa3, 0x3f, 0xa5, 0x47, 0x1f, 0x73, 0xac, 0xf1, 0x25, 0xc7, 0xfc, 0x84, 0x8e, 0x56, 0x08, + 0xc4, 0x47, 0x99, 0x1c, 0x7a, 0xdb, 0x7d, 0x9e, 0xe7, 0xfd, 0x60, 0xdf, 0x87, 0x57, 0xf0, 0x7c, + 0xf8, 0xa6, 0x5f, 0xef, 0xd9, 0xcc, 0xa6, 0x97, 0xbf, 0xd6, 0x3d, 0xc2, 0xdc, 0x80, 0xd8, 0x83, + 0xba, 0xef, 0x32, 0x3b, 0x02, 0x39, 0x13, 0xba, 0xcc, 0xef, 0xcd, 0x4f, 0xb5, 0x61, 0x40, 0x19, + 0xc5, 0x47, 0xd3, 0xa0, 0x5a, 0xa2, 0xad, 0x4d, 0x15, 0xb5, 0xeb, 0xef, 0x8f, 0xff, 0xc8, 0x03, + 0xb4, 0xe8, 0x60, 0xe4, 0x13, 0x95, 0x5c, 0x51, 0x8c, 0x21, 0x4f, 0x6c, 0xdf, 0x95, 0x50, 0x19, + 0x55, 0x04, 0x9d, 0x9f, 0xb1, 0x02, 0x70, 0x6d, 0x0f, 0x46, 0xae, 0xc5, 0xc6, 0x43, 0x57, 0xca, + 0x96, 0x51, 0x65, 0xbb, 0xf1, 0x4d, 0x6d, 0x43, 0xd2, 0xda, 0xeb, 0x48, 0x6e, 0x8e, 0x87, 0xae, + 0x2e, 0x5c, 0x27, 0x47, 0xfc, 0x04, 0x20, 0xa0, 0xbf, 0x85, 0x96, 0x43, 0x47, 0x84, 0x49, 0xb9, + 0x32, 0xaa, 0xe4, 0x75, 0x21, 0x42, 0x5a, 0x11, 0x80, 0x35, 0x28, 0x3a, 0xd4, 0x1f, 0x06, 0x6e, + 0x18, 0x7a, 0x94, 0x48, 0x79, 0x5e, 0xe6, 0xbb, 0x8d, 0x65, 0x5a, 0x73, 0x3d, 0x2f, 0x96, 0x4e, + 0x80, 0x9f, 0xc1, 0xee, 0x88, 0x24, 0x80, 0xdb, 0xb3, 0x42, 0xef, 0xad, 0x2b, 0x3d, 0xe0, 0x55, + 0xc5, 0x34, 0x61, 0x78, 0x6f, 0x5d, 0xfc, 0x14, 0x76, 0x96, 0xa5, 0x0f, 0xb9, 0x74, 0x7b, 0x55, + 0x98, 0x74, 0x62, 0xd1, 0xab, 0xab, 0xd0, 0x65, 0xd2, 0x67, 0xb1, 0x30, 0x81, 0x3b, 0x1c, 0xc5, + 0x5f, 0xc3, 0xd6, 0x4c, 0xc8, 0xf3, 0x15, 0xb8, 0xac, 0x94, 0x80, 0x3c, 0xdb, 0x0b, 0x80, 0x90, + 0xd9, 0xcc, 0x0b, 0x99, 0xe7, 0x84, 0x92, 0x50, 0x46, 0x95, 0x62, 0xe3, 0xe9, 0xc6, 0x9f, 0x6c, + 0xcc, 0xe4, 0x7a, 0x2a, 0x14, 0x7f, 0x05, 0x25, 0xfe, 0xd0, 0xc9, 0xeb, 0x02, 0x2f, 0x56, 0x8c, + 0xb1, 0xf8, 0x7d, 0xbf, 0x84, 0xe2, 0xd0, 0xee, 0xcf, 0x14, 0x45, 0xae, 0x00, 0x0e, 0x71, 0xc1, + 0x71, 0x08, 0x30, 0xcf, 0x8e, 0x8f, 0x40, 0xf0, 0x3d, 0x62, 0xf1, 0x0c, 0xdc, 0x0d, 0x25, 0xbd, + 0xe0, 0x7b, 0x84, 0x4f, 0x96, 0x93, 0xf6, 0xcd, 0x94, 0xcc, 0x4e, 0x49, 0xfb, 0x26, 0x26, 0x9f, + 0xc1, 0xae, 0x63, 0x07, 0x3d, 0x8f, 0xd8, 0x03, 0x8f, 0x8d, 0x17, 0xc6, 0x2d, 0xa6, 0x88, 0xb8, + 0xe8, 0xbb, 0x1c, 0x14, 0xba, 0x76, 0xdf, 0xe5, 0xe6, 0x5b, 0x3b, 0x32, 0xf4, 0xe9, 0x23, 0xcb, + 0xae, 0x1d, 0xd9, 0x1e, 0x3c, 0x70, 0x02, 0xe7, 0xa4, 0xc1, 0x7b, 0xd8, 0xd2, 0xe3, 0xcb, 0x92, + 0x1b, 0xf3, 0xcb, 0x6e, 0x54, 0xa0, 0xe0, 0x12, 0x87, 0xf6, 0x3c, 0xd2, 0xe7, 0xa6, 0xd9, 0x6e, + 0x7c, 0xbb, 0x71, 0x2e, 0xca, 0x54, 0xcc, 0x7d, 0x38, 0x0b, 0x8d, 0x1e, 0x3d, 0x6d, 0x95, 0xd8, + 0x53, 0x90, 0xb2, 0xc9, 0x11, 0x08, 0x73, 0x8b, 0xc4, 0x4e, 0x2a, 0xfc, 0x87, 0x3d, 0x0a, 0xff, + 0x9f, 0x3d, 0x84, 0x15, 0x7b, 0x1c, 0x7f, 0x40, 0xb0, 0x63, 0xb8, 0x0e, 0xf3, 0x28, 0x31, 0x68, + 0xc0, 0xf8, 0x3c, 0x2e, 0xa0, 0xe4, 0xf0, 0xd5, 0x60, 0x85, 0x34, 0x60, 0xa1, 0x84, 0xca, 0xb9, + 0x4a, 0xb1, 0xf1, 0x7c, 0x73, 0x07, 0x8b, 0x39, 0x6a, 0xf1, 0x6e, 0x89, 0xae, 0xd1, 0xdf, 0x33, + 0x39, 0x87, 0x87, 0xe3, 0x64, 0xed, 0x44, 0xd7, 0xa8, 0xc1, 0x69, 0x25, 0x8f, 0xf4, 0xdc, 0x1b, + 0x3e, 0xf4, 0xad, 0x24, 0x40, 0x8d, 0x20, 0xfc, 0x33, 0x08, 0x3d, 0x2f, 0x88, 0xb3, 0x4f, 0x97, + 0x50, 0x75, 0x73, 0x27, 0x34, 0x60, 0xed, 0x24, 0x42, 0x9f, 0x07, 0x57, 0x47, 0x20, 0xcc, 0x16, + 0x14, 0x3e, 0x84, 0x2f, 0x5e, 0x37, 0xcf, 0x5e, 0x29, 0x96, 0x79, 0xde, 0x55, 0xac, 0x57, 0x9a, + 0xd1, 0x55, 0x5a, 0xea, 0x4f, 0xaa, 0xd2, 0x16, 0x33, 0x78, 0x0f, 0xc4, 0x14, 0xa7, 0x6a, 0xe6, + 0x8f, 0x3f, 0x88, 0x08, 0xef, 0xc3, 0x6e, 0x3a, 0x22, 0x86, 0xb3, 0xf8, 0x00, 0xf6, 0x53, 0xf0, + 0xe9, 0xb9, 0xa9, 0x58, 0x4d, 0x5d, 0x6f, 0x9e, 0x8b, 0xf9, 0xe3, 0x7c, 0x21, 0x27, 0xe6, 0xaa, + 0xef, 0x10, 0xec, 0x2c, 0x6d, 0x2c, 0x5c, 0x86, 0xc7, 0xad, 0xce, 0xcb, 0xae, 0xae, 0x18, 0x86, + 0xda, 0xd1, 0xd6, 0xf5, 0x70, 0x00, 0xfb, 0x2b, 0x0a, 0xad, 0xa3, 0x29, 0x22, 0xc2, 0x47, 0xf0, + 0x68, 0x85, 0x32, 0xb4, 0x66, 0xb7, 0x7b, 0x1e, 0xb7, 0xb3, 0x42, 0x5e, 0x18, 0x66, 0x5b, 0xcc, + 0x55, 0xc7, 0x50, 0x4a, 0xdb, 0x15, 0x3f, 0x81, 0x03, 0x45, 0x6b, 0x75, 0xda, 0xaa, 0xf6, 0x62, + 0x5d, 0x07, 0x8f, 0xe0, 0xf3, 0x45, 0xba, 0x7b, 0xd6, 0x54, 0x35, 0x11, 0xad, 0x12, 0x6d, 0xe5, + 0xcc, 0x6c, 0x8a, 0x59, 0x2c, 0xc1, 0xde, 0x22, 0x71, 0xaa, 0x9a, 0x2f, 0x9b, 0x5d, 0x31, 0x57, + 0x1d, 0xc0, 0xd6, 0xc2, 0x58, 0xb0, 0x0c, 0x87, 0x46, 0x47, 0x37, 0xad, 0xb6, 0xaa, 0x2b, 0x2d, + 0x33, 0xea, 0x74, 0xb1, 0xf8, 0x63, 0x90, 0x96, 0xf8, 0xa6, 0xd1, 0x52, 0xb4, 0x28, 0xb5, 0x88, + 0xa2, 0xce, 0x97, 0xd8, 0xb6, 0x32, 0xa3, 0xb3, 0xa7, 0x37, 0xb7, 0x77, 0x72, 0xe6, 0xfd, 0x9d, + 0x9c, 0xf9, 0x78, 0x27, 0xa3, 0xdf, 0x27, 0x32, 0xfa, 0x6b, 0x22, 0xa3, 0xbf, 0x27, 0x32, 0xba, + 0x9d, 0xc8, 0xe8, 0x9f, 0x89, 0x8c, 0x3e, 0x4c, 0xe4, 0xcc, 0xc7, 0x89, 0x8c, 0xfe, 0xbc, 0x97, + 0x33, 0xb7, 0xf7, 0x72, 0xe6, 0xfd, 0xbd, 0x9c, 0xb9, 0x38, 0xed, 0x7b, 0xec, 0x97, 0xd1, 0x65, + 0xcd, 0xa1, 0x7e, 0xbd, 0x1f, 0xd8, 0x57, 0x36, 0xb1, 0xeb, 0x03, 0xfa, 0xc6, 0xab, 0x5f, 0x9f, + 0xd4, 0x3f, 0xf1, 0x13, 0x7c, 0xf9, 0x90, 0x7f, 0x79, 0x4f, 0xfe, 0x0d, 0x00, 0x00, 0xff, 0xff, + 0x4b, 0x56, 0xa2, 0xce, 0xb4, 0x07, 0x00, 0x00, } func (x ValueType) String() string { @@ -735,6 +745,9 @@ func (this *ColumnInfo) Equal(that interface{}) bool { if this.ValuesCount != that1.ValuesCount { return false } + if this.PagesCount != that1.PagesCount { + return false + } return true } func (this *Statistics) Equal(that interface{}) bool { @@ -875,7 +888,7 @@ func (this *ColumnInfo) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 14) + s := make([]string, 0, 15) s = append(s, "&datasetmd.ColumnInfo{") s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n") s = append(s, "ValueType: "+fmt.Sprintf("%#v", this.ValueType)+",\n") @@ -889,6 +902,7 @@ func (this *ColumnInfo) GoString() string { s = append(s, "Statistics: "+fmt.Sprintf("%#v", this.Statistics)+",\n") } s = append(s, "ValuesCount: "+fmt.Sprintf("%#v", this.ValuesCount)+",\n") + s = append(s, "PagesCount: "+fmt.Sprintf("%#v", this.PagesCount)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -975,6 +989,11 @@ func (m *ColumnInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.PagesCount != 0 { + i = encodeVarintDatasetmd(dAtA, i, uint64(m.PagesCount)) + i-- + dAtA[i] = 0x58 + } if m.ValuesCount != 0 { i = encodeVarintDatasetmd(dAtA, i, uint64(m.ValuesCount)) i-- @@ -1273,6 +1292,9 @@ func (m *ColumnInfo) Size() (n int) { if m.ValuesCount != 0 { n += 1 + sovDatasetmd(uint64(m.ValuesCount)) } + if m.PagesCount != 0 { + n += 1 + sovDatasetmd(uint64(m.PagesCount)) + } return n } @@ -1384,6 +1406,7 @@ func (this *ColumnInfo) String() string { `MetadataSize:` + fmt.Sprintf("%v", this.MetadataSize) + `,`, `Statistics:` + strings.Replace(this.Statistics.String(), "Statistics", "Statistics", 1) + `,`, `ValuesCount:` + fmt.Sprintf("%v", this.ValuesCount) + `,`, + `PagesCount:` + fmt.Sprintf("%v", this.PagesCount) + `,`, `}`, }, "") return s @@ -1701,6 +1724,25 @@ func (m *ColumnInfo) Unmarshal(dAtA []byte) error { break } } + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PagesCount", wireType) + } + m.PagesCount = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDatasetmd + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PagesCount |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipDatasetmd(dAtA[iNdEx:]) diff --git a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto index e4b34b5943..99f205cd57 100644 --- a/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto +++ b/pkg/dataobj/internal/metadata/datasetmd/datasetmd.proto @@ -41,6 +41,9 @@ message ColumnInfo { // Total number of non-NULL values in the entire column. uint64 values_count = 10; + + // Total number of pages in the column. + uint64 pages_count = 11; } // ValueType represents the valid types that values within a column can have. diff --git a/pkg/dataobj/querier/store_test.go b/pkg/dataobj/querier/store_test.go index a6f4a17dea..4895cc8145 100644 --- a/pkg/dataobj/querier/store_test.go +++ b/pkg/dataobj/querier/store_test.go @@ -133,19 +133,19 @@ func TestStore_SelectSamples(t *testing.T) { end: now.Add(time.Hour), shards: []string{"0_of_2"}, want: []sampleWithLabels{ - {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}}, - - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(12 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(22 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(32 * time.Second).UnixNano(), Value: 1}}, {Labels: `{app="baz", env="prod", team="a"}`, Samples: logproto.Sample{Timestamp: now.Add(42 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}}, + + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, }, }, { @@ -155,14 +155,14 @@ func TestStore_SelectSamples(t *testing.T) { end: now.Add(time.Hour), shards: []string{"1_of_2"}, want: []sampleWithLabels{ - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(10 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(20 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(35 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(8 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(18 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="dev"}`, Samples: logproto.Sample{Timestamp: now.Add(38 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(30 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(45 * time.Second).UnixNano(), Value: 1}}, - {Labels: `{app="foo", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(50 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(5 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(15 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(25 * time.Second).UnixNano(), Value: 1}}, + {Labels: `{app="bar", env="prod"}`, Samples: logproto.Sample{Timestamp: now.Add(40 * time.Second).UnixNano(), Value: 1}}, }, }, { @@ -314,19 +314,19 @@ func TestStore_SelectLogs(t *testing.T) { limit: 100, direction: logproto.FORWARD, want: []entryWithLabels{ - {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}}, - {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}}, - {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}}, - - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}}, - {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}}, - {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(12 * time.Second), Line: "baz1"}}, {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(22 * time.Second), Line: "baz2"}}, {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(32 * time.Second), Line: "baz3"}}, {Labels: `{app="baz", env="prod", team="a"}`, Entry: logproto.Entry{Timestamp: now.Add(42 * time.Second), Line: "baz4"}}, + + {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, + {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, + {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}}, + + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}}, + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, + {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, }, }, { @@ -338,14 +338,14 @@ func TestStore_SelectLogs(t *testing.T) { limit: 100, direction: logproto.FORWARD, want: []entryWithLabels{ - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(10 * time.Second), Line: "foo5"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(20 * time.Second), Line: "foo6"}}, - {Labels: `{app="foo", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(35 * time.Second), Line: "foo7"}}, + {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(8 * time.Second), Line: "bar5"}}, + {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(18 * time.Second), Line: "bar6"}}, + {Labels: `{app="bar", env="dev"}`, Entry: logproto.Entry{Timestamp: now.Add(38 * time.Second), Line: "bar7"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now, Line: "foo1"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(30 * time.Second), Line: "foo2"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(45 * time.Second), Line: "foo3"}}, - {Labels: `{app="foo", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(50 * time.Second), Line: "foo4"}}, + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(5 * time.Second), Line: "bar1"}}, + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(15 * time.Second), Line: "bar2"}}, + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(25 * time.Second), Line: "bar3"}}, + {Labels: `{app="bar", env="prod"}`, Entry: logproto.Entry{Timestamp: now.Add(40 * time.Second), Line: "bar4"}}, }, }, { diff --git a/pkg/dataobj/sections/indexpointers/dataset.go b/pkg/dataobj/sections/indexpointers/dataset.go index 84864c1a6f..6b346a9b93 100644 --- a/pkg/dataobj/sections/indexpointers/dataset.go +++ b/pkg/dataobj/sections/indexpointers/dataset.go @@ -130,6 +130,7 @@ func newColumnDataset(dec *decoder, col *Column) *columnDataset { Type: info.ValueType, Compression: info.Compression, + PagesCount: int(info.PagesCount), RowsCount: int(info.RowsCount), ValuesCount: int(info.ValuesCount), CompressedSize: int(info.CompressedSize), diff --git a/pkg/dataobj/sections/indexpointers/encoder.go b/pkg/dataobj/sections/indexpointers/encoder.go index 337865db12..57ef91f21d 100644 --- a/pkg/dataobj/sections/indexpointers/encoder.go +++ b/pkg/dataobj/sections/indexpointers/encoder.go @@ -54,6 +54,7 @@ func (enc *encoder) OpenColumn(columnType indexpointersmd.ColumnType, info *data Info: &datasetmd.ColumnInfo{ Name: info.Name, ValueType: info.Type, + PagesCount: uint64(info.PagesCount), RowsCount: uint64(info.RowsCount), ValuesCount: uint64(info.ValuesCount), Compression: info.Compression, diff --git a/pkg/dataobj/sections/logs/dataset.go b/pkg/dataobj/sections/logs/dataset.go index 38cc0fcd48..10501e7887 100644 --- a/pkg/dataobj/sections/logs/dataset.go +++ b/pkg/dataobj/sections/logs/dataset.go @@ -130,6 +130,7 @@ func newColumnDataset(dec *decoder, col *Column) *columnDataset { Type: info.ValueType, Compression: info.Compression, + PagesCount: int(info.PagesCount), RowsCount: int(info.RowsCount), ValuesCount: int(info.ValuesCount), CompressedSize: int(info.CompressedSize), diff --git a/pkg/dataobj/sections/logs/decoder.go b/pkg/dataobj/sections/logs/decoder.go index 6d27a440cd..55fa6382dc 100644 --- a/pkg/dataobj/sections/logs/decoder.go +++ b/pkg/dataobj/sections/logs/decoder.go @@ -6,12 +6,16 @@ import ( "fmt" "io" + "github.com/dustin/go-humanize" + "github.com/go-kit/log/level" + "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/logsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/result" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/windowing" + utillog "github.com/grafana/loki/v3/pkg/util/log" ) // newDecoder creates a new [decoder] for the given [dataobj.SectionReader]. @@ -48,11 +52,14 @@ func (rd *decoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) resu return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize } + numWindows := 0 for window := range windowing.Iter(columns, columnInfo, windowing.S3WindowSize) { if len(window) == 0 { continue } + numWindows++ + var ( windowOffset = window.Start().GetInfo().MetadataOffset windowSize = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset @@ -87,6 +94,8 @@ func (rd *decoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) resu } } + level.Debug(utillog.WithContext(ctx, utillog.Logger)).Log("msg", "logs.decoder: retrieve page desc", "num_columns", len(columns), "window_size", humanize.Bytes(windowing.S3WindowSize), "total_windows", numWindows) + for _, data := range results { if !yield(data) { return nil @@ -121,11 +130,14 @@ func (rd *decoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) resu // TODO(rfratto): If there are many windows, it may make sense to read them // in parallel. + numWindows := 0 for window := range windowing.Iter(pages, pageInfo, windowing.S3WindowSize) { if len(window) == 0 { continue } + numWindows++ + var ( windowOffset = window.Start().GetInfo().DataOffset windowSize = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset @@ -161,6 +173,8 @@ func (rd *decoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) resu bufpool.Put(buffer) } + level.Debug(utillog.WithContext(ctx, utillog.Logger)).Log("msg", "logs.decoder: read pages", "num_pages", len(pages), "window_size", humanize.Bytes(windowing.S3WindowSize), "total_windows", numWindows) + for _, data := range results { if !yield(data) { return nil diff --git a/pkg/dataobj/sections/logs/encoder.go b/pkg/dataobj/sections/logs/encoder.go index 3c0e032571..ad4a0223fa 100644 --- a/pkg/dataobj/sections/logs/encoder.go +++ b/pkg/dataobj/sections/logs/encoder.go @@ -56,6 +56,7 @@ func (enc *encoder) OpenColumn(columnType logsmd.ColumnType, info *dataset.Colum Info: &datasetmd.ColumnInfo{ Name: info.Name, ValueType: info.Type, + PagesCount: uint64(info.PagesCount), RowsCount: uint64(info.RowsCount), ValuesCount: uint64(info.ValuesCount), Compression: info.Compression, diff --git a/pkg/dataobj/sections/logs/reader.go b/pkg/dataobj/sections/logs/reader.go index d50696b11c..8f0262ab3f 100644 --- a/pkg/dataobj/sections/logs/reader.go +++ b/pkg/dataobj/sections/logs/reader.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/slicegrow" + "github.com/grafana/loki/v3/pkg/logqlmodel/stats" ) // ReaderOptions customizes the behavior of a [Reader]. @@ -121,6 +122,7 @@ type Reader struct { ready bool inner *dataset.Reader buf []dataset.Row + stats dataset.ReaderStats } // NewReader creates a new Reader from the provided options. Options are not @@ -159,7 +161,7 @@ func (r *Reader) Schema() *arrow.Schema { return r.schema } // [Reader.Schema]. These records must always be released after use. func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.Record, error) { if !r.ready { - err := r.init() + err := r.init(ctx) if err != nil { return nil, fmt.Errorf("initializing Reader: %w", err) } @@ -171,7 +173,7 @@ func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.Record, error) builder := array.NewRecordBuilder(r.opts.Allocator, r.schema) defer builder.Release() - n, readErr := r.inner.Read(ctx, r.buf) + n, readErr := r.inner.Read(dataset.WithStats(ctx, &r.stats), r.buf) for rowIndex := range n { row := r.buf[rowIndex] @@ -219,7 +221,7 @@ func (r *Reader) Read(ctx context.Context, batchSize int) (arrow.Record, error) return builder.NewRecord(), readErr } -func (r *Reader) init() error { +func (r *Reader) init(ctx context.Context) error { if err := r.opts.Validate(); err != nil { return fmt.Errorf("invalid options: %w", err) } else if r.opts.Allocator == nil { @@ -250,6 +252,9 @@ func (r *Reader) init() error { return fmt.Errorf("mapping predicates: %w", err) } + // TODO(ashwanth): remove when global stats are updated by the executor. + r.stats.LinkGlobalStats(stats.FromContext(ctx)) + innerOptions := dataset.ReaderOptions{ Dataset: dset, Columns: dset.Columns(), @@ -405,6 +410,7 @@ func mustConvertType(dtype arrow.DataType) datasetmd.ValueType { func (r *Reader) Reset(opts ReaderOptions) { r.opts = opts r.schema = columnsSchema(opts.Columns) + r.stats.Reset() r.ready = false @@ -415,6 +421,10 @@ func (r *Reader) Reset(opts ReaderOptions) { } } +func (r *Reader) Stats() *dataset.ReaderStats { + return &r.stats +} + // Close closes the Reader and releases any resources it holds. Closed Readers // can be reused by calling [Reader.Reset]. func (r *Reader) Close() error { diff --git a/pkg/dataobj/sections/logs/reader_test.go b/pkg/dataobj/sections/logs/reader_test.go index 18f1c85157..6bac798ad6 100644 --- a/pkg/dataobj/sections/logs/reader_test.go +++ b/pkg/dataobj/sections/logs/reader_test.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/sections/logs" + "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/util/arrowtest" ) @@ -160,3 +161,83 @@ func readTable(ctx context.Context, r *logs.Reader) (arrow.Table, error) { return array.NewTableFromRecords(recs[0].Schema(), recs), nil } + +// TestReaderStats tests that the reader properly tracks statistics +func TestReaderStats(t *testing.T) { + alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer alloc.AssertSize(t, 0) + + sec := buildSection(t, []logs.Record{ + {StreamID: 2, Timestamp: unixTime(40), Metadata: labels.FromStrings("trace_id", "789012"), Line: []byte("baz qux")}, + {StreamID: 2, Timestamp: unixTime(30), Metadata: labels.FromStrings("trace_id", "123456"), Line: []byte("foo bar")}, + {StreamID: 1, Timestamp: unixTime(20), Metadata: labels.FromStrings("trace_id", "abcdef"), Line: []byte("goodbye, world!")}, + {StreamID: 1, Timestamp: unixTime(10), Metadata: labels.EmptyLabels(), Line: []byte("hello, world!")}, + }) + + var ( + streamID = sec.Columns()[0] + traceID = sec.Columns()[2] + message = sec.Columns()[3] + ) + + // Create a reader with predicates + r := logs.NewReader(logs.ReaderOptions{ + Columns: []*logs.Column{streamID, traceID, message}, + Allocator: alloc, + Predicates: []logs.Predicate{ + logs.FuncPredicate{ + Column: traceID, + Keep: func(_ *logs.Column, value scalar.Scalar) bool { + if !value.IsValid() { + return false + } + + bb := value.(*scalar.String).Value.Bytes() + return bytes.Equal(bb, []byte("abcdef")) || bytes.Equal(bb, []byte("123456")) + }, + }, + logs.InPredicate{ + Column: streamID, + Values: []scalar.Scalar{ + scalar.NewInt64Scalar(1), + scalar.NewInt64Scalar(2), + }, + }, + }, + }) + + // Create stats context + statsCtx, ctx := stats.NewContext(context.Background()) + + // Read the data + actualTable, err := readTable(ctx, r) + if actualTable != nil { + defer actualTable.Release() + } + require.NoError(t, err) + + // Get the reader stats + readerStats := r.Stats() + + // Verify the stats are properly populated + require.Equal(t, int64(2), readerStats.ReadCalls) + require.Equal(t, uint64(2), readerStats.PrimaryColumns) // from 2 predicates + require.Equal(t, uint64(1), readerStats.SecondaryColumns) + require.Equal(t, uint64(2), readerStats.PrimaryColumnPages) + require.Equal(t, uint64(1), readerStats.SecondaryColumnPages) + + require.Equal(t, uint64(4), readerStats.MaxRows) + require.Equal(t, uint64(4), readerStats.RowsToReadAfterPruning) + require.Equal(t, uint64(4), readerStats.PrimaryRowsRead) + require.Equal(t, uint64(2), readerStats.SecondaryRowsRead) // 2 rows pass the predicate + + // Verify download stats - these should be populated by the downloader + require.Equal(t, uint64(3), readerStats.DownloadStats.PagesScanned) // one page per column + require.Equal(t, uint64(2), readerStats.DownloadStats.PrimaryColumnPages) + require.Equal(t, uint64(1), readerStats.DownloadStats.SecondaryColumnPages) + + // Verify global stats are updated + result := statsCtx.Result(0, 0, 0) + require.Equal(t, int64(4), result.Querier.Store.Dataobj.PrePredicateDecompressedRows) + require.Equal(t, int64(2), result.Querier.Store.Dataobj.PostPredicateRows) +} diff --git a/pkg/dataobj/sections/pointers/dataset.go b/pkg/dataobj/sections/pointers/dataset.go index 816981c013..d0ac6353e4 100644 --- a/pkg/dataobj/sections/pointers/dataset.go +++ b/pkg/dataobj/sections/pointers/dataset.go @@ -130,6 +130,7 @@ func newColumnDataset(dec *decoder, col *Column) *columnDataset { Type: info.ValueType, Compression: info.Compression, + PagesCount: int(info.PagesCount), RowsCount: int(info.RowsCount), ValuesCount: int(info.ValuesCount), CompressedSize: int(info.CompressedSize), diff --git a/pkg/dataobj/sections/pointers/encoder.go b/pkg/dataobj/sections/pointers/encoder.go index 571b03588b..5c2957e06e 100644 --- a/pkg/dataobj/sections/pointers/encoder.go +++ b/pkg/dataobj/sections/pointers/encoder.go @@ -54,6 +54,7 @@ func (enc *encoder) OpenColumn(columnType pointersmd.ColumnType, info *dataset.C Info: &datasetmd.ColumnInfo{ Name: info.Name, ValueType: info.Type, + PagesCount: uint64(info.PagesCount), RowsCount: uint64(info.RowsCount), ValuesCount: uint64(info.ValuesCount), Compression: info.Compression, diff --git a/pkg/dataobj/sections/streams/dataset.go b/pkg/dataobj/sections/streams/dataset.go index 61ed94f5a4..41ea3b594e 100644 --- a/pkg/dataobj/sections/streams/dataset.go +++ b/pkg/dataobj/sections/streams/dataset.go @@ -130,6 +130,7 @@ func newColumnDataset(dec *decoder, col *Column) *columnDataset { Type: info.ValueType, Compression: info.Compression, + PagesCount: int(info.PagesCount), RowsCount: int(info.RowsCount), ValuesCount: int(info.ValuesCount), CompressedSize: int(info.CompressedSize), diff --git a/pkg/dataobj/sections/streams/decoder.go b/pkg/dataobj/sections/streams/decoder.go index 62bc785f7f..3ab912cc25 100644 --- a/pkg/dataobj/sections/streams/decoder.go +++ b/pkg/dataobj/sections/streams/decoder.go @@ -6,12 +6,16 @@ import ( "fmt" "io" + "github.com/dustin/go-humanize" + "github.com/go-kit/log/level" + "github.com/grafana/loki/v3/pkg/dataobj" "github.com/grafana/loki/v3/pkg/dataobj/internal/dataset" "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/streamsmd" "github.com/grafana/loki/v3/pkg/dataobj/internal/result" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/bufpool" "github.com/grafana/loki/v3/pkg/dataobj/internal/util/windowing" + utillog "github.com/grafana/loki/v3/pkg/util/log" ) // newDecoder creates a new [decoder] for the given [dataobj.SectionReader]. @@ -49,11 +53,14 @@ func (rd *decoder) Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) r return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize } + numWindows := 0 for window := range windowing.Iter(columns, columnInfo, windowing.S3WindowSize) { if len(window) == 0 { continue } + numWindows++ + var ( windowOffset = window.Start().GetInfo().MetadataOffset windowSize = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset @@ -88,6 +95,8 @@ func (rd *decoder) Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) r } } + level.Debug(utillog.WithContext(ctx, utillog.Logger)).Log("msg", "streams.decoder: retrieve page desc", "num_columns", len(columns), "window_size", humanize.Bytes(windowing.S3WindowSize), "total_windows", numWindows) + for _, data := range results { if !yield(data) { return nil @@ -122,11 +131,14 @@ func (rd *decoder) ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) r // TODO(rfratto): If there are many windows, it may make sense to read them // in parallel. + numWindows := 0 for window := range windowing.Iter(pages, pageInfo, windowing.S3WindowSize) { if len(window) == 0 { continue } + numWindows++ + var ( windowOffset = window.Start().GetInfo().DataOffset windowSize = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset @@ -162,6 +174,8 @@ func (rd *decoder) ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) r bufpool.Put(buffer) } + level.Debug(utillog.WithContext(ctx, utillog.Logger)).Log("msg", "streams.decoder: read pages", "num_pages", len(pages), "window_size", humanize.Bytes(windowing.S3WindowSize), "total_windows", numWindows) + for _, data := range results { if !yield(data) { return nil diff --git a/pkg/dataobj/sections/streams/encoder.go b/pkg/dataobj/sections/streams/encoder.go index 88cbed5023..c274e661b7 100644 --- a/pkg/dataobj/sections/streams/encoder.go +++ b/pkg/dataobj/sections/streams/encoder.go @@ -55,6 +55,7 @@ func (enc *encoder) OpenColumn(columnType streamsmd.ColumnType, info *dataset.Co Info: &datasetmd.ColumnInfo{ Name: info.Name, ValueType: info.Type, + PagesCount: uint64(info.PagesCount), RowsCount: uint64(info.RowsCount), ValuesCount: uint64(info.ValuesCount), Compression: info.Compression, diff --git a/pkg/engine/executor/dataobjscan.go b/pkg/engine/executor/dataobjscan.go index 236dc1e788..4078c6070a 100644 --- a/pkg/engine/executor/dataobjscan.go +++ b/pkg/engine/executor/dataobjscan.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "slices" + "time" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/memory" @@ -38,6 +39,7 @@ type dataobjScan struct { logger log.Logger initialized bool + initializedAt time.Time streams *streamsView streamsInjector *streamInjector reader *logs.Reader @@ -52,12 +54,15 @@ var _ Pipeline = (*dataobjScan)(nil) // [arrow.Record] composed of the requested log section in a data object. Rows // in the returned record are ordered by timestamp in the direction specified // by opts.Direction. -func newDataobjScanPipeline(opts dataobjScanOptions) *dataobjScan { +func newDataobjScanPipeline(opts dataobjScanOptions, logger log.Logger) *dataobjScan { if opts.Allocator == nil { opts.Allocator = memory.DefaultAllocator } - return &dataobjScan{opts: opts} + return &dataobjScan{ + opts: opts, + logger: logger, + } } func (s *dataobjScan) Read(ctx context.Context) error { @@ -89,6 +94,7 @@ func (s *dataobjScan) init() error { } s.initialized = true + s.initializedAt = time.Now().UTC() return nil } @@ -409,6 +415,11 @@ func (s *dataobjScan) Value() (arrow.Record, error) { return s.state.batch, s.st // Close closes s and releases all resources. func (s *dataobjScan) Close() { + if s.reader != nil { + // TODO(ashwanth): remove this once we have stats collection via executor + s.reader.Stats().LogSummary(s.logger, time.Since(s.initializedAt)) + } + if s.streams != nil { s.streams.Close() } diff --git a/pkg/engine/executor/dataobjscan_test.go b/pkg/engine/executor/dataobjscan_test.go index 0042da6dbb..b3be03cdbf 100644 --- a/pkg/engine/executor/dataobjscan_test.go +++ b/pkg/engine/executor/dataobjscan_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/apache/arrow-go/v18/arrow" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/dataobj" @@ -86,7 +87,7 @@ func Test_dataobjScan(t *testing.T) { Projections: nil, // All columns BatchSize: 512, - }) + }, log.NewNopLogger()) expectFields := []arrow.Field{ {Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, @@ -120,7 +121,7 @@ prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world` }, BatchSize: 512, - }) + }, log.NewNopLogger()) expectFields := []arrow.Field{ {Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, @@ -147,7 +148,7 @@ prod,1970-01-01 00:00:02` Projections: nil, // All columns BatchSize: 512, - }) + }, log.NewNopLogger()) expectFields := []arrow.Field{ {Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, @@ -179,7 +180,7 @@ prod,notloki,NULL,notloki-pod-1,1970-01-01 00:00:02,hello world` &physical.ColumnExpr{Ref: types.ColumnRef{Column: "env", Type: types.ColumnTypeAmbiguous}}, }, BatchSize: 512, - }) + }, log.NewNopLogger()) expectFields := []arrow.Field{ {Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, @@ -259,7 +260,7 @@ func Test_dataobjScan_DuplicateColumns(t *testing.T) { StreamIDs: []int64{1, 2, 3}, // All streams Projections: nil, // All columns BatchSize: 512, - }) + }, log.NewNopLogger()) expectFields := []arrow.Field{ {Name: "env", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, @@ -294,7 +295,7 @@ prod,NULL,pod-1,loki,NULL,override,1970-01-01 00:00:01,message 1` &physical.ColumnExpr{Ref: types.ColumnRef{Column: "pod", Type: types.ColumnTypeAmbiguous}}, }, BatchSize: 512, - }) + }, log.NewNopLogger()) expectFields := []arrow.Field{ {Name: "pod", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, @@ -321,7 +322,7 @@ pod-1,override` &physical.ColumnExpr{Ref: types.ColumnRef{Column: "namespace", Type: types.ColumnTypeAmbiguous}}, }, BatchSize: 512, - }) + }, log.NewNopLogger()) expectFields := []arrow.Field{ {Name: "namespace", Type: arrow.BinaryTypes.String, Metadata: labelMD, Nullable: true}, diff --git a/pkg/engine/executor/executor.go b/pkg/engine/executor/executor.go index fd895f109a..c19c7c072d 100644 --- a/pkg/engine/executor/executor.go +++ b/pkg/engine/executor/executor.go @@ -189,7 +189,7 @@ func (c *Context) executeDataObjScan(ctx context.Context, node *physical.DataObj BatchSize: c.batchSize, CacheSize: int(c.dataobjScanPageCacheSize), - }) + }, log.With(c.logger, "location", string(node.Location), "section", node.Section)) sortType, sortDirection, err := logsSection.PrimarySortOrder() if err != nil { diff --git a/pkg/logqlmodel/stats/context.go b/pkg/logqlmodel/stats/context.go index 4956609858..7eeb7fd03e 100644 --- a/pkg/logqlmodel/stats/context.go +++ b/pkg/logqlmodel/stats/context.go @@ -78,11 +78,6 @@ func NewContext(ctx context.Context) (*Context, context.Context) { return contextData, ctx } -func IsPresent(ctx context.Context) bool { - _, ok := ctx.Value(statsKey).(*Context) - return ok -} - // FromContext returns the statistics context. func FromContext(ctx context.Context) *Context { v, ok := ctx.Value(statsKey).(*Context)