package streams

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	"go.uber.org/atomic"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
)

// A Stream is an individual stream within a data object.
type Stream struct {
	// ID to uniquely represent a stream in a data object. Valid IDs start at 1.
	// IDs are used to track streams across multiple sections in the same data
	// object.
	ID int64

	// MinTimestamp and MaxTimestamp denote the range of timestamps across all
	// entries in the stream.
	MinTimestamp, MaxTimestamp time.Time

	// Uncompressed size of the log lines and structured metadata values in the stream.
	UncompressedSize int64

	// Labels of the stream.
	Labels labels.Labels

	// Total number of log records in the stream.
	Rows int
}

// Reset zeroes all values in the stream struct so it can be reused.
func (s *Stream) Reset() {
	s.ID = 0
	s.Labels = labels.EmptyLabels()
	s.MinTimestamp = time.Time{}
	s.MaxTimestamp = time.Time{}
	s.UncompressedSize = 0
	s.Rows = 0
}
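
// streamPool recycles Stream values across Builder resets to reduce
// allocations. Pooled values must be Reset before reuse.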
var streamPool = sync.Pool{
	New: func() interface{} {
		return &Stream{}
	},
}

// Builder builds a streams section.
type Builder struct {
	metrics      *Metrics
	pageSize     int
	pageRowCount int
	lastID       atomic.Int64
	lookup       map[uint64][]*Stream

	// The optional tenant that owns the builder. If specified, the section
	// must only contain streams owned by that tenant.
	tenant string

	// Size of all label values across all streams; used for
	// [Builder.EstimatedSize]. Resets on [Builder.Reset].
	currentLabelsSize int

	globalMinTimestamp time.Time // Minimum timestamp across all streams, used for metrics.
	globalMaxTimestamp time.Time // Maximum timestamp across all streams, used for metrics.

	// ordered is used for consistently iterating over the list of streams. It
	// contains streams in append order.
	ordered []*Stream
}

// NewBuilder creates a new streams section builder. The pageSize argument
// specifies how large pages should be.
func NewBuilder(metrics *Metrics, pageSize, pageRowCount int) *Builder {
	if metrics == nil {
		metrics = NewMetrics()
	}
	return &Builder{
		metrics:      metrics,
		pageSize:     pageSize,
		pageRowCount: pageRowCount,
		lookup:       make(map[uint64][]*Stream, 1024),
		ordered:      make([]*Stream, 0, 1024),
	}
}
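
// A minimal usage sketch (w is assumed to be a caller-provided
// dataobj.SectionWriter; the page size, row count, and record size values are
// illustrative only):
//
//	builder := NewBuilder(nil, 2<<20, 1000)
//	id := builder.Record(labels.FromStrings("app", "api"), time.Now(), 128)
//	_ = id // Stream IDs start at 1.
//	if _, err := builder.Flush(w); err != nil {
//		// handle error
//	}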

// Tenant returns the optional tenant that owns the builder.
func (b *Builder) Tenant() string { return b.tenant }

// SetTenant sets the tenant that owns the builder. A builder can be made
// multi-tenant by passing an empty string.
func (b *Builder) SetTenant(tenant string) { b.tenant = tenant }

// Type returns the [dataobj.SectionType] of the streams builder.
func (b *Builder) Type() dataobj.SectionType { return sectionType }

// TimeRange returns the minimum and maximum timestamp across all streams.
func (b *Builder) TimeRange() (time.Time, time.Time) {
	return b.globalMinTimestamp, b.globalMaxTimestamp
}

// Record records a stream entry within the section. The provided timestamp is
// used to track the minimum and maximum timestamp of a stream. The number of
// calls to Record is used to track the number of rows for a stream, and
// recordSize is used to track the uncompressed size of the stream.
//
// The stream ID of the recorded stream is returned.
func (b *Builder) Record(streamLabels labels.Labels, ts time.Time, recordSize int64) int64 {
	ts = ts.UTC()
	b.observeRecord(ts)

	stream := b.getOrAddStream(streamLabels)
	if stream.MinTimestamp.IsZero() || ts.Before(stream.MinTimestamp) {
		stream.MinTimestamp = ts
	}
	if stream.MaxTimestamp.IsZero() || ts.After(stream.MaxTimestamp) {
		stream.MaxTimestamp = ts
	}
	stream.Rows++
	stream.UncompressedSize += recordSize

	return stream.ID
}
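
// observeRecord increments the total record count metric and updates the
// global minimum and maximum timestamps (and their metrics) when ts extends
// the observed range.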
func (b *Builder) observeRecord(ts time.Time) {
	b.metrics.recordsTotal.Inc()

	if ts.Before(b.globalMinTimestamp) || b.globalMinTimestamp.IsZero() {
		b.globalMinTimestamp = ts
		b.metrics.minTimestamp.Set(float64(ts.Unix()))
	}
	if ts.After(b.globalMaxTimestamp) || b.globalMaxTimestamp.IsZero() {
		b.globalMaxTimestamp = ts
		b.metrics.maxTimestamp.Set(float64(ts.Unix()))
	}
}

// AppendValue may only be used for copying streams from an existing section.
func (b *Builder) AppendValue(val Stream) {
	newStream := streamPool.Get().(*Stream)
	newStream.Reset()

	newStream.ID = val.ID
	newStream.MinTimestamp, newStream.MaxTimestamp = val.MinTimestamp, val.MaxTimestamp
	newStream.UncompressedSize = val.UncompressedSize
	newStream.Labels = val.Labels
	newStream.Rows = val.Rows

	newStream.Labels.Range(func(l labels.Label) {
		b.currentLabelsSize += len(l.Value)
	})

	hash := labels.StableHash(newStream.Labels)
	b.lookup[hash] = append(b.lookup[hash], newStream)
	b.ordered = append(b.ordered, newStream)
}

// EstimatedSize returns the estimated size of the Streams section in bytes.
func (b *Builder) EstimatedSize() int {
	// Since columns are only built when encoding, we can't use
	// [dataset.ColumnBuilder.EstimatedSize] here.
	//
	// Instead, we use a basic heuristic, estimating delta encoding and
	// compression:
	//
	// 1. Assume an ID delta of 1.
	// 2. Assume a timestamp delta of 1s.
	// 3. Assume a row count delta of 500.
	// 4. Assume a (conservative) 2x compression ratio for all label values.

	var (
		idDeltaSize        = streamio.VarintSize(1)
		timestampDeltaSize = streamio.VarintSize(int64(time.Second))
		rowDeltaSize       = streamio.VarintSize(500)
	)

	var sizeEstimate int

	sizeEstimate += len(b.ordered) * idDeltaSize        // ID
	sizeEstimate += len(b.ordered) * timestampDeltaSize // Min timestamp
	sizeEstimate += len(b.ordered) * timestampDeltaSize // Max timestamp
	sizeEstimate += len(b.ordered) * rowDeltaSize       // Rows
	sizeEstimate += b.currentLabelsSize / 2             // All labels (2x compression ratio)

	return sizeEstimate
}
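
// As a rough worked example of the EstimatedSize heuristic above (assuming
// standard varint widths, so VarintSize(1) = 1 byte,
// VarintSize(int64(time.Second)) = 5 bytes, and VarintSize(500) = 2 bytes):
// 10,000 streams carrying 1 MiB of label values in total estimate to
// 10,000*(1+5+5+2) + 1 MiB/2, roughly 640 KiB.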

func (b *Builder) getOrAddStream(streamLabels labels.Labels) *Stream {
	hash := labels.StableHash(streamLabels)
	matches, ok := b.lookup[hash]
	if !ok {
		return b.addStream(hash, streamLabels)
	}

	for _, stream := range matches {
		if labels.Equal(stream.Labels, streamLabels) {
			return stream
		}
	}

	return b.addStream(hash, streamLabels)
}
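
// addStream registers a new stream with the given labels under hash,
// assigning it the next available ID.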
func (b *Builder) addStream(hash uint64, streamLabels labels.Labels) *Stream {
	streamLabels.Range(func(l labels.Label) {
		b.currentLabelsSize += len(l.Value)
	})

	newStream := streamPool.Get().(*Stream)
	newStream.Reset()
	newStream.ID = b.lastID.Add(1)
	newStream.Labels = streamLabels

	b.lookup[hash] = append(b.lookup[hash], newStream)
	b.ordered = append(b.ordered, newStream)
	b.metrics.streamCount.Inc()
	return newStream
}

// StreamID returns the stream ID for the provided streamLabels. If the stream
// has not been recorded, StreamID returns 0.
func (b *Builder) StreamID(streamLabels labels.Labels) int64 {
	hash := labels.StableHash(streamLabels)
	matches, ok := b.lookup[hash]
	if !ok {
		return 0
	}

	for _, stream := range matches {
		if labels.Equal(stream.Labels, streamLabels) {
			return stream.ID
		}
	}

	return 0
}

// Flush flushes the streams section to the provided writer.
//
// After successful encoding, b is reset to a fresh state and can be reused.
func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
	timer := prometheus.NewTimer(b.metrics.encodeSeconds)
	defer timer.ObserveDuration()

	var columnarEnc columnar.Encoder
	defer columnarEnc.Reset()

	if err := b.encodeTo(&columnarEnc); err != nil {
		return 0, fmt.Errorf("building encoder: %w", err)
	}

	columnarEnc.SetTenant(b.tenant)

	n, err = columnarEnc.Flush(w)
	if err == nil {
		b.Reset()
	}
	return n, err
}
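
// encodeTo writes the accumulated streams to enc as columns: five int64
// columns (stream ID, minimum timestamp, maximum timestamp, row count, and
// uncompressed size) plus one binary column per distinct label name, each
// backfilled to one row per stream.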
func (b *Builder) encodeTo(enc *columnar.Encoder) error {
	// TODO(rfratto): handle one section becoming too large. This can happen when
	// the number of columns is very wide. There are two approaches to handle
	// this:
	//
	// 1. Split streams into multiple sections.
	// 2. Move some columns into an aggregated column which holds multiple label
	//    keys and values.

	idBuilder, err := numberColumnBuilder(ColumnTypeStreamID, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating ID column: %w", err)
	}
	minTimestampBuilder, err := numberColumnBuilder(ColumnTypeMinTimestamp, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating minimum timestamp column: %w", err)
	}
	maxTimestampBuilder, err := numberColumnBuilder(ColumnTypeMaxTimestamp, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating maximum timestamp column: %w", err)
	}
	rowsCountBuilder, err := numberColumnBuilder(ColumnTypeRows, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating rows column: %w", err)
	}
	uncompressedSizeBuilder, err := numberColumnBuilder(ColumnTypeUncompressedSize, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating uncompressed size column: %w", err)
	}

	var (
		labelBuilders      []*dataset.ColumnBuilder
		labelBuilderLookup = map[string]int{} // Name to index
	)

	getLabelColumn := func(name string) (*dataset.ColumnBuilder, error) {
		idx, ok := labelBuilderLookup[name]
		if ok {
			return labelBuilders[idx], nil
		}

		builder, err := dataset.NewColumnBuilder(name, dataset.BuilderOptions{
			PageSizeHint:    b.pageSize,
			PageMaxRowCount: b.pageRowCount,
			Type: dataset.ColumnType{
				Physical: datasetmd.PHYSICAL_TYPE_BINARY,
				Logical:  ColumnTypeLabel.String(),
			},
			Encoding:    datasetmd.ENCODING_TYPE_PLAIN,
			Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
			Statistics: dataset.StatisticsOptions{
				StoreRangeStats: true,
			},
		})
		if err != nil {
			return nil, fmt.Errorf("creating label column: %w", err)
		}

		labelBuilders = append(labelBuilders, builder)
		labelBuilderLookup[name] = len(labelBuilders) - 1
		return builder, nil
	}

	// Populate our column builders.
	for i, stream := range b.ordered {
		// Append only fails if the rows are out-of-order, which can't happen here.
		_ = idBuilder.Append(i, dataset.Int64Value(stream.ID))
		_ = minTimestampBuilder.Append(i, dataset.Int64Value(stream.MinTimestamp.UnixNano()))
		_ = maxTimestampBuilder.Append(i, dataset.Int64Value(stream.MaxTimestamp.UnixNano()))
		_ = rowsCountBuilder.Append(i, dataset.Int64Value(int64(stream.Rows)))
		_ = uncompressedSizeBuilder.Append(i, dataset.Int64Value(stream.UncompressedSize))

		err := stream.Labels.Validate(func(label labels.Label) error {
			builder, err := getLabelColumn(label.Name)
			if err != nil {
				return fmt.Errorf("getting label column: %w", err)
			}
			_ = builder.Append(i, dataset.BinaryValue([]byte(label.Value)))
			return nil
		})
		if err != nil {
			return err
		}
	}

	// Encode our builders to sections. We ignore errors after enc.OpenColumn
	// (which may fail due to the caller) since we guarantee correct usage of
	// the encoding API.
	{
		var errs []error
		errs = append(errs, encodeColumn(enc, ColumnTypeStreamID, idBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeMinTimestamp, minTimestampBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeMaxTimestamp, maxTimestampBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeRows, rowsCountBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeUncompressedSize, uncompressedSizeBuilder))
		if err := errors.Join(errs...); err != nil {
			return fmt.Errorf("encoding columns: %w", err)
		}
	}

	for _, labelBuilder := range labelBuilders {
		// For consistency we'll make sure each label builder has the same number
		// of rows as the other columns (which is the number of streams).
		labelBuilder.Backfill(len(b.ordered))

		err := encodeColumn(enc, ColumnTypeLabel, labelBuilder)
		if err != nil {
			return fmt.Errorf("encoding label column: %w", err)
		}
	}

	return nil
}
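
// numberColumnBuilder returns a ColumnBuilder for int64 values using delta
// encoding with no compression, storing range statistics.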
func numberColumnBuilder(columnType ColumnType, pageSize, pageRowCount int) (*dataset.ColumnBuilder, error) {
	return dataset.NewColumnBuilder("", dataset.BuilderOptions{
		PageSizeHint:    pageSize,
		PageMaxRowCount: pageRowCount,
		Type: dataset.ColumnType{
			Physical: datasetmd.PHYSICAL_TYPE_INT64,
			Logical:  columnType.String(),
		},
		Encoding:    datasetmd.ENCODING_TYPE_DELTA,
		Compression: datasetmd.COMPRESSION_TYPE_NONE,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
}
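
// encodeColumn flushes builder and appends its pages to a new column opened
// on enc, committing the column on success. Columns with no pages are
// discarded.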
func encodeColumn(enc *columnar.Encoder, columnType ColumnType, builder *dataset.ColumnBuilder) error {
	column, err := builder.Flush()
	if err != nil {
		return fmt.Errorf("flushing %s column: %w", columnType, err)
	}

	columnEnc, err := enc.OpenColumn(column.ColumnDesc())
	if err != nil {
		return fmt.Errorf("opening %s column encoder: %w", columnType, err)
	}
	defer func() {
		// Discard on defer for safety. Discard returns an error if we already
		// committed successfully, which we ignore.
		_ = columnEnc.Discard()
	}()
	if len(column.Pages) == 0 {
		// Column has no data; discard.
		return nil
	}

	for _, page := range column.Pages {
		err := columnEnc.AppendPage(page)
		if err != nil {
			return fmt.Errorf("appending %s page: %w", columnType, err)
		}
	}

	return columnEnc.Commit()
}

// Reset resets all state, allowing the Builder to be reused.
func (b *Builder) Reset() {
	b.lastID.Store(0)
	for _, stream := range b.ordered {
		streamPool.Put(stream)
	}
	clear(b.lookup)
	b.tenant = ""
	b.ordered = sliceclear.Clear(b.ordered)
	b.currentLabelsSize = 0
	b.globalMinTimestamp = time.Time{}
	b.globalMaxTimestamp = time.Time{}

	b.metrics.streamCount.Set(0)
	b.metrics.minTimestamp.Set(0)
	b.metrics.maxTimestamp.Set(0)
}