Introduce `overrides-exporter` module to Loki (#4520)

* Introduce overrides-exporter module from cortex into loki; add some test configs for local dev

* Fix panic by adding server dependency to overrides-exporter module

* Add tests for limits exporter

* Remove dummy configs for testing

* Add more limits to export

* Add more limits; move mockTenantLimits to exporter_test.go

* Introduce overrides-exporter module from cortex into loki; add some test configs for local dev

* Fix panic by adding server dependency to overrides-exporter module

* Add tests for limits exporter

* Remove dummy configs for testing

* Add more limits to export

* Add more limits; move mockTenantLimits to exporter_test.go

* Use reflection; add more limits to export

* Fix import to satisfy linter

* Refactor TenantLimits for clarity and efficiency; also fix the tenantLimits nil check

* Fix breaking test with nil check in runtime_config

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
pull/4531/head
JordanRushing 5 years ago committed by GitHub
parent e3726a19ca
commit ca67292b54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      pkg/distributor/validator_test.go
  2. 3
      pkg/loki/loki.go
  3. 18
      pkg/loki/modules.go
  4. 23
      pkg/loki/runtime_config.go
  5. 4
      pkg/ruler/registry_test.go
  6. 7
      pkg/storage/stores/shipper/compactor/retention/expiration.go
  7. 14
      pkg/storage/stores/shipper/compactor/retention/expiration_test.go
  8. 60
      pkg/validation/exporter.go
  9. 41
      pkg/validation/exporter_test.go
  10. 11
      pkg/validation/limits.go

@ -29,7 +29,10 @@ func (f fakeLimits) TenantLimits(userID string) *validation.Limits {
return f.limits
}
func (f fakeLimits) ForEachTenantLimit(validation.ForEachTenantLimitCallback) {}
// AllByUserID is unused here and exists only so that fakeLimits satisfies the
// interface; it always reports no per-tenant limits.
func (f fakeLimits) AllByUserID() map[string]*validation.Limits { return nil }
func TestValidator_ValidateEntry(t *testing.T) {
tests := []struct {

@ -209,6 +209,7 @@ type Loki struct {
ring *ring.Ring
overrides *validation.Overrides
tenantConfigs *runtime.TenantConfigs
TenantLimits validation.TenantLimits
distributor *distributor.Distributor
Ingester *ingester.Ingester
Querier *querier.Querier
@ -409,6 +410,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(MemberlistKV, t.initMemberlistKV)
mm.RegisterModule(Ring, t.initRing)
mm.RegisterModule(Overrides, t.initOverrides)
mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
mm.RegisterModule(TenantConfigs, t.initTenantConfigs)
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(Store, t.initStore)
@ -429,6 +431,7 @@ func (t *Loki) setupModuleManager() error {
deps := map[string][]string{
Ring: {RuntimeConfig, Server, MemberlistKV},
Overrides: {RuntimeConfig},
OverridesExporter: {RuntimeConfig, Server},
TenantConfigs: {RuntimeConfig},
Distributor: {Ring, Server, Overrides, TenantConfigs},
Store: {Overrides},

@ -63,6 +63,7 @@ const (
Ring string = "ring"
RuntimeConfig string = "runtime-config"
Overrides string = "overrides"
OverridesExporter string = "overrides-exporter"
TenantConfigs string = "tenant-configs"
Server string = "server"
Distributor string = "distributor"
@ -152,15 +153,30 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) {
var err error
t.runtimeConfig, err = runtimeconfig.New(t.Cfg.RuntimeConfig, prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer), util_log.Logger)
t.TenantLimits = newtenantLimitsFromRuntimeConfig(t.runtimeConfig)
return t.runtimeConfig, err
}
func (t *Loki) initOverrides() (_ services.Service, err error) {
t.overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, newtenantLimitsFromRuntimeConfig(t.runtimeConfig))
t.overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits)
// overrides are not a service, since they don't have any operational state.
return nil, err
}
// initOverridesExporter registers a Prometheus collector that exposes the
// per-tenant limit overrides. It returns an error when the overrides-exporter
// module is enabled but no per-tenant limits are available. No service is
// returned in any case.
func (t *Loki) initOverridesExporter() (services.Service, error) {
	// This target isn't enabled by default ("all") and requires per-tenant limits to run.
	missingLimits := t.Cfg.isModuleEnabled(OverridesExporter) && t.TenantLimits == nil
	if missingLimits {
		return nil, errors.New("overrides-exporter has been enabled, but no runtime configuration file was configured")
	}

	prometheus.MustRegister(validation.NewOverridesExporter(t.TenantLimits))

	// The exporter keeps no state of its own: overrides are read from the
	// runtime configuration on every collection, so there is no service to
	// hand back.
	return nil, nil
}
func (t *Loki) initTenantConfigs() (_ services.Service, err error) {
t.tenantConfigs, err = runtime.NewTenantConfigs(tenantConfigFromRuntimeConfig(t.runtimeConfig))
// tenantConfigs are not a service, since they don't have any operational state.

@ -56,30 +56,21 @@ type tenantLimitsFromRuntimeConfig struct {
c *runtimeconfig.Manager
}
func (t *tenantLimitsFromRuntimeConfig) TenantLimits(userID string) *validation.Limits {
func (t *tenantLimitsFromRuntimeConfig) AllByUserID() map[string]*validation.Limits {
if t.c == nil {
return nil
}
cfg, ok := t.c.GetConfig().(*runtimeConfigValues)
if !ok || cfg == nil {
return nil
if cfg != nil && ok {
return cfg.TenantLimits
}
return cfg.TenantLimits[userID]
return nil
}
func (t *tenantLimitsFromRuntimeConfig) ForEachTenantLimit(callback validation.ForEachTenantLimitCallback) {
if t.c == nil {
return
}
cfg, ok := t.c.GetConfig().(*runtimeConfigValues)
if !ok || cfg == nil {
return
}
for userID, tenantLimit := range cfg.TenantLimits {
callback(userID, tenantLimit)
}
func (t *tenantLimitsFromRuntimeConfig) TenantLimits(userID string) *validation.Limits {
return t.AllByUserID()[userID]
}
func newtenantLimitsFromRuntimeConfig(c *runtimeconfig.Manager) validation.TenantLimits {

@ -372,4 +372,6 @@ func (f fakeLimits) TenantLimits(userID string) *validation.Limits {
return limits
}
func (f fakeLimits) ForEachTenantLimit(validation.ForEachTenantLimitCallback) {}
// AllByUserID returns the per-tenant limits held by the fake.
func (f fakeLimits) AllByUserID() map[string]*validation.Limits { return f.limits }

@ -30,7 +30,7 @@ type expirationChecker struct {
type Limits interface {
RetentionPeriod(userID string) time.Duration
StreamRetention(userID string) []validation.StreamRetention
ForEachTenantLimit(validation.ForEachTenantLimitCallback)
AllByUserID() map[string]*validation.Limits
DefaultLimits() *validation.Limits
}
@ -123,7 +123,7 @@ func findSmallestRetentionPeriod(limits Limits) time.Duration {
}
}
limits.ForEachTenantLimit(func(userID string, limit *validation.Limits) {
for _, limit := range limits.AllByUserID() {
if limit.RetentionPeriod < smallestRetentionPeriod {
smallestRetentionPeriod = limit.RetentionPeriod
}
@ -132,7 +132,8 @@ func findSmallestRetentionPeriod(limits Limits) time.Duration {
smallestRetentionPeriod = streamRetention.Period
}
}
})
}
return time.Duration(smallestRetentionPeriod)
}

@ -36,16 +36,18 @@ func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention
return f.perTenant[userID].streamRetention
}
// ForEachTenantLimit invokes callback once for every tenant in perTenant,
// passing the tenant ID and that tenant's retention settings converted to
// validation limits.
func (f fakeLimits) ForEachTenantLimit(callback validation.ForEachTenantLimitCallback) {
	for tenantID, retention := range f.perTenant {
		converted := retention.convertToValidationLimit()
		callback(tenantID, converted)
	}
}
// DefaultLimits returns the fake's default retention settings converted to
// validation limits.
func (f fakeLimits) DefaultLimits() *validation.Limits {
	defaults := f.defaultLimit.convertToValidationLimit()
	return defaults
}
// AllByUserID builds a map from tenant ID to that tenant's retention settings
// converted to validation limits. The returned map is freshly allocated and is
// never nil, even when no tenants are configured.
func (f fakeLimits) AllByUserID() map[string]*validation.Limits {
	// The number of entries is known up front, so size the map once instead
	// of letting it grow while it is filled.
	byTenant := make(map[string]*validation.Limits, len(f.perTenant))
	for tenantID, retention := range f.perTenant {
		byTenant[tenantID] = retention.convertToValidationLimit()
	}
	return byTenant
}
func Test_expirationChecker_Expired(t *testing.T) {
e := NewExpirationChecker(&fakeLimits{
perTenant: map[string]retentionLimit{

@ -0,0 +1,60 @@
package validation
import (
"reflect"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/util/flagext"
)
// OverridesExporter is a Prometheus collector that exposes per-tenant limit
// overrides as metrics.
type OverridesExporter struct {
	// tenantLimits supplies the per-tenant limits read on each collection.
	tenantLimits TenantLimits
	// description is the descriptor used for every metric the exporter emits.
	description *prometheus.Desc
}
// NewOverridesExporter returns a collector that publishes the limits supplied
// by tenantLimits under the "loki_overrides" metric, labelled by limit name
// and user.
//
// TODO(jordanrushing): break out overrides from defaults?
func NewOverridesExporter(tenantLimits TenantLimits) *OverridesExporter {
	desc := prometheus.NewDesc(
		"loki_overrides",
		"Resource limit overrides applied to tenants",
		[]string{"limit_name", "user"},
		nil,
	)
	return &OverridesExporter{tenantLimits: tenantLimits, description: desc}
}
// Describe sends the exporter's single metric descriptor on ch.
func (oe *OverridesExporter) Describe(ch chan<- *prometheus.Desc) { ch <- oe.description }
// Collect walks the limits of every tenant returned by AllByUserID and sends
// one gauge on ch for each struct field whose type is int, time.Duration,
// model.Duration, flagext.ByteSize or float64. Fields of any other type are
// skipped. The "limit_name" label is the field's yaml struct tag and the "user"
// label is the tenant ID.
//
// Collect emits nothing when the exporter has no limits source, and skips
// tenants whose limits pointer is nil.
func (oe *OverridesExporter) Collect(ch chan<- prometheus.Metric) {
	// Calling a method on a nil interface would panic on every scrape.
	if oe.tenantLimits == nil {
		return
	}

	for tenant, limits := range oe.tenantLimits.AllByUserID() {
		// Elem() on a nil pointer yields the zero Value, and NumField on the
		// zero Value panics.
		if limits == nil {
			continue
		}

		rv := reflect.ValueOf(limits).Elem()
		rt := rv.Type()
		for i := 0; i < rv.NumField(); i++ {
			field := rv.Field(i)
			// Interface() panics on values obtained from unexported fields.
			if !field.CanInterface() {
				continue
			}

			var metricValue float64
			switch v := field.Interface().(type) {
			case int, time.Duration:
				metricValue = float64(field.Int())
			case model.Duration:
				metricValue = float64(v)
			case flagext.ByteSize:
				metricValue = float64(field.Uint())
			case float64:
				metricValue = v
			default:
				// Only numeric limits can be represented as a gauge value.
				continue
			}

			limitName := rt.Field(i).Tag.Get("yaml")
			ch <- prometheus.MustNewConstMetric(oe.description, prometheus.GaugeValue, metricValue, limitName, tenant)
		}
	}
}

@ -0,0 +1,41 @@
package validation
import (
"testing"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
)
// mockTenantLimits is an in-memory source of per-tenant limits used by the
// exporter tests.
type mockTenantLimits struct {
	// limits maps a tenant ID to that tenant's limits.
	limits map[string]*Limits
}
// newMockTenantLimits returns a mockTenantLimits backed by limits. The map is
// stored as given, so a nil map is kept as nil.
func newMockTenantLimits(limits map[string]*Limits) *mockTenantLimits {
	mock := new(mockTenantLimits)
	mock.limits = limits
	return mock
}
// TenantLimits returns the limits stored for userID, or nil when none are
// stored for that tenant.
func (l *mockTenantLimits) TenantLimits(userID string) *Limits {
	if tenantLimits, found := l.limits[userID]; found {
		return tenantLimits
	}
	return nil
}
// AllByUserID returns the mock's tenant-to-limits map as stored, without
// copying it.
func (l *mockTenantLimits) AllByUserID() map[string]*Limits {
	return l.limits
}
func TestOverridesExporter_noConfig(t *testing.T) {
exporter := NewOverridesExporter(newMockTenantLimits(nil))
count := testutil.CollectAndCount(exporter, "loki_overrides")
assert.Equal(t, 0, count)
}
// TestOverridesExporter_withConfig checks that at least one loki_overrides
// series is collected when a tenant has limits configured.
func TestOverridesExporter_withConfig(t *testing.T) {
	tenantA := &Limits{MaxQueriersPerTenant: 5}
	source := newMockTenantLimits(map[string]*Limits{"tenant-a": tenantA})

	exporter := NewOverridesExporter(source)
	assert.Greater(t, testutil.CollectAndCount(exporter, "loki_overrides"), 0)
}

@ -234,13 +234,12 @@ func SetDefaultLimitsForYAMLUnmarshalling(defaults Limits) {
defaultLimits = &defaults
}
// ForEachTenantLimitCallback is the signature of a function that receives a
// tenant's ID together with that tenant's limits.
type ForEachTenantLimitCallback func(userID string, limit *Limits)
type TenantLimits interface {
// TenantLimits returns the limits for the given tenant, or
// nil if there are no tenant-specific limits.
TenantLimits(userID string) *Limits
ForEachTenantLimit(ForEachTenantLimitCallback)
// AllByUserID returns a mapping from every tenant ID to that tenant's limits.
AllByUserID() map[string]*Limits
}
// Overrides periodically fetch a set of per-user overrides, and provides convenience
@ -258,6 +257,8 @@ func NewOverrides(defaults Limits, tenantLimits TenantLimits) (*Overrides, error
}, nil
}
// AllByUserID returns the per-tenant limit overrides keyed by tenant ID, as
// reported by the underlying tenant limits source. It returns nil when the
// Overrides was built without a tenant limits source, so callers can range
// over the result unconditionally.
func (o *Overrides) AllByUserID() map[string]*Limits {
	// Calling through a nil interface would panic.
	if o.tenantLimits == nil {
		return nil
	}
	return o.tenantLimits.AllByUserID()
}
// IngestionRateStrategy returns whether the ingestion rate limit should be individually applied
// to each distributor instance (local) or evenly shared across the cluster (global).
func (o *Overrides) IngestionRateStrategy() string {
@ -505,10 +506,6 @@ func (o *Overrides) UnorderedWrites(userID string) bool {
return o.getOverridesForUser(userID).UnorderedWrites
}
// ForEachTenantLimit forwards callback to the underlying tenant limits source.
func (o *Overrides) ForEachTenantLimit(callback ForEachTenantLimitCallback) {
	source := o.tenantLimits
	source.ForEachTenantLimit(callback)
}
// DefaultLimits returns the default limits held by the Overrides.
func (o *Overrides) DefaultLimits() *Limits { return o.defaultLimits }

Loading…
Cancel
Save