fix: align semantics of metric and log query label extraction (#11587)

both metric and log queries use the first extracted label when multiple values are requested for the same label

Fixes #11647
pull/11666/head^2
Trevor Whitney 1 year ago committed by GitHub
parent 9287c93dd3
commit 9759c130fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 5
      pkg/logql/log/parser.go
  3. 31
      pkg/logql/log/parser_hints.go
  4. 20
      pkg/logql/log/parser_hints_test.go

@ -57,6 +57,7 @@
* [11601](https://github.com/grafana/loki/pull/11601) **dannykopping** Ruler: Fixed a panic that can be caused by concurrent read-write access of tenant configs when there are a large amount of rules.
* [11606](https://github.com/grafana/loki/pull/11606) **dannykopping** Fixed regression adding newlines to HTTP error response bodies which may break client integrations.
* [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction.
* [11587](https://github.com/grafana/loki/pull/11587) **trevorwhitney** Fix semantics of label parsing logic of metrics and logs queries. Both only parse the first label if multiple extractions into the same label are requested.
##### Changes

@ -493,11 +493,13 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
return "", false
}
if !lbs.ParserLabelHints().ShouldExtract(sanitized) {
_, alwaysExtract := keys[sanitized]
if !alwaysExtract && !lbs.ParserLabelHints().ShouldExtract(sanitized) {
return "", false
}
return sanitized, true
})
if !ok {
continue
}
@ -530,6 +532,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde
}
}
}
if l.strict && l.dec.Err() != nil {
addErrLabel(errLogfmt, l.dec.Err(), lbs)
return line, true

@ -58,10 +58,6 @@ type Hints struct {
}
func (p *Hints) ShouldExtract(key string) bool {
if len(p.requiredLabels) == 0 {
return true
}
for _, l := range p.extracted {
if l == key {
return false
@ -74,7 +70,7 @@ func (p *Hints) ShouldExtract(key string) bool {
}
}
return false
return len(p.requiredLabels) == 0
}
func (p *Hints) ShouldExtractPrefix(prefix string) bool {
@ -95,19 +91,25 @@ func (p *Hints) NoLabels() bool {
}
func (p *Hints) RecordExtracted(key string) {
for _, l := range p.requiredLabels {
if l == key {
p.extracted = append(p.extracted, key)
return
}
}
p.extracted = append(p.extracted, key)
}
func (p *Hints) AllRequiredExtracted() bool {
if len(p.requiredLabels) == 0 {
if len(p.requiredLabels) == 0 || len(p.extracted) < len(p.requiredLabels) {
return false
}
return len(p.extracted) == len(p.requiredLabels)
found := 0
for _, l := range p.requiredLabels {
for _, e := range p.extracted {
if l == e {
found++
break
}
}
}
return len(p.requiredLabels) == found
}
func (p *Hints) Reset() {
@ -172,9 +174,6 @@ func NewParserHint(requiredLabelNames, groups []string, without, noLabels bool,
return ph
}
ph.requiredLabels = hints
ph.shouldPreserveError = containsError(hints)
return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)}
}

@ -28,7 +28,10 @@ var (
"response": {
"status": 204,
"latency_seconds": "30.001"
}
},
"message": {
"message": "foo",
}
}`)
packedLine = []byte(`{
@ -58,14 +61,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
1.0,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app,cluster) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
jsonLine,
true,
1.0,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum by (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
@ -114,14 +117,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
30.001,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app,cluster)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`,
jsonLine,
true,
30.001,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
@ -214,6 +217,13 @@ func Test_ParserHints(t *testing.T) {
0,
``,
},
{
`sum by (message_message,app)(count_over_time({app="nginx"} | json | response_status = 204 and remote_user = "foo"[1m]))`,
jsonLine,
true,
1,
`{app="nginx", message_message="foo"}`,
},
} {
tt := tt
t.Run(tt.expr, func(t *testing.T) {

Loading…
Cancel
Save