[bloom-compactor] remove BloomCompactorMinTableAge check (#11546)

**What this PR does / why we need it**:
`BloomCompactorMinTableAge` was added with the idea that we will skip
most recent index files from ingesters that are not compacted into TSDB
indexes yet.

The default value is an hour. This blocks bloom-compacter processing
TSBD of the day, because end timestamp is always in the last 15 mins. We
can either reduce it something lower than the frequency of TSDB indexes
being built (< 15mins).
Here I choose to remove it altogether, assuming uncompacted indexes from
ingesters will not be processed as a table as there won't be a schema
for that table with the [check
here](https://github.com/grafana/loki/blob/main/pkg/bloomcompactor/bloomcompactor.go#L268-L272).
pull/11579/head^2
Poyzan 1 year ago committed by GitHub
parent ce574485cf
commit 599eed7c52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      docs/sources/configure/_index.md
  2. 9
      pkg/bloomcompactor/bloomcompactor.go
  3. 1
      pkg/bloomcompactor/config.go
  4. 6
      pkg/validation/limits.go

@ -3084,13 +3084,6 @@ shard_streams:
# CLI flag: -bloom-compactor.max-table-age
[bloom_compactor_max_table_age: <duration> | default = 168h]
# The minimum age of a table before it is compacted. Do not compact tables newer
# than the the configured time. Default to 1 hour. 0s means no limit. This is
# useful to avoid compacting tables that will be updated with out-of-order
# writes.
# CLI flag: -bloom-compactor.min-table-age
[bloom_compactor_min_table_age: <duration> | default = 1h]
# Whether to compact chunks into bloom filters.
# CLI flag: -bloom-compactor.enable-compaction
[bloom_compactor_enable_compaction: <boolean> | default = false]

@ -246,13 +246,11 @@ func (c *Compactor) runCompaction(ctx context.Context) error {
_ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error {
tableName := tables[i]
logger := log.With(c.logger, "table", tableName)
level.Info(logger).Log("msg", "compacting table")
err := c.compactTable(ctx, logger, tableName, tablesIntervals[tableName])
if err != nil {
errs.Add(err)
return nil
}
level.Info(logger).Log("msg", "finished compacting table")
return nil
})
@ -307,12 +305,7 @@ func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc stor
// Skip this table if it is too new/old for the tenant limits.
now := model.Now()
tableMinAge := c.limits.BloomCompactorMinTableAge(tenant)
tableMaxAge := c.limits.BloomCompactorMaxTableAge(tenant)
if tableMinAge > 0 && tableInterval.End.After(now.Add(-tableMinAge)) {
level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too new ", "table-min-age", tableMinAge, "table-end", tableInterval.End, "now", now)
continue
}
if tableMaxAge > 0 && tableInterval.Start.Before(now.Add(-tableMaxAge)) {
level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too old", "table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now)
continue
@ -507,6 +500,7 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
}
}()
level.Info(logger).Log("msg", "started compacting table", "table", job.tableName, "tenant", job.tenantID)
if len(blocksMatchingJob) == 0 && len(metasMatchingJob) > 0 {
// There is no change to any blocks, no compaction needed
level.Info(logger).Log("msg", "No changes to tsdb, no compaction needed")
@ -597,5 +591,6 @@ func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job,
level.Error(logger).Log("msg", "failed uploading meta.json to storage", "err", err)
return err
}
level.Info(logger).Log("msg", "finished compacting table", "table", job.tableName, "tenant", job.tenantID)
return nil
}

@ -42,7 +42,6 @@ type Limits interface {
downloads.Limits
BloomCompactorShardSize(tenantID string) int
BloomCompactorMaxTableAge(tenantID string) time.Duration
BloomCompactorMinTableAge(tenantID string) time.Duration
BloomCompactorEnabled(tenantID string) bool
BloomNGramLength(tenantID string) int
BloomNGramSkip(tenantID string) int

@ -186,7 +186,6 @@ type Limits struct {
BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"`
BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
@ -312,7 +311,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.")
f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.")
f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.")
f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
@ -838,10 +836,6 @@ func (o *Overrides) BloomCompactorMaxTableAge(userID string) time.Duration {
return o.getOverridesForUser(userID).BloomCompactorMaxTableAge
}
func (o *Overrides) BloomCompactorMinTableAge(userID string) time.Duration {
return o.getOverridesForUser(userID).BloomCompactorMinTableAge
}
func (o *Overrides) BloomCompactorEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomCompactorEnabled
}

Loading…
Cancel
Save