fluent-bit: Attempt to unmarshal nested json. (#5223)

* fluent-bit: Attempt to unmarshal nested json.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Also support nested json array.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/5262/head
Cyril Tovena 4 years ago committed by GitHub
parent 91d837e79c
commit cde9a711f3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      clients/cmd/fluent-bit/loki.go
  2. 24
      clients/cmd/fluent-bit/loki_test.go

@ -2,6 +2,7 @@ package main
import ( import (
"bytes" "bytes"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"os" "os"
@ -78,7 +79,7 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error {
return nil return nil
} }
} }
line, err := createLine(records, l.cfg.lineFormat) line, err := l.createLine(records, l.cfg.lineFormat)
if err != nil { if err != nil {
return fmt.Errorf("error creating line: %v", err) return fmt.Errorf("error creating line: %v", err)
} }
@ -220,9 +221,21 @@ func removeKeys(records map[string]interface{}, keys []string) {
} }
} }
func createLine(records map[string]interface{}, f format) (string, error) { func (l *loki) createLine(records map[string]interface{}, f format) (string, error) {
switch f { switch f {
case jsonFormat: case jsonFormat:
for k, v := range records {
if s, ok := v.(string); ok && (strings.Contains(s, "{") || strings.Contains(s, "[")) {
var data interface{}
err := json.Unmarshal([]byte(s), &data)
if err != nil {
// keep this debug as it can be very verbose
level.Debug(l.logger).Log("msg", "error unmarshalling json", "err", err)
continue
}
records[k] = data
}
}
js, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(records) js, err := jsoniter.ConfigCompatibleWithStandardLibrary.Marshal(records)
if err != nil { if err != nil {
return "", err return "", err

@ -18,12 +18,12 @@ import (
var now = time.Now() var now = time.Now()
func Test_loki_sendRecord(t *testing.T) { func Test_loki_sendRecord(t *testing.T) {
var simpleRecordFixture = map[interface{}]interface{}{ simpleRecordFixture := map[interface{}]interface{}{
"foo": "bar", "foo": "bar",
"bar": 500, "bar": 500,
"error": make(chan struct{}), "error": make(chan struct{}),
} }
var mapRecordFixture = map[interface{}]interface{}{ mapRecordFixture := map[interface{}]interface{}{
// lots of key/value pairs in map to increase chances of test hitting in case of unsorted map marshalling // lots of key/value pairs in map to increase chances of test hitting in case of unsorted map marshalling
"A": "A", "A": "A",
"B": "B", "B": "B",
@ -34,14 +34,14 @@ func Test_loki_sendRecord(t *testing.T) {
"G": "G", "G": "G",
"H": "H", "H": "H",
} }
var byteArrayRecordFixture = map[interface{}]interface{}{ byteArrayRecordFixture := map[interface{}]interface{}{
"label": "label", "label": "label",
"outer": []byte("foo"), "outer": []byte("foo"),
"map": map[interface{}]interface{}{ "map": map[interface{}]interface{}{
"inner": []byte("bar"), "inner": []byte("bar"),
}, },
} }
var mixedTypesRecordFixture = map[interface{}]interface{}{ mixedTypesRecordFixture := map[interface{}]interface{}{
"label": "label", "label": "label",
"int": 42, "int": 42,
"float": 42.42, "float": 42.42,
@ -53,7 +53,7 @@ func Test_loki_sendRecord(t *testing.T) {
}, },
}, },
} }
var nestedJSONFixture = map[interface{}]interface{}{ nestedJSONFixture := map[interface{}]interface{}{
"kubernetes": map[interface{}]interface{}{ "kubernetes": map[interface{}]interface{}{
"annotations": map[interface{}]interface{}{ "annotations": map[interface{}]interface{}{
"kubernetes.io/psp": "test", "kubernetes.io/psp": "test",
@ -124,10 +124,15 @@ func Test_createLine(t *testing.T) {
{"kv with map", map[string]interface{}{"foo": "bar", "map": map[string]interface{}{"foo": "bar", "bar ": "foo "}}, kvPairFormat, `foo=bar map="map[bar :foo foo:bar]"`, false}, {"kv with map", map[string]interface{}{"foo": "bar", "map": map[string]interface{}{"foo": "bar", "bar ": "foo "}}, kvPairFormat, `foo=bar map="map[bar :foo foo:bar]"`, false},
{"kv empty", map[string]interface{}{}, kvPairFormat, ``, false}, {"kv empty", map[string]interface{}{}, kvPairFormat, ``, false},
{"bad format", nil, format(3), "", true}, {"bad format", nil, format(3), "", true},
{"nested json", map[string]interface{}{"log": `{"level":"error"}`}, jsonFormat, `{"log":{"level":"error"}}`, false},
{"nested json", map[string]interface{}{"log": `["level","error"]`}, jsonFormat, `{"log":["level","error"]}`, false},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got, err := createLine(tt.records, tt.f) l := &loki{
logger: logger,
}
got, err := l.createLine(tt.records, tt.f)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("createLine() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("createLine() error = %v, wantErr %v", err, tt.wantErr)
return return
@ -217,8 +222,11 @@ func Test_toStringMap(t *testing.T) {
}{ }{
{"already string", map[interface{}]interface{}{"string": "foo", "bar": []byte("buzz")}, map[string]interface{}{"string": "foo", "bar": "buzz"}}, {"already string", map[interface{}]interface{}{"string": "foo", "bar": []byte("buzz")}, map[string]interface{}{"string": "foo", "bar": "buzz"}},
{"skip non string", map[interface{}]interface{}{"string": "foo", 1.0: []byte("buzz")}, map[string]interface{}{"string": "foo"}}, {"skip non string", map[interface{}]interface{}{"string": "foo", 1.0: []byte("buzz")}, map[string]interface{}{"string": "foo"}},
{"byteslice in array", map[interface{}]interface{}{"string": "foo", "bar": []interface{}{map[interface{}]interface{}{"baz": []byte("quux")}}}, {
map[string]interface{}{"string": "foo", "bar": []interface{}{map[string]interface{}{"baz": "quux"}}}}, "byteslice in array",
map[interface{}]interface{}{"string": "foo", "bar": []interface{}{map[interface{}]interface{}{"baz": []byte("quux")}}},
map[string]interface{}{"string": "foo", "bar": []interface{}{map[string]interface{}{"baz": "quux"}}},
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {

Loading…
Cancel
Save