Allow math on errors (#8823)

Currently metric expressions will fail if they contain the parser error
label `__error__`. There are cases where you may want to count errors.
For example, `sum(count_over_time({foo="bar"} | json | __error__=
"JSONParserErr" [$__interval]))` to see how many of your logs are
unparsable JSON.

This pr uses `ParseHint` to determine whether the `__error__` label was
requested in the query and, if it was, do math on it like any other
query. Parse hints are only included in VectorAggregations so the label
used to signal whether or not to keep errors is never seen by a user.

All other operations remain unaffected
pull/8600/head^2
Travis Patterson 3 years ago committed by GitHub
parent 8b8d2ca382
commit 37c0b3dec2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      pkg/logql/evaluator.go
  2. 18
      pkg/logql/log/parser.go
  3. 26
      pkg/logql/log/parser_hints.go
  4. 127
      pkg/logql/log/parser_test.go
  5. 13
      pkg/logqlmodel/error.go

@ -499,8 +499,8 @@ func (r *rangeVectorEvaluator) Next() (bool, int64, promql.Vector) {
}
ts, vec := r.iter.At()
for _, s := range vec {
// Errors are not allowed in metrics.
if s.Metric.Has(logqlmodel.ErrorLabel) {
// Errors are not allowed in metrics unless they've been specifically requested.
if s.Metric.Has(logqlmodel.ErrorLabel) && s.Metric.Get(logqlmodel.PreserveErrorLabel) != "true" {
r.err = logqlmodel.NewPipelineErr(s.Metric)
return false, 0, promql.Vector{}
}
@ -531,8 +531,8 @@ func (r *absentRangeVectorEvaluator) Next() (bool, int64, promql.Vector) {
}
ts, vec := r.iter.At()
for _, s := range vec {
// Errors are not allowed in metrics.
if s.Metric.Has(logqlmodel.ErrorLabel) {
// Errors are not allowed in metrics unless they've been specifically requested.
if s.Metric.Has(logqlmodel.ErrorLabel) && s.Metric.Get(logqlmodel.PreserveErrorLabel) != "true" {
r.err = logqlmodel.NewPipelineErr(s.Metric)
return false, 0, promql.Vector{}
}

@ -66,6 +66,9 @@ func (j *JSONParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte,
if err := jsonparser.ObjectEach(line, j.parseObject); err != nil {
lbs.SetErr(errJSON)
lbs.SetErrorDetails(err.Error())
if lbs.ParserLabelHints().PreserveError() {
lbs.Set(logqlmodel.PreserveErrorLabel, "true")
}
return line, true
}
return line, true
@ -290,6 +293,9 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte
if l.dec.Err() != nil {
lbs.SetErr(errLogfmt)
lbs.SetErrorDetails(l.dec.Err().Error())
if lbs.ParserLabelHints().PreserveError() {
lbs.Set(logqlmodel.PreserveErrorLabel, "true")
}
return line, true
}
return line, true
@ -432,6 +438,9 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
if l.dec.Err() != nil {
lbs.SetErr(errLogfmt)
lbs.SetErrorDetails(l.dec.Err().Error())
if lbs.ParserLabelHints().PreserveError() {
lbs.Set(logqlmodel.PreserveErrorLabel, "true")
}
return line, true
}
@ -493,6 +502,9 @@ func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder)
// parts of the line are malformed
if !isValidJSONStart(line) {
lbs.SetErr(errJSON)
if lbs.ParserLabelHints().PreserveError() {
lbs.Set(logqlmodel.PreserveErrorLabel, "true")
}
return line, true
}
@ -500,6 +512,9 @@ func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder)
jsonparser.EachKey(line, func(idx int, data []byte, typ jsonparser.ValueType, err error) {
if err != nil {
lbs.SetErr(errJSON)
if lbs.ParserLabelHints().PreserveError() {
lbs.Set(logqlmodel.PreserveErrorLabel, "true")
}
return
}
@ -573,6 +588,9 @@ func (u *UnpackParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte
if err != nil {
lbs.SetErr(errJSON)
lbs.SetErrorDetails(err.Error())
if lbs.ParserLabelHints().PreserveError() {
lbs.Set(logqlmodel.PreserveErrorLabel, "true")
}
return line, true
}
return entry, true

@ -1,6 +1,7 @@
package log
import (
"github.com/grafana/loki/pkg/logqlmodel"
"strings"
)
@ -25,11 +26,14 @@ type ParserHint interface {
// sum(rate({app="foo"} | json [5m]))
// We don't need to extract any labels from the log line.
NoLabels() bool
// PreserveError returns true when parsing errors were specifically requested
PreserveError() bool
}
type parserHint struct {
noLabels bool
requiredLabels []string
noLabels bool
requiredLabels []string
shouldPreserveError bool
}
func (p *parserHint) ShouldExtract(key string) bool {
@ -61,6 +65,10 @@ func (p *parserHint) NoLabels() bool {
return p.noLabels
}
func (p *parserHint) PreserveError() bool {
return p.shouldPreserveError
}
// newParserHint creates a new parser hint using the list of labels that are seen and required in a query.
func newParserHint(requiredLabelNames, groups []string, without, noLabels bool, metricLabelName string) *parserHint {
hints := make([]string, 0, 2*(len(requiredLabelNames)+len(groups)+1))
@ -68,9 +76,10 @@ func newParserHint(requiredLabelNames, groups []string, without, noLabels bool,
hints = appendLabelHints(hints, groups...)
hints = appendLabelHints(hints, metricLabelName)
hints = uniqueString(hints)
if noLabels {
if len(hints) > 0 {
return &parserHint{requiredLabels: hints}
return &parserHint{requiredLabels: hints, shouldPreserveError: containsError(hints)}
}
return &parserHint{noLabels: true}
}
@ -80,7 +89,16 @@ func newParserHint(requiredLabelNames, groups []string, without, noLabels bool,
if without || len(groups) == 0 {
return noParserHints
}
return &parserHint{requiredLabels: hints}
return &parserHint{requiredLabels: hints, shouldPreserveError: containsError(hints)}
}
func containsError(hints []string) bool {
for _, s := range hints {
if s == logqlmodel.ErrorLabel {
return true
}
}
return false
}
// appendLabelHints Appends the label to the list of hints with and without the duplicate suffix.

@ -13,10 +13,11 @@ import (
func Test_jsonParser_Parse(t *testing.T) {
tests := []struct {
name string
line []byte
lbs labels.Labels
want labels.Labels
name string
line []byte
lbs labels.Labels
want labels.Labels
hints ParserHint
}{
{
"multi depth",
@ -28,6 +29,7 @@ func Test_jsonParser_Parse(t *testing.T) {
{Name: "pod_uuid", Value: "foo"},
{Name: "pod_deployment_ref", Value: "foobar"},
},
noParserHints,
},
{
"numeric",
@ -37,6 +39,7 @@ func Test_jsonParser_Parse(t *testing.T) {
{Name: "counter", Value: "1"},
{Name: "price__net_", Value: "5.56909"},
},
noParserHints,
},
{
"escaped",
@ -47,6 +50,7 @@ func Test_jsonParser_Parse(t *testing.T) {
{Name: "price__net_", Value: "5.56909"},
{Name: "foo", Value: `foo\"bar`},
},
noParserHints,
},
{
"utf8 error rune",
@ -57,6 +61,7 @@ func Test_jsonParser_Parse(t *testing.T) {
{Name: "price__net_", Value: "5.56909"},
{Name: "foo", Value: ""},
},
noParserHints,
},
{
"skip arrays",
@ -65,6 +70,7 @@ func Test_jsonParser_Parse(t *testing.T) {
labels.Labels{
{Name: "counter", Value: "1"},
},
noParserHints,
},
{
"bad key replaced",
@ -73,6 +79,7 @@ func Test_jsonParser_Parse(t *testing.T) {
labels.Labels{
{Name: "cou_nter", Value: "1"},
},
noParserHints,
},
{
"errors",
@ -82,6 +89,18 @@ func Test_jsonParser_Parse(t *testing.T) {
{Name: "__error__", Value: "JSONParserErr"},
{Name: "__error_details__", Value: "Value looks like object, but can't find closing '}' symbol"},
},
noParserHints,
},
{
"errors hints",
[]byte(`{n}`),
labels.Labels{},
labels.Labels{
{Name: "__error__", Value: "JSONParserErr"},
{Name: "__error_details__", Value: "Value looks like object, but can't find closing '}' symbol"},
{Name: "__preserve_error__", Value: "true"},
},
newParserHint([]string{"__error__"}, nil, false, true, ""),
},
{
"duplicate extraction",
@ -97,12 +116,13 @@ func Test_jsonParser_Parse(t *testing.T) {
{Name: "next_err", Value: "false"},
{Name: "pod_deployment_ref", Value: "foobar"},
},
noParserHints,
},
}
for _, tt := range tests {
j := NewJSONParser()
t.Run(tt.name, func(t *testing.T) {
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b := NewBaseLabelsBuilderWithGrouping(nil, tt.hints, false, false).ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
_, _ = j.Process(0, tt.line, b)
sort.Sort(tt.want)
@ -120,6 +140,7 @@ func TestJSONExpressionParser(t *testing.T) {
expressions []LabelExtractionExpr
lbs labels.Labels
want labels.Labels
hints ParserHint
}{
{
"single field",
@ -131,6 +152,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "app", Value: "foo"},
},
noParserHints,
},
{
"alternate syntax",
@ -142,6 +164,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "test", Value: "value"},
},
noParserHints,
},
{
"multiple fields",
@ -155,6 +178,7 @@ func TestJSONExpressionParser(t *testing.T) {
{Name: "app", Value: "foo"},
{Name: "namespace", Value: "prod"},
},
noParserHints,
},
{
"utf8",
@ -166,6 +190,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "utf8", Value: "value"},
},
noParserHints,
},
{
"nested field",
@ -177,6 +202,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "uuid", Value: "foo"},
},
noParserHints,
},
{
"nested field alternate syntax",
@ -188,6 +214,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "uuid", Value: "foo"},
},
noParserHints,
},
{
"nested field alternate syntax 2",
@ -199,6 +226,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "uuid", Value: "foo"},
},
noParserHints,
},
{
"nested field alternate syntax 3",
@ -210,6 +238,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "uuid", Value: "foo"},
},
noParserHints,
},
{
"array element",
@ -221,6 +250,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "param", Value: "1"},
},
noParserHints,
},
{
"full array",
@ -232,6 +262,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "params", Value: "[1,2,3]"},
},
noParserHints,
},
{
"full object",
@ -243,6 +274,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "deployment", Value: `{"ref":"foobar", "params": [1,2,3]}`},
},
noParserHints,
},
{
"expression matching nothing",
@ -254,6 +286,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
labels.Label{Name: "nope", Value: ""},
},
noParserHints,
},
{
"null field",
@ -265,6 +298,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
labels.Label{Name: "nf", Value: ""}, // null is coerced to an empty string
},
noParserHints,
},
{
"boolean field",
@ -276,6 +310,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
{Name: "bool", Value: `false`},
},
noParserHints,
},
{
"label override",
@ -290,6 +325,7 @@ func TestJSONExpressionParser(t *testing.T) {
{Name: "uuid", Value: "bar"},
{Name: "uuid_extracted", Value: "foo"},
},
noParserHints,
},
{
"non-matching expression",
@ -304,6 +340,7 @@ func TestJSONExpressionParser(t *testing.T) {
{Name: "uuid", Value: "bar"},
{Name: "request_size", Value: ""},
},
noParserHints,
},
{
"empty line",
@ -315,6 +352,7 @@ func TestJSONExpressionParser(t *testing.T) {
labels.Labels{
labels.Label{Name: "uuid", Value: ""},
},
noParserHints,
},
{
"existing labels are not affected",
@ -329,6 +367,7 @@ func TestJSONExpressionParser(t *testing.T) {
{Name: "foo", Value: "bar"},
{Name: "uuid", Value: ""},
},
noParserHints,
},
{
"invalid JSON line",
@ -343,6 +382,23 @@ func TestJSONExpressionParser(t *testing.T) {
{Name: "foo", Value: "bar"},
{Name: logqlmodel.ErrorLabel, Value: errJSON},
},
noParserHints,
},
{
"invalid JSON line with hints",
[]byte(`invalid json`),
[]LabelExtractionExpr{
NewLabelExtractionExpr("uuid", `will.not.work`),
},
labels.Labels{
{Name: "foo", Value: "bar"},
},
labels.Labels{
{Name: "foo", Value: "bar"},
{Name: logqlmodel.ErrorLabel, Value: errJSON},
{Name: logqlmodel.PreserveErrorLabel, Value: "true"},
},
newParserHint([]string{"__error__"}, nil, false, true, ""),
},
}
for _, tt := range tests {
@ -351,7 +407,7 @@ func TestJSONExpressionParser(t *testing.T) {
t.Fatalf("cannot create JSON expression parser: %s", err.Error())
}
t.Run(tt.name, func(t *testing.T) {
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b := NewBaseLabelsBuilderWithGrouping(nil, tt.hints, false, false).ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
_, _ = j.Process(0, tt.line, b)
sort.Sort(tt.want)
@ -557,10 +613,11 @@ func Test_regexpParser_Parse(t *testing.T) {
func Test_logfmtParser_Parse(t *testing.T) {
tests := []struct {
name string
line []byte
lbs labels.Labels
want labels.Labels
name string
line []byte
lbs labels.Labels
want labels.Labels
hints ParserHint
}{
{
"not logfmt",
@ -573,6 +630,21 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "__error__", Value: "LogfmtParserErr"},
{Name: "__error_details__", Value: "logfmt syntax error at pos 8 : unexpected '='"},
},
noParserHints,
},
{
"not logfmt with hints",
[]byte("foobar====wqe=sdad1r"),
labels.Labels{
{Name: "foo", Value: "bar"},
},
labels.Labels{
{Name: "foo", Value: "bar"},
{Name: "__error__", Value: "LogfmtParserErr"},
{Name: "__error_details__", Value: "logfmt syntax error at pos 8 : unexpected '='"},
{Name: "__preserve_error__", Value: "true"},
},
newParserHint([]string{"__error__"}, nil, false, true, ""),
},
{
"utf8 error rune",
@ -582,6 +654,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "buzz", Value: "foo"},
{Name: "bar", Value: ""},
},
noParserHints,
},
{
"key alone logfmt",
@ -594,6 +667,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "bar", Value: "foo"},
{Name: "buzz", Value: ""},
},
noParserHints,
},
{
"quoted logfmt",
@ -605,6 +679,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "foo", Value: "bar"},
{Name: "foobar", Value: "foo bar"},
},
noParserHints,
},
{
"escaped control chars in logfmt",
@ -616,6 +691,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "a", Value: "b"},
{Name: "foobar", Value: "foo\nbar\tbaz"},
},
noParserHints,
},
{
"literal control chars in logfmt",
@ -627,6 +703,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "a", Value: "b"},
{Name: "foobar", Value: "foo\nbar\tbaz"},
},
noParserHints,
},
{
"escaped slash logfmt",
@ -638,6 +715,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "a", Value: "b"},
{Name: "foobar", Value: `foo ba\r baz`},
},
noParserHints,
},
{
"literal newline and escaped slash logfmt",
@ -649,6 +727,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "a", Value: "b"},
{Name: "foobar", Value: "foo bar\nb\\az"},
},
noParserHints,
},
{
"double property logfmt",
@ -661,6 +740,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "foobar", Value: "foo bar"},
{Name: "latency", Value: "10ms"},
},
noParserHints,
},
{
"duplicate from line property",
@ -672,6 +752,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "foo", Value: "bar"},
{Name: "foobar", Value: "10ms"},
},
noParserHints,
},
{
"duplicate property",
@ -684,6 +765,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "foo_extracted", Value: "foo bar"},
{Name: "foobar", Value: "10ms"},
},
noParserHints,
},
{
"invalid key names",
@ -697,6 +779,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "foo_bar", Value: "10ms"},
{Name: "test_dash", Value: "foo"},
},
noParserHints,
},
{
"nil",
@ -707,12 +790,13 @@ func Test_logfmtParser_Parse(t *testing.T) {
labels.Labels{
{Name: "foo", Value: "bar"},
},
noParserHints,
},
}
p := NewLogfmtParser()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b := NewBaseLabelsBuilderWithGrouping(nil, tt.hints, false, false).ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
_, _ = p.Process(0, tt.line, b)
sort.Sort(tt.want)
@ -894,6 +978,7 @@ func Test_unpackParser_Parse(t *testing.T) {
wantLbs labels.Labels
wantLine []byte
hints ParserHint
}{
{
"should extract only map[string]string",
@ -905,6 +990,7 @@ func Test_unpackParser_Parse(t *testing.T) {
{Name: "cluster", Value: "us-central1"},
},
[]byte(`some message`),
noParserHints,
},
{
"wrong json",
@ -915,6 +1001,19 @@ func Test_unpackParser_Parse(t *testing.T) {
{Name: "__error_details__", Value: "expecting json object(6), but it is not"},
},
[]byte(`"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}`),
noParserHints,
},
{
"wrong json with hints",
[]byte(`"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}`),
labels.Labels{},
labels.Labels{
{Name: "__error__", Value: "JSONParserErr"},
{Name: "__error_details__", Value: "expecting json object(6), but it is not"},
{Name: "__preserve_error__", Value: "true"},
},
[]byte(`"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}`),
newParserHint([]string{"__error__"}, nil, false, true, ""),
},
{
"not a map",
@ -926,6 +1025,7 @@ func Test_unpackParser_Parse(t *testing.T) {
{Name: "cluster", Value: "us-central1"},
},
[]byte(`["foo","bar"]`),
noParserHints,
},
{
"should rename",
@ -941,6 +1041,7 @@ func Test_unpackParser_Parse(t *testing.T) {
{Name: "cluster", Value: "us-central1"},
},
[]byte(`some message`),
noParserHints,
},
{
"should not change log and labels if no packed entry",
@ -954,6 +1055,7 @@ func Test_unpackParser_Parse(t *testing.T) {
{Name: "cluster", Value: "us-central1"},
},
[]byte(`{"bar":1,"app":"foo","namespace":"prod","pod":{"uid":"1"}}`),
noParserHints,
},
{
"non json with escaped quotes",
@ -967,12 +1069,13 @@ func Test_unpackParser_Parse(t *testing.T) {
{Name: "cluster", Value: "us-central1"},
},
[]byte(`I0303 17:49:45.976518 1526 kubelet_getters.go:178] "Pod status updated" pod="openshift-etcd/etcd-ip-10-0-150-50.us-east-2.compute.internal" status=Running`),
noParserHints,
},
}
for _, tt := range tests {
j := NewUnpackParser()
t.Run(tt.name, func(t *testing.T) {
b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash())
b := NewBaseLabelsBuilderWithGrouping(nil, tt.hints, false, false).ForLabels(tt.lbs, tt.lbs.Hash())
b.Reset()
copy := string(tt.line)
l, _ := j.Process(0, tt.line, b)

@ -10,12 +10,13 @@ import (
// Those errors are useful for comparing error returned by the engine.
// e.g. errors.Is(err,logqlmodel.ErrParse) let you know if this is a ast parsing error.
var (
ErrParse = errors.New("failed to parse the log query")
ErrPipeline = errors.New("failed execute pipeline")
ErrLimit = errors.New("limit reached while evaluating the query")
ErrBlocked = errors.New("query blocked by policy")
ErrorLabel = "__error__"
ErrorDetailsLabel = "__error_details__"
ErrParse = errors.New("failed to parse the log query")
ErrPipeline = errors.New("failed execute pipeline")
ErrLimit = errors.New("limit reached while evaluating the query")
ErrBlocked = errors.New("query blocked by policy")
ErrorLabel = "__error__"
PreserveErrorLabel = "__preserve_error__"
ErrorDetailsLabel = "__error_details__"
)
// ParseError is what is returned when we failed to parse.

Loading…
Cancel
Save