From 0c58c788285384a25d99c459efdbdc2ad4700469 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Fri, 6 Oct 2023 09:34:39 -0400 Subject: [PATCH] Index Gateway: Set some concurrency limits around concurrent index table processing (#10768) **What this PR does / why we need it**: This addresses an issue we've seen with index gateways when handling queries that touch a large number of series, although these are improvements not necessarily solutions. We've seen a tremendous amount of goroutines being spawned while processing GetChunkRefs requests, specifically when the gateways have recently found a lot of uncompacted recent ingester tables. This PR puts some limits on the parallelism of processing tables, the implementation is a bit arbitrary, we limit the concurrent iterations to GOMAXPROCS/2. In practice this is potentially still too much as an index gateway will be serving many requests in parallel, with each able to do parallel table iteration, however it's a more reasonable line in the sand than having no limits and eases some pressure on the Go scheduler. **Which issue(s) this PR fixes**: Fixes # **Special notes for your reviewer**: **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [ ] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. 
[Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Signed-off-by: Edward Welch --- .../indexshipper/downloads/index_set.go | 13 +++++++++- .../shipper/indexshipper/downloads/table.go | 18 +++++++++++++ .../stores/shipper/indexshipper/shipper.go | 1 + .../tsdb/index_shipper_querier.go | 2 +- .../indexshipper/tsdb/multi_file_index.go | 26 ++++++++++++++----- 5 files changed, 52 insertions(+), 8 deletions(-) diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go b/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go index 1a2cefb491..b9a24d5253 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/index_set.go @@ -7,6 +7,7 @@ import ( "io" "os" "path/filepath" + "runtime" "strings" "sync" "time" @@ -51,6 +52,7 @@ type indexSet struct { tableName, userID string cacheLocation string logger log.Logger + maxConcurrent int lastUsedAt time.Time index map[string]index.Index @@ -73,6 +75,11 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I return nil, err } + maxConcurrent := runtime.GOMAXPROCS(0) / 2 + if maxConcurrent == 0 { + maxConcurrent = 1 + } + is := indexSet{ openIndexFileFunc: openIndexFileFunc, baseIndexSet: baseIndexSet, @@ -80,6 +87,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I userID: userID, cacheLocation: cacheLocation, logger: logger, + maxConcurrent: maxConcurrent, lastUsedAt: time.Now(), index: map[string]index.Index{}, indexMtx: newMtxWithReadiness(), @@ -202,7 +210,10 @@ func (t *indexSet) ForEachConcurrent(ctx context.Context, callback index.ForEach defer t.indexMtx.rUnlock() g, ctx := errgroup.WithContext(ctx) - g.SetLimit(200) + if t.maxConcurrent == 0 { + panic("maxConcurrent cannot be 0, indexSet is being initialized without setting maxConcurrent") + } + g.SetLimit(t.maxConcurrent) logger := util_log.WithContext(ctx, 
t.logger) level.Debug(logger).Log("index-files-count", len(t.index)) diff --git a/pkg/storage/stores/shipper/indexshipper/downloads/table.go b/pkg/storage/stores/shipper/indexshipper/downloads/table.go index 6e4d8093b7..47c78924f2 100644 --- a/pkg/storage/stores/shipper/indexshipper/downloads/table.go +++ b/pkg/storage/stores/shipper/indexshipper/downloads/table.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "runtime" "sync" "time" @@ -44,6 +45,7 @@ type table struct { storageClient storage.Client openIndexFileFunc index.OpenIndexFileFunc metrics *metrics + maxConcurrent int baseUserIndexSet, baseCommonIndexSet storage.IndexSet @@ -55,6 +57,11 @@ type table struct { // NewTable just creates an instance of table without trying to load files from local storage or object store. // It is used for initializing table at query time. func NewTable(name, cacheLocation string, storageClient storage.Client, openIndexFileFunc index.OpenIndexFileFunc, metrics *metrics) Table { + maxConcurrent := runtime.GOMAXPROCS(0) / 2 + if maxConcurrent == 0 { + maxConcurrent = 1 + } + table := table{ name: name, cacheLocation: cacheLocation, @@ -64,6 +71,7 @@ func NewTable(name, cacheLocation string, storageClient storage.Client, openInde logger: log.With(util_log.Logger, "table-name", name), openIndexFileFunc: openIndexFileFunc, metrics: metrics, + maxConcurrent: maxConcurrent, indexSets: map[string]IndexSet{}, } @@ -83,6 +91,11 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd return nil, err } + maxConcurrent := runtime.GOMAXPROCS(0) / 2 + if maxConcurrent == 0 { + maxConcurrent = 1 + } + table := table{ name: name, cacheLocation: cacheLocation, @@ -93,6 +106,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd indexSets: map[string]IndexSet{}, openIndexFileFunc: openIndexFileFunc, metrics: metrics, + maxConcurrent: maxConcurrent, } level.Debug(table.logger).Log("msg", fmt.Sprintf("opening locally present files 
for table %s", name), "files", fmt.Sprint(dirEntries)) @@ -148,6 +162,10 @@ func (t *table) Close() { func (t *table) ForEachConcurrent(ctx context.Context, userID string, callback index.ForEachIndexCallback) error { g, ctx := errgroup.WithContext(ctx) + if t.maxConcurrent == 0 { + panic("maxConcurrent cannot be 0, downloads.table is being initialized without setting maxConcurrent") + } + g.SetLimit(t.maxConcurrent) // iterate through both user and common index users := []string{userID, ""} diff --git a/pkg/storage/stores/shipper/indexshipper/shipper.go b/pkg/storage/stores/shipper/indexshipper/shipper.go index b4c29a7520..f947070cc8 100644 --- a/pkg/storage/stores/shipper/indexshipper/shipper.go +++ b/pkg/storage/stores/shipper/indexshipper/shipper.go @@ -224,6 +224,7 @@ func (s *indexShipper) ForEach(ctx context.Context, tableName, userID string, ca func (s *indexShipper) ForEachConcurrent(ctx context.Context, tableName, userID string, callback index.ForEachIndexCallback) error { g, ctx := errgroup.WithContext(ctx) + // E.Welch not setting a bound on the errgroup here because we set one inside the downloadsManager.ForEachConcurrent if s.downloadsManager != nil { g.Go(func() error { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go index 1a89df0c26..c07add72b6 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go @@ -33,7 +33,7 @@ func newIndexShipperQuerier(shipper indexShipperIterator, tableRange config.Tabl type indexIterFunc func(func(context.Context, Index) error) error -func (i indexIterFunc) For(_ context.Context, f func(context.Context, Index) error) error { +func (i indexIterFunc) For(_ context.Context, _ int, f func(context.Context, Index) error) error { return i(f) } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go 
b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go index 41f1f1a834..50a162533d 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go @@ -3,6 +3,7 @@ package tsdb import ( "context" "math" + "runtime" "sync" "github.com/prometheus/common/model" @@ -14,8 +15,9 @@ import ( ) type MultiIndex struct { - iter IndexIter - filterer chunk.RequestChunkFilterer + iter IndexIter + filterer chunk.RequestChunkFilterer + maxParallel int } type IndexIter interface { @@ -26,13 +28,18 @@ type IndexIter interface { // Lazy iteration may touch different index files within the same index query. // `For` e.g, Bounds and GetChunkRefs might go through different index files // if a sync happened between the calls. - For(context.Context, func(context.Context, Index) error) error + // The second parameter sets a limit on the number of indexes iterated concurrently. + For(context.Context, int, func(context.Context, Index) error) error } type IndexSlice []Index -func (xs IndexSlice) For(ctx context.Context, fn func(context.Context, Index) error) error { +func (xs IndexSlice) For(ctx context.Context, maxConcurrent int, fn func(context.Context, Index) error) error { g, ctx := errgroup.WithContext(ctx) + if maxConcurrent == 0 { + panic("maxConcurrent cannot be 0, IndexIter is being called with a maxConcurrent of 0") + } + g.SetLimit(maxConcurrent) for i := range xs { x := xs[i] g.Go(func() error { @@ -43,7 +50,14 @@ func (xs IndexSlice) For(ctx context.Context, fn func(context.Context, Index) er } func NewMultiIndex(i IndexIter) *MultiIndex { - return &MultiIndex{iter: i} + maxConcurrent := runtime.GOMAXPROCS(0) / 2 + if maxConcurrent == 0 { + maxConcurrent = 1 + } + return &MultiIndex{ + iter: i, + maxParallel: maxConcurrent, + } } func (i *MultiIndex) Bounds() (model.Time, model.Time) { @@ -90,7 +104,7 @@ func (i *MultiIndex) Close() error { func (i *MultiIndex) forMatchingIndices(ctx 
context.Context, from, through model.Time, f func(context.Context, Index) error) error { queryBounds := newBounds(from, through) - return i.iter.For(ctx, func(ctx context.Context, idx Index) error { + return i.iter.For(ctx, i.maxParallel, func(ctx context.Context, idx Index) error { if Overlap(queryBounds, idx) { if i.filterer != nil {