Loki: Fix parser hint for extracted labels which collide with stream labels (#3380)

* add test for issue with hints and extracted labels

* one possible solution

* instead of replacing the label with the _extracted suffix removed, just add a new label with the suffix removed. This will avoid any cases where a legit json or logfmt key ends with _extracted.
pull/3388/head
Ed Welch 4 years ago committed by GitHub
parent e21d6edc10
commit 74a97899dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      pkg/logql/log/parser_hints.go
  2. 26
      pkg/logql/log/parser_hints_test.go

@ -63,6 +63,22 @@ func (p *parserHint) NoLabels() bool {
// newParserHint creates a new parser hint using the list of labels that are seen and required in a query.
func newParserHint(requiredLabelNames, groups []string, without, noLabels bool, metricLabelName string) *parserHint {
// If a parsed label collides with a stream label we add the `_extracted` suffix to it, however hints
// are used by the parsers before we know they will collide with a stream label and hence before the
// _extracted suffix is added. Therefore we must strip the _extracted suffix from any required labels
// that were parsed from somewhere in the query, say in a filter or an aggregation clause.
// Because it's possible for a valid json or logfmt key to already end with _extracted, we'll just
// leave the existing entry ending with _extracted but also add a version with the suffix removed.
extractedLabels := []string{}
for _, l := range requiredLabelNames {
if strings.HasSuffix(l, "_extracted") {
extractedLabels = append(extractedLabels, strings.TrimSuffix(l, "_extracted"))
}
}
if len(extractedLabels) > 0 {
requiredLabelNames = append(requiredLabelNames, extractedLabels...)
}
if len(groups) > 0 {
requiredLabelNames = append(requiredLabelNames, groups...)
}

@ -15,6 +15,7 @@ var (
"remote_user": "foo",
"upstream_addr": "10.0.0.1:80",
"protocol": "HTTP/2.0",
"cluster": "us-east-west",
"request": {
"time": "30.001",
"method": "POST",
@ -32,7 +33,7 @@ var (
)
func Test_ParserHints(t *testing.T) {
lbs := labels.Labels{{Name: "app", Value: "nginx"}}
lbs := labels.Labels{{Name: "app", Value: "nginx"}, {Name: "cluster", Value: "us-central-west"}}
t.Parallel()
for _, tt := range []struct {
@ -47,14 +48,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
1.0,
`{app="nginx", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
`sum without (request_host,app,cluster) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
jsonLine,
true,
1.0,
`{protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_latency_seconds="30.001", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum by (request_host,app) (rate({app="nginx"} | json | __error__="" | response_status = 204 [1m]))`,
@ -103,14 +104,14 @@ func Test_ParserHints(t *testing.T) {
jsonLine,
true,
30.001,
`{app="nginx", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{app="nginx", cluster="us-central-west", cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_host="foo.grafana.net", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum without (request_host,app)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`,
`sum without (request_host,app,cluster)(rate({app="nginx"} | json | response_status = 204 | unwrap response_latency_seconds [1m]))`,
jsonLine,
true,
30.001,
`{protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
`{cluster_extracted="us-east-west", protocol="HTTP/2.0", remote_user="foo", request_method="POST", request_size="101", request_time="30.001", request_uri="/rpc/v2/stage", response_status="204", upstream_addr="10.0.0.1:80"}`,
},
{
`sum(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
@ -131,10 +132,10 @@ func Test_ParserHints(t *testing.T) {
logfmtLine,
true,
15.0,
`{Ingester_TotalBatches="0", Ingester_TotalChunksMatched="0", app="nginx", caller="spanlogger.go:79", org_id="3677", traceID="2e5c7234b8640997", ts="2021-02-02T14:35:05.983992774Z"}`,
`{Ingester_TotalBatches="0", Ingester_TotalChunksMatched="0", app="nginx", caller="spanlogger.go:79", cluster="us-central-west", org_id="3677", traceID="2e5c7234b8640997", ts="2021-02-02T14:35:05.983992774Z"}`,
},
{
`sum without (org_id,app)(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
`sum without (org_id,app,cluster)(rate({app="nginx"} | logfmt | org_id=3677 | unwrap Ingester_TotalReached[1m]))`,
logfmtLine,
true,
15.0,
@ -175,6 +176,13 @@ func Test_ParserHints(t *testing.T) {
1.0,
`{}`,
},
{
`sum by (cluster_extracted)(count_over_time({app="nginx"} | json | cluster_extracted="us-east-west" [1m]))`,
jsonLine,
true,
1.0,
`{cluster_extracted="us-east-west"}`,
},
} {
tt := tt
t.Run(tt.expr, func(t *testing.T) {

Loading…
Cancel
Save