diff --git a/pkg/logql/downstream.go b/pkg/logql/downstream.go index ab1d7021a0..3e976bfbf1 100644 --- a/pkg/logql/downstream.go +++ b/pkg/logql/downstream.go @@ -69,7 +69,6 @@ func (ng *DownstreamEngine) Query(ctx context.Context, p Params, mapped syntax.E logger: ng.logger, params: p, evaluator: NewDownstreamEvaluator(ng.downstreamable.Downstreamer(ctx)), - timeout: ng.opts.Timeout, parse: func(_ context.Context, _ string) (syntax.Expr, error) { return mapped, nil }, diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 8bed5ce93e..905ce43cf9 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -34,6 +34,10 @@ import ( "github.com/grafana/loki/pkg/util/validation" ) +const ( + DefaultEngineTimeout = 5 * time.Minute +) + var ( QueryTime = promauto.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "logql", @@ -110,7 +114,7 @@ type EngineOpts struct { func (opts *EngineOpts) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // TODO: remove this configuration after next release. - f.DurationVar(&opts.Timeout, prefix+".engine.timeout", 5*time.Minute, "Timeout for query execution. Instead, rely only on querier.query-timeout. (deprecated)") + f.DurationVar(&opts.Timeout, prefix+".engine.timeout", DefaultEngineTimeout, "Timeout for query execution. Instead, rely only on querier.query-timeout. (deprecated)") f.DurationVar(&opts.MaxLookBackPeriod, prefix+".engine.max-lookback-period", 30*time.Second, "The maximum amount of time to look back for log lines. Used only for instant log queries.") } @@ -152,9 +156,8 @@ func (ng *Engine) Query(params Params) Query { parse: func(_ context.Context, query string) (syntax.Expr, error) { return syntax.ParseExpr(query) }, - record: true, - limits: ng.limits, - timeout: ng.Timeout, + record: true, + limits: ng.limits, } } @@ -169,7 +172,6 @@ type query struct { params Params parse func(context.Context, string) (syntax.Expr, error) limits Limits - timeout time.Duration evaluator Evaluator record bool } @@ -238,16 +240,8 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { } func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) { - queryTimeout := q.timeout - if q.timeout == 0 { - queryTimeout = time.Minute * 5 - userID, err := tenant.TenantID(ctx) - if err != nil { - level.Warn(q.logger).Log("msg", fmt.Sprintf("couldn't fetch tenantID to evaluate query timeout, using default value of %s", queryTimeout), "err", err) - return nil, err - } - queryTimeout = q.limits.QueryTimeout(userID) + time.Second - } + userID, _ := tenant.TenantID(ctx) + queryTimeout := q.limits.QueryTimeout(userID) ctx, cancel := context.WithTimeout(ctx, queryTimeout) defer cancel() diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 0f6d53790b..0ee5ddc8ba 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -29,6 +29,7 @@ import ( "github.com/grafana/loki/pkg/distributor" "github.com/grafana/loki/pkg/ingester" "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/loki/common" "github.com/grafana/loki/pkg/lokifrontend" "github.com/grafana/loki/pkg/querier" @@ -221,6 +222,64 @@ func (c *Config) Validate() error { return err } + if err := AdjustForTimeoutsMigration(c); err != nil { + return err + } + + return nil +} + +// AdjustForTimeoutsMigration will adjust Loki timeouts configuration to be in accordance with the next major release. +// +// We're preparing to unify the querier:engine:timeout and querier:query_timeout into a single timeout named limits_config:query_timeout. +// The migration encompasses of: +// - If limits_config:query_timeout is explicitly configured, use it everywhere as it is a new configuration and by +// configuring it, users are expressing that they're willing of using it. +// - If none are explicitly configured, use the default engine:timeout everywhere as it is longer than the default limits_config:query_timeout +// and otherwise users would start to experience shorter timeouts without expecting it. +// - If only the querier:engine:timeout was explicitly configured, warn the user and use it everywhere. +func AdjustForTimeoutsMigration(c *Config) error { + engineTimeoutIsDefault := c.Querier.Engine.Timeout == logql.DefaultEngineTimeout + perTenantTimeoutIsDefault := c.LimitsConfig.QueryTimeout.String() == validation.DefaultPerTenantQueryTimeout + if engineTimeoutIsDefault && perTenantTimeoutIsDefault { + if err := c.LimitsConfig.QueryTimeout.Set(c.Querier.Engine.Timeout.String()); err != nil { + return fmt.Errorf("couldn't set per-tenant query_timeout as the engine timeout value: %w", err) + } + level.Warn(util_log.Logger).Log("msg", + fmt.Sprintf( + "per-tenant timeout not configured, using default engine timeout (%q). This behavior will change in the next major to always use the default per-tenant timeout (%q).", + c.Querier.Engine.Timeout.String(), + c.LimitsConfig.QueryTimeout.String(), + ), + ) + return nil + } + + if !perTenantTimeoutIsDefault && !engineTimeoutIsDefault { + level.Warn(util_log.Logger).Log("msg", + fmt.Sprintf( + "using configured per-tenant timeout (%q) as the default (can be overridden per-tenant in the limits_config). Configured engine timeout (%q) is deprecated and will be ignored.", + c.LimitsConfig.QueryTimeout.String(), + c.Querier.Engine.Timeout.String(), + ), + ) + return nil + } + + if perTenantTimeoutIsDefault && !engineTimeoutIsDefault { + if err := c.LimitsConfig.QueryTimeout.Set(c.Querier.Engine.Timeout.String()); err != nil { + return fmt.Errorf("couldn't set per-tenant query_timeout as the engine timeout value: %w", err) + } + level.Warn(util_log.Logger).Log("msg", + fmt.Sprintf( + "using configured engine timeout (%q) as the default (can be overridden per-tenant in the limits_config). Be aware that engine timeout (%q) is deprecated and will be removed in the next major version.", + c.Querier.Engine.Timeout.String(), + c.LimitsConfig.QueryTimeout.String(), + ), + ) + return nil + } + return nil } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 8d76c8b29f..ab9745f7d7 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -47,6 +47,8 @@ const ( defaultPerStreamRateLimit = 3 << 20 // 3MB defaultPerStreamBurstLimit = 5 * defaultPerStreamRateLimit + + DefaultPerTenantQueryTimeout = "1m" ) // Limits describe all the limits for users; can be used to describe global default @@ -195,7 +197,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.MaxQueryLength.Set("721h") f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit to length of chunk store queries, 0 to disable.") f.IntVar(&l.MaxQuerySeries, "querier.max-query-series", 500, "Limit the maximum of unique series returned by a metric query. When the limit is reached an error is returned.") - _ = l.QueryTimeout.Set("1m") + _ = l.QueryTimeout.Set(DefaultPerTenantQueryTimeout) f.Var(&l.QueryTimeout, "querier.query-timeout", "Timeout when querying backends (ingesters or storage) during the execution of a query request. If a specific per-tenant timeout is used, this timeout is ignored.") _ = l.MaxQueryLookback.Set("0s")