chore: Revert "chore: enforce max-query-series when returning shard results" (#18156)

pull/18161/head
Ed Welch 7 months ago committed by GitHub
parent 243a1e4efb
commit 970ea61143
  1. pkg/logql/accumulator.go (42 changed lines)
  2. pkg/querier/queryrange/downstreamer.go (21 changed lines)
  3. pkg/querier/queryrange/downstreamer_test.go (48 changed lines)
  4. pkg/querier/queryrange/roundtrip_test.go (2 changed lines)

@@ -9,8 +9,6 @@ import (
	"sort"
	"time"
	"github.com/prometheus/prometheus/promql"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logqlmodel"
	"github.com/grafana/loki/v3/pkg/logqlmodel/metadata"
@@ -505,43 +503,3 @@ func (acc *AccumulatedStreams) Accumulate(_ context.Context, x logqlmodel.Result
	}
	return nil
}
// LimitingAccumulator wraps another Accumulator and enforces a total series/stream limit.
type LimitingAccumulator struct {
	inner Accumulator
	limit int
	count int
}

func NewLimitingAccumulator(inner Accumulator, limit int) *LimitingAccumulator {
	return &LimitingAccumulator{
		inner: inner,
		limit: limit,
	}
}

func (a *LimitingAccumulator) Accumulate(ctx context.Context, res logqlmodel.Result, idx int) error {
	// If the result contains a matrix or vector, count the number of series/streams.
	// We do a simple series/stream count here because our sharding dispatches unique streams to each shard.
	// While this doesn't actually guarantee that the results from each shard contain unique streams, I believe
	// it is a good enough approximation for enforcing the stream limit per query sooner, so we avoid
	// allocating all the memory for a query only to then find out it exceeds the limit.
	var n int
	switch data := res.Data.(type) {
	case promql.Matrix:
		n = len(data)
	case promql.Vector:
		n = len(data)
	default:
		n = 0
	}
	a.count += n
	if a.limit > 0 && a.count > a.limit {
		return logqlmodel.NewSeriesLimitError(a.limit)
	}
	return a.inner.Accumulate(ctx, res, idx)
}

func (a *LimitingAccumulator) Result() []logqlmodel.Result {
	return a.inner.Result()
}
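For reference, the reverted change wired this wrapper in at the downstreamer (see the downstreamer.go hunks below) rather than inside the accumulator package itself. A minimal sketch of the composition, assuming it sits in pkg/logql next to the types above (downstreamWithLimit is a hypothetical name, not part of this diff):

// downstreamWithLimit is a hypothetical sketch showing how the reverted
// LimitingAccumulator composed with the existing BufferedAccumulator.
func downstreamWithLimit(ctx context.Context, results []logqlmodel.Result, maxSeries int) ([]logqlmodel.Result, error) {
	acc := NewLimitingAccumulator(NewBufferedAccumulator(len(results)), maxSeries)
	for i, res := range results {
		// Accumulate fails fast with a series-limit error as soon as the running
		// count crosses maxSeries, instead of buffering every shard result first.
		if err := acc.Accumulate(ctx, res, i); err != nil {
			return acc.Result(), err
		}
	}
	return acc.Result(), nil
}

Because the check runs on every Accumulate call, a query would fail as soon as the running series count crossed the limit rather than after all shard results had been buffered.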

@@ -94,7 +94,6 @@ func (h DownstreamHandler) Downstreamer(ctx context.Context) logql.Downstreamer
		locks:      locks,
		handler:    h.next,
		splitAlign: h.splitAlign,
		limits:     h.limits,
	}
}
@@ -105,7 +104,6 @@ type instance struct {
	handler    queryrangebase.Handler
	splitAlign bool
	limits     Limits
}

// withoutOffset returns the given query string with offsets removed and timestamp adjusted accordingly. If no offset is present in original query, it will be returned as is.
@@ -136,16 +134,6 @@ func withoutOffset(query logql.DownstreamQuery) (string, time.Time, time.Time) {
}

func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery, acc logql.Accumulator) ([]logqlmodel.Result, error) {
	// Get the user/tenant ID from context
	user, _ := tenant.TenantID(ctx)

	// Get the max_query_series limit from the instance's limits
	maxSeries := 0
	if in.limits != nil {
		maxSeries = in.limits.MaxQuerySeries(ctx, user)
	}
	if maxSeries > 0 {
		acc = logql.NewLimitingAccumulator(acc, maxSeries)
	}

	return in.For(ctx, queries, acc, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) {
		var req queryrangebase.Request
		if in.splitAlign {
@@ -219,10 +207,7 @@ func (in instance) For(
	for {
		select {
		case <-ctx.Done():
			// Prefer returning the accumulator error if it exists
			if err != nil {
				return acc.Result(), err
			}
			// Return early if the context is canceled
			return acc.Result(), ctx.Err()
		case resp, ok := <-ch:
			if !ok {
@@ -237,10 +222,6 @@ func (in instance) For(
				continue
			}
			err = acc.Accumulate(ctx, resp.Res, resp.I)
			if err != nil {
				cancel() // Cancel all workers immediately
				continue
			}
		}
	}
}
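Reading the two For hunks together: before the revert, a failed Accumulate cancelled the outstanding workers, and the ctx.Done() branch preferred that stored error over the bare context error, so the caller saw the limit error rather than "context canceled". A self-contained sketch of that cancellation and error-preference pattern (generic names, not Loki's actual For implementation):

package main

import (
	"context"
	"errors"
	"fmt"
)

// collect drains worker results from ch, applying accFn to each one. When accumulation
// fails (for example because a series limit was exceeded), it cancels the remaining
// workers but keeps the error so it wins over the bare ctx.Err() on shutdown.
func collect(ctx context.Context, cancel context.CancelFunc, ch <-chan int, accFn func(int) error) error {
	var err error
	for {
		select {
		case <-ctx.Done():
			if err != nil {
				return err // prefer the accumulator error over ctx.Err()
			}
			return ctx.Err()
		case v, ok := <-ch:
			if !ok {
				return err
			}
			if accErr := accFn(v); accErr != nil {
				err = accErr
				cancel() // cancel all workers immediately
				continue // keep draining until the channel closes
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch := make(chan int)
	go func() { // a stand-in for the sharded query workers
		defer close(ch)
		for i := 0; i < 10; i++ {
			select {
			case ch <- i:
			case <-ctx.Done():
				return
			}
		}
	}()

	limitErr := errors.New("series limit exceeded")
	err := collect(ctx, cancel, ch, func(v int) error {
		if v >= 3 {
			return limitErr
		}
		return nil
	})
	fmt.Println(err) // series limit exceeded
}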

@@ -603,51 +603,3 @@ func TestDownstreamerUsesCorrectParallelism(t *testing.T) {
	}
	require.Equal(t, l.maxQueryParallelism, ct)
}
func TestDownstreamer_EnforcesMaxQuerySeriesLimit(t *testing.T) {
	// Set up a fakeLimits with a low maxSeries
	l := fakeLimits{maxSeries: 2}
	params, err := logql.NewLiteralParams(
		`{foo="bar"}`,
		time.Now(),
		time.Now(),
		0,
		0,
		logproto.BACKWARD,
		1000,
		nil,
		nil,
	)
	require.NoError(t, err)

	// Create 2 queries, each will return 2 series, so total = 4 > limit
	queries := []logql.DownstreamQuery{
		{Params: params},
		{Params: params},
	}

	// Handler returns a matrix with 2 series for each query
	handler := queryrangebase.HandlerFunc(func(_ context.Context, _ queryrangebase.Request) (queryrangebase.Response, error) {
		return &LokiPromResponse{
			Response: &queryrangebase.PrometheusResponse{
				Data: queryrangebase.PrometheusData{
					Result: []queryrangebase.SampleStream{
						{Labels: []logproto.LabelAdapter{{Name: "foo", Value: "bar1"}}, Samples: []logproto.LegacySample{{Value: 1, TimestampMs: 1}}},
						{Labels: []logproto.LabelAdapter{{Name: "foo", Value: "bar2"}}, Samples: []logproto.LegacySample{{Value: 2, TimestampMs: 2}}},
					},
				},
			},
		}, nil
	})

	dh := DownstreamHandler{
		limits: l,
		next:   handler,
	}
	ds := dh.Downstreamer(context.Background())
	acc := logql.NewBufferedAccumulator(len(queries))

	_, err = ds.Downstream(context.Background(), queries, acc)
	require.Error(t, err)
	require.True(t, errors.Is(err, logqlmodel.ErrLimit), "expected ErrLimit, got: %v", err)
}
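The test leans on the package's existing fakeLimits fixture to expose the per-tenant series ceiling; the fixture itself isn't shown in this diff. A minimal sketch of the method the Downstream wiring above would call (assumed signature, matching the in.limits.MaxQuerySeries(ctx, user) call in the downstreamer hunk):

// Assumed shape of the test fixture's limit accessor: return the configured
// maxSeries for any tenant so the downstreamer wraps the accumulator with it.
func (f fakeLimits) MaxQuerySeries(_ context.Context, _ string) int {
	return f.maxSeries
}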

@@ -354,7 +354,7 @@ func TestInstantQueryTripperwareResultCaching(t *testing.T) {
		maxQueryBytesRead:   1000,
		maxQuerierBytesRead: 100,
		queryTimeout:        1 * time.Minute,
		maxSeries:           10,
		maxSeries:           1,
	}
	tpw, stopper, err := NewMiddleware(testLocal, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki)
	if stopper != nil {