perf(approx_topk): Reduce memory usage of HyperLogLog in approx_topk. (#15559)

The count min sketch data structure backing the new approx_topk aggregation uses HyperLogLog (HLL) to track the actual cardinality of the aggregated vector.

We were using the sparse version of the HLL. However, that resulted in memory and allocation overhead.
pull/15669/head
Karsten Jeschkies 1 year ago committed by GitHub
parent 5f476a39e1
commit bef20431cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 7
      pkg/logql/count_min_sketch.go
  2. 5
      pkg/logql/log/labels.go
  3. 14
      pkg/logql/log/parser_test.go
  4. 2
      pkg/logql/sketch/cms.go

@ -3,6 +3,8 @@ package logql
import (
"container/heap"
"fmt"
"slices"
"strings"
"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
@ -187,16 +189,17 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou
}
func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) {
slices.SortFunc(metric, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
v.buffer = metric.Bytes(v.buffer)
v.F.Add(v.buffer, value)
// Add our metric if we haven't seen it
// TODO(karsten): There is a chance that the ids match but not the labels due to hash collision. Ideally there's
// an else block the compares the series labels. However, that's not trivial. Besides, instance.Series has the
// same issue in its deduping logic.
id := xxhash.Sum64(v.buffer)
// Add our metric if we haven't seen it
if _, ok := v.observed[id]; !ok {
heap.Push(v, metric)
v.observed[id] = struct{}{}

@ -2,7 +2,9 @@ package log
import (
"fmt"
"slices"
"sort"
"strings"
"sync"
"github.com/prometheus/prometheus/model/labels"
@ -585,7 +587,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult {
// Get all labels at once and sort them
b.buf = b.UnsortedLabels(b.buf)
sort.Sort(b.buf)
// sort.Sort(b.buf)
slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
hash := b.hasher.Hash(b.buf)
if cached, ok := b.resultCache[hash]; ok {

@ -241,25 +241,32 @@ func (p *fakeParseHints) ShouldExtract(key string) bool {
p.checkCount++
return key == p.label || p.extractAll
}
func (p *fakeParseHints) ShouldExtractPrefix(prefix string) bool {
return prefix == p.label || p.extractAll
}
func (p *fakeParseHints) NoLabels() bool {
return false
}
func (p *fakeParseHints) RecordExtracted(_ string) {
p.count++
}
func (p *fakeParseHints) AllRequiredExtracted() bool {
return !p.extractAll && p.count == 1
}
func (p *fakeParseHints) Reset() {
p.checkCount = 0
p.count = 0
}
func (p *fakeParseHints) PreserveError() bool {
return false
}
func (p *fakeParseHints) ShouldContinueParsingLine(_ string, _ *LabelsBuilder) bool {
return p.keepGoing
}
@ -656,30 +663,36 @@ func Benchmark_Parser(b *testing.B) {
b.Run(tt.name, func(b *testing.B) {
line := []byte(tt.line)
b.Run("no labels hints", func(b *testing.B) {
b.ReportAllocs()
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
builder.LabelsResult()
}
})
b.Run("labels hints", func(b *testing.B) {
b.ReportAllocs()
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "", nil)
for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
builder.LabelsResult()
}
})
b.Run("inline stages", func(b *testing.B) {
b.ReportAllocs()
stages := []Stage{NewStringLabelFilter(tt.LabelFilterParseHint)}
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
builder.parserKeyHints = NewParserHint(nil, nil, false, false, ", nil", stages)
for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
builder.LabelsResult()
}
})
})
@ -1251,7 +1264,6 @@ func TestXExpressionParserFailures(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewLogfmtExpressionParser([]LabelExtractionExpr{tt.expression}, false)

@ -19,7 +19,7 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) {
Depth: d,
Width: w,
Counters: make2dslice(w, d),
HyperLogLog: hyperloglog.New16(),
HyperLogLog: hyperloglog.New16NoSparse(),
}, nil
}

Loading…
Cancel
Save