diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d78dcbac51..1036fa315a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -113,7 +113,7 @@ func New( clientCfg client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, - overrides *validation.Overrides, + overrides Limits, registerer prometheus.Registerer, ) (*Distributor, error) { factory := cfg.factory diff --git a/pkg/distributor/ingestion_rate_strategy.go b/pkg/distributor/ingestion_rate_strategy.go index 99422da846..522eae9a81 100644 --- a/pkg/distributor/ingestion_rate_strategy.go +++ b/pkg/distributor/ingestion_rate_strategy.go @@ -2,8 +2,6 @@ package distributor import ( "github.com/grafana/dskit/limiter" - - "github.com/grafana/loki/pkg/validation" ) // ReadLifecycler represents the read interface to the lifecycler. @@ -12,10 +10,10 @@ type ReadLifecycler interface { } type localStrategy struct { - limits *validation.Overrides + limits Limits } -func newLocalIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy { +func newLocalIngestionRateStrategy(limits Limits) limiter.RateLimiterStrategy { return &localStrategy{ limits: limits, } @@ -30,11 +28,11 @@ func (s *localStrategy) Burst(userID string) int { } type globalStrategy struct { - limits *validation.Overrides + limits Limits ring ReadLifecycler } -func newGlobalIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy { +func newGlobalIngestionRateStrategy(limits Limits, ring ReadLifecycler) limiter.RateLimiterStrategy { return &globalStrategy{ limits: limits, ring: ring, diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index f1fd2539a8..f7cf91df62 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -3,13 +3,13 @@ package distributor import ( "time" - "github.com/grafana/loki/pkg/validation" - "github.com/grafana/loki/pkg/distributor/shardstreams" + "github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention" ) // Limits is an interface for distributor limits/related configs type Limits interface { + retention.Limits MaxLineSize(userID string) int MaxLineSizeTruncate(userID string) bool EnforceMetricName(userID string) bool @@ -24,5 +24,7 @@ type Limits interface { IncrementDuplicateTimestamps(userID string) bool ShardStreams(userID string) *shardstreams.Config - AllByUserID() map[string]*validation.Limits + IngestionRateStrategy() string + IngestionRateBytes(userID string) float64 + IngestionBurstSizeBytes(userID string) int } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 5c262f3e28..91b47e14ee 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -42,7 +42,6 @@ import ( errUtil "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/wal" - "github.com/grafana/loki/pkg/validation" ) const ( @@ -235,7 +234,7 @@ type Ingester struct { } // New makes a new Ingester. -func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) { +func New(cfg Config, clientConfig client.Config, store ChunkStore, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) { if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go index 375e4802e2..e48c2a018d 100644 --- a/pkg/ingester/limiter.go +++ b/pkg/ingester/limiter.go @@ -8,6 +8,7 @@ import ( "golang.org/x/time/rate" + "github.com/grafana/loki/pkg/distributor/shardstreams" "github.com/grafana/loki/pkg/validation" ) @@ -21,10 +22,18 @@ type RingCount interface { HealthyInstancesCount() int } +type Limits interface { + UnorderedWrites(userID string) bool + MaxLocalStreamsPerUser(userID string) int + MaxGlobalStreamsPerUser(userID string) int + PerStreamRateLimit(userID string) validation.RateLimit + ShardStreams(userID string) *shardstreams.Config +} + // Limiter implements primitives to get the maximum number of streams // an ingester can handle for a specific tenant type Limiter struct { - limits *validation.Overrides + limits Limits ring RingCount replicationFactor int metrics *ingesterMetrics @@ -48,7 +57,7 @@ func (l *Limiter) Enable() { } // NewLimiter makes a new limiter -func NewLimiter(limits *validation.Overrides, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter { +func NewLimiter(limits Limits, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter { return &Limiter{ limits: limits, ring: ring, diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index 0225b874ba..e135b1517f 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -186,15 +186,15 @@ type limiter struct { n int } -func (l *limiter) MaxQuerySeries(userID string) int { +func (l *limiter) MaxQuerySeries(ctx context.Context, userID string) int { return l.n } -func (l *limiter) QueryTimeout(userID string) time.Duration { +func (l *limiter) QueryTimeout(ctx context.Context, userID string) time.Duration { return time.Minute * 5 } -func (l *limiter) BlockedQueries(userID string) []*validation.BlockedQuery { +func (l *limiter) BlockedQueries(ctx context.Context, userID string) []*validation.BlockedQuery { return []*validation.BlockedQuery{} } diff --git a/pkg/logql/blocker.go b/pkg/logql/blocker.go index cf80aaaf71..7c57537ed5 100644 --- a/pkg/logql/blocker.go +++ b/pkg/logql/blocker.go @@ -26,8 +26,8 @@ func newQueryBlocker(ctx context.Context, q *query) *queryBlocker { } } -func (qb *queryBlocker) isBlocked(tenant string) bool { - patterns := qb.q.limits.BlockedQueries(tenant) +func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool { + patterns := qb.q.limits.BlockedQueries(ctx, tenant) if len(patterns) <= 0 { return false } diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 4863ea79e0..56e87ed0fb 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -262,7 +262,8 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) { tenants, _ := tenant.TenantIDs(ctx) - queryTimeout := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, q.limits.QueryTimeout) + timeoutCapture := func(id string) time.Duration { return q.limits.QueryTimeout(ctx, id) } + queryTimeout := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, timeoutCapture) ctx, cancel := context.WithTimeout(ctx, queryTimeout) defer cancel() @@ -299,7 +300,7 @@ func (q *query) checkBlocked(ctx context.Context, tenants []string) bool { blocker := newQueryBlocker(ctx, q) for _, tenant := range tenants { - if blocker.isBlocked(tenant) { + if blocker.isBlocked(ctx, tenant) { QueriesBlocked.WithLabelValues(tenant).Inc() return true } @@ -332,7 +333,8 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ if err != nil { return nil, err } - maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, q.limits.MaxQuerySeries) + maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) } + maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) seriesIndex := map[uint64]*promql.Series{} next, ts, vec := stepEvaluator.Next() diff --git a/pkg/logql/limits.go b/pkg/logql/limits.go index 2b3523e3c6..9a256610b6 100644 --- a/pkg/logql/limits.go +++ b/pkg/logql/limits.go @@ -1,6 +1,7 @@ package logql import ( + "context" "math" "time" @@ -13,9 +14,9 @@ var ( // Limits allow the engine to fetch limits for a given users. type Limits interface { - MaxQuerySeries(userID string) int - QueryTimeout(userID string) time.Duration - BlockedQueries(userID string) []*validation.BlockedQuery + MaxQuerySeries(context.Context, string) int + QueryTimeout(context.Context, string) time.Duration + BlockedQueries(context.Context, string) []*validation.BlockedQuery } type fakeLimits struct { @@ -24,14 +25,14 @@ type fakeLimits struct { blockedQueries []*validation.BlockedQuery } -func (f fakeLimits) MaxQuerySeries(userID string) int { +func (f fakeLimits) MaxQuerySeries(ctx context.Context, userID string) int { return f.maxSeries } -func (f fakeLimits) QueryTimeout(userID string) time.Duration { +func (f fakeLimits) QueryTimeout(ctx context.Context, userID string) time.Duration { return f.timeout } -func (f fakeLimits) BlockedQueries(userID string) []*validation.BlockedQuery { +func (f fakeLimits) BlockedQueries(ctx context.Context, userID string) []*validation.BlockedQuery { return f.blockedQueries } diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index d37466151f..434aa0aa1a 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -5,12 +5,13 @@ import ( "context" "flag" "fmt" - "go.uber.org/atomic" "net/http" "os" rt "runtime" "time" + "go.uber.org/atomic" + "github.com/fatih/color" "github.com/felixge/fgprof" "github.com/go-kit/log/level" @@ -328,6 +329,17 @@ type Frontend interface { CheckReady(_ context.Context) error } +type CombinedLimits interface { + compactor.Limits + distributor.Limits + ingester.Limits + querier.Limits + queryrange.Limits + ruler.RulesLimits + scheduler.Limits + storage.StoreLimits +} + // Loki is the root datastructure for Loki. type Loki struct { Cfg Config @@ -341,7 +353,7 @@ type Loki struct { Server *server.Server InternalServer *server.Server ring *ring.Ring - overrides *validation.Overrides + overrides CombinedLimits tenantConfigs *runtime.TenantConfigs TenantLimits validation.TenantLimits distributor *distributor.Distributor diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 32da8db4f8..a005a6a344 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1138,7 +1138,7 @@ func (t *Loki) initUsageReport() (services.Service, error) { return ur, nil } -func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overrides) (deletion.DeleteRequestsClient, error) { +func (t *Loki) deleteRequestsClient(clientType string, limits CombinedLimits) (deletion.DeleteRequestsClient, error) { if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil } diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 75fadff135..77ccac50a1 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -33,7 +33,6 @@ import ( serverutil "github.com/grafana/loki/pkg/util/server" "github.com/grafana/loki/pkg/util/spanlogger" util_validation "github.com/grafana/loki/pkg/util/validation" - "github.com/grafana/loki/pkg/validation" ) const ( @@ -49,12 +48,12 @@ type QueryResponse struct { type QuerierAPI struct { querier Querier cfg Config - limits *validation.Overrides + limits Limits engine *logql.Engine } // NewQuerierAPI returns an instance of the QuerierAPI. -func NewQuerierAPI(cfg Config, querier Querier, limits *validation.Overrides, logger log.Logger) *QuerierAPI { +func NewQuerierAPI(cfg Config, querier Querier, limits Limits, logger log.Logger) *QuerierAPI { engine := logql.NewEngine(cfg.Engine, querier, limits, logger) return &QuerierAPI{ cfg: cfg, @@ -473,7 +472,8 @@ func (q *QuerierAPI) validateEntriesLimits(ctx context.Context, query string, li return nil } - maxEntriesLimit := util_validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, q.limits.MaxEntriesLimitPerQuery) + maxEntriesCapture := func(id string) int { return q.limits.MaxEntriesLimitPerQuery(ctx, id) } + maxEntriesLimit := util_validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, maxEntriesCapture) if int(limit) > maxEntriesLimit && maxEntriesLimit != 0 { return httpgrpc.Errorf(http.StatusBadRequest, "max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", limit, maxEntriesLimit) @@ -495,7 +495,8 @@ func WrapQuerySpanAndTimeout(call string, q *QuerierAPI) middleware.Interface { return } - timeout := util_validation.SmallestPositiveNonZeroDurationPerTenant(tenants, q.limits.QueryTimeout) + timeoutCapture := func(id string) time.Duration { return q.limits.QueryTimeout(ctx, id) } + timeout := util_validation.SmallestPositiveNonZeroDurationPerTenant(tenants, timeoutCapture) // TODO: remove this clause once we remove the deprecated query-timeout flag. if q.cfg.QueryTimeout != 0 { // querier YAML configuration is still configured. level.Warn(log).Log("msg", "deprecated querier:query_timeout YAML configuration identified. Please migrate to limits:query_timeout instead.", "call", "WrapQuerySpanAndTimeout", "org_id", strings.Join(tenants, ",")) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 6ca088d35e..49bd260c66 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -27,7 +27,6 @@ import ( util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/spanlogger" util_validation "github.com/grafana/loki/pkg/util/validation" - "github.com/grafana/loki/pkg/validation" ) const ( @@ -86,11 +85,20 @@ type Querier interface { IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error) } +type Limits interface { + logql.Limits + timeRangeLimits + QueryTimeout(context.Context, string) time.Duration + MaxStreamsMatchersPerQuery(context.Context, string) int + MaxConcurrentTailRequests(context.Context, string) int + MaxEntriesLimitPerQuery(context.Context, string) int +} + // SingleTenantQuerier handles single tenant queries. type SingleTenantQuerier struct { cfg Config store storage.Store - limits *validation.Overrides + limits Limits ingesterQuerier *IngesterQuerier deleteGetter deleteGetter metrics *Metrics @@ -101,7 +109,7 @@ type deleteGetter interface { } // New makes a new Querier. -func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits *validation.Overrides, d deleteGetter, r prometheus.Registerer) (*SingleTenantQuerier, error) { +func New(cfg Config, store storage.Store, ingesterQuerier *IngesterQuerier, limits Limits, d deleteGetter, r prometheus.Registerer) (*SingleTenantQuerier, error) { return &SingleTenantQuerier{ cfg: cfg, store: store, @@ -355,7 +363,7 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ } // Enforce the query timeout while querying backends - queryTimeout := q.limits.QueryTimeout(userID) + queryTimeout := q.limits.QueryTimeout(ctx, userID) // TODO: remove this clause once we remove the deprecated query-timeout flag. if q.cfg.QueryTimeout != 0 { // querier YAML configuration. level.Warn(util_log.Logger).Log("msg", "deprecated querier:query_timeout YAML configuration identified. Please migrate to limits:query_timeout instead.", "err", err, "call", "Label") @@ -449,7 +457,7 @@ func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailReques if err != nil { return nil, errors.Wrap(err, "failed to load tenant") } - queryTimeout := q.limits.QueryTimeout(tenantID) + queryTimeout := q.limits.QueryTimeout(tailCtx, tenantID) // TODO: remove this clause once we remove the deprecated query-timeout flag. if q.cfg.QueryTimeout != 0 { // querier YAML configuration. level.Warn(util_log.Logger).Log("msg", "deprecated querier:query_timeout YAML configuration identified. Please migrate to limits:query_timeout instead.", "call", "SingleTenantQuerier/Tail") @@ -498,7 +506,7 @@ func (q *SingleTenantQuerier) Series(ctx context.Context, req *logproto.SeriesRe } // Enforce the query timeout while querying backends - queryTimeout := q.limits.QueryTimeout(userID) + queryTimeout := q.limits.QueryTimeout(ctx, userID) // TODO: remove this clause once we remove the deprecated query-timeout flag. if q.cfg.QueryTimeout != 0 { // querier YAML configuration. level.Warn(util_log.Logger).Log("msg", "deprecated querier:query_timeout YAML configuration identified. Please migrate to limits:query_timeout instead.", "call", "SingleTenantQuerier/Series") @@ -643,7 +651,7 @@ func (q *SingleTenantQuerier) validateQueryRequest(ctx context.Context, req logq } matchers := selector.Matchers() - maxStreamMatchersPerQuery := q.limits.MaxStreamsMatchersPerQuery(userID) + maxStreamMatchersPerQuery := q.limits.MaxStreamsMatchersPerQuery(ctx, userID) if len(matchers) > maxStreamMatchersPerQuery { return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, "max streams matchers per query exceeded, matchers-count > limit (%d > %d)", len(matchers), maxStreamMatchersPerQuery) @@ -653,15 +661,15 @@ func (q *SingleTenantQuerier) validateQueryRequest(ctx context.Context, req logq } type timeRangeLimits interface { - MaxQueryLookback(string) time.Duration - MaxQueryLength(string) time.Duration + MaxQueryLookback(context.Context, string) time.Duration + MaxQueryLength(context.Context, string) time.Duration } func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits timeRangeLimits, from, through time.Time) (time.Time, time.Time, error) { now := nowFunc() // Clamp the time range based on the max query lookback. var maxQueryLookback time.Duration - if maxQueryLookback = limits.MaxQueryLookback(userID); maxQueryLookback > 0 && from.Before(now.Add(-maxQueryLookback)) { + if maxQueryLookback = limits.MaxQueryLookback(ctx, userID); maxQueryLookback > 0 && from.Before(now.Add(-maxQueryLookback)) { origStartTime := from from = now.Add(-maxQueryLookback) @@ -671,7 +679,7 @@ func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits tim "updated", from) } - if maxQueryLength := limits.MaxQueryLength(userID); maxQueryLength > 0 && (through).Sub(from) > maxQueryLength { + if maxQueryLength := limits.MaxQueryLength(ctx, userID); maxQueryLength > 0 && (through).Sub(from) > maxQueryLength { return time.Time{}, time.Time{}, httpgrpc.Errorf(http.StatusBadRequest, util_validation.ErrQueryTooLong, (through).Sub(from), maxQueryLength) } if through.Before(from) { @@ -700,7 +708,7 @@ func (q *SingleTenantQuerier) checkTailRequestLimit(ctx context.Context) error { maxCnt = resp } } - l := uint32(q.limits.MaxConcurrentTailRequests(userID)) + l := uint32(q.limits.MaxConcurrentTailRequests(ctx, userID)) if maxCnt >= l { return httpgrpc.Errorf(http.StatusBadRequest, "max concurrent tail requests limit exceeded, count > limit (%d > %d)", maxCnt+1, l) @@ -726,7 +734,7 @@ func (q *SingleTenantQuerier) IndexStats(ctx context.Context, req *loghttp.Range } // Enforce the query timeout while querying backends - queryTimeout := q.limits.QueryTimeout(userID) + queryTimeout := q.limits.QueryTimeout(ctx, userID) // TODO: remove this clause once we remove the deprecated query-timeout flag. if q.cfg.QueryTimeout != 0 { // querier YAML configuration. level.Warn(util_log.Logger).Log("msg", "deprecated querier:query_timeout YAML configuration identified. Please migrate to limits:query_timeout instead.", "call", "SingleTenantQuerier/IndexStats") diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 5631b0285d..f19d12b42e 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1014,8 +1014,12 @@ type fakeTimeLimits struct { maxQueryLength time.Duration } -func (f fakeTimeLimits) MaxQueryLookback(_ string) time.Duration { return f.maxQueryLookback } -func (f fakeTimeLimits) MaxQueryLength(_ string) time.Duration { return f.maxQueryLength } +func (f fakeTimeLimits) MaxQueryLookback(_ context.Context, _ string) time.Duration { + return f.maxQueryLookback +} +func (f fakeTimeLimits) MaxQueryLength(_ context.Context, _ string) time.Duration { + return f.maxQueryLength +} func Test_validateQueryTimeRangeLimits(t *testing.T) { now := time.Now() diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index 5d6720ca55..1fcfcca1e1 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -64,7 +64,7 @@ func (h DownstreamHandler) Downstreamer(ctx context.Context) logql.Downstreamer // We may increase parallelism above the default, // ensure we don't end up bottlenecking here. if user, err := tenant.TenantID(ctx); err == nil { - if x := h.limits.MaxQueryParallelism(user); x > 0 { + if x := h.limits.MaxQueryParallelism(ctx, user); x > 0 { p = x } } diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index c10b489324..6610a2cf4d 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -39,12 +39,12 @@ type Limits interface { queryrangebase.Limits logql.Limits QuerySplitDuration(string) time.Duration - MaxQuerySeries(string) int - MaxEntriesLimitPerQuery(string) int + MaxQuerySeries(context.Context, string) int + MaxEntriesLimitPerQuery(context.Context, string) int MinShardingLookback(string) time.Duration // TSDBMaxQueryParallelism returns the limit to the number of split queries the // frontend will process in parallel for TSDB queries. - TSDBMaxQueryParallelism(string) int + TSDBMaxQueryParallelism(context.Context, string) int } type limits struct { @@ -61,16 +61,16 @@ func (l limits) QuerySplitDuration(user string) time.Duration { return *l.splitDuration } -func (l limits) TSDBMaxQueryParallelism(user string) int { +func (l limits) TSDBMaxQueryParallelism(ctx context.Context, user string) int { if l.maxQueryParallelism == nil { - return l.Limits.TSDBMaxQueryParallelism(user) + return l.Limits.TSDBMaxQueryParallelism(ctx, user) } return *l.maxQueryParallelism } -func (l limits) MaxQueryParallelism(user string) int { +func (l limits) MaxQueryParallelism(ctx context.Context, user string) int { if l.maxQueryParallelism == nil { - return l.Limits.MaxQueryParallelism(user) + return l.Limits.MaxQueryParallelism(ctx, user) } return *l.maxQueryParallelism } @@ -141,7 +141,8 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que // Clamp the time range based on the max query lookback. - if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLookback); maxQueryLookback > 0 { + lookbackCapture := func(id string) time.Duration { return l.MaxQueryLookback(ctx, id) } + if maxQueryLookback := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, lookbackCapture); maxQueryLookback > 0 { minStartTime := util.TimeToMillis(time.Now().Add(-maxQueryLookback)) if r.GetEnd() < minStartTime { @@ -168,7 +169,8 @@ func (l limitsMiddleware) Do(ctx context.Context, r queryrangebase.Request) (que } // Enforce the max query length. - if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 { + lengthCapture := func(id string) time.Duration { return l.MaxQueryLength(ctx, id) } + if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, lengthCapture); maxQueryLength > 0 { queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart())) if queryLen > maxQueryLength { return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength) @@ -393,8 +395,8 @@ func WeightedParallelism( ) int { logger := util_log.WithContext(ctx, util_log.Logger) - tsdbMaxQueryParallelism := l.TSDBMaxQueryParallelism(user) - regMaxQueryParallelism := l.MaxQueryParallelism(user) + tsdbMaxQueryParallelism := l.TSDBMaxQueryParallelism(ctx, user) + regMaxQueryParallelism := l.MaxQueryParallelism(ctx, user) if tsdbMaxQueryParallelism+regMaxQueryParallelism == 0 { level.Info(logger).Log("msg", "querying disabled for tenant") return 0 @@ -434,9 +436,9 @@ func WeightedParallelism( if start.Equal(end) { switch configs[i].IndexType { case config.TSDBType: - return l.TSDBMaxQueryParallelism(user) + return l.TSDBMaxQueryParallelism(ctx, user) } - return l.MaxQueryParallelism(user) + return l.MaxQueryParallelism(ctx, user) } diff --git a/pkg/querier/queryrange/log_result_cache.go b/pkg/querier/queryrange/log_result_cache.go index 48533ccb71..e009fda6d9 100644 --- a/pkg/querier/queryrange/log_result_cache.go +++ b/pkg/querier/queryrange/log_result_cache.go @@ -89,7 +89,8 @@ func (l *logResultCache) Do(ctx context.Context, req queryrangebase.Request) (qu return l.next.Do(ctx, req) } - maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, l.limits.MaxCacheFreshness) + cacheFreshnessCapture := func(id string) time.Duration { return l.limits.MaxCacheFreshness(ctx, id) } + maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) maxCacheTime := int64(model.Now().Add(-maxCacheFreshness)) if req.GetEnd() > maxCacheTime { return l.next.Do(ctx, req) diff --git a/pkg/querier/queryrange/queryrangebase/limits.go b/pkg/querier/queryrange/queryrangebase/limits.go index 3134485e9a..b3e1bf8061 100644 --- a/pkg/querier/queryrange/queryrangebase/limits.go +++ b/pkg/querier/queryrange/queryrangebase/limits.go @@ -1,6 +1,7 @@ package queryrangebase import ( + "context" "time" ) @@ -8,16 +9,16 @@ import ( // the query handling code. type Limits interface { // MaxQueryLookback returns the max lookback period of queries. - MaxQueryLookback(userID string) time.Duration + MaxQueryLookback(context.Context, string) time.Duration // MaxQueryLength returns the limit of the length (in time) of a query. - MaxQueryLength(string) time.Duration + MaxQueryLength(context.Context, string) time.Duration // MaxQueryParallelism returns the limit to the number of split queries the // frontend will process in parallel. - MaxQueryParallelism(tenant string) int + MaxQueryParallelism(context.Context, string) int // MaxCacheFreshness returns the period after which results are cacheable, // to prevent caching of very recent results. - MaxCacheFreshness(string) time.Duration + MaxCacheFreshness(context.Context, string) time.Duration } diff --git a/pkg/querier/queryrange/queryrangebase/limits_test.go b/pkg/querier/queryrange/queryrangebase/limits_test.go index b9f86dc748..b41ece2b50 100644 --- a/pkg/querier/queryrange/queryrangebase/limits_test.go +++ b/pkg/querier/queryrange/queryrangebase/limits_test.go @@ -1,6 +1,7 @@ package queryrangebase import ( + "context" "time" ) @@ -10,18 +11,18 @@ type mockLimits struct { maxCacheFreshness time.Duration } -func (m mockLimits) MaxQueryLookback(string) time.Duration { +func (m mockLimits) MaxQueryLookback(context.Context, string) time.Duration { return m.maxQueryLookback } -func (m mockLimits) MaxQueryLength(string) time.Duration { +func (m mockLimits) MaxQueryLength(context.Context, string) time.Duration { return m.maxQueryLength } -func (mockLimits) MaxQueryParallelism(string) int { +func (mockLimits) MaxQueryParallelism(context.Context, string) int { return 14 // Flag default. } -func (m mockLimits) MaxCacheFreshness(string) time.Duration { +func (m mockLimits) MaxCacheFreshness(context.Context, string) time.Duration { return m.maxCacheFreshness } diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index 3b61dcefd6..5fb166756a 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -231,7 +231,8 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { response Response ) - maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, s.limits.MaxCacheFreshness) + cacheFreshnessCapture := func(id string) time.Duration { return s.limits.MaxCacheFreshness(ctx, id) } + maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) maxCacheTime := int64(model.Now().Add(-maxCacheFreshness)) if r.GetStart() > maxCacheTime { return s.next.Do(ctx, r) diff --git a/pkg/querier/queryrange/queryrangebase/results_cache_test.go b/pkg/querier/queryrange/queryrangebase/results_cache_test.go index 6863889356..85a1dfb67e 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache_test.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache_test.go @@ -767,7 +767,7 @@ func TestResultsCache(t *testing.T) { nil, nil, func(_ context.Context, tenantIDs []string, r Request) int { - return mockLimits{}.MaxQueryParallelism("fake") + return mockLimits{}.MaxQueryParallelism(context.Background(), "fake") }, false, nil, @@ -813,7 +813,7 @@ func TestResultsCacheRecent(t *testing.T) { nil, nil, func(_ context.Context, tenantIDs []string, r Request) int { - return mockLimits{}.MaxQueryParallelism("fake") + return mockLimits{}.MaxQueryParallelism(context.Background(), "fake") }, false, nil, @@ -881,7 +881,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) { nil, nil, func(_ context.Context, tenantIDs []string, r Request) int { - return tc.fakeLimits.MaxQueryParallelism("fake") + return tc.fakeLimits.MaxQueryParallelism(context.Background(), "fake") }, false, nil, @@ -924,7 +924,7 @@ func Test_resultsCache_MissingData(t *testing.T) { nil, nil, func(_ context.Context, tenantIDs []string, r Request) int { - return mockLimits{}.MaxQueryParallelism("fake") + return mockLimits{}.MaxQueryParallelism(context.Background(), "fake") }, false, nil, @@ -1039,7 +1039,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { nil, tc.shouldCache, func(_ context.Context, tenantIDs []string, r Request) int { - return mockLimits{}.MaxQueryParallelism("fake") + return mockLimits{}.MaxQueryParallelism(context.Background(), "fake") }, false, nil, diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 564e8d2840..761e1b77ef 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -245,7 +245,8 @@ func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error { return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - maxEntriesLimit := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, limits.MaxEntriesLimitPerQuery) + maxEntriesCapture := func(id string) int { return limits.MaxEntriesLimitPerQuery(req.Context(), id) } + maxEntriesLimit := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, maxEntriesCapture) if int(reqLimit) > maxEntriesLimit && maxEntriesLimit != 0 { return httpgrpc.Errorf(http.StatusBadRequest, "max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", reqLimit, maxEntriesLimit) diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index fda0e09567..169876de47 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -591,34 +591,34 @@ func (f fakeLimits) QuerySplitDuration(key string) time.Duration { return f.splits[key] } -func (f fakeLimits) MaxQueryLength(string) time.Duration { +func (f fakeLimits) MaxQueryLength(context.Context, string) time.Duration { if f.maxQueryLength == 0 { return time.Hour * 7 } return f.maxQueryLength } -func (f fakeLimits) MaxQueryParallelism(string) int { +func (f fakeLimits) MaxQueryParallelism(context.Context, string) int { return f.maxQueryParallelism } -func (f fakeLimits) TSDBMaxQueryParallelism(string) int { +func (f fakeLimits) TSDBMaxQueryParallelism(context.Context, string) int { return f.tsdbMaxQueryParallelism } -func (f fakeLimits) MaxEntriesLimitPerQuery(string) int { +func (f fakeLimits) MaxEntriesLimitPerQuery(context.Context, string) int { return f.maxEntriesLimitPerQuery } -func (f fakeLimits) MaxQuerySeries(string) int { +func (f fakeLimits) MaxQuerySeries(context.Context, string) int { return f.maxSeries } -func (f fakeLimits) MaxCacheFreshness(string) time.Duration { +func (f fakeLimits) MaxCacheFreshness(context.Context, string) time.Duration { return 1 * time.Minute } -func (f fakeLimits) MaxQueryLookback(string) time.Duration { +func (f fakeLimits) MaxQueryLookback(context.Context, string) time.Duration { return f.maxQueryLookback } @@ -626,11 +626,11 @@ func (f fakeLimits) MinShardingLookback(string) time.Duration { return f.minShardingLookback } -func (f fakeLimits) QueryTimeout(string) time.Duration { +func (f fakeLimits) QueryTimeout(context.Context, string) time.Duration { return f.queryTimeout } -func (f fakeLimits) BlockedQueries(string) []*validation.BlockedQuery { +func (f fakeLimits) BlockedQueries(context.Context, string) []*validation.BlockedQuery { return []*validation.BlockedQuery{} } diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 07eb86c989..aaae5ac842 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -220,7 +220,8 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que }) } - maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, h.limits.MaxQuerySeries) + maxSeriesCapture := func(id string) int { return h.limits.MaxQuerySeries(ctx, id) } + maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) maxParallelism := MinWeightedParallelism(ctx, tenantIDs, h.configs, h.limits, model.Time(r.GetStart()), model.Time(r.GetEnd())) resps, err := h.Process(ctx, maxParallelism, limit, input, maxSeries) if err != nil { diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index b211710e34..32e79a4d28 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -25,6 +25,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/openstack" "github.com/grafana/loki/pkg/storage/chunk/client/testutils" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores" "github.com/grafana/loki/pkg/storage/stores/indexshipper" "github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads" "github.com/grafana/loki/pkg/storage/stores/indexshipper/gatewayclient" @@ -53,9 +54,8 @@ func ResetBoltDBIndexClientWithShipper() { // StoreLimits helps get Limits specific to Queries for Stores type StoreLimits interface { downloads.Limits - CardinalityLimit(userID string) int - MaxChunksPerQueryFromStore(userID string) int - MaxQueryLength(userID string) time.Duration + stores.StoreLimits + CardinalityLimit(string) int } // NamedStores helps configure additional object stores from a given storage provider diff --git a/pkg/storage/stores/composite_store_entry.go b/pkg/storage/stores/composite_store_entry.go index 46993b7eb6..f1d62e518b 100644 --- a/pkg/storage/stores/composite_store_entry.go +++ b/pkg/storage/stores/composite_store_entry.go @@ -22,8 +22,8 @@ import ( var _ Store = &compositeStore{} type StoreLimits interface { - MaxChunksPerQueryFromStore(userID string) int - MaxQueryLength(userID string) time.Duration + MaxChunksPerQueryFromStore(string) int + MaxQueryLength(context.Context, string) time.Duration } type ChunkWriter interface { @@ -135,7 +135,7 @@ func (c *storeEntry) validateQueryTimeRange(ctx context.Context, userID string, return false, errors.QueryError(fmt.Sprintf("invalid query, through < from (%s < %s)", through, from)) } - maxQueryLength := c.limits.MaxQueryLength(userID) + maxQueryLength := c.limits.MaxQueryLength(ctx, userID) if maxQueryLength > 0 && (*through).Sub(*from) > maxQueryLength { return false, errors.QueryError(fmt.Sprintf(validation.ErrQueryTooLong, (*through).Sub(*from), maxQueryLength)) } diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go index a1bd4e5dda..d6f40c2c3c 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -168,7 +168,13 @@ type Compactor struct { subservicesWatcher *services.FailureWatcher } -func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits *validation.Overrides, r prometheus.Registerer) (*Compactor, error) { +type Limits interface { + deletion.Limits + retention.Limits + DefaultLimits() *validation.Limits +} + +func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, r prometheus.Registerer) (*Compactor, error) { retentionEnabledStats.Set("false") if cfg.RetentionEnabled { retentionEnabledStats.Set("true") @@ -234,7 +240,7 @@ func NewCompactor(cfg Config, objectClient client.ObjectClient, schemaConfig con return compactor, nil } -func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits *validation.Overrides, r prometheus.Registerer) error { +func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.SchemaConfig, limits Limits, r prometheus.Registerer) error { err := chunk_util.EnsureDirectory(c.cfg.WorkingDirectory) if err != nil { return err @@ -269,7 +275,7 @@ func (c *Compactor) init(objectClient client.ObjectClient, schemaConfig config.S return nil } -func (c *Compactor) initDeletes(r prometheus.Registerer, limits *validation.Overrides) error { +func (c *Compactor) initDeletes(r prometheus.Registerer, limits Limits) error { deletionWorkDir := filepath.Join(c.cfg.WorkingDirectory, "deletion") store, err := deletion.NewDeleteStore(deletionWorkDir, c.indexStorageClient) diff --git a/pkg/validation/exporter.go b/pkg/validation/exporter.go index 78eb245dff..bbc26d1b54 100644 --- a/pkg/validation/exporter.go +++ b/pkg/validation/exporter.go @@ -10,15 +10,19 @@ import ( "github.com/grafana/loki/pkg/util/flagext" ) +type ExportedLimits interface { + AllByUserID() map[string]*Limits + DefaultLimits() *Limits +} + type OverridesExporter struct { - overrides *Overrides - // tenantLimits TenantLimits + overrides ExportedLimits tenantDesc *prometheus.Desc defaultsDesc *prometheus.Desc } // TODO(jordanrushing): break out overrides from defaults? -func NewOverridesExporter(overrides *Overrides) *OverridesExporter { +func NewOverridesExporter(overrides ExportedLimits) *OverridesExporter { return &OverridesExporter{ overrides: overrides, tenantDesc: prometheus.NewDesc( diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index fe11ecc250..a9ad440e08 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -1,6 +1,7 @@ package validation import ( + "context" "encoding/json" "flag" "fmt" @@ -414,7 +415,7 @@ func (o *Overrides) MaxChunksPerQuery(userID string) int { } // MaxQueryLength returns the limit of the length (in time) of a query. -func (o *Overrides) MaxQueryLength(userID string) time.Duration { +func (o *Overrides) MaxQueryLength(ctx context.Context, userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MaxQueryLength) } @@ -423,7 +424,7 @@ func (o *Overrides) MaxQueryLength(userID string) time.Duration { func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int { return 0 } // MaxQueryLength returns the limit of the series of metric queries. -func (o *Overrides) MaxQuerySeries(userID string) int { +func (o *Overrides) MaxQuerySeries(ctx context.Context, userID string) int { return o.getOverridesForUser(userID).MaxQuerySeries } @@ -439,13 +440,13 @@ func (o *Overrides) QueryReadyIndexNumDays(userID string) int { // TSDBMaxQueryParallelism returns the limit to the number of sub-queries the // frontend will process in parallel for TSDB schemas. -func (o *Overrides) TSDBMaxQueryParallelism(userID string) int { +func (o *Overrides) TSDBMaxQueryParallelism(ctx context.Context, userID string) int { return o.getOverridesForUser(userID).TSDBMaxQueryParallelism } // MaxQueryParallelism returns the limit to the number of sub-queries the // frontend will process in parallel. -func (o *Overrides) MaxQueryParallelism(userID string) int { +func (o *Overrides) MaxQueryParallelism(ctx context.Context, userID string) int { return o.getOverridesForUser(userID).MaxQueryParallelism } @@ -460,7 +461,7 @@ func (o *Overrides) CardinalityLimit(userID string) int { } // MaxStreamsMatchersPerQuery returns the limit to number of streams matchers per query. -func (o *Overrides) MaxStreamsMatchersPerQuery(userID string) int { +func (o *Overrides) MaxStreamsMatchersPerQuery(ctx context.Context, userID string) int { return o.getOverridesForUser(userID).MaxStreamsMatchersPerQuery } @@ -475,7 +476,7 @@ func (o *Overrides) QuerySplitDuration(userID string) time.Duration { } // MaxConcurrentTailRequests returns the limit to number of concurrent tail requests. -func (o *Overrides) MaxConcurrentTailRequests(userID string) int { +func (o *Overrides) MaxConcurrentTailRequests(ctx context.Context, userID string) int { return o.getOverridesForUser(userID).MaxConcurrentTailRequests } @@ -490,20 +491,20 @@ func (o *Overrides) MaxLineSizeTruncate(userID string) bool { } // MaxEntriesLimitPerQuery returns the limit to number of entries the querier should return per query. -func (o *Overrides) MaxEntriesLimitPerQuery(userID string) int { +func (o *Overrides) MaxEntriesLimitPerQuery(ctx context.Context, userID string) int { return o.getOverridesForUser(userID).MaxEntriesLimitPerQuery } -func (o *Overrides) QueryTimeout(userID string) time.Duration { +func (o *Overrides) QueryTimeout(ctx context.Context, userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).QueryTimeout) } -func (o *Overrides) MaxCacheFreshness(userID string) time.Duration { +func (o *Overrides) MaxCacheFreshness(ctx context.Context, userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MaxCacheFreshness) } // MaxQueryLookback returns the max lookback period of queries. -func (o *Overrides) MaxQueryLookback(userID string) time.Duration { +func (o *Overrides) MaxQueryLookback(ctx context.Context, userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MaxQueryLookback) } @@ -645,7 +646,7 @@ func (o *Overrides) ShardStreams(userID string) *shardstreams.Config { return o.getOverridesForUser(userID).ShardStreams } -func (o *Overrides) BlockedQueries(userID string) []*validation.BlockedQuery { +func (o *Overrides) BlockedQueries(ctx context.Context, userID string) []*validation.BlockedQuery { return o.getOverridesForUser(userID).BlockedQueries }