From 7eb5053d1c7f3aadbc0306a5e5cc56bc7d6edf4e Mon Sep 17 00:00:00 2001 From: Robert Fratto Date: Mon, 20 Jan 2025 03:22:44 -0500 Subject: [PATCH] chore(dataobj): improve performance of Streams.EstimatedSize (#15833) --- .../internal/sections/streams/streams.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/dataobj/internal/sections/streams/streams.go b/pkg/dataobj/internal/sections/streams/streams.go index a90c64cac1..3fc583342b 100644 --- a/pkg/dataobj/internal/sections/streams/streams.go +++ b/pkg/dataobj/internal/sections/streams/streams.go @@ -39,6 +39,10 @@ type Streams struct { lastID atomic.Int64 lookup map[uint64][]*Stream + // Size of all label values across all streams; used for + // [Streams.EstimatedSize]. Resets on [Streams.Reset]. + currentLabelsSize int + globalMinTimestamp time.Time // Minimum timestamp across all streams, used for metrics. globalMaxTimestamp time.Time // Maximum timestamp across all streams, used for metrics. @@ -112,20 +116,13 @@ func (s *Streams) EstimatedSize() int { rowDeltaSize = streamio.VarintSize(500) ) - var labelsSize int - for _, stream := range s.ordered { - for _, lbl := range stream.Labels { - labelsSize += len(lbl.Value) - } - } - var sizeEstimate int sizeEstimate += len(s.ordered) * idDeltaSize // ID sizeEstimate += len(s.ordered) * timestampDeltaSize // Min timestamp sizeEstimate += len(s.ordered) * timestampDeltaSize // Max timestamp sizeEstimate += len(s.ordered) * rowDeltaSize // Rows - sizeEstimate += labelsSize / 2 // All labels (2x compression ratio) + sizeEstimate += s.currentLabelsSize / 2 // All labels (2x compression ratio) return sizeEstimate } @@ -151,6 +148,10 @@ func (s *Streams) addStream(hash uint64, streamLabels labels.Labels) *Stream { // ordering. sort.Sort(streamLabels) + for _, lbl := range streamLabels { + s.currentLabelsSize += len(lbl.Value) + } + newStream := &Stream{ID: s.lastID.Add(1), Labels: streamLabels} s.lookup[hash] = append(s.lookup[hash], newStream) s.ordered = append(s.ordered, newStream) @@ -333,6 +334,7 @@ func (s *Streams) Reset() { s.lastID.Store(0) clear(s.lookup) s.ordered = s.ordered[:0] + s.currentLabelsSize = 0 s.globalMinTimestamp = time.Time{} s.globalMaxTimestamp = time.Time{}