From e196b977afdfd3cc72ac15de97845bec056a8a3d Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Wed, 25 Sep 2024 10:38:47 +0200 Subject: [PATCH] Process MemPostings.Delete() with GOMAXPROCS workers We are still seeing lock contention on MemPostings.mtx, and MemPostings.Delete() is by far the most expensive operation on that mutex. This adds parallelism to that method, trying to reduce the amount of time we spend with the mutex held. Signed-off-by: Oleg Zaytsev --- tsdb/index/postings.go | 41 +++++++++++++++++++++++++++++++++---- tsdb/index/postings_test.go | 2 +- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index bfe74c323d..25780e4ad8 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -26,6 +26,7 @@ import ( "sync" "github.com/bboreham/go-loser" + "github.com/cespare/xxhash/v2" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -293,6 +294,9 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma p.mtx.Lock() defer p.mtx.Unlock() + // Deleting label names mutates p.m map, so it should be done from a single goroutine after nobody else is reading it. + deleteLabelNames := make(chan string, len(p.m)) + process := func(l labels.Label) { orig := p.m[l.Name][l.Value] repl := make([]storage.SeriesRef, 0, len(orig)) @@ -305,17 +309,46 @@ func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected ma p.m[l.Name][l.Value] = repl } else { delete(p.m[l.Name], l.Value) - // Delete the key if we removed all values. if len(p.m[l.Name]) == 0 { - delete(p.m, l.Name) + // Delete the key if we removed all values. + deleteLabelNames <- l.Name } } } + // Create GOMAXPROCS workers. + wg := sync.WaitGroup{} + jobs := make([]chan labels.Label, runtime.GOMAXPROCS(0)) + for i := range jobs { + jobs[i] = make(chan labels.Label, 128) + wg.Add(1) + go func(jobs chan labels.Label) { + defer wg.Done() + for l := range jobs { + process(l) + } + }(jobs[i]) + } + + // Process all affected labels and the allPostingsKey. for l := range affected { - process(l) + j := int(xxhash.Sum64String(l.Name) % uint64(len(jobs))) + jobs[j] <- l + } + j := int(xxhash.Sum64String(allPostingsKey.Name) % uint64(len(jobs))) + jobs[j] <- allPostingsKey + + // Close jobs channels and wait all workers to finish. + for i := range jobs { + close(jobs[i]) + } + wg.Wait() + + // Close deleteLabelNames channel and delete the label names requested. + close(deleteLabelNames) + for name := range deleteLabelNames { + delete(p.m, name) } - process(allPostingsKey) } // Iter calls f for each postings list. It aborts if f returns an error and returns it. diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 96c9ed124b..1802c9e891 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1025,7 +1025,7 @@ func BenchmarkMemPostings_Delete(b *testing.B) { return s } - const total = 1e6 + const total = 2e6 allSeries := [total]labels.Labels{} nameValues := make([]string, 0, 100) for i := 0; i < total; i++ {