[bloom-compactor] Add configs to enable compactor per tenant (#11235)

**What this PR does / why we need it**:
We want to control bloom compaction per tenant basis. Adding configs to
enable/disable bloom compactor.

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a4b0)
pull/11248/head
Poyzan 2 years ago committed by GitHub
parent 3f0f8fa5f5
commit d22c1fd39d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docs/sources/configure/_index.md
  2. 6
      pkg/bloomcompactor/bloomcompactor.go
  3. 1
      pkg/bloomcompactor/config.go
  4. 4
      pkg/bloomcompactor/sharding_test.go
  5. 6
      pkg/validation/limits.go

@ -2959,6 +2959,10 @@ shard_streams:
# CLI flag: -bloom-compactor.min-table-age
[bloom_compactor_min_table_age: <duration> | default = 1h]
# Whether to compact chunks into bloom filters.
# CLI flag: -bloom-compactor.enable-compaction
[bloom_compactor_enable_compaction: <boolean> | default = false]
# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]

@ -293,6 +293,12 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor
return fmt.Errorf("interrupting compaction of tenants: %w", err)
}
// Skip tenant if compaction is not enabled
if !c.limits.BloomCompactorEnabled(tenant) {
level.Info(tenantLogger).Log("msg", "compaction disabled for tenant. Skipping.")
continue
}
// Skip this table if it is too new/old for the tenant limits.
now := model.Now()
tableMinAge := c.limits.BloomCompactorMinTableAge(tenant)

@ -43,4 +43,5 @@ type Limits interface {
BloomCompactorShardSize(tenantID string) int
BloomCompactorMaxTableAge(tenantID string) time.Duration
BloomCompactorMinTableAge(tenantID string) time.Duration
BloomCompactorEnabled(tenantID string) bool
}

@ -143,3 +143,7 @@ func (m mockLimits) BloomCompactorMaxTableAge(_ string) time.Duration {
func (m mockLimits) BloomCompactorMinTableAge(_ string) time.Duration {
return 0
}
func (m mockLimits) BloomCompactorEnabled(_ string) bool {
return false
}

@ -185,6 +185,7 @@ type Limits struct {
BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."`
MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."`
@ -301,6 +302,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.")
f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.")
l.ShardStreams = &shardstreams.Config{}
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
@ -796,6 +798,10 @@ func (o *Overrides) BloomCompactorMinTableAge(userID string) time.Duration {
return o.getOverridesForUser(userID).BloomCompactorMinTableAge
}
func (o *Overrides) BloomCompactorEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomCompactorEnabled
}
func (o *Overrides) AllowStructuredMetadata(userID string) bool {
return o.getOverridesForUser(userID).AllowStructuredMetadata
}

Loading…
Cancel
Save