diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go
index b5d3ec6a4b..a51025c0c4 100644
--- a/pkg/logql/log/labels.go
+++ b/pkg/logql/log/labels.go
@@ -8,6 +8,8 @@ import (
 	"github.com/grafana/loki/pkg/logqlmodel"
 )
 
+const MaxInternedStrings = 1024
+
 var emptyLabelsResult = NewLabelsResult(labels.Labels{}, labels.Labels{}.Hash())
 
 // LabelsResult is a computed labels result that contains the labels set with associated string and hash.
@@ -368,3 +370,24 @@ func (b *LabelsBuilder) toBaseGroup() LabelsResult {
 	b.groupedResult = res
 	return res
 }
+
+type internedStringSet map[string]struct {
+	s  string
+	ok bool
+}
+
+func (i internedStringSet) Get(data []byte, createNew func() (string, bool)) (string, bool) {
+	s, ok := i[string(data)]
+	if ok {
+		return s.s, s.ok
+	}
+	new, ok := createNew()
+	if len(i) >= MaxInternedStrings {
+		return new, ok
+	}
+	i[string(data)] = struct {
+		s  string
+		ok bool
+	}{s: new, ok: ok}
+	return new, ok
+}
diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go
index 2a991c8b17..bacde99a0a 100644
--- a/pkg/logql/log/parser.go
+++ b/pkg/logql/log/parser.go
@@ -36,12 +36,15 @@ var (
 
 type JSONParser struct {
 	buf []byte // buffer used to build json keys
 	lbs *LabelsBuilder
+
+	keys internedStringSet
 }
 
 // NewJSONParser creates a log stage that can parse a json log line and add properties as labels.
 func NewJSONParser() *JSONParser {
 	return &JSONParser{
-		buf: make([]byte, 0, 1024),
+		buf:  make([]byte, 0, 1024),
+		keys: internedStringSet{},
 	}
 }
@@ -110,7 +113,7 @@ func (j *JSONParser) nextKeyPrefix(prefix, field string) (string, bool) {
 	j.buf = append(j.buf, byte(jsonSpacer))
 	j.buf = append(j.buf, sanitizeLabelKey(field, false)...)
 	// if matches keep going
-	if j.lbs.ParserLabelHints().ShouldExtractPrefix(string(j.buf)) {
+	if j.lbs.ParserLabelHints().ShouldExtractPrefix(unsafeGetString(j.buf)) {
 		return string(j.buf), true
 	}
 	return "", false
@@ -119,17 +122,21 @@ func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field string) {
 	// the first time we use the field as label key.
 	if len(prefix) == 0 {
-		field = sanitizeLabelKey(field, true)
-		if !j.lbs.ParserLabelHints().ShouldExtract(field) {
-			// we can skip the value
+		key, ok := j.keys.Get(unsafeGetBytes(field), func() (string, bool) {
+			field = sanitizeLabelKey(field, true)
+			if !j.lbs.ParserLabelHints().ShouldExtract(field) {
+				return "", false
+			}
+			if j.lbs.BaseHas(field) {
+				field = field + duplicateSuffix
+			}
+			return field, true
+		})
+		if !ok {
 			iter.Skip()
 			return
-
-		}
-		if j.lbs.BaseHas(field) {
-			field = field + duplicateSuffix
 		}
-		j.lbs.Set(field, readValue(iter))
+		j.lbs.Set(key, readValue(iter))
 		return
 	}
@@ -138,14 +145,20 @@ func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field stri
 	j.buf = append(j.buf, prefix...)
 	j.buf = append(j.buf, byte(jsonSpacer))
 	j.buf = append(j.buf, sanitizeLabelKey(field, false)...)
-	if j.lbs.BaseHas(string(j.buf)) {
-		j.buf = append(j.buf, duplicateSuffix...)
-	}
-	if !j.lbs.ParserLabelHints().ShouldExtract(string(j.buf)) {
+	key, ok := j.keys.Get(j.buf, func() (string, bool) {
+		if j.lbs.BaseHas(string(j.buf)) {
+			j.buf = append(j.buf, duplicateSuffix...)
+		}
+		if !j.lbs.ParserLabelHints().ShouldExtract(string(j.buf)) {
+			return "", false
+		}
+		return string(j.buf), true
+	})
+	if !ok {
 		iter.Skip()
 		return
 	}
-	j.lbs.Set(string(j.buf), readValue(iter))
+	j.lbs.Set(key, readValue(iter))
 }
 
 func (j *JSONParser) RequiredLabelNames() []string { return []string{} }
@@ -172,20 +185,11 @@ func readValue(iter *jsoniter.Iterator) string {
 	}
 }
 
-func addLabel(lbs *LabelsBuilder, key, value string) {
-	key = sanitizeLabelKey(key, true)
-	if len(key) == 0 {
-		return
-	}
-	if lbs.BaseHas(key) {
-		key = fmt.Sprintf("%s%s", key, duplicateSuffix)
-	}
-	lbs.Set(key, value)
-}
-
 type RegexpParser struct {
 	regex     *regexp.Regexp
 	nameIndex map[int]string
+
+	keys internedStringSet
 }
 
 // NewRegexpParser creates a new log stage that can extract labels from a log line using a regex expression.
@@ -218,13 +222,27 @@ func NewRegexpParser(re string) (*RegexpParser, error) {
 	return &RegexpParser{
 		regex:     regex,
 		nameIndex: nameIndex,
+		keys:      internedStringSet{},
 	}, nil
 }
 
 func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	for i, value := range r.regex.FindSubmatch(line) {
 		if name, ok := r.nameIndex[i]; ok {
-			addLabel(lbs, name, string(value))
+			key, ok := r.keys.Get(unsafeGetBytes(name), func() (string, bool) {
+				sanitize := sanitizeLabelKey(name, true)
+				if len(sanitize) == 0 {
+					return "", false
+				}
+				if lbs.BaseHas(sanitize) {
+					sanitize = fmt.Sprintf("%s%s", sanitize, duplicateSuffix)
+				}
+				return sanitize, true
+			})
+			if !ok {
+				continue
+			}
+			lbs.Set(key, string(value))
 		}
 	}
 	return line, true
@@ -233,14 +251,16 @@ func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 func (r *RegexpParser) RequiredLabelNames() []string { return []string{} }
 
 type LogfmtParser struct {
-	dec *logfmt.Decoder
+	dec  *logfmt.Decoder
+	keys internedStringSet
 }
 
 // NewLogfmtParser creates a parser that can extract labels from a logfmt log line.
 // Each keyval is extracted into a respective label.
 func NewLogfmtParser() *LogfmtParser {
 	return &LogfmtParser{
-		dec: logfmt.NewDecoder(nil),
+		dec:  logfmt.NewDecoder(nil),
+		keys: internedStringSet{},
 	}
 }
 
@@ -250,16 +270,28 @@ func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	}
 	l.dec.Reset(line)
 	for l.dec.ScanKeyval() {
-		if !lbs.ParserLabelHints().ShouldExtract(sanitizeLabelKey(string(l.dec.Key()), true)) {
+		key, ok := l.keys.Get(l.dec.Key(), func() (string, bool) {
+			sanitized := sanitizeLabelKey(string(l.dec.Key()), true)
+			if !lbs.ParserLabelHints().ShouldExtract(sanitized) {
+				return "", false
+			}
+			if len(sanitized) == 0 {
+				return "", false
+			}
+			if lbs.BaseHas(sanitized) {
+				sanitized = fmt.Sprintf("%s%s", sanitized, duplicateSuffix)
+			}
+			return sanitized, true
+		})
+		if !ok {
 			continue
 		}
-		key := string(l.dec.Key())
 		val := l.dec.Value()
 		// the rune error replacement is rejected by Prometheus, so we skip it.
 		if bytes.ContainsRune(val, utf8.RuneError) {
 			val = nil
 		}
-		addLabel(lbs, key, string(val))
+		lbs.Set(key, string(val))
 	}
 	if l.dec.Err() != nil {
 		lbs.SetErr(errLogfmt)
@@ -315,6 +347,8 @@ func (l *PatternParser) RequiredLabelNames() []string { return []string{} }
 
 type JSONExpressionParser struct {
 	expressions map[string][]interface{}
+
+	keys internedStringSet
 }
 
 func NewJSONExpressionParser(expressions []JSONExpression) (*JSONExpressionParser, error) {
@@ -335,6 +369,7 @@ func NewJSONExpressionParser(expressions []JSONExpression) (*JSONExpressionParse
 
 	return &JSONExpressionParser{
 		expressions: paths,
+		keys:        internedStringSet{},
 	}, nil
 }
 
@@ -350,12 +385,14 @@ func (j *JSONExpressionParser) Process(line []byte, lbs *LabelsBuilder) ([]byte,
 
 	for identifier, paths := range j.expressions {
 		result := jsoniter.ConfigFastest.Get(line, paths...).ToString()
 
+		key, _ := j.keys.Get(unsafeGetBytes(identifier), func() (string, bool) {
+			if lbs.BaseHas(identifier) {
+				identifier = identifier + duplicateSuffix
+			}
+			return identifier, true
+		})
-		if lbs.BaseHas(identifier) {
-			identifier = identifier + duplicateSuffix
-		}
-
-		lbs.Set(identifier, result)
+		lbs.Set(key, result)
 	}
 
 	return line, true
@@ -365,6 +402,8 @@ func (j *JSONExpressionParser) RequiredLabelNames() []string { return []string{}
 
 type UnpackParser struct {
 	lbsBuffer []string
+
+	keys internedStringSet
 }
 
 // NewUnpackParser creates a new unpack stage.
@@ -374,6 +413,7 @@ type UnpackParser struct {
 func NewUnpackParser() *UnpackParser {
 	return &UnpackParser{
 		lbsBuffer: make([]string, 0, 16),
+		keys:      internedStringSet{},
 	}
 }
 
@@ -413,15 +453,22 @@ func (u *UnpackParser) unpack(it *jsoniter.Iterator, entry []byte, lbs *LabelsBu
 				isPacked = true
 				return true
 			}
-			if !lbs.ParserLabelHints().ShouldExtract(field) {
+			key, ok := u.keys.Get(unsafeGetBytes(field), func() (string, bool) {
+				if !lbs.ParserLabelHints().ShouldExtract(field) {
+					return "", false
+				}
+				if lbs.BaseHas(field) {
+					field = field + duplicateSuffix
+				}
+				return field, true
+			})
+			if !ok {
				iter.Skip()
 				return true
 			}
-			if lbs.BaseHas(field) {
-				field = field + duplicateSuffix
-			}
+
 			// append to the buffer of labels
-			u.lbsBuffer = append(u.lbsBuffer, field, iter.ReadString())
+			u.lbsBuffer = append(u.lbsBuffer, key, iter.ReadString())
 		default:
 			iter.Skip()
 		}
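
Reviewer note, not part of the patch: below is a small, self-contained sketch of how the interned key set introduced in labels.go is meant to be used by the parsers. The internedStringSet type and MaxInternedStrings constant are copied from the change; the sanitize closure and main function are hypothetical stand-ins for the sanitizeLabelKey / ShouldExtract / BaseHas logic each parser supplies.

// Illustrative sketch only (not part of the diff above).
package main

import (
	"fmt"
	"strings"
)

const MaxInternedStrings = 1024

type internedStringSet map[string]struct {
	s  string
	ok bool
}

// Get returns the cached (key, extract) pair for data. On a miss it calls
// createNew once and, while the set holds fewer than MaxInternedStrings
// entries, remembers the result. The i[string(data)] index expression is
// optimized by the Go compiler to avoid allocating, so repeated keys skip
// both the sanitizing work and the byte-to-string conversion.
func (i internedStringSet) Get(data []byte, createNew func() (string, bool)) (string, bool) {
	s, ok := i[string(data)]
	if ok {
		return s.s, s.ok
	}
	new, ok := createNew()
	if len(i) >= MaxInternedStrings {
		return new, ok
	}
	i[string(data)] = struct {
		s  string
		ok bool
	}{s: new, ok: ok}
	return new, ok
}

func main() {
	keys := internedStringSet{}
	for _, raw := range [][]byte{[]byte("foo-bar"), []byte("foo-bar"), []byte("skip me")} {
		key, ok := keys.Get(raw, func() (string, bool) {
			// Toy stand-in for the per-parser closure: reject keys containing
			// spaces (the parser would iter.Skip()), otherwise "sanitize" them.
			if strings.ContainsRune(string(raw), ' ') {
				return "", false
			}
			return strings.ReplaceAll(string(raw), "-", "_"), true
		})
		fmt.Println(key, ok) // "foo_bar true", "foo_bar true" (cache hit), "" false (cached rejection)
	}
}

The false result is cached as well, which is why the parsers can call iter.Skip() on a miss without re-running the hint checks for the same key on later lines; the MaxInternedStrings cap simply stops the map from growing without bound on high-cardinality keys.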