diff --git a/docs/sources/logql/log_queries/_index.md b/docs/sources/logql/log_queries/_index.md index f7034b1fa8..0e853419e4 100644 --- a/docs/sources/logql/log_queries/_index.md +++ b/docs/sources/logql/log_queries/_index.md @@ -285,7 +285,7 @@ For instance, the pipeline `| json` will produce the following mapping: In case of errors, for instance if the line is not in the expected format, the log line won't be filtered but instead will get a new `__error__` label added. -If an extracted label key name already exists in the original log stream, the extracted label key will be suffixed with the `_extracted` keyword to make the distinction between the two labels. You can forcefully override the original label using a [label formatter expression](#labels-format-expression). However if an extracted key appears twice, only the latest label value will be kept. +If an extracted label key name already exists in the original log stream, the extracted label key will be suffixed with the `_extracted` keyword to make the distinction between the two labels. You can forcefully override the original label using a [label formatter expression](#labels-format-expression). However, if an extracted key appears twice, only the first label value will be kept. Loki supports [JSON](#json), [logfmt](#logfmt), [pattern](#pattern), [regexp](#regular-expression) and [unpack](#unpack) parsers. diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md index b2646f8206..a7bd7ca78f 100644 --- a/docs/sources/upgrading/_index.md +++ b/docs/sources/upgrading/_index.md @@ -35,6 +35,11 @@ The output is incredibly verbose as it shows the entire internal config struct u ### Loki +#### Change in LogQL behavior + +When there are duplicate labels in a log line, only the first value will be kept. Previously only the last value +was kept. + #### Default retention_period has changed This change will affect you if you have: diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 1081787db4..692330e3b7 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -93,6 +93,10 @@ type LabelsBuilder struct { // NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results. func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint, without, noLabels bool) *BaseLabelsBuilder { + if parserKeyHints == nil { + parserKeyHints = noParserHints + } + return &BaseLabelsBuilder{ del: make([]string, 0, 5), add: make([]labels.Label, 0, 16), @@ -137,6 +141,7 @@ func (b *LabelsBuilder) Reset() { b.add = b.add[:0] b.err = "" b.errDetails = "" + b.parserKeyHints.Reset() } // ParserLabelHints returns a limited list of expected labels to extract for metric queries. @@ -233,6 +238,9 @@ func (b *LabelsBuilder) Set(n, v string) *LabelsBuilder { } b.add = append(b.add, labels.Label{Name: n, Value: v}) + // Sometimes labels are set and later modified. Only record + // each label once + b.parserKeyHints.RecordExtracted(n) return b } diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index 7b5a0a48d8..52f59cf874 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -50,7 +50,7 @@ type lineSampleExtractor struct { // Multiple log stages are run before converting the log line. func NewLineSampleExtractor(ex LineExtractor, stages []Stage, groups []string, without, noLabels bool) (SampleExtractor, error) { s := ReduceStages(stages) - hints := newParserHint(s.RequiredLabelNames(), groups, without, noLabels, "") + hints := NewParserHint(s.RequiredLabelNames(), groups, without, noLabels, "") return &lineSampleExtractor{ Stage: s, LineExtractor: ex, @@ -138,7 +138,7 @@ func LabelExtractorWithStages( sort.Strings(groups) } preStage := ReduceStages(preStages) - hints := newParserHint(append(preStage.RequiredLabelNames(), postFilter.RequiredLabelNames()...), groups, without, noLabels, labelName) + hints := NewParserHint(append(preStage.RequiredLabelNames(), postFilter.RequiredLabelNames()...), groups, without, noLabels, labelName) return &labelSampleExtractor{ preStage: preStage, conversionFn: convFn, diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index 59ae2ee74d..f3e0e9fc7d 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -37,6 +37,7 @@ var ( errUnexpectedJSONObject = fmt.Errorf("expecting json object(%d), but it is not", jsoniter.ObjectValue) errMissingCapture = errors.New("at least one named capture must be supplied") + errFoundAllLabels = errors.New("found all required labels") ) type JSONParser struct { @@ -64,6 +65,11 @@ func (j *JSONParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte, j.lbs = lbs if err := jsonparser.ObjectEach(line, j.parseObject); err != nil { + if errors.Is(err, errFoundAllLabels) { + // Short-circuited + return line, true + } + lbs.SetErr(errJSON) lbs.SetErrorDetails(err.Error()) if lbs.ParserLabelHints().PreserveError() { @@ -89,6 +95,12 @@ func (j *JSONParser) parseObject(key, value []byte, dataType jsonparser.ValueTyp return err } + if j.lbs.ParserLabelHints().AllRequiredExtracted() { + // Not actually an error. Parsing can be short-circuited + // and this tells jsonparser to stop parsing + return errFoundAllLabels + } + return nil } @@ -135,6 +147,7 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu if !j.lbs.ParserLabelHints().ShouldExtract(string(j.prefixBuffer)) { return "", false } + return string(j.prefixBuffer), true }) @@ -234,6 +247,10 @@ func (r *RegexpParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte if lbs.BaseHas(sanitize) { sanitize = fmt.Sprintf("%s%s", sanitize, duplicateSuffix) } + if !lbs.ParserLabelHints().ShouldExtract(sanitize) { + return "", false + } + return sanitize, true }) if !ok { @@ -265,19 +282,22 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte if lbs.ParserLabelHints().NoLabels() { return line, true } + l.dec.Reset(line) for l.dec.ScanKeyval() { key, ok := l.keys.Get(l.dec.Key(), func() (string, bool) { sanitized := sanitizeLabelKey(string(l.dec.Key()), true) - if !lbs.ParserLabelHints().ShouldExtract(sanitized) { - return "", false - } if len(sanitized) == 0 { return "", false } + if lbs.BaseHas(sanitized) { sanitized = fmt.Sprintf("%s%s", sanitized, duplicateSuffix) } + + if !lbs.ParserLabelHints().ShouldExtract(sanitized) { + return "", false + } return sanitized, true }) if !ok { @@ -289,6 +309,10 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte val = nil } lbs.Set(key, string(val)) + + if lbs.ParserLabelHints().AllRequiredExtracted() { + break + } } if l.dec.Err() != nil { lbs.SetErr(errLogfmt) @@ -332,13 +356,14 @@ func (l *PatternParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byt names := l.names[:len(matches)] for i, m := range matches { name := names[i] - if !lbs.parserKeyHints.ShouldExtract(name) { - continue - } if lbs.BaseHas(name) { name = name + duplicateSuffix } + if !lbs.parserKeyHints.ShouldExtract(name) { + continue + } + lbs.Set(name, string(m)) } return line, true @@ -431,8 +456,17 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde if _, ok := l.expressions[key]; ok { if lbs.BaseHas(key) { key = key + duplicateSuffix + if !lbs.ParserLabelHints().ShouldExtract(key) { + // Don't extract duplicates if we don't have to + break + } } + lbs.Set(key, string(val)) + + if lbs.ParserLabelHints().AllRequiredExtracted() { + break + } } } if l.dec.Err() != nil { @@ -618,15 +652,14 @@ func (u *UnpackParser) unpack(entry []byte, lbs *LabelsBuilder) ([]byte, error) isPacked = true return nil } - key, ok := u.keys.Get(key, func() (string, bool) { field := unsafeGetString(key) - if !lbs.ParserLabelHints().ShouldExtract(field) { - return "", false - } if lbs.BaseHas(field) { field = field + duplicateSuffix } + if !lbs.ParserLabelHints().ShouldExtract(field) { + return "", false + } return field, true }) if !ok { diff --git a/pkg/logql/log/parser_hints.go b/pkg/logql/log/parser_hints.go index 35e995b22c..356d44dac8 100644 --- a/pkg/logql/log/parser_hints.go +++ b/pkg/logql/log/parser_hints.go @@ -5,7 +5,7 @@ import ( "strings" ) -var noParserHints = &parserHint{} +var noParserHints = &Hints{} // ParserHint are hints given to LogQL parsers. // This is specially useful for parser that extract implicitly all possible label keys. @@ -26,29 +26,44 @@ type ParserHint interface { // sum(rate({app="foo"} | json [5m])) // We don't need to extract any labels from the log line. NoLabels() bool + + // Holds state about what's already been extracted for the associated + // labels. This assumes that only required labels are ever extracted + RecordExtracted(string) + AllRequiredExtracted() bool + Reset() // PreserveError returns true when parsing errors were specifically requested PreserveError() bool } -type parserHint struct { +type Hints struct { noLabels bool requiredLabels []string shouldPreserveError bool + extracted []string } -func (p *parserHint) ShouldExtract(key string) bool { +func (p *Hints) ShouldExtract(key string) bool { if len(p.requiredLabels) == 0 { return true } + + for _, l := range p.extracted { + if l == key { + return false + } + } + for _, l := range p.requiredLabels { if l == key { return true } } + return false } -func (p *parserHint) ShouldExtractPrefix(prefix string) bool { +func (p *Hints) ShouldExtractPrefix(prefix string) bool { if len(p.requiredLabels) == 0 { return true } @@ -61,27 +76,49 @@ func (p *parserHint) ShouldExtractPrefix(prefix string) bool { return false } -func (p *parserHint) NoLabels() bool { - return p.noLabels +func (p *Hints) NoLabels() bool { + return p.noLabels || p.AllRequiredExtracted() +} + +func (p *Hints) RecordExtracted(key string) { + for _, l := range p.requiredLabels { + if l == key { + p.extracted = append(p.extracted, key) + return + } + } +} + +func (p *Hints) AllRequiredExtracted() bool { + if len(p.requiredLabels) == 0 { + return false + } + return len(p.extracted) == len(p.requiredLabels) +} + +func (p *Hints) Reset() { + p.extracted = p.extracted[:0] } -func (p *parserHint) PreserveError() bool { +// NewParserHint creates a new parser hint using the list of labels that are seen and required in a query. +func (p *Hints) PreserveError() bool { return p.shouldPreserveError } -// newParserHint creates a new parser hint using the list of labels that are seen and required in a query. -func newParserHint(requiredLabelNames, groups []string, without, noLabels bool, metricLabelName string) *parserHint { +// NewParserHint creates a new parser hint using the list of labels that are seen and required in a query. +func NewParserHint(requiredLabelNames, groups []string, without, noLabels bool, metricLabelName string) *Hints { hints := make([]string, 0, 2*(len(requiredLabelNames)+len(groups)+1)) hints = appendLabelHints(hints, requiredLabelNames...) hints = appendLabelHints(hints, groups...) hints = appendLabelHints(hints, metricLabelName) hints = uniqueString(hints) + extracted := make([]string, 0, len(hints)) if noLabels { if len(hints) > 0 { - return &parserHint{requiredLabels: hints, shouldPreserveError: containsError(hints)} + return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)} } - return &parserHint{noLabels: true} + return &Hints{noLabels: true} } // we don't know what is required when a without clause is used. // Same is true when there's no grouping. @@ -89,7 +126,7 @@ func newParserHint(requiredLabelNames, groups []string, without, noLabels bool, if without || len(groups) == 0 { return noParserHints } - return &parserHint{requiredLabels: hints, shouldPreserveError: containsError(hints)} + return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)} } func containsError(hints []string) bool { diff --git a/pkg/logql/log/parser_hints_test.go b/pkg/logql/log/parser_hints_test.go index c3d3d136e1..9e0bea9c8c 100644 --- a/pkg/logql/log/parser_hints_test.go +++ b/pkg/logql/log/parser_hints_test.go @@ -2,6 +2,7 @@ package log_test import ( + "github.com/grafana/loki/pkg/logql/log" "testing" "github.com/prometheus/prometheus/model/labels" @@ -232,3 +233,21 @@ func Test_ParserHints(t *testing.T) { }) } } + +func TestRecordingExtractedLabels(t *testing.T) { + p := log.NewParserHint([]string{"1", "2", "3"}, nil, false, true, "") + p.RecordExtracted("1") + p.RecordExtracted("2") + + require.False(t, p.AllRequiredExtracted()) + require.False(t, p.NoLabels()) + + p.RecordExtracted("3") + + require.True(t, p.AllRequiredExtracted()) + require.True(t, p.NoLabels()) + + p.Reset() + require.False(t, p.AllRequiredExtracted()) + require.False(t, p.NoLabels()) +} diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 503b459414..c60eff56da 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -100,7 +100,7 @@ func Test_jsonParser_Parse(t *testing.T) { {Name: "__error_details__", Value: "Value looks like object, but can't find closing '}' symbol"}, {Name: "__preserve_error__", Value: "true"}, }, - newParserHint([]string{"__error__"}, nil, false, true, ""), + NewParserHint([]string{"__error__"}, nil, false, true, ""), }, { "duplicate extraction", @@ -131,6 +131,76 @@ func Test_jsonParser_Parse(t *testing.T) { } } +func TestKeyShortCircuit(t *testing.T) { + simpleJsn := []byte(`{ + "data": "Click Here", + "size": 36, + "style": "bold", + "name": "text1", + "name": "duplicate", + "hOffset": 250, + "vOffset": 100, + "alignment": "center", + "onMouseUp": "sun1.opacity = (sun1.opacity / 100) * 90;" + }`) + logFmt := []byte(`data="ClickHere" size=36 style=bold name=text1 name=duplicate hOffset=250 vOffset=100 alignment=center onMouseUp="sun1.opacity = (sun1.opacity / 100) * 90;"`) + + hints := &fakeParseHints{label: "name"} + lbs := NewBaseLabelsBuilder().ForLabels(labels.Labels{}, 0) + lbs.parserKeyHints = hints + + tests := []struct { + name string + p Stage + line []byte + }{ + {"json", NewJSONParser(), simpleJsn}, + {"logfmt", NewLogfmtParser(), logFmt}, + {"logfmt-expression", mustStage(NewLogfmtExpressionParser([]LabelExtractionExpr{NewLabelExtractionExpr("name", "name")})), logFmt}, + } + for _, tt := range tests { + lbs.Reset() + t.Run(tt.name, func(t *testing.T) { + _, result = tt.p.Process(0, tt.line, lbs) + + require.Len(t, lbs.labels(), 1) + name, ok := lbs.Get("name") + require.True(t, ok) + require.Contains(t, name, "text1") + }) + } +} + +type fakeParseHints struct { + label string + checkCount int + count int +} + +func (p *fakeParseHints) ShouldExtract(key string) bool { + p.checkCount++ + return key == p.label +} +func (p *fakeParseHints) ShouldExtractPrefix(prefix string) bool { + return prefix == p.label +} +func (p *fakeParseHints) NoLabels() bool { + return false +} +func (p *fakeParseHints) RecordExtracted(_ string) { + p.count++ +} +func (p *fakeParseHints) AllRequiredExtracted() bool { + return p.count == 1 +} +func (p *fakeParseHints) Reset() { + p.checkCount = 0 + p.count = 0 +} +func (p *fakeParseHints) PreserveError() bool { + return false +} + func TestJSONExpressionParser(t *testing.T) { testLine := []byte(`{"app":"foo","field with space":"value","field with ÜFT8👌":"value","null_field":null,"bool_field":false,"namespace":"prod","pod":{"uuid":"foo","deployment":{"ref":"foobar", "params": [1,2,3]}}}`) @@ -398,7 +468,7 @@ func TestJSONExpressionParser(t *testing.T) { {Name: logqlmodel.ErrorLabel, Value: errJSON}, {Name: logqlmodel.PreserveErrorLabel, Value: "true"}, }, - newParserHint([]string{"__error__"}, nil, false, true, ""), + NewParserHint([]string{"__error__"}, nil, false, true, ""), }, { "empty line", @@ -516,7 +586,7 @@ func Benchmark_Parser(b *testing.B) { b.Run("labels hints", func(b *testing.B) { builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) - builder.parserKeyHints = newParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "") + builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "") for n := 0; n < b.N; n++ { builder.Reset() _, _ = tt.s.Process(0, line, builder) @@ -526,6 +596,43 @@ func Benchmark_Parser(b *testing.B) { } } +func BenchmarkKeyExtraction(b *testing.B) { + simpleJsn := []byte(`{ + "data": "Click Here", + "size": 36, + "style": "bold", + "name": "text1", + "hOffset": 250, + "vOffset": 100, + "alignment": "center", + "onMouseUp": "sun1.opacity = (sun1.opacity / 100) * 90;" + }`) + logFmt := []byte(`data="Click Here" size=36 style=bold name=text1 hOffset=250 vOffset=100 alignment=center onMouseUp="sun1.opacity = (sun1.opacity / 100) * 90;"`) + + lbs := NewBaseLabelsBuilder().ForLabels(labels.Labels{}, 0) + lbs.parserKeyHints = NewParserHint([]string{"name"}, nil, false, true, "") + + benchmarks := []struct { + name string + p Stage + line []byte + }{ + {"json", NewJSONParser(), simpleJsn}, + {"logfmt", NewLogfmtParser(), logFmt}, + {"logfmt-expression", mustStage(NewLogfmtExpressionParser([]LabelExtractionExpr{NewLabelExtractionExpr("name", "name")})), logFmt}, + } + for _, bb := range benchmarks { + b.Run(bb.name, func(b *testing.B) { + b.ResetTimer() + + for n := 0; n < b.N; n++ { + lbs.Reset() + _, result = bb.p.Process(0, bb.line, lbs) + } + }) + } +} + func mustStage(s Stage, err error) Stage { if err != nil { panic(err) @@ -658,7 +765,7 @@ func Test_logfmtParser_Parse(t *testing.T) { {Name: "__error_details__", Value: "logfmt syntax error at pos 8 : unexpected '='"}, {Name: "__preserve_error__", Value: "true"}, }, - newParserHint([]string{"__error__"}, nil, false, true, ""), + NewParserHint([]string{"__error__"}, nil, false, true, ""), }, { "utf8 error rune", @@ -1035,7 +1142,7 @@ func Test_unpackParser_Parse(t *testing.T) { {Name: "__preserve_error__", Value: "true"}, }, []byte(`"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}`), - newParserHint([]string{"__error__"}, nil, false, true, ""), + NewParserHint([]string{"__error__"}, nil, false, true, ""), }, { "not a map",