From bef20431cbbf302e584c4eea2eb423537bcf86e7 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Thu, 9 Jan 2025 11:27:23 +0100 Subject: [PATCH] perf(approx_topk): Reduce memory usage of HyperLogLog in approx_topk. (#15559) The count min sketch data structure backing the new approx_topk aggregation uses HyperLogLog (HLL) to track the actual cardinality of the aggregated vector. We were using the sparse version of the HLL. However, that resulted in memory and allocation overhead. --- pkg/logql/count_min_sketch.go | 7 +++++-- pkg/logql/log/labels.go | 5 ++++- pkg/logql/log/parser_test.go | 14 +++++++++++++- pkg/logql/sketch/cms.go | 2 +- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/pkg/logql/count_min_sketch.go b/pkg/logql/count_min_sketch.go index e24e089ad3..927ff6ff77 100644 --- a/pkg/logql/count_min_sketch.go +++ b/pkg/logql/count_min_sketch.go @@ -3,6 +3,8 @@ package logql import ( "container/heap" "fmt" + "slices" + "strings" "github.com/axiomhq/hyperloglog" "github.com/cespare/xxhash/v2" @@ -187,16 +189,17 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou } func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) { + slices.SortFunc(metric, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) }) v.buffer = metric.Bytes(v.buffer) v.F.Add(v.buffer, value) - // Add our metric if we haven't seen it - // TODO(karsten): There is a chance that the ids match but not the labels due to hash collision. Ideally there's // an else block the compares the series labels. However, that's not trivial. Besides, instance.Series has the // same issue in its deduping logic. id := xxhash.Sum64(v.buffer) + + // Add our metric if we haven't seen it if _, ok := v.observed[id]; !ok { heap.Push(v, metric) v.observed[id] = struct{}{} diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index c5ef408cc2..9e494f9923 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -2,7 +2,9 @@ package log import ( "fmt" + "slices" "sort" + "strings" "sync" "github.com/prometheus/prometheus/model/labels" @@ -585,7 +587,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult { // Get all labels at once and sort them b.buf = b.UnsortedLabels(b.buf) - sort.Sort(b.buf) + // sort.Sort(b.buf) + slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) }) hash := b.hasher.Hash(b.buf) if cached, ok := b.resultCache[hash]; ok { diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 5ac3a87503..af332c8cb5 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -241,25 +241,32 @@ func (p *fakeParseHints) ShouldExtract(key string) bool { p.checkCount++ return key == p.label || p.extractAll } + func (p *fakeParseHints) ShouldExtractPrefix(prefix string) bool { return prefix == p.label || p.extractAll } + func (p *fakeParseHints) NoLabels() bool { return false } + func (p *fakeParseHints) RecordExtracted(_ string) { p.count++ } + func (p *fakeParseHints) AllRequiredExtracted() bool { return !p.extractAll && p.count == 1 } + func (p *fakeParseHints) Reset() { p.checkCount = 0 p.count = 0 } + func (p *fakeParseHints) PreserveError() bool { return false } + func (p *fakeParseHints) ShouldContinueParsingLine(_ string, _ *LabelsBuilder) bool { return p.keepGoing } @@ -656,30 +663,36 @@ func Benchmark_Parser(b *testing.B) { b.Run(tt.name, func(b *testing.B) { line := []byte(tt.line) b.Run("no labels hints", func(b *testing.B) { + b.ReportAllocs() builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) + builder.LabelsResult() } }) b.Run("labels hints", func(b *testing.B) { + b.ReportAllocs() builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "", nil) for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) + builder.LabelsResult() } }) b.Run("inline stages", func(b *testing.B) { + b.ReportAllocs() stages := []Stage{NewStringLabelFilter(tt.LabelFilterParseHint)} builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) builder.parserKeyHints = NewParserHint(nil, nil, false, false, ", nil", stages) for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) + builder.LabelsResult() } }) }) @@ -1251,7 +1264,6 @@ func TestXExpressionParserFailures(t *testing.T) { }, } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { _, err := NewLogfmtExpressionParser([]LabelExtractionExpr{tt.expression}, false) diff --git a/pkg/logql/sketch/cms.go b/pkg/logql/sketch/cms.go index b510ee8504..67f72be976 100644 --- a/pkg/logql/sketch/cms.go +++ b/pkg/logql/sketch/cms.go @@ -19,7 +19,7 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) { Depth: d, Width: w, Counters: make2dslice(w, d), - HyperLogLog: hyperloglog.New16(), + HyperLogLog: hyperloglog.New16NoSparse(), }, nil }