diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index 1c2689d457..9ef4176c59 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -2184,6 +2184,23 @@ compacts index shards to more performant forms. # The hash ring configuration used by compactors to elect a single instance for running compactions # The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring [compactor_ring: ] + +# Number of tables that compactor will try to compact. Newer tables +# are chosen when this is less than the number of tables available +# CLI flag: -boltdb.shipper.compact.tables-to-compact +[tables_to_compact: | default: 0] + +# Do not compact N latest tables. Together with +# -boltdb.shipper.compactor.run-once and +# -boltdb.shipper.compactor.tables-to-compact, this is useful when +# clearing compactor backlogs. +# CLI flag: -boltdb.shipper.compact.skip-latest-n-tables +[skip_latest_n_tables: | default: 0] + +# The hash ring configuration used by compactors to elect a single instance for running compactions +# The CLI flags prefix for this block config is: boltdb.shipper.compactor.ring +[compactor_ring: ] + ``` ## limits_config diff --git a/pkg/storage/stores/indexshipper/compactor/compactor.go b/pkg/storage/stores/indexshipper/compactor/compactor.go index 6d6ff77438..f409757a47 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -85,7 +86,9 @@ type Config struct { DeleteMaxInterval time.Duration `yaml:"delete_max_interval"` MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"` - RunOnce bool `yaml:"-"` + RunOnce bool `yaml:"_"` + TablesToCompact int `yaml:"tables_to_compact"` + SkipLatestNTables int `yaml:"skip_latest_n_tables"` // Deprecated DeletionMode string `yaml:"deletion_mode"` @@ -112,6 +115,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flagext.DeprecatedFlag(f, "boltdb.shipper.compactor.deletion-mode", "Deprecated. This has been moved to the deletion_mode per tenant configuration.", util_log.Logger) cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f) + f.IntVar(&cfg.TablesToCompact, "boltdb.shipper.compactor.tables-to-compact", 0, "The number of most recent tables to compact in a single run. Default: all") + f.IntVar(&cfg.SkipLatestNTables, "boltdb.shipper.compactor.skip-latest-n-tables", 0, "Skip compacting latest N tables") + } // Validate verifies the config does not contain inappropriate values @@ -558,6 +564,17 @@ func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) erro c.indexStorageClient.RefreshIndexListCache(ctx) tables, err := c.indexStorageClient.ListTables(ctx) + + // process most recent tables first + sortTablesByRange(tables) + + // apply passed in compaction limits + if c.cfg.SkipLatestNTables <= len(tables) { + tables = tables[c.cfg.SkipLatestNTables:] + } + if c.cfg.TablesToCompact > 0 && c.cfg.TablesToCompact < len(tables) { + tables = tables[:c.cfg.TablesToCompact] + } if err != nil { status = statusFailure return err @@ -695,6 +712,19 @@ func (c *Compactor) ServeHTTP(w http.ResponseWriter, req *http.Request) { c.ring.ServeHTTP(w, req) } +func sortTablesByRange(tables []string) { + tableRanges := make(map[string]model.Interval) + for _, table := range tables { + tableRanges[table] = retention.ExtractIntervalFromTableName(table) + } + + sort.Slice(tables, func(i, j int) bool { + // less than if start time is after produces a most recent first sort order + return tableRanges[tables[i]].Start.After(tableRanges[tables[j]].Start) + }) + +} + func schemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool) { // first round removes configs that does not have the prefix. candidates := []config.PeriodConfig{} diff --git a/pkg/storage/stores/indexshipper/compactor/compactor_test.go b/pkg/storage/stores/indexshipper/compactor/compactor_test.go index be20594af6..5b5a3031d1 100644 --- a/pkg/storage/stores/indexshipper/compactor/compactor_test.go +++ b/pkg/storage/stores/indexshipper/compactor/compactor_test.go @@ -174,3 +174,14 @@ func Test_schemaPeriodForTable(t *testing.T) { }) } } + +func Test_tableSort(t *testing.T) { + intervals := []string{ + "index_19191", + "index_19195", + "index_19192", + } + + sortTablesByRange(intervals) + require.Equal(t, []string{"index_19195", "index_19192", "index_19191"}, intervals) +}