diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 7935d7d94a..985d608b90 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2389,6 +2389,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -querier.tsdb-max-query-parallelism [tsdb_max_query_parallelism: | default = 512] +# Maximum number of bytes assigned to a single sharded query. Also expressible +# in human readable forms (1GB, etc). +# CLI flag: -querier.tsdb-max-bytes-per-shard +[tsdb_max_bytes_per_shard: | default = 600MB] + # Cardinality limit for index queries. # CLI flag: -store.cardinality-limit [cardinality_limit: | default = 100000] diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index f6bdbadaeb..d4ce55581a 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -59,6 +59,9 @@ type Limits interface { // TSDBMaxQueryParallelism returns the limit to the number of split queries the // frontend will process in parallel for TSDB queries. TSDBMaxQueryParallelism(context.Context, string) int + // TSDBMaxBytesPerShard returns the limit to the number of bytes a single shard + TSDBMaxBytesPerShard(string) int + RequiredLabels(context.Context, string) []string RequiredNumberLabels(context.Context, string) int MaxQueryBytesRead(context.Context, string) int diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 4eb4b7656f..0d3294aa59 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -35,6 +35,7 @@ import ( util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/marshal" "github.com/grafana/loki/pkg/util/validation" + valid "github.com/grafana/loki/pkg/validation" ) var ( @@ -1361,6 +1362,10 @@ func (f fakeLimits) VolumeEnabled(_ string) bool { return f.volumeEnabled } +func (f fakeLimits) TSDBMaxBytesPerShard(_ string) int { + return valid.DefaultTSDBMaxBytesPerShard +} + func counter() (*int, http.Handler) { count := 0 var lock sync.Mutex diff --git a/pkg/querier/queryrange/shard_resolver.go b/pkg/querier/queryrange/shard_resolver.go index bd2911e23e..496c164777 100644 --- a/pkg/querier/queryrange/shard_resolver.go +++ b/pkg/querier/queryrange/shard_resolver.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" "github.com/prometheus/common/model" @@ -21,6 +22,8 @@ import ( "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/index/stats" "github.com/grafana/loki/pkg/util/spanlogger" + "github.com/grafana/loki/pkg/util/validation" + valid "github.com/grafana/loki/pkg/validation" ) func shardResolverForConf( @@ -182,7 +185,13 @@ func (r *dynamicShardResolver) Shards(e syntax.Expr) (int, uint64, error) { return 0, 0, err } - factor := guessShardFactor(combined, r.maxShards) + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return 0, 0, err + } + + maxBytesPerShard := validation.SmallestPositiveIntPerTenant(tenantIDs, r.limits.TSDBMaxBytesPerShard) + factor := guessShardFactor(combined, maxBytesPerShard, r.maxShards) var bytesPerShard = combined.Bytes if factor > 0 { @@ -200,17 +209,18 @@ func (r *dynamicShardResolver) Shards(e syntax.Expr) (int, uint64, error) { return factor, bytesPerShard, nil } -const ( - // Just some observed values to get us started on better query planning. - maxBytesPerShard = 600 << 20 -) - // Since we shard by powers of two and we increase shard factor // once each shard surpasses maxBytesPerShard, if the shard factor // is at least two, the range of data per shard is (maxBytesPerShard/2, maxBytesPerShard] // For instance, for a maxBytesPerShard of 500MB and a query touching 1000MB, we split into two shards of 500MB. // If there are 1004MB, we split into four shards of 251MB. -func guessShardFactor(stats stats.Stats, maxShards int) int { +func guessShardFactor(stats stats.Stats, maxBytesPerShard, maxShards int) int { + // If maxBytesPerShard is 0, we use the default value + // to avoid division by zero + if maxBytesPerShard < 1 { + maxBytesPerShard = valid.DefaultTSDBMaxBytesPerShard + } + minShards := float64(stats.Bytes) / float64(maxBytesPerShard) // round up to nearest power of 2 diff --git a/pkg/querier/queryrange/shard_resolver_test.go b/pkg/querier/queryrange/shard_resolver_test.go index 92fea236d5..8e963b0659 100644 --- a/pkg/querier/queryrange/shard_resolver_test.go +++ b/pkg/querier/queryrange/shard_resolver_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/stores/index/stats" + "github.com/grafana/loki/pkg/validation" ) func TestGuessShardFactor(t *testing.T) { @@ -22,52 +23,52 @@ func TestGuessShardFactor(t *testing.T) { { exp: 4, stats: stats.Stats{ - Bytes: maxBytesPerShard * 4, + Bytes: validation.DefaultTSDBMaxBytesPerShard * 4, }, }, { // round up shard factor exp: 16, stats: stats.Stats{ - Bytes: maxBytesPerShard * 15, + Bytes: validation.DefaultTSDBMaxBytesPerShard * 15, }, }, { exp: 2, stats: stats.Stats{ - Bytes: maxBytesPerShard + 1, + Bytes: validation.DefaultTSDBMaxBytesPerShard + 1, }, }, { exp: 0, stats: stats.Stats{ - Bytes: maxBytesPerShard, + Bytes: validation.DefaultTSDBMaxBytesPerShard, }, }, { maxShards: 8, exp: 4, stats: stats.Stats{ - Bytes: maxBytesPerShard * 4, + Bytes: validation.DefaultTSDBMaxBytesPerShard * 4, }, }, { maxShards: 2, exp: 2, stats: stats.Stats{ - Bytes: maxBytesPerShard * 4, + Bytes: validation.DefaultTSDBMaxBytesPerShard * 4, }, }, { maxShards: 1, exp: 0, stats: stats.Stats{ - Bytes: maxBytesPerShard * 4, + Bytes: validation.DefaultTSDBMaxBytesPerShard * 4, }, }, } { t.Run(fmt.Sprintf("%+v", tc.stats), func(t *testing.T) { - require.Equal(t, tc.exp, guessShardFactor(tc.stats, tc.maxShards)) + require.Equal(t, tc.exp, guessShardFactor(tc.stats, validation.DefaultTSDBMaxBytesPerShard, tc.maxShards)) }) } } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 438f3c309e..2f34d0ae85 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -47,8 +47,9 @@ const ( bytesInMB = 1048576 - defaultPerStreamRateLimit = 3 << 20 // 3MB - defaultPerStreamBurstLimit = 5 * defaultPerStreamRateLimit + defaultPerStreamRateLimit = 3 << 20 // 3MB + DefaultTSDBMaxBytesPerShard = 600 << 20 // 600MB + defaultPerStreamBurstLimit = 5 * defaultPerStreamRateLimit DefaultPerTenantQueryTimeout = "1m" ) @@ -81,22 +82,23 @@ type Limits struct { PerStreamRateLimitBurst flagext.ByteSize `yaml:"per_stream_rate_limit_burst" json:"per_stream_rate_limit_burst"` // Querier enforced limits. - MaxChunksPerQuery int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` - MaxQuerySeries int `yaml:"max_query_series" json:"max_query_series"` - MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` - MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` - MaxQueryRange model.Duration `yaml:"max_query_range" json:"max_query_range"` - MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` - TSDBMaxQueryParallelism int `yaml:"tsdb_max_query_parallelism" json:"tsdb_max_query_parallelism"` - CardinalityLimit int `yaml:"cardinality_limit" json:"cardinality_limit"` - MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query" json:"max_streams_matchers_per_query"` - MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"` - MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` - MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` - MaxStatsCacheFreshness model.Duration `yaml:"max_stats_cache_freshness" json:"max_stats_cache_freshness"` - MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` - QueryReadyIndexNumDays int `yaml:"query_ready_index_num_days" json:"query_ready_index_num_days"` - QueryTimeout model.Duration `yaml:"query_timeout" json:"query_timeout"` + MaxChunksPerQuery int `yaml:"max_chunks_per_query" json:"max_chunks_per_query"` + MaxQuerySeries int `yaml:"max_query_series" json:"max_query_series"` + MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` + MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` + MaxQueryRange model.Duration `yaml:"max_query_range" json:"max_query_range"` + MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` + TSDBMaxQueryParallelism int `yaml:"tsdb_max_query_parallelism" json:"tsdb_max_query_parallelism"` + TSDBMaxBytesPerShard flagext.ByteSize `yaml:"tsdb_max_bytes_per_shard" json:"tsdb_max_bytes_per_shard"` + CardinalityLimit int `yaml:"cardinality_limit" json:"cardinality_limit"` + MaxStreamsMatchersPerQuery int `yaml:"max_streams_matchers_per_query" json:"max_streams_matchers_per_query"` + MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"` + MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` + MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` + MaxStatsCacheFreshness model.Duration `yaml:"max_stats_cache_freshness" json:"max_stats_cache_freshness"` + MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` + QueryReadyIndexNumDays int `yaml:"query_ready_index_num_days" json:"query_ready_index_num_days"` + QueryTimeout model.Duration `yaml:"query_timeout" json:"query_timeout"` // Query frontend enforced limits. The default is actually parameterized by the queryrange config. QuerySplitDuration model.Duration `yaml:"split_queries_by_interval" json:"split_queries_by_interval"` @@ -235,6 +237,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how far back in time series data and metadata can be queried, up until lookback duration ago. This limit is enforced in the query frontend, the querier and the ruler. If the requested time range is outside the allowed range, the request will not fail, but will be modified to only query data within the allowed time range. The default value of 0 does not set a limit.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 32, "Maximum number of queries that will be scheduled in parallel by the frontend.") f.IntVar(&l.TSDBMaxQueryParallelism, "querier.tsdb-max-query-parallelism", 512, "Maximum number of queries will be scheduled in parallel by the frontend for TSDB schemas.") + _ = l.TSDBMaxBytesPerShard.Set(strconv.Itoa(DefaultTSDBMaxBytesPerShard)) + f.Var(&l.TSDBMaxBytesPerShard, "querier.tsdb-max-bytes-per-shard", "Maximum number of bytes assigned to a single sharded query. Also expressible in human readable forms (1GB, etc).") f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.") f.IntVar(&l.MaxStreamsMatchersPerQuery, "querier.max-streams-matcher-per-query", 1000, "Maximum number of stream matchers per query.") f.IntVar(&l.MaxConcurrentTailRequests, "querier.max-concurrent-tail-requests", 10, "Maximum number of concurrent tail requests.") @@ -474,6 +478,11 @@ func (o *Overrides) TSDBMaxQueryParallelism(_ context.Context, userID string) in return o.getOverridesForUser(userID).TSDBMaxQueryParallelism } +// TSDBMaxBytesPerShard returns the maximum number of bytes assigned to a specific shard in a tsdb query +func (o *Overrides) TSDBMaxBytesPerShard(userID string) int { + return o.getOverridesForUser(userID).TSDBMaxBytesPerShard.Val() +} + // MaxQueryParallelism returns the limit to the number of sub-queries the // frontend will process in parallel. func (o *Overrides) MaxQueryParallelism(_ context.Context, userID string) int {