diff --git a/CHANGELOG.md b/CHANGELOG.md index f9d2d38fbb..46e9a24daf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ * [11601](https://github.com/grafana/loki/pull/11601) **dannykopping** Ruler: Fixed a panic that can be caused by concurrent read-write access of tenant configs when there are a large amount of rules. * [11606](https://github.com/grafana/loki/pull/11606) **dannykopping** Fixed regression adding newlines to HTTP error response bodies which may break client integrations. * [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction. +* [11587](https://github.com/grafana/loki/pull/11587) **trevorwhitney** Fix semantics of label parsing logic of metrics and logs queries. Both only parse the first label if multiple extractions into the same label are requested. ##### Changes diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index be059a2831..c03e7c91cb 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -493,11 +493,13 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde return "", false } - if !lbs.ParserLabelHints().ShouldExtract(sanitized) { + _, alwaysExtract := keys[sanitized] + if !alwaysExtract && !lbs.ParserLabelHints().ShouldExtract(sanitized) { return "", false } return sanitized, true }) + if !ok { continue } @@ -530,6 +532,7 @@ func (l *LogfmtExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilde } } } + if l.strict && l.dec.Err() != nil { addErrLabel(errLogfmt, l.dec.Err(), lbs) return line, true diff --git a/pkg/logql/log/parser_hints.go b/pkg/logql/log/parser_hints.go index cdb61015dd..a8b1f73f31 100644 --- a/pkg/logql/log/parser_hints.go +++ b/pkg/logql/log/parser_hints.go @@ -58,10 +58,6 @@ type Hints struct { } func (p *Hints) ShouldExtract(key string) bool { - if len(p.requiredLabels) == 0 { - return true - } - for _, l := range p.extracted { if l == key { return false @@ -74,7 +70,7 @@ func (p *Hints) ShouldExtract(key string) bool { } } - return false + return len(p.requiredLabels) == 0 } func (p *Hints) ShouldExtractPrefix(prefix string) bool { @@ -95,19 +91,25 @@ func (p *Hints) NoLabels() bool { } func (p *Hints) RecordExtracted(key string) { - for _, l := range p.requiredLabels { - if l == key { - p.extracted = append(p.extracted, key) - return - } - } + p.extracted = append(p.extracted, key) } func (p *Hints) AllRequiredExtracted() bool { - if len(p.requiredLabels) == 0 { + if len(p.requiredLabels) == 0 || len(p.extracted) < len(p.requiredLabels) { return false } - return len(p.extracted) == len(p.requiredLabels) + + found := 0 + for _, l := range p.requiredLabels { + for _, e := range p.extracted { + if l == e { + found++ + break + } + } + } + + return len(p.requiredLabels) == found } func (p *Hints) Reset() { @@ -172,9 +174,6 @@ func NewParserHint(requiredLabelNames, groups []string, without, noLabels bool, return ph } - ph.requiredLabels = hints - ph.shouldPreserveError = containsError(hints) - return &Hints{requiredLabels: hints, extracted: extracted, shouldPreserveError: containsError(hints)} } diff --git a/pkg/logql/log/parser_hints_test.go b/pkg/logql/log/parser_hints_test.go index ac232bfd87..42d0134bc1 100644 --- a/pkg/logql/log/parser_hints_test.go +++ b/pkg/logql/log/parser_hints_test.go @@ -28,7 +28,10 @@ var ( "response": { "status": 204, "latency_seconds": "30.001" - } + }, + "message": { + "message": "foo", + } }`) packedLine = []byte(`{ @@ -58,14 +61,14 @@ func Test_ParserHints(t *testing.T) { jsonLine, true, 1.0, - `{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, + `{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum without (request_host,app,cluster) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`, jsonLine, true, 1.0, - `{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, + `{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum by (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`, @@ -114,14 +117,14 @@ func Test_ParserHints(t *testing.T) { jsonLine, true, 30.001, - `{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`, + `{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum without (request_host,app,cluster)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`, jsonLine, true, 30.001, - `{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`, + `{cluster_extracted="us-east-west", message_message="foo", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`, }, { `sum(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`, @@ -214,6 +217,13 @@ func Test_ParserHints(t *testing.T) { 0, ``, }, + { + `sum by (message_message,app)(count_over_time({app="nginx"} | json | response_status = 204 and remote_user = "foo"[1m]))`, + jsonLine, + true, + 1, + `{app="nginx", message_message="foo"}`, + }, } { tt := tt t.Run(tt.expr, func(t *testing.T) {