diff --git a/pkg/dataobj/internal/sections/streams/streams.go b/pkg/dataobj/internal/sections/streams/streams.go
index a90c64cac1..3fc583342b 100644
--- a/pkg/dataobj/internal/sections/streams/streams.go
+++ b/pkg/dataobj/internal/sections/streams/streams.go
@@ -39,6 +39,10 @@ type Streams struct {
 	lastID atomic.Int64
 	lookup map[uint64][]*Stream
 
+	// Size of all label values across all streams; used for
+	// [Streams.EstimatedSize]. Resets on [Streams.Reset].
+	currentLabelsSize int
+
 	globalMinTimestamp time.Time // Minimum timestamp across all streams, used for metrics.
 	globalMaxTimestamp time.Time // Maximum timestamp across all streams, used for metrics.
 
@@ -112,20 +116,13 @@ func (s *Streams) EstimatedSize() int {
 		rowDeltaSize = streamio.VarintSize(500)
 	)
 
-	var labelsSize int
-	for _, stream := range s.ordered {
-		for _, lbl := range stream.Labels {
-			labelsSize += len(lbl.Value)
-		}
-	}
-
 	var sizeEstimate int
 	sizeEstimate += len(s.ordered) * idDeltaSize        // ID
 	sizeEstimate += len(s.ordered) * timestampDeltaSize // Min timestamp
 	sizeEstimate += len(s.ordered) * timestampDeltaSize // Max timestamp
 	sizeEstimate += len(s.ordered) * rowDeltaSize       // Rows
-	sizeEstimate += labelsSize / 2                      // All labels (2x compression ratio)
+	sizeEstimate += s.currentLabelsSize / 2             // All labels (2x compression ratio)
 
 	return sizeEstimate
 }
@@ -151,6 +148,10 @@ func (s *Streams) addStream(hash uint64, streamLabels labels.Labels) *Stream {
 	// ordering.
 	sort.Sort(streamLabels)
 
+	for _, lbl := range streamLabels {
+		s.currentLabelsSize += len(lbl.Value)
+	}
+
 	newStream := &Stream{ID: s.lastID.Add(1), Labels: streamLabels}
 	s.lookup[hash] = append(s.lookup[hash], newStream)
 	s.ordered = append(s.ordered, newStream)
@@ -333,6 +334,7 @@ func (s *Streams) Reset() {
 	s.lastID.Store(0)
 	clear(s.lookup)
 	s.ordered = s.ordered[:0]
+	s.currentLabelsSize = 0
 
 	s.globalMinTimestamp = time.Time{}
 	s.globalMaxTimestamp = time.Time{}
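
Reviewer note: the change above replaces the per-call rescan of every stream's labels in EstimatedSize with a running counter that addStream increments and Reset clears. The sketch below is a minimal, self-contained illustration of that pattern under stated assumptions; the tracker and label types and their method names are hypothetical and are not the actual Streams API.

// Sketch only: hypothetical types illustrating the running-counter pattern
// used by this diff, not the loki streams package itself.
package main

import "fmt"

type label struct{ Name, Value string }

type tracker struct {
	streams           [][]label
	currentLabelsSize int // running total of label value bytes
}

// add appends a stream and updates the counter once, at insert time,
// mirroring the new loop in addStream.
func (t *tracker) add(lbls []label) {
	for _, lbl := range lbls {
		t.currentLabelsSize += len(lbl.Value)
	}
	t.streams = append(t.streams, lbls)
}

// estimatedSize uses the counter directly, so the labels term no longer
// requires walking every stream; the 2x compression ratio assumption is
// carried over from the diff.
func (t *tracker) estimatedSize() int {
	return t.currentLabelsSize / 2
}

// reset clears both the streams and the counter, mirroring Streams.Reset.
func (t *tracker) reset() {
	t.streams = t.streams[:0]
	t.currentLabelsSize = 0
}

func main() {
	var t tracker
	t.add([]label{{"app", "frontend"}, {"env", "prod"}})
	t.add([]label{{"app", "api"}})
	fmt.Println(t.estimatedSize()) // (8+4+3)/2 = 7
	t.reset()
	fmt.Println(t.estimatedSize()) // 0
}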