Promtail: adding pipeline stage for dropping labels (#2571)

* adding pipeline stage for dropping labels, our use case for this is to move unbound labels into the log line and dropping the label, fixes #2421

Signed-off-by: Roger Steneteg <rsteneteg@ea.com>

* Update docs/sources/clients/promtail/stages/labeldrop.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* Update docs/sources/clients/promtail/stages/_index.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* fixed inconsistent naming, removed unused logger in the labeldrop stage

Signed-off-by: Roger Steneteg <rsteneteg@ea.com>

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
pull/2620/head
Roger Steneteg 5 years ago committed by GitHub
parent c2e610b29d
commit edb5fc5176
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      docs/sources/clients/promtail/stages/_index.md
  2. 36
      docs/sources/clients/promtail/stages/labeldrop.md
  3. 58
      pkg/logentry/stages/labeldrop.go
  4. 69
      pkg/logentry/stages/labeldrop_test.go
  5. 6
      pkg/logentry/stages/stage.go

@ -22,6 +22,7 @@ Action stages:
* [timestamp](timestamp/): Set the timestamp value for the log entry.
* [output](output/): Set the log line text.
* [labeldrop](labeldrop/): Drop label set for the log entry.
* [labels](labels/): Update the label set for the log entry.
* [metrics](metrics/): Calculate metrics based on extracted data.
* [tenant](tenant/): Set the tenant ID value to use for the log entry.
@ -30,4 +31,3 @@ Filtering stages:
* [match](match/): Conditionally run stages based on the label set.
* [drop](drop/): Conditionally drop log lines based on several options.

@ -0,0 +1,36 @@
---
title: labeldrop
---
# `labeldrop` stage
The labeldrop stage is an action stage that takes drops labels from
the label set that is sent to Loki with the log entry.
## Schema
```yaml
labeldrop:
- [<string>]
...
```
### Examples
For the given pipeline:
```yaml
- replace:
expression: "(.*)"
replace: "pod_name:{{ .kubernetes_pod_name }} {{ .Value }}"
- labeldrop:
- kubernetes_pod_name
```
Given the following log line:
```
log message\n
```
The first stage would append the value of the`kubernetes_pod_name` label into the beginning of the log line.
The labeldrop stage would drop the label from being sent to Loki, and it would now be part of the log line instead.

@ -0,0 +1,58 @@
package stages
import (
"time"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
const (
// ErrEmptyLabelDropStageConfig error returned if config is empty
ErrEmptyLabelDropStageConfig = "labeldrop stage config cannot be empty"
)
// LabelDropConfig is a slice of labels to be dropped
type LabelDropConfig []string
func validateLabelDropConfig(c LabelDropConfig) error {
if c == nil || len(c) < 1 {
return errors.New(ErrEmptyLabelDropStageConfig)
}
return nil
}
func newLabelDropStage(configs interface{}) (*labelDropStage, error) {
cfgs := &LabelDropConfig{}
err := mapstructure.Decode(configs, cfgs)
if err != nil {
return nil, err
}
err = validateLabelDropConfig(*cfgs)
if err != nil {
return nil, err
}
return &labelDropStage{
cfgs: *cfgs,
}, nil
}
type labelDropStage struct {
cfgs LabelDropConfig
}
// Process implements Stage
func (l *labelDropStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
for _, label := range l.cfgs {
delete(labels, model.LabelName(label))
}
}
// Name implements Stage
func (l *labelDropStage) Name() string {
return StageTypeLabelDrop
}

@ -0,0 +1,69 @@
package stages
import (
"testing"
"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
ww "github.com/weaveworks/common/server"
)
func Test_dropLabelStage_Process(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
cfg.LogLevel.Set("debug")
util.InitLogger(cfg)
Debug = true
tests := []struct {
name string
config *LabelDropConfig
inputLabels model.LabelSet
expectedLabels model.LabelSet
}{
{
name: "drop one label",
config: &LabelDropConfig{"testLabel1"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{
"testLabel2": "testValue",
},
},
{
name: "drop two labels",
config: &LabelDropConfig{"testLabel1", "testLabel2"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{},
},
{
name: "drop non-existing label",
config: &LabelDropConfig{"foobar"},
inputLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
expectedLabels: model.LabelSet{
"testLabel1": "testValue",
"testLabel2": "testValue",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
st, err := newLabelDropStage(test.config)
if err != nil {
t.Fatal(err)
}
st.Process(test.inputLabels, map[string]interface{}{}, nil, nil)
assert.Equal(t, test.expectedLabels, test.inputLabels)
})
}
}

@ -15,6 +15,7 @@ const (
StageTypeReplace = "replace"
StageTypeMetric = "metrics"
StageTypeLabel = "labels"
StageTypeLabelDrop = "labeldrop"
StageTypeTimestamp = "timestamp"
StageTypeOutput = "output"
StageTypeDocker = "docker"
@ -77,6 +78,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeLabelDrop:
s, err = newLabelDropStage(cfg)
if err != nil {
return nil, err
}
case StageTypeTimestamp:
s, err = newTimestampStage(logger, cfg)
if err != nil {

Loading…
Cancel
Save