From bcd03150c91e62575816e6f58fb8f7cc0e255707 Mon Sep 17 00:00:00 2001
From: Danny Kopping
Date: Thu, 11 Jan 2024 15:56:29 +0200
Subject: [PATCH] Query-frontend: customisable query splitting for queries
 overlapping `query_ingesters_within` window (#11535)

**What this PR does / why we need it**:
The config option `query_ingesters_within` defines the window during which logs _could_ be present on ingesters, and as such queriers will send queries to ingesters in addition to the store. `split_queries_by_interval` is defined to split queries into subqueries for increased parallelism.

Aggressive query splitting within the `query_ingesters_within` window can result in overloading ingesters with unnecessarily large numbers of subqueries, which perversely can impact writes.

`query_ingesters_within` is set to 3h by default. In Grafana Cloud Logs we set `split_queries_by_interval` as low as 15m (defaults to 1h), which would result in 3*60/15=12 requests. Every querier queries every ingester during this window, so that's 12 requests _per ingester_ for every query whose time range overlaps the `query_ingesters_within` window _(i.e. a query from now to now-7d would include the `query_ingesters_within` window as well; now-3h to now-7d would not)_.

However, we _do_ want to split queries so an ingester won't have to handle a query for a full `query_ingesters_within` window - this could involve a large amount of data. To account for this, this PR introduces a new option `split_ingester_queries_by_interval` on the query-frontend; this setting is disabled by default. An illustrative configuration is sketched at the end of this patch.

![image](https://github.com/grafana/loki/assets/373762/2e671bd8-9e8d-4bf3-addf-bebcfc25e8d7)

---
 CHANGELOG.md                                  |   1 +
 docs/sources/configure/_index.md              |   6 +
 pkg/loki/modules.go                           |  13 +
 pkg/querier/queryrange/limits/definitions.go  |   1 +
 pkg/querier/queryrange/limits_test.go         |   4 +-
 pkg/querier/queryrange/roundtrip.go           | 115 +--
 pkg/querier/queryrange/roundtrip_test.go      |  51 +-
 pkg/querier/queryrange/split_by_interval.go   | 165 +---
 .../queryrange/split_by_interval_test.go      | 844 +++++++++++++-----
 pkg/querier/queryrange/splitters.go           | 297 ++++++
 pkg/util/config.go                            |   8 +
 pkg/util/time.go                              |   4 +-
 pkg/validation/limits.go                      |  10 +
 13 files changed, 1050 insertions(+), 469 deletions(-)
 create mode 100644 pkg/querier/queryrange/splitters.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 612b70ab36..f9d2d38fbb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -46,6 +46,7 @@
* [11539](https://github.com/grafana/loki/pull/11539) **kaviraj,ashwanthgoli** Support caching /series and /labels query results
* [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks.
* [11589](https://github.com/grafana/loki/pull/11589) **ashwanthgoli** Results Cache: Adds `query_length_served` cache stat to measure the length of the query served from cache.
+* [11535](https://github.com/grafana/loki/pull/11535) **dannykopping** Query Frontend: Allow customisable splitting of queries which overlap the `query_ingesters_within` window to reduce query pressure on ingesters.
* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly.

##### Fixes

diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index e2185c1947..9bf65788c8 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -2884,6 +2884,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -querier.split-metadata-queries-by-interval
[split_metadata_queries_by_interval: <duration> | default = 1d]

+# Interval to use for time-based splitting when a request is within the
+# `query_ingesters_within` window; when set to 0, the value of
+# `split-queries-by-interval` is used.
+# CLI flag: -querier.split-ingester-queries-by-interval
+[split_ingester_queries_by_interval: <duration> | default = 0s]
+
# Limit queries that can be sharded. Queries within the time range of now and
# now minus this sharding lookback are not sharded. The default value of 0s
# disables the lookback, causing sharding of all queries at all times.
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 1342a105f3..8282098c85 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -800,12 +800,25 @@ func (disabledShuffleShardingLimits) MaxQueriersPerUser(_ string) uint { return
func (disabledShuffleShardingLimits) MaxQueryCapacity(_ string) float64 { return 0 }

+// ingesterQueryOptions exists simply to avoid dependency cycles when using querier.Config directly in queryrange.NewMiddleware
+type ingesterQueryOptions struct {
+	querier.Config
+}
+
+func (i ingesterQueryOptions) QueryStoreOnly() bool {
+	return i.Config.QueryStoreOnly
+}
+func (i ingesterQueryOptions) QueryIngestersWithin() time.Duration {
+	return i.Config.QueryIngestersWithin
+}
+
func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) {
	level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")
	middleware, stopper, err := queryrange.NewMiddleware(
		t.Cfg.QueryRange,
		t.Cfg.Querier.Engine,
+		ingesterQueryOptions{t.Cfg.Querier},
		util_log.Logger,
		t.Overrides,
		t.Cfg.SchemaConfig,
diff --git a/pkg/querier/queryrange/limits/definitions.go b/pkg/querier/queryrange/limits/definitions.go
index bd84e144fa..57b2e03c66 100644
--- a/pkg/querier/queryrange/limits/definitions.go
+++ b/pkg/querier/queryrange/limits/definitions.go
@@ -15,6 +15,7 @@ type Limits interface {
	logql.Limits
	QuerySplitDuration(string) time.Duration
	MetadataQuerySplitDuration(string) time.Duration
+	IngesterQuerySplitDuration(string) time.Duration
	MaxQuerySeries(context.Context, string) int
	MaxEntriesLimitPerQuery(context.Context, string) int
	MinShardingLookback(string) time.Duration
diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go
index 3b82c1dc9e..0de342e426 100644
--- a/pkg/querier/queryrange/limits_test.go
+++ b/pkg/querier/queryrange/limits_test.go
@@ -58,7 +58,7 @@ func Test_seriesLimiter(t *testing.T) {
	cfg.CacheIndexStatsResults = false
	// split in 7 with 2 in
	// max.
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour) - tpw, stopper, err := NewMiddleware(cfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{ + tpw, stopper, err := NewMiddleware(cfg, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{ Configs: testSchemas, }, nil, false, nil, constants.Loki) if stopper != nil { @@ -228,7 +228,7 @@ func Test_MaxQueryParallelismDisable(t *testing.T) { } func Test_MaxQueryLookBack(t *testing.T) { - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{ + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{ maxQueryLookback: 1 * time.Hour, maxQueryParallelism: 1, }, config.SchemaConfig{ diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 5f0aef4a1a..6d0d62af7a 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -120,6 +120,7 @@ func newResultsCacheFromConfig(cfg base.ResultsCacheConfig, registerer prometheu func NewMiddleware( cfg Config, engineOpts logql.EngineOpts, + iqo util.IngesterQueryOptions, log log.Logger, limits Limits, schema config.SchemaConfig, @@ -176,36 +177,38 @@ func NewMiddleware( var codec base.Codec = DefaultCodec - indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, statsCache, + split := newDefaultSplitter(limits, iqo) + + indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, split, statsCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace) if err != nil { return nil, nil, err } - metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, resultsCache, + metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, newMetricQuerySplitter(limits, iqo), resultsCache, cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware, metricsNamespace) if err != nil { return nil, nil, err } - limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec) + limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec, split) if err != nil { return nil, nil, err } // NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in // MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170 - logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, resultsCache, metrics, indexStatsTripperware, metricsNamespace) + logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, split, resultsCache, metrics, indexStatsTripperware, metricsNamespace) if err != nil { return nil, nil, err } - seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace) + seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, split, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace) if err != nil { return nil, nil, err } - labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, labelsCache, cacheGenNumLoader, 
retentionEnabled, metrics, schema, metricsNamespace) + labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, split, labelsCache, cacheGenNumLoader, retentionEnabled, metrics, schema, metricsNamespace) if err != nil { return nil, nil, err } @@ -215,7 +218,7 @@ func NewMiddleware( return nil, nil, err } - seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace) + seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, split, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace) if err != nil { return nil, nil, err } @@ -406,18 +409,7 @@ func getOperation(path string) string { } // NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests. -func NewLogFilterTripperware( - cfg Config, - engineOpts logql.EngineOpts, - log log.Logger, - limits Limits, - schema config.SchemaConfig, - merger base.Merger, - c cache.Cache, - metrics *Metrics, - indexStatsTripperware base.Middleware, - metricsNamespace string, -) (base.Middleware, error) { +func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { return base.MiddlewareFunc(func(next base.Handler) base.Handler { statsHandler := indexStatsTripperware.Wrap(next) @@ -426,7 +418,7 @@ func NewLogFilterTripperware( NewLimitsMiddleware(limits), NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), } if cfg.CacheResults { @@ -481,16 +473,7 @@ func NewLogFilterTripperware( } // NewLimitedTripperware creates a new frontend tripperware responsible for handling log requests which are label matcher only, no filter expression. 
-func NewLimitedTripperware( - _ Config, - engineOpts logql.EngineOpts, - log log.Logger, - limits Limits, - schema config.SchemaConfig, - metrics *Metrics, - indexStatsTripperware base.Middleware, - merger base.Merger, -) (base.Middleware, error) { +func NewLimitedTripperware(_ Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, merger base.Merger, split splitter) (base.Middleware, error) { return base.MiddlewareFunc(func(next base.Handler) base.Handler { statsHandler := indexStatsTripperware.Wrap(next) @@ -499,7 +482,7 @@ func NewLimitedTripperware( NewLimitsMiddleware(limits), NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, splitByTime, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, split, metrics.SplitByMetrics), NewQuerierSizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), } @@ -518,6 +501,7 @@ func NewSeriesTripperware( metrics *Metrics, schema config.SchemaConfig, merger base.Merger, + split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, @@ -558,7 +542,7 @@ func NewSeriesTripperware( StatsCollectorMiddleware(), NewLimitsMiddleware(limits), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), } if cfg.CacheSeriesResults { @@ -567,7 +551,6 @@ func NewSeriesTripperware( base.InstrumentMiddleware("series_results_cache", metrics.InstrumentMiddlewareMetrics), cacheMiddleware, ) - } if cfg.MaxRetries > 0 { @@ -601,6 +584,7 @@ func NewLabelsTripperware( log log.Logger, limits Limits, merger base.Merger, + split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, @@ -643,7 +627,7 @@ func NewLabelsTripperware( StatsCollectorMiddleware(), NewLimitsMiddleware(limits), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), } if cfg.CacheLabelResults { @@ -652,7 +636,6 @@ func NewLabelsTripperware( base.InstrumentMiddleware("label_results_cache", metrics.InstrumentMiddlewareMetrics), cacheMiddleware, ) - } if cfg.MaxRetries > 0 { @@ -669,21 +652,7 @@ func NewLabelsTripperware( } // NewMetricTripperware creates a new frontend tripperware responsible for handling metric queries -func NewMetricTripperware( - cfg Config, - engineOpts logql.EngineOpts, - log log.Logger, - limits Limits, - schema config.SchemaConfig, - merger base.Merger, - c cache.Cache, - cacheGenNumLoader base.CacheGenNumberLoader, - retentionEnabled bool, - extractor base.Extractor, - metrics *Metrics, - indexStatsTripperware base.Middleware, - metricsNamespace string, -) (base.Middleware, error) { +func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader 
base.CacheGenNumberLoader, retentionEnabled bool, extractor base.Extractor, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { cacheKey := cacheKeyLimits{limits, cfg.Transformer} var queryCacheMiddleware base.Middleware if cfg.CacheResults { @@ -737,7 +706,7 @@ func NewMetricTripperware( queryRangeMiddleware, NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, splitMetricByTime, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), ) if cfg.CacheResults { @@ -793,16 +762,7 @@ func NewMetricTripperware( } // NewInstantMetricTripperware creates a new frontend tripperware responsible for handling metric queries -func NewInstantMetricTripperware( - cfg Config, - engineOpts logql.EngineOpts, - log log.Logger, - limits Limits, - schema config.SchemaConfig, - metrics *Metrics, - indexStatsTripperware base.Middleware, - metricsNamespace string, -) (base.Middleware, error) { +func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { return base.MiddlewareFunc(func(next base.Handler) base.Handler { statsHandler := indexStatsTripperware.Wrap(next) @@ -844,21 +804,10 @@ func NewInstantMetricTripperware( }), nil } -func NewVolumeTripperware( - cfg Config, - log log.Logger, - limits Limits, - schema config.SchemaConfig, - merger base.Merger, - c cache.Cache, - cacheGenNumLoader base.CacheGenNumberLoader, - retentionEnabled bool, - metrics *Metrics, - metricsNamespace string, -) (base.Middleware, error) { +func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { // Parallelize the volume requests, so it doesn't send a huge request to a single index-gw (i.e. {app=~".+"} for 30d). // Indices are sharded by 24 hours, so we split the volume request in 24h intervals. 
- limits = WithSplitByLimits(limits, 24*time.Hour) + limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval) var cacheMiddleware base.Middleware if cfg.CacheVolumeResults { var err error @@ -894,6 +843,7 @@ func NewVolumeTripperware( cacheMiddleware, cfg, merger, + split, limits, log, metrics, @@ -962,18 +912,7 @@ func volumeFeatureFlagRoundTripper(nextTW base.Middleware, limits Limits) base.M }) } -func NewIndexStatsTripperware( - cfg Config, - log log.Logger, - limits Limits, - schema config.SchemaConfig, - merger base.Merger, - c cache.Cache, - cacheGenNumLoader base.CacheGenNumberLoader, - retentionEnabled bool, - metrics *Metrics, - metricsNamespace string, -) (base.Middleware, error) { +func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval) var cacheMiddleware base.Middleware @@ -1011,6 +950,7 @@ func NewIndexStatsTripperware( cacheMiddleware, cfg, merger, + split, limits, log, metrics, @@ -1028,6 +968,7 @@ func sharedIndexTripperware( cacheMiddleware base.Middleware, cfg Config, merger base.Merger, + split splitter, limits Limits, log log.Logger, metrics *Metrics, @@ -1038,7 +979,7 @@ func sharedIndexTripperware( middlewares := []base.Middleware{ NewLimitsMiddleware(limits), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), } if cacheMiddleware != nil { diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 883f9b1422..fe8799fffe 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -189,7 +189,7 @@ func TestMetricsTripperware(t *testing.T) { noCacheTestCfg := testConfig noCacheTestCfg.CacheResults = false noCacheTestCfg.CacheIndexStatsResults = false - tpw, stopper, err := NewMiddleware(noCacheTestCfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{ + tpw, stopper, err := NewMiddleware(noCacheTestCfg, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{ Configs: testSchemasTSDB, }, nil, false, nil, constants.Loki) if stopper != nil { @@ -240,7 +240,7 @@ func TestMetricsTripperware(t *testing.T) { require.Error(t, err) // Configure with cache - tpw, stopper, err = NewMiddleware(testConfig, testEngineOpts, util_log.Logger, l, config.SchemaConfig{ + tpw, stopper, err = NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{ Configs: testSchemasTSDB, }, nil, false, nil, constants.Loki) if stopper != nil { @@ -278,7 +278,7 @@ func TestLogFilterTripperware(t *testing.T) { noCacheTestCfg := testConfig noCacheTestCfg.CacheResults = false noCacheTestCfg.CacheIndexStatsResults = false - tpw, stopper, err := NewMiddleware(noCacheTestCfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(noCacheTestCfg, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -347,7 +347,7 @@ func TestInstantQueryTripperware(t 
*testing.T) { queryTimeout: 1 * time.Minute, maxSeries: 1, } - tpw, stopper, err := NewMiddleware(testShardingConfigNoCache, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testShardingConfigNoCache, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -403,7 +403,7 @@ func TestSeriesTripperware(t *testing.T) { "1": 24 * time.Hour, }, } - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -441,7 +441,7 @@ func TestLabelsTripperware(t *testing.T) { "1": 24 * time.Hour, }, } - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -487,7 +487,7 @@ func TestLabelsTripperware(t *testing.T) { } func TestIndexStatsTripperware(t *testing.T) { - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -537,7 +537,7 @@ func TestVolumeTripperware(t *testing.T) { volumeEnabled: true, maxSeries: 42, } - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -593,7 +593,7 @@ func TestVolumeTripperware(t *testing.T) { }) t.Run("range queries return a prometheus style metrics response, putting volumes in buckets based on the step", func(t *testing.T) { - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, volumeEnabled: true}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, volumeEnabled: true}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -784,7 +784,7 @@ func TestNewTripperware_Caches(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - _, stopper, err := NewMiddleware(tc.config, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + _, stopper, err := NewMiddleware(tc.config, 
testEngineOpts, nil, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -814,7 +814,7 @@ func TestNewTripperware_Caches(t *testing.T) { } func TestLogNoFilter(t *testing.T) { - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -871,7 +871,7 @@ func TestPostQueries(t *testing.T) { } func TestTripperware_EntriesLimit(t *testing.T) { - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -920,7 +920,7 @@ func TestTripperware_RequiredLabels(t *testing.T) { } { t.Run(test.qs, func(t *testing.T) { limits := fakeLimits{maxEntriesLimitPerQuery: 5000, maxQueryParallelism: 1, requiredLabels: []string{"app"}} - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -1027,7 +1027,7 @@ func TestTripperware_RequiredNumberLabels(t *testing.T) { maxQueryParallelism: 1, requiredNumberLabels: tc.requiredNumberLabels, } - tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -1218,7 +1218,7 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - tpw, stopper, err := NewMiddleware(statsTestCfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: statsSchemas}, nil, false, nil, constants.Loki) + tpw, stopper, err := NewMiddleware(statsTestCfg, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: statsSchemas}, nil, false, nil, constants.Loki) if stopper != nil { defer stopper.Stop() } @@ -1245,6 +1245,7 @@ type fakeLimits struct { maxSeries int splitDuration map[string]time.Duration metadataSplitDuration map[string]time.Duration + ingesterSplitDuration map[string]time.Duration minShardingLookback time.Duration queryTimeout time.Duration requiredLabels []string @@ -1269,6 +1270,13 @@ func (f fakeLimits) MetadataQuerySplitDuration(key string) time.Duration { return f.metadataSplitDuration[key] } +func (f fakeLimits) IngesterQuerySplitDuration(key string) time.Duration { + if f.ingesterSplitDuration == nil { + return 0 + } 
+ return f.ingesterSplitDuration[key] +} + func (f fakeLimits) MaxQueryLength(context.Context, string) time.Duration { if f.maxQueryLength == 0 { return time.Hour * 7 @@ -1344,6 +1352,19 @@ func (f fakeLimits) TSDBMaxBytesPerShard(_ string) int { return valid.DefaultTSDBMaxBytesPerShard } +type ingesterQueryOpts struct { + queryStoreOnly bool + queryIngestersWithin time.Duration +} + +func (i ingesterQueryOpts) QueryStoreOnly() bool { + return i.queryStoreOnly +} + +func (i ingesterQueryOpts) QueryIngestersWithin() time.Duration { + return i.queryIngestersWithin +} + func counter() (*int, base.Handler) { count := 0 var lock sync.Mutex diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 9e2eda4b19..b332fe5e61 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -21,7 +21,6 @@ import ( "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/validation" ) @@ -56,13 +55,11 @@ type splitByInterval struct { limits Limits merger queryrangebase.Merger metrics *SplitByMetrics - splitter Splitter + splitter splitter } -type Splitter func(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) - // SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval. -func SplitByIntervalMiddleware(configs []config.PeriodConfig, limits Limits, merger queryrangebase.Merger, splitter Splitter, metrics *SplitByMetrics) queryrangebase.Middleware { +func SplitByIntervalMiddleware(configs []config.PeriodConfig, limits Limits, merger queryrangebase.Merger, splitter splitter, metrics *SplitByMetrics) queryrangebase.Middleware { if metrics == nil { metrics = NewSplitByMetrics(nil) } @@ -197,7 +194,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que return h.next.Do(ctx, r) } - intervals, err := h.splitter(r, interval) + intervals, err := h.splitter.split(time.Now().UTC(), tenantIDs, r, interval) if err != nil { return nil, err } @@ -251,73 +248,6 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que return h.merger.MergeResponse(resps...) } -func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) { - var reqs []queryrangebase.Request - - switch r := req.(type) { - case *LokiRequest: - util.ForInterval(interval, r.StartTs, r.EndTs, false, func(start, end time.Time) { - reqs = append(reqs, &LokiRequest{ - Query: r.Query, - Limit: r.Limit, - Step: r.Step, - Interval: r.Interval, - Direction: r.Direction, - Path: r.Path, - StartTs: start, - EndTs: end, - Plan: r.Plan, - }) - }) - case *LokiSeriesRequest: - // metadata queries have end time inclusive. - // Set endTimeInclusive to true so that ForInterval keeps a gap of 1ms between splits to - // avoid querying duplicate data in adjacent queries. - util.ForInterval(interval, r.StartTs, r.EndTs, true, func(start, end time.Time) { - reqs = append(reqs, &LokiSeriesRequest{ - Match: r.Match, - Path: r.Path, - StartTs: start, - EndTs: end, - Shards: r.Shards, - }) - }) - case *LabelRequest: - // metadata queries have end time inclusive. - // Set endTimeInclusive to true so that ForInterval keeps a gap of 1ms between splits to - // avoid querying duplicate data in adjacent queries. 
- util.ForInterval(interval, *r.Start, *r.End, true, func(start, end time.Time) { - reqs = append(reqs, NewLabelRequest(start, end, r.Query, r.Name, r.Path())) - }) - case *logproto.IndexStatsRequest: - startTS := r.GetStart() - endTS := r.GetEnd() - util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) { - reqs = append(reqs, &logproto.IndexStatsRequest{ - From: model.TimeFromUnix(start.Unix()), - Through: model.TimeFromUnix(end.Unix()), - Matchers: r.GetMatchers(), - }) - }) - case *logproto.VolumeRequest: - startTS := r.GetStart() - endTS := r.GetEnd() - util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) { - reqs = append(reqs, &logproto.VolumeRequest{ - From: model.TimeFromUnix(start.Unix()), - Through: model.TimeFromUnix(end.Unix()), - Matchers: r.GetMatchers(), - Limit: r.Limit, - TargetLabels: r.TargetLabels, - AggregateBy: r.AggregateBy, - }) - }) - default: - return nil, nil - } - return reqs, nil -} - // maxRangeVectorAndOffsetDurationFromQueryString func maxRangeVectorAndOffsetDurationFromQueryString(q string) (time.Duration, time.Duration, error) { parsed, err := syntax.ParseExpr(q) @@ -346,92 +276,3 @@ func maxRangeVectorAndOffsetDuration(expr syntax.Expr) (time.Duration, time.Dura }) return maxRVDuration, maxOffset, nil } - -// reduceSplitIntervalForRangeVector reduces the split interval for a range query based on the duration of the range vector. -// Large range vector durations will not be split into smaller intervals because it can cause the queries to be slow by over-processing data. -func reduceSplitIntervalForRangeVector(r *LokiRequest, interval time.Duration) (time.Duration, error) { - maxRange, _, err := maxRangeVectorAndOffsetDuration(r.Plan.AST) - if err != nil { - return 0, err - } - if maxRange > interval { - return maxRange, nil - } - return interval, nil -} - -func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) { - var reqs []queryrangebase.Request - - lokiReq := r.(*LokiRequest) - - interval, err := reduceSplitIntervalForRangeVector(lokiReq, interval) - if err != nil { - return nil, err - } - - // step align start and end time of the query. Start time is rounded down and end time is rounded up. 
- stepNs := r.GetStep() * 1e6 - startNs := lokiReq.StartTs.UnixNano() - start := time.Unix(0, startNs-startNs%stepNs) - - endNs := lokiReq.EndTs.UnixNano() - if mod := endNs % stepNs; mod != 0 { - endNs += stepNs - mod - } - end := time.Unix(0, endNs) - - lokiReq = lokiReq.WithStartEnd(start, end).(*LokiRequest) - - // step is >= configured split interval, let us just split the query interval by step - if lokiReq.Step >= interval.Milliseconds() { - util.ForInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, func(start, end time.Time) { - reqs = append(reqs, &LokiRequest{ - Query: lokiReq.Query, - Limit: lokiReq.Limit, - Step: lokiReq.Step, - Interval: lokiReq.Interval, - Direction: lokiReq.Direction, - Path: lokiReq.Path, - StartTs: start, - EndTs: end, - Plan: lokiReq.Plan, - }) - }) - - return reqs, nil - } - - for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) { - end := nextIntervalBoundary(start, r.GetStep(), interval) - if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(lokiReq.EndTs) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == lokiReq.EndTs { - end = lokiReq.EndTs - } - reqs = append(reqs, &LokiRequest{ - Query: lokiReq.Query, - Limit: lokiReq.Limit, - Step: lokiReq.Step, - Interval: lokiReq.Interval, - Direction: lokiReq.Direction, - Path: lokiReq.Path, - StartTs: start, - EndTs: end, - Plan: lokiReq.Plan, - }) - } - - return reqs, nil -} - -// Round up to the step before the next interval boundary. -func nextIntervalBoundary(t time.Time, step int64, interval time.Duration) time.Time { - stepNs := step * 1e6 - nsPerInterval := interval.Nanoseconds() - startOfNextInterval := ((t.UnixNano() / nsPerInterval) + 1) * nsPerInterval - // ensure that target is a multiple of steps away from the start time - target := startOfNextInterval - ((startOfNextInterval - t.UnixNano()) % stepNs) - if target == startOfNextInterval { - target -= stepNs - } - return time.Unix(0, target) -} diff --git a/pkg/querier/queryrange/split_by_interval_test.go b/pkg/querier/queryrange/split_by_interval_test.go index b236b88fb4..acf8c495be 100644 --- a/pkg/querier/queryrange/split_by_interval_test.go +++ b/pkg/querier/queryrange/split_by_interval_test.go @@ -9,9 +9,9 @@ import ( "testing" "time" - "github.com/prometheus/common/model" - "github.com/grafana/dskit/user" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" @@ -22,6 +22,8 @@ import ( "github.com/grafana/loki/pkg/querier/plan" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/index/seriesvolume" + "github.com/grafana/loki/pkg/util" ) var nilMetrics = NewSplitByMetrics(nil) @@ -56,181 +58,393 @@ var testSchemasTSDB = func() []config.PeriodConfig { return confs }() -func Test_splitQuery(t *testing.T) { - buildLokiRequest := func(start, end time.Time) queryrangebase.Request { - return &LokiRequest{ - Query: `{app="foo"}`, - Limit: 1, - Step: 2, - StartTs: start, - EndTs: end, - Direction: logproto.BACKWARD, - Path: "/path", - Plan: &plan.QueryPlan{ - AST: syntax.MustParseExpr(`{app="foo"}`), - }, - } - } - - buildLokiRequestWithInterval := func(start, end time.Time) queryrangebase.Request { - return &LokiRequest{ - Query: `{app="foo"}`, - Limit: 1, - Interval: 2, - StartTs: start, - EndTs: end, - 
Direction: logproto.BACKWARD, - Path: "/path", - Plan: &plan.QueryPlan{ - AST: syntax.MustParseExpr(`{app="foo"}`), - }, - } - } - - buildLokiSeriesRequest := func(start, end time.Time) queryrangebase.Request { - return &LokiSeriesRequest{ - Match: []string{"match1"}, - StartTs: start, - EndTs: end, - Path: "/series", - Shards: []string{"shard1"}, - } - } - - buildLokiLabelNamesRequest := func(start, end time.Time) queryrangebase.Request { - return NewLabelRequest(start, end, "", "", "/lables") - } +var ( + // 62697274686461792063616b65 + refTime = time.Date(2023, 1, 15, 8, 5, 30, 123456789, time.UTC) + tenantID = "1" +) +func Test_splitQuery(t *testing.T) { type interval struct { start, end time.Time } + for requestType, tc := range map[string]struct { requestBuilderFunc func(start, end time.Time) queryrangebase.Request endTimeInclusive bool }{ - "LokiRequest": { - buildLokiRequest, - false, + "logs request": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return &LokiRequest{ + Query: `{app="foo"}`, + Limit: 1, + Step: 2, + StartTs: start, + EndTs: end, + Direction: logproto.BACKWARD, + Path: "/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`{app="foo"}`), + }, + } + }, + }, + "logs request with interval": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return &LokiRequest{ + Query: `{app="foo"}`, + Limit: 1, + Interval: 2, + StartTs: start, + EndTs: end, + Direction: logproto.BACKWARD, + Path: "/query", + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`{app="foo"}`), + }, + } + }, + }, + "series request": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return &LokiSeriesRequest{ + Match: []string{"match1"}, + StartTs: start, + EndTs: end, + Path: "/series", + Shards: []string{"shard1"}, + } + }, + endTimeInclusive: true, + }, + "label names request": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return NewLabelRequest(start, end, `{foo="bar"}`, "", "/labels") + }, + endTimeInclusive: true, }, - "LokiRequestWithInterval": { - buildLokiRequestWithInterval, - false, + "label values request": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return NewLabelRequest(start, end, `{foo="bar"}`, "test", "/label/test/values") + }, + endTimeInclusive: true, }, - "LokiSeriesRequest": { - buildLokiSeriesRequest, - true, + "index stats request": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return &logproto.IndexStatsRequest{ + From: model.TimeFromUnix(start.Unix()), + Through: model.TimeFromUnix(end.Unix()), + Matchers: `{host="agent"}`, + } + }, + endTimeInclusive: true, }, - "LokiLabelNamesRequest": { - buildLokiLabelNamesRequest, - true, + "volume request": { + requestBuilderFunc: func(start, end time.Time) queryrangebase.Request { + return &logproto.VolumeRequest{ + From: model.TimeFromUnix(start.Unix()), + Through: model.TimeFromUnix(end.Unix()), + Matchers: `{host="agent"}`, + Limit: 5, + AggregateBy: seriesvolume.Series, + } + }, + endTimeInclusive: true, }, } { expectedSplitGap := time.Duration(0) if tc.endTimeInclusive { - expectedSplitGap = time.Millisecond + expectedSplitGap = util.SplitGap } - for name, intervals := range map[string]struct { - inp interval - expected []interval - }{ - "no_change": { - inp: interval{ - start: time.Unix(0, 0), - end: time.Unix(0, (1 * time.Hour).Nanoseconds()), - }, - expected: []interval{ - { + + t.Run(requestType, func(t *testing.T) { + for name, intervals 
:= range map[string]struct { + input interval + expected []interval + splitInterval time.Duration + splitter splitter + }{ + "no change": { + input: interval{ start: time.Unix(0, 0), end: time.Unix(0, (1 * time.Hour).Nanoseconds()), }, + expected: []interval{ + { + start: time.Unix(0, 0), + end: time.Unix(0, (1 * time.Hour).Nanoseconds()), + }, + }, }, - }, - "align_start": { - inp: interval{ - start: time.Unix(0, (5 * time.Minute).Nanoseconds()), - end: time.Unix(0, (2 * time.Hour).Nanoseconds()), - }, - expected: []interval{ - { + "align start": { + input: interval{ start: time.Unix(0, (5 * time.Minute).Nanoseconds()), - end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap), - }, - { - start: time.Unix(0, (1 * time.Hour).Nanoseconds()), end: time.Unix(0, (2 * time.Hour).Nanoseconds()), }, + expected: []interval{ + { + start: time.Unix(0, (5 * time.Minute).Nanoseconds()), + end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap), + }, + { + start: time.Unix(0, (1 * time.Hour).Nanoseconds()), + end: time.Unix(0, (2 * time.Hour).Nanoseconds()), + }, + }, }, - }, - "align_end": { - inp: interval{ - start: time.Unix(0, 0), - end: time.Unix(0, (115 * time.Minute).Nanoseconds()), - }, - expected: []interval{ - { + "align end": { + input: interval{ start: time.Unix(0, 0), - end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap), - }, - { - start: time.Unix(0, (1 * time.Hour).Nanoseconds()), end: time.Unix(0, (115 * time.Minute).Nanoseconds()), }, + expected: []interval{ + { + start: time.Unix(0, 0), + end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap), + }, + { + start: time.Unix(0, (1 * time.Hour).Nanoseconds()), + end: time.Unix(0, (115 * time.Minute).Nanoseconds()), + }, + }, }, - }, - "align_both": { - inp: interval{ - start: time.Unix(0, (5 * time.Minute).Nanoseconds()), - end: time.Unix(0, (175 * time.Minute).Nanoseconds()), + "align both": { + input: interval{ + start: time.Unix(0, (5 * time.Minute).Nanoseconds()), + end: time.Unix(0, (175 * time.Minute).Nanoseconds()), + }, + expected: []interval{ + { + start: time.Unix(0, (5 * time.Minute).Nanoseconds()), + end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap), + }, + { + start: time.Unix(0, (1 * time.Hour).Nanoseconds()), + end: time.Unix(0, (2 * time.Hour).Nanoseconds()).Add(-expectedSplitGap), + }, + { + start: time.Unix(0, (2 * time.Hour).Nanoseconds()), + end: time.Unix(0, (175 * time.Minute).Nanoseconds()), + }, + }, }, - expected: []interval{ - { + "no align": { + input: interval{ start: time.Unix(0, (5 * time.Minute).Nanoseconds()), - end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap), + end: time.Unix(0, (55 * time.Minute).Nanoseconds()), }, - { - start: time.Unix(0, (1 * time.Hour).Nanoseconds()), - end: time.Unix(0, (2 * time.Hour).Nanoseconds()).Add(-expectedSplitGap), + expected: []interval{ + { + start: time.Unix(0, (5 * time.Minute).Nanoseconds()), + end: time.Unix(0, (55 * time.Minute).Nanoseconds()), + }, }, - { - start: time.Unix(0, (2 * time.Hour).Nanoseconds()), - end: time.Unix(0, (175 * time.Minute).Nanoseconds()), + }, + "wholly within ingester query window": { + input: interval{ + start: refTime.Add(-time.Hour).Truncate(time.Second), + end: refTime, + }, + expected: []interval{ + { + start: refTime.Add(-time.Hour).Truncate(time.Second), + end: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC), + end: refTime, + }, }, + 
splitInterval: time.Hour, + splitter: newDefaultSplitter( + fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}}, + ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour}, + ), }, - }, - "no_align": { - inp: interval{ - start: time.Unix(0, (5 * time.Minute).Nanoseconds()), - end: time.Unix(0, (55 * time.Minute).Nanoseconds()), + "partially within ingester query window": { + input: interval{ + // overlapping `query_ingesters_within` window of 3h + start: refTime.Add(-4 * time.Hour).Add(-30 * time.Minute).Truncate(time.Second), + end: refTime, + }, + expected: []interval{ + // regular intervals until `query_ingesters_within` window + { + start: refTime.Add(-4 * time.Hour).Add(-30 * time.Minute).Truncate(time.Second), + end: time.Date(2023, 1, 15, 4, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 4, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 5, 5, 30, 123456789, time.UTC).Add(-expectedSplitGap), + }, + // and then different intervals for queries to ingesters + { + start: time.Date(2023, 1, 15, 5, 5, 30, 123456789, time.UTC), + end: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC), + end: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC), + end: refTime, + }, + }, + splitInterval: time.Hour, + splitter: newDefaultSplitter( + fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}}, + ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour}, + ), }, - expected: []interval{ - { - start: time.Unix(0, (5 * time.Minute).Nanoseconds()), - end: time.Unix(0, (55 * time.Minute).Nanoseconds()), + "not within ingester query window": { + input: interval{ + // outside `query_ingesters_within` range of 3h + start: refTime.Add(-5 * time.Hour).Truncate(time.Second), + end: refTime.Add(-4 * time.Hour).Truncate(time.Second), + }, + expected: []interval{ + // regular intervals outside `query_ingesters_within` window + { + start: refTime.Add(-5 * time.Hour).Truncate(time.Second), + end: time.Date(2023, 1, 15, 4, 0, 0, 0, time.UTC).Add(-expectedSplitGap), + }, + { + start: time.Date(2023, 1, 15, 4, 0, 0, 0, time.UTC), + end: refTime.Add(-4 * time.Hour).Truncate(time.Second), + }, }, + splitInterval: time.Hour, + splitter: newDefaultSplitter( + fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}}, + ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour}, + ), }, - }, - } { - t.Run(fmt.Sprintf("%s - %s", name, requestType), func(t *testing.T) { - inp := tc.requestBuilderFunc(intervals.inp.start, intervals.inp.end) - var want []queryrangebase.Request - for _, interval := range intervals.expected { - want = append(want, tc.requestBuilderFunc(interval.start, interval.end)) - } - splits, err := splitByTime(inp, time.Hour) - require.NoError(t, err) - require.Equal(t, want, splits) - }) - } + "ingester query split by disabled": { + input: interval{ + // overlapping `query_ingesters_within` range of 3h + start: refTime.Add(-4 * time.Hour).Truncate(time.Second), + end: refTime, + }, + expected: []interval{ + // regular intervals only, since ingester split duration is 0 + { + start: refTime.Add(-4 * time.Hour).Truncate(time.Second), + end: time.Date(2023, 1, 15, 5, 0, 0, 0, 
time.UTC).Add(-expectedSplitGap),
},
{
start: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC),
end: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC).Add(-expectedSplitGap),
},
{
start: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC),
end: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC).Add(-expectedSplitGap),
},
{
start: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC),
end: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC).Add(-expectedSplitGap),
},
{
start: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC),
end: refTime,
},
},
splitInterval: time.Hour,
splitter: newDefaultSplitter(
fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 0}},
ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour},
),
},
"ingester query split enabled but query_store_only enabled too": {
input: interval{
// overlapping `query_ingesters_within` range of 3h
start: refTime.Add(-4 * time.Hour).Truncate(time.Second),
end: refTime,
},
expected: []interval{
// regular intervals only, since query_store_only is enabled
{
start: refTime.Add(-4 * time.Hour).Truncate(time.Second),
end: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC).Add(-expectedSplitGap),
},
{
start: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC),
end: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC).Add(-expectedSplitGap),
},
{
start: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC),
end: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC).Add(-expectedSplitGap),
},
{
start: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC),
end: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC).Add(-expectedSplitGap),
},
{
start: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC),
end: refTime,
},
},
splitInterval: time.Hour,
splitter: newDefaultSplitter(
fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}},
ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour, queryStoreOnly: true},
),
},
} {
t.Run(name, func(t *testing.T) {
req := tc.requestBuilderFunc(intervals.input.start, intervals.input.end)
var want []queryrangebase.Request
for _, exp := range intervals.expected {
want = append(want, tc.requestBuilderFunc(exp.start, exp.end))
}
+ if intervals.splitInterval == 0 {
+ intervals.splitInterval = time.Hour
+ }
+
+ if intervals.splitter == nil {
+ intervals.splitter = newDefaultSplitter(fakeLimits{}, nil)
+ }
+
+ splits, err := intervals.splitter.split(refTime, []string{tenantID}, req, intervals.splitInterval)
+ require.NoError(t, err)
+ if !assert.Equal(t, want, splits) {
+ t.Logf("expected and actual do not match\n")
+ defer t.Fail()
+
+ if len(want) != len(splits) {
+ t.Logf("expected %d splits, got %d\n", len(want), len(splits))
+ return
+ }
+
+ for j := 0; j < len(want); j++ {
+ exp := want[j]
+ act := splits[j]
+ equal := assert.Equal(t, exp, act)
+ t.Logf("\t#%d [matches: %v]: expected %q/%q got %q/%q\n", j, equal, exp.GetStart(), exp.GetEnd(), act.GetStart(), act.GetEnd())
+ }
+ }
+ })
+ }
+ })
}
}

func Test_splitMetricQuery(t *testing.T) {
const seconds = 1e3 // 1e3 milliseconds per second.
+ const shortRange = `rate({app="foo"}[1m])`
+ const longRange = `rate({app="foo"}[7d])`
+
for i, tc := range []struct {
- input *LokiRequest
- expected []queryrangebase.Request
- interval time.Duration
+ input *LokiRequest
+ expected []queryrangebase.Request
+ splitInterval time.Duration
+ splitter splitter
}{
// the step is lower than the interval therefore we should split only once.
{ @@ -238,172 +452,172 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(0, 0), EndTs: time.Unix(0, 60*time.Minute.Nanoseconds()), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(0, 60*time.Minute.Nanoseconds()), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 24 * time.Hour, + splitInterval: 24 * time.Hour, }, { input: &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(60*60, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(60*60, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 3 * time.Hour, + splitInterval: 3 * time.Hour, }, { input: &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(24*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(24*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 24 * time.Hour, + splitInterval: 24 * time.Hour, }, { input: &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(3*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(3*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 3 * time.Hour, + splitInterval: 3 * time.Hour, }, { input: &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(2*24*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix((24*3600)-15, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix((24 * 3600), 0), EndTs: time.Unix((2 * 24 * 3600), 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 24 * time.Hour, + splitInterval: 24 * time.Hour, }, { input: &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix(2*3*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(0, 0), EndTs: time.Unix((3*3600)-15, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix((3 * 3600), 0), EndTs: time.Unix((2 * 3 * 3600), 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 3 * time.Hour, + splitInterval: 3 * time.Hour, }, { input: &LokiRequest{ StartTs: time.Unix(3*3600, 0), EndTs: time.Unix(3*24*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(3*3600, 0), EndTs: time.Unix((24*3600)-15, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix(24*3600, 0), EndTs: time.Unix((2*24*3600)-15, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix(2*24*3600, 0), EndTs: time.Unix(3*24*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - 
interval: 24 * time.Hour, + splitInterval: 24 * time.Hour, }, { input: &LokiRequest{ StartTs: time.Unix(2*3600, 0), EndTs: time.Unix(3*3*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(2*3600, 0), EndTs: time.Unix((3*3600)-15, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix(3*3600, 0), EndTs: time.Unix((2*3*3600)-15, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix(2*3*3600, 0), EndTs: time.Unix(3*3*3600, 0), Step: 15 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 3 * time.Hour, + splitInterval: 3 * time.Hour, }, // step not a multiple of interval @@ -413,29 +627,29 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s EndTs: time.Unix(3*3*3600, 0), Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(2*3600-9, 0), EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix((3*3600)+12, 0), EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix(2*3*3600+7, 0), EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 3 * time.Hour, + splitInterval: 3 * time.Hour, }, // end time already step aligned { @@ -443,29 +657,29 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(2*3600, 0), EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix((3*3600)+12, 0), EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix(2*3*3600+7, 0), EndTs: time.Unix(3*3*3600+2, 0), Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 3 * time.Hour, + splitInterval: 3 * time.Hour, }, // start & end time not aligned with step { @@ -473,29 +687,29 @@ func Test_splitMetricQuery(t *testing.T) { StartTs: time.Unix(2*3600, 0), EndTs: time.Unix(3*3*3600, 0), Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, expected: []queryrangebase.Request{ &LokiRequest{ StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix((3*3600)+12, 0), EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, &LokiRequest{ StartTs: time.Unix(2*3*3600+7, 0), EndTs: time.Unix(3*3*3600+2, 0), // 9h mod 17s = 2s Step: 17 * seconds, - Query: `rate({app="foo"}[1m])`, + Query: shortRange, }, }, - interval: 3 * time.Hour, + splitInterval: 3 * time.Hour, }, // step larger than split interval @@ 
@@ -504,58 +718,58 @@ func Test_splitMetricQuery(t *testing.T) {
 			StartTs: time.Unix(0, 0),
 			EndTs:   time.Unix(25*3600, 0),
 			Step:    6 * 3600 * seconds,
-			Query:   `rate({app="foo"}[1m])`,
+			Query:   shortRange,
 		},
 		expected: []queryrangebase.Request{
 			&LokiRequest{
 				StartTs: time.Unix(0, 0),
 				EndTs:   time.Unix(6*3600, 0),
 				Step:    6 * 3600 * seconds,
-				Query:   `rate({app="foo"}[1m])`,
+				Query:   shortRange,
 			},
 			&LokiRequest{
 				StartTs: time.Unix(6*3600, 0),
 				EndTs:   time.Unix(12*3600, 0),
 				Step:    6 * 3600 * seconds,
-				Query:   `rate({app="foo"}[1m])`,
+				Query:   shortRange,
 			},
 			&LokiRequest{
 				StartTs: time.Unix(12*3600, 0),
 				EndTs:   time.Unix(18*3600, 0),
 				Step:    6 * 3600 * seconds,
-				Query:   `rate({app="foo"}[1m])`,
+				Query:   shortRange,
 			},
 			&LokiRequest{
 				StartTs: time.Unix(18*3600, 0),
 				EndTs:   time.Unix(24*3600, 0),
 				Step:    6 * 3600 * seconds,
-				Query:   `rate({app="foo"}[1m])`,
+				Query:   shortRange,
 			},
 			&LokiRequest{
 				StartTs: time.Unix(24*3600, 0),
 				EndTs:   time.Unix(30*3600, 0),
 				Step:    6 * 3600 * seconds,
-				Query:   `rate({app="foo"}[1m])`,
+				Query:   shortRange,
 			},
 		},
-		interval: 15 * time.Minute,
+		splitInterval: 15 * time.Minute,
 	},
 	{
 		input: &LokiRequest{
 			StartTs: time.Unix(1*3600, 0),
 			EndTs:   time.Unix(3*3600, 0),
 			Step:    6 * 3600 * seconds,
-			Query:   `rate({app="foo"}[1m])`,
+			Query:   shortRange,
 		},
 		expected: []queryrangebase.Request{
 			&LokiRequest{
 				StartTs: time.Unix(0, 0),
 				EndTs:   time.Unix(6*3600, 0),
 				Step:    6 * 3600 * seconds,
-				Query:   `rate({app="foo"}[1m])`,
+				Query:   shortRange,
 			},
 		},
-		interval: 15 * time.Minute,
+		splitInterval: 15 * time.Minute,
 	},
 	// reduce split by to 6h instead of 1h
 	{
@@ -579,7 +793,7 @@ func Test_splitMetricQuery(t *testing.T) {
 				Query: `rate({app="foo"}[6h])`,
 			},
 		},
-		interval: 1 * time.Hour,
+		splitInterval: 1 * time.Hour,
 	},
 	// range vector too large we don't want to split it
 	{
@@ -587,17 +801,222 @@ func Test_splitMetricQuery(t *testing.T) {
 			StartTs: time.Unix(2*3600, 0),
 			EndTs:   time.Unix(3*3*3600, 0),
 			Step:    15 * seconds,
-			Query:   `rate({app="foo"}[7d])`,
+			Query:   longRange,
 		},
 		expected: []queryrangebase.Request{
 			&LokiRequest{
 				StartTs: time.Unix(2*3600, 0),
 				EndTs:   time.Unix(3*3*3600, 0),
 				Step:    15 * seconds,
-				Query:   `rate({app="foo"}[7d])`,
+				Query:   longRange,
+			},
+		},
+		splitInterval: 15 * time.Minute,
+	},
+	// query is wholly within ingester query window
+	{
+		input: &LokiRequest{
+			StartTs: refTime.Add(-time.Hour),
+			EndTs:   refTime,
+			Step:    15 * seconds,
+			Query:   shortRange,
+		},
+		expected: []queryrangebase.Request{
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 7, 05, 30, 0, time.UTC), // start time is aligned down to step of 15s
+				EndTs:   time.Date(2023, 1, 15, 7, 29, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 8, 5, 45, 0, time.UTC), // end time is aligned up to step of 15s
+				Step:    15 * seconds,
+				Query:   shortRange,
 			},
 		},
-		interval: 15 * time.Minute,
+		splitInterval: time.Hour,
+		splitter: newMetricQuerySplitter(
+			fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}},
+			ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour},
+		),
+	},
+	// query is partially within ingester query window
+	{
+		input: &LokiRequest{
+			StartTs: refTime.Add(-4 * time.Hour).Add(-30 * time.Minute),
+			EndTs:   refTime,
+			Step:    15 * seconds,
+			Query:   shortRange,
+		},
+		expected: []queryrangebase.Request{
+			// regular intervals until `query_ingesters_within` window
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 3, 35, 30, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 3, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 4, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 4, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 5, 5, 15, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			// and then different intervals for queries to ingesters
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 5, 5, 30, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 5, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 7, 29, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 8, 5, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+		},
+		splitInterval: time.Hour,
+		splitter: newMetricQuerySplitter(
+			fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}},
+			ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour},
+		),
+	},
+	// not within ingester query window
+	{
+		input: &LokiRequest{
+			StartTs: refTime.Add(-5 * time.Hour),
+			EndTs:   refTime.Add(-4 * time.Hour),
+			Step:    15 * seconds,
+			Query:   shortRange,
+		},
+		expected: []queryrangebase.Request{
+			// regular intervals only, since the query ends before the `query_ingesters_within` window
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 3, 5, 30, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 3, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 4, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 4, 5, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+		},
+		splitInterval: time.Hour,
+		splitter: newMetricQuerySplitter(
+			fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}},
+			ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour},
+		),
+	},
+	// ingester query splitting disabled
+	{
+		input: &LokiRequest{
+			StartTs: refTime.Add(-4 * time.Hour),
+			EndTs:   refTime,
+			Step:    15 * seconds,
+			Query:   shortRange,
+		},
+		expected: []queryrangebase.Request{
+			// regular intervals only, since ingester split duration is 0
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 4, 5, 30, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 4, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 5, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 6, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 7, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 8, 5, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+		},
+		splitInterval: time.Hour,
+		splitter: newMetricQuerySplitter(
+			fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 0}},
+			ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour},
+		),
+	},
+	// ingester query splitting enabled, but query_store_only is enabled too
+	{
+		input: &LokiRequest{
+			StartTs: refTime.Add(-4 * time.Hour),
+			EndTs:   refTime,
+			Step:    15 * seconds,
+			Query:   shortRange,
+		},
+		expected: []queryrangebase.Request{
+			// regular intervals only, since query_store_only is enabled and ingester splitting does not apply
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 4, 5, 30, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 4, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 5, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 6, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 7, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 7, 59, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+			&LokiRequest{
+				StartTs: time.Date(2023, 1, 15, 8, 0, 0, 0, time.UTC),
+				EndTs:   time.Date(2023, 1, 15, 8, 5, 45, 0, time.UTC),
+				Step:    15 * seconds,
+				Query:   shortRange,
+			},
+		},
+		splitInterval: time.Hour,
+		splitter: newMetricQuerySplitter(
+			fakeLimits{ingesterSplitDuration: map[string]time.Duration{tenantID: 90 * time.Minute}},
+			ingesterQueryOpts{queryIngestersWithin: 3 * time.Hour, queryStoreOnly: true},
+		),
+	},
 } {
 		// Set query plans
@@ -612,13 +1031,29 @@ func Test_splitMetricQuery(t *testing.T) {
 		}
 
 		t.Run(strconv.Itoa(i), func(t *testing.T) {
-			splits, err := splitMetricByTime(tc.input, tc.interval)
+			ms := newMetricQuerySplitter(fakeLimits{}, nil)
+			if tc.splitter != nil {
+				ms = tc.splitter.(*metricQuerySplitter)
+			}
+
+			splits, err := ms.split(refTime, []string{tenantID}, tc.input, tc.splitInterval)
 			require.NoError(t, err)
-			for i, s := range splits {
-				s := s.(*LokiRequest)
-				t.Logf(" want: %d start:%s end:%s \n", i, s.StartTs, s.EndTs)
+			if !assert.Equal(t, tc.expected, splits) {
+				t.Logf("expected and actual do not match\n")
+				defer t.Fail()
+
+				if len(tc.expected) != len(splits) {
+					t.Logf("expected %d splits, got %d\n", len(tc.expected), len(splits))
+					return
+				}
+
+				for j := 0; j < len(tc.expected); j++ {
+					exp := tc.expected[j]
+					act := splits[j]
+					equal := assert.Equal(t, exp, act)
+					t.Logf("\t#%d [matches: %v]: expected %q/%q got %q/%q\n", j, equal, exp.GetStart(), exp.GetEnd(), act.GetStart(), act.GetEnd())
+				}
 			}
-			require.Equal(t, tc.expected, splits)
 		})
 	}
@@ -646,12 +1081,13 @@ func Test_splitByInterval_Do(t *testing.T) {
 		}, nil
 	})
 
+	defSplitter := newDefaultSplitter(fakeLimits{}, nil)
 	l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
 	split := SplitByIntervalMiddleware(
 		testSchemas,
 		l,
 		DefaultCodec,
-		splitByTime,
+		defSplitter,
 		nilMetrics,
 	).Wrap(next)
 
@@ -834,11 +1270,12 @@ func Test_series_splitByInterval_Do(t *testing.T) {
 			"1": time.Hour,
 		},
 	}
+	defSplitter := newDefaultSplitter(fakeLimits{}, nil)
 	split := SplitByIntervalMiddleware(
 		testSchemas,
 		l,
 		DefaultCodec,
-		splitByTime,
+		defSplitter,
 		nilMetrics,
 	).Wrap(next)
 
@@ -888,12 +1325,13 @@ func Test_series_splitByInterval_Do(t *testing.T) {
 
 func Test_seriesvolume_splitByInterval_Do(t *testing.T) {
 	ctx := user.InjectOrgID(context.Background(), "1")
+	defSplitter := newDefaultSplitter(fakeLimits{}, nil)
 	setup := func(next queryrangebase.Handler, l Limits) queryrangebase.Handler {
 		return SplitByIntervalMiddleware(
 			testSchemas,
 			l,
 			DefaultCodec,
-			splitByTime,
+			defSplitter,
 			nilMetrics,
 		).Wrap(next)
 	}
@@ -1050,11 +1488,12 @@ func Test_ExitEarly(t *testing.T) {
 	})
 
 	l := WithSplitByLimits(fakeLimits{maxQueryParallelism: 1}, time.Hour)
+	defSplitter := newDefaultSplitter(fakeLimits{}, nil)
 
 	split := SplitByIntervalMiddleware(
 		testSchemas,
 		l,
 		DefaultCodec,
-		splitByTime,
+		defSplitter,
 		nilMetrics,
 	).Wrap(next)
 
@@ -1132,11 +1571,12 @@ func Test_DoesntDeadlock(t *testing.T) {
 	})
 
 	l := WithSplitByLimits(fakeLimits{maxQueryParallelism: n}, time.Hour)
+	defSplitter := newDefaultSplitter(fakeLimits{}, nil)
 
 	split := SplitByIntervalMiddleware(
 		testSchemas,
 		l,
 		DefaultCodec,
-		splitByTime,
+		defSplitter,
 		nilMetrics,
 	).Wrap(next)
 
diff --git a/pkg/querier/queryrange/splitters.go b/pkg/querier/queryrange/splitters.go
new file mode 100644
index 0000000000..79e3d5352e
--- /dev/null
+++ b/pkg/querier/queryrange/splitters.go
@@ -0,0 +1,297 @@
+package queryrange
+
+import (
+	"time"
+
+	"github.com/prometheus/common/model"
+
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
+	"github.com/grafana/loki/pkg/util"
+	"github.com/grafana/loki/pkg/util/validation"
+)
+
+type splitter interface {
+	split(execTime time.Time, tenantIDs []string, request queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error)
+}
+
+type defaultSplitter struct {
+	limits Limits
+	iqo    util.IngesterQueryOptions
+}
+
+func newDefaultSplitter(limits Limits, iqo util.IngesterQueryOptions) *defaultSplitter {
+	return &defaultSplitter{limits, iqo}
+}
+
+func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
+	var (
+		reqs             []queryrangebase.Request
+		factory          func(start, end time.Time)
+		endTimeInclusive = true
+	)
+
+	switch r := req.(type) {
+	case *LokiRequest:
+		endTimeInclusive = false
+		factory = func(start, end time.Time) {
+			reqs = append(reqs, &LokiRequest{
+				Query:     r.Query,
+				Limit:     r.Limit,
+				Step:      r.Step,
+				Interval:  r.Interval,
+				Direction: r.Direction,
+				Path:      r.Path,
+				StartTs:   start,
+				EndTs:     end,
+				Plan:      r.Plan,
+			})
+		}
+	case *LokiSeriesRequest:
+		// metadata queries have end time inclusive.
+		// Set endTimeInclusive to true so that ForInterval keeps a gap of 1ms between splits to
+		// avoid querying duplicate data in adjacent queries.
+		factory = func(start, end time.Time) {
+			reqs = append(reqs, &LokiSeriesRequest{
+				Match:   r.Match,
+				Path:    r.Path,
+				StartTs: start,
+				EndTs:   end,
+				Shards:  r.Shards,
+			})
+		}
+	case *LabelRequest:
+		// metadata queries have end time inclusive.
+		// Set endTimeInclusive to true so that ForInterval keeps a gap of 1ms between splits to
+		// avoid querying duplicate data in adjacent queries.
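+		// Note: endTimeInclusive already defaults to true above, so there is nothing to toggle
+		// for the metadata cases; only *LokiRequest (a log query) switches it to false.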
+		factory = func(start, end time.Time) {
+			reqs = append(reqs, NewLabelRequest(start, end, r.Query, r.Name, r.Path()))
+		}
+	case *logproto.IndexStatsRequest:
+		factory = func(start, end time.Time) {
+			reqs = append(reqs, &logproto.IndexStatsRequest{
+				From:     model.TimeFromUnix(start.Unix()),
+				Through:  model.TimeFromUnix(end.Unix()),
+				Matchers: r.GetMatchers(),
+			})
+		}
+	case *logproto.VolumeRequest:
+		factory = func(start, end time.Time) {
+			reqs = append(reqs, &logproto.VolumeRequest{
+				From:         model.TimeFromUnix(start.Unix()),
+				Through:      model.TimeFromUnix(end.Unix()),
+				Matchers:     r.GetMatchers(),
+				Limit:        r.Limit,
+				TargetLabels: r.TargetLabels,
+				AggregateBy:  r.AggregateBy,
+			})
+		}
+	default:
+		return nil, nil
+	}
+
+	var (
+		ingesterSplits []queryrangebase.Request
+		origStart      = req.GetStart().UTC()
+		origEnd        = req.GetEnd().UTC()
+	)
+
+	start, end, needsIngesterSplits := ingesterQueryBounds(execTime, s.iqo, req)
+
+	if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits {
+		// perform splitting using special interval (`split_ingester_queries_by_interval`)
+		util.ForInterval(ingesterQueryInterval, start, end, endTimeInclusive, factory)
+
+		// rebound after ingester queries have been split out
+		end = start
+		start = req.GetStart().UTC()
+		if endTimeInclusive {
+			end = end.Add(-util.SplitGap)
+		}
+
+		// query only overlaps ingester query window, nothing more to do
+		if start.After(end) || start.Equal(end) {
+			return reqs, nil
+		}
+
+		// copy the splits, reset the results
+		ingesterSplits = reqs
+		reqs = nil
+	} else {
+		start = origStart
+		end = origEnd
+	}
+
+	// perform splitting over the rest of the time range
+	util.ForInterval(interval, origStart, end, endTimeInclusive, factory)
+
+	// move the ingester splits to the end to maintain correct order
+	reqs = append(reqs, ingesterSplits...)
+	return reqs, nil
+}
+
+type metricQuerySplitter struct {
+	limits Limits
+	iqo    util.IngesterQueryOptions
+}
+
+func newMetricQuerySplitter(limits Limits, iqo util.IngesterQueryOptions) *metricQuerySplitter {
+	return &metricQuerySplitter{limits, iqo}
+}
+
+// reduceSplitIntervalForRangeVector reduces the split interval for a range query based on the duration of the range vector.
+// Queries with large range vector durations will not be split into smaller intervals, because doing so
+// would make them slow by over-processing data.
+func (s *metricQuerySplitter) reduceSplitIntervalForRangeVector(r *LokiRequest, interval time.Duration) (time.Duration, error) {
+	maxRange, _, err := maxRangeVectorAndOffsetDuration(r.Plan.AST)
+	if err != nil {
+		return 0, err
+	}
+	if maxRange > interval {
+		return maxRange, nil
+	}
+	return interval, nil
+}
+
+// Round up to the step before the next interval boundary.
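+// For example (illustrative values): with a 15s step and a 1h interval, 07:05:30 yields
+// 07:59:45; the next boundary is 08:00:00, and since (08:00:00 - 07:05:30) is an exact
+// multiple of the step, the target would land on the boundary itself and is therefore
+// pulled back by one step so the boundary belongs to the next split.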
+func (s *metricQuerySplitter) nextIntervalBoundary(t time.Time, step int64, interval time.Duration) time.Time {
+	stepNs := step * 1e6
+	nsPerInterval := interval.Nanoseconds()
+	startOfNextInterval := ((t.UnixNano() / nsPerInterval) + 1) * nsPerInterval
+	// ensure that target is a multiple of steps away from the start time
+	target := startOfNextInterval - ((startOfNextInterval - t.UnixNano()) % stepNs)
+	if target == startOfNextInterval {
+		target -= stepNs
+	}
+	return time.Unix(0, target)
+}
+
+func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
+	var reqs []queryrangebase.Request
+
+	lokiReq := r.(*LokiRequest)
+
+	interval, err := s.reduceSplitIntervalForRangeVector(lokiReq, interval)
+	if err != nil {
+		return nil, err
+	}
+
+	start, end := s.alignStartEnd(r.GetStep(), lokiReq.StartTs, lokiReq.EndTs)
+
+	lokiReq = lokiReq.WithStartEnd(start, end).(*LokiRequest)
+
+	factory := func(start, end time.Time) {
+		reqs = append(reqs, &LokiRequest{
+			Query:     lokiReq.Query,
+			Limit:     lokiReq.Limit,
+			Step:      lokiReq.Step,
+			Interval:  lokiReq.Interval,
+			Direction: lokiReq.Direction,
+			Path:      lokiReq.Path,
+			StartTs:   start,
+			EndTs:     end,
+			Plan:      lokiReq.Plan,
+		})
+	}
+
+	// step is >= configured split interval, let us just split the query interval by step
+	// TODO: this is likely buggy when step >= query range, how should we handle this?
+	if lokiReq.Step >= interval.Milliseconds() {
+		util.ForInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, factory)
+
+		return reqs, nil
+	}
+
+	var (
+		ingesterSplits      []queryrangebase.Request
+		needsIngesterSplits bool
+	)
+
+	origStart := start
+	origEnd := end
+
+	start, end, needsIngesterSplits = ingesterQueryBounds(execTime, s.iqo, lokiReq)
+	start, end = s.alignStartEnd(r.GetStep(), start, end)
+
+	if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits {
+		// perform splitting using special interval (`split_ingester_queries_by_interval`)
+		s.buildMetricSplits(lokiReq.GetStep(), ingesterQueryInterval, start, end, factory)
+
+		// rebound after ingester queries have been split out
+		//
+		// the end time should now be the boundary of the `query_ingesters_within` window, which is "start" currently;
+		// but since start is already step-aligned we need to subtract 1ns to align it down by 1 more step so that we
+		// get a consistent step between splits
+		end, _ = s.alignStartEnd(r.GetStep(), start.Add(-time.Nanosecond), end)
+		// we restore the previous start time (the start time of the query)
+		start = origStart
+
+		// query only overlaps ingester query window, nothing more to do
+		if start.After(end) || start.Equal(end) {
+			return reqs, nil
+		}
+
+		// copy the splits, reset the results
+		ingesterSplits = reqs
+		reqs = nil
+	} else {
+		start = origStart
+		end = origEnd
+	}
+
+	// perform splitting over the rest of the time range
+	s.buildMetricSplits(lokiReq.GetStep(), interval, start, end, factory)
+
+	// move the ingester splits to the end to maintain correct order
+	reqs = append(reqs, ingesterSplits...)
+
+	return reqs, nil
+}
+
+func (s *metricQuerySplitter) alignStartEnd(step int64, start, end time.Time) (time.Time, time.Time) {
+	// step align start and end time of the query. Start time is rounded down and end time is rounded up.
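+	// For example, with a 15s step, a start of 07:05:34 rounds down to 07:05:30 and
+	// an end of 08:05:41 rounds up to 08:05:45.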
+	stepNs := step * 1e6
+	startNs := start.UnixNano()
+
+	endNs := end.UnixNano()
+	if mod := endNs % stepNs; mod != 0 {
+		endNs += stepNs - mod
+	}
+
+	return time.Unix(0, startNs-startNs%stepNs), time.Unix(0, endNs)
+}
+
+func (s *metricQuerySplitter) buildMetricSplits(step int64, interval time.Duration, start, end time.Time, factory func(start, end time.Time)) {
+	for splStart := start; splStart.Before(end); splStart = s.nextIntervalBoundary(splStart, step, interval).Add(time.Duration(step) * time.Millisecond) {
+		splEnd := s.nextIntervalBoundary(splStart, step, interval)
+		if splEnd.Add(time.Duration(step)*time.Millisecond).After(end) || splEnd.Add(time.Duration(step)*time.Millisecond) == end {
+			splEnd = end
+		}
+		factory(splStart, splEnd)
+	}
+}
+
+// ingesterQueryBounds determines if we need to split time ranges overlapping the ingester query window (`query_ingesters_within`)
+// and retrieve the bounds for those specific splits
+func ingesterQueryBounds(execTime time.Time, iqo util.IngesterQueryOptions, req queryrangebase.Request) (time.Time, time.Time, bool) {
+	start, end := req.GetStart().UTC(), req.GetEnd().UTC()
+
+	// ingesters are not queried, nothing to do
+	if iqo == nil || iqo.QueryStoreOnly() {
+		return start, end, false
+	}
+
+	windowSize := iqo.QueryIngestersWithin()
+	ingesterWindow := execTime.UTC().Add(-windowSize)
+
+	// clamp to the start time
+	if ingesterWindow.Before(start) {
+		ingesterWindow = start
+	}
+
+	// query range does not overlap with ingester query window, nothing to do
+	if end.Before(ingesterWindow) {
+		return start, end, false
+	}
+
+	return ingesterWindow, end, true
+}
diff --git a/pkg/util/config.go b/pkg/util/config.go
index 6989931fb6..f54d469690 100644
--- a/pkg/util/config.go
+++ b/pkg/util/config.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"io"
 	"strings"
+	"time"
 
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/common/version"
@@ -38,3 +39,10 @@ func PrintConfig(w io.Writer, config interface{}) error {
 	fmt.Fprintf(w, "---\n# Loki Config\n# %s\n%s\n\n", version.Info(), string(lc))
 	return nil
 }
+
+// IngesterQueryOptions exists because querier.Config cannot be passed directly to the queryrange package
+// due to an import cycle.
+type IngesterQueryOptions interface {
+	QueryStoreOnly() bool
+	QueryIngestersWithin() time.Duration
+}
diff --git a/pkg/util/time.go b/pkg/util/time.go
index 8f9e0c01b0..b943fea92a 100644
--- a/pkg/util/time.go
+++ b/pkg/util/time.go
@@ -87,6 +87,8 @@ func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time) {
 	return func() { tick.Stop() }, tick.C
 }
 
+const SplitGap = time.Millisecond
+
 // ForInterval splits the given start and end time into given interval.
 // The start and end time in splits would be aligned to the interval
 // except for the start time of first split and end time of last split which would be kept same as original start/end
@@ -107,7 +109,7 @@ func ForInterval(interval time.Duration, start, end time.Time, endTimeInclusive
 		if !newEnd.Before(end) {
 			newEnd = end
 		} else if endTimeInclusive {
-			newEnd = newEnd.Add(-time.Millisecond)
+			newEnd = newEnd.Add(-SplitGap)
 		}
 		if firstInterval {
 			callback(ogStart, newEnd)
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index 7f1f6ea0d7..d846cfed51 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -106,6 +106,7 @@ type Limits struct {
 	// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
 	QuerySplitDuration         model.Duration   `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
 	MetadataQuerySplitDuration model.Duration   `yaml:"split_metadata_queries_by_interval" json:"split_metadata_queries_by_interval"`
+	IngesterQuerySplitDuration model.Duration   `yaml:"split_ingester_queries_by_interval" json:"split_ingester_queries_by_interval"`
 	MinShardingLookback        model.Duration   `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
 	MaxQueryBytesRead          flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"`
 	MaxQuerierBytesRead        flagext.ByteSize `yaml:"max_querier_bytes_read" json:"max_querier_bytes_read"`
@@ -299,6 +300,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	_ = l.MetadataQuerySplitDuration.Set("24h")
 	f.Var(&l.MetadataQuerySplitDuration, "querier.split-metadata-queries-by-interval", "Split metadata queries by a time interval and execute in parallel. The value 0 disables splitting metadata queries by time. This also determines how cache keys are chosen when label/series result caching is enabled.")
 
+	_ = l.IngesterQuerySplitDuration.Set("0s")
+	f.Var(&l.IngesterQuerySplitDuration, "querier.split-ingester-queries-by-interval", "Interval to use for time-based splitting when a request is within the `query_ingesters_within` window; defaults to `split-queries-by-interval` by setting to 0.")
+
 	f.StringVar(&l.DeletionMode, "compactor.deletion-mode", "filter-and-delete", "Deletion mode. Can be one of 'disabled', 'filter-only', or 'filter-and-delete'. When set to 'filter-only' or 'filter-and-delete', and if retention_enabled is true, then the log entry deletion API endpoints are available.")
 
 	// Deprecated
@@ -574,6 +578,12 @@ func (o *Overrides) MetadataQuerySplitDuration(userID string) time.Duration {
 	return time.Duration(o.getOverridesForUser(userID).MetadataQuerySplitDuration)
 }
 
+// IngesterQuerySplitDuration returns the tenant-specific split-by interval applied in the query frontend when querying
+// during the `query_ingesters_within` window.
+func (o *Overrides) IngesterQuerySplitDuration(userID string) time.Duration {
+	return time.Duration(o.getOverridesForUser(userID).IngesterQuerySplitDuration)
+}
+
 // MaxQueryBytesRead returns the maximum bytes a query can read.
 func (o *Overrides) MaxQueryBytesRead(_ context.Context, userID string) int {
 	return o.getOverridesForUser(userID).MaxQueryBytesRead.Val()