diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 29e425f459..600a534401 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -4907,6 +4907,15 @@ engine_v2: # of the normal ingesters. # CLI flag: -querier.query-partition-ingesters [query_partition_ingesters: | default = false] + +# Amount of time until data objects are available. +# CLI flag: -querier.dataobj-storage-lag +[dataobj_storage_lag: | default = 1h] + +# Initial date when data objects became available. Format YYYY-MM-DD. If not +# set, assume data objects are always available no matter how far back. +# CLI flag: -querier.dataobj-storage-start +[dataobj_storage_start: | default = ""] ``` ### query_range diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 04cbd7abf1..7e747075b2 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -92,7 +92,7 @@ func (q *QuerierAPI) RangeQueryHandler(ctx context.Context, req *queryrange.Loki return result, err } - if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(params.Start(), params.End()) { + if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(q.cfg, params.Start(), params.End()) { query := q.engineV2.Query(params) result, err = query.Exec(ctx) if err == nil { @@ -109,10 +109,16 @@ func (q *QuerierAPI) RangeQueryHandler(ctx context.Context, req *queryrange.Loki return query.Exec(ctx) } -func hasDataObjectsAvailable(_, end time.Time) bool { +func hasDataObjectsAvailable(config Config, start, end time.Time) bool { // Data objects in object storage lag behind 20-30 minutes. - // We are generous and only enable v2 engine queries that end earlier than 1 hour ago, to ensure data objects are available. - return end.Before(time.Now().Add(-1 * time.Hour)) + // We are generous and only enable v2 engine queries that end earlier than 1DataObjStorageLag ago (default 1h), + // to ensure data objects are available. + if config.DataobjStorageStart != "" { + startTime, _ := time.Parse("2006-01-02", config.DataobjStorageStart) // already validated + return end.Before(time.Now().Add(-1*config.DataobjStorageLag.Abs())) && start.After(startTime) + } + // no start time; assume we always have data objects no matter how far back + return end.Before(time.Now().Add(-1 * config.DataobjStorageLag.Abs())) } // InstantQueryHandler is a http.HandlerFunc for instant queries. @@ -131,7 +137,7 @@ func (q *QuerierAPI) InstantQueryHandler(ctx context.Context, req *queryrange.Lo return logqlmodel.Result{}, err } - if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(params.Start(), params.End()) { + if q.cfg.EngineV2.Enable && hasDataObjectsAvailable(q.cfg, params.Start(), params.End()) { query := q.engineV2.Query(params) result, err := query.Exec(ctx) if err == nil { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 9f98fb4f3c..a967782ff7 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -66,6 +66,8 @@ type Config struct { IngesterQueryStoreMaxLookback time.Duration `yaml:"-"` QueryPatternIngestersWithin time.Duration `yaml:"-"` + DataobjStorageLag time.Duration `yaml:"dataobj_storage_lag" category:"experimental"` + DataobjStorageStart string `yaml:"dataobj_storage_start" category:"experimental"` } // RegisterFlags register flags. @@ -77,6 +79,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.TailMaxDuration, prefix+"tail-max-duration", 1*time.Hour, "Maximum duration for which the live tailing requests are served.") f.DurationVar(&cfg.ExtraQueryDelay, prefix+"extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.") f.DurationVar(&cfg.QueryIngestersWithin, prefix+"query-ingesters-within", 3*time.Hour, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.") + f.DurationVar(&cfg.DataobjStorageLag, prefix+"dataobj-storage-lag", 1*time.Hour, "Amount of time until data objects are available.") cfg.Engine.RegisterFlagsWithPrefix(prefix+"engine.", f) cfg.EngineV2.RegisterFlagsWithPrefix(prefix+"engine-v2.", f) f.IntVar(&cfg.MaxConcurrent, prefix+"max-concurrent", 4, "The maximum number of queries that can be simultaneously processed by the querier.") @@ -85,6 +88,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.BoolVar(&cfg.MultiTenantQueriesEnabled, prefix+"multi-tenant-queries-enabled", false, "When true, allow queries to span multiple tenants.") f.BoolVar(&cfg.PerRequestLimitsEnabled, prefix+"per-request-limits-enabled", false, "When true, querier limits sent via a header are enforced.") f.BoolVar(&cfg.QueryPartitionIngesters, prefix+"query-partition-ingesters", false, "When true, querier directs ingester queries to the partition-ingesters instead of the normal ingesters.") + f.StringVar(&cfg.DataobjStorageStart, prefix+"dataobj-storage-start", "", "Initial date when data objects became available. Format YYYY-MM-DD. If not set, assume data objects are always available no matter how far back.") } // Validate validates the config. @@ -92,6 +96,12 @@ func (cfg *Config) Validate() error { if cfg.QueryStoreOnly && cfg.QueryIngesterOnly { return errors.New("querier.query_store_only and querier.query_ingester_only cannot both be true") } + if cfg.DataobjStorageStart != "" { + _, err := time.Parse("2006-01-02", cfg.DataobjStorageStart) + if err != nil { + return errors.Wrap(err, "data_obj_storage_start must be a valid date") + } + } return nil }