Ruler: Refactoring remote-write config overrides (#4429)

* Refactoring remote-write config overrides

Replacing mergo.Merge because it cannot distinguish between a nil and an empty slice, which is needed to overwrite relabel configs

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>

* Simplifying condition

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>

* Merging headers instead of overwriting blindly

Adding extra check for header smuggling

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>

* Unexporting OverwriteMarshalingStringMap field, preferring constructor

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
pull/4432/head
Danny Kopping 4 years ago committed by GitHub
parent 415d4b03fa
commit a961612436
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 121
      pkg/ruler/registry.go
  2. 58
      pkg/ruler/registry_test.go
  3. 16
      pkg/validation/limits.go
  4. 8
      pkg/validation/limits_test.go

@ -10,7 +10,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/imdario/mergo"
"github.com/prometheus/client_golang/prometheus"
promConfig "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
@ -200,7 +199,7 @@ func (r *walRegistry) getTenantConfig(tenant string) (instance.Config, error) {
// ensure that no variation of the X-Scope-OrgId header can be added, which might trick authentication
for k, _ := range rwCfg.Client.Headers {
if strings.ToLower(user.OrgIDHeaderName) == strings.ToLower(k) {
if strings.ToLower(user.OrgIDHeaderName) == strings.ToLower(strings.TrimSpace(k)) {
delete(rwCfg.Client.Headers, k)
}
}
@ -220,61 +219,91 @@ func (r *walRegistry) getTenantConfig(tenant string) (instance.Config, error) {
}
func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWriteConfig) (*RemoteWriteConfig, error) {
copy, err := base.Clone()
overrides, err := base.Clone()
if err != nil {
return nil, fmt.Errorf("error generating tenant remote-write config: %w", err)
}
u, err := url.Parse(r.overrides.RulerRemoteWriteURL(tenant))
overrides.Client.Name = fmt.Sprintf("%s-rw", tenant)
overrides.Client.SendExemplars = false
// TODO(dannyk): configure HTTP client overrides
overrides.Client.HTTPClientConfig = promConfig.HTTPClientConfig{}
// metadata is only used by prometheus scrape configs
overrides.Client.MetadataConfig = config.MetadataConfig{Send: false}
overrides.Client.SigV4Config = nil
if r.overrides.RulerRemoteWriteDisabled(tenant) {
overrides.Enabled = false
}
if v := r.overrides.RulerRemoteWriteURL(tenant); v != "" {
u, err := url.Parse(v)
if err != nil {
return nil, fmt.Errorf("error parsing given remote-write URL: %w", err)
}
overrides.Client.URL = &promConfig.URL{u}
}
if v := r.overrides.RulerRemoteWriteTimeout(tenant); v > 0 {
overrides.Client.RemoteTimeout = model.Duration(v)
}
// merge headers with the base
if v := r.overrides.RulerRemoteWriteHeaders(tenant); len(v) > 0 {
if overrides.Client.Headers == nil {
overrides.Client.Headers = make(map[string]string, len(v))
}
for k, val := range v {
overrides.Client.Headers[k] = val
}
}
relabelConfigs, err := r.createRelabelConfigs(tenant)
if err != nil {
return nil, fmt.Errorf("failed to parse relabel configs: %w", err)
}
overrides := RemoteWriteConfig{
Client: config.RemoteWriteConfig{
URL: &promConfig.URL{u},
RemoteTimeout: model.Duration(r.overrides.RulerRemoteWriteTimeout(tenant)),
Headers: r.overrides.RulerRemoteWriteHeaders(tenant),
WriteRelabelConfigs: relabelConfigs,
Name: fmt.Sprintf("%s-rw", tenant),
SendExemplars: false,
// TODO(dannyk): configure HTTP client overrides
HTTPClientConfig: promConfig.HTTPClientConfig{},
QueueConfig: config.QueueConfig{
Capacity: r.overrides.RulerRemoteWriteQueueCapacity(tenant),
MaxShards: r.overrides.RulerRemoteWriteQueueMaxShards(tenant),
MinShards: r.overrides.RulerRemoteWriteQueueMinShards(tenant),
MaxSamplesPerSend: r.overrides.RulerRemoteWriteQueueMaxSamplesPerSend(tenant),
BatchSendDeadline: model.Duration(r.overrides.RulerRemoteWriteQueueBatchSendDeadline(tenant)),
MinBackoff: model.Duration(r.overrides.RulerRemoteWriteQueueMinBackoff(tenant)),
MaxBackoff: model.Duration(r.overrides.RulerRemoteWriteQueueMaxBackoff(tenant)),
RetryOnRateLimit: r.overrides.RulerRemoteWriteQueueRetryOnRateLimit(tenant),
},
MetadataConfig: config.MetadataConfig{
Send: false,
},
SigV4Config: nil,
},
Enabled: true,
}
err = mergo.Merge(copy, overrides, mergo.WithOverride, mergo.WithOverrideEmptySlice)
if err != nil {
return nil, err
// if any relabel configs are defined for a tenant, override all base relabel configs,
// even if an empty list is configured; however if this value is not overridden for a tenant,
// it should retain the base value
if relabelConfigs != nil {
overrides.Client.WriteRelabelConfigs = relabelConfigs
}
// we can't use mergo.WithOverwriteWithEmptyValue since that will set all the default values, so here we
// explicitly apply some config options that might be set to their type's zero value
if r.overrides.RulerRemoteWriteDisabled(tenant) {
copy.Enabled = false
if v := r.overrides.RulerRemoteWriteQueueCapacity(tenant); v > 0 {
overrides.Client.QueueConfig.Capacity = v
}
return copy, nil
if v := r.overrides.RulerRemoteWriteQueueMinShards(tenant); v > 0 {
overrides.Client.QueueConfig.MinShards = v
}
if v := r.overrides.RulerRemoteWriteQueueMaxShards(tenant); v > 0 {
overrides.Client.QueueConfig.MaxShards = v
}
if v := r.overrides.RulerRemoteWriteQueueMaxSamplesPerSend(tenant); v > 0 {
overrides.Client.QueueConfig.MaxSamplesPerSend = v
}
if v := r.overrides.RulerRemoteWriteQueueMinBackoff(tenant); v > 0 {
overrides.Client.QueueConfig.MinBackoff = model.Duration(v)
}
if v := r.overrides.RulerRemoteWriteQueueMaxBackoff(tenant); v > 0 {
overrides.Client.QueueConfig.MaxBackoff = model.Duration(v)
}
if v := r.overrides.RulerRemoteWriteQueueBatchSendDeadline(tenant); v > 0 {
overrides.Client.QueueConfig.BatchSendDeadline = model.Duration(v)
}
if v := r.overrides.RulerRemoteWriteQueueRetryOnRateLimit(tenant); v {
overrides.Client.QueueConfig.RetryOnRateLimit = v
}
return overrides, nil
}
// createRelabelConfigs converts the util.RelabelConfig into relabel.Config to allow for
@ -282,8 +311,14 @@ func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWrite
func (r *walRegistry) createRelabelConfigs(tenant string) ([]*relabel.Config, error) {
configs := r.overrides.RulerRemoteWriteRelabelConfigs(tenant)
var relabelConfigs []*relabel.Config
for _, config := range configs {
// zero value is nil, which we want to treat as "no override"
if configs == nil {
return nil, nil
}
// we want to treat an empty slice as "no relabel configs"
relabelConfigs := make([]*relabel.Config, len(configs))
for i, config := range configs {
out, err := yaml.Marshal(config)
if err != nil {
return nil, err
@ -294,7 +329,7 @@ func (r *walRegistry) createRelabelConfigs(tenant string) ([]*relabel.Config, er
return nil, err
}
relabelConfigs = append(relabelConfigs, &rc)
relabelConfigs[i] = &rc
}
return relabelConfigs, nil

@ -25,11 +25,13 @@ import (
"github.com/grafana/loki/pkg/validation"
)
const enabledRWTenant = "12345"
const disabledRWTenant = "54321"
const additionalHeadersRWTenant = "55443"
const customRelabelsTenant = "98765"
const badRelabelsTenant = "45677"
const enabledRWTenant = "enabled"
const disabledRWTenant = "disabled"
const additionalHeadersRWTenant = "additional-headers"
const customRelabelsTenant = "custom-relabels"
const badRelabelsTenant = "bad-relabels"
const nilRelabelsTenant = "nil-relabels"
const emptySliceRelabelsTenant = "empty-slice-relabels"
const defaultCapacity = 1000
@ -43,14 +45,13 @@ func newFakeLimits() fakeLimits {
RulerRemoteWriteDisabled: true,
},
additionalHeadersRWTenant: {
RulerRemoteWriteHeaders: validation.OverwriteMarshalingStringMap{
M: map[string]string{
RulerRemoteWriteHeaders: validation.NewOverwriteMarshalingStringMap(map[string]string{
user.OrgIDHeaderName: "overridden",
fmt.Sprintf(" %s ", user.OrgIDHeaderName): "overridden",
strings.ToLower(user.OrgIDHeaderName): "overridden-lower",
strings.ToUpper(user.OrgIDHeaderName): "overridden-upper",
"Additional": "Header",
},
},
}),
},
customRelabelsTenant: {
RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{
@ -65,6 +66,10 @@ func newFakeLimits() fakeLimits {
},
},
},
nilRelabelsTenant: {},
emptySliceRelabelsTenant: {
RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{},
},
badRelabelsTenant: {
RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{
{
@ -87,11 +92,16 @@ func setupRegistry(t *testing.T, dir string) *walRegistry {
QueueConfig: config.QueueConfig{
Capacity: defaultCapacity,
},
Headers: map[string]string{
"Base": "value",
},
WriteRelabelConfigs: []*relabel.Config{
{
SourceLabels: []model.LabelName{"__name__"},
Regex: relabel.MustNewRegexp("ALERTS.*"),
Action: "drop",
Separator: ";",
Replacement: "$1",
},
},
},
@ -169,9 +179,11 @@ func TestTenantRemoteWriteHeaderOverride(t *testing.T) {
tenantCfg, err := reg.getTenantConfig(additionalHeadersRWTenant)
require.NoError(t, err)
assert.Len(t, tenantCfg.RemoteWrite[0].Headers, 2)
assert.Len(t, tenantCfg.RemoteWrite[0].Headers, 3)
// ensure that tenant cannot override X-Scope-OrgId header
assert.Equal(t, tenantCfg.RemoteWrite[0].Headers[user.OrgIDHeaderName], additionalHeadersRWTenant)
// and that the base header defined is set
assert.Equal(t, tenantCfg.RemoteWrite[0].Headers["Base"], "value")
// but that the additional header defined is set
assert.Equal(t, tenantCfg.RemoteWrite[0].Headers["Additional"], "Header")
@ -195,6 +207,32 @@ func TestRelabelConfigOverrides(t *testing.T) {
assert.Len(t, tenantCfg.RemoteWrite[0].WriteRelabelConfigs, 2)
}
// TestRelabelConfigOverridesNilWriteRelabels checks that a tenant with no
// relabel configs defined ends up with the base write relabel configs.
func TestRelabelConfigOverridesNilWriteRelabels(t *testing.T) {
	walDir, err := createTempWALDir()
	require.NoError(t, err)
	// register the cleanup before setupRegistry so the temp dir is still
	// removed if setupRegistry aborts the test
	defer os.RemoveAll(walDir)

	reg := setupRegistry(t, walDir)

	tenantCfg, err := reg.getTenantConfig(nilRelabelsTenant)
	require.NoError(t, err)

	// if there are no relabel configs defined for the tenant, it should not override
	// (expected value first, actual second, so failure output is labelled correctly)
	assert.Equal(t, reg.config.RemoteWrite.Client.WriteRelabelConfigs, tenantCfg.RemoteWrite[0].WriteRelabelConfigs)
}
// TestRelabelConfigOverridesEmptySliceWriteRelabels checks that a tenant
// configured with an empty slice of relabel configs ends up with no write
// relabel configs.
func TestRelabelConfigOverridesEmptySliceWriteRelabels(t *testing.T) {
	walDir, err := createTempWALDir()
	require.NoError(t, err)
	// register the cleanup before setupRegistry so the temp dir is still
	// removed if setupRegistry aborts the test
	defer os.RemoveAll(walDir)

	reg := setupRegistry(t, walDir)

	tenantCfg, err := reg.getTenantConfig(emptySliceRelabelsTenant)
	require.NoError(t, err)

	// if there is an empty slice of relabel configs, it should clear existing relabel configs
	assert.Len(t, tenantCfg.RemoteWrite[0].WriteRelabelConfigs, 0)
}
func TestRelabelConfigOverridesWithErrors(t *testing.T) {
walDir, err := createTempWALDir()
require.NoError(t, err)

@ -535,17 +535,21 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits {
// OverwriteMarshalingStringMap will overwrite the src map when unmarshaling
// as opposed to merging.
type OverwriteMarshalingStringMap struct {
M map[string]string
m map[string]string
}
// NewOverwriteMarshalingStringMap returns an OverwriteMarshalingStringMap
// holding m. The map is stored as given; it is not copied.
func NewOverwriteMarshalingStringMap(m map[string]string) OverwriteMarshalingStringMap {
	var wrapped OverwriteMarshalingStringMap
	wrapped.m = m
	return wrapped
}
func (sm *OverwriteMarshalingStringMap) Map() map[string]string {
return sm.M
return sm.m
}
// MarshalJSON explicitly uses the type receiver and not pointer receiver
// or it won't be called
func (sm OverwriteMarshalingStringMap) MarshalJSON() ([]byte, error) {
return json.Marshal(sm.M)
return json.Marshal(sm.m)
}
func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error {
@ -553,7 +557,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error {
if err := json.Unmarshal(val, &def); err != nil {
return err
}
sm.M = def
sm.m = def
return nil
@ -562,7 +566,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error {
// MarshalYAML explicitly uses the type receiver and not pointer receiver
// or it won't be called
func (sm OverwriteMarshalingStringMap) MarshalYAML() (interface{}, error) {
return sm.M, nil
return sm.m, nil
}
func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{}) error) error {
@ -572,7 +576,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{}
if err != nil {
return err
}
sm.M = def
sm.m = def
return nil
}

@ -113,9 +113,7 @@ per_tenant_override_period: 230s
}
func TestOverwriteMarshalingStringMapJSON(t *testing.T) {
m := OverwriteMarshalingStringMap{
M: map[string]string{"foo": "bar"},
}
m := NewOverwriteMarshalingStringMap(map[string]string{"foo": "bar"})
require.Nil(t, json.Unmarshal([]byte(`{"bazz": "buzz"}`), &m))
require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map())
@ -127,9 +125,7 @@ func TestOverwriteMarshalingStringMapJSON(t *testing.T) {
}
func TestOverwriteMarshalingStringMapYAML(t *testing.T) {
m := OverwriteMarshalingStringMap{
M: map[string]string{"foo": "bar"},
}
m := NewOverwriteMarshalingStringMap(map[string]string{"foo": "bar"})
require.Nil(t, yaml.Unmarshal([]byte(`{"bazz": "buzz"}`), &m))
require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map())

Loading…
Cancel
Save