Split parallelism across Period Configs (#7769)

One of the things we watch when migrating non-TSDB period configs to TSDB
period configs is the difference in query parallelism. TSDB dynamically
shards queries into (potentially) much smaller units of work than the
static shard factors used previously. To account for this, we use much
higher query parallelism configurations with TSDB period configs.

This creates a potential problem when querying across `non-tsdb, tsdb`
period boundaries: we may want a query parallelism of 512 for the tsdb
portion but only 64 for the non-tsdb portion! However, we only had one
limit to specify this per tenant, meaning this would be too high when
querying non-tsdb periods or too low when querying tsdb ones.

This PR

* Introduces `tsdb_max_query_parallelism` (default `512`) to
`limits_config`
* Uses the `tsdb_max_query_parallelism` and `max_query_parallelism` limits
to find a better parallelism _per query_ by weighting the two respective
configs by the proportion of the query's time range spent on TSDB vs
non-TSDB period configurations (see the sketch below).
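
To make the weighting concrete, here is a minimal sketch of the idea. The 6h/18h split and the 64/512 limits are hypothetical values for illustration, not defaults pulled from a real config:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		regParallelism  = 64  // max_query_parallelism for the non-tsdb portion
		tsdbParallelism = 512 // tsdb_max_query_parallelism for the tsdb portion
	)

	// Suppose a 24h query spends 6h on a non-tsdb period and 18h on a tsdb period.
	otherDur := 6 * time.Hour
	tsdbDur := 18 * time.Hour
	totalDur := otherDur + tsdbDur

	// Weight each limit by the share of the query range it covers,
	// using integer math just like the implementation further down.
	weighted := int(tsdbDur)*tsdbParallelism/int(totalDur) +
		int(otherDur)*regParallelism/int(totalDur)

	fmt.Println(weighted) // 0.75*512 + 0.25*64 = 400
}
```

For an instant query (start == end) there is nothing to weight, so the implementation simply picks whichever limit matches the period config the timestamp falls into.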

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
pull/7779/head
Owen Diehl committed 3 years ago (via GitHub)
parent c44fb2e882
commit 22089415e8
  1. pkg/querier/queryrange/limits.go — 133 changed lines
  2. pkg/querier/queryrange/limits_test.go — 98 changed lines
  3. pkg/querier/queryrange/queryrangebase/limits.go — 2 changed lines
  4. pkg/querier/queryrange/queryrangebase/results_cache.go — 10 changed lines
  5. pkg/querier/queryrange/queryrangebase/results_cache_test.go — 24 changed lines
  6. pkg/querier/queryrange/queryrangebase/util.go — 17 changed lines
  7. pkg/querier/queryrange/querysharding.go — 20 changed lines
  8. pkg/querier/queryrange/querysharding_test.go — 6 changed lines
  9. pkg/querier/queryrange/roundtrip.go — 29 changed lines
  10. pkg/querier/queryrange/roundtrip_test.go — 28 changed lines
  11. pkg/querier/queryrange/split_by_interval.go — 8 changed lines
  12. pkg/querier/queryrange/split_by_interval_test.go — 27 changed lines
  13. pkg/validation/limits.go — 8 changed lines

@ -4,11 +4,13 @@ import (
"context"
"fmt"
"net/http"
"sort"
"sync"
"time"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
@ -18,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/spanlogger"
"github.com/grafana/loki/pkg/util/validation"
@ -39,6 +42,9 @@ type Limits interface {
MaxQuerySeries(string) int
MaxEntriesLimitPerQuery(string) int
MinShardingLookback(string) time.Duration
// TSDBMaxQueryParallelism returns the limit to the number of split queries the
// frontend will process in parallel for TSDB queries.
TSDBMaxQueryParallelism(string) int
}
type limits struct {
@ -210,16 +216,18 @@ func (sl *seriesLimiter) isLimitReached() bool {
}
type limitedRoundTripper struct {
next http.RoundTripper
limits Limits
configs []config.PeriodConfig
next http.RoundTripper
limits Limits
codec queryrangebase.Codec
middleware queryrangebase.Middleware
}
// NewLimitedRoundTripper creates a new roundtripper that enforces MaxQueryParallelism to the `next` roundtripper across `middlewares`.
func NewLimitedRoundTripper(next http.RoundTripper, codec queryrangebase.Codec, limits Limits, middlewares ...queryrangebase.Middleware) http.RoundTripper {
func NewLimitedRoundTripper(next http.RoundTripper, codec queryrangebase.Codec, limits Limits, configs []config.PeriodConfig, middlewares ...queryrangebase.Middleware) http.RoundTripper {
transport := limitedRoundTripper{
configs: configs,
next: next,
codec: codec,
limits: limits,
@ -272,7 +280,14 @@ func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error)
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
parallelism := validation.SmallestPositiveIntPerTenant(tenantIDs, rt.limits.MaxQueryParallelism)
parallelism := MinWeightedParallelism(
tenantIDs,
rt.configs,
rt.limits,
model.Time(request.GetStart()),
model.Time(request.GetEnd()),
)
if parallelism < 1 {
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, ErrMaxQueryParalellism.Error())
}
@ -332,3 +347,113 @@ func (rt limitedRoundTripper) do(ctx context.Context, r queryrangebase.Request)
return rt.codec.DecodeResponse(ctx, response, r)
}
// WeightedParallelism calculates the request parallelism to use
// based on two limits:
// 1) `max_query_parallelism`
// 2) `tsdb_max_query_parallelism`
// For instance, if max_query_parallelism=10,
// tsdb_max_query_parallelism=100, and the request is split equally
// between tsdb and non-tsdb period configs,
// the resulting parallelism will be
// 0.5 * 10 + 0.5 * 100 = 55
func WeightedParallelism(
configs []config.PeriodConfig,
user string,
l Limits,
start, end model.Time,
) int {
// Return first index of desired period configs
i := sort.Search(len(configs), func(i int) bool {
// return true when there is no overlap with query & current
// config because query is in future
// or
// there is overlap with current config
finalOrFuture := i == len(configs)-1 || configs[i].From.After(end)
if finalOrFuture {
return true
}
// qEnd not before start && qStart not after end
overlapCurrent := !end.Before(configs[i].From.Time) && !start.After(configs[i+1].From.Time)
return overlapCurrent
})
// There was no overlapping index. This can only happen when a time
// was requested before the first period config start. In that case, just
// use the first period config. It should error elsewhere.
if i == len(configs) {
i = 0
}
// If start == end, this is an instant query;
// use the appropriate parallelism type for
// the active configuration
if start.Equal(end) {
switch configs[i].IndexType {
case config.TSDBType:
return l.TSDBMaxQueryParallelism(user)
}
return l.MaxQueryParallelism(user)
}
var tsdbDur, otherDur time.Duration
for ; i < len(configs) && configs[i].From.Before(end); i++ {
_, from := minMaxModelTime(start, configs[i].From.Time)
through := end
if i+1 < len(configs) {
through, _ = minMaxModelTime(end, configs[i+1].From.Time)
}
dur := through.Sub(from)
if i+1 < len(configs) && configs[i+1].From.Time.Before(end) {
dur = configs[i+1].From.Time.Sub(from)
}
if ty := configs[i].IndexType; ty == config.TSDBType {
tsdbDur += dur
} else {
otherDur += dur
}
}
totalDur := int(tsdbDur + otherDur)
tsdbMaxQueryParallelism := l.TSDBMaxQueryParallelism(user)
regMaxQueryParallelism := l.MaxQueryParallelism(user)
tsdbPart := int(tsdbDur) * tsdbMaxQueryParallelism / totalDur
regPart := int(otherDur) * regMaxQueryParallelism / totalDur
if combined := regPart + tsdbPart; combined > 0 {
return combined
}
// As long as the actual config is not zero,
// ensure at least 1 parallelism to account for integer division
// in unlikely edge cases such as two configs with parallelism of 1
// being rounded down to zero
if (tsdbMaxQueryParallelism > 0 && tsdbDur > 0) || (regMaxQueryParallelism > 0 && otherDur > 0) {
return 1
}
return 0
}
func minMaxModelTime(a, b model.Time) (min, max model.Time) {
if a.Before(b) {
return a, b
}
return b, a
}
func MinWeightedParallelism(tenantIDs []string, configs []config.PeriodConfig, l Limits, start, end model.Time) int {
return validation.SmallestPositiveIntPerTenant(tenantIDs, func(user string) int {
return WeightedParallelism(
configs,
user,
l,
start,
end,
)
})
}
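
The guard at the end of `WeightedParallelism` above exists because the two weighted parts are computed with integer division and can both truncate to zero even when the configured limits are positive. A toy, self-contained example (values chosen only to trigger the truncation):

```go
package main

import "fmt"

func main() {
	// Both limits set to 1, query split evenly between the two period types.
	tsdbDur, otherDur := 1, 1
	total := tsdbDur + otherDur

	tsdbPart := tsdbDur * 1 / total // truncates to 0
	regPart := otherDur * 1 / total // truncates to 0

	if combined := tsdbPart + regPart; combined > 0 {
		fmt.Println(combined)
		return
	}
	// Both parts rounded down to zero, so fall back to a parallelism of 1
	// rather than rejecting the query outright.
	fmt.Println(1)
}
```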

@ -8,11 +8,13 @@ import (
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel"
@ -52,7 +54,9 @@ func Test_seriesLimiter(t *testing.T) {
cfg.CacheResults = false
// split in 7 with 2 in // max.
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{
Configs: testSchemas,
}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -157,6 +161,7 @@ func Test_MaxQueryParallelism(t *testing.T) {
require.Nil(t, err)
_, _ = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
testSchemas,
queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
var wg sync.WaitGroup
@ -191,6 +196,7 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
require.Nil(t, err)
_, _ = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
testSchemas,
queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
for i := 0; i < 10; i++ {
@ -219,6 +225,7 @@ func Test_MaxQueryParallelismDisable(t *testing.T) {
require.Nil(t, err)
_, err = NewLimitedRoundTripper(f, LokiCodec, fakeLimits{maxQueryParallelism: maxQueryParallelism},
testSchemas,
queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return queryrangebase.HandlerFunc(func(c context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
for i := 0; i < 10; i++ {
@ -237,7 +244,9 @@ func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
maxQueryParallelism: 1,
}, config.SchemaConfig{}, nil, false, nil)
}, config.SchemaConfig{
Configs: testSchemas,
}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -282,3 +291,88 @@ func Test_GenerateCacheKey_NoDivideZero(t *testing.T) {
l.GenerateCacheKey(context.Background(), "foo", r),
)
}
func Test_WeightedParallelism(t *testing.T) {
limits := &fakeLimits{
tsdbMaxQueryParallelism: 100,
maxQueryParallelism: 10,
}
for _, cfgs := range []struct {
desc string
periods string
}{
{
desc: "end configs",
periods: `
- from: "2022-01-01"
store: boltdb-shipper
object_store: gcs
schema: v12
- from: "2022-01-02"
store: tsdb
object_store: gcs
schema: v12
`,
},
{
// Add another test that wraps the tested period configs with other unused configs
// to ensure we bounds-test properly
desc: "middle configs",
periods: `
- from: "2021-01-01"
store: boltdb-shipper
object_store: gcs
schema: v12
- from: "2022-01-01"
store: boltdb-shipper
object_store: gcs
schema: v12
- from: "2022-01-02"
store: tsdb
object_store: gcs
schema: v12
- from: "2023-01-02"
store: tsdb
object_store: gcs
schema: v12
`,
},
} {
var confs []config.PeriodConfig
require.Nil(t, yaml.Unmarshal([]byte(cfgs.periods), &confs))
parsed, err := time.Parse("2006-01-02", "2022-01-02")
borderTime := model.TimeFromUnix(parsed.Unix())
require.Nil(t, err)
for _, tc := range []struct {
desc string
start, end model.Time
exp int
}{
{
desc: "50% each",
start: borderTime.Add(-time.Hour),
end: borderTime.Add(time.Hour),
exp: 55,
},
{
desc: "75/25 split",
start: borderTime.Add(-3 * time.Hour),
end: borderTime.Add(time.Hour),
exp: 32,
},
{
desc: "start==end",
start: borderTime.Add(time.Hour),
end: borderTime.Add(time.Hour),
exp: 100,
},
} {
t.Run(cfgs.desc+tc.desc, func(t *testing.T) {
require.Equal(t, tc.exp, WeightedParallelism(confs, "fake", limits, tc.start, tc.end))
})
}
}
}
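
For reference, the expected values in the cases above follow directly from the integer weighting with `maxQueryParallelism: 10` and `tsdbMaxQueryParallelism: 100`; a quick standalone check (the helper below just reproduces the arithmetic, it is not part of the change):

```go
package main

import (
	"fmt"
	"time"
)

func weighted(tsdbDur, otherDur time.Duration) int {
	total := int(tsdbDur + otherDur)
	return int(tsdbDur)*100/total + int(otherDur)*10/total
}

func main() {
	fmt.Println(weighted(time.Hour, time.Hour))   // 50 + 5 = 55 ("50% each")
	fmt.Println(weighted(time.Hour, 3*time.Hour)) // 25 + 7 = 32 ("75/25 split")
}
```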

@ -15,7 +15,7 @@ type Limits interface {
// MaxQueryParallelism returns the limit to the number of split queries the
// frontend will process in parallel.
MaxQueryParallelism(string) int
MaxQueryParallelism(tenant string) int
// MaxCacheFreshness returns the period after which results are cacheable,
// to prevent caching of very recent results.

@ -163,6 +163,7 @@ type resultsCache struct {
merger Merger
cacheGenNumberLoader CacheGenNumberLoader
shouldCache ShouldCacheFn
parallelismForReq func(tenantIDs []string, r Request) int
retentionEnabled bool
metrics *ResultsCacheMetrics
}
@ -182,6 +183,7 @@ func NewResultsCacheMiddleware(
extractor Extractor,
cacheGenNumberLoader CacheGenNumberLoader,
shouldCache ShouldCacheFn,
parallelismForReq func(tenantIDs []string, r Request) int,
retentionEnabled bool,
metrics *ResultsCacheMetrics,
) (Middleware, error) {
@ -201,6 +203,7 @@ func NewResultsCacheMiddleware(
splitter: splitter,
cacheGenNumberLoader: cacheGenNumberLoader,
shouldCache: shouldCache,
parallelismForReq: parallelismForReq,
retentionEnabled: retentionEnabled,
metrics: metrics,
}
@ -402,7 +405,12 @@ func (s resultsCache) handleHit(ctx context.Context, r Request, extents []Extent
return response, nil, err
}
reqResps, err = DoRequests(ctx, s.next, requests, s.limits)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
reqResps, err = DoRequests(ctx, s.next, requests, s.parallelismForReq(tenantIDs, r))
if err != nil {
return nil, nil, err
}

@ -727,10 +727,11 @@ func TestHandleHit(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
sut := resultsCache{
extractor: PrometheusResponseExtractor{},
minCacheExtent: 10,
limits: mockLimits{},
merger: PrometheusCodec,
extractor: PrometheusResponseExtractor{},
minCacheExtent: 10,
limits: mockLimits{},
merger: PrometheusCodec,
parallelismForReq: func(tenantIDs []string, r Request) int { return 1 },
next: HandlerFunc(func(_ context.Context, req Request) (Response, error) {
return mkAPIResponse(req.GetStart(), req.GetEnd(), req.GetStep()), nil
}),
@ -765,6 +766,9 @@ func TestResultsCache(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
func(tenantIDs []string, r Request) int {
return mockLimits{}.MaxQueryParallelism("fake")
},
false,
nil,
)
@ -808,6 +812,9 @@ func TestResultsCacheRecent(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
func(tenantIDs []string, r Request) int {
return mockLimits{}.MaxQueryParallelism("fake")
},
false,
nil,
)
@ -873,6 +880,9 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
func(tenantIDs []string, r Request) int {
return tc.fakeLimits.MaxQueryParallelism("fake")
},
false,
nil,
)
@ -913,6 +923,9 @@ func Test_resultsCache_MissingData(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
func(tenantIDs []string, r Request) int {
return mockLimits{}.MaxQueryParallelism("fake")
},
false,
nil,
)
@ -1025,6 +1038,9 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
PrometheusResponseExtractor{},
nil,
tc.shouldCache,
func(tenantIDs []string, r Request) int {
return mockLimits{}.MaxQueryParallelism("fake")
},
false,
nil,
)

@ -2,13 +2,6 @@ package queryrangebase
import (
"context"
"net/http"
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/util/validation"
)
// RequestResponse contains a request response and the respective request that was used.
@ -17,13 +10,8 @@ type RequestResponse struct {
Response Response
}
// DoRequests executes a list of requests in parallel. The limits parameters is used to limit parallelism per single request.
func DoRequests(ctx context.Context, downstream Handler, reqs []Request, limits Limits) ([]RequestResponse, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
// DoRequests executes a list of requests in parallel.
func DoRequests(ctx context.Context, downstream Handler, reqs []Request, parallelism int) ([]RequestResponse, error) {
// If one of the requests fail, we want to be able to cancel the rest of them.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -38,7 +26,6 @@ func DoRequests(ctx context.Context, downstream Handler, reqs []Request, limits
}()
respChan, errChan := make(chan RequestResponse), make(chan error)
parallelism := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.MaxQueryParallelism)
if parallelism > len(reqs) {
parallelism = len(reqs)
}
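
`DoRequests` otherwise behaves as before: the caller now decides the parallelism (typically via `MinWeightedParallelism`) and the function still clamps it to the number of requests before fanning the work out. A simplified, self-contained sketch of that bounded fan-out pattern, with stand-in types rather than the real `Handler`/`Request` interfaces:

```go
package main

import (
	"fmt"
	"sync"
)

func doRequests(reqs []int, parallelism int, handle func(int) int) []int {
	if parallelism > len(reqs) {
		// No point in running more workers than there are requests.
		parallelism = len(reqs)
	}

	idx := make(chan int)
	out := make([]int, len(reqs))
	var wg sync.WaitGroup

	for w := 0; w < parallelism; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range idx {
				out[i] = handle(reqs[i])
			}
		}()
	}
	for i := range reqs {
		idx <- i
	}
	close(idx)
	wg.Wait()
	return out
}

func main() {
	resps := doRequests([]int{1, 2, 3, 4}, 2, func(r int) int { return r * 10 })
	fmt.Println(resps) // [10 20 30 40]
}
```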

@ -9,6 +9,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"
@ -103,14 +104,13 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
if err != nil {
return nil, err
}
queryParallelism := validation.SmallestPositiveIntPerTenant(tenants, ast.limits.MaxQueryParallelism)
resolver, ok := shardResolverForConf(
ctx,
conf,
ast.ng.Opts().MaxLookBackPeriod,
ast.logger,
queryParallelism,
MinWeightedParallelism(tenants, ast.confs, ast.limits, model.Time(r.GetStart()), model.Time(r.GetEnd())),
r,
ast.next,
)
@ -292,7 +292,7 @@ func NewSeriesQueryShardMiddleware(
confs ShardingConfigs,
middlewareMetrics *queryrangebase.InstrumentMiddlewareMetrics,
shardingMetrics *logql.MapperMetrics,
limits queryrangebase.Limits,
limits Limits,
merger queryrangebase.Merger,
) queryrangebase.Middleware {
noshards := !hasShards(confs)
@ -324,7 +324,7 @@ type seriesShardingHandler struct {
logger log.Logger
next queryrangebase.Handler
metrics *logql.MapperMetrics
limits queryrangebase.Limits
limits Limits
merger queryrangebase.Merger
}
@ -353,7 +353,17 @@ func (ss *seriesShardingHandler) Do(ctx context.Context, r queryrangebase.Reques
}.String()}
requests = append(requests, &shardedRequest)
}
requestResponses, err := queryrangebase.DoRequests(ctx, ss.next, requests, ss.limits)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
requestResponses, err := queryrangebase.DoRequests(
ctx,
ss.next,
requests,
MinWeightedParallelism(tenantIDs, ss.confs, ss.limits, model.Time(req.GetStart()), model.Time(req.GetEnd())),
)
if err != nil {
return nil, err
}

@ -261,10 +261,10 @@ func Test_InstantSharding(t *testing.T) {
called := 0
shards := []string{}
cpyPeriodConf := testSchemas[0]
cpyPeriodConf.RowShards = 3
sharding := NewQueryShardMiddleware(log.NewNopLogger(), ShardingConfigs{
config.PeriodConfig{
RowShards: 3,
},
cpyPeriodConf,
}, queryrangebase.NewInstrumentMiddlewareMetrics(nil),
nilShardingMetrics,
fakeLimits{

@ -8,6 +8,7 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"
@ -83,7 +84,7 @@ func NewTripperware(
return nil, nil, err
}
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, metrics)
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, LokiCodec, metrics, schema)
if err != nil {
return nil, nil, err
}
@ -259,7 +260,7 @@ func NewLogFilterTripperware(
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(limits, codec, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, codec, splitByTime, metrics.SplitByMetrics),
}
if cfg.CacheResults {
@ -301,7 +302,7 @@ func NewLogFilterTripperware(
return func(next http.RoundTripper) http.RoundTripper {
if len(queryRangeMiddleware) > 0 {
return NewLimitedRoundTripper(next, codec, limits, queryRangeMiddleware...)
return NewLimitedRoundTripper(next, codec, limits, schema.Configs, queryRangeMiddleware...)
}
return next
}, nil
@ -323,7 +324,7 @@ func NewSeriesTripperware(
// The Series API needs to pull one chunk per series to extract the label set, which is much cheaper than iterating through all matching chunks.
// Force a 24 hours split by for series API, this will be more efficient with our static daily bucket storage.
// This would avoid queriers downloading chunks for same series over and over again for serving smaller queries.
SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, metrics.SplitByMetrics),
}
if cfg.MaxRetries > 0 {
@ -348,7 +349,7 @@ func NewSeriesTripperware(
return func(next http.RoundTripper) http.RoundTripper {
if len(queryRangeMiddleware) > 0 {
return NewLimitedRoundTripper(next, codec, limits, queryRangeMiddleware...)
return NewLimitedRoundTripper(next, codec, limits, schema.Configs, queryRangeMiddleware...)
}
return next
}, nil
@ -361,6 +362,7 @@ func NewLabelsTripperware(
limits Limits,
codec queryrangebase.Codec,
metrics *Metrics,
schema config.SchemaConfig,
) (queryrangebase.Tripperware, error) {
queryRangeMiddleware := []queryrangebase.Middleware{
StatsCollectorMiddleware(),
@ -368,7 +370,7 @@ func NewLabelsTripperware(
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
// Force a 24 hours split by for labels API, this will be more efficient with our static daily bucket storage.
// This is because the labels API is an index-only operation.
SplitByIntervalMiddleware(WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, WithSplitByLimits(limits, 24*time.Hour), codec, splitByTime, metrics.SplitByMetrics),
}
if cfg.MaxRetries > 0 {
@ -413,7 +415,7 @@ func NewMetricTripperware(
queryRangeMiddleware = append(
queryRangeMiddleware,
queryrangebase.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(limits, codec, splitMetricByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, codec, splitMetricByTime, metrics.SplitByMetrics),
)
cacheKey := cacheKeyLimits{limits, cfg.Transformer}
@ -429,6 +431,15 @@ func NewMetricTripperware(
func(r queryrangebase.Request) bool {
return !r.GetCachingOptions().Disabled
},
func(tenantIDs []string, r queryrangebase.Request) int {
return MinWeightedParallelism(
tenantIDs,
schema.Configs,
limits,
model.Time(r.GetStart()),
model.Time(r.GetEnd()),
)
},
retentionEnabled,
metrics.ResultsCacheMetrics,
)
@ -465,7 +476,7 @@ func NewMetricTripperware(
return func(next http.RoundTripper) http.RoundTripper {
// Finally, if the user selected any query range middleware, stitch it in.
if len(queryRangeMiddleware) > 0 {
rt := NewLimitedRoundTripper(next, codec, limits, queryRangeMiddleware...)
rt := NewLimitedRoundTripper(next, codec, limits, schema.Configs, queryRangeMiddleware...)
return queryrangebase.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
if !strings.HasSuffix(r.URL.Path, "/query_range") {
return next.RoundTrip(r)
@ -511,7 +522,7 @@ func NewInstantMetricTripperware(
return func(next http.RoundTripper) http.RoundTripper {
if len(queryRangeMiddleware) > 0 {
return NewLimitedRoundTripper(next, codec, limits, queryRangeMiddleware...)
return NewLimitedRoundTripper(next, codec, limits, schema.Configs, queryRangeMiddleware...)
}
return next
}, nil

@ -110,7 +110,9 @@ var (
// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{
Configs: testSchemas,
}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -173,7 +175,7 @@ func TestMetricsTripperware(t *testing.T) {
}
func TestLogFilterTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -222,7 +224,7 @@ func TestLogFilterTripperware(t *testing.T) {
func TestInstantQueryTripperware(t *testing.T) {
testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -258,7 +260,7 @@ func TestInstantQueryTripperware(t *testing.T) {
}
func TestSeriesTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -299,7 +301,7 @@ func TestSeriesTripperware(t *testing.T) {
}
func TestLabelsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -345,7 +347,7 @@ func TestLabelsTripperware(t *testing.T) {
}
func TestLogNoFilter(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, false, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -374,14 +376,13 @@ func TestLogNoFilter(t *testing.T) {
count, h := promqlResult(streams)
rt.setHandler(h)
_, err = tpw(rt).RoundTrip(req)
// fake round tripper is not called because we send "limited" queries to log tripperware
require.Equal(t, 0, *count)
require.Error(t, err)
require.Equal(t, 1, *count)
require.Nil(t, err)
}
func TestRegexpParamsSupport(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -464,7 +465,7 @@ func TestPostQueries(t *testing.T) {
}
func TestEntriesLimitsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, false, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -568,6 +569,7 @@ func Test_getOperation(t *testing.T) {
type fakeLimits struct {
maxQueryLength time.Duration
maxQueryParallelism int
tsdbMaxQueryParallelism int
maxQueryLookback time.Duration
maxEntriesLimitPerQuery int
maxSeries int
@ -594,6 +596,10 @@ func (f fakeLimits) MaxQueryParallelism(string) int {
return f.maxQueryParallelism
}
func (f fakeLimits) TSDBMaxQueryParallelism(string) int {
return f.tsdbMaxQueryParallelism
}
func (f fakeLimits) MaxEntriesLimitPerQuery(string) int {
return f.maxEntriesLimitPerQuery
}

@ -9,6 +9,7 @@ import (
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/dskit/tenant"
@ -16,6 +17,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
@ -46,6 +48,7 @@ func NewSplitByMetrics(r prometheus.Registerer) *SplitByMetrics {
}
type splitByInterval struct {
configs []config.PeriodConfig
next queryrangebase.Handler
limits Limits
merger queryrangebase.Merger
@ -56,9 +59,10 @@ type splitByInterval struct {
type Splitter func(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error)
// SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval.
func SplitByIntervalMiddleware(limits Limits, merger queryrangebase.Merger, splitter Splitter, metrics *SplitByMetrics) queryrangebase.Middleware {
func SplitByIntervalMiddleware(configs []config.PeriodConfig, limits Limits, merger queryrangebase.Merger, splitter Splitter, metrics *SplitByMetrics) queryrangebase.Middleware {
return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
return &splitByInterval{
configs: configs,
next: next,
limits: limits,
merger: merger,
@ -217,7 +221,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
}
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, h.limits.MaxQuerySeries)
maxParallelism := validation.SmallestPositiveIntPerTenant(tenantIDs, h.limits.MaxQueryParallelism)
maxParallelism := MinWeightedParallelism(tenantIDs, h.configs, h.limits, model.Time(r.GetStart()), model.Time(r.GetEnd()))
resps, err := h.Process(ctx, maxParallelism, limit, input, maxSeries)
if err != nil {
return nil, err

@ -11,16 +11,32 @@ import (
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/config"
)
var nilMetrics = NewSplitByMetrics(nil)
var testSchemas = func() []config.PeriodConfig {
confS := `
- from: "1950-01-01"
store: boltdb-shipper
object_store: gcs
schema: v12
`
var confs []config.PeriodConfig
if err := yaml.Unmarshal([]byte(confS), &confs); err != nil {
panic(err)
}
return confs
}()
func Test_splitQuery(t *testing.T) {
buildLokiRequest := func(start, end time.Time) queryrangebase.Request {
return &LokiRequest{
@ -572,6 +588,7 @@ func Test_splitMetricQuery(t *testing.T) {
}
require.Equal(t, tc.expected, splits)
})
}
}
@ -599,6 +616,7 @@ func Test_splitByInterval_Do(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
splitByTime,
@ -771,6 +789,7 @@ func Test_series_splitByInterval_Do(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
splitByTime,
@ -851,6 +870,7 @@ func Test_ExitEarly(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
splitByTime,
@ -932,6 +952,7 @@ func Test_DoesntDeadlock(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: n}, time.Hour)
split := SplitByIntervalMiddleware(
testSchemas,
l,
LokiCodec,
splitByTime,

@ -84,6 +84,7 @@ type Limits struct {
MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"`
MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"`
MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"`
TSDBMaxQueryParallelism int `yaml:"tsdb_max_query_parallelism" json:"tsdb_max_query_parallelism"`
CardinalityLimit int `yaml:"cardinality_limit" json:"cardinality_limit"`
MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query" json:"max_streams_matchers_per_query"`
MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"`
@ -203,6 +204,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
_ = l.MaxQueryLookback.Set("0s")
f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until <lookback> duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 32, "Maximum number of queries will be scheduled in parallel by the frontend.")
f.IntVar(&l.TSDBMaxQueryParallelism, "querier.tsdb-max-query-parallelism", 512, "Maximum number of queries will be scheduled in parallel by the frontend for TSDB schemas.")
f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Limit the number of streams matchers per query")
f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Limit the number of concurrent tail requests")
@ -423,6 +425,12 @@ func (o *Overrides) QueryReadyIndexNumDays(userID string) int {
return o.getOverridesForUser(userID).QueryReadyIndexNumDays
}
// TSDBMaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel for TSDB schemas.
func (o *Overrides) TSDBMaxQueryParallelism(userID string) int {
return o.getOverridesForUser(userID).TSDBMaxQueryParallelism
}
// MaxQueryParallelism returns the limit to the number of sub-queries the
// frontend will process in parallel.
func (o *Overrides) MaxQueryParallelism(userID string) int {

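As a final note on configuration, the new limit uses the yaml key `tsdb_max_query_parallelism` shown in the struct tag above, so it can be set per tenant alongside `max_query_parallelism`. A hypothetical, trimmed-down illustration of the yaml wiring (the struct here is a stand-in, not the real `validation.Limits`):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Stand-in for the relevant corner of the limits struct, reusing the
// yaml keys from the hunk above.
type limits struct {
	MaxQueryParallelism     int `yaml:"max_query_parallelism"`
	TSDBMaxQueryParallelism int `yaml:"tsdb_max_query_parallelism"`
}

func main() {
	raw := `
max_query_parallelism: 64
tsdb_max_query_parallelism: 512
`
	var l limits
	if err := yaml.Unmarshal([]byte(raw), &l); err != nil {
		panic(err)
	}
	fmt.Println(l.MaxQueryParallelism, l.TSDBMaxQueryParallelism) // 64 512
}
```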