[Promtail] drop lines with malformed json (#6099)

* Refactor json pipeline stage to have a Run function

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Allow for dropping log lines in json pipeline stage if they are not
valid json strings

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* fix linting

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Clean up new malformed json test.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Address some review feedback.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Refactor json pipeline stage test to use testify assert instead of
t.Errorf

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Update changelog and docs with new drop malformed option

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Update CHANGELOG.md

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>

* Update docs based on Karen's comments.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com>
pull/6242/head
Callum Styan 3 years ago committed by GitHub
parent 3d2282745b
commit b6ac00e1a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 39
      clients/pkg/logentry/stages/json.go
  3. 59
      clients/pkg/logentry/stages/json_test.go
  4. 4
      docs/sources/clients/promtail/stages/json.md

@ -1,5 +1,6 @@
## Main
* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
* [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization
* [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail
* [5971](https://github.com/grafana/loki/pull/5971) **kavirajk**: Record statistics about metadata queries such as labels and series queries in `metrics.go` as well

@ -3,7 +3,6 @@ package stages
import (
"fmt"
"reflect"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -11,7 +10,6 @@ import (
json "github.com/json-iterator/go"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
// Config Errors
@ -20,12 +18,14 @@ const (
ErrCouldNotCompileJMES = "could not compile JMES expression"
ErrEmptyJSONStageConfig = "empty json stage configuration"
ErrEmptyJSONStageSource = "empty source"
ErrMalformedJSON = "malformed json"
)
// JSONConfig represents a JSON Stage configuration
type JSONConfig struct {
Expressions map[string]string `mapstructure:"expressions"`
Source *string `mapstructure:"source"`
Expressions map[string]string `mapstructure:"expressions"`
Source *string `mapstructure:"source"`
DropMalformed bool `mapstructure:"drop_malformed"`
}
// validateJSONConfig validates a json config and returns a map of necessary jmespath expressions.
@ -76,11 +76,11 @@ func newJSONStage(logger log.Logger, config interface{}) (Stage, error) {
if err != nil {
return nil, err
}
return toStage(&jsonStage{
return &jsonStage{
cfg: cfg,
expressions: expressions,
logger: log.With(logger, "component", "stage", "type", "json"),
}), nil
}, nil
}
func parseJSONConfig(config interface{}) (*JSONConfig, error) {
@ -92,8 +92,22 @@ func parseJSONConfig(config interface{}) (*JSONConfig, error) {
return cfg, nil
}
// Process implements Stage
func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
func (j *jsonStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)
for e := range in {
err := j.processEntry(e.Extracted, &e.Line)
if err != nil && j.cfg.DropMalformed {
continue
}
out <- e
}
}()
return out
}
func (j *jsonStage) processEntry(extracted map[string]interface{}, entry *string) error {
// If a source key is provided, the json stage should process it
// from the extracted map, otherwise should fallback to the entry
input := entry
@ -103,7 +117,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac
if Debug {
level.Debug(j.logger).Log("msg", "source does not exist in the set of extracted values", "source", *j.cfg.Source)
}
return
return nil
}
value, err := getString(extracted[*j.cfg.Source])
@ -111,7 +125,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac
if Debug {
level.Debug(j.logger).Log("msg", "failed to convert source value to string", "source", *j.cfg.Source, "err", err, "type", reflect.TypeOf(extracted[*j.cfg.Source]))
}
return
return nil
}
input = &value
@ -121,7 +135,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac
if Debug {
level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
}
return
return nil
}
var data map[string]interface{}
@ -130,7 +144,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac
if Debug {
level.Debug(j.logger).Log("msg", "failed to unmarshal log line", "err", err)
}
return
return errors.New(ErrMalformedJSON)
}
for n, e := range j.expressions {
@ -167,6 +181,7 @@ func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interfac
if Debug {
level.Debug(j.logger).Log("msg", "extracted data debug in json stage", "extracted data", fmt.Sprintf("%v", extracted))
}
return nil
}
// Name implements Stage

@ -84,9 +84,7 @@ func TestPipeline_JSON(t *testing.T) {
t.Parallel()
pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err, "Expected pipeline creation to not result in error")
out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0]
assert.Equal(t, testData.expectedExtract, out.Extracted)
})
@ -104,26 +102,19 @@ func TestYamlMapStructure(t *testing.T) {
// testing that we can use yaml data into mapstructure.
var mapstruct map[interface{}]interface{}
if err := yaml.Unmarshal([]byte(cfg), &mapstruct); err != nil {
t.Fatalf("error while un-marshalling config: %s", err)
}
err := yaml.Unmarshal([]byte(cfg), &mapstruct)
assert.NoError(t, err, "error while un-marshalling config: %s", err)
p, ok := mapstruct["json"].(map[interface{}]interface{})
if !ok {
t.Fatalf("could not read parser %+v", mapstruct["json"])
}
assert.True(t, ok, "could not read parser %+v", mapstruct["json"])
got, err := parseJSONConfig(p)
if err != nil {
t.Fatalf("could not create parser from yaml: %s", err)
}
assert.NoError(t, err, "could not create parser from yaml: %s", err)
want := &JSONConfig{
Expressions: map[string]string{
"key1": "expression1",
"key2": "expression2.expression2",
},
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("want: %+v got: %+v", want, got)
}
assert.True(t, reflect.DeepEqual(got, want), "want: %+v got: %+v", want, got)
}
func TestJSONConfig_validate(t *testing.T) {
@ -191,17 +182,13 @@ func TestJSONConfig_validate(t *testing.T) {
tt := tt
t.Run(tName, func(t *testing.T) {
c, err := parseJSONConfig(tt.config)
if err != nil {
t.Fatalf("failed to create config: %s", err)
}
assert.NoError(t, err, "failed to create config: %s", err)
got, err := validateJSONConfig(c)
if (err != nil) != (tt.err != nil) {
t.Errorf("JSONConfig.validate() expected error = %v, actual error = %v", tt.err, err)
return
if tt.err != nil {
assert.NotNil(t, err, "JSONConfig.validate() expected error = %v, but got nil", tt.err)
}
if (err != nil) && (err.Error() != tt.err.Error()) {
t.Errorf("JSONConfig.validate() expected error = %v, actual error = %v", tt.err, err)
return
if err != nil {
assert.Equal(t, tt.err.Error(), err.Error(), "JSONConfig.validate() expected error = %v, actual error = %v", tt.err, err)
}
assert.Equal(t, tt.wantExprCount, len(got))
})
@ -356,12 +343,30 @@ func TestJSONParser_Parse(t *testing.T) {
t.Run(tName, func(t *testing.T) {
t.Parallel()
p, err := New(util_log.Logger, nil, StageTypeJSON, tt.config, nil)
if err != nil {
t.Fatalf("failed to create json parser: %s", err)
}
assert.NoError(t, err, "failed to create json parser: %s", err)
out := processEntries(p, newEntry(tt.extracted, nil, tt.entry, time.Now()))[0]
assert.Equal(t, tt.expectedExtract, out.Extracted)
})
}
}
func TestValidateJSONDrop(t *testing.T) {
labels := map[string]string{"foo": "bar"}
matchConfig := JSONConfig{
DropMalformed: true,
Expressions: map[string]string{"page": "page"},
}
s, err := newJSONStage(util_log.Logger, matchConfig)
assert.NoError(t, err, "withMatcher() error = %v", err)
assert.NotNil(t, s, "newJSONStage failed to create the pipeline stage and was nil")
out := processEntries(s, newEntry(map[string]interface{}{
"test_label": "unimportant value",
}, toLabelSet(labels), `{"page": 1, "fruits": ["apple", "peach"]}`, time.Now()))
assert.Equal(t, 1, len(out), "stage should have kept one valid json line but got %v", out)
out = processEntries(s, newEntry(map[string]interface{}{
"test_label": "unimportant value",
}, toLabelSet(labels), `{"page": 1, fruits": ["apple", "peach"]}`, time.Now()))
assert.Equal(t, 0, len(out), "stage should have kept zero valid json line but got %v", out)
}

@ -22,6 +22,10 @@ json:
# Name from extracted data to parse. If empty, uses the log message.
[source: <string>]
# When true, then any lines that cannot be successfully parsed as valid JSON objects
# will be dropped instead of being sent to Loki.
[drop_malformed: <bool> | default = false]
```
This stage uses the Go JSON unmarshaler, which means non-string types like

Loading…
Cancel
Save