// loki/pkg/dataobj/sections/pointers/builder.go

package pointers

import (
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/pointersmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
)

// A SectionPointer is a pointer to a section within another object. It is a
// wide object containing two types of index information:
//
// 1. Stream indexing metadata
// 2. Column indexing metadata
//
// The stream indexing metadata is used to look up which streams are in the
// referenced section and what their IDs are within the object. The column
// indexing metadata is used to look up which column values are present in the
// referenced section.
//
// Path and Section are mandatory fields; together they uniquely identify the
// section within the referenced object.
type SectionPointer struct {
	Path        string
	Section     int64
	PointerKind PointerKind

	// Stream indexing metadata
	StreamID         int64
	StreamIDRef      int64
	StartTs          time.Time
	EndTs            time.Time
	LineCount        int64
	UncompressedSize int64

	// Column indexing metadata
	ColumnIndex       int64
	ColumnName        string
	ValuesBloomFilter []byte
}

// Reset resets p to its zero state, retaining any allocated capacity of the
// ValuesBloomFilter slice for reuse.
func (p *SectionPointer) Reset() {
	p.Path = ""
	p.Section = 0
	p.PointerKind = PointerKindInvalid
	p.StreamID = 0
	p.StreamIDRef = 0
	p.StartTs = time.Time{}
	p.EndTs = time.Time{}
	p.LineCount = 0
	p.UncompressedSize = 0
	p.ColumnIndex = 0
	p.ColumnName = ""
	p.ValuesBloomFilter = p.ValuesBloomFilter[:0]
}

// PointerKind identifies which kind of index information a SectionPointer
// carries.
type PointerKind int

const (
	PointerKindInvalid     PointerKind = iota // PointerKindInvalid is an invalid pointer kind.
	PointerKindStreamIndex                    // PointerKindStreamIndex is a pointer for a stream index.
	PointerKindColumnIndex                    // PointerKindColumnIndex is a pointer for a column index.
)

// streamKey uniquely identifies a stream within a section of a data object.
type streamKey struct {
	objectPath string
	section    int64
	streamID   int64
}

// Builder builds a pointers section.
type Builder struct {
	metrics  *Metrics
	pageSize int

	// streamLookup maps a stream (by object path, section, and stream ID in
	// the referenced object) to its pointer.
	streamLookup map[streamKey]*SectionPointer

	// pointers is the list of pointers to encode.
	pointers []*SectionPointer

	// key is a reusable lookup key used by ObserveStream.
	key streamKey
}

// NewBuilder creates a new pointers section builder. The pageSize argument
// specifies how large pages should be.
func NewBuilder(metrics *Metrics, pageSize int) *Builder {
	if metrics == nil {
		metrics = NewMetrics()
	}
	return &Builder{
		metrics:      metrics,
		pageSize:     pageSize,
		streamLookup: make(map[streamKey]*SectionPointer),
		pointers:     make([]*SectionPointer, 0, 1024),
	}
}
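
// The sketch below is illustrative only and not part of the package: it shows
// a typical lifecycle of a Builder, assuming some dataobj.SectionWriter
// implementation `w` exists and that the object path, section number, stream
// IDs, page size, record size, and `bloomBytes` are made-up values:
//
//	b := pointers.NewBuilder(nil, 2*1024*1024) // nil metrics: defaults are created
//	b.ObserveStream("objects/obj-1", 0, 17, 3, time.Now(), 128)
//	b.RecordColumnIndex("objects/obj-1", 0, "level", 2, bloomBytes)
//	if _, err := b.Flush(w); err != nil {
//		// Handle the error; on success the builder has been reset and can
//		// be reused for the next index object.
//	}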

// Type returns the [dataobj.SectionType] of the pointers builder.
func (b *Builder) Type() dataobj.SectionType { return sectionType }

// ObserveStream observes a single stream record, updating the per-section
// start and end timestamps, line count, and uncompressed size for the stream
// identified by path, section, and idInObject.
func (b *Builder) ObserveStream(path string, section int64, idInObject int64, idInIndex int64, ts time.Time, uncompressedSize int64) {
	b.key.objectPath = path
	b.key.section = section
	b.key.streamID = idInObject

	pointer, ok := b.streamLookup[b.key]
	if ok {
		// Update the existing pointer.
		if ts.Before(pointer.StartTs) {
			pointer.StartTs = ts
		}
		if ts.After(pointer.EndTs) {
			pointer.EndTs = ts
		}
		pointer.LineCount++
		pointer.UncompressedSize += uncompressedSize
		return
	}

	newPointer := &SectionPointer{
		Path:             path,
		Section:          section,
		PointerKind:      PointerKindStreamIndex,
		StreamID:         idInIndex,
		StreamIDRef:      idInObject,
		StartTs:          ts,
		EndTs:            ts,
		LineCount:        1,
		UncompressedSize: uncompressedSize,
	}
	b.pointers = append(b.pointers, newPointer)
	b.streamLookup[b.key] = newPointer
}
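
// For example (illustrative values only), observing the same stream twice in
// the same section,
//
//	b.ObserveStream("objects/obj-1", 0, 17, 3, t0, 100)
//	b.ObserveStream("objects/obj-1", 0, 17, 3, t0.Add(time.Minute), 50)
//
// yields a single PointerKindStreamIndex pointer keyed by (path, section,
// idInObject) with StartTs=t0, EndTs=t0+1m, LineCount=2, and
// UncompressedSize=150.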

// RecordColumnIndex records a column index pointer for the column columnName
// at columnIndex within the given object path and section, along with a bloom
// filter of the values present in that column.
func (b *Builder) RecordColumnIndex(path string, section int64, columnName string, columnIndex int64, valuesBloomFilter []byte) {
	newPointer := &SectionPointer{
		Path:              path,
		Section:           section,
		PointerKind:       PointerKindColumnIndex,
		ColumnName:        columnName,
		ColumnIndex:       columnIndex,
		ValuesBloomFilter: valuesBloomFilter,
	}
	b.pointers = append(b.pointers, newPointer)
}
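
// Unlike ObserveStream, RecordColumnIndex appends a new pointer on every call
// and performs no deduplication. An illustrative use, where bloomLevel and
// bloomCaller stand in for bloom filter bytes built elsewhere:
//
//	b.RecordColumnIndex("objects/obj-1", 0, "level", 2, bloomLevel)
//	b.RecordColumnIndex("objects/obj-1", 0, "caller", 3, bloomCaller)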

// EstimatedSize returns the estimated size of the Pointers section in bytes.
func (b *Builder) EstimatedSize() int {
	// Since columns are only built when encoding, we can't use
	// [dataset.ColumnBuilder.EstimatedSize] here.
	//
	// Instead, we use a basic heuristic, estimating delta encoding and
	// compression:
	//
	// 1. Assume an ID delta of 1.
	// 2. Assume a timestamp delta of 10m.
	// 3. Assume a row count delta of 500.
	// 4. Assume a 10KB uncompressed size delta.
	// 5. Assume 5 bytes per column name with a 2x compression ratio.
	// 6. Assume 50 bytes per column bloom.
	// 7. Assume 10% of the pointers are column indexes.
	var (
		idDeltaSize        = streamio.VarintSize(1)
		timestampDeltaSize = streamio.VarintSize(10 * int64(time.Minute))
		rowDeltaSize       = streamio.VarintSize(500)
		bytesDeltaSize     = streamio.VarintSize(10000)

		streamIndexCount = int(float64(len(b.streamLookup)) * 0.9)
		columnIndexCount = int(float64(len(b.pointers)) * 0.1)
	)

	var sizeEstimate int
	sizeEstimate += len(b.pointers) * idDeltaSize * 2     // ID + StreamIDRef
	sizeEstimate += streamIndexCount * timestampDeltaSize // Min timestamp
	sizeEstimate += streamIndexCount * timestampDeltaSize // Max timestamp
	sizeEstimate += streamIndexCount * rowDeltaSize       // Rows
	sizeEstimate += streamIndexCount * bytesDeltaSize     // Uncompressed size
	sizeEstimate += columnIndexCount * 5                  // Column name (2x compression ratio)
	sizeEstimate += columnIndexCount * 50                 // Column bloom
	return sizeEstimate
}
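
// Written out, the EstimatedSize heuristic above is roughly (a restatement of
// the code, not an exact on-disk size):
//
//	size ~ P*2*v(1) + S*(2*v(10*time.Minute) + v(500) + v(10000)) + C*(5 + 50)
//
// where P = len(b.pointers), S is about 0.9*len(b.streamLookup), C is about
// 0.1*len(b.pointers), and v(x) = streamio.VarintSize(x).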

// Flush flushes the pointers section to the provided writer.
//
// After successful encoding, b is reset to a fresh state and can be reused.
func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
	timer := prometheus.NewTimer(b.metrics.encodeSeconds)
	defer timer.ObserveDuration()

	b.sortPointerObjects()

	var pointersEnc encoder
	defer pointersEnc.Reset()

	if err := b.encodeTo(&pointersEnc); err != nil {
		return 0, fmt.Errorf("building encoder: %w", err)
	}

	n, err = pointersEnc.Flush(w)
	if err == nil {
		b.Reset()
	}
	return n, err
}

// sortPointerObjects sorts the pointers so that all column indexes are grouped
// together (ordered by ColumnIndex, then Section) and all stream indexes are
// ordered by StartTs, then EndTs.
func (b *Builder) sortPointerObjects() {
	sort.Slice(b.pointers, func(i, j int) bool {
		if b.pointers[i].PointerKind == PointerKindColumnIndex && b.pointers[j].PointerKind == PointerKindColumnIndex {
			// Both are column indexes.
			if b.pointers[i].ColumnIndex == b.pointers[j].ColumnIndex {
				return b.pointers[i].Section < b.pointers[j].Section
			}
			return b.pointers[i].ColumnIndex < b.pointers[j].ColumnIndex
		} else if b.pointers[i].PointerKind == PointerKindStreamIndex && b.pointers[j].PointerKind == PointerKindStreamIndex {
			// Both are stream indexes.
			if b.pointers[i].StartTs.Equal(b.pointers[j].StartTs) {
				return b.pointers[i].EndTs.Before(b.pointers[j].EndTs)
			}
			return b.pointers[i].StartTs.Before(b.pointers[j].StartTs)
		}
		// Different kinds: keep all the streams separate from the columns.
		return b.pointers[i].StreamID < b.pointers[j].StreamID
	})
}
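
// For example (illustrative), and assuming stream IDs in the index are
// positive, a mixed set of pointers sorts with all column index pointers
// first (their StreamID is always zero), ordered by ColumnIndex then Section,
// followed by all stream index pointers ordered by StartTs then EndTs:
//
//	[column idx=1 sec=0] [column idx=1 sec=1] [column idx=2 sec=0]
//	[stream start=t0] [stream start=t0+1m] [stream start=t0+5m]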

func (b *Builder) encodeTo(enc *encoder) error {
	// TODO(rfratto): handle one section becoming too large. This can happen when
	// the number of columns is very wide. There are two approaches to handle
	// this:
	//
	// 1. Split streams into multiple sections.
	// 2. Move some columns into an aggregated column which holds multiple label
	//    keys and values.
	pathBuilder, err := dataset.NewColumnBuilder("path", dataset.BuilderOptions{
		PageSizeHint: b.pageSize,
		Value:        datasetmd.VALUE_TYPE_BYTE_ARRAY,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
		Compression:  datasetmd.COMPRESSION_TYPE_ZSTD,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
	if err != nil {
		return fmt.Errorf("creating path column: %w", err)
	}
	sectionBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating section column: %w", err)
	}
	pointerKindBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating pointer kind column: %w", err)
	}

	// Stream info
	idBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating ID column: %w", err)
	}
	streamIDRefBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating stream ID in object column: %w", err)
	}
	minTimestampBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating minimum timestamp column: %w", err)
	}
	maxTimestampBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating maximum timestamp column: %w", err)
	}
	rowCountBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating rows column: %w", err)
	}
	uncompressedSizeBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating uncompressed size column: %w", err)
	}

	// Column index info
	columnNameBuilder, err := dataset.NewColumnBuilder("column_name", dataset.BuilderOptions{
		PageSizeHint: b.pageSize,
		Value:        datasetmd.VALUE_TYPE_BYTE_ARRAY,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
		Compression:  datasetmd.COMPRESSION_TYPE_ZSTD,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
	if err != nil {
		return fmt.Errorf("creating column name column: %w", err)
	}
	columnIndexBuilder, err := numberColumnBuilder(b.pageSize)
	if err != nil {
		return fmt.Errorf("creating column index column: %w", err)
	}
	valuesBloomFilterBuilder, err := dataset.NewColumnBuilder("values_bloom_filter", dataset.BuilderOptions{
		PageSizeHint: b.pageSize,
		Value:        datasetmd.VALUE_TYPE_BYTE_ARRAY,
		Encoding:     datasetmd.ENCODING_TYPE_PLAIN,
		Compression:  datasetmd.COMPRESSION_TYPE_NONE, // TODO: is there a sensible compression algorithm for bloom filters?
	})
	if err != nil {
		return fmt.Errorf("creating values bloom filter column: %w", err)
	}

	// Populate our column builders.
	for i, pointer := range b.pointers {
		_ = pathBuilder.Append(i, dataset.ByteArrayValue([]byte(pointer.Path)))
		_ = sectionBuilder.Append(i, dataset.Int64Value(pointer.Section))
		_ = pointerKindBuilder.Append(i, dataset.Int64Value(int64(pointer.PointerKind)))

		// Stream indexing columns are only populated for stream index rows.
		if pointer.PointerKind == PointerKindStreamIndex {
			// Append only fails if the rows are out-of-order, which can't happen here.
			_ = idBuilder.Append(i, dataset.Int64Value(pointer.StreamID))
			_ = streamIDRefBuilder.Append(i, dataset.Int64Value(pointer.StreamIDRef))
			_ = minTimestampBuilder.Append(i, dataset.Int64Value(pointer.StartTs.UnixNano()))
			_ = maxTimestampBuilder.Append(i, dataset.Int64Value(pointer.EndTs.UnixNano()))
			_ = rowCountBuilder.Append(i, dataset.Int64Value(pointer.LineCount))
			_ = uncompressedSizeBuilder.Append(i, dataset.Int64Value(pointer.UncompressedSize))
		}

		// Column indexing columns are only populated for column index rows.
		if pointer.PointerKind == PointerKindColumnIndex {
			_ = columnNameBuilder.Append(i, dataset.ByteArrayValue([]byte(pointer.ColumnName)))
			_ = columnIndexBuilder.Append(i, dataset.Int64Value(pointer.ColumnIndex))
			_ = valuesBloomFilterBuilder.Append(i, dataset.ByteArrayValue(pointer.ValuesBloomFilter))
		}
	}

	// Encode our builders to columns, collecting errors from each column and
	// joining them at the end.
	{
		var errs []error
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_PATH, pathBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_SECTION, sectionBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_POINTER_KIND, pointerKindBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_STREAM_ID, idBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_STREAM_ID_REF, streamIDRefBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_MIN_TIMESTAMP, minTimestampBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_MAX_TIMESTAMP, maxTimestampBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_ROW_COUNT, rowCountBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_UNCOMPRESSED_SIZE, uncompressedSizeBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_COLUMN_NAME, columnNameBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_COLUMN_INDEX, columnIndexBuilder))
		errs = append(errs, encodeColumn(enc, pointersmd.COLUMN_TYPE_VALUES_BLOOM_FILTER, valuesBloomFilterBuilder))
		if err := errors.Join(errs...); err != nil {
			return fmt.Errorf("encoding columns: %w", err)
		}
	}

	return nil
}

// numberColumnBuilder returns a [dataset.ColumnBuilder] for int64 values
// using delta encoding and no compression.
func numberColumnBuilder(pageSize int) (*dataset.ColumnBuilder, error) {
	return dataset.NewColumnBuilder("", dataset.BuilderOptions{
		PageSizeHint: pageSize,
		Value:        datasetmd.VALUE_TYPE_INT64,
		Encoding:     datasetmd.ENCODING_TYPE_DELTA,
		Compression:  datasetmd.COMPRESSION_TYPE_NONE,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
}

// encodeColumn flushes the pages accumulated in builder and writes them to
// enc under the given column type.
func encodeColumn(enc *encoder, columnType pointersmd.ColumnType, builder *dataset.ColumnBuilder) error {
	column, err := builder.Flush()
	if err != nil {
		return fmt.Errorf("flushing %s column: %w", columnType, err)
	}

	columnEnc, err := enc.OpenColumn(columnType, &column.Info)
	if err != nil {
		return fmt.Errorf("opening %s column encoder: %w", columnType, err)
	}
	defer func() {
		// Discard on defer for safety. This will return an error if we
		// successfully committed.
		_ = columnEnc.Discard()
	}()

	for _, page := range column.Pages {
		err := columnEnc.AppendPage(page)
		if err != nil {
			return fmt.Errorf("appending %s page: %w", columnType, err)
		}
	}

	return columnEnc.Commit()
}

// Reset resets all state, allowing the pointers Builder to be reused.
func (b *Builder) Reset() {
	b.pointers = sliceclear.Clear(b.pointers)
	clear(b.streamLookup)

	b.metrics.minTimestamp.Set(0)
	b.metrics.maxTimestamp.Set(0)
}