package indexpointers

import (
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	datasetmd_v2 "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
)

// IndexPointer is a pointer to an index object. It is used to look up the
// index object by data object path within a time range.
//
// Path is the data object path; StartTs and EndTs are the time range of the
// data stored in the index object.
type IndexPointer struct {
	Path    string
	StartTs time.Time
	EndTs   time.Time
}

type Builder struct {
	metrics      *Metrics
	pageSize     int
	pageRowCount int

	tenant string

	indexPointers []*IndexPointer
}

func NewBuilder(metrics *Metrics, pageSize, pageRowCount int) *Builder {
	if metrics == nil {
		metrics = NewMetrics()
	}
	return &Builder{
		metrics:       metrics,
		pageSize:      pageSize,
		pageRowCount:  pageRowCount,
		indexPointers: make([]*IndexPointer, 0, 1024),
	}
}

func (b *Builder) SetTenant(tenant string) {
	b.tenant = tenant
}

func (b *Builder) Tenant() string { return b.tenant }

func (b *Builder) Type() dataobj.SectionType { return sectionType }

// Append adds a new index pointer to the builder.
func (b *Builder) Append(path string, startTs time.Time, endTs time.Time) {
	p := &IndexPointer{
		Path:    path,
		StartTs: startTs.UTC(),
		EndTs:   endTs.UTC(),
	}
	b.indexPointers = append(b.indexPointers, p)
}

// EstimatedSize returns the estimated size of the index pointers section in
// bytes.
func (b *Builder) EstimatedSize() int {
	// Since columns are only built when encoding, we can't use
	// [dataset.ColumnBuilder.EstimatedSize] here.
	//
	// Instead, we use a basic heuristic, estimating delta encoding and
	// compression:
	//
	//   1. Assume an average path length of 85 bytes with a 3x compression ratio.
	//   2. Assume a timestamp delta of 1 hour between consecutive entries.
	//   3. Account for column metadata overhead.
	if len(b.indexPointers) == 0 {
		return 0
	}

	var (
		avgPathLength        = 85                                    // Average path length in bytes (based on real data)
		pathCompressionRatio = 3                                     // ZSTD compression ratio
		timestampDeltaSize   = streamio.VarintSize(int64(time.Hour)) // 1 hour delta
		metadataOverhead     = 100                                   // Estimated metadata overhead per column
	)

	var sizeEstimate int

	// Path column (byte arrays with ZSTD compression)
	sizeEstimate += (len(b.indexPointers) * avgPathLength) / pathCompressionRatio

	// Start timestamp column (int64 with delta encoding)
	sizeEstimate += len(b.indexPointers) * timestampDeltaSize

	// End timestamp column (int64 with delta encoding)
	sizeEstimate += len(b.indexPointers) * timestampDeltaSize

	// Column metadata overhead (3 columns)
	sizeEstimate += 3 * metadataOverhead

	return sizeEstimate
}
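// The helper below is an illustrative sketch, not part of the upstream
// package: it spells out the time-range semantics described on IndexPointer
// by selecting the pointers whose [StartTs, EndTs] range overlaps a queried
// range. The overlappingPointers name and the inclusive treatment of the
// range boundaries are assumptions made for the example.
func overlappingPointers(pointers []*IndexPointer, from, through time.Time) []*IndexPointer {
	var out []*IndexPointer
	for _, p := range pointers {
		// Keep a pointer when its range neither ends before the query starts
		// nor starts after the query ends.
		if !p.EndTs.Before(from) && !p.StartTs.After(through) {
			out = append(out, p)
		}
	}
	return out
}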
// Flush flushes the index pointers section to the provided writer.
//
// After successful encoding, b is reset to a fresh state and can be reused.
func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
	timer := prometheus.NewTimer(b.metrics.encodeSeconds)
	defer timer.ObserveDuration()

	b.sortIndexPointers()

	var enc columnar.Encoder
	defer enc.Reset()

	if err := b.encodeTo(&enc); err != nil {
		return 0, fmt.Errorf("building encoder: %w", err)
	}
	enc.SetTenant(b.tenant)

	n, err = enc.Flush(w)
	if err == nil {
		b.Reset()
	}
	return n, err
}

func (b *Builder) sortIndexPointers() {
	// Sort by StartTs ascending, since the encoded section is flagged as
	// sorted by min_timestamp. Ties are broken by EndTs.
	sort.Slice(b.indexPointers, func(i, j int) bool {
		if !b.indexPointers[i].StartTs.Equal(b.indexPointers[j].StartTs) {
			return b.indexPointers[i].StartTs.Before(b.indexPointers[j].StartTs)
		}
		return b.indexPointers[i].EndTs.Before(b.indexPointers[j].EndTs)
	})
}

// Reset resets all state, allowing the Builder to be reused.
func (b *Builder) Reset() {
	b.indexPointers = sliceclear.Clear(b.indexPointers)
	b.metrics.minTimestamp.Set(0)
	b.metrics.maxTimestamp.Set(0)
}

func (b *Builder) encodeTo(enc *columnar.Encoder) error {
	pathBuilder, err := dataset.NewColumnBuilder("path", dataset.BuilderOptions{
		PageSizeHint:    b.pageSize,
		PageMaxRowCount: b.pageRowCount,
		Type: dataset.ColumnType{
			Physical: datasetmd_v2.PHYSICAL_TYPE_BINARY,
			Logical:  ColumnTypePath.String(),
		},
		Encoding:    datasetmd_v2.ENCODING_TYPE_PLAIN,
		Compression: datasetmd_v2.COMPRESSION_TYPE_ZSTD,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
	if err != nil {
		return fmt.Errorf("creating path column: %w", err)
	}

	minTimestampBuilder, err := dataset.NewColumnBuilder("min_timestamp", dataset.BuilderOptions{
		PageSizeHint:    b.pageSize,
		PageMaxRowCount: b.pageRowCount,
		Type: dataset.ColumnType{
			Physical: datasetmd_v2.PHYSICAL_TYPE_INT64,
			Logical:  ColumnTypeMinTimestamp.String(),
		},
		Encoding:    datasetmd_v2.ENCODING_TYPE_DELTA,
		Compression: datasetmd_v2.COMPRESSION_TYPE_NONE,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
	if err != nil {
		return fmt.Errorf("creating min timestamp column: %w", err)
	}

	maxTimestampBuilder, err := dataset.NewColumnBuilder("max_timestamp", dataset.BuilderOptions{
		PageSizeHint:    b.pageSize,
		PageMaxRowCount: b.pageRowCount,
		Type: dataset.ColumnType{
			Physical: datasetmd_v2.PHYSICAL_TYPE_INT64,
			Logical:  ColumnTypeMaxTimestamp.String(),
		},
		Encoding:    datasetmd_v2.ENCODING_TYPE_DELTA,
		Compression: datasetmd_v2.COMPRESSION_TYPE_NONE,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
	if err != nil {
		return fmt.Errorf("creating max timestamp column: %w", err)
	}

	for i, pointer := range b.indexPointers {
		_ = pathBuilder.Append(i, dataset.BinaryValue([]byte(pointer.Path)))
		_ = minTimestampBuilder.Append(i, dataset.Int64Value(pointer.StartTs.UnixNano()))
		_ = maxTimestampBuilder.Append(i, dataset.Int64Value(pointer.EndTs.UnixNano()))
	}

	// Encode our builders to columns, collecting errors from each so that one
	// failing column doesn't mask the others.
	{
		var errs []error
		errs = append(errs, encodeColumn(enc, ColumnTypePath, pathBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeMinTimestamp, minTimestampBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeMaxTimestamp, maxTimestampBuilder))
		if err := errors.Join(errs...); err != nil {
			return fmt.Errorf("encoding columns: %w", err)
		}
	}

	return nil
}

func encodeColumn(enc *columnar.Encoder, columnType ColumnType, builder *dataset.ColumnBuilder) error {
	column, err := builder.Flush()
	if err != nil {
		return fmt.Errorf("flushing %s column: %w", columnType, err)
	}

	columnEnc, err := enc.OpenColumn(column.ColumnDesc())
	if err != nil {
		return fmt.Errorf("opening %s column encoder: %w", columnType, err)
	}
	defer func() {
		// Discard on defer for safety. This will return an error if we
		// successfully committed.
		_ = columnEnc.Discard()
	}()

	if len(column.Pages) == 0 {
		// Column has no data; discard.
		return nil
	}

	for _, page := range column.Pages {
		err := columnEnc.AppendPage(page)
		if err != nil {
			return fmt.Errorf("appending %s page: %w", columnType, err)
		}
	}

	if columnType == ColumnTypeMinTimestamp {
		enc.SetSortInfo(&datasetmd_v2.SortInfo{
			ColumnSorts: []*datasetmd_v2.SortInfo_ColumnSort{{
				// NumColumns increases after calling Commit, so we can use the
				// current value as the index.
				ColumnIndex: uint32(enc.NumColumns()),
				Direction:   datasetmd_v2.SORT_DIRECTION_ASCENDING,
			}},
		})
	}

	return columnEnc.Commit()
}
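
// The sketch below is illustrative only and not part of the upstream package:
// it shows how a caller might drive the Builder end to end, appending index
// pointers and flushing them into a section. The buildExampleSection name,
// tenant string, page size, and row count are assumptions for the example,
// not values taken from this package.
func buildExampleSection(w dataobj.SectionWriter, pointers []*IndexPointer) (int64, error) {
	// Assumed tuning values: 1 MiB page size hint and 8192 rows per page.
	builder := NewBuilder(nil, 1<<20, 8192)
	builder.SetTenant("example-tenant")

	for _, p := range pointers {
		builder.Append(p.Path, p.StartTs, p.EndTs)
	}

	// Flush sorts the pointers, encodes the path and timestamp columns, and
	// resets the builder on success so it can be reused for the next batch.
	return builder.Flush(w)
}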