Remove querier.query-ingesters-within check when sharding is enabled (#5024)

pull/4938/merge
Susana Ferreira 3 years ago committed by GitHub
parent 1b16bb3d1d
commit 0c2ed01d92
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      pkg/loki/modules.go
  2. 4
      pkg/querier/queryrange/limits_test.go
  3. 15
      pkg/querier/queryrange/roundtrip.go
  4. 20
      pkg/querier/queryrange/roundtrip_test.go

@ -438,7 +438,6 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
util_log.Logger,
t.overrides,
t.Cfg.SchemaConfig.SchemaConfig,
t.Cfg.Querier.QueryIngestersWithin,
prometheus.DefaultRegisterer,
)
if err != nil {

@ -55,7 +55,7 @@ func Test_seriesLimiter(t *testing.T) {
cfg.SplitQueriesByInterval = time.Hour
cfg.CacheResults = false
// split in 7 with 2 in // max.
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -210,7 +210,7 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
}, chunk.SchemaConfig{}, 0, nil)
}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}

@ -1,7 +1,6 @@
package queryrange
import (
"errors"
"flag"
"net/http"
"strings"
@ -42,7 +41,6 @@ func NewTripperware(
log log.Logger,
limits Limits,
schema chunk.SchemaConfig,
minShardingLookback time.Duration,
registerer prometheus.Registerer,
) (queryrange.Tripperware, Stopper, error) {
// Ensure that QuerySplitDuration uses configuration defaults.
@ -54,7 +52,7 @@ func NewTripperware(
shardingMetrics := logql.NewShardingMetrics(registerer)
splitByMetrics := NewSplitByMetrics(registerer)
metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, schema, minShardingLookback, LokiCodec,
metricsTripperware, cache, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec,
PrometheusExtractor{}, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics, registerer)
if err != nil {
return nil, nil, err
@ -62,7 +60,7 @@ func NewTripperware(
// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, minShardingLookback, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics)
logFilterTripperware, err := NewLogFilterTripperware(cfg, log, limits, schema, LokiCodec, instrumentMetrics, retryMetrics, shardingMetrics, splitByMetrics)
if err != nil {
return nil, nil, err
}
@ -137,6 +135,7 @@ func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if err := validateLimits(req, rangeQuery.Limit, r.limits); err != nil {
return nil, err
}
// Only filter expressions are query sharded
if !expr.HasFilter() {
return r.next.RoundTrip(req)
}
@ -239,7 +238,6 @@ func NewLogFilterTripperware(
log log.Logger,
limits Limits,
schema chunk.SchemaConfig,
minShardingLookback time.Duration,
codec queryrange.Codec,
instrumentMetrics *queryrange.InstrumentMiddlewareMetrics,
retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
@ -252,9 +250,6 @@ func NewLogFilterTripperware(
}
if cfg.ShardedQueries {
if minShardingLookback == 0 {
return nil, errors.New("a non-zero value is required for querier.query-ingesters-within when -querier.parallelise-shardable-queries is enabled")
}
queryRangeMiddleware = append(queryRangeMiddleware,
NewQueryShardMiddleware(
log,
@ -363,7 +358,6 @@ func NewMetricTripperware(
log log.Logger,
limits Limits,
schema chunk.SchemaConfig,
minShardingLookback time.Duration,
codec queryrange.Codec,
extractor queryrange.Extractor,
instrumentMetrics *queryrange.InstrumentMiddlewareMetrics,
@ -414,9 +408,6 @@ func NewMetricTripperware(
}
if cfg.ShardedQueries {
if minShardingLookback == 0 {
return nil, nil, errors.New("a non-zero value is required for querier.query-ingesters-within when -querier.parallelise-shardable-queries is enabled")
}
queryRangeMiddleware = append(queryRangeMiddleware,
NewQueryShardMiddleware(
log,

@ -109,7 +109,7 @@ var (
// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxSeries: math.MaxInt32}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -172,7 +172,7 @@ func TestMetricsTripperware(t *testing.T) {
}
func TestLogFilterTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -221,7 +221,7 @@ func TestLogFilterTripperware(t *testing.T) {
func TestInstantQueryTripperware(t *testing.T) {
testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 1*time.Second, nil)
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -257,7 +257,7 @@ func TestInstantQueryTripperware(t *testing.T) {
}
func TestSeriesTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -298,7 +298,7 @@ func TestSeriesTripperware(t *testing.T) {
}
func TestLabelsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -344,7 +344,7 @@ func TestLabelsTripperware(t *testing.T) {
}
func TestLogNoRegex(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -378,7 +378,7 @@ func TestLogNoRegex(t *testing.T) {
}
func TestUnhandledPath(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -402,7 +402,7 @@ func TestUnhandledPath(t *testing.T) {
}
func TestRegexpParamsSupport(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -485,7 +485,7 @@ func TestPostQueries(t *testing.T) {
}
func TestEntriesLimitsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -516,7 +516,7 @@ func TestEntriesLimitsTripperware(t *testing.T) {
}
func TestEntriesLimitWithZeroTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, 0, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, chunk.SchemaConfig{}, nil)
if stopper != nil {
defer stopper.Stop()
}

Loading…
Cancel
Save