diff --git a/pkg/ruler/registry.go b/pkg/ruler/registry.go index 0cdabee380..43f6600825 100644 --- a/pkg/ruler/registry.go +++ b/pkg/ruler/registry.go @@ -10,7 +10,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/imdario/mergo" "github.com/prometheus/client_golang/prometheus" promConfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -200,7 +199,7 @@ func (r *walRegistry) getTenantConfig(tenant string) (instance.Config, error) { // ensure that no variation of the X-Scope-OrgId header can be added, which might trick authentication for k, _ := range rwCfg.Client.Headers { - if strings.ToLower(user.OrgIDHeaderName) == strings.ToLower(k) { + if strings.ToLower(user.OrgIDHeaderName) == strings.ToLower(strings.TrimSpace(k)) { delete(rwCfg.Client.Headers, k) } } @@ -220,14 +219,44 @@ func (r *walRegistry) getTenantConfig(tenant string) (instance.Config, error) { } func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWriteConfig) (*RemoteWriteConfig, error) { - copy, err := base.Clone() + overrides, err := base.Clone() if err != nil { return nil, fmt.Errorf("error generating tenant remote-write config: %w", err) } - u, err := url.Parse(r.overrides.RulerRemoteWriteURL(tenant)) - if err != nil { - return nil, fmt.Errorf("error parsing given remote-write URL: %w", err) + overrides.Client.Name = fmt.Sprintf("%s-rw", tenant) + overrides.Client.SendExemplars = false + // TODO(dannyk): configure HTTP client overrides + overrides.Client.HTTPClientConfig = promConfig.HTTPClientConfig{} + // metadata is only used by prometheus scrape configs + overrides.Client.MetadataConfig = config.MetadataConfig{Send: false} + overrides.Client.SigV4Config = nil + + if r.overrides.RulerRemoteWriteDisabled(tenant) { + overrides.Enabled = false + } + + if v := r.overrides.RulerRemoteWriteURL(tenant); v != "" { + u, err := url.Parse(v) + if err != nil { + return nil, fmt.Errorf("error parsing given remote-write URL: %w", err) + } + overrides.Client.URL = &promConfig.URL{u} + } + + if v := r.overrides.RulerRemoteWriteTimeout(tenant); v > 0 { + overrides.Client.RemoteTimeout = model.Duration(v) + } + + // merge headers with the base + if v := r.overrides.RulerRemoteWriteHeaders(tenant); len(v) > 0 { + if overrides.Client.Headers == nil { + overrides.Client.Headers = make(map[string]string, len(v)) + } + + for k, val := range v { + overrides.Client.Headers[k] = val + } } relabelConfigs, err := r.createRelabelConfigs(tenant) @@ -235,46 +264,46 @@ func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWrite return nil, fmt.Errorf("failed to parse relabel configs: %w", err) } - overrides := RemoteWriteConfig{ - Client: config.RemoteWriteConfig{ - URL: &promConfig.URL{u}, - RemoteTimeout: model.Duration(r.overrides.RulerRemoteWriteTimeout(tenant)), - Headers: r.overrides.RulerRemoteWriteHeaders(tenant), - WriteRelabelConfigs: relabelConfigs, - Name: fmt.Sprintf("%s-rw", tenant), - SendExemplars: false, - // TODO(dannyk): configure HTTP client overrides - HTTPClientConfig: promConfig.HTTPClientConfig{}, - QueueConfig: config.QueueConfig{ - Capacity: r.overrides.RulerRemoteWriteQueueCapacity(tenant), - MaxShards: r.overrides.RulerRemoteWriteQueueMaxShards(tenant), - MinShards: r.overrides.RulerRemoteWriteQueueMinShards(tenant), - MaxSamplesPerSend: r.overrides.RulerRemoteWriteQueueMaxSamplesPerSend(tenant), - BatchSendDeadline: model.Duration(r.overrides.RulerRemoteWriteQueueBatchSendDeadline(tenant)), - MinBackoff: model.Duration(r.overrides.RulerRemoteWriteQueueMinBackoff(tenant)), - MaxBackoff: model.Duration(r.overrides.RulerRemoteWriteQueueMaxBackoff(tenant)), - RetryOnRateLimit: r.overrides.RulerRemoteWriteQueueRetryOnRateLimit(tenant), - }, - MetadataConfig: config.MetadataConfig{ - Send: false, - }, - SigV4Config: nil, - }, - Enabled: true, - } - - err = mergo.Merge(copy, overrides, mergo.WithOverride, mergo.WithOverrideEmptySlice) - if err != nil { - return nil, err + // if any relabel configs are defined for a tenant, override all base relabel configs, + // even if an empty list is configured; however if this value is not overridden for a tenant, + // it should retain the base value + if relabelConfigs != nil { + overrides.Client.WriteRelabelConfigs = relabelConfigs } - // we can't use mergo.WithOverwriteWithEmptyValue since that will set all the default values, so here we - // explicitly apply some config options that might be set to their type's zero value - if r.overrides.RulerRemoteWriteDisabled(tenant) { - copy.Enabled = false + if v := r.overrides.RulerRemoteWriteQueueCapacity(tenant); v > 0 { + overrides.Client.QueueConfig.Capacity = v } - return copy, nil + if v := r.overrides.RulerRemoteWriteQueueMinShards(tenant); v > 0 { + overrides.Client.QueueConfig.MinShards = v + } + + if v := r.overrides.RulerRemoteWriteQueueMaxShards(tenant); v > 0 { + overrides.Client.QueueConfig.MaxShards = v + } + + if v := r.overrides.RulerRemoteWriteQueueMaxSamplesPerSend(tenant); v > 0 { + overrides.Client.QueueConfig.MaxSamplesPerSend = v + } + + if v := r.overrides.RulerRemoteWriteQueueMinBackoff(tenant); v > 0 { + overrides.Client.QueueConfig.MinBackoff = model.Duration(v) + } + + if v := r.overrides.RulerRemoteWriteQueueMaxBackoff(tenant); v > 0 { + overrides.Client.QueueConfig.MaxBackoff = model.Duration(v) + } + + if v := r.overrides.RulerRemoteWriteQueueBatchSendDeadline(tenant); v > 0 { + overrides.Client.QueueConfig.BatchSendDeadline = model.Duration(v) + } + + if v := r.overrides.RulerRemoteWriteQueueRetryOnRateLimit(tenant); v { + overrides.Client.QueueConfig.RetryOnRateLimit = v + } + + return overrides, nil } // createRelabelConfigs converts the util.RelabelConfig into relabel.Config to allow for @@ -282,8 +311,14 @@ func (r *walRegistry) getTenantRemoteWriteConfig(tenant string, base RemoteWrite func (r *walRegistry) createRelabelConfigs(tenant string) ([]*relabel.Config, error) { configs := r.overrides.RulerRemoteWriteRelabelConfigs(tenant) - var relabelConfigs []*relabel.Config - for _, config := range configs { + // zero value is nil, which we want to treat as "no override" + if configs == nil { + return nil, nil + } + + // we want to treat an empty slice as "no relabel configs" + relabelConfigs := make([]*relabel.Config, len(configs)) + for i, config := range configs { out, err := yaml.Marshal(config) if err != nil { return nil, err @@ -294,7 +329,7 @@ func (r *walRegistry) createRelabelConfigs(tenant string) ([]*relabel.Config, er return nil, err } - relabelConfigs = append(relabelConfigs, &rc) + relabelConfigs[i] = &rc } return relabelConfigs, nil diff --git a/pkg/ruler/registry_test.go b/pkg/ruler/registry_test.go index e180b7ae9c..8ffcc79163 100644 --- a/pkg/ruler/registry_test.go +++ b/pkg/ruler/registry_test.go @@ -25,11 +25,13 @@ import ( "github.com/grafana/loki/pkg/validation" ) -const enabledRWTenant = "12345" -const disabledRWTenant = "54321" -const additionalHeadersRWTenant = "55443" -const customRelabelsTenant = "98765" -const badRelabelsTenant = "45677" +const enabledRWTenant = "enabled" +const disabledRWTenant = "disabled" +const additionalHeadersRWTenant = "additional-headers" +const customRelabelsTenant = "custom-relabels" +const badRelabelsTenant = "bad-relabels" +const nilRelabelsTenant = "nil-relabels" +const emptySliceRelabelsTenant = "empty-slice-relabels" const defaultCapacity = 1000 @@ -43,14 +45,13 @@ func newFakeLimits() fakeLimits { RulerRemoteWriteDisabled: true, }, additionalHeadersRWTenant: { - RulerRemoteWriteHeaders: validation.OverwriteMarshalingStringMap{ - M: map[string]string{ - user.OrgIDHeaderName: "overridden", - strings.ToLower(user.OrgIDHeaderName): "overridden-lower", - strings.ToUpper(user.OrgIDHeaderName): "overridden-upper", - "Additional": "Header", - }, - }, + RulerRemoteWriteHeaders: validation.NewOverwriteMarshalingStringMap(map[string]string{ + user.OrgIDHeaderName: "overridden", + fmt.Sprintf(" %s ", user.OrgIDHeaderName): "overridden", + strings.ToLower(user.OrgIDHeaderName): "overridden-lower", + strings.ToUpper(user.OrgIDHeaderName): "overridden-upper", + "Additional": "Header", + }), }, customRelabelsTenant: { RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{ @@ -65,6 +66,10 @@ func newFakeLimits() fakeLimits { }, }, }, + nilRelabelsTenant: {}, + emptySliceRelabelsTenant: { + RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{}, + }, badRelabelsTenant: { RulerRemoteWriteRelabelConfigs: []*util.RelabelConfig{ { @@ -87,11 +92,16 @@ func setupRegistry(t *testing.T, dir string) *walRegistry { QueueConfig: config.QueueConfig{ Capacity: defaultCapacity, }, + Headers: map[string]string{ + "Base": "value", + }, WriteRelabelConfigs: []*relabel.Config{ { SourceLabels: []model.LabelName{"__name__"}, Regex: relabel.MustNewRegexp("ALERTS.*"), Action: "drop", + Separator: ";", + Replacement: "$1", }, }, }, @@ -169,9 +179,11 @@ func TestTenantRemoteWriteHeaderOverride(t *testing.T) { tenantCfg, err := reg.getTenantConfig(additionalHeadersRWTenant) require.NoError(t, err) - assert.Len(t, tenantCfg.RemoteWrite[0].Headers, 2) + assert.Len(t, tenantCfg.RemoteWrite[0].Headers, 3) // ensure that tenant cannot override X-Scope-OrgId header assert.Equal(t, tenantCfg.RemoteWrite[0].Headers[user.OrgIDHeaderName], additionalHeadersRWTenant) + // and that the base header defined is set + assert.Equal(t, tenantCfg.RemoteWrite[0].Headers["Base"], "value") // but that the additional header defined is set assert.Equal(t, tenantCfg.RemoteWrite[0].Headers["Additional"], "Header") @@ -195,6 +207,32 @@ func TestRelabelConfigOverrides(t *testing.T) { assert.Len(t, tenantCfg.RemoteWrite[0].WriteRelabelConfigs, 2) } +func TestRelabelConfigOverridesNilWriteRelabels(t *testing.T) { + walDir, err := createTempWALDir() + require.NoError(t, err) + reg := setupRegistry(t, walDir) + defer os.RemoveAll(walDir) + + tenantCfg, err := reg.getTenantConfig(nilRelabelsTenant) + require.NoError(t, err) + + // if there are no relabel configs defined for the tenant, it should not override + assert.Equal(t, tenantCfg.RemoteWrite[0].WriteRelabelConfigs, reg.config.RemoteWrite.Client.WriteRelabelConfigs) +} + +func TestRelabelConfigOverridesEmptySliceWriteRelabels(t *testing.T) { + walDir, err := createTempWALDir() + require.NoError(t, err) + reg := setupRegistry(t, walDir) + defer os.RemoveAll(walDir) + + tenantCfg, err := reg.getTenantConfig(emptySliceRelabelsTenant) + require.NoError(t, err) + + // if there is an empty slice of relabel configs, it should clear existing relabel configs + assert.Len(t, tenantCfg.RemoteWrite[0].WriteRelabelConfigs, 0) +} + func TestRelabelConfigOverridesWithErrors(t *testing.T) { walDir, err := createTempWALDir() require.NoError(t, err) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 8b3397ab18..cd203e30ce 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -535,17 +535,21 @@ func (o *Overrides) getOverridesForUser(userID string) *Limits { // OverwriteMarshalingStringMap will overwrite the src map when unmarshaling // as opposed to merging. type OverwriteMarshalingStringMap struct { - M map[string]string + m map[string]string +} + +func NewOverwriteMarshalingStringMap(m map[string]string) OverwriteMarshalingStringMap { + return OverwriteMarshalingStringMap{m: m} } func (sm *OverwriteMarshalingStringMap) Map() map[string]string { - return sm.M + return sm.m } // MarshalJSON explicitly uses the the type receiver and not pointer receiver // or it won't be called func (sm OverwriteMarshalingStringMap) MarshalJSON() ([]byte, error) { - return json.Marshal(sm.M) + return json.Marshal(sm.m) } func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error { @@ -553,7 +557,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error { if err := json.Unmarshal(val, &def); err != nil { return err } - sm.M = def + sm.m = def return nil @@ -562,7 +566,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalJSON(val []byte) error { // MarshalYAML explicitly uses the the type receiver and not pointer receiver // or it won't be called func (sm OverwriteMarshalingStringMap) MarshalYAML() (interface{}, error) { - return sm.M, nil + return sm.m, nil } func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{}) error) error { @@ -572,7 +576,7 @@ func (sm *OverwriteMarshalingStringMap) UnmarshalYAML(unmarshal func(interface{} if err != nil { return err } - sm.M = def + sm.m = def return nil } diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 507f5ea71b..a5141a6cbe 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -113,9 +113,7 @@ per_tenant_override_period: 230s } func TestOverwriteMarshalingStringMapJSON(t *testing.T) { - m := OverwriteMarshalingStringMap{ - M: map[string]string{"foo": "bar"}, - } + m := NewOverwriteMarshalingStringMap(map[string]string{"foo": "bar"}) require.Nil(t, json.Unmarshal([]byte(`{"bazz": "buzz"}`), &m)) require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map()) @@ -127,9 +125,7 @@ func TestOverwriteMarshalingStringMapJSON(t *testing.T) { } func TestOverwriteMarshalingStringMapYAML(t *testing.T) { - m := OverwriteMarshalingStringMap{ - M: map[string]string{"foo": "bar"}, - } + m := NewOverwriteMarshalingStringMap(map[string]string{"foo": "bar"}) require.Nil(t, yaml.Unmarshal([]byte(`{"bazz": "buzz"}`), &m)) require.Equal(t, map[string]string{"bazz": "buzz"}, m.Map())