Added source support to regex and json stages

pull/757/head
Marco Pracucci 7 years ago committed by Ed
parent d43de72f3b
commit 812aecc795
  1. 68
      docs/logentry/processing-log-lines.md
  2. 2
      pkg/logentry/stages/extensions.go
  3. 25
      pkg/logentry/stages/json.go
  4. 149
      pkg/logentry/stages/json_test.go
  5. 25
      pkg/logentry/stages/regex.go
  6. 155
      pkg/logentry/stages/regex_test.go

@ -144,17 +144,19 @@ A regex stage will take the provided regex and set the named groups as data in t
```yaml
- regex:
    expression: ①
    source: ②
```
`expression` is **required** and needs to be a [golang RE2 regex string](https://github.com/google/re2/wiki/Syntax). Every capture group `(re)` will be set into the `extracted` map. Every capture group **must be named:** `(?P<name>re)`; the name will be used as the key in the map.
`source` is optional and contains the name of the key in the `extracted` map containing the data to parse. If omitted, the regex stage will parse the log `entry`.
##### Example:
##### Example (without source):
```yaml
- regex:
    expression: "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$"
```
Log line: `2019-01-01T01:00:00.000000001Z stderr P i'm a log message!`
Would create the following `extracted` map:
@ -168,10 +170,34 @@ Would create the following `extracted` map:
}
```
These map entries can then be used by other pipeline stages such as [timestamp](#timestamp) and/or [output](#output)
These map entries can then be used by other pipeline stages such as [timestamp](#timestamp) and/or [output](#output)
[Example in unit test](../../pkg/logentry/stages/regex_test.go)
##### Example (with source):
```yaml
- json:
    expressions:
      time:
- regex:
    expression: "^(?P<year>\\d+)"
    source: "time"
```
Log line: `{"time":"2019-01-01T01:00:00.000000001Z"}`
Would create the following `extracted` map:
```go
{
"time": "2019-01-01T01:00:00.000000001Z",
"year": "2019"
}
```
These map entries can then be used by other pipeline stages such as [timestamp](#timestamp) and/or [output](#output)
### json
A json stage will take the provided [JMESPath expressions](http://jmespath.org/) and set the key/value data in the `extracted` map.
@ -180,16 +206,18 @@ A json stage will take the provided [JMESPath expressions](http://jmespath.org/)
- json:
    expressions: ①
      key: expression ②
    source: ③
```
`expressions` is a required yaml object containing key/value pairs of JMESPath expressions
`expressions` is a required yaml object containing key/value pairs of JMESPath expressions
`key: expression` where `key` will be the key in the `extracted` map, and the value will be the evaluated JMESPath expression.
`source` is optional and contains the name of the key in the `extracted` map containing the json to parse. If omitted, the json stage will parse the log `entry`.
This stage uses the Go JSON unmarshaller, which means non-string types like numbers or booleans will be unmarshalled into those types. The `extracted` map will accept non-string values and this stage will keep primitive types as they are unmarshalled (e.g. bool or float64). Downstream stages will need to perform correct type conversion of these values as necessary.
If the value is a complex type, for example a JSON object, it will be marshalled back to JSON before being put in the `extracted` map.
##### Example:
##### Example (without source):
```yaml
- json:
@ -212,6 +240,36 @@ Would create the following `extracted` map:
```
[Example in unit test](../../pkg/logentry/stages/json_test.go)
##### Example (with source):
```yaml
- json:
    expressions:
      output: log
      stream: stream
      timestamp: time
      extra:
- json:
    expressions:
      user:
    source: extra
```
Log line: `{"log":"log message\n","stream":"stderr","time":"2019-04-30T02:12:41.8443515Z","extra":"{\"user\":\"marco\"}"}`
Would create the following `extracted` map:
```go
{
"output": "log message\n",
"stream": "stderr",
"timestamp": "2019-04-30T02:12:41.8443515Z",
"extra": "{\"user\":\"marco\"}",
"user": "marco"
}
```
#### template
A template stage lets you manipulate the values in the `extracted` data map using [Go's template package](https://golang.org/pkg/text/template/). This can be useful if you want to manipulate data extracted by regex or json stages before setting label values. Maybe to replace all spaces with underscores or make everything lowercase, or append some values to the extracted data.

@ -40,7 +40,7 @@ func NewCRI(logger log.Logger, registerer prometheus.Registerer) (Stage, error)
stages := PipelineStages{
PipelineStage{
StageTypeRegex: RegexConfig{
"^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$",
Expression: "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<content>.*)$",
},
},
PipelineStage{

@ -4,6 +4,7 @@ import (
"encoding/json"
//"encoding/json"
//"fmt"
"reflect"
"time"
"github.com/go-kit/kit/log"
@ -24,6 +25,7 @@ const (
// JSONConfig represents a JSON Stage configuration
type JSONConfig struct {
// Expressions maps each key to set in the extracted map to the JMESPath
// expression whose evaluated result becomes that key's value.
Expressions map[string]string `mapstructure:"expressions"`
// Source optionally names the key in the extracted map holding the JSON
// to parse. When empty, the stage parses the log entry instead.
Source string `mapstructure:"source"`
}
// validateJSONConfig validates a json config and returns a map of necessary jmespath expressions.
@ -88,14 +90,33 @@ func parseJSONConfig(config interface{}) (*JSONConfig, error) {
// Process implements Stage
func (j *jsonStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
if entry == nil {
// If a source key is provided, the json stage should process it
// from the extracted map, otherwise it should fall back to the entry
input := entry
if j.cfg.Source != "" {
if _, ok := extracted[j.cfg.Source]; !ok {
level.Debug(j.logger).Log("msg", "source does not exist in the set of extracted values", "source", j.cfg.Source)
return
}
value, err := getString(extracted[j.cfg.Source])
if err != nil {
level.Debug(j.logger).Log("msg", "failed to convert source value to string", "source", j.cfg.Source, "err", err, "type", reflect.TypeOf(extracted[j.cfg.Source]).String())
return
}
input = &value
}
if input == nil {
level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
return
}
var data map[string]interface{}
if err := json.Unmarshal([]byte(*entry), &data); err != nil {
if err := json.Unmarshal([]byte(*input), &data); err != nil {
level.Debug(j.logger).Log("msg", "failed to unmarshal log line", "err", err)
return
}

@ -13,7 +13,7 @@ import (
"gopkg.in/yaml.v2"
)
var testJSONYaml = `
var testJSONYamlSingleStageWithoutSource = `
pipeline_stages:
- json:
expressions:
@ -23,6 +23,17 @@ pipeline_stages:
duration:
`
var testJSONYamlMultiStageWithSource = `
pipeline_stages:
- json:
expressions:
extra:
- json:
expressions:
user:
source: extra
`
var testJSONLogLine = `
{
"time":"2012-11-01T22:08:41+00:00",
@ -31,28 +42,51 @@ var testJSONLogLine = `
"level" : "WARN",
"nested" : {"child":"value"},
"duration" : 125,
"message" : "this is a log line"
"message" : "this is a log line",
"extra": "{\"user\":\"marco\"}"
}
`
func TestPipeline_JSON(t *testing.T) {
expected := map[string]interface{}{
"out": "this is a log line",
"app": "loki",
"nested": "{\"child\":\"value\"}",
"duration": float64(125),
tests := map[string]struct {
config string
entry string
expectedExtract map[string]interface{}
}{
"successfully run a pipeline with 1 json stage without source": {
testJSONYamlSingleStageWithoutSource,
testJSONLogLine,
map[string]interface{}{
"out": "this is a log line",
"app": "loki",
"nested": "{\"child\":\"value\"}",
"duration": float64(125),
},
},
"successfully run a pipeline with 2 json stages with source": {
testJSONYamlMultiStageWithSource,
testJSONLogLine,
map[string]interface{}{
"extra": "{\"user\":\"marco\"}",
"user": "marco",
},
},
}
pl, err := NewPipeline(util.Logger, loadConfig(testJSONYaml), nil, prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
pl, err := NewPipeline(util.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
lbls := model.LabelSet{}
ts := time.Now()
entry := testData.entry
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, testData.expectedExtract, extracted)
})
}
lbls := model.LabelSet{}
ts := time.Now()
entry := testJSONLogLine
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, expected, extracted)
}
var cfg = `json:
@ -114,13 +148,25 @@ func TestJSONConfig_validate(t *testing.T) {
0,
errors.Wrap(errors.New("SyntaxError: Unknown char: '#'"), ErrCouldNotCompileJMES),
},
"valid": {
"valid without source": {
&JSONConfig{
Expressions: map[string]string{
"expr1": "expr",
"expr2": "",
"expr3": "expr1.expr2",
},
},
3,
nil,
},
"valid with source": {
&JSONConfig{
Expressions: map[string]string{
"expr1": "expr",
"expr2": "",
"expr3": "expr1.expr2",
},
Source: "log",
},
3,
nil,
@ -171,10 +217,11 @@ func TestJSONParser_Parse(t *testing.T) {
tests := map[string]struct {
config *JSONConfig
extracted map[string]interface{}
entry string
expectedExtract map[string]interface{}
}{
"extract all": {
"successfully decode json on entry": {
&JSONConfig{
Expressions: map[string]string{
"time": "",
@ -189,6 +236,7 @@ func TestJSONParser_Parse(t *testing.T) {
"complex": "complex.log.array[1].test3",
},
},
map[string]interface{}{},
logFixture,
map[string]interface{}{
"time": "2012-11-01T22:08:41+00:00",
@ -203,15 +251,76 @@ func TestJSONParser_Parse(t *testing.T) {
"complex": "test4",
},
},
"invalid json": {
"successfully decode json on extracted[source]": {
&JSONConfig{
Expressions: map[string]string{
"time": "",
"app": "",
"component": "",
"level": "",
"float": "numeric.float",
"integer": "numeric.integer",
"string": "numeric.string",
"nested": "",
"message": "",
"complex": "complex.log.array[1].test3",
},
Source: "log",
},
map[string]interface{}{
"log": logFixture,
},
"{}",
map[string]interface{}{
"time": "2012-11-01T22:08:41+00:00",
"app": "loki",
"component": "[\"parser\",\"type\"]",
"level": "WARN",
"float": 12.34,
"integer": 123.0,
"string": "123",
"nested": "{\"child\":\"value\"}",
"message": "this is a log line",
"complex": "test4",
"log": logFixture,
},
},
"missing extracted[source]": {
&JSONConfig{
Expressions: map[string]string{
"app": "",
},
Source: "log",
},
map[string]interface{}{},
logFixture,
map[string]interface{}{},
},
"invalid json on entry": {
&JSONConfig{
Expressions: map[string]string{
"expr1": "",
},
},
map[string]interface{}{},
"ts=now log=notjson",
map[string]interface{}{},
},
"invalid json on extracted[source]": {
&JSONConfig{
Expressions: map[string]string{
"app": "",
},
Source: "log",
},
map[string]interface{}{
"log": "not a json",
},
logFixture,
map[string]interface{}{
"log": "not a json",
},
},
}
for tName, tt := range tests {
tt := tt
@ -222,7 +331,7 @@ func TestJSONParser_Parse(t *testing.T) {
t.Fatalf("failed to create json parser: %s", err)
}
lbs := model.LabelSet{}
extr := map[string]interface{}{}
extr := tt.extracted
ts := time.Now()
p.Process(lbs, extr, &ts, &tt.entry)

@ -1,6 +1,7 @@
package stages
import (
"reflect"
"regexp"
"time"
@ -21,6 +22,7 @@ const (
// RegexConfig contains a regexStage configuration
type RegexConfig struct {
// Expression is the RE2 regular expression to apply; its named capture
// groups are stored in the extracted map under the group names.
Expression string `mapstructure:"expression"`
// Source optionally names the key in the extracted map holding the data
// to match against. When empty, the stage matches the log entry instead.
Source string `mapstructure:"source"`
}
// validateRegexConfig validates the config and return a regex
@ -77,12 +79,31 @@ func parseRegexConfig(config interface{}) (*RegexConfig, error) {
// Process implements Stage
func (r *regexStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
if entry == nil {
// If a source key is provided, the regex stage should process it
// from the extracted map, otherwise it should fall back to the entry
input := entry
if r.cfg.Source != "" {
if _, ok := extracted[r.cfg.Source]; !ok {
level.Debug(r.logger).Log("msg", "source does not exist in the set of extracted values", "source", r.cfg.Source)
return
}
value, err := getString(extracted[r.cfg.Source])
if err != nil {
level.Debug(r.logger).Log("msg", "failed to convert source value to string", "source", r.cfg.Source, "err", err, "type", reflect.TypeOf(extracted[r.cfg.Source]).String())
return
}
input = &value
}
if input == nil {
level.Debug(r.logger).Log("msg", "cannot parse a nil entry")
return
}
match := r.expression.FindStringSubmatch(*entry)
match := r.expression.FindStringSubmatch(*input)
if match == nil {
level.Debug(r.logger).Log("msg", "regex did not match")
return

@ -13,42 +13,84 @@ import (
"gopkg.in/yaml.v2"
)
var testRegexYaml = `
var testRegexYamlSingleStageWithoutSource = `
pipeline_stages:
- regex:
expression: "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$"
`
var testRegexYamlMultiStageWithSource = `
pipeline_stages:
- regex:
expression: "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$"
- regex:
expression: "^HTTP\\/(?P<protocol_version>[0-9\\.]+)$"
source: "protocol"
`
var testRegexLogLine = `11.11.11.11 - frank [25/Jan/2000:14:00:01 -0500] "GET /1986.js HTTP/1.1" 200 932 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6"`
func TestPipeline_Regex(t *testing.T) {
expected := map[string]interface{}{
"ip": "11.11.11.11",
"identd": "-",
"user": "frank",
"timestamp": "25/Jan/2000:14:00:01 -0500",
"action": "GET",
"path": "/1986.js",
"protocol": "HTTP/1.1",
"status": "200",
"size": "932",
"referer": "-",
"useragent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6",
tests := map[string]struct {
config string
entry string
expectedExtract map[string]interface{}
}{
"successfully run a pipeline with 1 regex stage without source": {
testRegexYamlSingleStageWithoutSource,
testRegexLogLine,
map[string]interface{}{
"ip": "11.11.11.11",
"identd": "-",
"user": "frank",
"timestamp": "25/Jan/2000:14:00:01 -0500",
"action": "GET",
"path": "/1986.js",
"protocol": "HTTP/1.1",
"status": "200",
"size": "932",
"referer": "-",
"useragent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6",
},
},
"successfully run a pipeline with 2 regex stages with source": {
testRegexYamlMultiStageWithSource,
testRegexLogLine,
map[string]interface{}{
"ip": "11.11.11.11",
"identd": "-",
"user": "frank",
"timestamp": "25/Jan/2000:14:00:01 -0500",
"action": "GET",
"path": "/1986.js",
"protocol": "HTTP/1.1",
"protocol_version": "1.1",
"status": "200",
"size": "932",
"referer": "-",
"useragent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6",
},
},
}
pl, err := NewPipeline(util.Logger, loadConfig(testRegexYaml), nil, prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
pl, err := NewPipeline(util.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer)
if err != nil {
t.Fatal(err)
}
lbls := model.LabelSet{}
ts := time.Now()
entry := testData.entry
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, testData.expectedExtract, extracted)
})
}
lbls := model.LabelSet{}
ts := time.Now()
entry := testRegexLogLine
extracted := map[string]interface{}{}
pl.Process(lbls, extracted, &ts, &entry)
assert.Equal(t, expected, extracted)
}
var regexCfg = `regex:
var regexCfg = `regex:
expression: "regexexpression"`
// nolint
@ -96,12 +138,19 @@ func TestRegexConfig_validate(t *testing.T) {
},
errors.New(ErrCouldNotCompileRegex + ": error parsing regexp: invalid named capture: `(?P<ts[0-9]+).*`"),
},
"valid": {
"valid without source": {
map[string]interface{}{
"expression": "(?P<ts>[0-9]+).*",
},
nil,
},
"valid with source": {
map[string]interface{}{
"expression": "(?P<ts>[0-9]+).*",
"source": "log",
},
nil,
},
}
for tName, tt := range tests {
tt := tt
@ -131,13 +180,15 @@ func TestRegexParser_Parse(t *testing.T) {
t.Parallel()
tests := map[string]struct {
config interface{}
extracted map[string]interface{}
entry string
expectedExtract map[string]interface{}
}{
"happy path": {
"successfully match expression on entry": {
map[string]interface{}{
"expression": "^(?P<ip>\\S+) (?P<identd>\\S+) (?P<user>\\S+) \\[(?P<timestamp>[\\w:/]+\\s[+\\-]\\d{4})\\] \"(?P<action>\\S+)\\s?(?P<path>\\S+)?\\s?(?P<protocol>\\S+)?\" (?P<status>\\d{3}|-) (?P<size>\\d+|-)\\s?\"?(?P<referer>[^\"]*)\"?\\s?\"?(?P<useragent>[^\"]*)?\"?$",
},
map[string]interface{}{},
regexLogFixture,
map[string]interface{}{
"ip": "11.11.11.11",
@ -153,13 +204,63 @@ func TestRegexParser_Parse(t *testing.T) {
"useragent": "Mozilla/5.0 (Windows; U; Windows NT 5.1; de; rv:1.9.1.7) Gecko/20091221 Firefox/3.5.7 GTB6",
},
},
"match failed": {
"successfully match expression on extracted[source]": {
map[string]interface{}{
"expression": "^HTTP\\/(?P<protocol_version>.*)$",
"source": "protocol",
},
map[string]interface{}{
"protocol": "HTTP/1.1",
},
regexLogFixture,
map[string]interface{}{
"protocol": "HTTP/1.1",
"protocol_version": "1.1",
},
},
"failed to match expression on entry": {
map[string]interface{}{
"expression": "^(?s)(?P<time>\\S+?) (?P<stream>stdout|stderr) (?P<flags>\\S+?) (?P<message>.*)$",
},
map[string]interface{}{},
"blahblahblah",
map[string]interface{}{},
},
"failed to match expression on extracted[source]": {
map[string]interface{}{
"expression": "^HTTP\\/(?P<protocol_version>.*)$",
"source": "protocol",
},
map[string]interface{}{
"protocol": "unknown",
},
"unknown/unknown",
map[string]interface{}{
"protocol": "unknown",
},
},
"missing extracted[source]": {
map[string]interface{}{
"expression": "^HTTP\\/(?P<protocol_version>.*)$",
"source": "protocol",
},
map[string]interface{}{},
"blahblahblah",
map[string]interface{}{},
},
"invalid data type in extracted[source]": {
map[string]interface{}{
"expression": "^HTTP\\/(?P<protocol_version>.*)$",
"source": "protocol",
},
map[string]interface{}{
"protocol": true,
},
"unknown/unknown",
map[string]interface{}{
"protocol": true,
},
},
}
for tName, tt := range tests {
tt := tt
@ -170,7 +271,7 @@ func TestRegexParser_Parse(t *testing.T) {
t.Fatalf("failed to create regex parser: %s", err)
}
lbs := model.LabelSet{}
extr := map[string]interface{}{}
extr := tt.extracted
ts := time.Now()
p.Process(lbs, extr, &ts, &tt.entry)
assert.Equal(t, tt.expectedExtract, extr)

Loading…
Cancel
Save