diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index b7101f4796..eb9367fe3e 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -438,7 +438,6 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { util_log.Logger, t.overrides, t.Cfg.SchemaConfig.SchemaConfig, - t.Cfg.Querier.QueryIngestersWithin, prometheus.DefaultRegisterer, ) if err != nil { diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index ede339bb8c..d330050fa6 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -55,7 +55,7 @@ func Test_seriesLimiter(t *testing.T) { cfg.SplitQueriesByInterval = time.Hour cfg.CacheResults = false // split in 7 with 2 in // max. - tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -210,7 +210,7 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) { func Test_MaxQueryLookBack(t *testing.T) { tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{ maxQueryLookback: 1 * time.Hour, - }, chunk.SchemaConfig{}, 0, nil) + }, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index c8e63e4e08..fc9ddd225b 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -1,7 +1,6 @@ package queryrange import ( - "errors" "flag" "net/http" "strings" @@ -42,7 +41,6 @@ func NewTripperware( log log.Logger, limits Limits, schema chunk.SchemaConfig, - minShardingLookback time.Duration, registerer prometheus.Registerer, ) (queryrange.Tripperware, Stopper, error) { // Ensure that QuerySplitDuration uses configuration defaults. @@ -54,7 +52,7 @@ func NewTripperware( shardingMetrics := logql.NewShardingMetrics(registerer) splitByMetrics := NewSplitByMetrics(registerer) - metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, schema, minShardingLookback, LokiCodec, + metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, PrometheusExtractor{}, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics, registerer) if err != nil { return nil, nil, err @@ -62,7 +60,7 @@ func NewTripperware( // NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in // MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170 - logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, minShardingLookback, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics) + logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics) if err != nil { return nil, nil, err } @@ -137,6 +135,7 @@ func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { if err := validateLimits(req, rangeQuery.Limit, r.limits); err != nil { return nil, err } + // Only filter expressions are query sharded if !expr.HasFilter() { return r.next.RoundTrip(req) } @@ -239,7 +238,6 @@ func NewLogFilterTripperware( log log.Logger, limits Limits, schema chunk.SchemaConfig, - minShardingLookback time.Duration, codec queryrange.Codec, instrumentMetrics *queryrange.InstrumentMiddlewareMetrics, retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics, @@ -252,9 +250,6 @@ func NewLogFilterTripperware( } if cfg.ShardedQueries { - if minShardingLookback == 0 { - return nil, errors.New("a non-zero value is required for querier.query-ingesters-within when -querier.parallelise-shardable-queries is enabled") - } queryRangeMiddleware = append(queryRangeMiddleware, NewQueryShardMiddleware( log, @@ -363,7 +358,6 @@ func NewMetricTripperware( log log.Logger, limits Limits, schema chunk.SchemaConfig, - minShardingLookback time.Duration, codec queryrange.Codec, extractor queryrange.Extractor, instrumentMetrics *queryrange.InstrumentMiddlewareMetrics, @@ -414,9 +408,6 @@ func NewMetricTripperware( } if cfg.ShardedQueries { - if minShardingLookback == 0 { - return nil, nil, errors.New("a non-zero value is required for querier.query-ingesters-within when -querier.parallelise-shardable-queries is enabled") - } queryRangeMiddleware = append(queryRangeMiddleware, NewQueryShardMiddleware( log, diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 3601b38246..9a076ff4ce 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -109,7 +109,7 @@ var ( // those tests are mostly for testing the glue between all component and make sure they activate correctly. func TestMetricsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -172,7 +172,7 @@ func TestMetricsTripperware(t *testing.T) { } func TestLogFilterTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -221,7 +221,7 @@ func TestLogFilterTripperware(t *testing.T) { func TestInstantQueryTripperware(t *testing.T) { testShardingConfig := testConfig testShardingConfig.ShardedQueries = true - tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 1*time.Second, nil) + tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -257,7 +257,7 @@ func TestInstantQueryTripperware(t *testing.T) { } func TestSeriesTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -298,7 +298,7 @@ func TestSeriesTripperware(t *testing.T) { } func TestLabelsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -344,7 +344,7 @@ func TestLabelsTripperware(t *testing.T) { } func TestLogNoRegex(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -378,7 +378,7 @@ func TestLogNoRegex(t *testing.T) { } func TestUnhandledPath(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -402,7 +402,7 @@ func TestUnhandledPath(t *testing.T) { } func TestRegexpParamsSupport(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -485,7 +485,7 @@ func TestPostQueries(t *testing.T) { } func TestEntriesLimitsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() } @@ -516,7 +516,7 @@ func TestEntriesLimitsTripperware(t *testing.T) { } func TestEntriesLimitWithZeroTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil) if stopper != nil { defer stopper.Stop() }