diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 343941b78c..f5b31616c6 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -385,9 +385,15 @@ func (b *LabelsBuilder) Set(category LabelCategory, n, v string) *LabelsBuilder } b.add[category] = append(b.add[category], labels.Label{Name: n, Value: v}) - // Sometimes labels are set and later modified. Only record - // each label once - b.parserKeyHints.RecordExtracted(n) + if category == ParsedLabel { + // We record parsed labels as extracted so that future parse stages can + // quickly bypass any existing extracted fields. + // + // Note that because this is used for bypassing extracted fields, and + // because parsed labels always take precedence over structured metadata + // and stream labels, we must only call RecordExtracted for parsed labels. + b.parserKeyHints.RecordExtracted(n) + } return b } diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index 6a40dcfe31..2aafafcd78 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -147,7 +147,7 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu } return field, true }) - if !ok { + if !ok || j.lbs.ParserLabelHints().Extracted(sanitizedKey) { return nil } j.lbs.Set(ParsedLabel, sanitizedKey, readValue(value, dataType)) @@ -188,7 +188,7 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu // reset the prefix position j.prefixBuffer = j.prefixBuffer[:prefixLen] - if !ok { + if !ok || j.lbs.ParserLabelHints().Extracted(keyString) { return nil } @@ -321,7 +321,7 @@ func (r *RegexpParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte return sanitize, true }) - if !ok { + if !ok || parserHints.Extracted(key) { continue } @@ -387,7 +387,7 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte } return sanitized, true }) - if !ok { + if !ok || parserHints.Extracted(key) { continue } @@ -459,7 +459,7 @@ func (l *PatternParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byt name = name + duplicateSuffix } - if !parserHints.ShouldExtract(name) { + if parserHints.Extracted(name) || !parserHints.ShouldExtract(name) { continue } @@ -526,6 +526,14 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde } } + // alwaysExtract checks whether a key should be extracted regardless of other + // conditions. + alwaysExtract := func(key string) bool { + // Any key in the expression list should always be extracted. + _, ok := keys[key] + return ok + } + l.dec.Reset(line) var current []byte for !l.dec.EOL() { @@ -546,14 +554,12 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde return "", false } - _, alwaysExtract := keys[sanitized] - if !alwaysExtract && !lbs.ParserLabelHints().ShouldExtract(sanitized) { + if !alwaysExtract(sanitized) && !lbs.ParserLabelHints().ShouldExtract(sanitized) { return "", false } return sanitized, true }) - - if !ok { + if !ok || (!alwaysExtract(key) && lbs.ParserLabelHints().Extracted(key)) { continue } @@ -572,7 +578,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde if _, ok := l.expressions[key]; ok { if lbs.BaseHas(key) { key = key + duplicateSuffix - if !lbs.ParserLabelHints().ShouldExtract(key) { + if lbs.ParserLabelHints().Extracted(key) || !lbs.ParserLabelHints().ShouldExtract(key) { // Don't extract duplicates if we don't have to break } @@ -784,7 +790,7 @@ func (u *UnpackParser) unpack(entry []byte, lbs *LabelsBuilder) ([]byte, error) } return field, true }) - if !ok { + if !ok || lbs.ParserLabelHints().Extracted(key) { return nil } diff --git a/pkg/logql/log/parser_hints.go b/pkg/logql/log/parser_hints.go index 32a789250d..660aa46372 100644 --- a/pkg/logql/log/parser_hints.go +++ b/pkg/logql/log/parser_hints.go @@ -19,7 +19,13 @@ func NoParserHints() ParserHint { // // All we need to extract is the status_code in the json parser. type ParserHint interface { - // Tells if a label with the given key should be extracted. + // Extracted returns whether a key has already been extracted by a previous + // parse stage. This result must not be cached. + Extracted(key string) bool + + // Tells if a label with the given key should be extracted. Parsers may cache + // the result of ShouldExtract, so implementations of houldExtract must + // return consistent results for any log line or parser stage. ShouldExtract(key string) bool // Tells if there's any hint that start with the given prefix. @@ -54,17 +60,20 @@ type Hints struct { noLabels bool requiredLabels []string shouldPreserveError bool - extracted []string + extracted map[string]struct{} labelFilters []LabelFilterer labelNames []string } +func (p *Hints) Extracted(key string) bool { + _, ok := p.extracted[key] // It's safe to read from a nil map. + return ok +} + func (p *Hints) ShouldExtract(key string) bool { - for _, l := range p.extracted { - if l == key { - return false - } - } + // The result of ShouldExtract gets cached in parser stages, so we must + // return consistent results throughout the lifetime of a query; this means + // we can't account for p.extracted here. for _, l := range p.requiredLabels { if l == key { @@ -93,7 +102,10 @@ func (p *Hints) NoLabels() bool { } func (p *Hints) RecordExtracted(key string) { - p.extracted = append(p.extracted, key) + if p.extracted == nil { + p.extracted = make(map[string]struct{}) + } + p.extracted[key] = struct{}{} } func (p *Hints) AllRequiredExtracted() bool { @@ -103,11 +115,8 @@ func (p *Hints) AllRequiredExtracted() bool { found := 0 for _, l := range p.requiredLabels { - for _, e := range p.extracted { - if l == e { - found++ - break - } + if p.Extracted(l) { + found++ } } @@ -115,7 +124,7 @@ func (p *Hints) AllRequiredExtracted() bool { } func (p *Hints) Reset() { - p.extracted = p.extracted[:0] + clear(p.extracted) } func (p *Hints) PreserveError() bool { @@ -159,7 +168,7 @@ func NewParserHint(requiredLabelNames, groups []string, without, noLabels bool, } } - extracted := make([]string, 0, len(hints)) + extracted := make(map[string]struct{}, len(hints)) if noLabels { if len(hints) > 0 { return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints), labelFilters: labelFilters, labelNames: labelNames} diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index ffdb348974..3f6eb449bf 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -301,6 +301,10 @@ type fakeParseHints struct { extractAll bool } +func (p *fakeParseHints) Extracted(_ string) bool { + return false +} + func (p *fakeParseHints) ShouldExtract(key string) bool { p.checkCount++ return key == p.label || p.extractAll @@ -1149,7 +1153,7 @@ func TestLogfmtParser_parse(t *testing.T) { []byte(`foobar="foo bar" foobar=10ms`), labels.FromStrings("foo", "bar"), labels.FromStrings("foo", "bar", - "foobar", "10ms", + "foobar", "foo bar", ), nil, NoParserHints(), @@ -1342,6 +1346,69 @@ func TestLogfmtParser_keepEmpty(t *testing.T) { } } +func TestLogfmtConsistentPrecedence(t *testing.T) { + line := `app=lowkey level=error ts=2021-02-12T19:18:10.037940878Z msg="hello world"` + + t.Run("sturctured metadata first", func(t *testing.T) { + var ( + metadataStream = NewBaseLabelsBuilder(). + ForLabels(labels.FromStrings("foo", "bar"), 0). + Set(StructuredMetadataLabel, "app", "loki") + + basicStream = NewBaseLabelsBuilder(). + ForLabels(labels.FromStrings("foo", "baz"), 0) + ) + + parser := NewLogfmtParser(true, true) + + _, ok := parser.Process(0, []byte(line), metadataStream) + require.True(t, ok) + + _, ok = parser.Process(0, []byte(line), basicStream) + require.True(t, ok) + + res, cat, ok := metadataStream.GetWithCategory("app") + require.Equal(t, "lowkey", res) + require.Equal(t, ParsedLabel, cat) + require.True(t, ok) + + res, cat, ok = basicStream.GetWithCategory("app") + require.Equal(t, "lowkey", res) + require.Equal(t, ParsedLabel, cat) + require.True(t, ok) + }) + + t.Run("parsed labels first", func(t *testing.T) { + var ( + metadataStream = NewBaseLabelsBuilder(). + ForLabels(labels.FromStrings("foo", "bar"), 0). + Set(StructuredMetadataLabel, "app", "loki") + + basicStream = NewBaseLabelsBuilder(). + ForLabels(labels.FromStrings("foo", "baz"), 0) + ) + + parser := NewLogfmtParser(true, true) + + _, ok := parser.Process(0, []byte(line), basicStream) + require.True(t, ok) + + _, ok = parser.Process(0, []byte(line), metadataStream) + require.True(t, ok) + + res, cat, ok := metadataStream.GetWithCategory("app") + require.Equal(t, "lowkey", res) + require.Equal(t, ParsedLabel, cat) + require.True(t, ok) + + res, cat, ok = basicStream.GetWithCategory("app") + require.Equal(t, "lowkey", res) + require.Equal(t, ParsedLabel, cat) + require.True(t, ok) + }) + +} + func TestLogfmtExpressionParser(t *testing.T) { testLine := []byte(`app=foo level=error spaces="value with ÜFT8👌" ts=2021-02-12T19:18:10.037940878Z`)