Support labelallow stage in Promtail (#3468)

* add labelallow stage

* added docs

* add docs

* fix lint

* update stage name in _index.md
pull/3505/head
Aditya C S 5 years ago committed by GitHub
parent d42e5a2ce9
commit 0f5fceef8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      docs/sources/clients/promtail/stages/_index.md
  2. 41
      docs/sources/clients/promtail/stages/labelallow.md
  3. 2
      docs/sources/clients/promtail/stages/labeldrop.md
  4. 65
      pkg/logentry/stages/labelallow.go
  5. 72
      pkg/logentry/stages/labelallow_test.go
  6. 40
      pkg/logentry/stages/stage.go

@ -24,6 +24,7 @@ Action stages:
- [timestamp](timestamp/): Set the timestamp value for the log entry.
- [output](output/): Set the log line text.
- [labeldrop](labeldrop/): Drop label set for the log entry.
- [labelallow](labelallow/): Allow label set for the log entry.
- [labels](labels/): Update the label set for the log entry.
- [metrics](metrics/): Calculate metrics based on extracted data.
- [tenant](tenant/): Set the tenant ID value to use for the log entry.

@ -0,0 +1,41 @@
---
title: labelallow
---
# `labelallow` stage
The labelallow stage is an action stage that allows only the provided labels
to be included in the label set that is sent to Loki with the log entry.
## Schema
```yaml
labelallow:
- [<string>]
...
```
### Examples
For the given pipeline:
```yaml
kubernetes_sd_configs:
- role: pod
pipeline_stages:
- docker: {}
- labelallow:
- kubernetes_pod_name
- kubernetes_container_name
```
Given the following incoming labels:
- `kubernetes_pod_name`: `"loki-pqrs"`
- `kubernetes_container_name`: `"loki"`
- `kubernetes_pod_template_hash`: `"79f5db67b"`
- `kubernetes_controller_revision_hash`: `"774858987d"`
Only the below labels would be sent to `loki`
- `kubernetes_pod_name`: `"loki-pqrs"`
- `contaikubernetes_container_namener`: `"loki"`

@ -3,7 +3,7 @@ title: labeldrop
---
# `labeldrop` stage
The labeldrop stage is an action stage that takes drops labels from
The labeldrop stage is an action stage that drops labels from
the label set that is sent to Loki with the log entry.
## Schema

@ -0,0 +1,65 @@
package stages
import (
"time"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
const (
// ErrEmptyLabelAllowStageConfig error returned if config is empty
ErrEmptyLabelAllowStageConfig = "labelallow stage config cannot be empty"
)
// labelallowConfig is a slice of labels to be included
type LabelAllowConfig []string
func validateLabelAllowConfig(c LabelAllowConfig) error {
if c == nil || len(c) < 1 {
return errors.New(ErrEmptyLabelAllowStageConfig)
}
return nil
}
func newLabelAllowStage(configs interface{}) (Stage, error) {
cfgs := &LabelAllowConfig{}
err := mapstructure.Decode(configs, cfgs)
if err != nil {
return nil, err
}
err = validateLabelAllowConfig(*cfgs)
if err != nil {
return nil, err
}
labelMap := make(map[string]struct{})
for _, label := range *cfgs {
labelMap[label] = struct{}{}
}
return toStage(&labelAllowStage{
labels: labelMap,
}), nil
}
type labelAllowStage struct {
labels map[string]struct{}
}
// Process implements Stage
func (l *labelAllowStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for label := range labels {
if _, ok := l.labels[string(label)]; !ok {
delete(labels, label)
}
}
}
// Name implements Stage
func (l *labelAllowStage) Name() string {
return StageTypeLabelAllow
}

@ -0,0 +1,72 @@
package stages
import (
"testing"
"time"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ww "github.com/weaveworks/common/server"
)
func Test_addLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util_log.InitLogger(cfg)
Debug = true
tests := []struct {
name string
config *LabelAllowConfig
inputLabels model.LabelSet
expectedLabels model.LabelSet
}{
{
name: "allow single label",
config: &LabelAllowConfig{"testLabel1"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{
"testLabel1": "testValue",
},
},
{
name: "allow multiple labels",
config: &LabelAllowConfig{"testLabel1", "testLabel2"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
"testLabel3": "testValue",
},
expectedLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
},
{
name: "allow non-existing label",
config: &LabelAllowConfig{"foobar"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
st, err := newLabelAllowStage(test.config)
if err != nil {
t.Fatal(err)
}
out := processEntries(st, newEntry(nil, test.inputLabels, "", time.Now()))[0]
assert.Equal(t, test.expectedLabels, out.Labels)
})
}
}

@ -12,23 +12,24 @@ import (
)
const (
StageTypeJSON = "json"
StageTypeRegex = "regex"
StageTypeReplace = "replace"
StageTypeMetric = "metrics"
StageTypeLabel = "labels"
StageTypeLabelDrop = "labeldrop"
StageTypeTimestamp = "timestamp"
StageTypeOutput = "output"
StageTypeDocker = "docker"
StageTypeCRI = "cri"
StageTypeMatch = "match"
StageTypeTemplate = "template"
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
StageTypeJSON = "json"
StageTypeRegex = "regex"
StageTypeReplace = "replace"
StageTypeMetric = "metrics"
StageTypeLabel = "labels"
StageTypeLabelDrop = "labeldrop"
StageTypeTimestamp = "timestamp"
StageTypeOutput = "output"
StageTypeDocker = "docker"
StageTypeCRI = "cri"
StageTypeMatch = "match"
StageTypeTemplate = "template"
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeMultiline = "multiline"
StageTypePack = "pack"
StageTypeLabelAllow = "labelallow"
)
// Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
@ -151,6 +152,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeLabelAllow:
s, err = newLabelAllowStage(cfg)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("Unknown stage type: %s", stageType)
}

Loading…
Cancel
Save