From 970ea611436382a2d5d6bc92545dfcbf02b0dd2a Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Wed, 18 Jun 2025 17:20:04 -0400 Subject: [PATCH] chore: Revert "chore: enforce max-query-series when returning shard results" (#18156) --- pkg/logql/accumulator.go | 42 ------------------ pkg/querier/queryrange/downstreamer.go | 21 +-------- pkg/querier/queryrange/downstreamer_test.go | 48 --------------------- pkg/querier/queryrange/roundtrip_test.go | 2 +- 4 files changed, 2 insertions(+), 111 deletions(-) diff --git a/pkg/logql/accumulator.go b/pkg/logql/accumulator.go index f36b71a5a1..2194c4a4cb 100644 --- a/pkg/logql/accumulator.go +++ b/pkg/logql/accumulator.go @@ -9,8 +9,6 @@ import ( "sort" "time" - "github.com/prometheus/prometheus/promql" - "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" @@ -505,43 +503,3 @@ func (acc *AccumulatedStreams) Accumulate(_ context.Context, x logqlmodel.Result } return nil } - -// LimitingAccumulator wraps another Accumulator and enforces a total series/stream limit. -type LimitingAccumulator struct { - inner Accumulator - limit int - count int -} - -func NewLimitingAccumulator(inner Accumulator, limit int) *LimitingAccumulator { - return &LimitingAccumulator{ - inner: inner, - limit: limit, - } -} - -func (a *LimitingAccumulator) Accumulate(ctx context.Context, res logqlmodel.Result, idx int) error { - // If the result contains a matrix or vector, count the number of series/streams. - // We do a simple series/stream count here because our sharding dispatches unique streams to each shard, - // while this doesn't actually guarantee the results from each shard will have unique streams I believe - // this to be good enough for approximation and enforcing the stream limit per query sooner to avoid - // allocationg all the memory for a query only then to find out it exceeds the limit. - var n int - switch data := res.Data.(type) { - case promql.Matrix: - n = len(data) - case promql.Vector: - n = len(data) - default: - n = 0 - } - a.count += n - if a.limit > 0 && a.count > a.limit { - return logqlmodel.NewSeriesLimitError(a.limit) - } - return a.inner.Accumulate(ctx, res, idx) -} - -func (a *LimitingAccumulator) Result() []logqlmodel.Result { - return a.inner.Result() -} diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index b92d6a05dc..8e6cb0c6ce 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -94,7 +94,6 @@ func (h DownstreamHandler) Downstreamer(ctx context.Context) logql.Downstreamer locks: locks, handler: h.next, splitAlign: h.splitAlign, - limits: h.limits, } } @@ -105,7 +104,6 @@ type instance struct { handler queryrangebase.Handler splitAlign bool - limits Limits } // withoutOffset returns the given query string with offsets removed and timestamp adjusted accordingly. If no offset is present in original query, it will be returned as is. @@ -136,16 +134,6 @@ func withoutOffset(query logql.DownstreamQuery) (string, time.Time, time.Time) { } func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery, acc logql.Accumulator) ([]logqlmodel.Result, error) { - // Get the user/tenant ID from context - user, _ := tenant.TenantID(ctx) - // Get the max_query_series limit from the instance's limits - maxSeries := 0 - if in.limits != nil { - maxSeries = in.limits.MaxQuerySeries(ctx, user) - } - if maxSeries > 0 { - acc = logql.NewLimitingAccumulator(acc, maxSeries) - } return in.For(ctx, queries, acc, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) { var req queryrangebase.Request if in.splitAlign { @@ -219,10 +207,7 @@ func (in instance) For( for { select { case <-ctx.Done(): - // Prefer returning the accumulator error if it exists - if err != nil { - return acc.Result(), err - } + // Return early if the context is canceled return acc.Result(), ctx.Err() case resp, ok := <-ch: if !ok { @@ -237,10 +222,6 @@ func (in instance) For( continue } err = acc.Accumulate(ctx, resp.Res, resp.I) - if err != nil { - cancel() // Cancel all workers immediately - continue - } } } } diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index 47086e67b2..a10913f223 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -603,51 +603,3 @@ func TestDownstreamerUsesCorrectParallelism(t *testing.T) { } require.Equal(t, l.maxQueryParallelism, ct) } - -func TestDownstreamer_EnforcesMaxQuerySeriesLimit(t *testing.T) { - // Set up a fakeLimits with a low maxSeries - l := fakeLimits{maxSeries: 2} - - params, err := logql.NewLiteralParams( - `{foo="bar"}`, - time.Now(), - time.Now(), - 0, - 0, - logproto.BACKWARD, - 1000, - nil, - nil, - ) - require.NoError(t, err) - - // Create 2 queries, each will return 2 series, so total = 4 > limit - queries := []logql.DownstreamQuery{ - {Params: params}, - {Params: params}, - } - - // Handler returns a matrix with 2 series for each query - handler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) { - return &LokiPromResponse{ - Response: &queryrangebase.PrometheusResponse{ - Data: queryrangebase.PrometheusData{ - Result: []queryrangebase.SampleStream{ - {Labels: []logproto.LabelAdapter{{Name: "foo", Value: "bar1"}}, Samples: []logproto.LegacySample{{Value: 1, TimestampMs: 1}}}, - {Labels: []logproto.LabelAdapter{{Name: "foo", Value: "bar2"}}, Samples: []logproto.LegacySample{{Value: 2, TimestampMs: 2}}}, - }, - }, - }, - }, nil - }) - - dh := DownstreamHandler{ - limits: l, - next: handler, - } - ds := dh.Downstreamer(context.Background()) - acc := logql.NewBufferedAccumulator(len(queries)) - _, err = ds.Downstream(context.Background(), queries, acc) - require.Error(t, err) - require.True(t, errors.Is(err, logqlmodel.ErrLimit), "expected ErrLimit, got: %v", err) -} diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 74ea79dcef..97e2f8f612 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -354,7 +354,7 @@ func TestInstantQueryTripperwareResultCaching(t *testing.T) { maxQueryBytesRead: 1000, maxQuerierBytesRead: 100, queryTimeout: 1 * time.Minute, - maxSeries: 10, + maxSeries: 1, } tpw, stopper, err := NewMiddleware(testLocal, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki) if stopper != nil {