package streams

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	"go.uber.org/atomic"

	"github.com/grafana/loki/v3/pkg/dataobj"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/dataset"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/metadata/datasetmd"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/streamio"
	"github.com/grafana/loki/v3/pkg/dataobj/internal/util/sliceclear"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/internal/columnar"
)

// A Stream is an individual stream within a data object.
type Stream struct {
	// ID to uniquely represent a stream in a data object. Valid IDs start at 1.
	// IDs are used to track streams across multiple sections in the same data
	// object.
	ID int64

	// MinTimestamp and MaxTimestamp denote the range of timestamps across all
	// entries in the stream.
	MinTimestamp, MaxTimestamp time.Time

	// Uncompressed size of the log lines and structured metadata values in the
	// stream.
	UncompressedSize int64

	// Labels of the stream.
	Labels labels.Labels

	// Total number of log records in the stream.
	Rows int
}

// Reset zeroes all values in the stream struct so it can be reused.
func (s *Stream) Reset() {
	s.ID = 0
	s.Labels = labels.EmptyLabels()
	s.MinTimestamp = time.Time{}
	s.MaxTimestamp = time.Time{}
	s.UncompressedSize = 0
	s.Rows = 0
}

var streamPool = sync.Pool{
	New: func() interface{} { return &Stream{} },
}

// Builder builds a streams section.
type Builder struct {
	metrics      *Metrics
	pageSize     int
	pageRowCount int

	lastID atomic.Int64
	lookup map[uint64][]*Stream

	// The optional tenant that owns the builder. If specified, the section
	// must only contain streams owned by the tenant, and no other tenants.
	tenant string

	// Size of all label values across all streams; used for
	// [Builder.EstimatedSize]. Resets on [Builder.Reset].
	currentLabelsSize int

	globalMinTimestamp time.Time // Minimum timestamp across all streams, used for metrics.
	globalMaxTimestamp time.Time // Maximum timestamp across all streams, used for metrics.

	// ordered is used for consistently iterating over the list of streams. It
	// contains streams added in append order.
	ordered []*Stream
}

// NewBuilder creates a new streams section builder. The pageSize argument
// specifies how large pages should be.
func NewBuilder(metrics *Metrics, pageSize, pageRowCount int) *Builder {
	if metrics == nil {
		metrics = NewMetrics()
	}
	return &Builder{
		metrics:      metrics,
		pageSize:     pageSize,
		pageRowCount: pageRowCount,

		lookup:  make(map[uint64][]*Stream, 1024),
		ordered: make([]*Stream, 0, 1024),
	}
}

// Tenant returns the optional tenant that owns the builder.
func (b *Builder) Tenant() string { return b.tenant }

// SetTenant sets the tenant that owns the builder. A builder can be made
// multi-tenant by passing an empty string.
func (b *Builder) SetTenant(tenant string) { b.tenant = tenant }

// Type returns the [dataobj.SectionType] of the streams builder.
func (b *Builder) Type() dataobj.SectionType { return sectionType }

// TimeRange returns the minimum and maximum timestamp across all streams.
func (b *Builder) TimeRange() (time.Time, time.Time) {
	return b.globalMinTimestamp, b.globalMaxTimestamp
}
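
// Usage sketch for recording streams (illustrative only; the page size, row
// count, labels, and record sizes below are assumptions, not values used by
// this package):
//
//	b := NewBuilder(nil, 2<<20, 10_000) // nil metrics falls back to NewMetrics.
//	id := b.Record(labels.FromStrings("app", "api"), time.Now(), 128)
//	_ = b.Record(labels.FromStrings("app", "api"), time.Now(), 64) // same stream, same ID
//	if b.StreamID(labels.FromStrings("app", "api")) != id {
//		panic("unreachable: StreamID returns the ID assigned by Record")
//	}
//	minT, maxT := b.TimeRange() // covers both recorded timestamps
//	_, _ = minT, maxT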

// Record a stream record within the section. The provided timestamp is used to
// track the minimum and maximum timestamp of a stream. The number of calls to
// Record is used to track the number of rows for a stream. The recordSize is
// used to track the uncompressed size of the stream.
//
// The stream ID of the recorded stream is returned.
func (b *Builder) Record(streamLabels labels.Labels, ts time.Time, recordSize int64) int64 {
	ts = ts.UTC()
	b.observeRecord(ts)

	stream := b.getOrAddStream(streamLabels)
	if stream.MinTimestamp.IsZero() || ts.Before(stream.MinTimestamp) {
		stream.MinTimestamp = ts
	}
	if stream.MaxTimestamp.IsZero() || ts.After(stream.MaxTimestamp) {
		stream.MaxTimestamp = ts
	}
	stream.Rows++
	stream.UncompressedSize += recordSize

	return stream.ID
}

func (b *Builder) observeRecord(ts time.Time) {
	b.metrics.recordsTotal.Inc()

	if ts.Before(b.globalMinTimestamp) || b.globalMinTimestamp.IsZero() {
		b.globalMinTimestamp = ts
		b.metrics.minTimestamp.Set(float64(ts.Unix()))
	}
	if ts.After(b.globalMaxTimestamp) || b.globalMaxTimestamp.IsZero() {
		b.globalMaxTimestamp = ts
		b.metrics.maxTimestamp.Set(float64(ts.Unix()))
	}
}

// AppendValue may only be used for copying streams from an existing section.
func (b *Builder) AppendValue(val Stream) {
	newStream := streamPool.Get().(*Stream)
	newStream.Reset()

	newStream.ID = val.ID
	newStream.MinTimestamp, newStream.MaxTimestamp = val.MinTimestamp, val.MaxTimestamp
	newStream.UncompressedSize = val.UncompressedSize
	newStream.Labels = val.Labels
	newStream.Rows = val.Rows

	newStream.Labels.Range(func(l labels.Label) {
		b.currentLabelsSize += len(l.Value)
	})

	hash := labels.StableHash(newStream.Labels)
	b.lookup[hash] = append(b.lookup[hash], newStream)
	b.ordered = append(b.ordered, newStream)
}

// EstimatedSize returns the estimated size of the Streams section in bytes.
func (b *Builder) EstimatedSize() int {
	// Since columns are only built when encoding, we can't use
	// [dataset.ColumnBuilder.EstimatedSize] here.
	//
	// Instead, we use a basic heuristic, estimating delta encoding and
	// compression:
	//
	// 1. Assume an ID delta of 1.
	// 2. Assume a timestamp delta of 1s.
	// 3. Assume a row count delta of 500.
	// 4. Assume (conservative) 2x compression ratio of all label values.

	var (
		idDeltaSize        = streamio.VarintSize(1)
		timestampDeltaSize = streamio.VarintSize(int64(time.Second))
		rowDeltaSize       = streamio.VarintSize(500)
	)

	var sizeEstimate int
	sizeEstimate += len(b.ordered) * idDeltaSize        // ID
	sizeEstimate += len(b.ordered) * timestampDeltaSize // Min timestamp
	sizeEstimate += len(b.ordered) * timestampDeltaSize // Max timestamp
	sizeEstimate += len(b.ordered) * rowDeltaSize       // Rows
	sizeEstimate += b.currentLabelsSize / 2             // All labels (2x compression ratio)

	return sizeEstimate
}
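
// Worked example of the heuristic above (illustrative, not part of the
// original documentation): with 1,000 streams and 100,000 bytes of accumulated
// label values, and assuming streamio.VarintSize returns 1 byte for a delta of
// 1, roughly 5 bytes for one second in nanoseconds, and 2 bytes for a delta of
// 500, the estimate is
//
//	1000*1 + 1000*5 + 1000*5 + 1000*2 + 100000/2 = 63,000 bytes
//
// Callers might compare EstimatedSize against a target section size to decide
// when to cut a section and call Flush.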

func (b *Builder) getOrAddStream(streamLabels labels.Labels) *Stream {
	hash := labels.StableHash(streamLabels)
	matches, ok := b.lookup[hash]
	if !ok {
		return b.addStream(hash, streamLabels)
	}

	for _, stream := range matches {
		if labels.Equal(stream.Labels, streamLabels) {
			return stream
		}
	}

	return b.addStream(hash, streamLabels)
}

func (b *Builder) addStream(hash uint64, streamLabels labels.Labels) *Stream {
	streamLabels.Range(func(l labels.Label) {
		b.currentLabelsSize += len(l.Value)
	})

	newStream := streamPool.Get().(*Stream)
	newStream.Reset()
	newStream.ID = b.lastID.Add(1)
	newStream.Labels = streamLabels

	b.lookup[hash] = append(b.lookup[hash], newStream)
	b.ordered = append(b.ordered, newStream)
	b.metrics.streamCount.Inc()

	return newStream
}

// StreamID returns the stream ID for the provided streamLabels. If the stream
// has not been recorded, StreamID returns 0.
func (b *Builder) StreamID(streamLabels labels.Labels) int64 {
	hash := labels.StableHash(streamLabels)
	matches, ok := b.lookup[hash]
	if !ok {
		return 0
	}

	for _, stream := range matches {
		if labels.Equal(stream.Labels, streamLabels) {
			return stream.ID
		}
	}

	return 0
}

// Flush flushes the streams section to the provided writer.
//
// After successful encoding, b is reset to a fresh state and can be reused.
func (b *Builder) Flush(w dataobj.SectionWriter) (n int64, err error) {
	timer := prometheus.NewTimer(b.metrics.encodeSeconds)
	defer timer.ObserveDuration()

	var columnarEnc columnar.Encoder
	defer columnarEnc.Reset()

	if err := b.encodeTo(&columnarEnc); err != nil {
		return 0, fmt.Errorf("building encoder: %w", err)
	}
	columnarEnc.SetTenant(b.tenant)

	n, err = columnarEnc.Flush(w)
	if err == nil {
		b.Reset()
	}
	return n, err
}
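
// encodeTo writes one row per stream: a fixed set of int64 columns (stream ID,
// minimum timestamp, maximum timestamp, row count, and uncompressed size) plus
// one binary column per distinct label name. Label columns are backfilled so
// that every column ends with len(b.ordered) rows.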
func (b *Builder) encodeTo(enc *columnar.Encoder) error {
	// TODO(rfratto): handle one section becoming too large. This can happen when
	// the number of columns is very wide. There are two approaches to handle
	// this:
	//
	// 1. Split streams into multiple sections.
	// 2. Move some columns into an aggregated column which holds multiple label
	//    keys and values.

	idBuilder, err := numberColumnBuilder(ColumnTypeStreamID, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating ID column: %w", err)
	}
	minTimestampBuilder, err := numberColumnBuilder(ColumnTypeMinTimestamp, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating minimum timestamp column: %w", err)
	}
	maxTimestampBuilder, err := numberColumnBuilder(ColumnTypeMaxTimestamp, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating maximum timestamp column: %w", err)
	}
	rowsCountBuilder, err := numberColumnBuilder(ColumnTypeRows, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating rows column: %w", err)
	}
	uncompressedSizeBuilder, err := numberColumnBuilder(ColumnTypeUncompressedSize, b.pageSize, b.pageRowCount)
	if err != nil {
		return fmt.Errorf("creating uncompressed size column: %w", err)
	}

	var (
		labelBuilders      []*dataset.ColumnBuilder
		labelBuilderlookup = map[string]int{} // Name to index
	)

	getLabelColumn := func(name string) (*dataset.ColumnBuilder, error) {
		idx, ok := labelBuilderlookup[name]
		if ok {
			return labelBuilders[idx], nil
		}

		builder, err := dataset.NewColumnBuilder(name, dataset.BuilderOptions{
			PageSizeHint:    b.pageSize,
			PageMaxRowCount: b.pageRowCount,
			Type: dataset.ColumnType{
				Physical: datasetmd.PHYSICAL_TYPE_BINARY,
				Logical:  ColumnTypeLabel.String(),
			},
			Encoding:    datasetmd.ENCODING_TYPE_PLAIN,
			Compression: datasetmd.COMPRESSION_TYPE_ZSTD,
			Statistics: dataset.StatisticsOptions{
				StoreRangeStats: true,
			},
		})
		if err != nil {
			return nil, fmt.Errorf("creating label column: %w", err)
		}

		labelBuilders = append(labelBuilders, builder)
		labelBuilderlookup[name] = len(labelBuilders) - 1
		return builder, nil
	}

	// Populate our column builders.
	for i, stream := range b.ordered {
		// Append only fails if the rows are out-of-order, which can't happen here.
		_ = idBuilder.Append(i, dataset.Int64Value(stream.ID))
		_ = minTimestampBuilder.Append(i, dataset.Int64Value(stream.MinTimestamp.UnixNano()))
		_ = maxTimestampBuilder.Append(i, dataset.Int64Value(stream.MaxTimestamp.UnixNano()))
		_ = rowsCountBuilder.Append(i, dataset.Int64Value(int64(stream.Rows)))
		_ = uncompressedSizeBuilder.Append(i, dataset.Int64Value(stream.UncompressedSize))

		err := stream.Labels.Validate(func(label labels.Label) error {
			builder, err := getLabelColumn(label.Name)
			if err != nil {
				return fmt.Errorf("getting label column: %w", err)
			}
			_ = builder.Append(i, dataset.BinaryValue([]byte(label.Value)))
			return nil
		})
		if err != nil {
			return err
		}
	}

	// Encode our builders into the section. We ignore errors after
	// enc.OpenColumn (which may fail due to a caller error) since we guarantee
	// correct usage of the encoding API.
	{
		var errs []error
		errs = append(errs, encodeColumn(enc, ColumnTypeStreamID, idBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeMinTimestamp, minTimestampBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeMaxTimestamp, maxTimestampBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeRows, rowsCountBuilder))
		errs = append(errs, encodeColumn(enc, ColumnTypeUncompressedSize, uncompressedSizeBuilder))
		if err := errors.Join(errs...); err != nil {
			return fmt.Errorf("encoding columns: %w", err)
		}
	}

	for _, labelBuilder := range labelBuilders {
		// For consistency we'll make sure each label builder has the same number
		// of rows as the other columns (which is the number of streams).
		labelBuilder.Backfill(len(b.ordered))

		err := encodeColumn(enc, ColumnTypeLabel, labelBuilder)
		if err != nil {
			return fmt.Errorf("encoding label column: %w", err)
		}
	}

	return nil
}

func numberColumnBuilder(columnType ColumnType, pageSize, pageRowCount int) (*dataset.ColumnBuilder, error) {
	return dataset.NewColumnBuilder("", dataset.BuilderOptions{
		PageSizeHint:    pageSize,
		PageMaxRowCount: pageRowCount,
		Type: dataset.ColumnType{
			Physical: datasetmd.PHYSICAL_TYPE_INT64,
			Logical:  columnType.String(),
		},
		Encoding:    datasetmd.ENCODING_TYPE_DELTA,
		Compression: datasetmd.COMPRESSION_TYPE_NONE,
		Statistics: dataset.StatisticsOptions{
			StoreRangeStats: true,
		},
	})
}

func encodeColumn(enc *columnar.Encoder, columnType ColumnType, builder *dataset.ColumnBuilder) error {
	column, err := builder.Flush()
	if err != nil {
		return fmt.Errorf("flushing %s column: %w", columnType, err)
	}

	columnEnc, err := enc.OpenColumn(column.ColumnDesc())
	if err != nil {
		return fmt.Errorf("opening %s column encoder: %w", columnType, err)
	}
	defer func() {
		// Discard on defer for safety. This will return an error if we
		// successfully committed.
		_ = columnEnc.Discard()
	}()

	if len(column.Pages) == 0 {
		// Column has no data; discard.
		return nil
	}

	for _, page := range column.Pages {
		err := columnEnc.AppendPage(page)
		if err != nil {
			return fmt.Errorf("appending %s page: %w", columnType, err)
		}
	}

	return columnEnc.Commit()
}

// Reset resets all state, allowing the Builder to be reused.
func (b *Builder) Reset() {
	b.lastID.Store(0)
	for _, stream := range b.ordered {
		streamPool.Put(stream)
	}
	clear(b.lookup)
	b.tenant = ""
	b.ordered = sliceclear.Clear(b.ordered)
	b.currentLabelsSize = 0
	b.globalMinTimestamp = time.Time{}
	b.globalMaxTimestamp = time.Time{}

	b.metrics.streamCount.Set(0)
	b.metrics.minTimestamp.Set(0)
	b.metrics.maxTimestamp.Set(0)
}
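
// Reuse sketch (illustrative; w stands for any dataobj.SectionWriter and
// entries for caller-provided records, neither of which is defined in this
// package):
//
//	for _, e := range entries {
//		_ = b.Record(e.Labels, e.Timestamp, int64(len(e.Line)))
//	}
//	if _, err := b.Flush(w); err != nil {
//		b.Reset() // Flush resets b only on success; discard partial state explicitly.
//	}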