Query-frontend: customisable query splitting for queries overlapping `query_ingesters_within` window (#11535)

**What this PR does / why we need it**:
The config option `query_ingesters_within` defines the window during
which logs _could_ still be present on ingesters; for queries that
overlap this window, queriers also send requests to the ingesters.

`split_queries_by_interval` is defined to split queries into subqueries
for increased parallelism.

Aggressive query splitting within the `query_ingesters_within` window
can result in overloading ingesters with an unnecessarily large number
of subqueries, which, perversely, can impact writes.

`query_ingesters_within` is set to 3h by default. In Grafana Cloud Logs
we set `split_queries_by_interval` as low as 15m (it defaults to 1h),
which results in 3*60/15=12 subqueries for the ingester window alone.
Every querier queries every ingester during this window, so that's 12
requests _per ingester per query_ for any query whose time range
includes the `query_ingesters_within` window _(i.e. a query from now to
now-7d includes the window; a query from now-3h back to now-7d does not)_.

However, we _do_ want to split queries so that an ingester doesn't have
to handle a query spanning the full `query_ingesters_within` window -
this could involve a large amount of data. To account for this, this PR
introduces a new option `split_ingester_queries_by_interval` on the
query-frontend; the setting is disabled by default.
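
To make the fan-out above concrete, here is a minimal sketch (illustrative only, not code from this PR; the helper name is hypothetical) of the per-ingester subquery arithmetic:

```go
package main

import (
	"fmt"
	"time"
)

// subqueriesPerIngester is a hypothetical helper (not part of this PR) that computes
// how many subqueries each ingester receives for the portion of a query overlapping
// the query_ingesters_within window, given the split interval in use.
func subqueriesPerIngester(queryIngestersWithin, splitInterval time.Duration) int {
	return int(queryIngestersWithin / splitInterval)
}

func main() {
	// Values discussed above: a 3h ingester window split by 15m yields 12 subqueries
	// per ingester for every query whose range includes the window.
	fmt.Println(subqueriesPerIngester(3*time.Hour, 15*time.Minute)) // 12
}
```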


![image](https://github.com/grafana/loki/assets/373762/2e671bd8-9e8d-4bf3-addf-bebcfc25e8d7)
Changed files (lines changed):

1. CHANGELOG.md (1)
2. docs/sources/configure/_index.md (6)
3. pkg/loki/modules.go (13)
4. pkg/querier/queryrange/limits/definitions.go (1)
5. pkg/querier/queryrange/limits_test.go (4)
6. pkg/querier/queryrange/roundtrip.go (115)
7. pkg/querier/queryrange/roundtrip_test.go (51)
8. pkg/querier/queryrange/split_by_interval.go (165)
9. pkg/querier/queryrange/split_by_interval_test.go (844)
10. pkg/querier/queryrange/splitters.go (297)
11. pkg/util/config.go (8)
12. pkg/util/time.go (4)
13. pkg/validation/limits.go (10)

@ -46,6 +46,7 @@
* [11539](https://github.com/grafana/loki/pull/11539) **kaviraj,ashwanthgoli** Support caching /series and /labels query results
* [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks.
* [11589](https://github.com/grafana/loki/pull/11589) **ashwanthgoli** Results Cache: Adds `query_length_served` cache stat to measure the length of the query served from cache.
* [11535](https://github.com/grafana/loki/pull/11535) **dannykopping** Query Frontend: Allow customisable splitting of queries which overlap the `query_ingester_within` window to reduce query pressure on ingesters.
* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly.
##### Fixes

@ -2884,6 +2884,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -querier.split-metadata-queries-by-interval
[split_metadata_queries_by_interval: <duration> | default = 1d]
# Interval to use for time-based splitting when a request is within the
# `query_ingesters_within` window; defaults to `split-queries-by-interval` by
# setting to 0.
# CLI flag: -querier.split-ingester-queries-by-interval
[split_ingester_queries_by_interval: <duration> | default = 0s]
# Limit queries that can be sharded. Queries within the time range of now and
# now minus this sharding lookback are not sharded. The default value of 0s
# disables the lookback, causing sharding of all queries at all times.
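
The new `split_ingester_queries_by_interval` option documented above falls back to the regular split interval when left at 0. The sketch below is illustrative only (not from the Loki codebase; the helper name is hypothetical) and shows that fallback:

```go
package main

import (
	"fmt"
	"time"
)

// effectiveIngesterSplitInterval is a hypothetical helper illustrating the documented
// fallback: leaving split_ingester_queries_by_interval at 0 means the ingester-window
// portion of a query is split by the regular split_queries_by_interval instead.
func effectiveIngesterSplitInterval(ingesterSplit, regularSplit time.Duration) time.Duration {
	if ingesterSplit == 0 {
		return regularSplit
	}
	return ingesterSplit
}

func main() {
	fmt.Println(effectiveIngesterSplitInterval(0, time.Hour))              // 1h0m0s (fallback)
	fmt.Println(effectiveIngesterSplitInterval(90*time.Minute, time.Hour)) // 1h30m0s
}
```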

@ -800,12 +800,25 @@ func (disabledShuffleShardingLimits) MaxQueriersPerUser(_ string) uint { return
func (disabledShuffleShardingLimits) MaxQueryCapacity(_ string) float64 { return 0 }
// ingesterQueryOptions exists simply to avoid dependency cycles when using querier.Config directly in queryrange.NewMiddleware
type ingesterQueryOptions struct {
querier.Config
}
func (i ingesterQueryOptions) QueryStoreOnly() bool {
return i.Config.QueryStoreOnly
}
func (i ingesterQueryOptions) QueryIngestersWithin() time.Duration {
return i.Config.QueryIngestersWithin
}
func (t *Loki) initQueryFrontendMiddleware() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend tripperware")
middleware, stopper, err := queryrange.NewMiddleware(
t.Cfg.QueryRange,
t.Cfg.Querier.Engine,
ingesterQueryOptions{t.Cfg.Querier},
util_log.Logger,
t.Overrides,
t.Cfg.SchemaConfig,

@ -15,6 +15,7 @@ type Limits interface {
logql.Limits
QuerySplitDuration(string) time.Duration
MetadataQuerySplitDuration(string) time.Duration
IngesterQuerySplitDuration(string) time.Duration
MaxQuerySeries(context.Context, string) int
MaxEntriesLimitPerQuery(context.Context, string) int
MinShardingLookback(string) time.Duration

@ -58,7 +58,7 @@ func Test_seriesLimiter(t *testing.T) {
cfg.CacheIndexStatsResults = false
// split in 7 with 2 in // max.
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewMiddleware(cfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{
tpw, stopper, err := NewMiddleware(cfg, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{
Configs: testSchemas,
}, nil, false, nil, constants.Loki)
if stopper != nil {
@ -228,7 +228,7 @@ func Test_MaxQueryParallelismDisable(t *testing.T) {
}
func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
maxQueryParallelism: 1,
}, config.SchemaConfig{

@ -120,6 +120,7 @@ func newResultsCacheFromConfig(cfg base.ResultsCacheConfig, registerer prometheu
func NewMiddleware(
cfg Config,
engineOpts logql.EngineOpts,
iqo util.IngesterQueryOptions,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
@ -176,36 +177,38 @@ func NewMiddleware(
var codec base.Codec = DefaultCodec
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, statsCache,
split := newDefaultSplitter(limits, iqo)
indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, split, statsCache,
cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace)
if err != nil {
return nil, nil, err
}
metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, resultsCache,
metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, newMetricQuerySplitter(limits, iqo), resultsCache,
cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}
limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec)
limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec, split)
if err != nil {
return nil, nil, err
}
// NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in
// MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170
logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, resultsCache, metrics, indexStatsTripperware, metricsNamespace)
logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, split, resultsCache, metrics, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace)
seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, split, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace)
if err != nil {
return nil, nil, err
}
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, labelsCache, cacheGenNumLoader, retentionEnabled, metrics, schema, metricsNamespace)
labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, split, labelsCache, cacheGenNumLoader, retentionEnabled, metrics, schema, metricsNamespace)
if err != nil {
return nil, nil, err
}
@ -215,7 +218,7 @@ func NewMiddleware(
return nil, nil, err
}
seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace)
seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, split, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace)
if err != nil {
return nil, nil, err
}
@ -406,18 +409,7 @@ func getOperation(path string) string {
}
// NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests.
func NewLogFilterTripperware(
cfg Config,
engineOpts logql.EngineOpts,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
merger base.Merger,
c cache.Cache,
metrics *Metrics,
indexStatsTripperware base.Middleware,
metricsNamespace string,
) (base.Middleware, error) {
func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
@ -426,7 +418,7 @@ func NewLogFilterTripperware(
NewLimitsMiddleware(limits),
NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}
if cfg.CacheResults {
@ -481,16 +473,7 @@ func NewLogFilterTripperware(
}
// NewLimitedTripperware creates a new frontend tripperware responsible for handling log requests which are label matcher only, no filter expression.
func NewLimitedTripperware(
_ Config,
engineOpts logql.EngineOpts,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
metrics *Metrics,
indexStatsTripperware base.Middleware,
merger base.Merger,
) (base.Middleware, error) {
func NewLimitedTripperware(_ Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, merger base.Merger, split splitter) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
@ -499,7 +482,7 @@ func NewLimitedTripperware(
NewLimitsMiddleware(limits),
NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, split, metrics.SplitByMetrics),
NewQuerierSizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
}
@ -518,6 +501,7 @@ func NewSeriesTripperware(
metrics *Metrics,
schema config.SchemaConfig,
merger base.Merger,
split splitter,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
@ -558,7 +542,7 @@ func NewSeriesTripperware(
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}
if cfg.CacheSeriesResults {
@ -567,7 +551,6 @@ func NewSeriesTripperware(
base.InstrumentMiddleware("series_results_cache", metrics.InstrumentMiddlewareMetrics),
cacheMiddleware,
)
}
if cfg.MaxRetries > 0 {
@ -601,6 +584,7 @@ func NewLabelsTripperware(
log log.Logger,
limits Limits,
merger base.Merger,
split splitter,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
@ -643,7 +627,7 @@ func NewLabelsTripperware(
StatsCollectorMiddleware(),
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}
if cfg.CacheLabelResults {
@ -652,7 +636,6 @@ func NewLabelsTripperware(
base.InstrumentMiddleware("label_results_cache", metrics.InstrumentMiddlewareMetrics),
cacheMiddleware,
)
}
if cfg.MaxRetries > 0 {
@ -669,21 +652,7 @@ func NewLabelsTripperware(
}
// NewMetricTripperware creates a new frontend tripperware responsible for handling metric queries
func NewMetricTripperware(
cfg Config,
engineOpts logql.EngineOpts,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
merger base.Merger,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
extractor base.Extractor,
metrics *Metrics,
indexStatsTripperware base.Middleware,
metricsNamespace string,
) (base.Middleware, error) {
func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, extractor base.Extractor, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
cacheKey := cacheKeyLimits{limits, cfg.Transformer}
var queryCacheMiddleware base.Middleware
if cfg.CacheResults {
@ -737,7 +706,7 @@ func NewMetricTripperware(
queryRangeMiddleware,
NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitMetricByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
)
if cfg.CacheResults {
@ -793,16 +762,7 @@ func NewMetricTripperware(
}
// NewInstantMetricTripperware creates a new frontend tripperware responsible for handling metric queries
func NewInstantMetricTripperware(
cfg Config,
engineOpts logql.EngineOpts,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
metrics *Metrics,
indexStatsTripperware base.Middleware,
metricsNamespace string,
) (base.Middleware, error) {
func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
statsHandler := indexStatsTripperware.Wrap(next)
@ -844,21 +804,10 @@ func NewInstantMetricTripperware(
}), nil
}
func NewVolumeTripperware(
cfg Config,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
merger base.Merger,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
metrics *Metrics,
metricsNamespace string,
) (base.Middleware, error) {
func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) {
// Parallelize the volume requests, so it doesn't send a huge request to a single index-gw (i.e. {app=~".+"} for 30d).
// Indices are sharded by 24 hours, so we split the volume request in 24h intervals.
limits = WithSplitByLimits(limits, 24*time.Hour)
limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval)
var cacheMiddleware base.Middleware
if cfg.CacheVolumeResults {
var err error
@ -894,6 +843,7 @@ func NewVolumeTripperware(
cacheMiddleware,
cfg,
merger,
split,
limits,
log,
metrics,
@ -962,18 +912,7 @@ func volumeFeatureFlagRoundTripper(nextTW base.Middleware, limits Limits) base.M
})
}
func NewIndexStatsTripperware(
cfg Config,
log log.Logger,
limits Limits,
schema config.SchemaConfig,
merger base.Merger,
c cache.Cache,
cacheGenNumLoader base.CacheGenNumberLoader,
retentionEnabled bool,
metrics *Metrics,
metricsNamespace string,
) (base.Middleware, error) {
func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) {
limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval)
var cacheMiddleware base.Middleware
@ -1011,6 +950,7 @@ func NewIndexStatsTripperware(
cacheMiddleware,
cfg,
merger,
split,
limits,
log,
metrics,
@ -1028,6 +968,7 @@ func sharedIndexTripperware(
cacheMiddleware base.Middleware,
cfg Config,
merger base.Merger,
split splitter,
limits Limits,
log log.Logger,
metrics *Metrics,
@ -1038,7 +979,7 @@ func sharedIndexTripperware(
middlewares := []base.Middleware{
NewLimitsMiddleware(limits),
base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, splitByTime, metrics.SplitByMetrics),
SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics),
}
if cacheMiddleware != nil {

@ -189,7 +189,7 @@ func TestMetricsTripperware(t *testing.T) {
noCacheTestCfg := testConfig
noCacheTestCfg.CacheResults = false
noCacheTestCfg.CacheIndexStatsResults = false
tpw, stopper, err := NewMiddleware(noCacheTestCfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{
tpw, stopper, err := NewMiddleware(noCacheTestCfg, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{
Configs: testSchemasTSDB,
}, nil, false, nil, constants.Loki)
if stopper != nil {
@ -240,7 +240,7 @@ func TestMetricsTripperware(t *testing.T) {
require.Error(t, err)
// Configure with cache
tpw, stopper, err = NewMiddleware(testConfig, testEngineOpts, util_log.Logger, l, config.SchemaConfig{
tpw, stopper, err = NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{
Configs: testSchemasTSDB,
}, nil, false, nil, constants.Loki)
if stopper != nil {
@ -278,7 +278,7 @@ func TestLogFilterTripperware(t *testing.T) {
noCacheTestCfg := testConfig
noCacheTestCfg.CacheResults = false
noCacheTestCfg.CacheIndexStatsResults = false
tpw, stopper, err := NewMiddleware(noCacheTestCfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(noCacheTestCfg, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -347,7 +347,7 @@ func TestInstantQueryTripperware(t *testing.T) {
queryTimeout: 1 * time.Minute,
maxSeries: 1,
}
tpw, stopper, err := NewMiddleware(testShardingConfigNoCache, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testShardingConfigNoCache, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemasTSDB}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -403,7 +403,7 @@ func TestSeriesTripperware(t *testing.T) {
"1": 24 * time.Hour,
},
}
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -441,7 +441,7 @@ func TestLabelsTripperware(t *testing.T) {
"1": 24 * time.Hour,
},
}
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -487,7 +487,7 @@ func TestLabelsTripperware(t *testing.T) {
}
func TestIndexStatsTripperware(t *testing.T) {
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -537,7 +537,7 @@ func TestVolumeTripperware(t *testing.T) {
volumeEnabled: true,
maxSeries: 42,
}
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -593,7 +593,7 @@ func TestVolumeTripperware(t *testing.T) {
})
t.Run("range queries return a prometheus style metrics response, putting volumes in buckets based on the step", func(t *testing.T) {
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, volumeEnabled: true}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, volumeEnabled: true}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -784,7 +784,7 @@ func TestNewTripperware_Caches(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
_, stopper, err := NewMiddleware(tc.config, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
_, stopper, err := NewMiddleware(tc.config, testEngineOpts, nil, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -814,7 +814,7 @@ func TestNewTripperware_Caches(t *testing.T) {
}
func TestLogNoFilter(t *testing.T) {
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -871,7 +871,7 @@ func TestPostQueries(t *testing.T) {
}
func TestTripperware_EntriesLimit(t *testing.T) {
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -920,7 +920,7 @@ func TestTripperware_RequiredLabels(t *testing.T) {
} {
t.Run(test.qs, func(t *testing.T) {
limits := fakeLimits{maxEntriesLimitPerQuery: 5000, maxQueryParallelism: 1, requiredLabels: []string{"app"}}
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -1027,7 +1027,7 @@ func TestTripperware_RequiredNumberLabels(t *testing.T) {
maxQueryParallelism: 1,
requiredNumberLabels: tc.requiredNumberLabels,
}
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(testConfig, testEngineOpts, nil, util_log.Logger, limits, config.SchemaConfig{Configs: testSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -1218,7 +1218,7 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
tpw, stopper, err := NewMiddleware(statsTestCfg, testEngineOpts, util_log.Logger, l, config.SchemaConfig{Configs: statsSchemas}, nil, false, nil, constants.Loki)
tpw, stopper, err := NewMiddleware(statsTestCfg, testEngineOpts, nil, util_log.Logger, l, config.SchemaConfig{Configs: statsSchemas}, nil, false, nil, constants.Loki)
if stopper != nil {
defer stopper.Stop()
}
@ -1245,6 +1245,7 @@ type fakeLimits struct {
maxSeries int
splitDuration map[string]time.Duration
metadataSplitDuration map[string]time.Duration
ingesterSplitDuration map[string]time.Duration
minShardingLookback time.Duration
queryTimeout time.Duration
requiredLabels []string
@ -1269,6 +1270,13 @@ func (f fakeLimits) MetadataQuerySplitDuration(key string) time.Duration {
return f.metadataSplitDuration[key]
}
func (f fakeLimits) IngesterQuerySplitDuration(key string) time.Duration {
if f.ingesterSplitDuration == nil {
return 0
}
return f.ingesterSplitDuration[key]
}
func (f fakeLimits) MaxQueryLength(context.Context, string) time.Duration {
if f.maxQueryLength == 0 {
return time.Hour * 7
@ -1344,6 +1352,19 @@ func (f fakeLimits) TSDBMaxBytesPerShard(_ string) int {
return valid.DefaultTSDBMaxBytesPerShard
}
type ingesterQueryOpts struct {
queryStoreOnly bool
queryIngestersWithin time.Duration
}
func (i ingesterQueryOpts) QueryStoreOnly() bool {
return i.queryStoreOnly
}
func (i ingesterQueryOpts) QueryIngestersWithin() time.Duration {
return i.queryIngestersWithin
}
func counter() (*int, base.Handler) {
count := 0
var lock sync.Mutex

@ -21,7 +21,6 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
@ -56,13 +55,11 @@ type splitByInterval struct {
limits Limits
merger queryrangebase.Merger
metrics *SplitByMetrics
splitter Splitter
splitter splitter
}
type Splitter func(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error)
// SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval.
func SplitByIntervalMiddleware(configs []config.PeriodConfig, limits Limits, merger queryrangebase.Merger, splitter Splitter, metrics *SplitByMetrics) queryrangebase.Middleware {
func SplitByIntervalMiddleware(configs []config.PeriodConfig, limits Limits, merger queryrangebase.Merger, splitter splitter, metrics *SplitByMetrics) queryrangebase.Middleware {
if metrics == nil {
metrics = NewSplitByMetrics(nil)
}
@ -197,7 +194,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
return h.next.Do(ctx, r)
}
intervals, err := h.splitter(r, interval)
intervals, err := h.splitter.split(time.Now().UTC(), tenantIDs, r, interval)
if err != nil {
return nil, err
}
@ -251,73 +248,6 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
return h.merger.MergeResponse(resps...)
}
func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
var reqs []queryrangebase.Request
switch r := req.(type) {
case *LokiRequest:
util.ForInterval(interval, r.StartTs, r.EndTs, false, func(start, end time.Time) {
reqs = append(reqs, &LokiRequest{
Query: r.Query,
Limit: r.Limit,
Step: r.Step,
Interval: r.Interval,
Direction: r.Direction,
Path: r.Path,
StartTs: start,
EndTs: end,
Plan: r.Plan,
})
})
case *LokiSeriesRequest:
// metadata queries have end time inclusive.
// Set endTimeInclusive to true so that ForInterval keeps a gap of 1ms between splits to
// avoid querying duplicate data in adjacent queries.
util.ForInterval(interval, r.StartTs, r.EndTs, true, func(start, end time.Time) {
reqs = append(reqs, &LokiSeriesRequest{
Match: r.Match,
Path: r.Path,
StartTs: start,
EndTs: end,
Shards: r.Shards,
})
})
case *LabelRequest:
// metadata queries have end time inclusive.
// Set endTimeInclusive to true so that ForInterval keeps a gap of 1ms between splits to
// avoid querying duplicate data in adjacent queries.
util.ForInterval(interval, *r.Start, *r.End, true, func(start, end time.Time) {
reqs = append(reqs, NewLabelRequest(start, end, r.Query, r.Name, r.Path()))
})
case *logproto.IndexStatsRequest:
startTS := r.GetStart()
endTS := r.GetEnd()
util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) {
reqs = append(reqs, &logproto.IndexStatsRequest{
From: model.TimeFromUnix(start.Unix()),
Through: model.TimeFromUnix(end.Unix()),
Matchers: r.GetMatchers(),
})
})
case *logproto.VolumeRequest:
startTS := r.GetStart()
endTS := r.GetEnd()
util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) {
reqs = append(reqs, &logproto.VolumeRequest{
From: model.TimeFromUnix(start.Unix()),
Through: model.TimeFromUnix(end.Unix()),
Matchers: r.GetMatchers(),
Limit: r.Limit,
TargetLabels: r.TargetLabels,
AggregateBy: r.AggregateBy,
})
})
default:
return nil, nil
}
return reqs, nil
}
// maxRangeVectorAndOffsetDurationFromQueryString
func maxRangeVectorAndOffsetDurationFromQueryString(q string) (time.Duration, time.Duration, error) {
parsed, err := syntax.ParseExpr(q)
@ -346,92 +276,3 @@ func maxRangeVectorAndOffsetDuration(expr syntax.Expr) (time.Duration, time.Dura
})
return maxRVDuration, maxOffset, nil
}
// reduceSplitIntervalForRangeVector reduces the split interval for a range query based on the duration of the range vector.
// Large range vector durations will not be split into smaller intervals because it can cause the queries to be slow by over-processing data.
func reduceSplitIntervalForRangeVector(r *LokiRequest, interval time.Duration) (time.Duration, error) {
maxRange, _, err := maxRangeVectorAndOffsetDuration(r.Plan.AST)
if err != nil {
return 0, err
}
if maxRange > interval {
return maxRange, nil
}
return interval, nil
}
func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
var reqs []queryrangebase.Request
lokiReq := r.(*LokiRequest)
interval, err := reduceSplitIntervalForRangeVector(lokiReq, interval)
if err != nil {
return nil, err
}
// step align start and end time of the query. Start time is rounded down and end time is rounded up.
stepNs := r.GetStep() * 1e6
startNs := lokiReq.StartTs.UnixNano()
start := time.Unix(0, startNs-startNs%stepNs)
endNs := lokiReq.EndTs.UnixNano()
if mod := endNs % stepNs; mod != 0 {
endNs += stepNs - mod
}
end := time.Unix(0, endNs)
lokiReq = lokiReq.WithStartEnd(start, end).(*LokiRequest)
// step is >= configured split interval, let us just split the query interval by step
if lokiReq.Step >= interval.Milliseconds() {
util.ForInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, func(start, end time.Time) {
reqs = append(reqs, &LokiRequest{
Query: lokiReq.Query,
Limit: lokiReq.Limit,
Step: lokiReq.Step,
Interval: lokiReq.Interval,
Direction: lokiReq.Direction,
Path: lokiReq.Path,
StartTs: start,
EndTs: end,
Plan: lokiReq.Plan,
})
})
return reqs, nil
}
for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
end := nextIntervalBoundary(start, r.GetStep(), interval)
if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(lokiReq.EndTs) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == lokiReq.EndTs {
end = lokiReq.EndTs
}
reqs = append(reqs, &LokiRequest{
Query: lokiReq.Query,
Limit: lokiReq.Limit,
Step: lokiReq.Step,
Interval: lokiReq.Interval,
Direction: lokiReq.Direction,
Path: lokiReq.Path,
StartTs: start,
EndTs: end,
Plan: lokiReq.Plan,
})
}
return reqs, nil
}
// Round up to the step before the next interval boundary.
func nextIntervalBoundary(t time.Time, step int64, interval time.Duration) time.Time {
stepNs := step * 1e6
nsPerInterval := interval.Nanoseconds()
startOfNextInterval := ((t.UnixNano() / nsPerInterval) + 1) * nsPerInterval
// ensure that target is a multiple of steps away from the start time
target := startOfNextInterval - ((startOfNextInterval - t.UnixNano()) % stepNs)
if target == startOfNextInterval {
target -= stepNs
}
return time.Unix(0, target)
}

File diff suppressed because it is too large.

@ -0,0 +1,297 @@
package queryrange
import (
"time"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
type splitter interface {
split(execTime time.Time, tenantIDs []string, request queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error)
}
type defaultSplitter struct {
limits Limits
iqo util.IngesterQueryOptions
}
func newDefaultSplitter(limits Limits, iqo util.IngesterQueryOptions) *defaultSplitter {
return &defaultSplitter{limits, iqo}
}
func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
var (
reqs []queryrangebase.Request
factory func(start, end time.Time)
endTimeInclusive = true
)
switch r := req.(type) {
case *LokiRequest:
endTimeInclusive = false
factory = func(start, end time.Time) {
reqs = append(reqs, &LokiRequest{
Query: r.Query,
Limit: r.Limit,
Step: r.Step,
Interval: r.Interval,
Direction: r.Direction,
Path: r.Path,
StartTs: start,
EndTs: end,
Plan: r.Plan,
})
}
case *LokiSeriesRequest:
// metadata queries have end time inclusive.
// Set endTimeInclusive to true so that ForInterval keeps a gap of 1ms between splits to
// avoid querying duplicate data in adjacent queries.
factory = func(start, end time.Time) {
reqs = append(reqs, &LokiSeriesRequest{
Match: r.Match,
Path: r.Path,
StartTs: start,
EndTs: end,
Shards: r.Shards,
})
}
case *LabelRequest:
// metadata queries have end time inclusive.
// Set endTimeInclusive to true so that ForInterval keeps a gap of 1ms between splits to
// avoid querying duplicate data in adjacent queries.
factory = func(start, end time.Time) {
reqs = append(reqs, NewLabelRequest(start, end, r.Query, r.Name, r.Path()))
}
case *logproto.IndexStatsRequest:
factory = func(start, end time.Time) {
reqs = append(reqs, &logproto.IndexStatsRequest{
From: model.TimeFromUnix(start.Unix()),
Through: model.TimeFromUnix(end.Unix()),
Matchers: r.GetMatchers(),
})
}
case *logproto.VolumeRequest:
factory = func(start, end time.Time) {
reqs = append(reqs, &logproto.VolumeRequest{
From: model.TimeFromUnix(start.Unix()),
Through: model.TimeFromUnix(end.Unix()),
Matchers: r.GetMatchers(),
Limit: r.Limit,
TargetLabels: r.TargetLabels,
AggregateBy: r.AggregateBy,
})
}
default:
return nil, nil
}
var (
ingesterSplits []queryrangebase.Request
origStart = req.GetStart().UTC()
origEnd = req.GetEnd().UTC()
)
start, end, needsIngesterSplits := ingesterQueryBounds(execTime, s.iqo, req)
if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits {
// perform splitting using special interval (`split_ingester_queries_by_interval`)
util.ForInterval(ingesterQueryInterval, start, end, endTimeInclusive, factory)
// rebound after ingester queries have been split out
end = start
start = req.GetStart().UTC()
if endTimeInclusive {
end = end.Add(-util.SplitGap)
}
// query only overlaps ingester query window, nothing more to do
if start.After(end) || start.Equal(end) {
return reqs, nil
}
// copy the splits, reset the results
ingesterSplits = reqs
reqs = nil
} else {
start = origStart
end = origEnd
}
// perform splitting over the rest of the time range
util.ForInterval(interval, origStart, end, endTimeInclusive, factory)
// move the ingester splits to the end to maintain correct order
reqs = append(reqs, ingesterSplits...)
return reqs, nil
}
type metricQuerySplitter struct {
limits Limits
iqo util.IngesterQueryOptions
}
func newMetricQuerySplitter(limits Limits, iqo util.IngesterQueryOptions) *metricQuerySplitter {
return &metricQuerySplitter{limits, iqo}
}
// reduceSplitIntervalForRangeVector reduces the split interval for a range query based on the duration of the range vector.
// Large range vector durations will not be split into smaller intervals because it can cause the queries to be slow by over-processing data.
func (s *metricQuerySplitter) reduceSplitIntervalForRangeVector(r *LokiRequest, interval time.Duration) (time.Duration, error) {
maxRange, _, err := maxRangeVectorAndOffsetDuration(r.Plan.AST)
if err != nil {
return 0, err
}
if maxRange > interval {
return maxRange, nil
}
return interval, nil
}
// Round up to the step before the next interval boundary.
func (s *metricQuerySplitter) nextIntervalBoundary(t time.Time, step int64, interval time.Duration) time.Time {
stepNs := step * 1e6
nsPerInterval := interval.Nanoseconds()
startOfNextInterval := ((t.UnixNano() / nsPerInterval) + 1) * nsPerInterval
// ensure that target is a multiple of steps away from the start time
target := startOfNextInterval - ((startOfNextInterval - t.UnixNano()) % stepNs)
if target == startOfNextInterval {
target -= stepNs
}
return time.Unix(0, target)
}
func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
var reqs []queryrangebase.Request
lokiReq := r.(*LokiRequest)
interval, err := s.reduceSplitIntervalForRangeVector(lokiReq, interval)
if err != nil {
return nil, err
}
start, end := s.alignStartEnd(r.GetStep(), lokiReq.StartTs, lokiReq.EndTs)
lokiReq = lokiReq.WithStartEnd(start, end).(*LokiRequest)
factory := func(start, end time.Time) {
reqs = append(reqs, &LokiRequest{
Query: lokiReq.Query,
Limit: lokiReq.Limit,
Step: lokiReq.Step,
Interval: lokiReq.Interval,
Direction: lokiReq.Direction,
Path: lokiReq.Path,
StartTs: start,
EndTs: end,
Plan: lokiReq.Plan,
})
}
// step is >= configured split interval, let us just split the query interval by step
// TODO this is likely buggy when step >= query range, how should we handle this?
if lokiReq.Step >= interval.Milliseconds() {
util.ForInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, factory)
return reqs, nil
}
var (
ingesterSplits []queryrangebase.Request
needsIngesterSplits bool
)
origStart := start
origEnd := end
start, end, needsIngesterSplits = ingesterQueryBounds(execTime, s.iqo, lokiReq)
start, end = s.alignStartEnd(r.GetStep(), start, end)
if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits {
// perform splitting using special interval (`split_ingester_queries_by_interval`)
s.buildMetricSplits(lokiReq.GetStep(), ingesterQueryInterval, start, end, factory)
// rebound after ingester queries have been split out
//
// the end time should now be the boundary of the `query_ingester_within` window, which is "start" currently;
// but since start is already step-aligned we need to subtract 1ns to align it down by 1 more step so that we
// get a consistent step between splits
end, _ = s.alignStartEnd(r.GetStep(), start.Add(-time.Nanosecond), end)
// we restore the previous start time (the start time of the query)
start = origStart
// query only overlaps ingester query window, nothing more to do
if start.After(end) || start.Equal(end) {
return reqs, nil
}
// copy the splits, reset the results
ingesterSplits = reqs
reqs = nil
} else {
start = origStart
end = origEnd
}
// perform splitting over the rest of the time range
s.buildMetricSplits(lokiReq.GetStep(), interval, start, end, factory)
// move the ingester splits to the end to maintain correct order
reqs = append(reqs, ingesterSplits...)
return reqs, nil
}
func (s *metricQuerySplitter) alignStartEnd(step int64, start, end time.Time) (time.Time, time.Time) {
// step align start and end time of the query. Start time is rounded down and end time is rounded up.
stepNs := step * 1e6
startNs := start.UnixNano()
endNs := end.UnixNano()
if mod := endNs % stepNs; mod != 0 {
endNs += stepNs - mod
}
return time.Unix(0, startNs-startNs%stepNs), time.Unix(0, endNs)
}
func (s *metricQuerySplitter) buildMetricSplits(step int64, interval time.Duration, start, end time.Time, factory func(start, end time.Time)) {
for splStart := start; splStart.Before(end); splStart = s.nextIntervalBoundary(splStart, step, interval).Add(time.Duration(step) * time.Millisecond) {
splEnd := s.nextIntervalBoundary(splStart, step, interval)
if splEnd.Add(time.Duration(step)*time.Millisecond).After(end) || splEnd.Add(time.Duration(step)*time.Millisecond) == end {
splEnd = end
}
factory(splStart, splEnd)
}
}
// ingesterQueryBounds determines if we need to split time ranges overlapping the ingester query window (`query_ingesters_within`)
// and retrieve the bounds for those specific splits
func ingesterQueryBounds(execTime time.Time, iqo util.IngesterQueryOptions, req queryrangebase.Request) (time.Time, time.Time, bool) {
start, end := req.GetStart().UTC(), req.GetEnd().UTC()
// ingesters are not queried, nothing to do
if iqo == nil || iqo.QueryStoreOnly() {
return start, end, false
}
windowSize := iqo.QueryIngestersWithin()
ingesterWindow := execTime.UTC().Add(-windowSize)
// clamp to the start time
if ingesterWindow.Before(start) {
ingesterWindow = start
}
// query range does not overlap with ingester query window, nothing to do
if end.Before(ingesterWindow) {
return start, end, false
}
return ingesterWindow, end, true
}
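
To illustrate the bounds logic introduced in this file, here is a standalone, simplified sketch; it mirrors the behaviour of `ingesterQueryBounds` above but is not the actual function, and the names in it are hypothetical:

```go
package main

import (
	"fmt"
	"time"
)

// ingesterWindowBounds is a simplified re-statement of the clamping logic above:
// it returns the portion of [start, end] that overlaps the ingester query window,
// and whether there is any overlap at all.
func ingesterWindowBounds(execTime, start, end time.Time, queryIngestersWithin time.Duration) (time.Time, time.Time, bool) {
	windowStart := execTime.Add(-queryIngestersWithin)
	if windowStart.Before(start) {
		windowStart = start // clamp to the query's start time
	}
	if end.Before(windowStart) {
		return start, end, false // query ends before the window begins; store only
	}
	return windowStart, end, true
}

func main() {
	now := time.Now().UTC()
	// A now-7d..now query with the default 3h window: the final 3h is split with
	// split_ingester_queries_by_interval, the remainder with split_queries_by_interval.
	from, to, overlaps := ingesterWindowBounds(now, now.Add(-7*24*time.Hour), now, 3*time.Hour)
	fmt.Println(from, to, overlaps)
}
```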

@ -4,6 +4,7 @@ import (
"fmt"
"io"
"strings"
"time"
"github.com/go-kit/log/level"
"github.com/prometheus/common/version"
@ -38,3 +39,10 @@ func PrintConfig(w io.Writer, config interface{}) error {
fmt.Fprintf(w, "---\n# Loki Config\n# %s\n%s\n\n", version.Info(), string(lc))
return nil
}
// IngesterQueryOptions exists because querier.Config cannot be passed directly to the queryrange package
// due to an import cycle.
type IngesterQueryOptions interface {
QueryStoreOnly() bool
QueryIngestersWithin() time.Duration
}

@ -87,6 +87,8 @@ func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time) {
return func() { tick.Stop() }, tick.C
}
const SplitGap = time.Millisecond
// ForInterval splits the given start and end time into given interval.
// The start and end time in splits would be aligned to the interval
// except for the start time of first split and end time of last split which would be kept same as original start/end
@ -107,7 +109,7 @@ func ForInterval(interval time.Duration, start, end time.Time, endTimeInclusive
if !newEnd.Before(end) {
newEnd = end
} else if endTimeInclusive {
newEnd = newEnd.Add(-time.Millisecond)
newEnd = newEnd.Add(-SplitGap)
}
if firstInterval {
callback(ogStart, newEnd)
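
The `SplitGap` constant above is what keeps adjacent metadata splits from overlapping. A small usage sketch follows, assuming the `util.ForInterval` signature as used elsewhere in this diff:

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/loki/pkg/util"
)

func main() {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	end := time.Date(2024, 1, 1, 6, 0, 0, 0, time.UTC)

	// endTimeInclusive=true (as for metadata queries): each intermediate split's end is
	// pulled back by util.SplitGap (1ms) so adjacent splits don't query the same instant.
	util.ForInterval(2*time.Hour, start, end, true, func(s, e time.Time) {
		fmt.Println(s, "->", e)
	})
}
```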

@ -106,6 +106,7 @@ type Limits struct {
// Query frontend enforced limits. The default is actually parameterized by the queryrange config.
QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"`
MetadataQuerySplitDuration model.Duration `yaml:"split_metadata_queries_by_interval" json:"split_metadata_queries_by_interval"`
IngesterQuerySplitDuration model.Duration `yaml:"split_ingester_queries_by_interval" json:"split_ingester_queries_by_interval"`
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"`
MaxQuerierBytesRead flagext.ByteSize `yaml:"max_querier_bytes_read" json:"max_querier_bytes_read"`
@ -299,6 +300,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
_ = l.MetadataQuerySplitDuration.Set("24h")
f.Var(&l.MetadataQuerySplitDuration, "querier.split-metadata-queries-by-interval", "Split metadata queries by a time interval and execute in parallel. The value 0 disables splitting metadata queries by time. This also determines how cache keys are chosen when label/series result caching is enabled.")
_ = l.IngesterQuerySplitDuration.Set("0s")
f.Var(&l.IngesterQuerySplitDuration, "querier.split-ingester-queries-by-interval", "Interval to use for time-based splitting when a request is within the `query_ingesters_within` window; defaults to `split-queries-by-interval` by setting to 0.")
f.StringVar(&l.DeletionMode, "compactor.deletion-mode", "filter-and-delete", "Deletion mode. Can be one of 'disabled', 'filter-only', or 'filter-and-delete'. When set to 'filter-only' or 'filter-and-delete', and if retention_enabled is true, then the log entry deletion API endpoints are available.")
// Deprecated
@ -574,6 +578,12 @@ func (o *Overrides) MetadataQuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).MetadataQuerySplitDuration)
}
// IngesterQuerySplitDuration returns the tenant specific splitby interval applied in the query frontend when querying
// during the `query_ingesters_within` window.
func (o *Overrides) IngesterQuerySplitDuration(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).IngesterQuerySplitDuration)
}
// MaxQueryBytesRead returns the maximum bytes a query can read.
func (o *Overrides) MaxQueryBytesRead(_ context.Context, userID string) int {
return o.getOverridesForUser(userID).MaxQueryBytesRead.Val()
