Introduce an unpack parser. (#3420)

* Introduce an unpack parser.

To go with the new pack stage in promtail (https://github.com/grafana/loki/pull/3401), this PR introduces an unpack parser
that automatically unpacks all labels packed into the log line and restores the original log line.
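
A quick sketch of the round trip (label names are illustrative, borrowed from the docs below): the pack stage emits a line such as

```json
{"container": "myapp", "pod": "pod-3223f", "_entry": "original log message"}
```

and querying it with `{job="myjob"} | unpack` turns `container` and `pod` back into labels while restoring the log line to `original log message`.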

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add documentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* tweak promtail docs a little now that we have unpack in Loki

Co-authored-by: Edward Welch <edward.welch@grafana.com>
Cyril Tovena 5 years ago committed by GitHub
parent 7a9804bce7
commit 7886f35b8f
13 changed files (lines changed in parentheses):

 1. docs/sources/clients/promtail/stages/pack.md (24)
 2. docs/sources/logql/_index.md (29)
 3. pkg/logentry/stages/pack.go (9)
 4. pkg/logentry/stages/pack_test.go (16)
 5. pkg/logql/ast.go (3)
 6. pkg/logql/ast_test.go (3)
 7. pkg/logql/expr.y (3)
 8. pkg/logql/expr.y.go (584)
 9. pkg/logql/lex.go (5)
10. pkg/logql/log/parser.go (66)
11. pkg/logql/log/parser_hints_test.go (14)
12. pkg/logql/log/parser_test.go (70)
13. pkg/logql/parser_test.go (45)

@ -57,28 +57,32 @@ This would create a log line
}
```
Loki 2.0 has some tools to make querying packed log lines easier as well.
-Display the log line as if it were never packed:
+**Loki 2.2 also includes a new [`unpack`](../../../../logql/#unpack) parser to work with the pack stage.**
+For example:
+```logql
+{cluster="us-central1", job="myjob"} | unpack
```
-{cluster="us-central1", job="myjob"} | json | line_format "{{._entry}}"
-```
+Will automatically unpack embedded labels and replace the log line with the original one.
+### More Examples
Use the packed labels for filtering:
-```
-{cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}"
+```logql
+{cluster="us-central1", job="myjob"} | unpack | container="myapp"
```
You can even use the `json` parser twice if your original message was json:
-```
-{cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}" | json | val_from_original_log_json="foo"
+```logql
+{cluster="us-central1", job="myjob"} | unpack | container="myapp" | json | val_from_original_log_json="foo"
```
Or any other parser:
-```
-{cluster="us-central1", job="myjob"} | json | container="myapp" | line_format "{{._entry}}" | logfmt | val_from_original_log_json="foo"
+```logql
+{cluster="us-central1", job="myjob"} | unpack | container="myapp" | logfmt | val_from_original_log_json="foo"
```

@ -143,7 +143,11 @@ In case of errors, for instance if the line is not in the expected format, the l
If an extracted label key name already exists in the original log stream, the extracted label key will be suffixed with the `_extracted` keyword to make the distinction between the two labels. You can forcefully override the original label using a [label formatter expression](#labels-format-expression). However, if an extracted key appears twice, only the latest label value will be kept.
-We currently support json, logfmt and regexp parsers.
+We currently support [json](#json), [logfmt](#logfmt), [regexp](#regexp) and [unpack](#unpack) parsers.
+It's easier to use the predefined parsers like `json` and `logfmt` when you can, falling back to `regexp` when the log lines have unusual structure. Multiple parsers can be used during the same log pipeline which is useful when you want to parse complex logs. ([see examples](#multiple-parsers))
##### Json
The **json** parser operates in two modes:
@ -240,6 +244,8 @@ The **json** parser operates in two modes:
"headers" => `{"Accept": "*/*", "User-Agent": "curl/7.68.0"}`
```
##### logfmt
The **logfmt** parser can be added using `| logfmt` and will extract all keys and values from the [logfmt](https://brandur.org/logfmt) formatted log line.
For example the following log line:
@ -260,6 +266,8 @@ will get those labels extracted:
"status" => "200"
```
##### regexp
Unlike logfmt and json, which implicitly extract all values and take no parameters, the **regexp** parser takes a single parameter `| regexp "<re>"`, which is a regular expression using the [Golang](https://golang.org/) [RE2 syntax](https://github.com/google/re2/wiki/Syntax).
The regular expression must contain at least one named sub-match (e.g. `(?P<name>re)`); each sub-match will extract a different label.
@ -279,7 +287,24 @@ those labels:
"duration" => "1.5s"
```
-It's easier to use the predefined parsers like `json` and `logfmt` when you can, falling back to `regexp` when the log lines have unusual structure. Multiple parsers can be used during the same log pipeline which is useful when you want to parse complex logs. ([see examples](#multiple-parsers))
##### unpack
The `unpack` parser parses a JSON log line and unpacks all labels embedded by the [`pack`](../clients/promtail/stages/pack/) stage.
**A special property `_entry` will also be used to replace the original log line**.
For example, using `| unpack` with the following log line:
```json
{
"container": "myapp",
"pod": "pod-3223f",
"_entry": "original log message"
}
```
extracts the `container` and `pod` labels, and sets `original log message` as the new log line.
> You can combine `unpack` with the `json` parser (or any other parser) if the original embedded log line is in a specific format.
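
As an illustration (not part of this diff), assuming the embedded log line is itself JSON with a `level` field, the two parsers chain naturally:

```logql
{cluster="us-central1", job="myjob"} | unpack | json | level="error"
```

`unpack` first restores the original JSON line, then `json` extracts `level` from it.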
#### Label Filter Expression

@ -14,10 +14,8 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)
const (
entryKey = "_entry"
logql_log "github.com/grafana/loki/pkg/logql/log"
)
var (
@ -40,7 +38,7 @@ func (w *Packed) UnmarshalJSON(data []byte) error {
w.Labels = map[string]string{}
for k, v := range *m {
// _entry key goes to the Entry field, everything else becomes a label
-if k == entryKey {
+if k == logql_log.PackedEntryKey {
if s, ok := v.(string); ok {
w.Entry = s
} else {
@ -59,7 +57,6 @@ func (w *Packed) UnmarshalJSON(data []byte) error {
// MarshalJSON creates a Packed struct as JSON where the Labels are flattened into the top level of the object
func (w Packed) MarshalJSON() ([]byte, error) {
// Marshal the entry to properly escape if it's json or contains quotes
b, err := json.Marshal(w.Entry)
if err != nil {
@ -101,7 +98,7 @@ func (w Packed) MarshalJSON() ([]byte, error) {
buf.WriteString(",")
}
// Add the line entry
buf.WriteString("\"" + entryKey + "\":")
buf.WriteString("\"" + logql_log.PackedEntryKey + "\":")
buf.Write(b)
buf.WriteString("}")

@ -13,6 +13,7 @@ import (
ww "github.com/weaveworks/common/server"
"github.com/grafana/loki/pkg/logproto"
logql_log "github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/promtail/api"
)
@ -139,7 +140,7 @@ func Test_packStage_Run(t *testing.T) {
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"" + entryKey + "\":\"test line 1\"}",
Line: "{\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
@ -170,7 +171,7 @@ func Test_packStage_Run(t *testing.T) {
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"foo\":\"bar\",\"" + entryKey + "\":\"test line 1\"}",
Line: "{\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
@ -199,7 +200,7 @@ func Test_packStage_Run(t *testing.T) {
Labels: model.LabelSet{},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"bar\":\"baz\",\"foo\":\"bar\",\"" + entryKey + "\":\"test line 1\"}",
Line: "{\"bar\":\"baz\",\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
@ -233,7 +234,7 @@ func Test_packStage_Run(t *testing.T) {
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"extr1\":\"etr1val\",\"foo\":\"bar\",\"" + entryKey + "\":\"test line 1\"}",
Line: "{\"extr1\":\"etr1val\",\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
@ -267,7 +268,7 @@ func Test_packStage_Run(t *testing.T) {
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"foo\":\"bar\",\"" + entryKey + "\":\"test line 1\"}",
Line: "{\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
@ -301,7 +302,7 @@ func Test_packStage_Run(t *testing.T) {
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"ex\\\"tr2\":\"\\\"fd\\\"\",\"foo\":\"bar\",\"" + entryKey + "\":\"test line 1\"}",
Line: "{\"ex\\\"tr2\":\"\\\"fd\\\"\",\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
@ -333,7 +334,7 @@ func Test_packStage_Run(t *testing.T) {
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0), // Ignored in test execution below
Line: "{\"" + entryKey + "\":\"test line 1\"}",
Line: "{\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
@ -362,7 +363,6 @@ func Test_packStage_Run(t *testing.T) {
} else {
assert.Equal(t, tt.expectedEntry.Timestamp, out[0].Timestamp)
}
})
}
}

@ -327,6 +327,8 @@ func (e *labelParserExpr) Stage() (log.Stage, error) {
return log.NewLogfmtParser(), nil
case OpParserTypeRegexp:
return log.NewRegexpParser(e.param)
case OpParserTypeUnpack:
return log.NewUnpackParser(), nil
default:
return nil, fmt.Errorf("unknown parser operator: %s", e.op)
}
@ -573,6 +575,7 @@ const (
OpParserTypeJSON = "json"
OpParserTypeLogfmt = "logfmt"
OpParserTypeRegexp = "regexp"
OpParserTypeUnpack = "unpack"
OpFmtLine = "line_format"
OpFmtLabel = "label_format"

@ -27,6 +27,7 @@ func Test_logSelectorExpr_String(t *testing.T) {
{`{foo="bar", bar!="baz"} |~ "" |= "" |~ ".*"`, false},
{`{foo="bar", bar!="baz"} != "bip" !~ ".+bop" | json`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | logfmt`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | unpack | foo>5`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | logfmt | b>=10GB`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | regexp "(?P<foo>foo|bar)"`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | regexp "(?P<foo>foo|bar)" | ( ( foo<5.01 , bar>20ms ) or foo="bar" ) | line_format "blip{{.boop}}bap" | label_format foo=bar,bar="blip{{.blop}}"`, true},
@ -64,6 +65,7 @@ func Test_SampleExpr_String(t *testing.T) {
`sum(count_over_time({job="mysql"}[5m]))`,
`sum(count_over_time({job="mysql"} | json [5m]))`,
`sum(count_over_time({job="mysql"} | logfmt [5m]))`,
`sum(count_over_time({job="mysql"} | unpack | json [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m]))`,
`topk(10,sum(rate({region="us-east1"}[5m])) by (name))`,
`topk by (name)(10,sum(rate({region="us-east1"}[5m])))`,
@ -328,6 +330,7 @@ func Test_parserExpr_Parser(t *testing.T) {
wantErr bool
}{
{"json", OpParserTypeJSON, "", log.NewJSONParser(), false},
{"unpack", OpParserTypeUnpack, "", log.NewUnpackParser(), false},
{"logfmt", OpParserTypeLogfmt, "", log.NewLogfmtParser(), false},
{"regexp", OpParserTypeRegexp, "(?P<foo>foo)", mustNewRegexParser("(?P<foo>foo)"), false},
{"regexp err ", OpParserTypeRegexp, "foo", nil, true},

@ -98,7 +98,7 @@ import (
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
-ABSENT_OVER_TIME LABEL_REPLACE
+ABSENT_OVER_TIME LABEL_REPLACE UNPACK
// Operators are listed with increasing precedence.
%left <binOp> OR
@ -231,6 +231,7 @@ labelParser:
JSON { $$ = newLabelParserExpr(OpParserTypeJSON, "") }
| LOGFMT { $$ = newLabelParserExpr(OpParserTypeLogfmt, "") }
| REGEXP STRING { $$ = newLabelParserExpr(OpParserTypeRegexp, $2) }
| UNPACK { $$ = newLabelParserExpr(OpParserTypeUnpack, "") }
;
jsonExpressionParser:

File diff for pkg/logql/expr.y.go (generated code) suppressed because it is too large.

@ -54,6 +54,7 @@ var tokens = map[string]int{
OpParserTypeJSON: JSON,
OpParserTypeRegexp: REGEXP,
OpParserTypeLogfmt: LOGFMT,
OpParserTypeUnpack: UNPACK,
// fmt
OpFmtLabel: LABEL_FMT,
@ -195,7 +196,7 @@ func (l *lexer) Error(msg string) {
func tryScanDuration(number string, l *scanner.Scanner) (time.Duration, bool) {
var sb strings.Builder
sb.WriteString(number)
-//copy the scanner to avoid advancing it in case it's not a duration.
+// copy the scanner to avoid advancing it in case it's not a duration.
s := *l
consumed := 0
for r := s.Peek(); r != scanner.EOF && !unicode.IsSpace(r); r = s.Peek() {
@ -235,7 +236,7 @@ func isDurationRune(r rune) bool {
func tryScanBytes(number string, l *scanner.Scanner) (uint64, bool) {
var sb strings.Builder
sb.WriteString(number)
-//copy the scanner to avoid advancing it in case it's not a duration.
+// copy the scanner to avoid advancing it in case it's not a duration.
s := *l
consumed := 0
for r := s.Peek(); r != scanner.EOF && !unicode.IsSpace(r); r = s.Peek() {

@ -18,6 +18,7 @@ const (
duplicateSuffix = "_extracted"
trueString = "true"
falseString = "false"
PackedEntryKey = "_entry"
)
var (
@ -313,3 +314,68 @@ func (j *JSONExpressionParser) Process(line []byte, lbs *LabelsBuilder) ([]byte,
}
func (j *JSONExpressionParser) RequiredLabelNames() []string { return []string{} }
type UnpackParser struct{}
// NewUnpackParser creates a new unpack stage.
// The unpack stage parses a json log line as map[string]string, translating each key into a label.
// A special key, _entry, replaces the original log line. This is meant to be used in conjunction with the Promtail pack stage.
// see https://grafana.com/docs/loki/latest/clients/promtail/stages/pack/
func NewUnpackParser() *UnpackParser {
return &UnpackParser{}
}
func (UnpackParser) RequiredLabelNames() []string { return []string{} }
func (u *UnpackParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
if lbs.ParserLabelHints().NoLabels() {
return line, true
}
it := jsoniter.ConfigFastest.BorrowIterator(line)
defer jsoniter.ConfigFastest.ReturnIterator(it)
entry, err := u.unpack(it, lbs)
if err != nil {
lbs.SetErr(errJSON)
return line, true
}
return entry, true
}
func (u *UnpackParser) unpack(it *jsoniter.Iterator, lbs *LabelsBuilder) ([]byte, error) {
// we only care about objects and values.
if nextType := it.WhatIsNext(); nextType != jsoniter.ObjectValue {
return nil, fmt.Errorf("expecting json object(%d), got %d", jsoniter.ObjectValue, nextType)
}
var entry []byte
_ = it.ReadMapCB(func(iter *jsoniter.Iterator, field string) bool {
switch iter.WhatIsNext() {
case jsoniter.StringValue:
// we only unpack map[string]string. Anything else is skipped.
if field == PackedEntryKey {
s := iter.ReadStringAsSlice()
// todo(ctovena): we should just reslice the original line since the property is contiguous
// but jsoniter doesn't allow us to do this right now.
// https://github.com/buger/jsonparser might do a better job at this.
entry = make([]byte, len(s))
copy(entry, s)
return true
}
if !lbs.ParserLabelHints().ShouldExtract(field) {
iter.Skip()
return true
}
if lbs.BaseHas(field) {
field = field + duplicateSuffix
}
lbs.Set(field, iter.ReadString())
default:
iter.Skip()
}
return true
})
if it.Error != nil && it.Error != io.EOF {
return nil, it.Error
}
return entry, nil
}
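
For reference, a minimal usage sketch of the new parser (using package-internal identifiers from `pkg/logql/log`, mirroring the tests further down):

```go
lbs := labels.Labels{{Name: "cluster", Value: "us-central1"}}
b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
b.Reset()
// Process unpacks the packed fields into labels and returns the original line.
line, ok := NewUnpackParser().Process([]byte(`{"app":"foo","_entry":"original message"}`), b)
// ok == true, line == []byte("original message"), and b now also carries app="foo".
```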

@ -183,6 +183,20 @@ func Test_ParserHints(t *testing.T) {
1.0,
`{cluster_extracted="us-east-west"}`,
},
{
`sum by (cluster_extracted)(count_over_time({app="nginx"} | unpack | cluster_extracted="us-east-west" [1m]))`,
jsonLine,
true,
1.0,
`{cluster_extracted="us-east-west"}`,
},
{
`sum(rate({app="nginx"} | unpack | nonexistant_field="foo" [1m]))`,
jsonLine,
false,
0,
``,
},
} {
tt := tt
t.Run(tt.expr, func(t *testing.T) {

@ -395,6 +395,7 @@ func Benchmark_Parser(b *testing.B) {
jsonLine := `{"proxy_protocol_addr": "","remote_addr": "3.112.221.14","remote_user": "","upstream_addr": "10.12.15.234:5000","the_real_ip": "3.112.221.14","timestamp": "2020-12-11T16:20:07+00:00","protocol": "HTTP/1.1","upstream_name": "hosted-grafana-hosted-grafana-api-80","request": {"id": "c8eacb6053552c0cd1ae443bc660e140","time": "0.001","method" : "GET","host": "hg-api-qa-us-central1.grafana.net","uri": "/","size" : "128","user_agent": "worldping-api","referer": ""},"response": {"status": 200,"upstream_status": "200","size": "1155","size_sent": "265","latency_seconds": "0.001"}}`
logfmtLine := `level=info ts=2020-12-14T21:25:20.947307459Z caller=metrics.go:83 org_id=29 traceID=c80e691e8db08e2 latency=fast query="sum by (object_name) (rate(({container=\"metrictank\", cluster=\"hm-us-east2\"} |= \"PANIC\")[5m]))" query_type=metric range_type=range length=5m0s step=15s duration=322.623724ms status=200 throughput=1.2GB total_bytes=375MB`
nginxline := `10.1.0.88 - - [14/Dec/2020:22:56:24 +0000] "GET /static/img/about/bob.jpg HTTP/1.1" 200 60755 "https://grafana.com/go/observabilitycon/grafana-the-open-and-composable-observability-platform/?tech=ggl-o&pg=oss-graf&plcmt=hero-txt" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.1 Safari/605.1.15" "123.123.123.123, 35.35.122.223" "TLSv1.3"`
packedLike := `{"job":"123","pod":"someuid123","app":"foo","_entry":"10.1.0.88 - - [14/Dec/2020:22:56:24 +0000] "GET /static/img/about/bob.jpg HTTP/1.1"}`
for _, tt := range []struct {
name string
@ -403,6 +404,7 @@ func Benchmark_Parser(b *testing.B) {
LabelParseHints []string // hints to reduce label extractions.
}{
{"json", jsonLine, NewJSONParser(), []string{"response_latency_seconds"}},
{"unpack", packedLike, NewUnpackParser(), []string{"pod"}},
{"logfmt", logfmtLine, NewLogfmtParser(), []string{"info", "throughput", "org_id"}},
{"regex greedy", nginxline, mustNewRegexParser(`GET (?P<path>.*?)/\?`), []string{"path"}},
{"regex status digits", nginxline, mustNewRegexParser(`HTTP/1.1" (?P<statuscode>\d{3}) `), []string{"statuscode"}},
@ -632,3 +634,71 @@ func Test_logfmtParser_Parse(t *testing.T) {
})
}
}
func Test_unpackParser_Parse(t *testing.T) {
tests := []struct {
name string
line []byte
lbs labels.Labels
wantLbs labels.Labels
wantLine []byte
}{
{
"should extract only map[string]string",
[]byte(`{"bar":1,"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}}`),
labels.Labels{{Name: "cluster", Value: "us-central1"}},
labels.Labels{
{Name: "app", Value: "foo"},
{Name: "namespace", Value: "prod"},
{Name: "cluster", Value: "us-central1"},
},
[]byte(`some message`),
},
{
"wrong json",
[]byte(`"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}`),
labels.Labels{},
labels.Labels{
{Name: "__error__", Value: "JSONParserErr"},
},
[]byte(`"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}`),
},
{
"not a map",
[]byte(`["foo","bar"]`),
labels.Labels{{Name: "cluster", Value: "us-central1"}},
labels.Labels{
{Name: "__error__", Value: "JSONParserErr"},
{Name: "cluster", Value: "us-central1"},
},
[]byte(`["foo","bar"]`),
},
{
"should rename",
[]byte(`{"bar":1,"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}}`),
labels.Labels{
{Name: "cluster", Value: "us-central1"},
{Name: "app", Value: "bar"},
},
labels.Labels{
{Name: "app", Value: "bar"},
{Name: "app_extracted", Value: "foo"},
{Name: "namespace", Value: "prod"},
{Name: "cluster", Value: "us-central1"},
},
[]byte(`some message`),
},
}
for _, tt := range tests {
j := NewUnpackParser()
t.Run(tt.name, func(t *testing.T) {
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
l, _ := j.Process(tt.line, b)
sort.Sort(tt.wantLbs)
require.Equal(t, tt.wantLbs, b.Labels())
require.Equal(t, tt.wantLine, l)
})
}
}

@ -466,6 +466,26 @@ func TestParse(t *testing.T) {
interval: 5 * time.Minute,
}, OpRangeTypeBytes, nil, nil),
},
{
in: `bytes_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | unpack)[5m])`,
exp: newRangeAggregationExpr(
&logRange{
left: newPipelineExpr(
newMatcherExpr([]*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}),
MultiStageExpr{
newLineFilterExpr(
newLineFilterExpr(
newLineFilterExpr(
newLineFilterExpr(nil, labels.MatchEqual, "baz"),
labels.MatchRegexp, "blip"),
labels.MatchNotEqual, "flip"),
labels.MatchNotRegexp, "flap"),
newLabelParserExpr(OpParserTypeUnpack, ""),
},
),
interval: 5 * time.Minute,
}, OpRangeTypeBytes, nil, nil),
},
{
in: `
label_replace(
@ -1063,6 +1083,26 @@ func TestParse(t *testing.T) {
},
},
},
{
in: `{app="foo"} |= "bar" | unpack | json | latency >= 250ms or ( status_code < 500 and status_code > 200)`,
exp: &pipelineExpr{
left: newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "app", Value: "foo"}}),
pipeline: MultiStageExpr{
newLineFilterExpr(nil, labels.MatchEqual, "bar"),
newLabelParserExpr(OpParserTypeUnpack, ""),
newLabelParserExpr(OpParserTypeJSON, ""),
&labelFilterExpr{
LabelFilterer: log.NewOrLabelFilter(
log.NewDurationLabelFilter(log.LabelFilterGreaterThanOrEqual, "latency", 250*time.Millisecond),
log.NewAndLabelFilter(
log.NewNumericLabelFilter(log.LabelFilterLesserThan, "status_code", 500.0),
log.NewNumericLabelFilter(log.LabelFilterGreaterThan, "status_code", 200.0),
),
),
},
},
},
},
{
in: `{app="foo"} |= "bar" | json | (duration > 1s or status!= 200) and method!="POST"`,
exp: &pipelineExpr{
@ -1317,7 +1357,8 @@ func TestParse(t *testing.T) {
newUnwrapExpr("foo", "")),
OpRangeTypeStdvar, nil, nil,
),
-}, {
+},
+{
in: `stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap duration(foo) [5m])`,
exp: newRangeAggregationExpr(
@ -2237,7 +2278,6 @@ func TestParse(t *testing.T) {
}
func TestParseMatchers(t *testing.T) {
tests := []struct {
input string
want []*labels.Matcher
@ -2286,7 +2326,6 @@ func TestParseMatchers(t *testing.T) {
}
func TestIsParseError(t *testing.T) {
tests := []struct {
name string
errFn func() error
