Support tenant id from value (#6290)

Signed-off-by: Jan-Otto Kröpke <mail@jkroepke.de>
pull/6285/merge
Jan-Otto Kröpke 4 years ago committed by GitHub
parent 7df7169ccd
commit a1e0298a57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      clients/pkg/logentry/stages/tenant.go
  2. 46
      clients/pkg/logentry/stages/tenant_test.go
  3. 33
      docs/sources/clients/promtail/stages/tenant.md

@ -14,8 +14,8 @@ import (
) )
const ( const (
ErrTenantStageEmptySourceOrValue = "source or value config are required" ErrTenantStageEmptyLabelSourceOrValue = "label, source or value config are required"
ErrTenantStageConflictingSourceAndValue = "source and value are mutually exclusive: you should set source or value but not both" ErrTenantStageConflictingLabelSourceAndValue = "label, source and value are mutually exclusive: you should set source, value or label but not all"
) )
type tenantStage struct { type tenantStage struct {
@ -24,18 +24,19 @@ type tenantStage struct {
} }
type TenantConfig struct { type TenantConfig struct {
Label string `mapstructure:"label"`
Source string `mapstructure:"source"` Source string `mapstructure:"source"`
Value string `mapstructure:"value"` Value string `mapstructure:"value"`
} }
// validateTenantConfig validates the tenant stage configuration // validateTenantConfig validates the tenant stage configuration
func validateTenantConfig(c TenantConfig) error { func validateTenantConfig(c TenantConfig) error {
if c.Source == "" && c.Value == "" { if c.Source == "" && c.Value == "" && c.Label == "" {
return errors.New(ErrTenantStageEmptySourceOrValue) return errors.New(ErrTenantStageEmptyLabelSourceOrValue)
} }
if c.Source != "" && c.Value != "" { if c.Source != "" && c.Value != "" || c.Label != "" && c.Value != "" || c.Source != "" && c.Label != "" {
return errors.New(ErrTenantStageConflictingSourceAndValue) return errors.New(ErrTenantStageConflictingLabelSourceAndValue)
} }
return nil return nil
@ -67,6 +68,8 @@ func (s *tenantStage) Process(labels model.LabelSet, extracted map[string]interf
// Get tenant ID from source or configured value // Get tenant ID from source or configured value
if s.cfg.Source != "" { if s.cfg.Source != "" {
tenantID = s.getTenantFromSourceField(extracted) tenantID = s.getTenantFromSourceField(extracted)
} else if s.cfg.Label != "" {
tenantID = s.getTenantFromLabel(labels)
} else { } else {
tenantID = s.cfg.Value tenantID = s.cfg.Value
} }
@ -105,3 +108,17 @@ func (s *tenantStage) getTenantFromSourceField(extracted map[string]interface{})
return tenantID return tenantID
} }
func (s *tenantStage) getTenantFromLabel(labels model.LabelSet) string {
// Get the tenant ID from the label map
tenantID, ok := labels[model.LabelName(s.cfg.Label)]
if !ok {
if Debug {
level.Debug(s.logger).Log("msg", "the tenant source does not exist in the labels", "source", s.cfg.Source)
}
return ""
}
return string(tenantID)
}

@ -18,7 +18,7 @@ import (
util_log "github.com/grafana/loki/pkg/util/log" util_log "github.com/grafana/loki/pkg/util/log"
) )
var testTenantYaml = ` var testTenantYamlExtractedData = `
pipeline_stages: pipeline_stages:
- json: - json:
expressions: expressions:
@ -40,7 +40,7 @@ func TestPipelineWithMissingKey_Tenant(t *testing.T) {
var buf bytes.Buffer var buf bytes.Buffer
w := log.NewSyncWriter(&buf) w := log.NewSyncWriter(&buf)
logger := log.NewLogfmtLogger(w) logger := log.NewLogfmtLogger(w)
pl, err := NewPipeline(logger, loadConfig(testTenantYaml), nil, prometheus.DefaultRegisterer) pl, err := NewPipeline(logger, loadConfig(testTenantYamlExtractedData), nil, prometheus.DefaultRegisterer)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -74,26 +74,54 @@ func TestTenantStage_Validation(t *testing.T) {
}, },
"should fail on missing source and value": { "should fail on missing source and value": {
config: &TenantConfig{}, config: &TenantConfig{},
expectedErr: lokiutil.StringRef(ErrTenantStageEmptySourceOrValue), expectedErr: lokiutil.StringRef(ErrTenantStageEmptyLabelSourceOrValue),
}, },
"should fail on empty source": { "should fail on empty source": {
config: &TenantConfig{ config: &TenantConfig{
Source: "", Source: "",
}, },
expectedErr: lokiutil.StringRef(ErrTenantStageEmptySourceOrValue), expectedErr: lokiutil.StringRef(ErrTenantStageEmptyLabelSourceOrValue),
}, },
"should fail on empty value": { "should fail on empty value": {
config: &TenantConfig{ config: &TenantConfig{
Value: "", Value: "",
}, },
expectedErr: lokiutil.StringRef(ErrTenantStageEmptySourceOrValue), expectedErr: lokiutil.StringRef(ErrTenantStageEmptyLabelSourceOrValue),
},
"should fail on empty label": {
config: &TenantConfig{
Label: "",
},
expectedErr: lokiutil.StringRef(ErrTenantStageEmptyLabelSourceOrValue),
}, },
"should fail on both source and value set": { "should fail on both source and value set": {
config: &TenantConfig{ config: &TenantConfig{
Source: "tenant", Source: "tenant",
Value: "team-a", Value: "team-a",
}, },
expectedErr: lokiutil.StringRef(ErrTenantStageConflictingSourceAndValue), expectedErr: lokiutil.StringRef(ErrTenantStageConflictingLabelSourceAndValue),
},
"should fail on both source and label set": {
config: &TenantConfig{
Source: "tenant",
Label: "team-a",
},
expectedErr: lokiutil.StringRef(ErrTenantStageConflictingLabelSourceAndValue),
},
"should fail on both label and value set": {
config: &TenantConfig{
Label: "tenant",
Value: "team-a",
},
expectedErr: lokiutil.StringRef(ErrTenantStageConflictingLabelSourceAndValue),
},
"should fail on all set": {
config: &TenantConfig{
Label: "tenant",
Source: "tenant",
Value: "team-a",
},
expectedErr: lokiutil.StringRef(ErrTenantStageConflictingLabelSourceAndValue),
}, },
} }
@ -141,6 +169,12 @@ func TestTenantStage_Process(t *testing.T) {
inputExtracted: map[string]interface{}{"tenant_id": "bar"}, inputExtracted: map[string]interface{}{"tenant_id": "bar"},
expectedTenant: lokiutil.StringRef("bar"), expectedTenant: lokiutil.StringRef("bar"),
}, },
"should set the tenant if the label is defined in the label map": {
config: &TenantConfig{Label: "tenant_id"},
inputLabels: model.LabelSet{"tenant_id": "bar"},
inputExtracted: map[string]interface{}{},
expectedTenant: lokiutil.StringRef("bar"),
},
"should override the tenant if the source field is defined in the extracted map": { "should override the tenant if the source field is defined in the extracted map": {
config: &TenantConfig{Source: "tenant_id"}, config: &TenantConfig{Source: "tenant_id"},
inputLabels: model.LabelSet{client.ReservedLabelTenantID: "foo"}, inputLabels: model.LabelSet{client.ReservedLabelTenantID: "foo"},

@ -13,9 +13,13 @@ be used.
```yaml ```yaml
tenant: tenant:
# Name from extracted data to whose value should be set as tenant ID. # Either label, source or value config option is required, but not all (they
# Either source or value config option is required, but not both (they
# are mutually exclusive). # are mutually exclusive).
# Name from labels to whose value should be set as tenant ID.
[ label: <string> ]
# Name from extracted data to whose value should be set as tenant ID.
[ source: <string> ] [ source: <string> ]
# Value to use to set the tenant ID when this stage is executed. Useful # Value to use to set the tenant ID when this stage is executed. Useful
@ -81,3 +85,28 @@ The pipeline would:
1. Process the `match` stage checking if the `{app="api"}` selector matches 1. Process the `match` stage checking if the `{app="api"}` selector matches
and - whenever it matches - run the sub stages. The `tenant` sub stage and - whenever it matches - run the sub stages. The `tenant` sub stage
would override the tenant with the value `"team-api"`. would override the tenant with the value `"team-api"`.
### Example: extract the tenant ID from kubernetes sd
```yaml
scrape_configs:
- job_name: kubernetes-pods-name
kubernetes_sd_configs:
- role: pod
relabel_configs:
- action: replace
source_labels:
- __meta_kubernetes_namespace
target_label: namespace
pipeline_stages:
- match:
selector: '{namespace=".+"}'
stages:
- tenant:
label: "namespace"
- output:
source: message
```

Loading…
Cancel
Save