package pointers

import (
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/pointersmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
)

// A SectionPointer is a pointer to a section within another object.
// It is a wide object containing two types of index information:
//
// 1. Stream indexing metadata
// 2. Column indexing metadata
//
// The stream indexing metadata is used to look up which streams are present in the referenced section and their IDs within the object.
// The column indexing metadata is used to look up which column values are present in the referenced section.
// Path & Section are mandatory fields, and are used to uniquely identify the section within the referenced object.
type SectionPointer struct {
	Path        string
	Section     int64
	PointerKind PointerKind

	// Stream indexing metadata
	StreamID         int64
	StreamIDRef      int64
	StartTs          time.Time
	EndTs            time.Time
	LineCount        int64
	UncompressedSize int64

	// Column indexing metadata
	ColumnIndex       int64
	ColumnName        string
	ValuesBloomFilter []byte
}

func (p *SectionPointer) Reset() {
	p.Path = ""
	p.Section = 0
	p.PointerKind = PointerKindInvalid

	p.StreamID = 0
	p.StreamIDRef = 0
	p.StartTs = time.Time{}
	p.EndTs = time.Time{}
	p.LineCount = 0
	p.UncompressedSize = 0

	p.ColumnIndex = 0
	p.ColumnName = ""
	p.ValuesBloomFilter = p.ValuesBloomFilter[:0]
}

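// PointerKind describes which kind of index information a SectionPointer
// carries: stream indexing metadata or column indexing metadata.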
type PointerKind int

const (
	PointerKindInvalid     PointerKind = iota // PointerKindInvalid is an invalid pointer kind.
	PointerKindStreamIndex                    // PointerKindStreamIndex is a pointer for a stream index.
	PointerKindColumnIndex                    // PointerKindColumnIndex is a pointer for a column index.
)

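// streamKey uniquely identifies a stream within one section of a referenced
// object.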
type streamKey struct {
	objectPath string
	section    int64
	streamID   int64
}

// Builder builds a pointers section.
type Builder struct {
	metrics  *Metrics
	pageSize int

	// streamLookup maps the object path, section, and stream ID within the
	// referenced object to its pointer in this index.
	streamLookup map[streamKey]*SectionPointer
	// pointers is the list of pointers to encode.
	pointers []*SectionPointer

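	// key is scratch space reused by ObserveStream when looking up existing
	// pointers.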
	key streamKey
}

// NewBuilder creates a new pointers section builder. The pageSize argument
// specifies how large pages should be.
func NewBuilder(metrics *Metrics, pageSize int) *Builder {
	if metrics == nil {
		metrics = NewMetrics()
	}
	return &Builder{
		metrics:  metrics,
		pageSize: pageSize,

		streamLookup: make(map[streamKey]*SectionPointer),
		pointers:     make([]*SectionPointer, 0, 1024),
	}
}
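
// A minimal usage sketch (illustrative only; the page size, path, section,
// IDs, timestamps, bloomBytes, and sectionWriter values below are assumed for
// the example, not taken from this package):
//
//	b := NewBuilder(nil, 2*1024*1024)
//	b.ObserveStream("objects/a", 0, 1, 10, time.Now(), 128)
//	b.RecordColumnIndex("objects/a", 0, "app", 3, bloomBytes)
//	n, err := b.Flush(sectionWriter)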

// Type returns the [dataobj.SectionType] of the pointers builder.
func (b *Builder) Type() dataobj.SectionType { return sectionType }

// ObserveStream observes a stream in the index by recording the start & end timestamps, line count, and uncompressed size per-section.
func (b *Builder) ObserveStream(path string, section int64, idInObject int64, idInIndex int64, ts time.Time, uncompressedSize int64) {
	b.key.objectPath = path
	b.key.section = section
	b.key.streamID = idInObject

	pointer, ok := b.streamLookup[b.key]
	if ok {
		// Update the existing pointer
		if ts.Before(pointer.StartTs) {
			pointer.StartTs = ts
		}
		if ts.After(pointer.EndTs) {
			pointer.EndTs = ts
		}
		pointer.LineCount++
		pointer.UncompressedSize += uncompressedSize
		return
	}

	newPointer := &SectionPointer{
		Path:             path,
		Section:          section,
		PointerKind:      PointerKindStreamIndex,
		StreamID:         idInIndex,
		StreamIDRef:      idInObject,
		StartTs:          ts,
		EndTs:            ts,
		LineCount:        1,
		UncompressedSize: uncompressedSize,
	}
	b.pointers = append(b.pointers, newPointer)
	b.streamLookup[b.key] = newPointer
}

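// RecordColumnIndex records a pointer to a column within a section of the
// referenced object, along with a bloom filter of the column's values.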
func (b *Builder) RecordColumnIndex(path string, section int64, columnName string, columnIndex int64, valuesBloomFilter []byte) {
	newPointer := &SectionPointer{
		Path:              path,
		Section:           section,
		PointerKind:       PointerKindColumnIndex,
		ColumnName:        columnName,
		ColumnIndex:       columnIndex,
		ValuesBloomFilter: valuesBloomFilter,
	}
	b.pointers = append(b.pointers, newPointer)
}

// EstimatedSize returns the estimated size of the Pointers section in bytes.
func (b *Builder) EstimatedSize() int {
	// Since columns are only built when encoding, we can't use
	// [dataset.ColumnBuilder.EstimatedSize] here.
	//
	// Instead, we use a basic heuristic, estimating delta encoding and
	// compression:
	//
	// 1. Assume an ID delta of 1.
	// 2. Assume a timestamp delta of 10m.
	// 3. Assume a row count delta of 500.
	// 4. Assume a 10KB uncompressed size delta.
	// 5. Assume 5 bytes per column name with a 2x compression ratio.
	// 6. Assume 50 bytes per column bloom.
	// 7. Assume 10% of the pointers are column indexes.

	var (
		idDeltaSize        = streamio.VarintSize(1)
		timestampDeltaSize = streamio.VarintSize(10 * int64(time.Minute))
		rowDeltaSize       = streamio.VarintSize(500)
		bytesDeltaSize     = streamio.VarintSize(10000)
		streamIndexCount   = int(float64(len(b.streamLookup)) * 0.9)
		columnIndexCount   = int(float64(len(b.pointers)) * 0.1)
	)

	var sizeEstimate int

	sizeEstimate += len(b.pointers) * idDeltaSize * 2     // ID + StreamIDRef
	sizeEstimate += streamIndexCount * timestampDeltaSize // Min timestamp
	sizeEstimate += streamIndexCount * timestampDeltaSize // Max timestamp
	sizeEstimate += streamIndexCount * rowDeltaSize       // Rows
	sizeEstimate += streamIndexCount * bytesDeltaSize     // Uncompressed size
	sizeEstimate += columnIndexCount * 5                  // Column name (2x compression ratio)
	sizeEstimate += columnIndexCount * 50                 // Column bloom

	return sizeEstimate
}

// Flush flushes the pointers section to the provided writer.
//
// After successful encoding, b is reset to a fresh state and can be reused.
func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
	timer := prometheus.NewTimer(b.metrics.encodeSeconds)
	defer timer.ObserveDuration()

	b.sortPointerObjects()

	var pointersEnc encoder
	defer pointersEnc.Reset()
	if err := b.encodeTo(&pointersEnc); err != nil {
		return 0, fmt.Errorf("building encoder: %w", err)
	}

	n, err = pointersEnc.Flush(w)
	if err == nil {
		b.Reset()
	}
	return n, err
}

// sortPointerObjects sorts the pointers so that column indexes are grouped
// together (ordered by ColumnIndex, then Section) and stream indexes are
// ordered by StartTs, then EndTs.
func (b *Builder) sortPointerObjects() {
	sort.Slice(b.pointers, func(i, j int) bool {
		if b.pointers[i].PointerKind == PointerKindColumnIndex && b.pointers[j].PointerKind == PointerKindColumnIndex {
			// Both are column indexes.
			if b.pointers[i].ColumnIndex == b.pointers[j].ColumnIndex {
				return b.pointers[i].Section < b.pointers[j].Section
			}
			return b.pointers[i].ColumnIndex < b.pointers[j].ColumnIndex
		} else if b.pointers[i].PointerKind == PointerKindStreamIndex && b.pointers[j].PointerKind == PointerKindStreamIndex {
			// Both are stream indexes.
			if b.pointers[i].StartTs.Equal(b.pointers[j].StartTs) {
				return b.pointers[i].EndTs.Before(b.pointers[j].EndTs)
			}
			return b.pointers[i].StartTs.Before(b.pointers[j].StartTs)
		}
		// The kinds differ; order by StreamID so stream pointers stay grouped
		// apart from column pointers.
		return b.pointers[i].StreamID < b.pointers[j].StreamID
	})
}

func (b *Builder) encodeTo(enc *encoder) error {
	// TODO(rfratto): handle one section becoming too large. This can happen when
	// the number of columns is very wide. There are two approaches to handle
	// this:
	//
	// 1. Split streams into multiple sections.
	// 2. Move some columns into an aggregated column which holds multiple label
	//    keys and values.

	pathBuilder, err := dataset.NewColumnBuilder("path", dataset.BuilderOptions{
		PageSizeHint: b.pageSize,
		Value:        datasetmd.VALUE_TYPE_BYTE_ARRAY,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
		Compression:  datasetmd.COMPRESSION_TYPE_ZSTD,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
	if err != nil {
		return fmt.Errorf("creating path column: %w", err)
	}
	sectionBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating section column: %w", err)
	}
	pointerKindBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating pointer kind column: %w", err)
	}

	// Stream info
	idBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating ID column: %w", err)
	}
	streamIDRefBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating stream ID in object column: %w", err)
	}
	minTimestampBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating minimum timestamp column: %w", err)
	}
	maxTimestampBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating maximum timestamp column: %w", err)
	}
	rowCountBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating rows column: %w", err)
	}
	uncompressedSizeBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating uncompressed size column: %w", err)
	}

	// Column index info
	columnNameBuilder, err := dataset.NewColumnBuilder("column_name", dataset.BuilderOptions{
		PageSizeHint: b.pageSize,
		Value:        datasetmd.VALUE_TYPE_BYTE_ARRAY,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
		Compression:  datasetmd.COMPRESSION_TYPE_ZSTD,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
	if err != nil {
		return fmt.Errorf("creating column name column: %w", err)
	}

	columnIndexBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating column index column: %w", err)
	}

	valuesBloomFilterBuilder, err := dataset.NewColumnBuilder("values_bloom_filter", dataset.BuilderOptions{
		PageSizeHint: b.pageSize,
		Value:        datasetmd.VALUE_TYPE_BYTE_ARRAY,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
		Compression:  datasetmd.COMPRESSION_TYPE_NONE, // TODO: is there a sensible compression algorithm for bloom filters?
	})
	if err != nil {
		return fmt.Errorf("creating values bloom filter column: %w", err)
	}

	// Populate our column builders.
	for i, pointer := range b.pointers {
		_ = pathBuilder.Append(i, dataset.ByteArrayValue([]byte(pointer.Path)))
		_ = sectionBuilder.Append(i, dataset.Int64Value(pointer.Section))
		_ = pointerKindBuilder.Append(i, dataset.Int64Value(int64(pointer.PointerKind)))

		if pointer.PointerKind == PointerKindStreamIndex {
			// Append only fails if the rows are out-of-order, which can't happen here.
			_ = idBuilder.Append(i, dataset.Int64Value(pointer.StreamID))
			_ = streamIDRefBuilder.Append(i, dataset.Int64Value(pointer.StreamIDRef))
			_ = minTimestampBuilder.Append(i, dataset.Int64Value(pointer.StartTs.UnixNano()))
			_ = maxTimestampBuilder.Append(i, dataset.Int64Value(pointer.EndTs.UnixNano()))
			_ = rowCountBuilder.Append(i, dataset.Int64Value(pointer.LineCount))
			_ = uncompressedSizeBuilder.Append(i, dataset.Int64Value(pointer.UncompressedSize))
		}

		if pointer.PointerKind == PointerKindColumnIndex {
			_ = columnNameBuilder.Append(i, dataset.ByteArrayValue([]byte(pointer.ColumnName)))
			_ = columnIndexBuilder.Append(i, dataset.Int64Value(pointer.ColumnIndex))
			_ = valuesBloomFilterBuilder.Append(i, dataset.ByteArrayValue(pointer.ValuesBloomFilter))
		}
	}

	// Encode our builders to columns. Errors from each column are collected
	// and joined so that a failure in one column doesn't mask failures in the
	// others.
	{
		var errs []error
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_PATH, pathBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_SECTION, sectionBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_POINTER_KIND, pointerKindBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_STREAM_ID, idBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_STREAM_ID_REF, streamIDRefBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_MIN_TIMESTAMP, minTimestampBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_MAX_TIMESTAMP, maxTimestampBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_ROW_COUNT, rowCountBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_UNCOMPRESSED_SIZE, uncompressedSizeBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_COLUMN_NAME, columnNameBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_COLUMN_INDEX, columnIndexBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_VALUES_BLOOM_FILTER, valuesBloomFilterBuilder))

		if err := errors.Join(errs...); err != nil {
			return fmt.Errorf("encoding columns: %w", err)
		}
	}

	return nil
}

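// numberColumnBuilder returns a column builder for int64 values using delta
// encoding and no compression, with range statistics enabled.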
func numberColumnBuilder(pageSize int) (*dataset.ColumnBuilder, error) {
	return dataset.NewColumnBuilder("", dataset.BuilderOptions{
		PageSizeHint: pageSize,
		Value:        datasetmd.VALUE_TYPE_INT64,
		Encoding:     datasetmd.ENCODING_TYPE_DELTA,
		Compression:  datasetmd.COMPRESSION_TYPE_NONE,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
}

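// encodeColumn flushes builder and writes its pages to enc as a column of the
// given type.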
func encodeColumn(enc *encoder, columnType pointersmd.ColumnType, builder *dataset.ColumnBuilder) error {
	column, err := builder.Flush()
	if err != nil {
		return fmt.Errorf("flushing %s column: %w", columnType, err)
	}

	columnEnc, err := enc.OpenColumn(columnType, &column.Info)
	if err != nil {
		return fmt.Errorf("opening %s column encoder: %w", columnType, err)
	}
	defer func() {
		// Discard on defer for safety; this returns an error if we already
		// committed successfully, which we ignore.
		_ = columnEnc.Discard()
	}()

	for _, page := range column.Pages {
		err := columnEnc.AppendPage(page)
		if err != nil {
			return fmt.Errorf("appending %s page: %w", columnType, err)
		}
	}

	return columnEnc.Commit()
}

// Reset resets all state, allowing the Builder to be reused.
func (b *Builder) Reset() {
	b.pointers = sliceclear.Clear(b.pointers)
	clear(b.streamLookup)

	b.metrics.minTimestamp.Set(0)
	b.metrics.maxTimestamp.Set(0)
}