From 416e01ca8e8f6e606135ea7b18f9db3393345ede Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko <82767850+vlad-diachenko@users.noreply.github.com> Date: Fri, 21 Jul 2023 15:33:33 +0300 Subject: [PATCH] [promtail] non_indexed_labels stage (#9986) **What this PR does / why we need it**: added non_indexed_labels stage to copy values from extracted data tonon-indexed-labels field of the Entry. **Special notes for your reviewer**: **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [ ] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Signed-off-by: Vladyslav Diachenko --- clients/pkg/logentry/stages/labels.go | 16 +- .../pkg/logentry/stages/nonindexedlabels.go | 43 +++ .../logentry/stages/nonindexedlabels_test.go | 113 +++++++ clients/pkg/logentry/stages/stage.go | 287 +++++++++--------- 4 files changed, 305 insertions(+), 154 deletions(-) create mode 100644 clients/pkg/logentry/stages/nonindexedlabels.go create mode 100644 clients/pkg/logentry/stages/nonindexedlabels_test.go diff --git a/clients/pkg/logentry/stages/labels.go b/clients/pkg/logentry/stages/labels.go index 7d33061f0e..3a3486977d 100644 --- a/clients/pkg/logentry/stages/labels.go +++ b/clients/pkg/logentry/stages/labels.go @@ -63,23 +63,31 @@ type labelStage struct { // Process implements Stage func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) { - for lName, lSrc := range l.cfgs { + processLabelsConfigs(l.logger, extracted, l.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) { + labels[labelName] = labelValue + }) +} + +type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue) + +func processLabelsConfigs(logger log.Logger, extracted map[string]interface{}, configs LabelsConfig, consumer labelsConsumer) { + for lName, lSrc := range configs { if lValue, ok := extracted[*lSrc]; ok { s, err := getString(lValue) if err != nil { if Debug { - level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) + level.Debug(logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) } continue } labelValue := model.LabelValue(s) if !labelValue.IsValid() { if Debug { - level.Debug(l.logger).Log("msg", "invalid label value parsed", "value", labelValue) + level.Debug(logger).Log("msg", "invalid label value parsed", "value", labelValue) } continue } - labels[model.LabelName(lName)] = labelValue + consumer(model.LabelName(lName), labelValue) } } } diff --git a/clients/pkg/logentry/stages/nonindexedlabels.go b/clients/pkg/logentry/stages/nonindexedlabels.go new file mode 100644 index 0000000000..06302309b6 --- /dev/null +++ b/clients/pkg/logentry/stages/nonindexedlabels.go @@ -0,0 +1,43 @@ +package stages + +import ( + "github.com/go-kit/log" + "github.com/mitchellh/mapstructure" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/logproto" +) + +func newNonIndexedLabelsStage(params StageCreationParams) (Stage, error) { + cfgs := &LabelsConfig{} + err := mapstructure.Decode(params.config, cfgs) + if err != nil { + return nil, err + } + err = validateLabelsConfig(*cfgs) + if err != nil { + return nil, err + } + return &nonIndexedLabelsStage{ + cfgs: *cfgs, + logger: params.logger, + }, nil +} + +type nonIndexedLabelsStage struct { + cfgs LabelsConfig + logger log.Logger +} + +func (s *nonIndexedLabelsStage) Name() string { + return StageTypeNonIndexedLabels +} + +func (s *nonIndexedLabelsStage) Run(in chan Entry) chan Entry { + return RunWith(in, func(e Entry) Entry { + processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) { + e.NonIndexedLabels = append(e.NonIndexedLabels, logproto.LabelAdapter{Name: string(labelName), Value: string(labelValue)}) + }) + return e + }) +} diff --git a/clients/pkg/logentry/stages/nonindexedlabels_test.go b/clients/pkg/logentry/stages/nonindexedlabels_test.go new file mode 100644 index 0000000000..4889210fc0 --- /dev/null +++ b/clients/pkg/logentry/stages/nonindexedlabels_test.go @@ -0,0 +1,113 @@ +package stages + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/push" + util_log "github.com/grafana/loki/pkg/util/log" +) + +var pipelineStagesNonIndexedLabelsFromLogfmt = ` +pipeline_stages: +- logfmt: + mapping: + app: +- non_indexed_labels: + app: +` + +var pipelineStagesNonIndexedLabelsFromJSON = ` +pipeline_stages: +- json: + expressions: + app: +- non_indexed_labels: + app: +` + +var pipelineStagesNonIndexedLabelsWithRegexParser = ` +pipeline_stages: +- regex: + expression: "^(?s)(?P