allow applying retention at a different interval than compaction with a config (#4736)

* allow applying retention at a different interval than compaction with a config

* add changelog entry
Sandeep Sukhani authored 4 years ago; committed by GitHub
parent 11071d4c31
commit 02d88462f3
4 changed files:

  1. CHANGELOG.md (1 line changed)
  2. pkg/storage/stores/shipper/compactor/compactor.go (31 lines changed)
  3. pkg/storage/stores/shipper/compactor/compactor_test.go (2 lines changed)
  4. pkg/storage/stores/shipper/compactor/metrics.go (6 lines changed)

CHANGELOG.md

@@ -9,6 +9,7 @@ Release notes for 2.4.1 can be found on the [release notes page](https://grafana
 * [4687](https://github.com/grafana/loki/pull/4687) **owen-d**: overrides checks for nil tenant limits on AllByUserID
 * [4683](https://github.com/grafana/loki/pull/4683) **owen-d**: Adds replication_factor doc to common config
 * [4681](https://github.com/grafana/loki/pull/4681) **slim-bean**: Loki: check new Read target when initializing boltdb-shipper store
+* [4736](https://github.com/grafana/loki/pull/4736) **sandeepsukhani**: allow applying retention at different interval than compaction
 # 2.4.0 (2021/11/05)

pkg/storage/stores/shipper/compactor/compactor.go

@@ -57,6 +57,7 @@ type Config struct {
     SharedStoreType string `yaml:"shared_store"`
     SharedStoreKeyPrefix string `yaml:"shared_store_key_prefix"`
     CompactionInterval time.Duration `yaml:"compaction_interval"`
+    ApplyRetentionInterval time.Duration `yaml:"apply_retention_interval"`
     RetentionEnabled bool `yaml:"retention_enabled"`
     RetentionDeleteDelay time.Duration `yaml:"retention_delete_delay"`
     RetentionDeleteWorkCount int `yaml:"retention_delete_worker_count"`
@@ -71,6 +72,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.StringVar(&cfg.SharedStoreType, "boltdb.shipper.compactor.shared-store", "", "Shared store used for storing boltdb files. Supported types: gcs, s3, azure, swift, filesystem")
     f.StringVar(&cfg.SharedStoreKeyPrefix, "boltdb.shipper.compactor.shared-store.key-prefix", "index/", "Prefix to add to Object Keys in Shared store. Path separator(if any) should always be a '/'. Prefix should never start with a separator but should always end with it.")
     f.DurationVar(&cfg.CompactionInterval, "boltdb.shipper.compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
+    f.DurationVar(&cfg.ApplyRetentionInterval, "boltdb.shipper.compactor.apply-retention-interval", 0, "Interval at which to apply/enforce retention. 0 means run at same interval as compaction. If non-zero, it should always be a multiple of compaction interval.")
     f.DurationVar(&cfg.RetentionDeleteDelay, "boltdb.shipper.compactor.retention-delete-delay", 2*time.Hour, "Delay after which chunks will be fully deleted during retention.")
     f.BoolVar(&cfg.RetentionEnabled, "boltdb.shipper.compactor.retention-enabled", false, "(Experimental) Activate custom (per-stream,per-tenant) retention.")
     f.IntVar(&cfg.RetentionDeleteWorkCount, "boltdb.shipper.compactor.retention-delete-worker-count", 150, "The total amount of worker to use to delete chunks.")
@@ -84,6 +86,10 @@ func (cfg *Config) Validate() error {
     if cfg.MaxCompactionParallelism < 1 {
         return errors.New("max compaction parallelism must be >= 1")
     }
+    if cfg.RetentionEnabled && cfg.ApplyRetentionInterval != 0 && cfg.ApplyRetentionInterval%cfg.CompactionInterval != 0 {
+        return errors.New("interval for applying retention should either be set to a 0 or a multiple of compaction interval")
+    }
     return shipper_util.ValidateSharedStoreKeyPrefix(cfg.SharedStoreKeyPrefix)
 }
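
The new check in Validate only accepts an apply-retention interval that is either 0 (keep the previous behaviour of applying retention on every compaction run) or an exact multiple of the compaction interval, since retention is only ever applied as part of a compaction run. Below is a minimal, standalone sketch of that rule with toy values; the validRetentionInterval helper is illustrative and not part of this change:

    package main

    import (
        "fmt"
        "time"
    )

    // validRetentionInterval mirrors the modulo check added to Config.Validate:
    // retention has to run on a compaction tick, so its interval must be 0
    // (every compaction) or a whole multiple of the compaction interval.
    func validRetentionInterval(compaction, retention time.Duration) bool {
        return retention == 0 || retention%compaction == 0
    }

    func main() {
        compaction := 10 * time.Minute
        for _, retention := range []time.Duration{0, 30 * time.Minute, 25 * time.Minute} {
            fmt.Printf("compaction=%v retention=%v valid=%v\n",
                compaction, retention, validRetentionInterval(compaction, retention))
        }
        // compaction=10m0s retention=0s valid=true
        // compaction=10m0s retention=30m0s valid=true
        // compaction=10m0s retention=25m0s valid=false
    }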
@@ -338,12 +344,24 @@ func (c *Compactor) runCompactions(ctx context.Context) {
             break
         }
+    lastRetentionRunAt := time.Unix(0, 0)
     runCompaction := func() {
-        err := c.RunCompaction(ctx)
+        applyRetention := false
+        if c.cfg.RetentionEnabled && time.Since(lastRetentionRunAt) >= c.cfg.ApplyRetentionInterval {
+            level.Info(util_log.Logger).Log("msg", "applying retention with compaction")
+            applyRetention = true
+        }
+        err := c.RunCompaction(ctx, applyRetention)
         if err != nil {
             level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err)
         }
+        if applyRetention {
+            lastRetentionRunAt = time.Now()
+        }
     }
     c.wg.Add(1)
     go func() {
         defer c.wg.Done()
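
Scheduling stays on the single loop driven by the compaction interval; retention simply piggybacks on a compaction run once enough time has passed since the last retention run (lastRetentionRunAt starts at the Unix epoch, so the first compaction run applies retention when retention is enabled). A condensed, runnable sketch of that pattern with toy intervals follows; the runCompaction stub and the durations are illustrative only:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        compactionInterval := 1 * time.Second     // stands in for the 10m default
        applyRetentionInterval := 3 * time.Second // stands in for a multiple of it

        // Stub for the real compaction; it only reports whether retention was applied.
        runCompaction := func(applyRetention bool) {
            fmt.Printf("compaction ran, retention applied: %v\n", applyRetention)
        }

        lastRetentionRunAt := time.Unix(0, 0) // epoch, so the first run applies retention
        ticker := time.NewTicker(compactionInterval)
        defer ticker.Stop()

        for i := 0; i < 7; i++ {
            <-ticker.C
            // Retention rides along with compaction only when the configured
            // retention interval has elapsed since the last retention run.
            applyRetention := time.Since(lastRetentionRunAt) >= applyRetentionInterval
            runCompaction(applyRetention)
            if applyRetention {
                lastRetentionRunAt = time.Now()
            }
        }
    }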
@@ -380,7 +398,7 @@ func (c *Compactor) stopping(_ error) error {
     return services.StopManagerAndAwaitStopped(context.Background(), c.subservices)
 }
 
-func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
+func (c *Compactor) CompactTable(ctx context.Context, tableName string, applyRetention bool) error {
     table, err := newTable(ctx, filepath.Join(c.cfg.WorkingDirectory, tableName), c.indexStorageClient, c.cfg.RetentionEnabled, c.tableMarker)
     if err != nil {
         level.Error(util_log.Logger).Log("msg", "failed to initialize table for compaction", "table", tableName, "err", err)
@@ -389,7 +407,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
     interval := retention.ExtractIntervalFromTableName(tableName)
     intervalMayHaveExpiredChunks := false
-    if c.cfg.RetentionEnabled {
+    if c.cfg.RetentionEnabled && applyRetention {
         intervalMayHaveExpiredChunks = c.expirationChecker.IntervalMayHaveExpiredChunks(interval)
     }
@@ -401,7 +419,7 @@ func (c *Compactor) CompactTable(ctx context.Context, tableName string) error {
     return nil
 }
 
-func (c *Compactor) RunCompaction(ctx context.Context) error {
+func (c *Compactor) RunCompaction(ctx context.Context, applyRetention bool) error {
     status := statusSuccess
     start := time.Now()
@@ -415,6 +433,9 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
         if status == statusSuccess {
             c.metrics.compactTablesOperationDurationSeconds.Set(runtime.Seconds())
             c.metrics.compactTablesOperationLastSuccess.SetToCurrentTime()
+            if applyRetention {
+                c.metrics.applyRetentionLastSuccess.SetToCurrentTime()
+            }
         }
         if c.cfg.RetentionEnabled {
@@ -453,7 +474,7 @@ func (c *Compactor) RunCompaction(ctx context.Context) error {
         }
         level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName)
-        err = c.CompactTable(ctx, tableName)
+        err = c.CompactTable(ctx, tableName, applyRetention)
         if err != nil {
             return
         }

pkg/storage/stores/shipper/compactor/compactor_test.go

@@ -95,7 +95,7 @@ func TestCompactor_RunCompaction(t *testing.T) {
     }
     compactor := setupTestCompactor(t, tempDir)
-    err = compactor.RunCompaction(context.Background())
+    err = compactor.RunCompaction(context.Background(), true)
     require.NoError(t, err)
     for name := range tables {

pkg/storage/stores/shipper/compactor/metrics.go

@@ -14,6 +14,7 @@ type metrics struct {
     compactTablesOperationTotal *prometheus.CounterVec
     compactTablesOperationDurationSeconds prometheus.Gauge
     compactTablesOperationLastSuccess prometheus.Gauge
+    applyRetentionLastSuccess prometheus.Gauge
     compactorRunning prometheus.Gauge
 }
@@ -34,6 +35,11 @@ func newMetrics(r prometheus.Registerer) *metrics {
             Name:      "compact_tables_operation_last_successful_run_timestamp_seconds",
             Help:      "Unix timestamp of the last successful compaction run",
         }),
+        applyRetentionLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{
+            Namespace: "loki_boltdb_shipper",
+            Name:      "apply_retention_last_successful_run_timestamp_seconds",
+            Help:      "Unix timestamp of the last successful retention run",
+        }),
         compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
            Namespace: "loki_boltdb_shipper",
            Name:      "compactor_running",
