Index Gateway: Set some concurrency limits around concurrent index table processing (#10768)

**What this PR does / why we need it**:

This addresses an issue we've seen with index gateways when handling
queries that touch a large number of series, although these are
improvements, not necessarily complete solutions.

We've seen a tremendous amount of goroutines being spawned while
processing GetChunkRefs requests, specifically when the gateways have
recently found a lot of uncompacted recent ingester tables.

This PR puts some limits on the parallelism of processing tables. The
implementation is a bit arbitrary: we limit the concurrent iterations to
GOMAXPROCS/2.

In practice this is potentially still too much, as an index gateway will
be serving many requests in parallel, with each able to do parallel
table iteration; however, it's a more reasonable line in the sand than
having no limits, and it eases some pressure on the Go scheduler.

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)

---------

Signed-off-by: Edward Welch <edward.welch@grafana.com>
pull/10804/head
Ed Welch 3 years ago committed by GitHub
parent e2ed1c01d4
commit 0c58c78828
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      pkg/storage/stores/shipper/indexshipper/downloads/index_set.go
  2. 18
      pkg/storage/stores/shipper/indexshipper/downloads/table.go
  3. 1
      pkg/storage/stores/shipper/indexshipper/shipper.go
  4. 2
      pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go
  5. 26
      pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go

@ -7,6 +7,7 @@ import (
"io"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
@ -51,6 +52,7 @@ type indexSet struct {
tableName, userID string
cacheLocation string
logger log.Logger
maxConcurrent int
lastUsedAt time.Time
index map[string]index.Index
@ -73,6 +75,11 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
return nil, err
}
maxConcurrent := runtime.GOMAXPROCS(0) / 2
if maxConcurrent == 0 {
maxConcurrent = 1
}
is := indexSet{
openIndexFileFunc: openIndexFileFunc,
baseIndexSet: baseIndexSet,
@ -80,6 +87,7 @@ func NewIndexSet(tableName, userID, cacheLocation string, baseIndexSet storage.I
userID: userID,
cacheLocation: cacheLocation,
logger: logger,
maxConcurrent: maxConcurrent,
lastUsedAt: time.Now(),
index: map[string]index.Index{},
indexMtx: newMtxWithReadiness(),
@ -202,7 +210,10 @@ func (t *indexSet) ForEachConcurrent(ctx context.Context, callback index.ForEach
defer t.indexMtx.rUnlock()
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(200)
if t.maxConcurrent == 0 {
panic("maxConcurrent cannot be 0, indexSet is being initialized without setting maxConcurrent")
}
g.SetLimit(t.maxConcurrent)
logger := util_log.WithContext(ctx, t.logger)
level.Debug(logger).Log("index-files-count", len(t.index))

@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"sync"
"time"
@ -44,6 +45,7 @@ type table struct {
storageClient storage.Client
openIndexFileFunc index.OpenIndexFileFunc
metrics *metrics
maxConcurrent int
baseUserIndexSet, baseCommonIndexSet storage.IndexSet
@ -55,6 +57,11 @@ type table struct {
// NewTable just creates an instance of table without trying to load files from local storage or object store.
// It is used for initializing table at query time.
func NewTable(name, cacheLocation string, storageClient storage.Client, openIndexFileFunc index.OpenIndexFileFunc, metrics *metrics) Table {
maxConcurrent := runtime.GOMAXPROCS(0) / 2
if maxConcurrent == 0 {
maxConcurrent = 1
}
table := table{
name: name,
cacheLocation: cacheLocation,
@ -64,6 +71,7 @@ func NewTable(name, cacheLocation string, storageClient storage.Client, openInde
logger: log.With(util_log.Logger, "table-name", name),
openIndexFileFunc: openIndexFileFunc,
metrics: metrics,
maxConcurrent: maxConcurrent,
indexSets: map[string]IndexSet{},
}
@ -83,6 +91,11 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
return nil, err
}
maxConcurrent := runtime.GOMAXPROCS(0) / 2
if maxConcurrent == 0 {
maxConcurrent = 1
}
table := table{
name: name,
cacheLocation: cacheLocation,
@ -93,6 +106,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, openInd
indexSets: map[string]IndexSet{},
openIndexFileFunc: openIndexFileFunc,
metrics: metrics,
maxConcurrent: maxConcurrent,
}
level.Debug(table.logger).Log("msg", fmt.Sprintf("opening locally present files for table %s", name), "files", fmt.Sprint(dirEntries))
@ -148,6 +162,10 @@ func (t *table) Close() {
func (t *table) ForEachConcurrent(ctx context.Context, userID string, callback index.ForEachIndexCallback) error {
g, ctx := errgroup.WithContext(ctx)
if t.maxConcurrent == 0 {
panic("maxConcurrent cannot be 0, downloads.table is being initialized without setting maxConcurrent")
}
g.SetLimit(t.maxConcurrent)
// iterate through both user and common index
users := []string{userID, ""}

@ -224,6 +224,7 @@ func (s *indexShipper) ForEach(ctx context.Context, tableName, userID string, ca
func (s *indexShipper) ForEachConcurrent(ctx context.Context, tableName, userID string, callback index.ForEachIndexCallback) error {
g, ctx := errgroup.WithContext(ctx)
// E.Welch not setting a bound on the errgroup here because we set one inside the downloadsManager.ForEachConcurrent
if s.downloadsManager != nil {
g.Go(func() error {

@ -33,7 +33,7 @@ func newIndexShipperQuerier(shipper indexShipperIterator, tableRange config.Tabl
type indexIterFunc func(func(context.Context, Index) error) error
func (i indexIterFunc) For(_ context.Context, f func(context.Context, Index) error) error {
func (i indexIterFunc) For(_ context.Context, _ int, f func(context.Context, Index) error) error {
return i(f)
}

@ -3,6 +3,7 @@ package tsdb
import (
"context"
"math"
"runtime"
"sync"
"github.com/prometheus/common/model"
@ -14,8 +15,9 @@ import (
)
type MultiIndex struct {
iter IndexIter
filterer chunk.RequestChunkFilterer
iter IndexIter
filterer chunk.RequestChunkFilterer
maxParallel int
}
type IndexIter interface {
@ -26,13 +28,18 @@ type IndexIter interface {
// Lazy iteration may touch different index files within the same index query.
// `For` e.g, Bounds and GetChunkRefs might go through different index files
// if a sync happened between the calls.
For(context.Context, func(context.Context, Index) error) error
// The second parameter sets a limit on the number of indexes iterated concurrently.
For(context.Context, int, func(context.Context, Index) error) error
}
type IndexSlice []Index
func (xs IndexSlice) For(ctx context.Context, fn func(context.Context, Index) error) error {
func (xs IndexSlice) For(ctx context.Context, maxConcurrent int, fn func(context.Context, Index) error) error {
g, ctx := errgroup.WithContext(ctx)
if maxConcurrent == 0 {
panic("maxConcurrent cannot be 0, IndexIter is being called with a maxConcurrent of 0")
}
g.SetLimit(maxConcurrent)
for i := range xs {
x := xs[i]
g.Go(func() error {
@ -43,7 +50,14 @@ func (xs IndexSlice) For(ctx context.Context, fn func(context.Context, Index) er
}
// NewMultiIndex wraps an IndexIter in a MultiIndex, bounding how many
// underlying index files may be iterated concurrently.
//
// The limit is set to half the available logical CPUs (GOMAXPROCS/2) as a
// pragmatic cap on goroutine fan-out; it is clamped to a minimum of 1 so
// iteration still proceeds on single-CPU hosts.
func NewMultiIndex(i IndexIter) *MultiIndex {
	maxConcurrent := runtime.GOMAXPROCS(0) / 2
	if maxConcurrent == 0 {
		// GOMAXPROCS(0) == 1 would yield 0 via integer division; ensure
		// at least one concurrent iteration is always permitted.
		maxConcurrent = 1
	}
	return &MultiIndex{
		iter:        i,
		maxParallel: maxConcurrent,
	}
}
func (i *MultiIndex) Bounds() (model.Time, model.Time) {
@ -90,7 +104,7 @@ func (i *MultiIndex) Close() error {
func (i *MultiIndex) forMatchingIndices(ctx context.Context, from, through model.Time, f func(context.Context, Index) error) error {
queryBounds := newBounds(from, through)
return i.iter.For(ctx, func(ctx context.Context, idx Index) error {
return i.iter.For(ctx, i.maxParallel, func(ctx context.Context, idx Index) error {
if Overlap(queryBounds, idx) {
if i.filterer != nil {

Loading…
Cancel
Save