Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/logql/syntax/extractor_test.go

428 lines
13 KiB

package syntax
import (
"testing"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/util/constants"
)
// Test_Extractor ensures that every supported non-variant sample expression
// shape — range aggregations, vector aggregations, binary operations,
// label_replace, parser stages (json/logfmt/pattern/regexp), unwraps, and
// offsets — parses successfully and consolidates to exactly one extractor.
func Test_Extractor(t *testing.T) {
	t.Parallel()
	for _, tc := range []string{
		`rate( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
		`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
		`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] offset 30s )`,
		`sum without(a) ( rate ( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
		`sum by(a) (rate( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
		`sum(count_over_time({job="mysql"}[5m]))`,
		`sum(count_over_time({job="mysql"} | json [5m]))`,
		`sum(count_over_time({job="mysql"} | logfmt [5m]))`,
		`sum(count_over_time({job="mysql"} | pattern "<foo> bar <buzz>" [5m]))`,
		`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m]))`,
		`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m] offset 1h))`,
		`topk(10,sum(rate({region="us-east1"}[5m])) by (name))`,
		`topk by (name)(10,sum(rate({region="us-east1"}[5m])))`,
		`avg( rate( ( {job="nginx"} |= "GET" ) [10s] ) ) by (region)`,
		`avg(min_over_time({job="nginx"} |= "GET" | unwrap foo[10s])) by (region)`,
		`sum by (cluster) (count_over_time({job="mysql"}[5m]))`,
		`sum by (cluster) (count_over_time({job="mysql"}[5m])) / sum by (cluster) (count_over_time({job="postgres"}[5m])) `,
		`
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
sum by (cluster) (count_over_time({job="postgres"}[5m]))
`,
		`sum by (cluster) (count_over_time({job="mysql"}[5m])) / min(count_over_time({job="mysql"}[5m])) `,
		`sum by (job) (
count_over_time({namespace="tns"} |= "level=error"[5m])
/
count_over_time({namespace="tns"}[5m])
)`,
		`stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m])`,
		`sum_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms|unwrap latency [5m])`,
		`sum by (job) (
sum_over_time({namespace="tns"} |= "level=error" | json | foo=5 and bar<25ms | unwrap latency[5m])
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m])
)`,
		`sum by (job) (
sum_over_time({namespace="tns"} |= "level=error" | json | foo=5 and bar<25ms | unwrap bytes(latency)[5m])
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m])
)`,
		`sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) [5m]
)
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m])
)`,
		`sum_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms | unwrap latency | __error__!~".*" | foo >5[5m])`,
		`absent_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms | unwrap latency | __error__!~".*" | foo >5[5m])`,
		`absent_over_time({namespace="tns"} |= "level=error" | json [5m])`,
		`sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m]
)
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m])
)`,
		`label_replace(
sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m]
)
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m])
),
"foo",
"$1",
"service",
"(.*):.*"
)
`,
		`label_replace(
sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m] offset 1h
)
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m] offset 1h)
),
"foo",
"$1",
"service",
"(.*):.*"
)
`,
	} {
		t.Run(tc, func(t *testing.T) {
			expr, err := ParseSampleExpr(tc)
			// NoError (rather than Nil) prints the parse error message on
			// failure and matches the convention used elsewhere in this file.
			require.NoError(t, err)

			extractors, err := expr.Extractors()
			require.NoError(t, err)
			// Non-variant expressions always consolidate to a single extractor.
			require.Len(t, extractors, 1)
		})
	}
}
// Test_MultiVariantExpr_Extractors verifies Extractors() for multi-variant
// expressions (`variants(...) of (...)`): a single consolidated extractor is
// returned, and processing one log line produces one sample per variant,
// each tagged with a variant label (constants.VariantLabel / "__variant__").
func Test_MultiVariantExpr_Extractors(t *testing.T) {
	t.Parallel()

	// sample pairs an expected numeric value with the label set it should
	// carry after extraction.
	type sample struct {
		value  int
		labels labels.Labels
	}

	// A variants() expression with exactly one variant should extract the
	// same samples as the equivalent plain query, modulo the variant label.
	t.Run("single variant is equivalent to non-variant", func(t *testing.T) {
		for _, tc := range []struct {
			query        string // plain (non-variant) sample query
			variantQuery string // the same query wrapped in variants(...) of (...)
			testLine     string // log line fed to both extractors
		}{
			{
				query:        `count_over_time({app="foo"} | json [5m])`,
				variantQuery: `variants(count_over_time({app="foo"} | json [5m])) of ({app="foo"} | json [5m])`,
				testLine:     `{"error": true, "method": "GET", "size": "1024", "latency": "250s"}`,
			},
			{
				query:        `sum by (method) (count_over_time({app="foo"} | json [5m]))`,
				variantQuery: `variants(sum by (method) (count_over_time({app="foo"} | json [5m]))) of ({app="foo"} | json [5m])`,
				testLine:     `{"error": true, "method": "GET", "size": "1024", "latency": "250s"}`,
			},
			{
				// NOTE(review): the variant here uses logfmt while the `of`
				// clause uses json; the test line is logfmt, so presumably the
				// variant's own pipeline is what processes the line — confirm.
				query:        `sum by (method) (sum_over_time({app="foo"} | logfmt | unwrap duration(latency) [5m]))`,
				variantQuery: `variants(sum by (method) (sum_over_time({app="foo"} | logfmt | unwrap duration(latency) [5m]))) of ({app="foo"} | json [5m])`,
				testLine:     `error=true method="GET" size=1024 latency="250s"`,
			},
		} {
			t.Run(tc.query, func(t *testing.T) {
				now := time.Now()

				// Collect samples produced by the plain query.
				expr, err := ParseSampleExpr(tc.query)
				require.NoError(t, err)
				extractors, err := expr.Extractors()
				require.NoError(t, err)
				require.Len(t, extractors, 1, "should return a single consolidated extractor")

				// Test that the extractor actually works with a mock stream
				lbls := labels.FromStrings("app", "foo")
				streamExtractor := extractors[0].ForStream(lbls)
				require.NotNil(t, streamExtractor, "stream extractor should not be nil")
				samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine), labels.EmptyLabels())
				require.True(t, ok)
				// Index plain-query samples by label-set string for comparison.
				seen := make(map[string]float64, len(samples))
				for _, s := range samples {
					lbls := s.Labels.Labels()
					seen[lbls.String()] = s.Value
				}

				// Collect samples produced by the single-variant query.
				mvExpr, err := ParseSampleExpr(tc.variantQuery)
				require.NoError(t, err)
				extractors, err = mvExpr.Extractors()
				require.NoError(t, err)
				require.Len(t, extractors, 1, "should return a single consolidated extractor")
				streamExtractor = extractors[0].ForStream(lbls)
				require.NotNil(t, streamExtractor, "multi-variant stream extractor should not be nil")
				mvSamples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine), labels.EmptyLabels())
				require.True(t, ok)

				// remove variant label
				mvSeen := make(map[string]float64, len(mvSamples))
				for _, s := range mvSamples {
					lbls := s.Labels.Labels()
					// Rebuild the label set without __variant__ so it can be
					// compared directly against the plain-query samples.
					newLbls := labels.NewScratchBuilder(lbls.Len())
					lbls.Range(func(lbl labels.Label) {
						if lbl.Name == "__variant__" {
							return
						}
						newLbls.Add(lbl.Name, lbl.Value)
					})
					mvSeen[newLbls.Labels().String()] = s.Value
				}

				// Apart from the variant label, both queries must agree.
				require.Equal(t, seen, mvSeen)
			})
		}
	})

	// Multiple variants over one `of` clause: each variant contributes one
	// sample per line, distinguished by its zero-based __variant__ index.
	t.Run("multiple extractors", func(t *testing.T) {
		for _, tc := range []struct {
			name     string
			query    string   // multi-variant query under test
			testLine string   // single log line processed by the extractor
			expected []sample // one expected sample per variant, in order
		}{
			{
				// count_over_time yields 1 per line; bytes_over_time yields
				// the line length. Both variants share filter + logfmt parser,
				// so both carry the full set of parsed labels.
				name: "two logfmt variants with common filter and parser",
				query: `variants(
count_over_time({app="foo"} |= "error" | logfmt [5m]),
bytes_over_time({app="foo"} |= "error" | logfmt [5m])
) of ({app="foo"} |= "error" | logfmt [5m])`,
				testLine: "error=true method=GET status=500 size=1024 latency=250ms",
				expected: []sample{
					{
						1,
						labels.FromStrings(
							constants.VariantLabel, "0",
							"app", "foo",
							"error", "true",
							"latency", "250ms",
							"method", "GET",
							"size", "1024",
							"status", "500",
						),
					},
					{
						len("error=true method=GET status=500 size=1024 latency=250ms"),
						labels.FromStrings(
							constants.VariantLabel, "1",
							"app", "foo",
							"error", "true",
							"latency", "250ms",
							"method", "GET",
							"size", "1024",
							"status", "500",
						),
					},
				},
			},
			{
				// Each variant unwraps a different field; the unwrapped label
				// is consumed as the sample value and dropped from the result
				// labels (variant 0 drops status, variant 1 drops latency).
				name: "two variants with different unwraps",
				query: `variants(
sum_over_time({app="foo"} |= "error" | json | unwrap status [5m]),
sum_over_time({app="foo"} |= "error" | json | unwrap duration(latency) [5m])
) of ({app="foo"} |= "error" | json [5m])`,
				testLine: `{"error": true, "method": "GET", "status": 500, "latency": "250s"}`,
				expected: []sample{
					{
						500,
						labels.FromStrings(
							constants.VariantLabel, "0",
							"app", "foo",
							"error", "true",
							"latency", "250s",
							"method", "GET",
						),
					},
					{
						250,
						labels.FromStrings(
							constants.VariantLabel, "1",
							"app", "foo",
							"error", "true",
							"method", "GET",
							"status", "500",
						),
					},
				},
			},
			{
				// Mix of bytes()/duration() unwraps plus a plain count; only
				// the unwrapping variants drop their respective source label.
				name: "three json variants with different label extractors",
				query: `variants(
sum_over_time({app="foo"} |= "error" | json | unwrap bytes(size) [5m]),
sum_over_time({app="foo"} |= "error" | json | unwrap duration(latency) [5m]),
count_over_time({app="foo"} |= "error" | json [5m])
) of ({app="foo"} |= "error" | json [5m])`,
				testLine: `{"error": true, "method": "GET", "size": "1024", "latency": "250s"}`,
				expected: []sample{
					{
						1024,
						labels.FromStrings(
							constants.VariantLabel, "0",
							"app", "foo",
							"error", "true",
							"latency", "250s",
							"method", "GET",
						),
					},
					{
						250,
						labels.FromStrings(
							constants.VariantLabel, "1",
							"app", "foo",
							"error", "true",
							"method", "GET",
							"size", "1024",
						),
					},
					{
						1,
						labels.FromStrings(
							constants.VariantLabel, "2",
							"app", "foo",
							"error", "true",
							"latency", "250s",
							"method", "GET",
							"size", "1024",
						),
					},
				},
			},
			{
				// Same pipeline, three different range aggregations: count,
				// byte length, and an unwrapped numeric field.
				name: "three logfmt variants with different extractors",
				query: `variants(
count_over_time({app="foo"} |= "error" | logfmt [5m]),
bytes_over_time({app="foo"} |= "error" | logfmt [5m]),
sum_over_time({app="foo"} |= "error" | logfmt | unwrap size [5m])
) of ({app="foo"} |= "error" | logfmt [5m])`,
				testLine: "error=true method=GET status=500 size=1024 latency=250ms",
				expected: []sample{
					{
						1,
						labels.FromStrings(
							constants.VariantLabel, "0",
							"app", "foo",
							"error", "true",
							"latency", "250ms",
							"method", "GET",
							"size", "1024",
							"status", "500",
						),
					},
					{
						len("error=true method=GET status=500 size=1024 latency=250ms"),
						labels.FromStrings(
							constants.VariantLabel, "1",
							"app", "foo",
							"error", "true",
							"latency", "250ms",
							"method", "GET",
							"size", "1024",
							"status", "500",
						),
					},
					{
						1024,
						labels.FromStrings(
							constants.VariantLabel, "2",
							"app", "foo",
							"error", "true",
							"latency", "250ms",
							"method", "GET",
							"status", "500",
						),
					},
				},
			},
			{
				// Vector aggregations inside variants: `sum by (...)` reduces
				// the result labels of that variant to the grouping label(s)
				// plus the variant label.
				name: "variants with nested expressions",
				query: `variants(
sum by (status) (count_over_time({app="foo"} |= "error" | logfmt [5m])),
bytes_over_time({app="foo"} |= "error" | logfmt [5m]),
sum by (method) (sum_over_time({app="foo"} |= "error" | logfmt | unwrap size [5m]))
) of ({app="foo"} |= "error" | logfmt [5m])`,
				testLine: "error=true method=GET status=500 size=1024 latency=250ms",
				expected: []sample{
					{
						1,
						labels.FromStrings(
							constants.VariantLabel, "0",
							"status", "500",
						),
					},
					{
						len("error=true method=GET status=500 size=1024 latency=250ms"),
						labels.FromStrings(
							constants.VariantLabel, "1",
							"app", "foo",
							"error", "true",
							"latency", "250ms",
							"method", "GET",
							"size", "1024",
							"status", "500",
						),
					},
					{
						1024,
						labels.FromStrings(
							constants.VariantLabel, "2",
							"method", "GET",
						),
					},
				},
			},
		} {
			t.Run(tc.name, func(t *testing.T) {
				now := time.Now()
				expr, err := ParseSampleExpr(tc.query)
				require.NoError(t, err)
				extractors, err := expr.Extractors()
				require.NoError(t, err)
				require.Len(t, extractors, 1, "should return a single consolidated extractor")

				// Test that the extractor actually works with a mock stream
				lbls := labels.FromStrings("app", "foo")
				streamExtractor := extractors[0].ForStream(lbls)
				require.NotNil(t, streamExtractor, "stream extractor should not be nil")
				samples, ok := streamExtractor.Process(now.UnixNano(), []byte(tc.testLine), labels.EmptyLabels())
				require.True(t, ok)

				// Compare as maps keyed by label-set string so the assertion
				// is independent of sample ordering.
				expectedSamples := make(map[string]float64, len(tc.expected))
				for _, s := range tc.expected {
					expectedSamples[s.labels.String()] = float64(s.value)
				}
				seenSamples := make(map[string]float64, len(samples))
				for _, s := range samples {
					seenSamples[s.Labels.String()] = s.Value
				}
				require.Equal(t, expectedSamples, seenSamples)
			})
		}
	})
}