Limit series for metric queries. (#2903)

* Work in progress limit series.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* First draft of the series limiter.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add limiter to query-frontend.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add documentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix build

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* fmted

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint and build fix

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Update docs/sources/configuration/_index.md

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>

* Review feedback.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Reduce // for to fix flaky test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: achatterjee-grafana <70489351+achatterjee-grafana@users.noreply.github.com>
pull/2946/head
Cyril Tovena 5 years ago committed by GitHub
parent 901562e040
commit 61ba02e2c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      docs/sources/configuration/_index.md
  2. 2
      pkg/logcli/query/query.go
  3. 2
      pkg/logcli/query/query_test.go
  4. 25
      pkg/logql/engine.go
  5. 55
      pkg/logql/engine_test.go
  6. 42
      pkg/logql/error.go
  7. 22
      pkg/logql/limits.go
  8. 2
      pkg/logql/parser.go
  9. 2
      pkg/logql/parser_test.go
  10. 5
      pkg/logql/sharding.go
  11. 7
      pkg/logql/sharding_test.go
  12. 2
      pkg/loki/modules.go
  13. 2
      pkg/querier/querier.go
  14. 73
      pkg/querier/queryrange/limits.go
  15. 101
      pkg/querier/queryrange/limits_test.go
  16. 6
      pkg/querier/queryrange/querysharding.go
  17. 3
      pkg/querier/queryrange/querysharding_test.go
  18. 2
      pkg/querier/queryrange/roundtrip.go
  19. 8
      pkg/querier/queryrange/roundtrip_test.go
  20. 11
      pkg/querier/queryrange/split_by_interval.go
  21. 5
      pkg/util/server/error.go
  22. 2
      pkg/util/server/error_test.go
  23. 7
      pkg/util/validation/limits.go

@ -1648,6 +1648,11 @@ logs in Loki.
# CLI flag: -querier.max-query-parallelism
[max_query_parallelism: <int> | default = 14]
# Limit the maximum number of unique series returned by a metric query.
# When the limit is reached an error is returned.
# CLI flag: -querier.max-query-series
[max_query_series: <int> | default = 500]
# Cardinality limit for index queries.
# CLI flag: -store.cardinality-limit
[cardinality_limit: <int> | default = 100000]
@ -1881,4 +1886,4 @@ multi_kv_config:
mirror-enabled: false
primary: consul
```
### Generic placeholders
### Generic placeholders

@ -201,7 +201,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}
eng := logql.NewEngine(conf.Querier.Engine, querier)
eng := logql.NewEngine(conf.Querier.Engine, querier, limits)
var query logql.Query
if q.isInstant() {

@ -502,7 +502,7 @@ type testQueryClient struct {
func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
q := logql.NewMockQuerier(0, testStreams)
e := logql.NewEngine(logql.EngineOpts{}, q)
e := logql.NewEngine(logql.EngineOpts{}, q, logql.NoLimits)
return &testQueryClient{
engine: e,
queryRangeCalls: 0,

@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
@ -85,14 +86,16 @@ func (opts *EngineOpts) applyDefault() {
type Engine struct {
timeout time.Duration
evaluator Evaluator
limits Limits
}
// NewEngine creates a new LogQL Engine.
func NewEngine(opts EngineOpts, q Querier) *Engine {
func NewEngine(opts EngineOpts, q Querier, l Limits) *Engine {
opts.applyDefault()
return &Engine{
timeout: opts.Timeout,
evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
limits: l,
}
}
@ -106,6 +109,7 @@ func (ng *Engine) Query(params Params) Query {
return ParseExpr(query)
},
record: true,
limits: ng.limits,
}
}
@ -119,6 +123,7 @@ type query struct {
timeout time.Duration
params Params
parse func(context.Context, string) (Expr, error)
limits Limits
evaluator Evaluator
record bool
}
@ -145,7 +150,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
status := "200"
if err != nil {
status = "500"
if IsParseError(err) || IsPipelineError(err) {
if errors.Is(err, ErrParse) || errors.Is(err, ErrPipeline) || errors.Is(err, ErrLimit) {
status = "400"
}
}
@ -194,6 +199,11 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.
return q.evalLiteral(ctx, lit)
}
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
stepEvaluator, err := q.evaluator.StepEvaluator(ctx, q.evaluator, expr, q.params)
if err != nil {
return nil, err
@ -201,11 +211,18 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.
defer helpers.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)
seriesIndex := map[uint64]*promql.Series{}
maxSeries := q.limits.MaxQuerySeries(userID)
next, ts, vec := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}
// fail fast for the first step or instant query
if len(vec) > maxSeries {
return nil, newSeriesLimitError(maxSeries)
}
if GetRangeType(q.params) == InstantType {
sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 })
return vec, nil
@ -237,6 +254,10 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.
V: p.V,
})
}
// as the result is built step by step, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, newSeriesLimitError(maxSeries)
}
next, ts, vec = stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()

@ -16,6 +16,7 @@ import (
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
@ -460,7 +461,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params))
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits)
q := eng.Query(LiteralParams{
qs: test.qs,
start: test.ts,
@ -468,7 +469,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
direction: test.direction,
limit: test.limit,
})
res, err := q.Exec(context.Background())
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
t.Fatal(err)
}
@ -1513,7 +1514,7 @@ func TestEngine_RangeQuery(t *testing.T) {
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params))
eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits)
q := eng.Query(LiteralParams{
qs: test.qs,
@ -1524,7 +1525,7 @@ func TestEngine_RangeQuery(t *testing.T) {
direction: test.direction,
limit: test.limit,
})
res, err := q.Exec(context.Background())
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
t.Fatal(err)
}
@ -1549,7 +1550,7 @@ func (statsQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (it
func TestEngine_Stats(t *testing.T) {
eng := NewEngine(EngineOpts{}, &statsQuerier{})
eng := NewEngine(EngineOpts{}, &statsQuerier{}, NoLimits)
q := eng.Query(LiteralParams{
qs: `{foo="bar"}`,
@ -1558,7 +1559,7 @@ func TestEngine_Stats(t *testing.T) {
direction: logproto.BACKWARD,
limit: 1000,
})
r, err := q.Exec(context.Background())
r, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
require.NoError(t, err)
require.Equal(t, int64(1), r.Statistics.Store.DecompressedBytes)
}
@ -1621,19 +1622,53 @@ func TestStepEvaluator_Error(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
tc := tc
eng := NewEngine(EngineOpts{}, tc.querier)
eng := NewEngine(EngineOpts{}, tc.querier, NoLimits)
q := eng.Query(LiteralParams{
qs: tc.qs,
start: time.Unix(0, 0),
end: time.Unix(180, 0),
step: 1 * time.Second,
})
_, err := q.Exec(context.Background())
_, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
require.Equal(t, tc.err, err)
})
}
}
// TestEngine_MaxSeries runs a table of queries against an engine whose series
// limit is 1. Each case states whether executing the query is expected to fail
// with an error matching ErrLimit or to succeed without error.
func TestEngine_MaxSeries(t *testing.T) {
	eng := NewEngine(EngineOpts{}, getLocalQuerier(100000), &fakeLimits{maxSeries: 1})

	for _, test := range []struct {
		qs             string
		direction      logproto.Direction
		expectLimitErr bool
	}{
		{`topk(1,rate(({app=~"foo|bar"})[1m]))`, logproto.FORWARD, true},
		{`{app="foo"}`, logproto.FORWARD, false},
		{`{app="bar"} |= "foo" |~ ".+bar"`, logproto.BACKWARD, false},
		{`rate({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD, true},
		{`rate({app="foo"}[30s])`, logproto.FORWARD, true},
		{`count_over_time({app="foo|bar"} |~".+bar" [1m])`, logproto.BACKWARD, true},
		{`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD, false},
	} {
		test := test
		// Every case is its own subtest: the early return after the limit-error
		// assertions must only end that case. Returning from the test function
		// itself would silently skip all the remaining table entries.
		t.Run(test.qs, func(t *testing.T) {
			q := eng.Query(LiteralParams{
				qs:        test.qs,
				start:     time.Unix(0, 0),
				end:       time.Unix(100000, 0),
				step:      60 * time.Second,
				direction: test.direction,
				limit:     1000,
			})
			_, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
			if test.expectLimitErr {
				require.NotNil(t, err)
				require.True(t, errors.Is(err, ErrLimit))
				return
			}
			require.Nil(t, err)
		})
	}
}
// go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
func BenchmarkRangeQuery100000(b *testing.B) {
benchmarkRangeQuery(int64(100000), b)
@ -1653,7 +1688,7 @@ var result promql_parser.Value
func benchmarkRangeQuery(testsize int64, b *testing.B) {
b.ReportAllocs()
eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize))
eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize), NoLimits)
start := time.Unix(0, 0)
end := time.Unix(testsize, 0)
b.ResetTimer()
@ -1692,7 +1727,7 @@ func benchmarkRangeQuery(testsize int64, b *testing.B) {
direction: test.direction,
limit: 1000,
})
res, err := q.Exec(context.Background())
res, err := q.Exec(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
b.Fatal(err)
}

@ -1,6 +1,7 @@
package logql
import (
"errors"
"fmt"
"github.com/prometheus/prometheus/pkg/labels"
@ -8,6 +9,14 @@ import (
"github.com/grafana/loki/pkg/logql/log"
)
// These sentinel errors are useful for comparing errors returned by the engine.
// e.g. errors.Is(err, logql.ErrParse) tells you whether err is an AST parsing error.
var (
ErrParse = errors.New("failed to parse the log query")
ErrPipeline = errors.New("failed execute pipeline")
ErrLimit = errors.New("limit reached while evaluating the query")
)
// ParseError is what is returned when we failed to parse.
type ParseError struct {
msg string
@ -21,6 +30,11 @@ func (p ParseError) Error() string {
return fmt.Sprintf("parse error at line %d, col %d: %s", p.line, p.col, p.msg)
}
// Is makes errors.Is(err, ErrParse) report true for this error.
func (p ParseError) Is(target error) bool {
return target == ErrParse
}
func newParseError(msg string, line, col int) ParseError {
return ParseError{
msg: msg,
@ -37,12 +51,6 @@ func newStageError(expr Expr, err error) ParseError {
}
}
// IsParseError returns true if the err is a ast parsing error.
func IsParseError(err error) bool {
_, ok := err.(ParseError)
return ok
}
type pipelineError struct {
metric labels.Labels
errorType string
@ -64,8 +72,22 @@ func (e pipelineError) Error() string {
e.errorType, e.metric, e.errorType)
}
// IsPipelineError tells if the error is generated by a Pipeline.
func IsPipelineError(err error) bool {
_, ok := err.(*pipelineError)
return ok
// Is makes errors.Is(err, ErrPipeline) report true for this error.
func (e pipelineError) Is(target error) bool {
return target == ErrPipeline
}
// limitError wraps the error describing which limit was hit while evaluating
// a query.
type limitError struct {
	error
}

// newSeriesLimitError builds the limitError reported when a single query
// produces more series than limit allows.
func newSeriesLimitError(limit int) *limitError {
	cause := fmt.Errorf("maximum of series (%d) reached for a single query", limit)
	return &limitError{error: cause}
}
// Is makes errors.Is(err, ErrLimit) report true for this error.
func (e limitError) Is(target error) bool {
return target == ErrLimit
}

@ -0,0 +1,22 @@
package logql
import (
"math"
)
// Limits lets the engine look up the query limits that apply to a given user.
type Limits interface {
	// MaxQuerySeries returns the series limit for userID.
	MaxQuerySeries(userID string) int
}

// fakeLimits is a Limits implementation that reports the same fixed value for
// every user.
type fakeLimits struct {
	maxSeries int
}

// MaxQuerySeries returns the configured maxSeries, whatever the user.
func (fl fakeLimits) MaxQuerySeries(_ string) int {
	return fl.maxSeries
}

// NoLimits is a Limits whose series limit is math.MaxInt32.
var NoLimits = &fakeLimits{maxSeries: math.MaxInt32}

@ -57,7 +57,7 @@ func ParseExpr(input string) (expr Expr, err error) {
if r := recover(); r != nil {
var ok bool
if err, ok = r.(error); ok {
if IsParseError(err) {
if errors.Is(err, ErrParse) {
return
}
err = newParseError(err.Error(), 0, 0)

@ -1975,7 +1975,7 @@ func TestIsParseError(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := IsParseError(tt.errFn()); got != tt.want {
if got := errors.Is(tt.errFn(), ErrParse); got != tt.want {
t.Errorf("IsParseError() = %v, want %v", got, tt.want)
}
})

@ -31,16 +31,18 @@ which can then take advantage of our sharded execution model.
type ShardedEngine struct {
timeout time.Duration
downstreamable Downstreamable
limits Limits
metrics *ShardingMetrics
}
// NewShardedEngine constructs a *ShardedEngine
func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics) *ShardedEngine {
func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *ShardingMetrics, limits Limits) *ShardedEngine {
opts.applyDefault()
return &ShardedEngine{
timeout: opts.Timeout,
downstreamable: downstreamable,
metrics: metrics,
limits: limits,
}
}
@ -54,6 +56,7 @@ func (ng *ShardedEngine) Query(p Params, mapped Expr) Query {
parse: func(_ context.Context, _ string) (Expr, error) {
return mapped, nil
},
limits: ng.limits,
}
}

@ -8,6 +8,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logproto"
)
@ -59,8 +60,8 @@ func TestMappingEquivalence(t *testing.T) {
)
opts := EngineOpts{}
regular := NewEngine(opts, q)
sharded := NewShardedEngine(opts, MockDownstreamer{regular}, nilMetrics)
regular := NewEngine(opts, q, NoLimits)
sharded := NewShardedEngine(opts, MockDownstreamer{regular}, nilMetrics, NoLimits)
t.Run(tc.query, func(t *testing.T) {
params := NewLiteralParams(
@ -74,7 +75,7 @@ func TestMappingEquivalence(t *testing.T) {
nil,
)
qry := regular.Query(params)
ctx := context.Background()
ctx := user.InjectOrgID(context.Background(), "fake")
mapper, err := NewShardMapper(shards, nilMetrics)
require.Nil(t, err)

@ -464,7 +464,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {
return nil, err
}
engine := logql.NewEngine(t.cfg.Querier.Engine, q)
engine := logql.NewEngine(t.cfg.Querier.Engine, q, t.overrides)
t.ruler, err = ruler.NewRuler(
t.cfg.Ruler,

@ -72,7 +72,7 @@ func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limi
limits: limits,
}
querier.engine = logql.NewEngine(cfg.Engine, &querier)
querier.engine = logql.NewEngine(cfg.Engine, &querier, limits)
return &querier, nil
}

@ -1,16 +1,26 @@
package queryrange
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/weaveworks/common/httpgrpc"
)
const (
limitErrTmpl = "maximum of series (%d) reached for a single query"
)
// Limits extends the cortex limits interface with support for per tenant splitby parameters
type Limits interface {
queryrange.Limits
QuerySplitDuration(string) time.Duration
MaxQuerySeries(string) int
MaxEntriesLimitPerQuery(string) int
}
@ -71,3 +81,66 @@ func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrange.Request) st
// a cache key can't be reused when an interval changes
return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split)
}
type seriesLimiter struct {
hashes map[uint64]struct{}
rw sync.RWMutex
buf []byte // buf used for hashing to avoid allocations.
maxSeries int
next queryrange.Handler
}
type seriesLimiterMiddleware int
// newSeriesLimiter creates a new series limiter middleware for use for a single request.
func newSeriesLimiter(maxSeries int) queryrange.Middleware {
return seriesLimiterMiddleware(maxSeries)
}
// Wrap returns a fresh seriesLimiter around next, with an empty hash set, a
// 1024-byte-capacity hashing buffer and the middleware's value as the maximum
// number of series. A new limiter is built on every call, so each wrapped
// handler tracks its own set of series.
// The handler returned is thread safe.
func (slm seriesLimiterMiddleware) Wrap(next queryrange.Handler) queryrange.Handler {
	limiter := &seriesLimiter{
		next:      next,
		maxSeries: int(slm),
	}
	limiter.hashes = make(map[uint64]struct{})
	limiter.buf = make([]byte, 0, 1024)
	return limiter
}
// Do forwards req to the wrapped handler and records the label hash of every
// series found in a *LokiPromResponse. It returns a 400 (Bad Request) error,
// either before calling the wrapped handler or after recording its response,
// once more than maxSeries distinct hashes have been seen. Errors from the
// wrapped handler, responses of any other type, and responses with a nil
// inner Response are passed through untouched.
func (sl *seriesLimiter) Do(ctx context.Context, req queryrange.Request) (queryrange.Response, error) {
	// Skip the downstream call entirely when earlier responses already exceeded the limit.
	if sl.isLimitReached() {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries)
	}

	resp, err := sl.next.Do(ctx, req)
	if err != nil {
		return resp, err
	}

	prom, isProm := resp.(*LokiPromResponse)
	if !isProm || prom.Response == nil {
		return resp, nil
	}

	// hashes and buf are fields of the limiter, so they are only written while holding the write lock.
	sl.rw.Lock()
	for _, series := range prom.Response.Data.Result {
		var h uint64
		h, sl.buf = client.FromLabelAdaptersToLabels(series.Labels).HashWithoutLabels(sl.buf)
		sl.hashes[h] = struct{}{}
	}
	sl.rw.Unlock()

	if sl.isLimitReached() {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, limitErrTmpl, sl.maxSeries)
	}
	return resp, nil
}
// isLimitReached reports whether the number of distinct series hashes
// recorded so far is strictly greater than maxSeries. The hash set is read
// under the read lock.
func (sl *seriesLimiter) isLimitReached() bool {
	sl.rw.RLock()
	reached := len(sl.hashes) > sl.maxSeries
	sl.rw.RUnlock()
	return reached
}

@ -1,12 +1,24 @@
package queryrange
import (
"context"
"fmt"
"net/http"
"sync"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
)
func TestLimits(t *testing.T) {
@ -44,3 +56,92 @@ func TestLimits(t *testing.T) {
cacheKeyLimits{wrapped}.GenerateCacheKey("a", r),
)
}
func Test_seriesLimiter(t *testing.T) {
cfg := testConfig
cfg.SplitQueriesByInterval = time.Hour
cfg.CacheResults = false
// split in 6 with a max parallelism of 2.
tpw, stopper, err := NewTripperware(cfg, util.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
}
require.NoError(t, err)
lreq := &LokiRequest{
Query: `rate({app="foo"} |= "foo"[1m])`,
Limit: 1000,
Step: 30000, //30sec
StartTs: testTime.Add(-6 * time.Hour),
EndTs: testTime,
Direction: logproto.FORWARD,
Path: "/query_range",
}
ctx := user.InjectOrgID(context.Background(), "1")
req, err := lokiCodec.EncodeRequest(ctx, lreq)
require.NoError(t, err)
req = req.WithContext(ctx)
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
require.NoError(t, err)
rt, err := newfakeRoundTripper()
require.NoError(t, err)
defer rt.Close()
count, h := promqlResult(matrix)
rt.setHandler(h)
_, err = tpw(rt).RoundTrip(req)
require.NoError(t, err)
require.Equal(t, 6, *count)
// 2 series should not be allowed.
c := new(int)
m := &sync.Mutex{}
h = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
m.Lock()
defer m.Unlock()
defer func() {
*c++
}()
// first time returns a single series
if *c == 0 {
if err := marshal.WriteQueryResponseJSON(logql.Result{Data: matrix}, rw); err != nil {
panic(err)
}
return
}
// second time returns a different series.
if err := marshal.WriteQueryResponseJSON(logql.Result{
Data: promql.Matrix{
{
Points: []promql.Point{
{
T: toMs(testTime.Add(-4 * time.Hour)),
V: 0.013333333333333334,
},
},
Metric: []labels.Label{
{
Name: "filename",
Value: `/var/hostlog/apport.log`,
},
{
Name: "job",
Value: "anotherjob",
},
},
},
},
}, rw); err != nil {
panic(err)
}
})
rt.setHandler(h)
_, err = tpw(rt).RoundTrip(req)
require.Error(t, err)
require.LessOrEqual(t, *c, 4)
}

@ -24,6 +24,7 @@ func NewQueryShardMiddleware(
minShardingLookback time.Duration,
middlewareMetrics *queryrange.InstrumentMiddlewareMetrics,
shardingMetrics *logql.ShardingMetrics,
limits logql.Limits,
) queryrange.Middleware {
noshards := !hasShards(confs)
@ -38,7 +39,7 @@ func NewQueryShardMiddleware(
}
mapperware := queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
return newASTMapperware(confs, next, logger, shardingMetrics)
return newASTMapperware(confs, next, logger, shardingMetrics, limits)
})
return queryrange.MiddlewareFunc(func(next queryrange.Handler) queryrange.Handler {
@ -60,13 +61,14 @@ func newASTMapperware(
next queryrange.Handler,
logger log.Logger,
metrics *logql.ShardingMetrics,
limits logql.Limits,
) *astMapperware {
return &astMapperware{
confs: confs,
logger: log.With(logger, "middleware", "QueryShard.astMapperware"),
next: next,
ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics),
ng: logql.NewShardedEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics, limits),
metrics: metrics,
}
}

@ -3,6 +3,7 @@ package queryrange
import (
"context"
"fmt"
"math"
"sort"
"sync"
"testing"
@ -146,6 +147,7 @@ func Test_astMapper(t *testing.T) {
handler,
log.NewNopLogger(),
nilShardingMetrics,
fakeLimits{maxSeries: math.MaxInt32},
)
resp, err := mware.Do(context.Background(), defaultReq().WithQuery(`{food="bar"}`))
@ -175,6 +177,7 @@ func Test_ShardingByPass(t *testing.T) {
handler,
log.NewNopLogger(),
nilShardingMetrics,
fakeLimits{maxSeries: math.MaxInt32},
)
_, err := mware.Do(context.Background(), defaultReq().WithQuery(`1+1`))

@ -239,6 +239,7 @@ func NewLogFilterTripperware(
minShardingLookback,
instrumentMetrics, // instrumentation is included in the sharding middleware
shardingMetrics,
limits,
),
)
}
@ -388,6 +389,7 @@ func NewMetricTripperware(
minShardingLookback,
instrumentMetrics, // instrumentation is included in the sharding middleware
shardingMetrics,
limits,
),
)
}

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"io/ioutil"
"math"
"net/http"
"net/http/httptest"
"net/url"
@ -92,7 +93,7 @@ var (
// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, 0, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -493,6 +494,7 @@ func TestEntriesLimitWithZeroTripperware(t *testing.T) {
type fakeLimits struct {
maxQueryParallelism int
maxEntriesLimitPerQuery int
maxSeries int
splits map[string]time.Duration
}
@ -518,6 +520,10 @@ func (f fakeLimits) MaxEntriesLimitPerQuery(string) int {
return f.maxEntriesLimitPerQuery
}
func (f fakeLimits) MaxQuerySeries(string) int {
return f.maxSeries
}
func (f fakeLimits) MaxCacheFreshness(string) time.Duration {
return 1 * time.Minute
}

@ -83,6 +83,7 @@ func (h *splitByInterval) Process(
parallelism int,
threshold int64,
input []*lokiResult,
userID string,
) ([]queryrange.Response, error) {
var responses []queryrange.Response
ctx, cancel := context.WithCancel(ctx)
@ -102,8 +103,10 @@ func (h *splitByInterval) Process(
p = len(input)
}
// per request wrapped handler for limiting the amount of series.
next := newSeriesLimiter(h.limits.MaxQuerySeries(userID)).Wrap(h.next)
for i := 0; i < p; i++ {
go h.loop(ctx, ch)
go h.loop(ctx, ch, next)
}
for _, x := range input {
@ -134,14 +137,14 @@ func (h *splitByInterval) Process(
return responses, nil
}
func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult) {
func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next queryrange.Handler) {
for data := range ch {
sp, ctx := opentracing.StartSpanFromContext(ctx, "interval")
data.req.LogToSpan(sp)
resp, err := h.next.Do(ctx, data.req)
resp, err := next.Do(ctx, data.req)
select {
case <-ctx.Done():
@ -203,7 +206,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra
})
}
resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input)
resps, err := h.Process(ctx, h.limits.MaxQueryParallelism(userid), limit, input, userid)
if err != nil {
return nil, err
}

@ -7,6 +7,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logql"
)
@ -30,7 +31,9 @@ func WriteError(err error, w http.ResponseWriter) {
http.Error(w, ErrDeadlineExceeded, http.StatusGatewayTimeout)
case errors.As(err, &queryErr):
http.Error(w, err.Error(), http.StatusBadRequest)
case logql.IsParseError(err) || logql.IsPipelineError(err):
case errors.Is(err, logql.ErrLimit) || errors.Is(err, logql.ErrParse) || errors.Is(err, logql.ErrPipeline):
http.Error(w, err.Error(), http.StatusBadRequest)
case errors.Is(err, user.ErrNoOrgID):
http.Error(w, err.Error(), http.StatusBadRequest)
default:
if grpcErr, ok := httpgrpc.HTTPResponseFromError(err); ok {

@ -12,6 +12,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logql"
)
@ -25,6 +26,7 @@ func Test_writeError(t *testing.T) {
expectedStatus int
}{
{"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest},
{"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest},
{"deadline", context.DeadlineExceeded, ErrDeadlineExceeded, http.StatusGatewayTimeout},
{"parse error", logql.ParseError{}, "parse error : ", http.StatusBadRequest},
{"httpgrpc", httpgrpc.Errorf(http.StatusBadRequest, errors.New("foo").Error()), "foo", http.StatusBadRequest},

@ -39,6 +39,7 @@ type Limits struct {
// Querier enforced limits.
MaxChunksPerQuery int `yaml:"max_chunks_per_query"`
MaxQuerySeries int `yaml:"max_query_series"`
MaxQueryLength time.Duration `yaml:"max_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism"`
CardinalityLimit int `yaml:"cardinality_limit"`
@ -75,6 +76,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.MaxChunksPerQuery, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.")
f.DurationVar(&l.MaxQueryLength, "store.max-query-length", 0, "Limit to length of chunk store queries, 0 to disable.")
f.IntVar(&l.MaxQuerySeries, "querier.max-query-series", 500, "Limit the maximum of unique series returned by a metric query. When the limit is reached an error is returned.")
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.")
f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query")
@ -204,6 +206,11 @@ func (o *Overrides) MaxQueryLength(userID string) time.Duration {
return o.getOverridesForUser(userID).MaxQueryLength
}
// MaxQuerySeries returns the limit of the series of metric queries.
func (o *Overrides) MaxQuerySeries(userID string) int {
return o.getOverridesForUser(userID).MaxQuerySeries
}
// MaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel.
func (o *Overrides) MaxQueryParallelism(userID string) int {

Loading…
Cancel
Save