Loki: add a run-once flag to the compactor (#6281)

* add a run once flag to the compactor to allow it to cleanup index files with a single manual invocation. Useful when using migrate tool.

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* fix lint

Signed-off-by: Edward Welch <edward.welch@grafana.com>
pull/6287/head
Ed Welch 3 years ago committed by GitHub
parent b5f0557e79
commit f52fc9d907
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      pkg/storage/stores/shipper/compactor/compactor.go

@ -72,6 +72,7 @@ type Config struct {
DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"`
MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
CompactorRing util.RingConfig `yaml:"compactor_ring,omitempty"`
RunOnce bool `yaml:"-"`
}
// RegisterFlags registers flags.
@ -88,6 +89,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxCompactionParallelism, "boltdb.shipper.compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
f.StringVar(&cfg.DeletionMode, "boltdb.shipper.compactor.deletion-mode", "whole-stream-deletion", fmt.Sprintf("(Experimental) Deletion mode. Can be one of %v", strings.Join(deletion.AllModes(), "|")))
cfg.CompactorRing.RegisterFlagsWithPrefix("boltdb.shipper.compactor.", "collectors/", f)
f.BoolVar(&cfg.RunOnce, "boltdb.shipper.compactor.run-once", false, "Run the compactor one time to cleanup and compact index files only (no retention applied)")
}
// Validate verifies the config does not contain inappropriate values
@ -311,6 +313,21 @@ func (c *Compactor) starting(ctx context.Context) (err error) {
}
func (c *Compactor) loop(ctx context.Context) error {
if c.cfg.RunOnce {
level.Info(util_log.Logger).Log("msg", "running single compaction")
err := c.RunCompaction(ctx, false)
if err != nil {
level.Error(util_log.Logger).Log("msg", "compaction encountered an error", "err", err)
}
level.Info(util_log.Logger).Log("msg", "single compaction finished")
level.Info(util_log.Logger).Log("msg", "interrupt or terminate the process to finish")
// Wait for Loki to shutdown.
<-ctx.Done()
level.Info(util_log.Logger).Log("msg", "compactor exiting")
return nil
}
if c.cfg.RetentionEnabled {
if c.deleteRequestsStore != nil {
defer c.deleteRequestsStore.Stop()

Loading…
Cancel
Save