From a5790f332ef7fa0d363f9cce2cb2806ccda008ca Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 31 Aug 2021 16:13:42 +0200 Subject: [PATCH] Chore: Use runtimeconfig from dskit (#4227) * Use runtimeconfig from dskit Signed-off-by: Arve Knudsen * runtimeconfig: Use cortex_ prefix for metrics Signed-off-by: Arve Knudsen --- pkg/loki/loki.go | 40 ++-- pkg/loki/modules.go | 4 +- pkg/loki/runtime_config.go | 2 +- pkg/loki/runtime_config_test.go | 7 +- .../grafana/dskit/runtimeconfig/manager.go | 210 ++++++++++++++++++ vendor/modules.txt | 1 + 6 files changed, 238 insertions(+), 26 deletions(-) create mode 100644 vendor/github.com/grafana/dskit/runtimeconfig/manager.go diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index f3cdcbb2d4..5b871cf172 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -18,11 +18,11 @@ import ( "github.com/cortexproject/cortex/pkg/util/fakeauth" "github.com/cortexproject/cortex/pkg/util/grpc/healthcheck" util_log "github.com/cortexproject/cortex/pkg/util/log" - "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/felixge/fgprof" "github.com/go-kit/kit/log/level" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/modules" + "github.com/grafana/dskit/runtimeconfig" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -52,25 +52,25 @@ type Config struct { AuthEnabled bool `yaml:"auth_enabled,omitempty"` HTTPPrefix string `yaml:"http_prefix"` - Server server.Config `yaml:"server,omitempty"` - Distributor distributor.Config `yaml:"distributor,omitempty"` - Querier querier.Config `yaml:"querier,omitempty"` - IngesterClient client.Config `yaml:"ingester_client,omitempty"` - Ingester ingester.Config `yaml:"ingester,omitempty"` - StorageConfig storage.Config `yaml:"storage_config,omitempty"` - ChunkStoreConfig storage.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` - SchemaConfig storage.SchemaConfig `yaml:"schema_config,omitempty"` - LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` - TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"` - Worker worker.Config `yaml:"frontend_worker,omitempty"` - Frontend lokifrontend.Config `yaml:"frontend,omitempty"` - Ruler ruler.Config `yaml:"ruler,omitempty"` - QueryRange queryrange.Config `yaml:"query_range,omitempty"` - RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"` - MemberlistKV memberlist.KVConfig `yaml:"memberlist"` - Tracing tracing.Config `yaml:"tracing"` - CompactorConfig compactor.Config `yaml:"compactor,omitempty"` - QueryScheduler scheduler.Config `yaml:"query_scheduler"` + Server server.Config `yaml:"server,omitempty"` + Distributor distributor.Config `yaml:"distributor,omitempty"` + Querier querier.Config `yaml:"querier,omitempty"` + IngesterClient client.Config `yaml:"ingester_client,omitempty"` + Ingester ingester.Config `yaml:"ingester,omitempty"` + StorageConfig storage.Config `yaml:"storage_config,omitempty"` + ChunkStoreConfig storage.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` + SchemaConfig storage.SchemaConfig `yaml:"schema_config,omitempty"` + LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` + TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"` + Worker worker.Config `yaml:"frontend_worker,omitempty"` + Frontend lokifrontend.Config `yaml:"frontend,omitempty"` + Ruler ruler.Config `yaml:"ruler,omitempty"` + QueryRange queryrange.Config `yaml:"query_range,omitempty"` + RuntimeConfig runtimeconfig.Config `yaml:"runtime_config,omitempty"` + MemberlistKV memberlist.KVConfig `yaml:"memberlist"` + Tracing tracing.Config `yaml:"tracing"` + CompactorConfig compactor.Config `yaml:"compactor,omitempty"` + QueryScheduler scheduler.Config `yaml:"query_scheduler"` } // RegisterFlags registers flag. diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 1318d73835..80c21d564a 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -24,8 +24,8 @@ import ( "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" util_log "github.com/cortexproject/cortex/pkg/util/log" - "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/go-kit/kit/log/level" + "github.com/grafana/dskit/runtimeconfig" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" @@ -149,7 +149,7 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) { validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig) var err error - t.runtimeConfig, err = runtimeconfig.NewRuntimeConfigManager(t.Cfg.RuntimeConfig, prometheus.DefaultRegisterer) + t.runtimeConfig, err = runtimeconfig.New(t.Cfg.RuntimeConfig, prometheus.WrapRegistererWithPrefix("cortex_", prometheus.DefaultRegisterer), util_log.Logger) return t.runtimeConfig, err } diff --git a/pkg/loki/runtime_config.go b/pkg/loki/runtime_config.go index 43d7b3715b..f49771098d 100644 --- a/pkg/loki/runtime_config.go +++ b/pkg/loki/runtime_config.go @@ -5,7 +5,7 @@ import ( "io" "github.com/cortexproject/cortex/pkg/ring/kv" - "github.com/cortexproject/cortex/pkg/util/runtimeconfig" + "github.com/grafana/dskit/runtimeconfig" "gopkg.in/yaml.v2" "github.com/grafana/loki/pkg/runtime" diff --git a/pkg/loki/runtime_config_test.go b/pkg/loki/runtime_config_test.go index e70ce37ca1..b38afc16fc 100644 --- a/pkg/loki/runtime_config_test.go +++ b/pkg/loki/runtime_config_test.go @@ -9,7 +9,8 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/util/runtimeconfig" + "github.com/go-kit/kit/log" + "github.com/grafana/dskit/runtimeconfig" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -91,7 +92,7 @@ func newTestOverrides(t *testing.T, yaml string) *validation.Overrides { loader := func(_ io.Reader) (interface{}, error) { return loadRuntimeConfig(strings.NewReader(yaml)) } - cfg := runtimeconfig.ManagerConfig{ + cfg := runtimeconfig.Config{ ReloadPeriod: 1 * time.Second, Loader: loader, LoadPath: path, @@ -102,7 +103,7 @@ func newTestOverrides(t *testing.T, yaml string) *validation.Overrides { require.NoError(t, flagset.Parse(nil)) validation.SetDefaultLimitsForYAMLUnmarshalling(defaults) - runtimeConfig, err := runtimeconfig.NewRuntimeConfigManager(cfg, prometheus.DefaultRegisterer) + runtimeConfig, err := runtimeconfig.New(cfg, prometheus.WrapRegistererWithPrefix("cortex_", prometheus.DefaultRegisterer), log.NewNopLogger()) require.NoError(t, err) require.NoError(t, runtimeConfig.StartAsync(context.Background())) diff --git a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go new file mode 100644 index 0000000000..f650663694 --- /dev/null +++ b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go @@ -0,0 +1,210 @@ +package runtimeconfig + +import ( + "bytes" + "context" + "crypto/sha256" + "flag" + "fmt" + "io" + "io/ioutil" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/dskit/services" +) + +// Loader loads the configuration from file. +type Loader func(r io.Reader) (interface{}, error) + +// Config holds the config for an Manager instance. +// It holds config related to loading per-tenant config. +type Config struct { + ReloadPeriod time.Duration `yaml:"period"` + // LoadPath contains the path to the runtime config file, requires an + // non-empty value + LoadPath string `yaml:"file"` + Loader Loader `yaml:"-"` +} + +// RegisterFlags registers flags. +func (mc *Config) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&mc.LoadPath, "runtime-config.file", "", "File with the configuration that can be updated in runtime.") + f.DurationVar(&mc.ReloadPeriod, "runtime-config.reload-period", 10*time.Second, "How often to check runtime config file.") +} + +// Manager periodically reloads the configuration from a file, and keeps this +// configuration available for clients. +type Manager struct { + services.Service + + cfg Config + logger log.Logger + + listenersMtx sync.Mutex + listeners []chan interface{} + + configMtx sync.RWMutex + config interface{} + + configLoadSuccess prometheus.Gauge + configHash *prometheus.GaugeVec +} + +// New creates an instance of Manager and starts reload config loop based on config +func New(cfg Config, registerer prometheus.Registerer, logger log.Logger) (*Manager, error) { + if cfg.LoadPath == "" { + return nil, errors.New("LoadPath is empty") + } + + mgr := Manager{ + cfg: cfg, + configLoadSuccess: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "runtime_config_last_reload_successful", + Help: "Whether the last runtime-config reload attempt was successful.", + }), + configHash: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ + Name: "runtime_config_hash", + Help: "Hash of the currently active runtime config file.", + }, []string{"sha256"}), + logger: logger, + } + + mgr.Service = services.NewBasicService(mgr.starting, mgr.loop, mgr.stopping) + return &mgr, nil +} + +func (om *Manager) starting(_ context.Context) error { + if om.cfg.LoadPath == "" { + return nil + } + + return errors.Wrap(om.loadConfig(), "failed to load runtime config") +} + +// CreateListenerChannel creates new channel that can be used to receive new config values. +// If there is no receiver waiting for value when config manager tries to send the update, +// or channel buffer is full, update is discarded. +// +// When config manager is stopped, it closes all channels to notify receivers that they will +// not receive any more updates. +func (om *Manager) CreateListenerChannel(buffer int) <-chan interface{} { + ch := make(chan interface{}, buffer) + + om.listenersMtx.Lock() + defer om.listenersMtx.Unlock() + + om.listeners = append(om.listeners, ch) + return ch +} + +// CloseListenerChannel removes given channel from list of channels to send notifications to and closes channel. +func (om *Manager) CloseListenerChannel(listener <-chan interface{}) { + om.listenersMtx.Lock() + defer om.listenersMtx.Unlock() + + for ix, ch := range om.listeners { + if ch == listener { + om.listeners = append(om.listeners[:ix], om.listeners[ix+1:]...) + close(ch) + break + } + } +} + +func (om *Manager) loop(ctx context.Context) error { + if om.cfg.LoadPath == "" { + level.Info(om.logger).Log("msg", "runtime config disabled: file not specified") + <-ctx.Done() + return nil + } + + ticker := time.NewTicker(om.cfg.ReloadPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := om.loadConfig() + if err != nil { + // Log but don't stop on error - we don't want to halt all ingesters because of a typo + level.Error(om.logger).Log("msg", "failed to load config", "err", err) + } + case <-ctx.Done(): + return nil + } + } +} + +// loadConfig loads configuration using the loader function, and if successful, +// stores it as current configuration and notifies listeners. +func (om *Manager) loadConfig() error { + buf, err := ioutil.ReadFile(om.cfg.LoadPath) + if err != nil { + om.configLoadSuccess.Set(0) + return errors.Wrap(err, "read file") + } + hash := sha256.Sum256(buf) + + cfg, err := om.cfg.Loader(bytes.NewReader(buf)) + if err != nil { + om.configLoadSuccess.Set(0) + return errors.Wrap(err, "load file") + } + om.configLoadSuccess.Set(1) + + om.setConfig(cfg) + om.callListeners(cfg) + + // expose hash of runtime config + om.configHash.Reset() + om.configHash.WithLabelValues(fmt.Sprintf("%x", hash[:])).Set(1) + + return nil +} + +func (om *Manager) setConfig(config interface{}) { + om.configMtx.Lock() + defer om.configMtx.Unlock() + om.config = config +} + +func (om *Manager) callListeners(newValue interface{}) { + om.listenersMtx.Lock() + defer om.listenersMtx.Unlock() + + for _, ch := range om.listeners { + select { + case ch <- newValue: + // ok + default: + // nobody is listening or buffer full. + } + } +} + +// Stop stops the Manager +func (om *Manager) stopping(_ error) error { + om.listenersMtx.Lock() + defer om.listenersMtx.Unlock() + + for _, ch := range om.listeners { + close(ch) + } + om.listeners = nil + return nil +} + +// GetConfig returns last loaded config value, possibly nil. +func (om *Manager) GetConfig() interface{} { + om.configMtx.RLock() + defer om.configMtx.RUnlock() + + return om.config +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 3e8c9fc179..0ce8248148 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -572,6 +572,7 @@ github.com/gorilla/websocket github.com/grafana/dskit/backoff github.com/grafana/dskit/flagext github.com/grafana/dskit/modules +github.com/grafana/dskit/runtimeconfig github.com/grafana/dskit/services # github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 ## explicit