diff --git a/CHANGELOG.md b/CHANGELOG.md index ad7fa3f7db..cdc3757ea6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage * [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization * [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail * [5971](https://github.com/grafana/loki/pull/5971) **kavirajk**: Record statistics about metadata queries such as labels and series queries in `metrics.go` as well diff --git a/clients/pkg/logentry/stages/json.go b/clients/pkg/logentry/stages/json.go index 2d552e99d1..36f8f66c03 100644 --- a/clients/pkg/logentry/stages/json.go +++ b/clients/pkg/logentry/stages/json.go @@ -3,7 +3,6 @@ package stages import ( "fmt" "reflect" - "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -11,7 +10,6 @@ import ( json "github.com/json-iterator/go" "github.com/mitchellh/mapstructure" "github.com/pkg/errors" - "github.com/prometheus/common/model" ) // Config Errors @@ -20,12 +18,14 @@ const ( ErrCouldNotCompileJMES = "could not compile JMES expression" ErrEmptyJSONStageConfig = "empty json stage configuration" ErrEmptyJSONStageSource = "empty source" + ErrMalformedJSON = "malformed json" ) // JSONConfig represents a JSON Stage configuration type JSONConfig struct { - Expressions map[string]string `mapstructure:"expressions"` - Source *string `mapstructure:"source"` + Expressions map[string]string `mapstructure:"expressions"` + Source *string `mapstructure:"source"` + DropMalformed bool `mapstructure:"drop_malformed"` } // validateJSONConfig validates a json config and returns a map of necessary jmespath expressions. @@ -76,11 +76,11 @@ func newJSONStage(logger log.Logger, config interface{}) (Stage, error) { if err != nil { return nil, err } - return toStage(&jsonStage{ + return &jsonStage{ cfg: cfg, expressions: expressions, logger: log.With(logger, "component", "stage", "type", "json"), - }), nil + }, nil } func parseJSONConfig(config interface{}) (*JSONConfig, error) { @@ -92,8 +92,22 @@ func parseJSONConfig(config interface{}) (*JSONConfig, error) { return cfg, nil } -// Process implements Stage -func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (j *jsonStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + for e := range in { + err := j.processEntry(e.Extracted, &e.Line) + if err != nil && j.cfg.DropMalformed { + continue + } + out <- e + } + }() + return out +} + +func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string) error { // If a source key is provided, the json stage should process it // from the extracted map, otherwise should fallback to the entry input := entry @@ -103,7 +117,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(j.logger).Log("msg", "source does not exist in the set of extracted values", "source", *j.cfg.Source) } - return + return nil } value, err := getString(extracted[*j.cfg.Source]) @@ -111,7 +125,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(j.logger).Log("msg", "failed to convert source value to string", "source", *j.cfg.Source, "err", err, "type", reflect.TypeOf(extracted[*j.cfg.Source])) } - return + return nil } input = &value @@ -121,7 +135,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(j.logger).Log("msg", "cannot parse a nil entry") } - return + return nil } var data map[string]interface{} @@ -130,7 +144,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(j.logger).Log("msg", "failed to unmarshal log line", "err", err) } - return + return errors.New(ErrMalformedJSON) } for n, e := range j.expressions { @@ -167,6 +181,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac if Debug { level.Debug(j.logger).Log("msg", "extracted data debug in json stage", "extracted data", fmt.Sprintf("%v", extracted)) } + return nil } // Name implements Stage diff --git a/clients/pkg/logentry/stages/json_test.go b/clients/pkg/logentry/stages/json_test.go index fe941b3f1a..31a0c0219e 100644 --- a/clients/pkg/logentry/stages/json_test.go +++ b/clients/pkg/logentry/stages/json_test.go @@ -84,9 +84,7 @@ func TestPipeline_JSON(t *testing.T) { t.Parallel() pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err, "Expected pipeline creation to not result in error") out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0] assert.Equal(t, testData.expectedExtract, out.Extracted) }) @@ -104,26 +102,19 @@ func TestYamlMapStructure(t *testing.T) { // testing that we can use yaml data into mapstructure. var mapstruct map[interface{}]interface{} - if err := yaml.Unmarshal([]byte(cfg), &mapstruct); err != nil { - t.Fatalf("error while un-marshalling config: %s", err) - } + err := yaml.Unmarshal([]byte(cfg), &mapstruct) + assert.NoError(t, err, "error while un-marshalling config: %s", err) p, ok := mapstruct["json"].(map[interface{}]interface{}) - if !ok { - t.Fatalf("could not read parser %+v", mapstruct["json"]) - } + assert.True(t, ok, "could not read parser %+v", mapstruct["json"]) got, err := parseJSONConfig(p) - if err != nil { - t.Fatalf("could not create parser from yaml: %s", err) - } + assert.NoError(t, err, "could not create parser from yaml: %s", err) want := &JSONConfig{ Expressions: map[string]string{ "key1": "expression1", "key2": "expression2.expression2", }, } - if !reflect.DeepEqual(got, want) { - t.Fatalf("want: %+v got: %+v", want, got) - } + assert.True(t, reflect.DeepEqual(got, want), "want: %+v got: %+v", want, got) } func TestJSONConfig_validate(t *testing.T) { @@ -191,17 +182,13 @@ func TestJSONConfig_validate(t *testing.T) { tt := tt t.Run(tName, func(t *testing.T) { c, err := parseJSONConfig(tt.config) - if err != nil { - t.Fatalf("failed to create config: %s", err) - } + assert.NoError(t, err, "failed to create config: %s", err) got, err := validateJSONConfig(c) - if (err != nil) != (tt.err != nil) { - t.Errorf("JSONConfig.validate() expected error = %v, actual error = %v", tt.err, err) - return + if tt.err != nil { + assert.NotNil(t, err, "JSONConfig.validate() expected error = %v, but got nil", tt.err) } - if (err != nil) && (err.Error() != tt.err.Error()) { - t.Errorf("JSONConfig.validate() expected error = %v, actual error = %v", tt.err, err) - return + if err != nil { + assert.Equal(t, tt.err.Error(), err.Error(), "JSONConfig.validate() expected error = %v, actual error = %v", tt.err, err) } assert.Equal(t, tt.wantExprCount, len(got)) }) @@ -356,12 +343,30 @@ func TestJSONParser_Parse(t *testing.T) { t.Run(tName, func(t *testing.T) { t.Parallel() p, err := New(util_log.Logger, nil, StageTypeJSON, tt.config, nil) - if err != nil { - t.Fatalf("failed to create json parser: %s", err) - } + assert.NoError(t, err, "failed to create json parser: %s", err) out := processEntries(p, newEntry(tt.extracted, nil, tt.entry, time.Now()))[0] assert.Equal(t, tt.expectedExtract, out.Extracted) }) } } + +func TestValidateJSONDrop(t *testing.T) { + labels := map[string]string{"foo": "bar"} + matchConfig := JSONConfig{ + DropMalformed: true, + Expressions: map[string]string{"page": "page"}, + } + s, err := newJSONStage(util_log.Logger, matchConfig) + assert.NoError(t, err, "withMatcher() error = %v", err) + assert.NotNil(t, s, "newJSONStage failed to create the pipeline stage and was nil") + out := processEntries(s, newEntry(map[string]interface{}{ + "test_label": "unimportant value", + }, toLabelSet(labels), `{"page": 1, "fruits": ["apple", "peach"]}`, time.Now())) + assert.Equal(t, 1, len(out), "stage should have kept one valid json line but got %v", out) + + out = processEntries(s, newEntry(map[string]interface{}{ + "test_label": "unimportant value", + }, toLabelSet(labels), `{"page": 1, fruits": ["apple", "peach"]}`, time.Now())) + assert.Equal(t, 0, len(out), "stage should have kept zero valid json line but got %v", out) +} diff --git a/docs/sources/clients/promtail/stages/json.md b/docs/sources/clients/promtail/stages/json.md index 8b6673305e..5b82fcac3c 100644 --- a/docs/sources/clients/promtail/stages/json.md +++ b/docs/sources/clients/promtail/stages/json.md @@ -22,6 +22,10 @@ json: # Name from extracted data to parse. If empty, uses the log message. [source: ] + + # When true, then any lines that cannot be successfully parsed as valid JSON objects + # will be dropped instead of being sent to Loki. + [drop_malformed: | default = false] ``` This stage uses the Go JSON unmarshaler, which means non-string types like