package dataset

import (
	"fmt"
	"sync"

	"github.com/klauspost/compress/zstd"

	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
)

// BuilderOptions configures common settings for building pages.
type BuilderOptions struct {
	// PageSizeHint is the soft limit for the size of the page. Builders try to
	// fill pages as close to this size as possible, but the actual size may be
	// slightly larger or smaller.
	PageSizeHint int

	// PageMaxRowCount is the limit for the number of rows in a page. When zero
	// or negative, builders use the [BuilderOptions.PageSizeHint] option to
	// determine when a page needs to be flushed.
	PageMaxRowCount int

	// Type is the type of data in the column. Type.Physical is used for
	// encoding; Type.Logical is used as a hint to readers.
	Type ColumnType

	// Encoding is the encoding algorithm to use for values.
	Encoding datasetmd.EncodingType

	// Compression is the compression algorithm to use for values.
	Compression datasetmd.CompressionType

	// CompressionOptions holds optional configuration for compression.
	CompressionOptions *CompressionOptions

	// Statistics holds optional configuration for statistics.
	Statistics StatisticsOptions
}

// StatisticsOptions customizes the collection of statistics for a column.
type StatisticsOptions struct {
	// StoreRangeStats indicates whether to store value range statistics for the
	// column and pages.
	StoreRangeStats bool

	// StoreCardinalityStats indicates whether to store cardinality estimates,
	// computed using HyperLogLog.
	StoreCardinalityStats bool
}

// CompressionOptions customizes the compressor used when building pages.
// CompressionOptions caches byte compressors to reduce total allocations.
// As an optimization, callers should reuse CompressionOptions pointers
// wherever possible.
type CompressionOptions struct {
	// Zstd holds encoding options for Zstd compression. Only used for
	// [datasetmd.COMPRESSION_TYPE_ZSTD].
	Zstd []zstd.EOption

	// zstdWriter returns a shared Zstd writer for the given EOptions.
	// The shared writer may only be used for EncodeAll.
	zstdWriter func() *zstd.Encoder
}

func (o *CompressionOptions) init() {
	if o.zstdWriter == nil {
		o.zstdWriter = sync.OnceValue(func() *zstd.Encoder {
			writer, err := zstd.NewWriter(nil, o.Zstd...)
			if err != nil {
				panic(fmt.Errorf("error initializing shared zstd writer: %w", err))
			}
			return writer
		})
	}
}

// A ColumnBuilder builds a sequence of [Value] entries of a common type into a
// column. Values are accumulated into a buffer and then flushed into
// [MemPage]s once the size of the data exceeds a configurable limit.
type ColumnBuilder struct {
	tag  string
	opts BuilderOptions

	rows  int // Total number of rows in the column.
	pages []*MemPage

	statsBuilder *columnStatsBuilder
	pageBuilder  *pageBuilder
}

// NewColumnBuilder creates a new ColumnBuilder from the optional tag and
// provided options. NewColumnBuilder returns an error if the options are
// invalid.
func NewColumnBuilder(tag string, opts BuilderOptions) (*ColumnBuilder, error) {
	builder, err := newPageBuilder(opts)
	if err != nil {
		return nil, fmt.Errorf("creating page builder: %w", err)
	}

	statsBuilder, err := newColumnStatsBuilder(opts.Statistics)
	if err != nil {
		return nil, fmt.Errorf("creating stats builder: %w", err)
	}

	return &ColumnBuilder{
		tag:  tag,
		opts: opts,

		pageBuilder:  builder,
		statsBuilder: statsBuilder,
	}, nil
}
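// A minimal usage sketch for ColumnBuilder, shown as a comment rather than a
// compiled example. The option values are illustrative, and the Int64Value
// constructor is assumed to exist in this package; adapt both to the actual
// column type being built.
//
//	cb, err := NewColumnBuilder("timestamp", BuilderOptions{
//		PageSizeHint: 2 << 20, // aim for pages of roughly 2 MiB
//		Compression:  datasetmd.COMPRESSION_TYPE_ZSTD,
//	})
//	if err != nil {
//		// Options were invalid.
//	}
//
//	_ = cb.Append(0, Int64Value(1234)) // row 0
//	_ = cb.Append(5, Int64Value(5678)) // rows 1-4 are backfilled with NULLs
//
//	column, err := cb.Flush() // cut the in-progress page and build a MemColumn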
// Append adds a new value into cb with the given zero-indexed row number. If
// the row number is higher than the current number of rows in cb, null values
// are added up to the new row.
//
// Append returns an error if the row number is out-of-order.
func (cb *ColumnBuilder) Append(row int, value Value) error {
	if row < cb.rows {
		return fmt.Errorf("row %d is older than current row %d", row, cb.rows)
	}

	// We give three attempts to append the data to the buffer; if the buffer is
	// full, we cut a page and then append to the newly reset buffer.
	//
	// If we also need to backfill, there is a case where the backfill fills the
	// second page, which then needs to be flushed as well.
	//
	// The third iteration should never fail, as the buffer will always be empty
	// by then.
	for range 3 {
		if cb.append(row, value) {
			cb.rows = row + 1
			cb.statsBuilder.Append(value)
			return nil
		}
		cb.flushPage()
	}
	panic("ColumnBuilder.Append: failed to append value to fresh buffer")
}

// EstimatedSize returns the estimated size of all data in cb. EstimatedSize
// includes the compressed size of all cut pages in cb, followed by the size
// estimate of the in-progress page.
//
// Because compression isn't considered for the in-progress page, EstimatedSize
// tends to overestimate the actual size after flushing.
func (cb *ColumnBuilder) EstimatedSize() int {
	var size int
	for _, p := range cb.pages {
		size += p.Desc.CompressedSize
	}
	size += cb.pageBuilder.EstimatedSize()
	return size
}

// Backfill adds NULLs into cb up to (but not including) the provided row
// number. If values exist up to the provided row number, Backfill does
// nothing.
func (cb *ColumnBuilder) Backfill(row int) {
	// We give two attempts to append the data to the buffer; if the buffer is
	// full, we cut a page and then append again to the newly reset buffer.
	//
	// The second iteration should never fail, as the buffer will always be
	// empty.
	for range 2 {
		if cb.backfill(row) {
			return
		}
		cb.flushPage()
	}
	panic("ColumnBuilder.Backfill: failed to backfill buffer")
}

func (cb *ColumnBuilder) backfill(row int) bool {
	if row > cb.rows {
		if !cb.pageBuilder.AppendNulls(uint64(row - cb.rows)) {
			return false
		}
		cb.rows = row
	}
	return true
}

func (cb *ColumnBuilder) append(row int, value Value) bool {
	// Backfill up to row.
	if !cb.backfill(row) {
		return false
	}
	return cb.pageBuilder.Append(value)
}

// Flush converts the data in cb into a [MemColumn]. Afterwards, cb is reset
// to a fresh state and can be reused.
func (cb *ColumnBuilder) Flush() (*MemColumn, error) {
	cb.flushPage()

	info := ColumnDesc{
		Type: cb.opts.Type,
		Tag:  cb.tag,

		PagesCount:  len(cb.pages),
		Compression: cb.opts.Compression,
		Statistics:  cb.statsBuilder.Flush(cb.pages),
	}

	for _, page := range cb.pages {
		info.RowsCount += page.Desc.RowCount
		info.ValuesCount += page.Desc.ValuesCount
		info.CompressedSize += page.Desc.CompressedSize
		info.UncompressedSize += page.Desc.UncompressedSize
	}

	column := &MemColumn{
		Desc:  info,
		Pages: cb.pages,
	}

	cb.Reset()
	return column, nil
}

func (cb *ColumnBuilder) flushPage() {
	if cb.pageBuilder.Rows() == 0 {
		return
	}

	page, err := cb.pageBuilder.Flush()
	if err != nil {
		// Flush should only return an error when the builder is empty, which we
		// already ensured it isn't in the lines above.
		panic(fmt.Sprintf("failed to flush page: %s", err))
	}
	cb.pages = append(cb.pages, page)
}

// Reset clears all data in cb and resets it to a fresh state.
func (cb *ColumnBuilder) Reset() {
	cb.rows = 0
	cb.pages = nil
	cb.pageBuilder.Reset()
}
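// The sketch below illustrates the reuse guidance on CompressionOptions:
// because init installs the shared zstd encoder through sync.OnceValue,
// builders that share one *CompressionOptions pointer also share one encoder
// instead of each allocating their own. The column names and encoder level
// here are hypothetical.
//
//	compOpts := &CompressionOptions{
//		Zstd: []zstd.EOption{zstd.WithEncoderLevel(zstd.SpeedDefault)},
//	}
//	for _, name := range []string{"timestamp", "line", "metadata"} {
//		cb, err := NewColumnBuilder(name, BuilderOptions{
//			Compression:        datasetmd.COMPRESSION_TYPE_ZSTD,
//			CompressionOptions: compOpts, // shared: one encoder for all columns
//		})
//		// ...
//	}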