|
|
|
|
@ -39,6 +39,10 @@ type Streams struct { |
|
|
|
|
lastID atomic.Int64 |
|
|
|
|
lookup map[uint64][]*Stream |
|
|
|
|
|
|
|
|
|
// Size of all label values across all streams; used for
|
|
|
|
|
// [Streams.EstimatedSize]. Resets on [Streams.Reset].
|
|
|
|
|
currentLabelsSize int |
|
|
|
|
|
|
|
|
|
globalMinTimestamp time.Time // Minimum timestamp across all streams, used for metrics.
|
|
|
|
|
globalMaxTimestamp time.Time // Maximum timestamp across all streams, used for metrics.
|
|
|
|
|
|
|
|
|
|
@ -112,20 +116,13 @@ func (s *Streams) EstimatedSize() int { |
|
|
|
|
rowDeltaSize = streamio.VarintSize(500) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var labelsSize int |
|
|
|
|
for _, stream := range s.ordered { |
|
|
|
|
for _, lbl := range stream.Labels { |
|
|
|
|
labelsSize += len(lbl.Value) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var sizeEstimate int |
|
|
|
|
|
|
|
|
|
sizeEstimate += len(s.ordered) * idDeltaSize // ID
|
|
|
|
|
sizeEstimate += len(s.ordered) * timestampDeltaSize // Min timestamp
|
|
|
|
|
sizeEstimate += len(s.ordered) * timestampDeltaSize // Max timestamp
|
|
|
|
|
sizeEstimate += len(s.ordered) * rowDeltaSize // Rows
|
|
|
|
|
sizeEstimate += labelsSize / 2 // All labels (2x compression ratio)
|
|
|
|
|
sizeEstimate += s.currentLabelsSize / 2 // All labels (2x compression ratio)
|
|
|
|
|
|
|
|
|
|
return sizeEstimate |
|
|
|
|
} |
|
|
|
|
@ -151,6 +148,10 @@ func (s *Streams) addStream(hash uint64, streamLabels labels.Labels) *Stream { |
|
|
|
|
// ordering.
|
|
|
|
|
sort.Sort(streamLabels) |
|
|
|
|
|
|
|
|
|
for _, lbl := range streamLabels { |
|
|
|
|
s.currentLabelsSize += len(lbl.Value) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
newStream := &Stream{ID: s.lastID.Add(1), Labels: streamLabels} |
|
|
|
|
s.lookup[hash] = append(s.lookup[hash], newStream) |
|
|
|
|
s.ordered = append(s.ordered, newStream) |
|
|
|
|
@ -333,6 +334,7 @@ func (s *Streams) Reset() { |
|
|
|
|
s.lastID.Store(0) |
|
|
|
|
clear(s.lookup) |
|
|
|
|
s.ordered = s.ordered[:0] |
|
|
|
|
s.currentLabelsSize = 0 |
|
|
|
|
s.globalMinTimestamp = time.Time{} |
|
|
|
|
s.globalMaxTimestamp = time.Time{} |
|
|
|
|
|
|
|
|
|
|