mirror of https://github.com/grafana/loki
[promtail] non_indexed_labels stage (#9986)
**What this PR does / why we need it**:
added non_indexed_labels stage to copy values from extracted data
tonon-indexed-labels field of the Entry.
**Special notes for your reviewer**:
**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
---------
Signed-off-by: Vladyslav Diachenko <vlad.diachenko@grafana.com>
pull/10015/head
parent
7e248e1e90
commit
416e01ca8e
@ -0,0 +1,43 @@ |
||||
package stages |
||||
|
||||
import ( |
||||
"github.com/go-kit/log" |
||||
"github.com/mitchellh/mapstructure" |
||||
"github.com/prometheus/common/model" |
||||
|
||||
"github.com/grafana/loki/pkg/logproto" |
||||
) |
||||
|
||||
func newNonIndexedLabelsStage(params StageCreationParams) (Stage, error) { |
||||
cfgs := &LabelsConfig{} |
||||
err := mapstructure.Decode(params.config, cfgs) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
err = validateLabelsConfig(*cfgs) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return &nonIndexedLabelsStage{ |
||||
cfgs: *cfgs, |
||||
logger: params.logger, |
||||
}, nil |
||||
} |
||||
|
||||
type nonIndexedLabelsStage struct { |
||||
cfgs LabelsConfig |
||||
logger log.Logger |
||||
} |
||||
|
||||
func (s *nonIndexedLabelsStage) Name() string { |
||||
return StageTypeNonIndexedLabels |
||||
} |
||||
|
||||
func (s *nonIndexedLabelsStage) Run(in chan Entry) chan Entry { |
||||
return RunWith(in, func(e Entry) Entry { |
||||
processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) { |
||||
e.NonIndexedLabels = append(e.NonIndexedLabels, logproto.LabelAdapter{Name: string(labelName), Value: string(labelValue)}) |
||||
}) |
||||
return e |
||||
}) |
||||
} |
||||
@ -0,0 +1,113 @@ |
||||
package stages |
||||
|
||||
import ( |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/pkg/push" |
||||
util_log "github.com/grafana/loki/pkg/util/log" |
||||
) |
||||
|
||||
var pipelineStagesNonIndexedLabelsFromLogfmt = ` |
||||
pipeline_stages: |
||||
- logfmt: |
||||
mapping: |
||||
app: |
||||
- non_indexed_labels: |
||||
app: |
||||
` |
||||
|
||||
var pipelineStagesNonIndexedLabelsFromJSON = ` |
||||
pipeline_stages: |
||||
- json: |
||||
expressions: |
||||
app: |
||||
- non_indexed_labels: |
||||
app: |
||||
` |
||||
|
||||
var pipelineStagesNonIndexedLabelsWithRegexParser = ` |
||||
pipeline_stages: |
||||
- regex: |
||||
expression: "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$" |
||||
- non_indexed_labels: |
||||
stream: |
||||
` |
||||
|
||||
var pipelineStagesNonIndexedLabelsFromJSONWithTemplate = ` |
||||
pipeline_stages: |
||||
- json: |
||||
expressions: |
||||
app: |
||||
- template: |
||||
source: app |
||||
template: '{{ ToUpper .Value }}' |
||||
- non_indexed_labels: |
||||
app: |
||||
` |
||||
|
||||
var pipelineStagesNonIndexedAndRegularLabelsFromJSON = ` |
||||
pipeline_stages: |
||||
- json: |
||||
expressions: |
||||
app: |
||||
component: |
||||
- non_indexed_labels: |
||||
app: |
||||
- labels: |
||||
component:
|
||||
` |
||||
|
||||
func Test_NonIndexedLabelsStage(t *testing.T) { |
||||
tests := map[string]struct { |
||||
pipelineStagesYaml string |
||||
logLine string |
||||
expectedNonIndexedLabels push.LabelsAdapter |
||||
expectedLabels model.LabelSet |
||||
}{ |
||||
"expected non-indexed labels to be extracted with logfmt parser and to be added to entry": { |
||||
pipelineStagesYaml: pipelineStagesNonIndexedLabelsFromLogfmt, |
||||
logLine: "app=loki component=ingester", |
||||
expectedNonIndexedLabels: push.LabelsAdapter{push.LabelAdapter{Name: "app", Value: "loki"}}, |
||||
}, |
||||
"expected non-indexed labels to be extracted with json parser and to be added to entry": { |
||||
pipelineStagesYaml: pipelineStagesNonIndexedLabelsFromJSON, |
||||
logLine: `{"app":"loki" ,"component":"ingester"}`, |
||||
expectedNonIndexedLabels: push.LabelsAdapter{push.LabelAdapter{Name: "app", Value: "loki"}}, |
||||
}, |
||||
"expected non-indexed labels to be extracted with regexp parser and to be added to entry": { |
||||
pipelineStagesYaml: pipelineStagesNonIndexedLabelsWithRegexParser, |
||||
logLine: `2019-01-01T01:00:00.000000001Z stderr P i'm a log message!`, |
||||
expectedNonIndexedLabels: push.LabelsAdapter{push.LabelAdapter{Name: "stream", Value: "stderr"}}, |
||||
}, |
||||
"expected non-indexed labels to be extracted with json parser and to be added to entry after rendering the template": { |
||||
pipelineStagesYaml: pipelineStagesNonIndexedLabelsFromJSONWithTemplate, |
||||
logLine: `{"app":"loki" ,"component":"ingester"}`, |
||||
expectedNonIndexedLabels: push.LabelsAdapter{push.LabelAdapter{Name: "app", Value: "LOKI"}}, |
||||
}, |
||||
"expected non-indexed and regular labels to be extracted with json parser and to be added to entry": { |
||||
pipelineStagesYaml: pipelineStagesNonIndexedAndRegularLabelsFromJSON, |
||||
logLine: `{"app":"loki" ,"component":"ingester"}`, |
||||
expectedNonIndexedLabels: push.LabelsAdapter{push.LabelAdapter{Name: "app", Value: "loki"}}, |
||||
expectedLabels: model.LabelSet{model.LabelName("component"): model.LabelValue("ingester")}, |
||||
}, |
||||
} |
||||
for name, test := range tests { |
||||
t.Run(name, func(t *testing.T) { |
||||
pl, err := NewPipeline(util_log.Logger, loadConfig(test.pipelineStagesYaml), nil, prometheus.DefaultRegisterer) |
||||
require.NoError(t, err) |
||||
|
||||
result := processEntries(pl, newEntry(nil, nil, test.logLine, time.Now()))[0] |
||||
require.Equal(t, test.expectedNonIndexedLabels, result.NonIndexedLabels) |
||||
if test.expectedLabels != nil { |
||||
require.Equal(t, test.expectedLabels, result.Labels) |
||||
} else { |
||||
require.Empty(t, result.Labels) |
||||
} |
||||
}) |
||||
} |
||||
} |
||||
Loading…
Reference in new issue