mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
260 lines
7.6 KiB
260 lines
7.6 KiB
package indexpointers
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"github.com/grafana/loki/v3/pkg/dataobj"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
|
|
datasetmd_v2 "github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
|
|
"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
|
|
)
|
|
|
|
// IndexPointer is a pointer to an index object. It is used to look up the
// index object by data object path within a time range.
//
// Path is the data object path; StartTs and EndTs bound the time range of the
// data stored in the index object.
type IndexPointer struct {
	Path           string
	StartTs, EndTs time.Time
}
|
|
|
|
type Builder struct {
|
|
metrics *Metrics
|
|
pageSize int
|
|
pageRowCount int
|
|
tenant string
|
|
|
|
indexPointers []*IndexPointer
|
|
}
|
|
|
|
func NewBuilder(metrics *Metrics, pageSize, pageRowCount int) *Builder {
|
|
if metrics == nil {
|
|
metrics = NewMetrics()
|
|
}
|
|
return &Builder{
|
|
metrics: metrics,
|
|
pageSize: pageSize,
|
|
pageRowCount: pageRowCount,
|
|
indexPointers: make([]*IndexPointer, 0, 1024),
|
|
}
|
|
}
|
|
|
|
func (b *Builder) SetTenant(tenant string) {
|
|
b.tenant = tenant
|
|
}
|
|
|
|
// Tenant reports the tenant most recently set with SetTenant, or the empty
// string if none has been set.
func (b *Builder) Tenant() string {
	return b.tenant
}
|
|
|
|
// Type reports the section type produced by this builder.
func (b *Builder) Type() dataobj.SectionType {
	return sectionType
}
|
|
|
|
// Append adds a new index pointer to the builder.
|
|
func (b *Builder) Append(path string, startTs time.Time, endTs time.Time) {
|
|
p := &IndexPointer{
|
|
Path: path,
|
|
StartTs: startTs.UTC(),
|
|
EndTs: endTs.UTC(),
|
|
}
|
|
b.indexPointers = append(b.indexPointers, p)
|
|
}
|
|
|
|
// EstimatedSize returns the estimated size of the Pointers section in bytes.
|
|
func (b *Builder) EstimatedSize() int {
|
|
// Since columns are only built when encoding, we can't use
|
|
// [dataset.ColumnBuilder.EstimatedSize] here.
|
|
//
|
|
// Instead, we use a basic heuristic, estimating delta encoding and
|
|
// compression:
|
|
//
|
|
// 1. Assume an average path length of 50 bytes with 3x compression ratio.
|
|
// 2. Assume a timestamp delta of 1 hour (3600 seconds).
|
|
// 3. Account for column metadata overhead.
|
|
|
|
if len(b.indexPointers) == 0 {
|
|
return 0
|
|
}
|
|
|
|
var (
|
|
avgPathLength = 85 // Average path length in bytes (based on real data)
|
|
pathCompressionRatio = 3 // ZSTD compression ratio
|
|
timestampDeltaSize = streamio.VarintSize(int64(time.Hour)) // 1 hour delta
|
|
metadataOverhead = 100 // Estimated metadata overhead per column
|
|
)
|
|
|
|
var sizeEstimate int
|
|
|
|
// Path column (byte arrays with ZSTD compression)
|
|
sizeEstimate += (len(b.indexPointers) * avgPathLength) / pathCompressionRatio
|
|
|
|
// Start timestamp column (int64 with delta encoding)
|
|
sizeEstimate += len(b.indexPointers) * timestampDeltaSize
|
|
|
|
// End timestamp column (int64 with delta encoding)
|
|
sizeEstimate += len(b.indexPointers) * timestampDeltaSize
|
|
|
|
// Column metadata overhead (3 columns)
|
|
sizeEstimate += 3 * metadataOverhead
|
|
|
|
return sizeEstimate
|
|
}
|
|
|
|
// Flush flushes the streams section to the provided writer.
|
|
//
|
|
// After successful encoding, b is reset to a fresh state and can be reused.
|
|
func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
|
|
timer := prometheus.NewTimer(b.metrics.encodeSeconds)
|
|
defer timer.ObserveDuration()
|
|
|
|
b.sortIndexPointers()
|
|
|
|
var enc columnar.Encoder
|
|
defer enc.Reset()
|
|
if err := b.encodeTo(&enc); err != nil {
|
|
return 0, fmt.Errorf("building encoder: %w", err)
|
|
}
|
|
|
|
enc.SetTenant(b.tenant)
|
|
|
|
n, err = enc.Flush(w)
|
|
if err == nil {
|
|
b.Reset()
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (b *Builder) sortIndexPointers() {
|
|
sort.Slice(b.indexPointers, func(i, j int) bool {
|
|
return b.indexPointers[i].StartTs.Before(b.indexPointers[j].StartTs) && b.indexPointers[i].EndTs.Before(b.indexPointers[j].StartTs)
|
|
})
|
|
}
|
|
|
|
// Reset resets all state, allowing IndexPointers to be reused.
|
|
func (b *Builder) Reset() {
|
|
b.indexPointers = sliceclear.Clear(b.indexPointers)
|
|
|
|
b.metrics.minTimestamp.Set(0)
|
|
b.metrics.maxTimestamp.Set(0)
|
|
}
|
|
|
|
func (b *Builder) encodeTo(enc *columnar.Encoder) error {
|
|
pathBuilder, err := dataset.NewColumnBuilder("path", dataset.BuilderOptions{
|
|
PageSizeHint: b.pageSize,
|
|
PageMaxRowCount: b.pageRowCount,
|
|
Type: dataset.ColumnType{
|
|
Physical: datasetmd_v2.PHYSICAL_TYPE_BINARY,
|
|
Logical: ColumnTypePath.String(),
|
|
},
|
|
Encoding: datasetmd_v2.ENCODING_TYPE_PLAIN,
|
|
Compression: datasetmd_v2.COMPRESSION_TYPE_ZSTD,
|
|
Statistics: dataset.StatisticsOptions{
|
|
StoreRangeStats: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("creating path column: %w", err)
|
|
}
|
|
|
|
minTimestampBuilder, err := dataset.NewColumnBuilder("min_timestamp", dataset.BuilderOptions{
|
|
PageSizeHint: b.pageSize,
|
|
PageMaxRowCount: b.pageRowCount,
|
|
Type: dataset.ColumnType{
|
|
Physical: datasetmd_v2.PHYSICAL_TYPE_INT64,
|
|
Logical: ColumnTypeMinTimestamp.String(),
|
|
},
|
|
Encoding: datasetmd_v2.ENCODING_TYPE_DELTA,
|
|
Compression: datasetmd_v2.COMPRESSION_TYPE_NONE,
|
|
Statistics: dataset.StatisticsOptions{
|
|
StoreRangeStats: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("creating min timestamp column: %w", err)
|
|
}
|
|
|
|
maxTimestampBuilder, err := dataset.NewColumnBuilder("max_timestamp", dataset.BuilderOptions{
|
|
PageSizeHint: b.pageSize,
|
|
PageMaxRowCount: b.pageRowCount,
|
|
Type: dataset.ColumnType{
|
|
Physical: datasetmd_v2.PHYSICAL_TYPE_INT64,
|
|
Logical: ColumnTypeMaxTimestamp.String(),
|
|
},
|
|
Encoding: datasetmd_v2.ENCODING_TYPE_DELTA,
|
|
Compression: datasetmd_v2.COMPRESSION_TYPE_NONE,
|
|
Statistics: dataset.StatisticsOptions{
|
|
StoreRangeStats: true,
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("creating max timestamp column: %w", err)
|
|
}
|
|
|
|
for i, pointer := range b.indexPointers {
|
|
_ = pathBuilder.Append(i, dataset.BinaryValue([]byte(pointer.Path)))
|
|
_ = minTimestampBuilder.Append(i, dataset.Int64Value(pointer.StartTs.UnixNano()))
|
|
_ = maxTimestampBuilder.Append(i, dataset.Int64Value(pointer.EndTs.UnixNano()))
|
|
}
|
|
|
|
// Encode our builders to sections. We ignore errors after enc.OpenStreams
|
|
// (which may fail due to a caller) since we guarantee correct usage of the
|
|
// encoding API.
|
|
{
|
|
var errs []error
|
|
errs = append(errs, encodeColumn(enc, ColumnTypePath, pathBuilder))
|
|
errs = append(errs, encodeColumn(enc, ColumnTypeMinTimestamp, minTimestampBuilder))
|
|
errs = append(errs, encodeColumn(enc, ColumnTypeMaxTimestamp, maxTimestampBuilder))
|
|
|
|
if err := errors.Join(errs...); err != nil {
|
|
return fmt.Errorf("encoding columns: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func encodeColumn(enc *columnar.Encoder, columnType ColumnType, builder *dataset.ColumnBuilder) error {
|
|
column, err := builder.Flush()
|
|
if err != nil {
|
|
return fmt.Errorf("flushing %s column: %w", columnType, err)
|
|
}
|
|
|
|
columnEnc, err := enc.OpenColumn(column.ColumnDesc())
|
|
if err != nil {
|
|
return fmt.Errorf("opening %s column encoder: %w", columnType, err)
|
|
}
|
|
defer func() {
|
|
// Discard on defer for safety. This will return an error if we
|
|
// successfully committed.
|
|
_ = columnEnc.Discard()
|
|
}()
|
|
if len(column.Pages) == 0 {
|
|
// Column has no data; discard.
|
|
return nil
|
|
}
|
|
|
|
for _, page := range column.Pages {
|
|
err := columnEnc.AppendPage(page)
|
|
if err != nil {
|
|
return fmt.Errorf("appending %s page: %w", columnType, err)
|
|
}
|
|
}
|
|
|
|
if columnType == ColumnTypeMinTimestamp {
|
|
enc.SetSortInfo(&datasetmd_v2.SortInfo{
|
|
ColumnSorts: []*datasetmd_v2.SortInfo_ColumnSort{{
|
|
// NumColumns increases after calling Commit, so we can use the
|
|
// current value as the index.
|
|
ColumnIndex: uint32(enc.NumColumns()),
|
|
Direction: datasetmd_v2.SORT_DIRECTION_ASCENDING,
|
|
}},
|
|
})
|
|
}
|
|
|
|
return columnEnc.Commit()
|
|
}
|
|
|