From d9f3bf30b1d9dff869883a9eae5ac5ab5687c2c9 Mon Sep 17 00:00:00 2001
From: Ashwanth
Date: Fri, 1 Dec 2023 19:52:37 +0530
Subject: [PATCH] config: adds `frontend.max-query-capacity` to tune per-tenant query capacity (#11284)

**What this PR does / why we need it**:
Adds a new config `frontend.max-query-capacity` that allows users to configure what portion of the available querier replicas can be used by a tenant. `max_query_capacity` is the corresponding YAML option and can be set in limits or runtime overrides. For example, setting this to 0.5 allows a tenant to use half of the available queriers.

This complements the existing `frontend.max-queriers-per-tenant`. When both are configured, the smaller of the two resulting querier counts is used:
```
min(frontend.max-queriers-per-tenant, ceil(querier_replicas * frontend.max-query-capacity))
```
For example, with 10 querier replicas, `frontend.max-queriers-per-tenant=6`, and `frontend.max-query-capacity=0.5`, a tenant is served by min(6, ceil(10 * 0.5)) = 5 queriers. *All* queriers will handle requests for a tenant if neither limit is applied.

**Which issue(s) this PR fixes**:
Fixes #

**Special notes for your reviewer**:
Noticed that we don't pass down the shuffle sharding limits to the frontend (they are only used with the scheduler):
https://github.com/grafana/loki/blob/26f097162a856db48ecbd16bef2f0b750029855b/pkg/loki/modules.go#L895
but the [docs](https://github.com/grafana/loki/blob/26f097162a856db48ecbd16bef2f0b750029855b/pkg/validation/limits.go#L276) mention that `frontend.max-queriers-per-tenant` applies to the frontend as well.
```
This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.
```

**Checklist**
- [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213)
- [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory.
[Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) --------- Co-authored-by: J Stickler Co-authored-by: Danny Kopping --- CHANGELOG.md | 1 + docs/sources/configure/_index.md | 16 + pkg/bloomgateway/bloomgateway.go | 12 +- pkg/loki/modules.go | 4 +- pkg/lokifrontend/frontend/v1/frontend.go | 18 +- pkg/lokifrontend/frontend/v1/frontend_test.go | 16 +- pkg/lokifrontend/frontend/v1/queue_test.go | 2 +- pkg/queue/dequeue_qos_test.go | 28 +- pkg/queue/queue.go | 23 +- pkg/queue/queue_test.go | 28 +- pkg/queue/tenant_queues.go | 74 ++-- pkg/queue/tenant_queues_test.go | 390 +++++++++--------- pkg/scheduler/limits/definitions.go | 41 +- pkg/scheduler/limits/definitions_test.go | 78 ++++ pkg/scheduler/scheduler.go | 17 +- .../shipper/bloomshipper/block_downloader.go | 13 +- pkg/validation/exporter.go | 2 +- pkg/validation/limits.go | 23 +- 18 files changed, 499 insertions(+), 287 deletions(-) create mode 100644 pkg/scheduler/limits/definitions_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index a322ff8906..8a93537fe6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [10727](https://github.com/grafana/loki/pull/10727) **sandeepsukhani** Native otlp ingestion support * [11051](https://github.com/grafana/loki/pull/11051) Refactor to not use global logger in modules * [10956](https://github.com/grafana/loki/pull/10956) **jeschkies** do not wrap requests but send pure Protobuf from frontend v2 via scheduler to querier when `-frontend.encoding=protobuf`. +* [11284](https://github.com/grafana/loki/pull/11284) **ashwanthgoli** Config: Adds `frontend.max-query-capacity` to tune per-tenant query capacity. ##### Fixes * [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var. diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 9844c6109d..52466dd5e6 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2768,6 +2768,22 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -frontend.max-queriers-per-tenant [max_queriers_per_tenant: | default = 0] +# How much of the available query capacity ("querier" components in distributed +# mode, "read" components in SSD mode) can be used by a single tenant. Allowed +# values are 0.0 to 1.0. For example, setting this to 0.5 would allow a tenant +# to use half of the available queriers for processing the query workload. If +# set to 0, query capacity is determined by frontend.max-queriers-per-tenant. +# When both frontend.max-queriers-per-tenant and frontend.max-query-capacity are +# configured, smaller value of the resulting querier replica count is +# considered: min(frontend.max-queriers-per-tenant, ceil(querier_replicas * +# frontend.max-query-capacity)). *All* queriers will handle requests for the +# tenant if neither limits are applied. This option only works with queriers +# connecting to the query-frontend / query-scheduler, not when using downstream +# URL. Use this feature in a multi-tenant setup where you need to limit query +# capacity for certain tenants. +# CLI flag: -frontend.max-query-capacity +[max_query_capacity: | default = 0] + # Number of days of index to be kept always downloaded for queries. Applies only # to per user index in boltdb-shipper index store. 0 to disable. 
# CLI flag: -store.query-ready-index-num-days diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index ae753e3705..e49ae1e492 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -163,6 +163,14 @@ type Gateway struct { workerConfig workerConfig } +type fixedQueueLimits struct { + maxConsumers int +} + +func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int { + return l.maxConsumers +} + // New returns a new instance of the Bloom Gateway. func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) { g := &Gateway{ @@ -179,7 +187,7 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o queueMetrics: queue.NewMetrics(reg, constants.Loki, metricsSubsystem), } - g.queue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, time.Minute, g.queueMetrics) + g.queue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, time.Minute, &fixedQueueLimits{100}, g.queueMetrics) g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup) client, err := bloomshipper.NewBloomClient(schemaCfg.Configs, storageCfg, cm) @@ -295,7 +303,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk g.activeUsers.UpdateUserTimestamp(tenantID, time.Now()) level.Info(g.logger).Log("msg", "enqueue task", "task", task.ID) - g.queue.Enqueue(tenantID, []string{}, task, 100, func() { + g.queue.Enqueue(tenantID, []string{}, task, func() { // When enqueuing, we also add the task to the pending tasks g.pendingTasks.Add(task.ID, task) }) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index bf450f852b..20a1580182 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -801,7 +801,9 @@ func (t *Loki) initIngesterQuerier() (_ services.Service, err error) { // Placeholder limits type to pass to cortex frontend type disabledShuffleShardingLimits struct{} -func (disabledShuffleShardingLimits) MaxQueriersPerUser(_ string) int { return 0 } +func (disabledShuffleShardingLimits) MaxQueriersPerUser(_ string) uint { return 0 } + +func (disabledShuffleShardingLimits) MaxQueryCapacity(_ string) float64 { return 0 } func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) { level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware") diff --git a/pkg/lokifrontend/frontend/v1/frontend.go b/pkg/lokifrontend/frontend/v1/frontend.go index ff32cbf7b9..cf17b62b03 100644 --- a/pkg/lokifrontend/frontend/v1/frontend.go +++ b/pkg/lokifrontend/frontend/v1/frontend.go @@ -21,9 +21,9 @@ import ( "github.com/grafana/loki/pkg/lokifrontend/frontend/v1/frontendv1pb" "github.com/grafana/loki/pkg/querier/stats" "github.com/grafana/loki/pkg/queue" + "github.com/grafana/loki/pkg/scheduler/limits" "github.com/grafana/loki/pkg/util" lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" - "github.com/grafana/loki/pkg/util/validation" ) var errTooManyRequest = httpgrpc.Errorf(http.StatusTooManyRequests, "too many outstanding requests") @@ -42,7 +42,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type Limits interface { // Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled. - MaxQueriersPerUser(user string) int + MaxQueriersPerUser(user string) uint + + // MaxQueryCapacity returns how much of the available query capacity can be used by this user. 
+ MaxQueryCapacity(user string) float64 } // Frontend queues HTTP requests, dispatches them to backends, and handles retries @@ -80,12 +83,12 @@ type request struct { } // New creates a new frontend. Frontend implements service, and must be started and stopped. -func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, metricsNamespace string) (*Frontend, error) { +func New(cfg Config, frontendLimits Limits, log log.Logger, registerer prometheus.Registerer, metricsNamespace string) (*Frontend, error) { queueMetrics := queue.NewMetrics(registerer, metricsNamespace, "query_frontend") f := &Frontend{ cfg: cfg, log: log, - limits: limits, + limits: frontendLimits, queueMetrics: queueMetrics, queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ Namespace: metricsNamespace, @@ -95,7 +98,7 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist }), } - f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, queueMetrics) + f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, limits.NewQueueLimits(frontendLimits), queueMetrics) f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics) var err error @@ -312,13 +315,10 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error { req.enqueueTime = now req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued") - // aggregate the max queriers limit in the case of a multi tenant query - maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueriersPerUser) - joinedTenantID := tenant.JoinTenantIDs(tenantIDs) f.activeUsers.UpdateUserTimestamp(joinedTenantID, now) - err = f.requestQueue.Enqueue(joinedTenantID, nil, req, maxQueriers, nil) + err = f.requestQueue.Enqueue(joinedTenantID, nil, req, nil) if err == queue.ErrTooManyRequests { return errTooManyRequest } diff --git a/pkg/lokifrontend/frontend/v1/frontend_test.go b/pkg/lokifrontend/frontend/v1/frontend_test.go index f715d3e8f5..a10a55b379 100644 --- a/pkg/lokifrontend/frontend/v1/frontend_test.go +++ b/pkg/lokifrontend/frontend/v1/frontend_test.go @@ -35,6 +35,7 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" querier_worker "github.com/grafana/loki/pkg/querier/worker" "github.com/grafana/loki/pkg/queue" + "github.com/grafana/loki/pkg/scheduler/limits" "github.com/grafana/loki/pkg/util/constants" ) @@ -135,7 +136,7 @@ func TestFrontendCheckReady(t *testing.T) { qm := queue.NewMetrics(nil, constants.Loki, "query_frontend") f := &Frontend{ log: log.NewNopLogger(), - requestQueue: queue.NewRequestQueue(5, 0, qm), + requestQueue: queue.NewRequestQueue(5, 0, limits.NewQueueLimits(nil), qm), } for i := 0; i < tt.connectedClients; i++ { f.requestQueue.RegisterConsumerConnection("test") @@ -243,7 +244,7 @@ func testFrontend(t *testing.T, config Config, handler queryrangebase.Handler, t httpListen, err := net.Listen("tcp", "localhost:0") require.NoError(t, err) - v1, err := New(config, limits{}, logger, reg, constants.Loki) + v1, err := New(config, mockLimits{}, logger, reg, constants.Loki) require.NoError(t, err) require.NotNil(t, v1) require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1)) @@ -293,10 +294,15 @@ func defaultFrontendConfig() Config { return config } -type limits struct { - queriers int +type mockLimits struct { + queriers uint + queryCapacity float64 } -func (l limits) MaxQueriersPerUser(_ string) int { 
+func (l mockLimits) MaxQueriersPerUser(_ string) uint { return l.queriers } + +func (l mockLimits) MaxQueryCapacity(_ string) float64 { + return l.queryCapacity +} diff --git a/pkg/lokifrontend/frontend/v1/queue_test.go b/pkg/lokifrontend/frontend/v1/queue_test.go index efc04e3389..a6f380afd4 100644 --- a/pkg/lokifrontend/frontend/v1/queue_test.go +++ b/pkg/lokifrontend/frontend/v1/queue_test.go @@ -24,7 +24,7 @@ import ( func setupFrontend(t *testing.T, config Config) *Frontend { logger := log.NewNopLogger() - frontend, err := New(config, limits{queriers: 3}, logger, nil, constants.Loki) + frontend, err := New(config, mockLimits{queriers: 3}, logger, nil, constants.Loki) require.NoError(t, err) t.Cleanup(func() { diff --git a/pkg/queue/dequeue_qos_test.go b/pkg/queue/dequeue_qos_test.go index 6b1de88594..c889cbe8f4 100644 --- a/pkg/queue/dequeue_qos_test.go +++ b/pkg/queue/dequeue_qos_test.go @@ -44,7 +44,7 @@ func enqueueRequestsForActor(t testing.TB, actor []string, useActor bool, queue if !useActor { actor = nil } - err := queue.Enqueue("tenant", actor, r, 0, nil) + err := queue.Enqueue("tenant", actor, r, nil) if err != nil { t.Fatal(err) } @@ -58,7 +58,7 @@ func BenchmarkQueryFairness(t *testing.B) { for _, useActor := range []bool{false, true} { t.Run(fmt.Sprintf("use hierarchical queues = %v", useActor), func(t *testing.B) { - requestQueue := NewRequestQueue(1024, 0, NewMetrics(nil, constants.Loki, "query_scheduler")) + requestQueue := NewRequestQueue(1024, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler")) enqueueRequestsForActor(t, []string{}, useActor, requestQueue, numSubRequestsActorA, 50*time.Millisecond) enqueueRequestsForActor(t, []string{"a"}, useActor, requestQueue, numSubRequestsActorA, 100*time.Millisecond) enqueueRequestsForActor(t, []string{"b"}, useActor, requestQueue, numSubRequestsActorB, 50*time.Millisecond) @@ -133,18 +133,18 @@ func TestQueryFairnessAcrossSameLevel(t *testing.T) { 456: [210] **/ - requestQueue := NewRequestQueue(1024, 0, NewMetrics(nil, constants.Loki, "query_scheduler")) - _ = requestQueue.Enqueue("tenant1", []string{}, r(0), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{}, r(1), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{}, r(2), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(10), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(11), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(12), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(20), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(21), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(22), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{"xyz", "123"}, r(200), 0, nil) - _ = requestQueue.Enqueue("tenant1", []string{"xyz", "456"}, r(210), 0, nil) + requestQueue := NewRequestQueue(1024, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler")) + _ = requestQueue.Enqueue("tenant1", []string{}, r(0), nil) + _ = requestQueue.Enqueue("tenant1", []string{}, r(1), nil) + _ = requestQueue.Enqueue("tenant1", []string{}, r(2), nil) + _ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(10), nil) + _ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(11), nil) + _ = requestQueue.Enqueue("tenant1", []string{"abc"}, r(12), nil) + _ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(20), nil) + _ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(21), nil) + _ = requestQueue.Enqueue("tenant1", []string{"xyz"}, r(22), nil) + _ = 
requestQueue.Enqueue("tenant1", []string{"xyz", "123"}, r(200), nil) + _ = requestQueue.Enqueue("tenant1", []string{"xyz", "456"}, r(210), nil) requestQueue.queues.recomputeUserConsumers() items := make([]int, 0) diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index f0475164bd..4af4d2c903 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -39,6 +39,11 @@ func (ui QueueIndex) ReuseLastIndex() QueueIndex { return ui - 1 } +type Limits interface { + // MaxConsumers returns the max consumers to use per tenant or 0 to allow all consumers to consume from the queue. + MaxConsumers(user string, allConsumers int) int +} + // Request stored into the queue. type Request any @@ -62,9 +67,9 @@ type RequestQueue struct { pool *SlicePool[Request] } -func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, metrics *Metrics) *RequestQueue { +func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, limits Limits, metrics *Metrics) *RequestQueue { q := &RequestQueue{ - queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay), + queues: newTenantQueues(maxOutstandingPerTenant, forgetDelay, limits), connectedConsumers: atomic.NewInt32(0), metrics: metrics, pool: NewSlicePool[Request](1<<6, 1<<10, 2), // Buckets are [64, 128, 256, 512, 1024]. @@ -76,12 +81,9 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, met return q } -// Enqueue puts the request into the queue. MaxQueries is tenant-specific value that specifies how many queriers can -// this tenant use (zero or negative = all queriers). It is passed to each Enqueue, because it can change -// between calls. -// +// Enqueue puts the request into the queue. // If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request. -func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQueriers int, successFn func()) error { +func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, successFn func()) error { q.mtx.Lock() defer q.mtx.Unlock() @@ -89,10 +91,9 @@ func (q *RequestQueue) Enqueue(tenant string, path []string, req Request, maxQue return ErrStopped } - queue := q.queues.getOrAddQueue(tenant, path, maxQueriers) - if queue == nil { - // This can only happen if tenant is "". 
- return errors.New("no queue found") + queue, err := q.queues.getOrAddQueue(tenant, path) + if err != nil { + return fmt.Errorf("no queue found: %w", err) } // Optimistically increase queue counter for tenant instead of doing separate diff --git a/pkg/queue/queue_test.go b/pkg/queue/queue_test.go index a2cb42441c..623e240733 100644 --- a/pkg/queue/queue_test.go +++ b/pkg/queue/queue_test.go @@ -47,7 +47,7 @@ func BenchmarkGetNextRequest(b *testing.B) { queues := make([]*RequestQueue, 0, b.N) for n := 0; n < b.N; n++ { - queue := NewRequestQueue(maxOutstandingPerTenant, 0, NewMetrics(nil, constants.Loki, "query_scheduler")) + queue := NewRequestQueue(maxOutstandingPerTenant, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler")) queues = append(queues, queue) for ix := 0; ix < queriers; ix++ { @@ -57,7 +57,7 @@ func BenchmarkGetNextRequest(b *testing.B) { for i := 0; i < maxOutstandingPerTenant; i++ { for j := 0; j < numTenants; j++ { userID := strconv.Itoa(j) - err := queue.Enqueue(userID, benchCase.fn(j), "request", 0, nil) + err := queue.Enqueue(userID, benchCase.fn(j), "request", nil) if err != nil { b.Fatal(err) } @@ -105,7 +105,7 @@ func BenchmarkQueueRequest(b *testing.B) { requests := make([]string, 0, numTenants) for n := 0; n < b.N; n++ { - q := NewRequestQueue(maxOutstandingPerTenant, 0, NewMetrics(nil, constants.Loki, "query_scheduler")) + q := NewRequestQueue(maxOutstandingPerTenant, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler")) for ix := 0; ix < queriers; ix++ { q.RegisterConsumerConnection(fmt.Sprintf("querier-%d", ix)) @@ -123,7 +123,7 @@ func BenchmarkQueueRequest(b *testing.B) { for n := 0; n < b.N; n++ { for i := 0; i < maxOutstandingPerTenant; i++ { for j := 0; j < numTenants; j++ { - err := queues[n].Enqueue(users[j], nil, requests[j], 0, nil) + err := queues[n].Enqueue(users[j], nil, requests[j], nil) if err != nil { b.Fatal(err) } @@ -135,7 +135,7 @@ func BenchmarkQueueRequest(b *testing.B) { func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBecauseQuerierHasBeenForgotten(t *testing.T) { const forgetDelay = 3 * time.Second - queue := NewRequestQueue(1, forgetDelay, NewMetrics(nil, constants.Loki, "query_scheduler")) + queue := NewRequestQueue(1, forgetDelay, &mockQueueLimits{maxConsumers: 1}, NewMetrics(nil, constants.Loki, "query_scheduler")) // Start the queue service. ctx := context.Background() @@ -162,7 +162,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe // Enqueue a request from an user which would be assigned to querier-1. // NOTE: "user-1" hash falls in the querier-1 shard. 
- require.NoError(t, queue.Enqueue("user-1", nil, "request", 1, nil)) + require.NoError(t, queue.Enqueue("user-1", nil, "request", nil)) startTime := time.Now() querier2wg.Wait() @@ -306,17 +306,17 @@ func TestContextCond(t *testing.T) { func TestMaxQueueSize(t *testing.T) { t.Run("queue size is tracked per tenant", func(t *testing.T) { maxSize := 3 - queue := NewRequestQueue(maxSize, 0, NewMetrics(nil, constants.Loki, "query_scheduler")) + queue := NewRequestQueue(maxSize, 0, noQueueLimits, NewMetrics(nil, constants.Loki, "query_scheduler")) queue.RegisterConsumerConnection("querier") // enqueue maxSize items with different actors // different actors have individual channels with maxSize length - assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 1, 0, nil)) - assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 2, 0, nil)) - assert.NoError(t, queue.Enqueue("tenant", []string{"user-c"}, 3, 0, nil)) + assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 1, nil)) + assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 2, nil)) + assert.NoError(t, queue.Enqueue("tenant", []string{"user-c"}, 3, nil)) // max queue length per tenant is tracked globally for all actors within a tenant - err := queue.Enqueue("tenant", []string{"user-a"}, 4, 0, nil) + err := queue.Enqueue("tenant", []string{"user-a"}, 4, nil) assert.Equal(t, err, ErrTooManyRequests) // dequeue and enqueue some items @@ -325,10 +325,10 @@ func TestMaxQueueSize(t *testing.T) { _, _, err = queue.Dequeue(context.Background(), StartIndexWithLocalQueue, "querier") assert.NoError(t, err) - assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 4, 0, nil)) - assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 5, 0, nil)) + assert.NoError(t, queue.Enqueue("tenant", []string{"user-a"}, 4, nil)) + assert.NoError(t, queue.Enqueue("tenant", []string{"user-b"}, 5, nil)) - err = queue.Enqueue("tenant", []string{"user-c"}, 6, 0, nil) + err = queue.Enqueue("tenant", []string{"user-c"}, 6, nil) assert.Equal(t, err, ErrTooManyRequests) }) } diff --git a/pkg/queue/tenant_queues.go b/pkg/queue/tenant_queues.go index 46e8a999fb..69fac6ed60 100644 --- a/pkg/queue/tenant_queues.go +++ b/pkg/queue/tenant_queues.go @@ -6,11 +6,17 @@ package queue import ( + "fmt" "math/rand" "sort" "time" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/tenant" + "github.com/grafana/loki/pkg/util" + util_log "github.com/grafana/loki/pkg/util/log" + "github.com/grafana/loki/pkg/util/validation" ) type intPointerMap map[string]*int @@ -67,6 +73,8 @@ type tenantQueues struct { // sortedConsumer list of consumer IDs, used when creating per-user shard. sortedConsumers []string + + limits Limits } type Queue interface { @@ -87,16 +95,15 @@ type tenantQueue struct { *TreeQueue // If not nil, only these consumers can handle user requests. If nil, all consumers can. - // We set this to nil if number of available consumers <= maxQueriers. - consumers map[string]struct{} - maxQueriers int + // We set this to nil if number of available consumers <= MaxConsumers. + consumers map[string]struct{} // Seed for shuffle sharding of consumers. This seed is based on userID only and is therefore consistent // between different frontends. 
seed int64 } -func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQueues { +func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits) *tenantQueues { mm := &Mapping[*tenantQueue]{} mm.Init(64) return &tenantQueues{ @@ -106,6 +113,7 @@ func newTenantQueues(maxUserQueueSize int, forgetDelay time.Duration) *tenantQue forgetDelay: forgetDelay, consumers: map[string]*consumer{}, sortedConsumers: nil, + limits: limits, } } @@ -118,37 +126,42 @@ func (q *tenantQueues) deleteQueue(tenant string) { } // Returns existing or new queue for a tenant. -// MaxQueriers is used to compute which consumers should handle requests for this tenant. -// If maxQueriers is <= 0, all consumers can handle this tenant's requests. -// If maxQueriers has changed since the last call, consumers for this are recomputed. -func (q *tenantQueues) getOrAddQueue(tenant string, path []string, maxQueriers int) Queue { +func (q *tenantQueues) getOrAddQueue(tenantID string, path []string) (Queue, error) { // Empty tenant is not allowed, as that would break our tenants list ("" is used for free spot). - if tenant == "" { - return nil + if tenantID == "" { + return nil, fmt.Errorf("empty tenant is not allowed") } - if maxQueriers < 0 { - maxQueriers = 0 + // extract tenantIDs to compute limits for multi-tenant queries + tenantIDs, err := tenant.TenantIDsFromOrgID(tenantID) + if err != nil { + return nil, fmt.Errorf("extract tenant ids: %w", err) } - uq := q.mapping.GetByKey(tenant) + uq := q.mapping.GetByKey(tenantID) if uq == nil { uq = &tenantQueue{ - seed: util.ShuffleShardSeed(tenant, ""), + seed: util.ShuffleShardSeed(tenantID, ""), } - uq.TreeQueue = newTreeQueue(q.maxUserQueueSize, tenant) - q.mapping.Put(tenant, uq) + uq.TreeQueue = newTreeQueue(q.maxUserQueueSize, tenantID) + q.mapping.Put(tenantID, uq) } - if uq.maxQueriers != maxQueriers { - uq.maxQueriers = maxQueriers - uq.consumers = shuffleConsumersForTenants(uq.seed, maxQueriers, q.sortedConsumers, nil) + consumersToSelect := validation.SmallestPositiveNonZeroIntPerTenant( + tenantIDs, + func(tenantID string) int { + return q.limits.MaxConsumers(tenantID, len(q.sortedConsumers)) + }, + ) + + if len(uq.consumers) != consumersToSelect { + uq.consumers = shuffleConsumersForTenants(uq.seed, consumersToSelect, q.sortedConsumers, nil) } if len(path) == 0 { - return uq + return uq, nil } - return uq.add(path) + return uq.add(path), nil } // Finds next queue for the consumer. To support fair scheduling between users, client is expected @@ -294,8 +307,23 @@ func (q *tenantQueues) forgetDisconnectedConsumers(now time.Time) int { func (q *tenantQueues) recomputeUserConsumers() { scratchpad := make([]string, 0, len(q.sortedConsumers)) - for _, uq := range q.mapping.Values() { - uq.consumers = shuffleConsumersForTenants(uq.seed, uq.maxQueriers, q.sortedConsumers, scratchpad) + for _, tenantID := range q.mapping.Keys() { + if uq := q.mapping.GetByKey(tenantID); uq != nil { + tenantIDs, err := tenant.TenantIDsFromOrgID(tenantID) + if err != nil { + // this is unlikely to happen since we do tenantID validation when creating the queue. 
+ level.Error(util_log.Logger).Log("msg", "failed to shuffle consumers because of errors in tenantID extraction", "tenant", tenantID, "error", err) + continue + } + + consumersToSelect := validation.SmallestPositiveNonZeroIntPerTenant( + tenantIDs, + func(tenantID string) int { + return q.limits.MaxConsumers(tenantID, len(q.sortedConsumers)) + }, + ) + uq.consumers = shuffleConsumersForTenants(uq.seed, consumersToSelect, q.sortedConsumers, scratchpad) + } } } diff --git a/pkg/queue/tenant_queues_test.go b/pkg/queue/tenant_queues_test.go index 95f2a67963..4f49b82333 100644 --- a/pkg/queue/tenant_queues_test.go +++ b/pkg/queue/tenant_queues_test.go @@ -15,53 +15,57 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/scheduler/limits" ) +var noQueueLimits = limits.NewQueueLimits(nil) + func TestQueues(t *testing.T) { - uq := newTenantQueues(0, 0) + uq := newTenantQueues(0, 0, noQueueLimits) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) - uq.addConsumerToConnection("querier-1") - uq.addConsumerToConnection("querier-2") + uq.addConsumerToConnection("consumer-1") + uq.addConsumerToConnection("consumer-2") - q, u, lastUserIndex := uq.getNextQueueForConsumer(-1, "querier-1") + q, u, lastUserIndex := uq.getNextQueueForConsumer(-1, "consumer-1") assert.Nil(t, q) assert.Equal(t, "", u) // Add queues: [one] - qOne := getOrAdd(t, uq, "one", 0) - lastUserIndex = confirmOrderForQuerier(t, uq, "querier-1", lastUserIndex, qOne, qOne) + qOne := getOrAdd(t, uq, "one") + lastUserIndex = confirmOrderForConsumer(t, uq, "consumer-1", lastUserIndex, qOne, qOne) // [one two] - qTwo := getOrAdd(t, uq, "two", 0) + qTwo := getOrAdd(t, uq, "two") assert.NotEqual(t, qOne, qTwo) - lastUserIndex = confirmOrderForQuerier(t, uq, "querier-1", lastUserIndex, qTwo, qOne, qTwo, qOne) - confirmOrderForQuerier(t, uq, "querier-2", -1, qOne, qTwo, qOne) + lastUserIndex = confirmOrderForConsumer(t, uq, "consumer-1", lastUserIndex, qTwo, qOne, qTwo, qOne) + confirmOrderForConsumer(t, uq, "consumer-2", -1, qOne, qTwo, qOne) // [one two three] // confirm fifo by adding a third queue and iterating to it - qThree := getOrAdd(t, uq, "three", 0) + qThree := getOrAdd(t, uq, "three") - lastUserIndex = confirmOrderForQuerier(t, uq, "querier-1", lastUserIndex, qTwo, qThree, qOne) + lastUserIndex = confirmOrderForConsumer(t, uq, "consumer-1", lastUserIndex, qTwo, qThree, qOne) // Remove one: ["" two three] uq.deleteQueue("one") assert.NoError(t, isConsistent(uq)) - lastUserIndex = confirmOrderForQuerier(t, uq, "querier-1", lastUserIndex, qTwo, qThree, qTwo) + lastUserIndex = confirmOrderForConsumer(t, uq, "consumer-1", lastUserIndex, qTwo, qThree, qTwo) // "four" is added at the beginning of the list: [four two three] - qFour := getOrAdd(t, uq, "four", 0) + qFour := getOrAdd(t, uq, "four") - lastUserIndex = confirmOrderForQuerier(t, uq, "querier-1", lastUserIndex, qThree, qFour, qTwo, qThree) + lastUserIndex = confirmOrderForConsumer(t, uq, "consumer-1", lastUserIndex, qThree, qFour, qTwo, qThree) // Remove two: [four "" three] uq.deleteQueue("two") assert.NoError(t, isConsistent(uq)) - lastUserIndex = confirmOrderForQuerier(t, uq, "querier-1", lastUserIndex, qFour, qThree, qFour) + lastUserIndex = confirmOrderForConsumer(t, uq, "consumer-1", lastUserIndex, qFour, qThree, qFour) // Remove three: [four] uq.deleteQueue("three") @@ -71,55 +75,55 @@ func TestQueues(t *testing.T) { uq.deleteQueue("four") assert.NoError(t, isConsistent(uq)) - q, _, _ = 
uq.getNextQueueForConsumer(lastUserIndex, "querier-1") + q, _, _ = uq.getNextQueueForConsumer(lastUserIndex, "consumer-1") assert.Nil(t, q) } -func TestQueuesOnTerminatingQuerier(t *testing.T) { - uq := newTenantQueues(0, 0) +func TestQueuesOnTerminatingConsumer(t *testing.T) { + uq := newTenantQueues(0, 0, noQueueLimits) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) - uq.addConsumerToConnection("querier-1") - uq.addConsumerToConnection("querier-2") + uq.addConsumerToConnection("consumer-1") + uq.addConsumerToConnection("consumer-2") // Add queues: [one, two] - qOne := getOrAdd(t, uq, "one", 0) - qTwo := getOrAdd(t, uq, "two", 0) - confirmOrderForQuerier(t, uq, "querier-1", -1, qOne, qTwo, qOne, qTwo) - confirmOrderForQuerier(t, uq, "querier-2", -1, qOne, qTwo, qOne, qTwo) - - // After notify shutdown for querier-2, it's expected to own no queue. - uq.notifyQuerierShutdown("querier-2") - q, u, _ := uq.getNextQueueForConsumer(-1, "querier-2") + qOne := getOrAdd(t, uq, "one") + qTwo := getOrAdd(t, uq, "two") + confirmOrderForConsumer(t, uq, "consumer-1", -1, qOne, qTwo, qOne, qTwo) + confirmOrderForConsumer(t, uq, "consumer-2", -1, qOne, qTwo, qOne, qTwo) + + // After notify shutdown for consumer-2, it's expected to own no queue. + uq.notifyQuerierShutdown("consumer-2") + q, u, _ := uq.getNextQueueForConsumer(-1, "consumer-2") assert.Nil(t, q) assert.Equal(t, "", u) - // However, querier-1 still get queues because it's still running. - confirmOrderForQuerier(t, uq, "querier-1", -1, qOne, qTwo, qOne, qTwo) + // However, consumer-1 still get queues because it's still running. + confirmOrderForConsumer(t, uq, "consumer-1", -1, qOne, qTwo, qOne, qTwo) - // After disconnecting querier-2, it's expected to own no queue. - uq.removeConsumer("querier-2") - q, u, _ = uq.getNextQueueForConsumer(-1, "querier-2") + // After disconnecting consumer-2, it's expected to own no queue. + uq.removeConsumer("consumer-2") + q, u, _ = uq.getNextQueueForConsumer(-1, "consumer-2") assert.Nil(t, q) assert.Equal(t, "", u) } -func TestQueuesWithQueriers(t *testing.T) { - uq := newTenantQueues(0, 0) +func TestQueuesWithConsumers(t *testing.T) { + maxConsumers := 5 + uq := newTenantQueues(0, 0, &mockQueueLimits{maxConsumers: maxConsumers}) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) - queriers := 30 + consumers := 30 users := 1000 - maxQueriersPerUser := 5 - // Add some queriers. - for ix := 0; ix < queriers; ix++ { - qid := fmt.Sprintf("querier-%d", ix) + // Add some consumers. + for ix := 0; ix < consumers; ix++ { + qid := fmt.Sprintf("consumer-%d", ix) uq.addConsumerToConnection(qid) - // No querier has any queues yet. + // No consumer has any queues yet. q, u, _ := uq.getNextQueueForConsumer(-1, qid) assert.Nil(t, q) assert.Equal(t, "", u) @@ -130,19 +134,19 @@ func TestQueuesWithQueriers(t *testing.T) { // Add user queues. for u := 0; u < users; u++ { uid := fmt.Sprintf("user-%d", u) - getOrAdd(t, uq, uid, maxQueriersPerUser) + getOrAdd(t, uq, uid) - // Verify it has maxQueriersPerUser queriers assigned now. + // Verify it has maxConsumers consumers assigned now. qs := uq.mapping.GetByKey(uid).consumers - assert.Equal(t, maxQueriersPerUser, len(qs)) + assert.Equal(t, maxConsumers, len(qs)) } - // After adding all users, verify results. For each querier, find out how many different users it handles, + // After adding all users, verify results. For each consumer, find out how many different users it handles, // and compute mean and stdDev. 
- queriersMap := make(map[string]int) + consumerMap := make(map[string]int) - for q := 0; q < queriers; q++ { - qid := fmt.Sprintf("querier-%d", q) + for q := 0; q < consumers; q++ { + qid := fmt.Sprintf("consumer-%d", q) lastUserIndex := StartIndex for { @@ -151,25 +155,25 @@ func TestQueuesWithQueriers(t *testing.T) { break } lastUserIndex = newIx - queriersMap[qid]++ + consumerMap[qid]++ } } mean := float64(0) - for _, c := range queriersMap { + for _, c := range consumerMap { mean += float64(c) } - mean = mean / float64(len(queriersMap)) + mean = mean / float64(len(consumerMap)) stdDev := float64(0) - for _, c := range queriersMap { + for _, c := range consumerMap { d := float64(c) - mean stdDev += (d * d) } - stdDev = math.Sqrt(stdDev / float64(len(queriersMap))) + stdDev = math.Sqrt(stdDev / float64(len(consumerMap))) t.Log("mean:", mean, "stddev:", stdDev) - assert.InDelta(t, users*maxQueriersPerUser/queriers, mean, 1) + assert.InDelta(t, users*maxConsumers/consumers, mean, 1) assert.InDelta(t, stdDev, 0, mean*0.2) } @@ -183,7 +187,7 @@ func TestQueuesConsistency(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - uq := newTenantQueues(0, testData.forgetDelay) + uq := newTenantQueues(0, testData.forgetDelay, &mockQueueLimits{maxConsumers: 3}) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -196,25 +200,27 @@ func TestQueuesConsistency(t *testing.T) { for i := 0; i < 10000; i++ { switch r.Int() % 6 { case 0: - assert.NotNil(t, uq.getOrAddQueue(generateTenant(r), generateActor(r), 3)) + q, err := uq.getOrAddQueue(generateTenant(r), generateActor(r)) + assert.NoError(t, err) + assert.NotNil(t, q) case 1: - qid := generateQuerier(r) + qid := generateConsumer(r) _, _, luid := uq.getNextQueueForConsumer(lastUserIndexes[qid], qid) lastUserIndexes[qid] = luid case 2: uq.deleteQueue(generateTenant(r)) case 3: - q := generateQuerier(r) + q := generateConsumer(r) uq.addConsumerToConnection(q) conns[q]++ case 4: - q := generateQuerier(r) + q := generateConsumer(r) if conns[q] > 0 { uq.removeConsumerConnection(q, time.Now()) conns[q]-- } case 5: - q := generateQuerier(r) + q := generateConsumer(r) uq.notifyQuerierShutdown(q) } @@ -226,166 +232,166 @@ func TestQueuesConsistency(t *testing.T) { func TestQueues_ForgetDelay(t *testing.T) { const ( - forgetDelay = time.Minute - maxQueriersPerUser = 1 - numUsers = 100 + forgetDelay = time.Minute + maxConsumers = 1 + numUsers = 100 ) now := time.Now() - uq := newTenantQueues(0, forgetDelay) + uq := newTenantQueues(0, forgetDelay, &mockQueueLimits{maxConsumers: maxConsumers}) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) - // 3 queriers open 2 connections each. + // 3 consumers open 2 connections each. for i := 1; i <= 3; i++ { - uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i)) - uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i)) + uq.addConsumerToConnection(fmt.Sprintf("consumer-%d", i)) + uq.addConsumerToConnection(fmt.Sprintf("consumer-%d", i)) } // Add user queues. for i := 0; i < numUsers; i++ { userID := fmt.Sprintf("user-%d", i) - getOrAdd(t, uq, userID, maxQueriersPerUser) + getOrAdd(t, uq, userID) } - // We expect querier-1 to have some users. - querier1Users := getUsersByQuerier(uq, "querier-1") - require.NotEmpty(t, querier1Users) + // We expect consumer-1 to have some users. + consumer1Users := getUsersByConsumer(uq, "consumer-1") + require.NotEmpty(t, consumer1Users) - // Gracefully shutdown querier-1. 
- uq.removeConsumerConnection("querier-1", now.Add(20*time.Second)) - uq.removeConsumerConnection("querier-1", now.Add(21*time.Second)) - uq.notifyQuerierShutdown("querier-1") + // Gracefully shutdown consumer-1. + uq.removeConsumerConnection("consumer-1", now.Add(20*time.Second)) + uq.removeConsumerConnection("consumer-1", now.Add(21*time.Second)) + uq.notifyQuerierShutdown("consumer-1") - // We expect querier-1 has been removed. - assert.NotContains(t, uq.consumers, "querier-1") + // We expect consumer-1 has been removed. + assert.NotContains(t, uq.consumers, "consumer-1") assert.NoError(t, isConsistent(uq)) - // We expect querier-1 users have been shuffled to other queriers. - for _, userID := range querier1Users { - assert.Contains(t, append(getUsersByQuerier(uq, "querier-2"), getUsersByQuerier(uq, "querier-3")...), userID) + // We expect consumer-1 users have been shuffled to other consumers. + for _, userID := range consumer1Users { + assert.Contains(t, append(getUsersByConsumer(uq, "consumer-2"), getUsersByConsumer(uq, "consumer-3")...), userID) } - // Querier-1 reconnects. - uq.addConsumerToConnection("querier-1") - uq.addConsumerToConnection("querier-1") + // Consumer-1 reconnects. + uq.addConsumerToConnection("consumer-1") + uq.addConsumerToConnection("consumer-1") - // We expect the initial querier-1 users have got back to querier-1. - for _, userID := range querier1Users { - assert.Contains(t, getUsersByQuerier(uq, "querier-1"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-2"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-3"), userID) + // We expect the initial consumer-1 users have got back to consumer-1. + for _, userID := range consumer1Users { + assert.Contains(t, getUsersByConsumer(uq, "consumer-1"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-2"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-3"), userID) } - // Querier-1 abruptly terminates (no shutdown notification received). - uq.removeConsumerConnection("querier-1", now.Add(40*time.Second)) - uq.removeConsumerConnection("querier-1", now.Add(41*time.Second)) + // Consumer-1 abruptly terminates (no shutdown notification received). + uq.removeConsumerConnection("consumer-1", now.Add(40*time.Second)) + uq.removeConsumerConnection("consumer-1", now.Add(41*time.Second)) - // We expect querier-1 has NOT been removed. - assert.Contains(t, uq.consumers, "querier-1") + // We expect consumer-1 has NOT been removed. + assert.Contains(t, uq.consumers, "consumer-1") assert.NoError(t, isConsistent(uq)) - // We expect the querier-1 users have not been shuffled to other queriers. - for _, userID := range querier1Users { - assert.Contains(t, getUsersByQuerier(uq, "querier-1"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-2"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-3"), userID) + // We expect the consumer-1 users have not been shuffled to other consumers. + for _, userID := range consumer1Users { + assert.Contains(t, getUsersByConsumer(uq, "consumer-1"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-2"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-3"), userID) } - // Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet. + // Try to forget disconnected consumers, but consumer-1 forget delay hasn't passed yet. 
uq.forgetDisconnectedConsumers(now.Add(90 * time.Second)) - assert.Contains(t, uq.consumers, "querier-1") + assert.Contains(t, uq.consumers, "consumer-1") assert.NoError(t, isConsistent(uq)) - for _, userID := range querier1Users { - assert.Contains(t, getUsersByQuerier(uq, "querier-1"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-2"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-3"), userID) + for _, userID := range consumer1Users { + assert.Contains(t, getUsersByConsumer(uq, "consumer-1"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-2"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-3"), userID) } - // Try to forget disconnected queriers. This time querier-1 forget delay has passed. + // Try to forget disconnected consumers. This time consumer-1 forget delay has passed. uq.forgetDisconnectedConsumers(now.Add(105 * time.Second)) - assert.NotContains(t, uq.consumers, "querier-1") + assert.NotContains(t, uq.consumers, "consumer-1") assert.NoError(t, isConsistent(uq)) - // We expect querier-1 users have been shuffled to other queriers. - for _, userID := range querier1Users { - assert.Contains(t, append(getUsersByQuerier(uq, "querier-2"), getUsersByQuerier(uq, "querier-3")...), userID) + // We expect consumer-1 users have been shuffled to other consumers. + for _, userID := range consumer1Users { + assert.Contains(t, append(getUsersByConsumer(uq, "consumer-2"), getUsersByConsumer(uq, "consumer-3")...), userID) } } -func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForgetDelayIsPassed(t *testing.T) { +func TestQueues_ForgetDelay_ShouldCorrectlyHandleConsumerReconnectingBeforeForgetDelayIsPassed(t *testing.T) { const ( - forgetDelay = time.Minute - maxQueriersPerUser = 1 - numUsers = 100 + forgetDelay = time.Minute + maxConsumers = 1 + numUsers = 100 ) now := time.Now() - uq := newTenantQueues(0, forgetDelay) + uq := newTenantQueues(0, forgetDelay, &mockQueueLimits{maxConsumers: maxConsumers}) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) - // 3 queriers open 2 connections each. + // 3 consumers open 2 connections each. for i := 1; i <= 3; i++ { - uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i)) - uq.addConsumerToConnection(fmt.Sprintf("querier-%d", i)) + uq.addConsumerToConnection(fmt.Sprintf("consumer-%d", i)) + uq.addConsumerToConnection(fmt.Sprintf("consumer-%d", i)) } // Add user queues. for i := 0; i < numUsers; i++ { userID := fmt.Sprintf("user-%d", i) - getOrAdd(t, uq, userID, maxQueriersPerUser) + getOrAdd(t, uq, userID) } - // We expect querier-1 to have some users. - querier1Users := getUsersByQuerier(uq, "querier-1") - require.NotEmpty(t, querier1Users) + // We expect consumer-1 to have some users. + consumer1Users := getUsersByConsumer(uq, "consumer-1") + require.NotEmpty(t, consumer1Users) - // Querier-1 abruptly terminates (no shutdown notification received). - uq.removeConsumerConnection("querier-1", now.Add(40*time.Second)) - uq.removeConsumerConnection("querier-1", now.Add(41*time.Second)) + // Consumer-1 abruptly terminates (no shutdown notification received). + uq.removeConsumerConnection("consumer-1", now.Add(40*time.Second)) + uq.removeConsumerConnection("consumer-1", now.Add(41*time.Second)) - // We expect querier-1 has NOT been removed. - assert.Contains(t, uq.consumers, "querier-1") + // We expect consumer-1 has NOT been removed. 
+ assert.Contains(t, uq.consumers, "consumer-1") assert.NoError(t, isConsistent(uq)) - // We expect the querier-1 users have not been shuffled to other queriers. - for _, userID := range querier1Users { - assert.Contains(t, getUsersByQuerier(uq, "querier-1"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-2"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-3"), userID) + // We expect the consumer-1 users have not been shuffled to other consumers. + for _, userID := range consumer1Users { + assert.Contains(t, getUsersByConsumer(uq, "consumer-1"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-2"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-3"), userID) } - // Try to forget disconnected queriers, but querier-1 forget delay hasn't passed yet. + // Try to forget disconnected consumers, but consumer-1 forget delay hasn't passed yet. uq.forgetDisconnectedConsumers(now.Add(90 * time.Second)) - // Querier-1 reconnects. - uq.addConsumerToConnection("querier-1") - uq.addConsumerToConnection("querier-1") + // Consumer-1 reconnects. + uq.addConsumerToConnection("consumer-1") + uq.addConsumerToConnection("consumer-1") - assert.Contains(t, uq.consumers, "querier-1") + assert.Contains(t, uq.consumers, "consumer-1") assert.NoError(t, isConsistent(uq)) - // We expect the querier-1 users have not been shuffled to other queriers. - for _, userID := range querier1Users { - assert.Contains(t, getUsersByQuerier(uq, "querier-1"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-2"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-3"), userID) + // We expect the consumer-1 users have not been shuffled to other consumers. + for _, userID := range consumer1Users { + assert.Contains(t, getUsersByConsumer(uq, "consumer-1"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-2"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-3"), userID) } - // Try to forget disconnected queriers far in the future, but there's no disconnected querier. + // Try to forget disconnected consumers far in the future, but there's no disconnected consumer. 
uq.forgetDisconnectedConsumers(now.Add(200 * time.Second)) - assert.Contains(t, uq.consumers, "querier-1") + assert.Contains(t, uq.consumers, "consumer-1") assert.NoError(t, isConsistent(uq)) - for _, userID := range querier1Users { - assert.Contains(t, getUsersByQuerier(uq, "querier-1"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-2"), userID) - assert.NotContains(t, getUsersByQuerier(uq, "querier-3"), userID) + for _, userID := range consumer1Users { + assert.Contains(t, getUsersByConsumer(uq, "consumer-1"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-2"), userID) + assert.NotContains(t, getUsersByConsumer(uq, "consumer-3"), userID) } } @@ -397,24 +403,27 @@ func generateTenant(r *rand.Rand) string { return fmt.Sprint("tenant-", r.Int()%5) } -func generateQuerier(r *rand.Rand) string { - return fmt.Sprint("querier-", r.Int()%5) +func generateConsumer(r *rand.Rand) string { + return fmt.Sprint("consumer-", r.Int()%5) } -func getOrAdd(t *testing.T, uq *tenantQueues, tenant string, maxQueriers int) Queue { +func getOrAdd(t *testing.T, uq *tenantQueues, tenant string) Queue { actor := []string{} - q := uq.getOrAddQueue(tenant, actor, maxQueriers) + q, err := uq.getOrAddQueue(tenant, actor) + assert.NoError(t, err) assert.NotNil(t, q) assert.NoError(t, isConsistent(uq)) - assert.Equal(t, q, uq.getOrAddQueue(tenant, actor, maxQueriers)) + q2, err := uq.getOrAddQueue(tenant, actor) + assert.NoError(t, err) + assert.Equal(t, q, q2) return q } -func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, lastUserIndex QueueIndex, qs ...Queue) QueueIndex { +func confirmOrderForConsumer(t *testing.T, uq *tenantQueues, consumer string, lastUserIndex QueueIndex, qs ...Queue) QueueIndex { t.Helper() var n Queue for _, q := range qs { - n, _, lastUserIndex = uq.getNextQueueForConsumer(lastUserIndex, querier) + n, _, lastUserIndex = uq.getNextQueueForConsumer(lastUserIndex, consumer) assert.Equal(t, q, n) assert.NoError(t, isConsistent(uq)) } @@ -423,7 +432,7 @@ func confirmOrderForQuerier(t *testing.T, uq *tenantQueues, querier string, last func isConsistent(uq *tenantQueues) error { if len(uq.sortedConsumers) != len(uq.consumers) { - return fmt.Errorf("inconsistent number of sorted queriers and querier connections") + return fmt.Errorf("inconsistent number of sorted consumers and consumer connections") } uc := 0 @@ -441,16 +450,17 @@ func isConsistent(uq *tenantQueues) error { uc++ - if q.maxQueriers == 0 && q.consumers != nil { - return fmt.Errorf("user %s has queriers, but maxQueriers=0", u) + maxConsumers := uq.limits.MaxConsumers(u, len(uq.consumers)) + if maxConsumers == 0 && q.consumers != nil { + return fmt.Errorf("consumers for user %s should be nil when no limits are set (when MaxConsumers is 0)", u) } - if q.maxQueriers > 0 && len(uq.sortedConsumers) <= q.maxQueriers && q.consumers != nil { - return fmt.Errorf("user %s has queriers set despite not enough queriers available", u) + if maxConsumers > 0 && len(uq.sortedConsumers) <= maxConsumers && q.consumers != nil { + return fmt.Errorf("consumers for user %s should be nil when MaxConsumers allowed is higher than the available consumers", u) } - if q.maxQueriers > 0 && len(uq.sortedConsumers) > q.maxQueriers && len(q.consumers) != q.maxQueriers { - return fmt.Errorf("user %s has incorrect number of queriers, expected=%d, got=%d", u, len(q.consumers), q.maxQueriers) + if maxConsumers > 0 && len(uq.sortedConsumers) > maxConsumers && len(q.consumers) != maxConsumers { + return 
fmt.Errorf("user %s has incorrect number of consumers, expected=%d, got=%d", u, maxConsumers, len(q.consumers)) } } @@ -461,67 +471,75 @@ func isConsistent(uq *tenantQueues) error { return nil } -// getUsersByQuerier returns the list of users handled by the provided querierID. -func getUsersByQuerier(queues *tenantQueues, querierID string) []string { +// getUsersByConsumer returns the list of users handled by the provided consumerID. +func getUsersByConsumer(queues *tenantQueues, consumerID string) []string { var userIDs []string for _, userID := range queues.mapping.Keys() { q := queues.mapping.GetByKey(userID) if q.consumers == nil { - // If it's nil then all queriers can handle this user. + // If it's nil then all consumers can handle this user. userIDs = append(userIDs, userID) continue } - if _, ok := q.consumers[querierID]; ok { + if _, ok := q.consumers[consumerID]; ok { userIDs = append(userIDs, userID) } } return userIDs } -func TestShuffleQueriers(t *testing.T) { - allQueriers := []string{"a", "b", "c", "d", "e"} +func TestShuffleConsumers(t *testing.T) { + allConsumers := []string{"a", "b", "c", "d", "e"} - require.Nil(t, shuffleConsumersForTenants(12345, 10, allQueriers, nil)) - require.Nil(t, shuffleConsumersForTenants(12345, len(allQueriers), allQueriers, nil)) + require.Nil(t, shuffleConsumersForTenants(12345, 10, allConsumers, nil)) + require.Nil(t, shuffleConsumersForTenants(12345, len(allConsumers), allConsumers, nil)) - r1 := shuffleConsumersForTenants(12345, 3, allQueriers, nil) + r1 := shuffleConsumersForTenants(12345, 3, allConsumers, nil) require.Equal(t, 3, len(r1)) // Same input produces same output. - r2 := shuffleConsumersForTenants(12345, 3, allQueriers, nil) + r2 := shuffleConsumersForTenants(12345, 3, allConsumers, nil) require.Equal(t, 3, len(r2)) require.Equal(t, r1, r2) } -func TestShuffleQueriersCorrectness(t *testing.T) { - const queriersCount = 100 +func TestShuffleConsumersCorrectness(t *testing.T) { + const consumersCount = 100 - var allSortedQueriers []string - for i := 0; i < queriersCount; i++ { - allSortedQueriers = append(allSortedQueriers, fmt.Sprintf("%d", i)) + var allSortedConsumers []string + for i := 0; i < consumersCount; i++ { + allSortedConsumers = append(allSortedConsumers, fmt.Sprintf("%d", i)) } - sort.Strings(allSortedQueriers) + sort.Strings(allSortedConsumers) r := rand.New(rand.NewSource(time.Now().UnixNano())) const tests = 1000 for i := 0; i < tests; i++ { - toSelect := r.Intn(queriersCount) + toSelect := r.Intn(consumersCount) if toSelect == 0 { toSelect = 3 } - selected := shuffleConsumersForTenants(r.Int63(), toSelect, allSortedQueriers, nil) + selected := shuffleConsumersForTenants(r.Int63(), toSelect, allSortedConsumers, nil) require.Equal(t, toSelect, len(selected)) - sort.Strings(allSortedQueriers) - prevQuerier := "" - for _, q := range allSortedQueriers { - require.True(t, prevQuerier < q, "non-unique querier") - prevQuerier = q + sort.Strings(allSortedConsumers) + prevConsumer := "" + for _, q := range allSortedConsumers { + require.True(t, prevConsumer < q, "non-unique consumer") + prevConsumer = q - ix := sort.SearchStrings(allSortedQueriers, q) - require.True(t, ix < len(allSortedQueriers) && allSortedQueriers[ix] == q, "selected querier is not between all queriers") + ix := sort.SearchStrings(allSortedConsumers, q) + require.True(t, ix < len(allSortedConsumers) && allSortedConsumers[ix] == q, "selected consumer is not between all consumers") } } } + +type mockQueueLimits struct { + maxConsumers int +} + +func (l 
*mockQueueLimits) MaxConsumers(_ string, _ int) int { + return l.maxConsumers +} diff --git a/pkg/scheduler/limits/definitions.go b/pkg/scheduler/limits/definitions.go index 2a00db7d4a..e2c2e26cca 100644 --- a/pkg/scheduler/limits/definitions.go +++ b/pkg/scheduler/limits/definitions.go @@ -1,7 +1,46 @@ package limits +import ( + "math" +) + // Limits needed for the Query Scheduler - interface used for decoupling. type Limits interface { // MaxQueriersPerUser returns max queriers to use per tenant, or 0 if shuffle sharding is disabled. - MaxQueriersPerUser(user string) int + MaxQueriersPerUser(user string) uint + + // MaxQueryCapacity returns how much of the available query capacity can be used by this user. + MaxQueryCapacity(user string) float64 +} + +func NewQueueLimits(limits Limits) *QueueLimits { + return &QueueLimits{limits: limits} +} + +type QueueLimits struct { + limits Limits +} + +// MaxConsumers is used to compute how many of the available queriers are allowed to handle requests for a given tenant. +// Returns the min value or one of (frontend.max-queriers-per-tenant, ceil(querier_replicas * frontend.max-query-capacity)) +// depending of whether both or only one of the two limits are configured. +// 0 is returned when neither limits are applied. +func (c *QueueLimits) MaxConsumers(tenantID string, allConsumers int) int { + if c == nil || c.limits == nil { + return 0 + } + + maxQueriers := int(c.limits.MaxQueriersPerUser(tenantID)) + maxCapacity := c.limits.MaxQueryCapacity(tenantID) + + if maxCapacity == 0 { + return maxQueriers + } + + res := int(math.Ceil(float64(allConsumers) * maxCapacity)) + if maxQueriers != 0 && maxQueriers < res { + return maxQueriers + } + + return res } diff --git a/pkg/scheduler/limits/definitions_test.go b/pkg/scheduler/limits/definitions_test.go new file mode 100644 index 0000000000..26139e2186 --- /dev/null +++ b/pkg/scheduler/limits/definitions_test.go @@ -0,0 +1,78 @@ +package limits + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueueLimitsMaxConsumers(t *testing.T) { + for name, tt := range map[string]struct { + limits *QueueLimits + expected int + }{ + "nil limits": { + limits: NewQueueLimits(nil), + expected: 0, + }, + "no limits": { + limits: NewQueueLimits(mockLimits{ + maxQueriers: 0, + maxQueryCapacity: 0, + }), + expected: 0, + }, + "enforce max queriers": { + limits: NewQueueLimits(mockLimits{ + maxQueriers: 5, + maxQueryCapacity: 0, + }), + expected: 5, + }, + "prefer max queriers over query capacity": { + limits: NewQueueLimits(mockLimits{ + maxQueriers: 5, + maxQueryCapacity: 1.0, + }), + expected: 5, + }, + "enforce max query capacity": { + limits: NewQueueLimits(mockLimits{ + maxQueriers: 0, + maxQueryCapacity: 0.5, + }), + expected: 5, + }, + "prefer query capacity over max queriers": { + limits: NewQueueLimits(mockLimits{ + maxQueriers: 5, + maxQueryCapacity: 0.4, + }), + expected: 4, + }, + "query capacity of 1.0": { + limits: NewQueueLimits(mockLimits{ + maxQueryCapacity: 1.0, + }), + expected: 10, + }, + } { + t.Run(name, func(t *testing.T) { + res := tt.limits.MaxConsumers("", 10) + assert.Equal(t, tt.expected, res) + }) + } +} + +type mockLimits struct { + maxQueriers uint + maxQueryCapacity float64 +} + +func (l mockLimits) MaxQueriersPerUser(_ string) uint { + return l.maxQueriers +} + +func (l mockLimits) MaxQueryCapacity(_ string) float64 { + return l.maxQueryCapacity +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 305d47b17e..5cd163ff0f 100644 --- 
a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -19,7 +19,6 @@ import ( "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" - "github.com/grafana/dskit/tenant" "github.com/grafana/dskit/user" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" @@ -38,7 +37,6 @@ import ( lokigrpc "github.com/grafana/loki/pkg/util/httpgrpc" lokihttpreq "github.com/grafana/loki/pkg/util/httpreq" lokiring "github.com/grafana/loki/pkg/util/ring" - "github.com/grafana/loki/pkg/util/validation" ) var errSchedulerIsNotRunning = errors.New("scheduler is not running") @@ -117,7 +115,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { } // NewScheduler creates a new Scheduler. -func NewScheduler(cfg Config, limits Limits, log log.Logger, ringManager *lokiring.RingManager, registerer prometheus.Registerer, metricsNamespace string) (*Scheduler, error) { +func NewScheduler(cfg Config, schedulerLimits Limits, log log.Logger, ringManager *lokiring.RingManager, registerer prometheus.Registerer, metricsNamespace string) (*Scheduler, error) { if cfg.UseSchedulerRing { if ringManager == nil { return nil, errors.New("ring manager can't be empty when use_scheduler_ring is true") @@ -130,13 +128,13 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, ringManager *lokiri s := &Scheduler{ cfg: cfg, log: log, - limits: limits, + limits: schedulerLimits, pendingRequests: map[requestKey]*schedulerRequest{}, connectedFrontends: map[string]*connectedFrontend{}, queueMetrics: queueMetrics, ringManager: ringManager, - requestQueue: queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, queueMetrics), + requestQueue: queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, limits.NewQueueLimits(schedulerLimits), queueMetrics), } s.queueDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ @@ -353,13 +351,6 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr req.queueTime = now req.ctxCancel = cancel - // aggregate the max queriers limit in the case of a multi tenant query - tenantIDs, err := tenant.TenantIDsFromOrgID(req.tenantID) - if err != nil { - return err - } - maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, s.limits.MaxQueriersPerUser) - var queuePath []string if s.cfg.MaxQueueHierarchyLevels > 0 { queuePath = msg.QueuePath @@ -378,7 +369,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr } s.activeUsers.UpdateUserTimestamp(req.tenantID, now) - return s.requestQueue.Enqueue(req.tenantID, queuePath, req, maxQueriers, func() { + return s.requestQueue.Enqueue(req.tenantID, queuePath, req, func() { shouldCancel = false s.pendingRequestsMu.Lock() diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go index 26c9c4b3f2..edf666c8e9 100644 --- a/pkg/storage/stores/shipper/bloomshipper/block_downloader.go +++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go @@ -39,10 +39,18 @@ type blockDownloader struct { wg sync.WaitGroup } +type queueLimits struct { + limits Limits +} + +func (l *queueLimits) MaxConsumers(tenantID string, _ int) int { + return l.limits.BloomGatewayBlocksDownloadingParallelism(tenantID) +} + func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) { queueMetrics 
:= queue.NewMetrics(reg, constants.Loki, "bloom_blocks_downloader") //add cleanup service - downloadingQueue := queue.NewRequestQueue(config.BlocksDownloadingQueue.MaxTasksEnqueuedPerTenant, time.Minute, queueMetrics) + downloadingQueue := queue.NewRequestQueue(config.BlocksDownloadingQueue.MaxTasksEnqueuedPerTenant, time.Minute, &queueLimits{limits: limits}, queueMetrics) activeUsersService := util.NewActiveUsersCleanupWithDefaultValues(queueMetrics.Cleanup) ctx := context.Background() @@ -155,11 +163,10 @@ func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, r errCh := make(chan error, len(references)) blocksCh := make(chan blockWithQuerier, len(references)) - downloadingParallelism := d.limits.BloomGatewayBlocksDownloadingParallelism(tenantID) for _, reference := range references { task := NewBlockDownloadingTask(ctx, reference, blocksCh, errCh) level.Debug(d.logger).Log("msg", "enqueuing task to download block", "block", reference.BlockPath) - err := d.queue.Enqueue(tenantID, nil, task, downloadingParallelism, nil) + err := d.queue.Enqueue(tenantID, nil, task, nil) if err != nil { errCh <- fmt.Errorf("error enquing downloading task for block %s : %w", reference.BlockPath, err) return blocksCh, errCh diff --git a/pkg/validation/exporter.go b/pkg/validation/exporter.go index bbc26d1b54..ad9dde8574 100644 --- a/pkg/validation/exporter.go +++ b/pkg/validation/exporter.go @@ -52,7 +52,7 @@ func (oe *OverridesExporter) Collect(ch chan<- prometheus.Metric) { return float64(val.Field(i).Int()), true case model.Duration: return float64(val.Field(i).Interface().(model.Duration)), true - case flagext.ByteSize: + case uint, flagext.ByteSize: return float64(val.Field(i).Uint()), true case float64: return val.Field(i).Float(), true diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index c4e38a898d..0234730245 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -98,7 +98,8 @@ type Limits struct { MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` MaxStatsCacheFreshness model.Duration `yaml:"max_stats_cache_freshness" json:"max_stats_cache_freshness"` - MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` + MaxQueriersPerTenant uint `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` + MaxQueryCapacity float64 `yaml:"max_query_capacity" json:"max_query_capacity"` QueryReadyIndexNumDays int `yaml:"query_ready_index_num_days" json:"query_ready_index_num_days"` QueryTimeout model.Duration `yaml:"query_timeout" json:"query_timeout"` @@ -276,7 +277,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.MaxStatsCacheFreshness.Set("10m") f.Var(&l.MaxStatsCacheFreshness, "frontend.max-stats-cache-freshness", "Do not cache requests with an end time that falls within Now minus this duration. 0 disables this feature (default).") - f.IntVar(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). 
This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.") + f.UintVar(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.") + f.Float64Var(&l.MaxQueryCapacity, "frontend.max-query-capacity", 0, "How much of the available query capacity (\"querier\" components in distributed mode, \"read\" components in SSD mode) can be used by a single tenant. Allowed values are 0.0 to 1.0. For example, setting this to 0.5 would allow a tenant to use half of the available queriers for processing the query workload. If set to 0, query capacity is determined by frontend.max-queriers-per-tenant. When both frontend.max-queriers-per-tenant and frontend.max-query-capacity are configured, smaller value of the resulting querier replica count is considered: min(frontend.max-queriers-per-tenant, ceil(querier_replicas * frontend.max-query-capacity)). *All* queriers will handle requests for the tenant if neither limits are applied. This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL. Use this feature in a multi-tenant setup where you need to limit query capacity for certain tenants.") f.IntVar(&l.QueryReadyIndexNumDays, "store.query-ready-index-num-days", 0, "Number of days of index to be kept always downloaded for queries. Applies only to per user index in boltdb-shipper index store. 0 to disable.") f.IntVar(&l.RulerMaxRulesPerRuleGroup, "ruler.max-rules-per-rule-group", 0, "Maximum number of rules per rule group per-tenant. 0 to disable.") @@ -368,6 +370,16 @@ func (l *Limits) Validate() error { level.Warn(util_log.Logger).Log("msg", "The compactor.allow-deletes configuration option has been deprecated and will be ignored. Instead, use deletion_mode in the limits_configs to adjust deletion functionality") } + if l.MaxQueryCapacity < 0 { + level.Warn(util_log.Logger).Log("msg", "setting frontend.max-query-capacity to 0 as it is configured to a value less than 0") + l.MaxQueryCapacity = 0 + } + + if l.MaxQueryCapacity > 1 { + level.Warn(util_log.Logger).Log("msg", "setting frontend.max-query-capacity to 1 as it is configured to a value greater than 1") + l.MaxQueryCapacity = 1 + } + return nil } @@ -502,10 +514,15 @@ func (o *Overrides) MaxQueryRange(_ context.Context, userID string) time.Duratio } // MaxQueriersPerUser returns the maximum number of queriers that can handle requests for this user. -func (o *Overrides) MaxQueriersPerUser(userID string) int { +func (o *Overrides) MaxQueriersPerUser(userID string) uint { return o.getOverridesForUser(userID).MaxQueriersPerTenant } +// MaxQueryCapacity returns how much of the available query capacity can be used by this user. +func (o *Overrides) MaxQueryCapacity(userID string) float64 { + return o.getOverridesForUser(userID).MaxQueryCapacity +} + // QueryReadyIndexNumDays returns the number of days for which we have to be query ready for a user. 
func (o *Overrides) QueryReadyIndexNumDays(userID string) int { return o.getOverridesForUser(userID).QueryReadyIndexNumDays
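
For reviewers, a minimal, self-contained sketch (not part of the patch) of how the two limits combine in `QueueLimits.MaxConsumers`. The standalone `maxConsumers` helper and the replica count of 10 are illustrative assumptions only; the arithmetic mirrors the new code in `pkg/scheduler/limits/definitions.go` and the cases covered by `definitions_test.go`.

```
package main

import (
	"fmt"
	"math"
)

// maxConsumers mirrors QueueLimits.MaxConsumers from this patch:
// 0 means "unlimited" for either setting; otherwise the smaller of
// max-queriers-per-tenant and ceil(replicas * max-query-capacity) wins.
func maxConsumers(maxQueriers int, maxCapacity float64, replicas int) int {
	if maxCapacity == 0 {
		return maxQueriers
	}
	res := int(math.Ceil(float64(replicas) * maxCapacity))
	if maxQueriers != 0 && maxQueriers < res {
		return maxQueriers
	}
	return res
}

func main() {
	// With 10 querier replicas available:
	fmt.Println(maxConsumers(0, 0, 10))   // 0 -> no limit, all queriers serve the tenant
	fmt.Println(maxConsumers(5, 0, 10))   // 5 -> only frontend.max-queriers-per-tenant applies
	fmt.Println(maxConsumers(0, 0.5, 10)) // 5 -> ceil(10 * 0.5)
	fmt.Println(maxConsumers(5, 0.4, 10)) // 4 -> min(5, ceil(10 * 0.4))
}
```

The last call illustrates why the scheduler takes the minimum of the two results: a tenant capped at 40% capacity gets 4 of the 10 queriers even though `frontend.max-queriers-per-tenant` alone would allow 5.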