|
|
|
|
@ -36,12 +36,15 @@ var ( |
|
|
|
|
type JSONParser struct { |
|
|
|
|
buf []byte // buffer used to build json keys
|
|
|
|
|
lbs *LabelsBuilder |
|
|
|
|
|
|
|
|
|
keys internedStringSet |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewJSONParser creates a log stage that can parse a json log line and add properties as labels.
|
|
|
|
|
func NewJSONParser() *JSONParser { |
|
|
|
|
return &JSONParser{ |
|
|
|
|
buf: make([]byte, 0, 1024), |
|
|
|
|
buf: make([]byte, 0, 1024), |
|
|
|
|
keys: internedStringSet{}, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -110,7 +113,7 @@ func (j *JSONParser) nextKeyPrefix(prefix, field string) (string, bool) { |
|
|
|
|
j.buf = append(j.buf, byte(jsonSpacer)) |
|
|
|
|
j.buf = append(j.buf, sanitizeLabelKey(field, false)...) |
|
|
|
|
// if matches keep going
|
|
|
|
|
if j.lbs.ParserLabelHints().ShouldExtractPrefix(string(j.buf)) { |
|
|
|
|
if j.lbs.ParserLabelHints().ShouldExtractPrefix(unsafeGetString(j.buf)) { |
|
|
|
|
return string(j.buf), true |
|
|
|
|
} |
|
|
|
|
return "", false |
|
|
|
|
@ -119,17 +122,21 @@ func (j *JSONParser) nextKeyPrefix(prefix, field string) (string, bool) { |
|
|
|
|
func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field string) { |
|
|
|
|
// the first time we use the field as label key.
|
|
|
|
|
if len(prefix) == 0 { |
|
|
|
|
field = sanitizeLabelKey(field, true) |
|
|
|
|
if !j.lbs.ParserLabelHints().ShouldExtract(field) { |
|
|
|
|
// we can skip the value
|
|
|
|
|
key, ok := j.keys.Get(unsafeGetBytes(field), func() (string, bool) { |
|
|
|
|
field = sanitizeLabelKey(field, true) |
|
|
|
|
if !j.lbs.ParserLabelHints().ShouldExtract(field) { |
|
|
|
|
return "", false |
|
|
|
|
} |
|
|
|
|
if j.lbs.BaseHas(field) { |
|
|
|
|
field = field + duplicateSuffix |
|
|
|
|
} |
|
|
|
|
return field, true |
|
|
|
|
}) |
|
|
|
|
if !ok { |
|
|
|
|
iter.Skip() |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
if j.lbs.BaseHas(field) { |
|
|
|
|
field = field + duplicateSuffix |
|
|
|
|
} |
|
|
|
|
j.lbs.Set(field, readValue(iter)) |
|
|
|
|
j.lbs.Set(key, readValue(iter)) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
@ -138,14 +145,20 @@ func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field stri |
|
|
|
|
j.buf = append(j.buf, prefix...) |
|
|
|
|
j.buf = append(j.buf, byte(jsonSpacer)) |
|
|
|
|
j.buf = append(j.buf, sanitizeLabelKey(field, false)...) |
|
|
|
|
if j.lbs.BaseHas(string(j.buf)) { |
|
|
|
|
j.buf = append(j.buf, duplicateSuffix...) |
|
|
|
|
} |
|
|
|
|
if !j.lbs.ParserLabelHints().ShouldExtract(string(j.buf)) { |
|
|
|
|
key, ok := j.keys.Get(j.buf, func() (string, bool) { |
|
|
|
|
if j.lbs.BaseHas(string(j.buf)) { |
|
|
|
|
j.buf = append(j.buf, duplicateSuffix...) |
|
|
|
|
} |
|
|
|
|
if !j.lbs.ParserLabelHints().ShouldExtract(string(j.buf)) { |
|
|
|
|
return "", false |
|
|
|
|
} |
|
|
|
|
return string(j.buf), true |
|
|
|
|
}) |
|
|
|
|
if !ok { |
|
|
|
|
iter.Skip() |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
j.lbs.Set(string(j.buf), readValue(iter)) |
|
|
|
|
j.lbs.Set(key, readValue(iter)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (j *JSONParser) RequiredLabelNames() []string { return []string{} } |
|
|
|
|
@ -172,20 +185,11 @@ func readValue(iter *jsoniter.Iterator) string { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func addLabel(lbs *LabelsBuilder, key, value string) { |
|
|
|
|
key = sanitizeLabelKey(key, true) |
|
|
|
|
if len(key) == 0 { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if lbs.BaseHas(key) { |
|
|
|
|
key = fmt.Sprintf("%s%s", key, duplicateSuffix) |
|
|
|
|
} |
|
|
|
|
lbs.Set(key, value) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type RegexpParser struct { |
|
|
|
|
regex *regexp.Regexp |
|
|
|
|
nameIndex map[int]string |
|
|
|
|
|
|
|
|
|
keys internedStringSet |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewRegexpParser creates a new log stage that can extract labels from a log line using a regex expression.
|
|
|
|
|
@ -218,13 +222,27 @@ func NewRegexpParser(re string) (*RegexpParser, error) { |
|
|
|
|
return &RegexpParser{ |
|
|
|
|
regex: regex, |
|
|
|
|
nameIndex: nameIndex, |
|
|
|
|
keys: internedStringSet{}, |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { |
|
|
|
|
for i, value := range r.regex.FindSubmatch(line) { |
|
|
|
|
if name, ok := r.nameIndex[i]; ok { |
|
|
|
|
addLabel(lbs, name, string(value)) |
|
|
|
|
key, ok := r.keys.Get(unsafeGetBytes(name), func() (string, bool) { |
|
|
|
|
sanitize := sanitizeLabelKey(name, true) |
|
|
|
|
if len(sanitize) == 0 { |
|
|
|
|
return "", false |
|
|
|
|
} |
|
|
|
|
if lbs.BaseHas(sanitize) { |
|
|
|
|
sanitize = fmt.Sprintf("%s%s", sanitize, duplicateSuffix) |
|
|
|
|
} |
|
|
|
|
return sanitize, true |
|
|
|
|
}) |
|
|
|
|
if !ok { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
lbs.Set(key, string(value)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return line, true |
|
|
|
|
@ -233,14 +251,16 @@ func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { |
|
|
|
|
func (r *RegexpParser) RequiredLabelNames() []string { return []string{} } |
|
|
|
|
|
|
|
|
|
type LogfmtParser struct { |
|
|
|
|
dec *logfmt.Decoder |
|
|
|
|
dec *logfmt.Decoder |
|
|
|
|
keys internedStringSet |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewLogfmtParser creates a parser that can extract labels from a logfmt log line.
|
|
|
|
|
// Each keyval is extracted into a respective label.
|
|
|
|
|
func NewLogfmtParser() *LogfmtParser { |
|
|
|
|
return &LogfmtParser{ |
|
|
|
|
dec: logfmt.NewDecoder(nil), |
|
|
|
|
dec: logfmt.NewDecoder(nil), |
|
|
|
|
keys: internedStringSet{}, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -250,16 +270,28 @@ func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { |
|
|
|
|
} |
|
|
|
|
l.dec.Reset(line) |
|
|
|
|
for l.dec.ScanKeyval() { |
|
|
|
|
if !lbs.ParserLabelHints().ShouldExtract(sanitizeLabelKey(string(l.dec.Key()), true)) { |
|
|
|
|
key, ok := l.keys.Get(l.dec.Key(), func() (string, bool) { |
|
|
|
|
sanitized := sanitizeLabelKey(string(l.dec.Key()), true) |
|
|
|
|
if !lbs.ParserLabelHints().ShouldExtract(sanitized) { |
|
|
|
|
return "", false |
|
|
|
|
} |
|
|
|
|
if len(sanitized) == 0 { |
|
|
|
|
return "", false |
|
|
|
|
} |
|
|
|
|
if lbs.BaseHas(sanitized) { |
|
|
|
|
sanitized = fmt.Sprintf("%s%s", sanitized, duplicateSuffix) |
|
|
|
|
} |
|
|
|
|
return sanitized, true |
|
|
|
|
}) |
|
|
|
|
if !ok { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
key := string(l.dec.Key()) |
|
|
|
|
val := l.dec.Value() |
|
|
|
|
// the rune error replacement is rejected by Prometheus, so we skip it.
|
|
|
|
|
if bytes.ContainsRune(val, utf8.RuneError) { |
|
|
|
|
val = nil |
|
|
|
|
} |
|
|
|
|
addLabel(lbs, key, string(val)) |
|
|
|
|
lbs.Set(key, string(val)) |
|
|
|
|
} |
|
|
|
|
if l.dec.Err() != nil { |
|
|
|
|
lbs.SetErr(errLogfmt) |
|
|
|
|
@ -315,6 +347,8 @@ func (l *PatternParser) RequiredLabelNames() []string { return []string{} } |
|
|
|
|
|
|
|
|
|
type JSONExpressionParser struct { |
|
|
|
|
expressions map[string][]interface{} |
|
|
|
|
|
|
|
|
|
keys internedStringSet |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewJSONExpressionParser(expressions []JSONExpression) (*JSONExpressionParser, error) { |
|
|
|
|
@ -335,6 +369,7 @@ func NewJSONExpressionParser(expressions []JSONExpression) (*JSONExpressionParse |
|
|
|
|
|
|
|
|
|
return &JSONExpressionParser{ |
|
|
|
|
expressions: paths, |
|
|
|
|
keys: internedStringSet{}, |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -350,12 +385,14 @@ func (j *JSONExpressionParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, |
|
|
|
|
|
|
|
|
|
for identifier, paths := range j.expressions { |
|
|
|
|
result := jsoniter.ConfigFastest.Get(line, paths...).ToString() |
|
|
|
|
key, _ := j.keys.Get(unsafeGetBytes(identifier), func() (string, bool) { |
|
|
|
|
if lbs.BaseHas(identifier) { |
|
|
|
|
identifier = identifier + duplicateSuffix |
|
|
|
|
} |
|
|
|
|
return identifier, true |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
if lbs.BaseHas(identifier) { |
|
|
|
|
identifier = identifier + duplicateSuffix |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
lbs.Set(identifier, result) |
|
|
|
|
lbs.Set(key, result) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return line, true |
|
|
|
|
@ -365,6 +402,8 @@ func (j *JSONExpressionParser) RequiredLabelNames() []string { return []string{} |
|
|
|
|
|
|
|
|
|
type UnpackParser struct { |
|
|
|
|
lbsBuffer []string |
|
|
|
|
|
|
|
|
|
keys internedStringSet |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewUnpackParser creates a new unpack stage.
|
|
|
|
|
@ -374,6 +413,7 @@ type UnpackParser struct { |
|
|
|
|
func NewUnpackParser() *UnpackParser { |
|
|
|
|
return &UnpackParser{ |
|
|
|
|
lbsBuffer: make([]string, 0, 16), |
|
|
|
|
keys: internedStringSet{}, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -413,15 +453,22 @@ func (u *UnpackParser) unpack(it *jsoniter.Iterator, entry []byte, lbs *LabelsBu |
|
|
|
|
isPacked = true |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
if !lbs.ParserLabelHints().ShouldExtract(field) { |
|
|
|
|
key, ok := u.keys.Get(unsafeGetBytes(field), func() (string, bool) { |
|
|
|
|
if !lbs.ParserLabelHints().ShouldExtract(field) { |
|
|
|
|
return "", false |
|
|
|
|
} |
|
|
|
|
if lbs.BaseHas(field) { |
|
|
|
|
field = field + duplicateSuffix |
|
|
|
|
} |
|
|
|
|
return field, true |
|
|
|
|
}) |
|
|
|
|
if !ok { |
|
|
|
|
iter.Skip() |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
if lbs.BaseHas(field) { |
|
|
|
|
field = field + duplicateSuffix |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// append to the buffer of labels
|
|
|
|
|
u.lbsBuffer = append(u.lbsBuffer, field, iter.ReadString()) |
|
|
|
|
u.lbsBuffer = append(u.lbsBuffer, key, iter.ReadString()) |
|
|
|
|
default: |
|
|
|
|
iter.Skip() |
|
|
|
|
} |
|
|
|
|
|