diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index e65d025c64..1523f2454e 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2959,6 +2959,10 @@ shard_streams: # CLI flag: -bloom-compactor.min-table-age [bloom_compactor_min_table_age: | default = 1h] +# Whether to compact chunks into bloom filters. +# CLI flag: -bloom-compactor.enable-compaction +[bloom_compactor_enable_compaction: | default = false] + # Allow user to send structured metadata in push payload. # CLI flag: -validation.allow-structured-metadata [allow_structured_metadata: | default = false] diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index c41d4bdfd7..71dbb08380 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -293,6 +293,12 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor return fmt.Errorf("interrupting compaction of tenants: %w", err) } + // Skip tenant if compaction is not enabled + if !c.limits.BloomCompactorEnabled(tenant) { + level.Info(tenantLogger).Log("msg", "compaction disabled for tenant. Skipping.") + continue + } + // Skip this table if it is too new/old for the tenant limits. now := model.Now() tableMinAge := c.limits.BloomCompactorMinTableAge(tenant) diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index b87aa4e894..57721850d2 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -43,4 +43,5 @@ type Limits interface { BloomCompactorShardSize(tenantID string) int BloomCompactorMaxTableAge(tenantID string) time.Duration BloomCompactorMinTableAge(tenantID string) time.Duration + BloomCompactorEnabled(tenantID string) bool } diff --git a/pkg/bloomcompactor/sharding_test.go b/pkg/bloomcompactor/sharding_test.go index d99f883dd3..1bd7b19864 100644 --- a/pkg/bloomcompactor/sharding_test.go +++ b/pkg/bloomcompactor/sharding_test.go @@ -143,3 +143,7 @@ func (m mockLimits) BloomCompactorMaxTableAge(_ string) time.Duration { func (m mockLimits) BloomCompactorMinTableAge(_ string) time.Duration { return 0 } + +func (m mockLimits) BloomCompactorEnabled(_ string) bool { + return false +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 823b23dd93..0a482b2c04 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -185,6 +185,7 @@ type Limits struct { BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"` + BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"` AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` @@ -301,6 +302,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.") f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.") f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.") + f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.") l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) @@ -796,6 +798,10 @@ func (o *Overrides) BloomCompactorMinTableAge(userID string) time.Duration { return o.getOverridesForUser(userID).BloomCompactorMinTableAge } +func (o *Overrides) BloomCompactorEnabled(userID string) bool { + return o.getOverridesForUser(userID).BloomCompactorEnabled +} + func (o *Overrides) AllowStructuredMetadata(userID string) bool { return o.getOverridesForUser(userID).AllowStructuredMetadata }