Short circuit parsers (#8724)

In many cases we only need a few labels from a line. Because most of our
parsers parse lines incrementally, we can stop parsing a line after we
have all the labels we want from it.

This PR uses `ParserHints` to keep track of the number of extracted
labels. It also provides a way for parsers to know when they should stop
parsing.

Notes:
- parsers had inconsistent ordering between the `ShouldExtract` call and
adding the `_extracted` label to duplicates. This PR makes appending
`_extracted` always happen before `ShouldExtract` to keep counts of what
is extracted compared to expected labels consistent.

Next Steps:
- When the user specifies a query with a grouping containing the
`_extracted` label but there is no duplicate between the passed line and
labels, short circuiting will not work. I'll address this in a follow-up
PR.

Benchmarks:
To try and show a balanced view of what this buys us, this PR picks a
label out of the middle of a line. In the best case this might be much
better. In the worst case, we have to parse the whole line.

```
benchstat short_circuit_old.txt short_circuit_new.txt
name                               old time/op    new time/op    delta
KeyExtraction/json-8                  456ns ± 3%     256ns ± 2%  -43.84%  (p=0.000 n=9+10)
KeyExtraction/logfmt-8                347ns ± 4%     171ns ± 2%  -50.86%  (p=0.000 n=10+10)
KeyExtraction/logfmt-expression-8     552ns ± 2%     368ns ± 2%  -33.23%  (p=0.000 n=9+10)

name                               old alloc/op   new alloc/op   delta
KeyExtraction/json-8                  5.00B ± 0%     5.00B ± 0%     ~     (all equal)
KeyExtraction/logfmt-8                5.00B ± 0%     5.00B ± 0%     ~     (all equal)
KeyExtraction/logfmt-expression-8     16.0B ± 0%     16.0B ± 0%     ~     (all equal)

name                               old allocs/op  new allocs/op  delta
KeyExtraction/json-8                   1.00 ± 0%      1.00 ± 0%     ~     (all equal)
KeyExtraction/logfmt-8                 1.00 ± 0%      1.00 ± 0%     ~     (all equal)
KeyExtraction/logfmt-expression-8      2.00 ± 0%      2.00 ± 0%     ~     (all equal)
```

---------

Co-authored-by: J Stickler <julie.stickler@grafana.com>
pull/8872/head
Travis Patterson 3 years ago committed by GitHub
parent 01936330f4
commit 9d20bed26a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      docs/sources/logql/log_queries/_index.md
  2. 5
      docs/sources/upgrading/_index.md
  3. 8
      pkg/logql/log/labels.go
  4. 4
      pkg/logql/log/metrics_extraction.go
  5. 53
      pkg/logql/log/parser.go
  6. 61
      pkg/logql/log/parser_hints.go
  7. 19
      pkg/logql/log/parser_hints_test.go
  8. 117
      pkg/logql/log/parser_test.go

@ -285,7 +285,7 @@ For instance, the pipeline `| json` will produce the following mapping:
In case of errors, for instance if the line is not in the expected format, the log line won't be filtered but instead will get a new `__error__` label added.
If an extracted label key name already exists in the original log stream, the extracted label key will be suffixed with the `_extracted` keyword to make the distinction between the two labels. You can forcefully override the original label using a [label formatter expression](#labels-format-expression). However if an extracted key appears twice, only the latest label value will be kept.
If an extracted label key name already exists in the original log stream, the extracted label key will be suffixed with the `_extracted` keyword to make the distinction between the two labels. You can forcefully override the original label using a [label formatter expression](#labels-format-expression). However, if an extracted key appears twice, only the first label value will be kept.
Loki supports [JSON](#json), [logfmt](#logfmt), [pattern](#pattern), [regexp](#regular-expression) and [unpack](#unpack) parsers.

@ -35,6 +35,11 @@ The output is incredibly verbose as it shows the entire internal config struct u
### Loki
#### Change in LogQL behavior
When there are duplicate labels in a log line, only the first value will be kept. Previously only the last value
was kept.
#### Default retention_period has changed
This change will affect you if you have:

@ -93,6 +93,10 @@ type LabelsBuilder struct {
// NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results.
func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint, without, noLabels bool) *BaseLabelsBuilder {
if parserKeyHints == nil {
parserKeyHints = noParserHints
}
return &BaseLabelsBuilder{
del: make([]string, 0, 5),
add: make([]labels.Label, 0, 16),
@ -137,6 +141,7 @@ func (b *LabelsBuilder) Reset() {
b.add = b.add[:0]
b.err = ""
b.errDetails = ""
b.parserKeyHints.Reset()
}
// ParserLabelHints returns a limited list of expected labels to extract for metric queries.
@ -233,6 +238,9 @@ func (b *LabelsBuilder) Set(n, v string) *LabelsBuilder {
}
b.add = append(b.add, labels.Label{Name: n, Value: v})
// Sometimes labels are set and later modified. Only record
// each label once
b.parserKeyHints.RecordExtracted(n)
return b
}

@ -50,7 +50,7 @@ type lineSampleExtractor struct {
// Multiple log stages are run before converting the log line.
func NewLineSampleExtractor(ex LineExtractor, stages []Stage, groups []string, without, noLabels bool) (SampleExtractor, error) {
s := ReduceStages(stages)
hints := newParserHint(s.RequiredLabelNames(), groups, without, noLabels, "")
hints := NewParserHint(s.RequiredLabelNames(), groups, without, noLabels, "")
return &lineSampleExtractor{
Stage: s,
LineExtractor: ex,
@ -138,7 +138,7 @@ func LabelExtractorWithStages(
sort.Strings(groups)
}
preStage := ReduceStages(preStages)
hints := newParserHint(append(preStage.RequiredLabelNames(), postFilter.RequiredLabelNames()...), groups, without, noLabels, labelName)
hints := NewParserHint(append(preStage.RequiredLabelNames(), postFilter.RequiredLabelNames()...), groups, without, noLabels, labelName)
return &labelSampleExtractor{
preStage: preStage,
conversionFn: convFn,

@ -37,6 +37,7 @@ var (
errUnexpectedJSONObject = fmt.Errorf("expecting json object(%d), but it is not", jsoniter.ObjectValue)
errMissingCapture = errors.New("at least one named capture must be supplied")
errFoundAllLabels = errors.New("found all required labels")
)
type JSONParser struct {
@ -64,6 +65,11 @@ func (j *JSONParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte,
j.lbs = lbs
if err := jsonparser.ObjectEach(line, j.parseObject); err != nil {
if errors.Is(err, errFoundAllLabels) {
// Short-circuited
return line, true
}
lbs.SetErr(errJSON)
lbs.SetErrorDetails(err.Error())
if lbs.ParserLabelHints().PreserveError() {
@ -89,6 +95,12 @@ func (j *JSONParser) parseObject(key, value []byte, dataType jsonparser.ValueTyp
return err
}
if j.lbs.ParserLabelHints().AllRequiredExtracted() {
// Not actually an error. Parsing can be short-circuited
// and this tells jsonparser to stop parsing
return errFoundAllLabels
}
return nil
}
@ -135,6 +147,7 @@ func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.Valu
if !j.lbs.ParserLabelHints().ShouldExtract(string(j.prefixBuffer)) {
return "", false
}
return string(j.prefixBuffer), true
})
@ -234,6 +247,10 @@ func (r *RegexpParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte
if lbs.BaseHas(sanitize) {
sanitize = fmt.Sprintf("%s%s", sanitize, duplicateSuffix)
}
if !lbs.ParserLabelHints().ShouldExtract(sanitize) {
return "", false
}
return sanitize, true
})
if !ok {
@ -265,19 +282,22 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte
if lbs.ParserLabelHints().NoLabels() {
return line, true
}
l.dec.Reset(line)
for l.dec.ScanKeyval() {
key, ok := l.keys.Get(l.dec.Key(), func() (string, bool) {
sanitized := sanitizeLabelKey(string(l.dec.Key()), true)
if !lbs.ParserLabelHints().ShouldExtract(sanitized) {
return "", false
}
if len(sanitized) == 0 {
return "", false
}
if lbs.BaseHas(sanitized) {
sanitized = fmt.Sprintf("%s%s", sanitized, duplicateSuffix)
}
if !lbs.ParserLabelHints().ShouldExtract(sanitized) {
return "", false
}
return sanitized, true
})
if !ok {
@ -289,6 +309,10 @@ func (l *LogfmtParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte
val = nil
}
lbs.Set(key, string(val))
if lbs.ParserLabelHints().AllRequiredExtracted() {
break
}
}
if l.dec.Err() != nil {
lbs.SetErr(errLogfmt)
@ -332,13 +356,14 @@ func (l *PatternParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byt
names := l.names[:len(matches)]
for i, m := range matches {
name := names[i]
if !lbs.parserKeyHints.ShouldExtract(name) {
continue
}
if lbs.BaseHas(name) {
name = name + duplicateSuffix
}
if !lbs.parserKeyHints.ShouldExtract(name) {
continue
}
lbs.Set(name, string(m))
}
return line, true
@ -431,8 +456,17 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
if _, ok := l.expressions[key]; ok {
if lbs.BaseHas(key) {
key = key + duplicateSuffix
if !lbs.ParserLabelHints().ShouldExtract(key) {
// Don't extract duplicates if we don't have to
break
}
}
lbs.Set(key, string(val))
if lbs.ParserLabelHints().AllRequiredExtracted() {
break
}
}
}
if l.dec.Err() != nil {
@ -618,15 +652,14 @@ func (u *UnpackParser) unpack(entry []byte, lbs *LabelsBuilder) ([]byte, error)
isPacked = true
return nil
}
key, ok := u.keys.Get(key, func() (string, bool) {
field := unsafeGetString(key)
if !lbs.ParserLabelHints().ShouldExtract(field) {
return "", false
}
if lbs.BaseHas(field) {
field = field + duplicateSuffix
}
if !lbs.ParserLabelHints().ShouldExtract(field) {
return "", false
}
return field, true
})
if !ok {

@ -5,7 +5,7 @@ import (
"strings"
)
var noParserHints = &parserHint{}
var noParserHints = &Hints{}
// ParserHint are hints given to LogQL parsers.
// This is specially useful for parser that extract implicitly all possible label keys.
@ -26,29 +26,44 @@ type ParserHint interface {
// sum(rate({app="foo"} | json [5m]))
// We don't need to extract any labels from the log line.
NoLabels() bool
// Holds state about what's already been extracted for the associated
// labels. This assumes that only required labels are ever extracted
RecordExtracted(string)
AllRequiredExtracted() bool
Reset()
// PreserveError returns true when parsing errors were specifically requested
PreserveError() bool
}
type parserHint struct {
type Hints struct {
noLabels bool
requiredLabels []string
shouldPreserveError bool
extracted []string
}
func (p *parserHint) ShouldExtract(key string) bool {
// ShouldExtract reports whether a parser should extract the label named key.
//
// When no required labels are configured every key is wanted. Otherwise a key
// is wanted only if it is listed in requiredLabels and has not already been
// recorded in extracted.
func (p *Hints) ShouldExtract(key string) bool {
	// No explicit requirements: every key is wanted.
	if len(p.requiredLabels) == 0 {
		return true
	}

	// A key that was already recorded is never wanted a second time.
	for i := range p.extracted {
		if p.extracted[i] == key {
			return false
		}
	}

	wanted := false
	for i := range p.requiredLabels {
		if p.requiredLabels[i] == key {
			wanted = true
			break
		}
	}
	return wanted
}
func (p *parserHint) ShouldExtractPrefix(prefix string) bool {
func (p *Hints) ShouldExtractPrefix(prefix string) bool {
if len(p.requiredLabels) == 0 {
return true
}
@ -61,27 +76,49 @@ func (p *parserHint) ShouldExtractPrefix(prefix string) bool {
return false
}
func (p *parserHint) NoLabels() bool {
return p.noLabels
// NoLabels reports whether parsers can skip label extraction: either the
// hint was built with noLabels set, or AllRequiredExtracted already reports
// true.
func (p *Hints) NoLabels() bool {
	if p.noLabels {
		return true
	}
	return p.AllRequiredExtracted()
}
// RecordExtracted notes that the label named key has been extracted.
//
// Keys that are not in requiredLabels are ignored. A key that has already
// been recorded is not recorded again: AllRequiredExtracted compares the
// number of recorded keys with the number of required labels, so recording
// the same key twice would skew that comparison.
func (p *Hints) RecordExtracted(key string) {
	for _, l := range p.extracted {
		if l == key {
			// Already counted; keep the method idempotent per key.
			return
		}
	}

	for _, l := range p.requiredLabels {
		if l == key {
			p.extracted = append(p.extracted, key)
			return
		}
	}
}
// AllRequiredExtracted reports whether the number of recorded keys equals
// the number of required labels. It always reports false when there are no
// required labels, because in that case there is no finite set to complete.
func (p *Hints) AllRequiredExtracted() bool {
	required := len(p.requiredLabels)
	return required != 0 && len(p.extracted) == required
}
// Reset forgets every recorded key by truncating extracted to length zero.
// The slice is re-sliced rather than reallocated, so its capacity is kept.
func (p *Hints) Reset() {
	recorded := p.extracted
	p.extracted = recorded[:0]
}
func (p *parserHint) PreserveError() bool {
// PreserveError reports whether parsing errors were specifically requested.
// It returns the shouldPreserveError flag stored on the hint.
func (p *Hints) PreserveError() bool {
return p.shouldPreserveError
}
// newParserHint creates a new parser hint using the list of labels that are seen and required in a query.
func newParserHint(requiredLabelNames, groups []string, without, noLabels bool, metricLabelName string) *parserHint {
// NewParserHint creates a new parser hint using the list of labels that are seen and required in a query.
func NewParserHint(requiredLabelNames, groups []string, without, noLabels bool, metricLabelName string) *Hints {
hints := make([]string, 0, 2*(len(requiredLabelNames)+len(groups)+1))
hints = appendLabelHints(hints, requiredLabelNames...)
hints = appendLabelHints(hints, groups...)
hints = appendLabelHints(hints, metricLabelName)
hints = uniqueString(hints)
extracted := make([]string, 0, len(hints))
if noLabels {
if len(hints) > 0 {
return &parserHint{requiredLabels: hints, shouldPreserveError: containsError(hints)}
return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)}
}
return &parserHint{noLabels: true}
return &Hints{noLabels: true}
}
// we don't know what is required when a without clause is used.
// Same is true when there's no grouping.
@ -89,7 +126,7 @@ func newParserHint(requiredLabelNames, groups []string, without, noLabels bool,
if without || len(groups) == 0 {
return noParserHints
}
return &parserHint{requiredLabels: hints, shouldPreserveError: containsError(hints)}
return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)}
}
func containsError(hints []string) bool {

@ -2,6 +2,7 @@
package log_test
import (
"github.com/grafana/loki/pkg/logql/log"
"testing"
"github.com/prometheus/prometheus/model/labels"
@ -232,3 +233,21 @@ func Test_ParserHints(t *testing.T) {
})
}
}
// TestRecordingExtractedLabels walks a hint with three required labels
// through partial extraction, complete extraction and Reset, checking
// AllRequiredExtracted and NoLabels at each step.
func TestRecordingExtractedLabels(t *testing.T) {
	hints := log.NewParserHint([]string{"1", "2", "3"}, nil, false, true, "")

	// Two out of three required labels: not done yet.
	for _, key := range []string{"1", "2"} {
		hints.RecordExtracted(key)
	}
	require.False(t, hints.AllRequiredExtracted())
	require.False(t, hints.NoLabels())

	// The last required label completes the set.
	hints.RecordExtracted("3")
	require.True(t, hints.AllRequiredExtracted())
	require.True(t, hints.NoLabels())

	// Reset clears the recorded state again.
	hints.Reset()
	require.False(t, hints.AllRequiredExtracted())
	require.False(t, hints.NoLabels())
}

@ -100,7 +100,7 @@ func Test_jsonParser_Parse(t *testing.T) {
{Name: "__error_details__", Value: "Value looks like object, but can't find closing '}' symbol"},
{Name: "__preserve_error__", Value: "true"},
},
newParserHint([]string{"__error__"}, nil, false, true, ""),
NewParserHint([]string{"__error__"}, nil, false, true, ""),
},
{
"duplicate extraction",
@ -131,6 +131,76 @@ func Test_jsonParser_Parse(t *testing.T) {
}
}
func TestKeyShortCircuit(t *testing.T) {
simpleJsn := []byte(`{
"data": "Click Here",
"size": 36,
"style": "bold",
"name": "text1",
"name": "duplicate",
"hOffset": 250,
"vOffset": 100,
"alignment": "center",
"onMouseUp": "sun1.opacity = (sun1.opacity / 100) * 90;"
}`)
logFmt := []byte(`data="ClickHere" size=36 style=bold name=text1 name=duplicate hOffset=250 vOffset=100 alignment=center onMouseUp="sun1.opacity = (sun1.opacity / 100) * 90;"`)
hints := &fakeParseHints{label: "name"}
lbs := NewBaseLabelsBuilder().ForLabels(labels.Labels{}, 0)
lbs.parserKeyHints = hints
tests := []struct {
name string
p Stage
line []byte
}{
{"json", NewJSONParser(), simpleJsn},
{"logfmt", NewLogfmtParser(), logFmt},
{"logfmt-expression", mustStage(NewLogfmtExpressionParser([]LabelExtractionExpr{NewLabelExtractionExpr("name", "name")})), logFmt},
}
for _, tt := range tests {
lbs.Reset()
t.Run(tt.name, func(t *testing.T) {
_, result = tt.p.Process(0, tt.line, lbs)
require.Len(t, lbs.labels(), 1)
name, ok := lbs.Get("name")
require.True(t, ok)
require.Contains(t, name, "text1")
})
}
}
// fakeParseHints is a test double for parser hints that wants exactly one
// label and counts how often it is consulted and notified.
type fakeParseHints struct {
	label      string // the only key/prefix accepted for extraction
	checkCount int    // ShouldExtract calls since the last Reset
	count      int    // RecordExtracted calls since the last Reset
}

// ShouldExtract counts the call and accepts only the configured label.
func (p *fakeParseHints) ShouldExtract(key string) bool {
	p.checkCount += 1
	return p.label == key
}

// ShouldExtractPrefix accepts only a prefix equal to the configured label.
func (p *fakeParseHints) ShouldExtractPrefix(prefix string) bool {
	return p.label == prefix
}

// NoLabels always reports false.
func (p *fakeParseHints) NoLabels() bool {
	return false
}

// RecordExtracted counts the call; the key itself is ignored.
func (p *fakeParseHints) RecordExtracted(_ string) {
	p.count += 1
}

// AllRequiredExtracted reports true once exactly one extraction was recorded.
func (p *fakeParseHints) AllRequiredExtracted() bool {
	return p.count == 1
}

// Reset zeroes both counters; the configured label is kept.
func (p *fakeParseHints) Reset() {
	p.checkCount, p.count = 0, 0
}

// PreserveError always reports false.
func (p *fakeParseHints) PreserveError() bool {
	return false
}
func TestJSONExpressionParser(t *testing.T) {
testLine := []byte(`{"app":"foo","field with space":"value","field with ÜFT8👌":"value","null_field":null,"bool_field":false,"namespace":"prod","pod":{"uuid":"foo","deployment":{"ref":"foobar", "params": [1,2,3]}}}`)
@ -398,7 +468,7 @@ func TestJSONExpressionParser(t *testing.T) {
{Name: logqlmodel.ErrorLabel, Value: errJSON},
{Name: logqlmodel.PreserveErrorLabel, Value: "true"},
},
newParserHint([]string{"__error__"}, nil, false, true, ""),
NewParserHint([]string{"__error__"}, nil, false, true, ""),
},
{
"empty line",
@ -516,7 +586,7 @@ func Benchmark_Parser(b *testing.B) {
b.Run("labels hints", func(b *testing.B) {
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
builder.parserKeyHints = newParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "")
builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "")
for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
@ -526,6 +596,43 @@ func Benchmark_Parser(b *testing.B) {
}
}
// BenchmarkKeyExtraction measures the json, logfmt and logfmt-expression
// parsers when the hint requires only the "name" label, which sits in the
// middle of each sample line.
func BenchmarkKeyExtraction(b *testing.B) {
	jsonLine := []byte(`{
"data": "Click Here",
"size": 36,
"style": "bold",
"name": "text1",
"hOffset": 250,
"vOffset": 100,
"alignment": "center",
"onMouseUp": "sun1.opacity = (sun1.opacity / 100) * 90;"
}`)
	logfmtLine := []byte(`data="Click Here" size=36 style=bold name=text1 hOffset=250 vOffset=100 alignment=center onMouseUp="sun1.opacity = (sun1.opacity / 100) * 90;"`)

	builder := NewBaseLabelsBuilder().ForLabels(labels.Labels{}, 0)
	builder.parserKeyHints = NewParserHint([]string{"name"}, nil, false, true, "")

	type benchCase struct {
		name  string
		stage Stage
		line  []byte
	}
	cases := []benchCase{
		{name: "json", stage: NewJSONParser(), line: jsonLine},
		{name: "logfmt", stage: NewLogfmtParser(), line: logfmtLine},
		{
			name:  "logfmt-expression",
			stage: mustStage(NewLogfmtExpressionParser([]LabelExtractionExpr{NewLabelExtractionExpr("name", "name")})),
			line:  logfmtLine,
		},
	}

	for _, bc := range cases {
		b.Run(bc.name, func(b *testing.B) {
			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				// Clear the builder and hint state before every parse.
				builder.Reset()
				// Assign to the package-level result so the call is not optimized away.
				_, result = bc.stage.Process(0, bc.line, builder)
			}
		})
	}
}
func mustStage(s Stage, err error) Stage {
if err != nil {
panic(err)
@ -658,7 +765,7 @@ func Test_logfmtParser_Parse(t *testing.T) {
{Name: "__error_details__", Value: "logfmt syntax error at pos 8 : unexpected '='"},
{Name: "__preserve_error__", Value: "true"},
},
newParserHint([]string{"__error__"}, nil, false, true, ""),
NewParserHint([]string{"__error__"}, nil, false, true, ""),
},
{
"utf8 error rune",
@ -1035,7 +1142,7 @@ func Test_unpackParser_Parse(t *testing.T) {
{Name: "__preserve_error__", Value: "true"},
},
[]byte(`"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}`),
newParserHint([]string{"__error__"}, nil, false, true, ""),
NewParserHint([]string{"__error__"}, nil, false, true, ""),
},
{
"not a map",

Loading…
Cancel
Save