feat(ruler): add per-tenant configuration to disable WAL replay (#16717)

pull/16836/head
Trevor Whitney · 9 months ago · committed by GitHub
parent d68713850b
commit eda3ba865b
Changed files (changed lines):
  docs/sources/shared/configuration.md     5
  pkg/ruler/base/compat.go                 1
  pkg/ruler/base/ruler_test.go             7
  pkg/ruler/registry.go                   24
  pkg/ruler/registry_test.go               4
  pkg/ruler/storage/instance/instance.go   4
  pkg/ruler/storage/wal/wal.go            29
  pkg/ruler/storage/wal/wal_test.go       49
  pkg/validation/limits.go                 7

docs/sources/shared/configuration.md
@@ -3785,6 +3785,11 @@ discover_generic_fields:
 # CLI flag: -ruler.tenant-shard-size
 [ruler_tenant_shard_size: <int> | default = 0]
 
+# Enable WAL replay on ruler startup. Disabling this can reduce memory usage on
+# startup at the cost of not recovering in-memory WAL metrics on restart.
+# CLI flag: -ruler.enable-wal-replay
+[ruler_enable_wal_replay: <boolean> | default = true]
+
 # Disable recording rules remote-write.
 [ruler_remote_write_disabled: <boolean>]
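The new limit can also be applied per tenant through runtime overrides. A minimal sketch of what that might look like in a runtime configuration (overrides) file; the tenant ID "tenant-a" is illustrative and not part of this change:

overrides:
  tenant-a:
    # Skip WAL replay for this tenant's ruler WAL on startup.
    ruler_enable_wal_replay: false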

pkg/ruler/base/compat.go
@@ -135,6 +135,7 @@ type RulesLimits interface {
     RulerMaxRuleGroupsPerTenant(userID string) int
     RulerMaxRulesPerRuleGroup(userID string) int
     RulerAlertManagerConfig(userID string) *config.AlertManagerConfig
+    RulerEnableWALReplay(userID string) bool
 }
 
 func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter) rules.QueryFunc {

pkg/ruler/base/ruler_test.go
@@ -89,6 +89,7 @@ type ruleLimits struct {
     maxRulesPerRuleGroup int
     maxRuleGroups        int
     alertManagerConfig   map[string]*config.AlertManagerConfig
+    enableWALReplay      bool
 }
 
 func (r ruleLimits) RulerTenantShardSize(_ string) int {
@@ -107,6 +108,10 @@ func (r ruleLimits) RulerAlertManagerConfig(tenantID string) *config.AlertManage
     return r.alertManagerConfig[tenantID]
 }
 
+func (r ruleLimits) RulerEnableWALReplay(_ string) bool {
+    return r.enableWALReplay
+}
+
 func testQueryableFunc(q storage.Querier) storage.QueryableFunc {
     if q != nil {
         return func(_, _ int64) (storage.Querier, error) {
@@ -140,7 +145,7 @@ func testSetup(t *testing.T, q storage.Querier) (*promql.Engine, storage.Queryab
     reg := prometheus.NewRegistry()
     queryable := testQueryableFunc(q)
 
-    return engine, queryable, pusher, l, ruleLimits{maxRuleGroups: 20, maxRulesPerRuleGroup: 15}, reg
+    return engine, queryable, pusher, l, ruleLimits{maxRuleGroups: 20, maxRulesPerRuleGroup: 15, enableWALReplay: true}, reg
 }
 
 func newManager(t *testing.T, cfg Config, q storage.Querier) *DefaultMultiTenantManager {

pkg/ruler/registry.go
@@ -57,7 +57,7 @@ func newWALRegistry(logger log.Logger, reg prometheus.Registerer, config Config,
         return nullRegistry{}
     }
 
-    manager := createInstanceManager(logger, reg)
+    manager := createInstanceManager(logger, reg, overrides)
 
     return &walRegistry{
         logger: logger,
@@ -75,10 +75,11 @@ func newWALRegistry(logger log.Logger, reg prometheus.Registerer, config Config,
     }
 }
 
-func createInstanceManager(logger log.Logger, reg prometheus.Registerer) *instance.BasicManager {
+func createInstanceManager(logger log.Logger, reg prometheus.Registerer, overrides RulesLimits) *instance.BasicManager {
     tenantManager := &tenantWALManager{
-        reg:    reg,
-        logger: log.With(logger, "manager", "tenant-wal"),
+        reg:       reg,
+        logger:    log.With(logger, "manager", "tenant-wal"),
+        overrides: overrides,
     }
 
     return instance.NewBasicManager(instance.BasicManagerConfig{
@@ -430,8 +431,9 @@ type readyChecker interface {
 }
 
 type tenantWALManager struct {
-    logger log.Logger
-    reg    prometheus.Registerer
+    logger    log.Logger
+    reg       prometheus.Registerer
+    overrides RulesLimits
 }
 
 func (t *tenantWALManager) newInstance(c instance.Config) (instance.ManagedInstance, error) {
@@ -439,8 +441,14 @@ func (t *tenantWALManager) newInstance(c instance.Config) (instance.ManagedInsta
         "tenant": c.Tenant,
     }, t.reg)
 
-    // create metrics here and pass down
-    return instance.New(reg, c, wal.NewMetrics(reg), t.logger)
+    // Get the per-tenant setting for WAL replay from overrides
+    enableReplay := true // Default to true for backward compatibility
+    if t.overrides != nil {
+        enableReplay = t.overrides.RulerEnableWALReplay(c.Tenant)
+    }
+
+    // create the instance with our custom walFactory
+    return instance.New(reg, c, wal.NewMetrics(reg), t.logger, enableReplay)
 }
 
 type storageRegistryMetrics struct {

pkg/ruler/registry_test.go
@@ -168,9 +168,11 @@ func newFakeLimitsBackwardCompat() fakeLimits {
         limits: map[string]*validation.Limits{
             enabledRWTenant: {
                 RulerRemoteWriteQueueCapacity: 987,
+                RulerEnableWALReplay:          true,
             },
             disabledRWTenant: {
                 RulerRemoteWriteDisabled: true,
+                RulerEnableWALReplay:     false,
             },
             additionalHeadersRWTenant: {
                 RulerRemoteWriteHeaders: validation.NewOverwriteMarshalingStringMap(map[string]string{
@@ -231,9 +233,11 @@ func newFakeLimits() fakeLimits {
                         QueueConfig: config.QueueConfig{Capacity: 987},
                     },
                 },
+                RulerEnableWALReplay: true,
             },
             disabledRWTenant: {
                 RulerRemoteWriteDisabled: true,
+                RulerEnableWALReplay:     false,
             },
             additionalHeadersRWTenant: {
                 RulerRemoteWriteConfig: map[string]config.RemoteWriteConfig{

pkg/ruler/storage/instance/instance.go
@@ -179,13 +179,13 @@ type Instance struct {
 
 // New creates a new Instance with a directory for storing the WAL. The instance
 // will not start until Run is called on the instance.
-func New(reg prometheus.Registerer, cfg Config, metrics *wal.Metrics, logger log.Logger) (*Instance, error) {
+func New(reg prometheus.Registerer, cfg Config, metrics *wal.Metrics, logger log.Logger, enableReplay bool) (*Instance, error) {
     logger = log.With(logger, "instance", cfg.Name)
 
     instWALDir := filepath.Join(cfg.Dir, cfg.Tenant)
 
     newWal := func(reg prometheus.Registerer) (walStorage, error) {
-        return wal.NewStorage(logger, metrics, reg, instWALDir)
+        return wal.NewStorage(logger, metrics, reg, instWALDir, enableReplay)
     }
 
     return newInstance(cfg, reg, logger, newWal, cfg.Tenant)

pkg/ruler/storage/wal/wal.go
@@ -69,7 +69,7 @@ type Storage struct {
 }
 
 // NewStorage makes a new Storage.
-func NewStorage(logger log.Logger, metrics *Metrics, registerer prometheus.Registerer, path string) (*Storage, error) {
+func NewStorage(logger log.Logger, metrics *Metrics, registerer prometheus.Registerer, path string, enableReplay bool) (*Storage, error) {
     w, err := wlog.NewSize(util_log.SlogFromGoKit(logger), registerer, SubDirectory(path), wlog.DefaultSegmentSize, wlog.CompressionSnappy)
     if err != nil {
         return nil, err
@@ -106,21 +106,24 @@ func NewStorage(logger log.Logger, metrics *Metrics, registerer prometheus.Regis
     }
 
     start := time.Now()
-    if err := storage.replayWAL(); err != nil {
-        metrics.TotalCorruptions.Inc()
-        level.Warn(storage.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err)
-        if err := w.Repair(err); err != nil {
-            metrics.TotalFailedRepairs.Inc()
-            metrics.ReplayDuration.Observe(time.Since(start).Seconds())
-            return nil, errors.Wrap(err, "repair corrupted WAL")
-        }
-        metrics.TotalSucceededRepairs.Inc()
-    }
-    metrics.ReplayDuration.Observe(time.Since(start).Seconds())
+    if enableReplay {
+        if err := storage.replayWAL(); err != nil {
+            metrics.TotalCorruptions.Inc()
+            level.Warn(storage.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err)
+            if err := w.Repair(err); err != nil {
+                metrics.TotalFailedRepairs.Inc()
+                metrics.ReplayDuration.Observe(time.Since(start).Seconds())
+                return nil, errors.Wrap(err, "repair corrupted WAL")
+            }
+            metrics.TotalSucceededRepairs.Inc()
+        }
+        metrics.ReplayDuration.Observe(time.Since(start).Seconds())
+    } else {
+        level.Info(storage.logger).Log("msg", "WAL replay disabled")
+    }
 
     go storage.recordSize()
 
     return storage, nil
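For illustration, a minimal sketch of opening the ruler WAL storage with replay skipped via the new parameter; the import path and the on-disk directory are assumptions and not part of this diff:

package main

import (
    "github.com/go-kit/log"
    "github.com/prometheus/client_golang/prometheus"

    "github.com/grafana/loki/v3/pkg/ruler/storage/wal"
)

func main() {
    reg := prometheus.NewRegistry()
    // enableReplay=false: existing WAL segments are not read back into memory on startup.
    s, err := wal.NewStorage(log.NewNopLogger(), wal.NewMetrics(reg), reg, "/tmp/ruler-wal", false)
    if err != nil {
        panic(err)
    }
    defer s.Close()
}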

pkg/ruler/storage/wal/wal_test.go
@@ -24,7 +24,7 @@ import (
 
 func newTestStorage(walDir string) (*Storage, error) {
     metrics := NewMetrics(prometheus.DefaultRegisterer)
-    return NewStorage(log.NewNopLogger(), metrics, nil, walDir)
+    return NewStorage(log.NewNopLogger(), metrics, nil, walDir, true)
 }
 
 func TestStorage_InvalidSeries(t *testing.T) {
@@ -323,6 +323,53 @@ func TestStorage_TruncateAfterClose(t *testing.T) {
 
     require.Error(t, ErrWALClosed, s.Truncate(0))
 }
 
+func TestStorage_DisableReplay(t *testing.T) {
+    walDir := t.TempDir()
+
+    // Create a WAL and write some data to it
+    metrics := NewMetrics(prometheus.DefaultRegisterer)
+    s, err := NewStorage(log.NewNopLogger(), metrics, nil, walDir, true)
+    require.NoError(t, err)
+
+    app := s.Appender(context.Background())
+
+    // Write some samples
+    payload := buildSeries([]string{"foo", "bar", "baz"})
+    for _, metric := range payload {
+        metric.Write(t, app)
+    }
+
+    require.NoError(t, app.Commit())
+    require.NoError(t, s.Close())
+
+    // Create a new WAL with replay disabled
+    s, err = NewStorage(log.NewNopLogger(), metrics, nil, walDir, false)
+    require.NoError(t, err)
+
+    // Verify that no series were loaded (replay didn't happen)
+    count := 0
+    for range s.series.iterator().Channel() {
+        count++
+    }
+    require.Equal(t, 0, count, "no series should have been loaded with replay disabled")
+    require.NoError(t, s.Close())
+
+    // Create a new WAL with replay enabled
+    s, err = NewStorage(log.NewNopLogger(), metrics, nil, walDir, true)
+    require.NoError(t, err)
+    defer func() {
+        require.NoError(t, s.Close())
+    }()
+
+    // Verify that series were loaded (replay happened)
+    count = 0
+    for range s.series.iterator().Channel() {
+        count++
+    }
+    require.Equal(t, len(payload), count, "series should have been loaded with replay enabled")
+}
+
 type sample struct {
     ts  int64
     val float64

pkg/validation/limits.go
@@ -145,6 +145,7 @@ type Limits struct {
     RulerMaxRuleGroupsPerTenant int                              `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"`
     RulerAlertManagerConfig     *ruler_config.AlertManagerConfig `yaml:"ruler_alertmanager_config" json:"ruler_alertmanager_config" doc:"hidden"`
     RulerTenantShardSize        int                              `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"`
+    RulerEnableWALReplay        bool                             `yaml:"ruler_enable_wal_replay" json:"ruler_enable_wal_replay" doc:"description=Enable WAL replay on ruler startup. Disabling this can reduce memory usage on startup at the cost of not recovering in-memory WAL metrics on restart."`
 
     // TODO(dannyk): add HTTP client overrides (basic auth / tls config, etc)
     // Ruler remote-write limits.
@@ -379,6 +380,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
     f.IntVar(&l.RulerMaxRulesPerRuleGroup, "ruler.max-rules-per-rule-group", 0, "Maximum number of rules per rule group per-tenant. 0 to disable.")
     f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.")
     f.IntVar(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when shuffle-sharding is enabled in the ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
+    f.BoolVar(&l.RulerEnableWALReplay, "ruler.enable-wal-replay", true, "Enable WAL replay on ruler startup. Disabling this can reduce memory usage on startup at the cost of not recovering in-memory WAL metrics on restart.")
 
     f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "Feature renamed to 'runtime configuration', flag deprecated in favor of -runtime-config.file (runtime_config.file in YAML).")
     _ = l.RetentionPeriod.Set("0s")
@@ -847,6 +849,11 @@ func (o *Overrides) RulerTenantShardSize(userID string) int {
     return o.getOverridesForUser(userID).RulerTenantShardSize
 }
 
+// RulerEnableWALReplay returns whether WAL replay is enabled for a given user.
+func (o *Overrides) RulerEnableWALReplay(userID string) bool {
+    return o.getOverridesForUser(userID).RulerEnableWALReplay
+}
+
 func (o *Overrides) IngestionPartitionsTenantShardSize(userID string) int {
     return o.getOverridesForUser(userID).IngestionPartitionsTenantShardSize
 }
