mirror of https://github.com/grafana/loki
add static labels in stages (#4495)
Signed-off-by: Sankalp Rangare <sankalprangare786@gmail.com>pull/4565/head
parent
ea8e0987b3
commit
9e2acd656d
@ -0,0 +1,87 @@ |
||||
package stages |
||||
|
||||
import ( |
||||
"fmt" |
||||
"reflect" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/go-kit/log/level" |
||||
"github.com/mitchellh/mapstructure" |
||||
"github.com/pkg/errors" |
||||
"github.com/prometheus/common/model" |
||||
) |
||||
|
||||
const ( |
||||
// ErrEmptyStaticLabelStageConfig error returned if config is empty
|
||||
ErrEmptyStaticLabelStageConfig = "static_labels stage config cannot be empty" |
||||
) |
||||
|
||||
// StaticLabelConfig is a slice of static-labels to be included
|
||||
type StaticLabelConfig map[string]*string |
||||
|
||||
func validateLabelStaticConfig(c StaticLabelConfig) error { |
||||
if c == nil { |
||||
return errors.New(ErrEmptyStaticLabelStageConfig) |
||||
} |
||||
for labelName := range c { |
||||
if !model.LabelName(labelName).IsValid() { |
||||
return fmt.Errorf(ErrInvalidLabelName, labelName) |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func newStaticLabelsStage(logger log.Logger, configs interface{}) (Stage, error) { |
||||
cfgs := &StaticLabelConfig{} |
||||
err := mapstructure.Decode(configs, cfgs) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
err = validateLabelStaticConfig(*cfgs) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return toStage(&StaticLabelStage{ |
||||
cfgs: *cfgs, |
||||
logger: logger, |
||||
}), nil |
||||
} |
||||
|
||||
type StaticLabelStage struct { |
||||
cfgs StaticLabelConfig |
||||
logger log.Logger |
||||
} |
||||
|
||||
// Process implements Stage
|
||||
func (l *StaticLabelStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { |
||||
|
||||
for lName, lSrc := range l.cfgs { |
||||
if lSrc == nil || *lSrc == "" { |
||||
continue |
||||
} |
||||
s, err := getString(*lSrc) |
||||
if err != nil { |
||||
if Debug { |
||||
level.Debug(l.logger).Log("msg", "failed to convert static label value to string", "err", err, "type", reflect.TypeOf(lSrc)) |
||||
} |
||||
continue |
||||
} |
||||
lvalue := model.LabelValue(s) |
||||
if !lvalue.IsValid() { |
||||
if Debug { |
||||
level.Debug(l.logger).Log("msg", "invalid label value parsed", "value", lvalue) |
||||
} |
||||
continue |
||||
} |
||||
lname := model.LabelName(lName) |
||||
labels[lname] = lvalue |
||||
} |
||||
} |
||||
|
||||
// Name implements Stage
|
||||
func (l *StaticLabelStage) Name() string { |
||||
return StageTypeStaticLabels |
||||
} |
@ -0,0 +1,58 @@ |
||||
package stages |
||||
|
||||
import ( |
||||
"testing" |
||||
"time" |
||||
|
||||
util_log "github.com/cortexproject/cortex/pkg/util/log" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/assert" |
||||
) |
||||
|
||||
func Test_staticLabelStage_Process(t *testing.T) { |
||||
staticVal := "val" |
||||
|
||||
tests := []struct { |
||||
name string |
||||
config StaticLabelConfig |
||||
inputLabels model.LabelSet |
||||
expectedLabels model.LabelSet |
||||
}{ |
||||
{ |
||||
name: "add static label", |
||||
config: map[string]*string{ |
||||
"staticLabel": &staticVal, |
||||
}, |
||||
inputLabels: model.LabelSet{ |
||||
"testLabel": "testValue", |
||||
}, |
||||
expectedLabels: model.LabelSet{ |
||||
"testLabel": "testValue", |
||||
"staticLabel": "val", |
||||
}, |
||||
}, |
||||
{ |
||||
name: "add static label with empty value", |
||||
config: map[string]*string{ |
||||
"staticLabel": nil, |
||||
}, |
||||
inputLabels: model.LabelSet{ |
||||
"testLabel": "testValue", |
||||
}, |
||||
expectedLabels: model.LabelSet{ |
||||
"testLabel": "testValue", |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
for _, test := range tests { |
||||
t.Run(test.name, func(t *testing.T) { |
||||
st, err := newStaticLabelsStage(util_log.Logger, test.config) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
out := processEntries(st, newEntry(nil, test.inputLabels, "", time.Now()))[0] |
||||
assert.Equal(t, test.expectedLabels, out.Labels) |
||||
}) |
||||
} |
||||
} |
@ -0,0 +1,39 @@ |
||||
--- |
||||
title: static_labels |
||||
--- |
||||
# `static_labels` stage |
||||
|
||||
The static_labels stage is an action stage that adds static-labels to the label set that is sent to Loki with the log entry. |
||||
|
||||
## Schema |
||||
|
||||
```yaml |
||||
static_labels: |
||||
[ <string>: [<string>] ... ] |
||||
``` |
||||
|
||||
### Examples |
||||
|
||||
For the given pipeline: |
||||
|
||||
```yaml |
||||
- json: |
||||
expressions: |
||||
stream: stream |
||||
- labels: |
||||
stream: |
||||
- static_labels: |
||||
custom_key: custom_val |
||||
``` |
||||
|
||||
Given the following log line: |
||||
|
||||
``` |
||||
{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z"} |
||||
``` |
||||
|
||||
The first stage would extract `stream` into the extracted map with a value of |
||||
`stderr`. The `labels` stage would turn that key-value pair into a label. The |
||||
`static_labels` stage would add the provided static labels into the label set. |
||||
|
||||
The resulting entry that is sent to Loki will contain `stream="stderr"` and `custom_key="custom_val"` as labels. |
Loading…
Reference in new issue