diff --git a/clients/pkg/logentry/stages/labels.go b/clients/pkg/logentry/stages/labels.go index 7d33061f0e..3a3486977d 100644 --- a/clients/pkg/logentry/stages/labels.go +++ b/clients/pkg/logentry/stages/labels.go @@ -63,23 +63,31 @@ type labelStage struct { // Process implements Stage func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) { - for lName, lSrc := range l.cfgs { + processLabelsConfigs(l.logger, extracted, l.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) { + labels[labelName] = labelValue + }) +} + +type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue) + +func processLabelsConfigs(logger log.Logger, extracted map[string]interface{}, configs LabelsConfig, consumer labelsConsumer) { + for lName, lSrc := range configs { if lValue, ok := extracted[*lSrc]; ok { s, err := getString(lValue) if err != nil { if Debug { - level.Debug(l.logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) + level.Debug(logger).Log("msg", "failed to convert extracted label value to string", "err", err, "type", reflect.TypeOf(lValue)) } continue } labelValue := model.LabelValue(s) if !labelValue.IsValid() { if Debug { - level.Debug(l.logger).Log("msg", "invalid label value parsed", "value", labelValue) + level.Debug(logger).Log("msg", "invalid label value parsed", "value", labelValue) } continue } - labels[model.LabelName(lName)] = labelValue + consumer(model.LabelName(lName), labelValue) } } } diff --git a/clients/pkg/logentry/stages/nonindexedlabels.go b/clients/pkg/logentry/stages/nonindexedlabels.go new file mode 100644 index 0000000000..06302309b6 --- /dev/null +++ b/clients/pkg/logentry/stages/nonindexedlabels.go @@ -0,0 +1,43 @@ +package stages + +import ( + "github.com/go-kit/log" + "github.com/mitchellh/mapstructure" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/logproto" +) + +func newNonIndexedLabelsStage(params StageCreationParams) (Stage, error) { + cfgs := &LabelsConfig{} + err := mapstructure.Decode(params.config, cfgs) + if err != nil { + return nil, err + } + err = validateLabelsConfig(*cfgs) + if err != nil { + return nil, err + } + return &nonIndexedLabelsStage{ + cfgs: *cfgs, + logger: params.logger, + }, nil +} + +type nonIndexedLabelsStage struct { + cfgs LabelsConfig + logger log.Logger +} + +func (s *nonIndexedLabelsStage) Name() string { + return StageTypeNonIndexedLabels +} + +func (s *nonIndexedLabelsStage) Run(in chan Entry) chan Entry { + return RunWith(in, func(e Entry) Entry { + processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) { + e.NonIndexedLabels = append(e.NonIndexedLabels, logproto.LabelAdapter{Name: string(labelName), Value: string(labelValue)}) + }) + return e + }) +} diff --git a/clients/pkg/logentry/stages/nonindexedlabels_test.go b/clients/pkg/logentry/stages/nonindexedlabels_test.go new file mode 100644 index 0000000000..4889210fc0 --- /dev/null +++ b/clients/pkg/logentry/stages/nonindexedlabels_test.go @@ -0,0 +1,113 @@ +package stages + +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/push" + util_log "github.com/grafana/loki/pkg/util/log" +) + +var pipelineStagesNonIndexedLabelsFromLogfmt = ` +pipeline_stages: +- logfmt: + mapping: + app: +- non_indexed_labels: + app: +` + +var pipelineStagesNonIndexedLabelsFromJSON = ` +pipeline_stages: +- json: + expressions: + app: +- non_indexed_labels: + app: +` + +var pipelineStagesNonIndexedLabelsWithRegexParser = ` +pipeline_stages: +- regex: + expression: "^(?s)(?P