Refactor/remove global splitby (#5243)

* cleans up internal limits struct, removes unnecessary logic

* moves querier.split-queries-by-interval to limits, simplifies resulting code.

* docs, changelog

* flag supports model.duration instead of time.duration

* lint and adds new error for deprecated field

* reregisters queryrange flags

* comment lint

* ensures we validate queryrange config and simplifies embedded struct validate()

* 0 still disables

* simplify cache key generation and protect against divide by zero
pull/5261/head
Owen Diehl 4 years ago committed by GitHub
parent 3d135e5338
commit 91d837e79c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 6
      docs/sources/configuration/_index.md
  3. 19
      docs/sources/upgrading/_index.md
  4. 27
      pkg/loki/loki.go
  5. 33
      pkg/querier/queryrange/limits.go
  6. 34
      pkg/querier/queryrange/limits_test.go
  7. 18
      pkg/querier/queryrange/queryrangebase/roundtrip.go
  8. 14
      pkg/querier/queryrange/roundtrip.go
  9. 13
      pkg/querier/queryrange/roundtrip_test.go
  10. 10
      pkg/querier/queryrange/split_by_interval_test.go
  11. 4
      pkg/validation/limits.go

@ -1,5 +1,6 @@
## Main
* [5243](https://github.com/grafana/loki/pull/5243) **owen-d**: moves `querier.split-queries-by-interval` to limits code only.
* [5139](https://github.com/grafana/loki/pull/5139) **DylanGuedes**: Drop support for legacy configuration rules format.
* [4911](https://github.com/grafana/loki/pull/4911) **jeschkies**: Support Docker service discovery in Promtail.
* [5107](https://github.com/grafana/loki/pull/5107) **chaudum** Fix bug in fluentd plugin that caused log lines containing non UTF-8 characters to be dropped.

@ -2263,10 +2263,8 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -frontend.min-sharding-lookback
[min_sharding_lookback: <duration> | default = 0s]
# Split queries by an interval and execute in parallel, 0 disables it. You
# should use in multiple of 24 hours (same as the storage bucketing scheme),
# to avoid queriers downloading and processing the same chunks. This also
# determines how cache keys are chosen when result caching is enabled
# Split queries by an interval and execute in parallel, any value less than zero disables it.
# This also determines how cache keys are chosen when result caching is enabled
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 30m]
```

@ -33,6 +33,25 @@ The output is incredibly verbose as it shows the entire internal config struct u
### Loki
#### `querier.split-queries-by-interval` flag migrated yaml path and default value.
The CLI flag `querier.split-queries-by-interval` has changed it's corresponding yaml equivalent from
```yaml
query_range:
split_queries_by_interval: 10m
```
->
```
limits_config:
split_queries_by_interval: 10m
```
Additionally, it has a new default value of `30m` rather than `0`.
This is part of it's migration path from a global configuration to a per-tenant one (still subject to default tenant limits in the `limits_config`).
It keeps it's CLI flag as `querier.split-queries-by-interval`.
#### Dropped support for old Prometheus rules configuration format
Alerting rules previously could be specified in two formats: 1.x format (legacy one, named `v0` internally) and 2.x.

@ -109,7 +109,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Frontend.RegisterFlags(f)
c.Ruler.RegisterFlags(f)
c.Worker.RegisterFlags(f)
c.registerQueryRangeFlagsWithChangedDefaultValues(f)
c.QueryRange.RegisterFlags(f)
c.RuntimeConfig.RegisterFlags(f)
c.MemberlistKV.RegisterFlags(f)
c.Tracing.RegisterFlags(f)
@ -138,28 +138,6 @@ func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
})
}
func (c *Config) registerQueryRangeFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
throwaway := flag.NewFlagSet("throwaway", flag.PanicOnError)
// NB: We can remove this after removing Loki's dependency on Cortex and bringing in the queryrange.Config.
// That will let us change the defaults there rather than include wrapper functions like this one.
// Register to throwaway flags first. Default values are remembered during registration and cannot be changed,
// but we can take values from throwaway flag set and reregister into supplied flags with new default values.
c.QueryRange.RegisterFlags(throwaway)
throwaway.VisitAll(func(f *flag.Flag) {
// Ignore errors when setting new values. We have a test to verify that it works.
switch f.Name {
case "querier.split-queries-by-interval":
_ = f.Value.Set("30m")
case "querier.parallelise-shardable-queries":
_ = f.Value.Set("true")
}
fs.Var(f.Value, f.Name, f.Usage)
})
}
// Clone takes advantage of pass-by-value semantics to return a distinct *Config.
// This is primarily used to parse a different flag set without mutating the original *Config.
func (c *Config) Clone() flagext.Registerer {
@ -222,6 +200,9 @@ func (c *Config) Validate() error {
)
}
}
if err := c.QueryRange.Validate(); err != nil {
return errors.Wrap(err, "invalid query_range config")
}
return nil
}

@ -43,32 +43,10 @@ type Limits interface {
type limits struct {
Limits
splitDuration time.Duration
overrides bool
}
func (l limits) QuerySplitDuration(user string) time.Duration {
if !l.overrides {
return l.splitDuration
}
dur := l.Limits.QuerySplitDuration(user)
if dur == 0 {
return l.splitDuration
}
return dur
}
// WithDefaults will construct a Limits with a default value for QuerySplitDuration when no overrides are present.
func WithDefaultLimits(l Limits, conf queryrangebase.Config) Limits {
res := limits{
Limits: l,
overrides: true,
}
if conf.SplitQueriesByInterval != 0 {
res.splitDuration = conf.SplitQueriesByInterval
}
return res
return l.splitDuration
}
// WithSplitByLimits will construct a Limits with a static split by duration.
@ -84,11 +62,14 @@ type cacheKeyLimits struct {
Limits
}
// GenerateCacheKey will panic if it encounters a 0 split duration. We ensure against this by requiring
// a nonzero split interval when caching is enabled
func (l cacheKeyLimits) GenerateCacheKey(userID string, r queryrangebase.Request) string {
split := l.QuerySplitDuration(userID)
currentInterval := r.GetStart() / int64(split/time.Millisecond)
var currentInterval int64
if denominator := int64(split / time.Millisecond); denominator > 0 {
currentInterval = r.GetStart() / denominator
}
// include both the currentInterval and the split duration in key to ensure
// a cache key can't be reused when an interval changes
return fmt.Sprintf("%s:%s:%d:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval, split)

@ -27,15 +27,12 @@ func TestLimits(t *testing.T) {
splits: map[string]time.Duration{"a": time.Minute},
}
require.Equal(t, l.QuerySplitDuration("a"), time.Minute)
require.Equal(t, l.QuerySplitDuration("b"), time.Duration(0))
wrapped := WithSplitByLimits(l, time.Hour)
wrapped := WithDefaultLimits(l, queryrangebase.Config{
SplitQueriesByInterval: time.Hour,
})
require.Equal(t, wrapped.QuerySplitDuration("a"), time.Minute)
// Test default
require.Equal(t, wrapped.QuerySplitDuration("b"), time.Hour)
// Ensure we override the underlying implementation
require.Equal(t, wrapped.QuerySplitDuration("a"), time.Hour)
r := &LokiRequest{
Query: "qry",
@ -45,17 +42,17 @@ func TestLimits(t *testing.T) {
require.Equal(
t,
fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Minute/time.Millisecond), int64(time.Minute)),
fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart()/int64(time.Hour/time.Millisecond), int64(time.Hour)),
cacheKeyLimits{wrapped}.GenerateCacheKey("a", r),
)
}
func Test_seriesLimiter(t *testing.T) {
cfg := testConfig
cfg.SplitQueriesByInterval = time.Hour
cfg.CacheResults = false
// split in 7 with 2 in // max.
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, nil)
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -269,3 +266,20 @@ func Test_MaxQueryLookBack(t *testing.T) {
_, err = tpw(rt).RoundTrip(req)
require.NoError(t, err)
}
func Test_GenerateCacheKey_NoDivideZero(t *testing.T) {
l := cacheKeyLimits{WithSplitByLimits(nil, 0)}
start := time.Now()
r := &LokiRequest{
Query: "qry",
StartTs: start,
Step: int64(time.Minute / time.Millisecond),
}
require.Equal(
t,
fmt.Sprintf("foo:qry:%d:0:0", r.GetStep()),
l.GenerateCacheKey("foo", r),
)
}

@ -54,12 +54,14 @@ var (
// Config for query_range middleware chain.
type Config struct {
// Deprecated: SplitQueriesByInterval will be removed in the next major release
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
ShardedQueries bool `yaml:"parallelise_shardable_queries"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
ShardedQueries bool `yaml:"parallelise_shardable_queries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`
}
@ -67,16 +69,18 @@ type Config struct {
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use an a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", false, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", true, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
cfg.ResultsCacheConfig.RegisterFlags(f)
}
// Validate validates the config.
func (cfg *Config) Validate() error {
if cfg.SplitQueriesByInterval != 0 {
return errors.New("the yaml flag `split_queries_by_interval` must now be set in the `limits_config` section instead of the `query_range` config section")
}
if cfg.CacheResults {
if err := cfg.ResultsCacheConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid ResultsCache config")

@ -9,7 +9,6 @@ import (
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"
@ -31,16 +30,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Config.RegisterFlags(f)
}
// Validate validates the config.
func (cfg *Config) Validate() error {
if cfg.CacheResults {
if err := cfg.ResultsCacheConfig.Validate(); err != nil {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
return nil
}
// Stopper gracefully shutdown resources created
type Stopper interface {
Stop()
@ -54,9 +43,6 @@ func NewTripperware(
schema chunk.SchemaConfig,
registerer prometheus.Registerer,
) (queryrangebase.Tripperware, Stopper, error) {
// Ensure that QuerySplitDuration uses configuration defaults.
// This avoids divide by zero errors when determining cache keys where user specific overrides don't exist.
limits = WithDefaultLimits(limits, cfg.Config)
instrumentMetrics := queryrangebase.NewInstrumentMiddlewareMetrics(registerer)
retryMetrics := queryrangebase.NewRetryMiddlewareMetrics(registerer)

@ -33,10 +33,9 @@ import (
var (
testTime = time.Date(2019, 12, 02, 11, 10, 10, 10, time.UTC)
testConfig = Config{queryrangebase.Config{
SplitQueriesByInterval: 4 * time.Hour,
AlignQueriesWithStep: true,
MaxRetries: 3,
CacheResults: true,
AlignQueriesWithStep: true,
MaxRetries: 3,
CacheResults: true,
ResultsCacheConfig: queryrangebase.ResultsCacheConfig{
CacheConfig: cache.Config{
EnableFifoCache: true,
@ -109,7 +108,8 @@ var (
// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, chunk.SchemaConfig{}, nil)
l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -402,7 +402,8 @@ func TestUnhandledPath(t *testing.T) {
}
func TestRegexpParamsSupport(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, chunk.SchemaConfig{}, nil)
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}

@ -581,7 +581,7 @@ func Test_splitByInterval_Do(t *testing.T) {
}, nil
})
l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
l,
LokiCodec,
@ -749,7 +749,7 @@ func Test_series_splitByInterval_Do(t *testing.T) {
}, nil
})
l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
l,
LokiCodec,
@ -830,7 +830,7 @@ func Test_ExitEarly(t *testing.T) {
}, nil
})
l := WithDefaultLimits(fakeLimits{maxQueryParallelism: 1}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
split := SplitByIntervalMiddleware(
l,
LokiCodec,
@ -907,9 +907,7 @@ func Test_DoesntDeadlock(t *testing.T) {
}, nil
})
l := WithDefaultLimits(fakeLimits{
maxQueryParallelism: n,
}, queryrangebase.Config{SplitQueriesByInterval: time.Hour})
l := WithSplitByLimits(fakeLimits{maxQueryParallelism: n}, time.Hour)
split := SplitByIntervalMiddleware(
l,
LokiCodec,

@ -184,6 +184,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
_ = l.PerTenantOverridePeriod.Set("10s")
f.Var(&l.PerTenantOverridePeriod, "limits.per-user-override-period", "Period with this to reload the overrides.")
_ = l.QuerySplitDuration.Set("30m")
f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by an interval and execute in parallel, 0 disables it. This also determines how cache keys are chosen when result caching is enabled")
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -208,6 +211,7 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error {
// Validate validates that this limits config is valid.
func (l *Limits) Validate() error {
if l.StreamRetention != nil {
for i, rule := range l.StreamRetention {
matchers, err := logql.ParseMatchers(rule.Selector)

Loading…
Cancel
Save