Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/querier/queryrange/queryrangebase/promql_test.go

689 lines
19 KiB

package queryrangebase
import (
"context"
"fmt"
"math"
"sort"
"strings"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/querier/astmapper"
)
// Shared fixtures for every range query in this file: queries are evaluated
// over [start, end] at a fixed step, with the same context and engine.
var (
	// start is the first evaluation timestamp (Unix second 1000).
	start = time.Unix(1000, 0)
	// end lies three minutes (six steps) after start.
	end = start.Add(180 * time.Second)
	// step is the resolution of every range query.
	step = 30 * time.Second
	// ctx is the background context used to execute queries.
	ctx = context.Background()

	// engine evaluates both the normal and the sharded form of each query.
	// It is built with Reg set to prometheus.DefaultRegisterer.
	// NOTE(review): if NewEngine registers metrics on Reg, a second engine
	// built the same way in this package could collide — confirm before adding one.
	engine = promql.NewEngine(promql.EngineOpts{
		Logger:             log.NewNopLogger(),
		Reg:                prometheus.DefaultRegisterer,
		ActiveQueryTracker: nil,
		MaxSamples:         1e7,
		Timeout:            time.Hour,
	})
)
// Test_PromQL verifies which PromQL expressions can be parallelized across
// shards.
//
// For every case, the normal query and its hand-sharded rewrite are both
// evaluated over the shared fixtures. When shouldEqual is true, the results
// must be identical; otherwise they must differ. Both queries are required to
// evaluate without error. Without that check, a broken shard query would make
// a shouldEqual=false case pass vacuously.
func Test_PromQL(t *testing.T) {
	t.Parallel()

	var tests = []struct {
		normalQuery string
		shardQuery  string
		shouldEqual bool
	}{
		// Vector can be parallelized but we need to remove the cortex shard label.
		// It should be noted that the __cortex_shard__ label is required by the engine
		// and therefore should be returned by the storage.
		// Range vectors `bar1{baz="blip"}[1m]` are not tested here because they are not
		// supported as the result of a range query.
		{
			`bar1{baz="blip"}`,
			`label_replace(
bar1{__cortex_shard__="0_of_3",baz="blip"} or
bar1{__cortex_shard__="1_of_3",baz="blip"} or
bar1{__cortex_shard__="2_of_3",baz="blip"},
"__cortex_shard__","","",""
)`,
			true,
		},
		// The __cortex_shard__ label is required; otherwise the `or` keeps only the first series.
		{
			`sum(bar1{baz="blip"})`,
			`sum(
sum (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
sum (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
sum (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			false,
		},
		{
			`sum(bar1{baz="blip"})`,
			`sum(
sum without(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
sum without(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
sum without(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		{
			`sum by (foo) (bar1{baz="blip"})`,
			`sum by (foo) (
sum by(foo,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
sum by(foo,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
sum by(foo,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		{
			`sum by (foo,bar) (bar1{baz="blip"})`,
			`sum by (foo,bar)(
sum by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
sum by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
sum by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		// Since series are unique to a shard, it's safe to sum without the shard first, then reaggregate.
		{
			`sum without (foo,bar) (bar1{baz="blip"})`,
			`sum without (foo,bar)(
sum without(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
sum without(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
sum without(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		{
			`min by (foo,bar) (bar1{baz="blip"})`,
			`min by (foo,bar)(
min by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
min by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
min by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		{
			`max by (foo,bar) (bar1{baz="blip"})`,
			` max by (foo,bar)(
max by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
max by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
max by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		// avg generally can't be parallelized.
		{
			`avg(bar1{baz="blip"})`,
			`avg(
avg by(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
avg by(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
avg by(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			false,
		},
		// stddev can't be parallelized.
		{
			`stddev(bar1{baz="blip"})`,
			` stddev(
stddev by(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
stddev by(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
stddev by(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			false,
		},
		// stdvar can't be parallelized.
		{
			`stdvar(bar1{baz="blip"})`,
			`stdvar(
stdvar by(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
stdvar by(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
stdvar by(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			false,
		},
		{
			`count(bar1{baz="blip"})`,
			`count(
count without (__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
count without (__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
count without (__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		{
			`count by (foo,bar) (bar1{baz="blip"})`,
			`count by (foo,bar) (
count by (foo,bar,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
count by (foo,bar,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
count by (foo,bar,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		// Different ways to represent count without.
		{
			`count without (foo) (bar1{baz="blip"})`,
			`count without (foo) (
count without (__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
count without (__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
count without (__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		{
			`count without (foo) (bar1{baz="blip"})`,
			`sum without (__cortex_shard__) (
count without (foo) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
count without (foo) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
count without (foo) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		{
			`count without (foo, bar) (bar1{baz="blip"})`,
			`count without (foo, bar) (
count without (__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or
count without (__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or
count without (__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"})
)`,
			true,
		},
		{
			`topk(2,bar1{baz="blip"})`,
			`label_replace(
topk(2,
topk(2,(bar1{__cortex_shard__="0_of_3",baz="blip"})) without(__cortex_shard__) or
topk(2,(bar1{__cortex_shard__="1_of_3",baz="blip"})) without(__cortex_shard__) or
topk(2,(bar1{__cortex_shard__="2_of_3",baz="blip"})) without(__cortex_shard__)
),
"__cortex_shard__","","","")`,
			true,
		},
		{
			`bottomk(2,bar1{baz="blip"})`,
			`label_replace(
bottomk(2,
bottomk(2,(bar1{__cortex_shard__="0_of_3",baz="blip"})) without(__cortex_shard__) or
bottomk(2,(bar1{__cortex_shard__="1_of_3",baz="blip"})) without(__cortex_shard__) or
bottomk(2,(bar1{__cortex_shard__="2_of_3",baz="blip"})) without(__cortex_shard__)
),
"__cortex_shard__","","","")`,
			true,
		},
		{
			`sum by (foo,bar) (avg_over_time(bar1{baz="blip"}[1m]))`,
			`sum by (foo,bar)(
sum by(foo,bar,__cortex_shard__) (avg_over_time(bar1{__cortex_shard__="0_of_3",baz="blip"}[1m])) or
sum by(foo,bar,__cortex_shard__) (avg_over_time(bar1{__cortex_shard__="1_of_3",baz="blip"}[1m])) or
sum by(foo,bar,__cortex_shard__) (avg_over_time(bar1{__cortex_shard__="2_of_3",baz="blip"}[1m]))
)`,
			true,
		},
		{
			`sum by (foo,bar) (min_over_time(bar1{baz="blip"}[1m]))`,
			`sum by (foo,bar)(
sum by(foo,bar,__cortex_shard__) (min_over_time(bar1{__cortex_shard__="0_of_3",baz="blip"}[1m])) or
sum by(foo,bar,__cortex_shard__) (min_over_time(bar1{__cortex_shard__="1_of_3",baz="blip"}[1m])) or
sum by(foo,bar,__cortex_shard__) (min_over_time(bar1{__cortex_shard__="2_of_3",baz="blip"}[1m]))
)`,
			true,
		},
		{
			// Sub aggregations must avoid non-associative series merging across shards
			`sum(
count(
bar1
) by (foo,bazz)
)`,
			`
sum without(__cortex_shard__) (
sum by(__cortex_shard__) (
count by(foo, bazz) (foo{__cortex_shard__="0_of_2",bar="baz"})
) or
sum by(__cortex_shard__) (
count by(foo, bazz) (foo{__cortex_shard__="1_of_2",bar="baz"})
)
)
`,
			false,
		},
		{
			// Note: this is a speculative optimization that we don't currently include due to mapping complexity.
			// Certain sub aggregations may inject __cortex_shard__ for all (by) subgroupings.
			// This is the same as the previous test with the exception that the shard label is injected to the count grouping
			`sum(
count(
bar1
) by (foo,bazz)
)`,
			`
sum without(__cortex_shard__) (
sum by(__cortex_shard__) (
count by(foo, bazz, __cortex_shard__) (foo{__cortex_shard__="0_of_2",bar="baz"})
) or
sum by(__cortex_shard__) (
count by(foo, bazz, __cortex_shard__) (foo{__cortex_shard__="1_of_2",bar="baz"})
)
)
`,
			true,
		},
		{
			// Note: this is a speculative optimization that we don't currently include due to mapping complexity
			// This example details multiple layers of aggregations.
			// Sub aggregations must inject __cortex_shard__ for all (by) subgroupings.
			`sum(
count(
count(
bar1
) by (foo,bazz)
) by (bazz)
)`,
			`
sum without(__cortex_shard__) (
sum by(__cortex_shard__) (
count by(bazz, __cortex_shard__) (
count by(foo, bazz, __cortex_shard__) (
foo{__cortex_shard__="0_of_2", bar="baz"}
)
)
) or
sum by(__cortex_shard__) (
count by(bazz, __cortex_shard__) (
count by(foo, bazz, __cortex_shard__) (
foo{__cortex_shard__="1_of_2", bar="baz"}
)
)
)
)
`,
			true,
		},
	}

	for _, tt := range tests {
		tt := tt
		t.Run(tt.normalQuery, func(t *testing.T) {
			baseQuery, err := engine.NewRangeQuery(shardAwareQueryable, nil, tt.normalQuery, start, end, step)
			require.NoError(t, err)
			defer baseQuery.Close()

			shardQuery, err := engine.NewRangeQuery(shardAwareQueryable, nil, tt.shardQuery, start, end, step)
			require.NoError(t, err)
			defer shardQuery.Close()

			baseResult := baseQuery.Exec(ctx)
			shardResult := shardQuery.Exec(ctx)
			// An evaluation error would otherwise make the NotEqual branch below
			// pass vacuously, hiding a broken shard query.
			require.NoError(t, baseResult.Err)
			require.NoError(t, shardResult.Err)

			t.Logf("base: %v", baseResult)
			t.Logf("shard: %v", shardResult)

			if tt.shouldEqual {
				require.Equal(t, baseResult, shardResult)
				return
			}
			require.NotEqual(t, baseResult, shardResult)
		})
	}
}
// Test_FunctionParallelism checks that sum(fn(bar1)) gives the same result when
// fn is applied per shard and the partial sums are recombined.
//
// For functions marked approximate, floating-point reassociation may introduce
// tiny differences. Those cases compare every series' labels, timestamps, and
// values rounded to six decimal places instead of requiring exact equality.
// Both queries must evaluate without error.
func Test_FunctionParallelism(t *testing.T) {
	tpl := `sum(<fn>(bar1{}<fArgs>))`
	shardTpl := `sum(
sum without(__cortex_shard__) (<fn>(bar1{__cortex_shard__="0_of_3"}<fArgs>)) or
sum without(__cortex_shard__) (<fn>(bar1{__cortex_shard__="1_of_3"}<fArgs>)) or
sum without(__cortex_shard__) (<fn>(bar1{__cortex_shard__="2_of_3"}<fArgs>))
)`

	// mkQuery fills in a query template:
	//   - <fn> becomes fn.
	//   - When asRange is set, each selector becomes a [1m] range selector.
	//   - <fArgs> becomes ",arg1,arg2,..." (or nothing when fArgs is empty).
	mkQuery := func(tpl, fn string, asRange bool, fArgs []string) string {
		query := strings.ReplaceAll(tpl, "<fn>", fn)
		if asRange {
			query = strings.ReplaceAll(query, "}<fArgs>", "}[1m]<fArgs>")
		}
		args := ""
		if len(fArgs) > 0 {
			args = "," + strings.Join(fArgs, ",")
		}
		return strings.ReplaceAll(query, "<fArgs>", args)
	}

	for _, tc := range []struct {
		fn           string
		fArgs        []string
		isTestMatrix bool
		approximate  bool
	}{
		{
			fn: "abs",
		},
		{
			fn:           "avg_over_time",
			isTestMatrix: true,
			approximate:  true,
		},
		{
			fn: "ceil",
		},
		{
			fn:           "changes",
			isTestMatrix: true,
		},
		{
			fn:           "count_over_time",
			isTestMatrix: true,
		},
		{
			fn: "days_in_month",
		},
		{
			fn: "day_of_month",
		},
		{
			fn: "day_of_week",
		},
		{
			fn:           "delta",
			isTestMatrix: true,
			approximate:  true,
		},
		{
			fn:           "deriv",
			isTestMatrix: true,
			approximate:  true,
		},
		{
			fn:          "exp",
			approximate: true,
		},
		{
			fn: "floor",
		},
		{
			fn: "hour",
		},
		{
			fn:           "idelta",
			isTestMatrix: true,
			approximate:  true,
		},
		{
			fn:           "increase",
			isTestMatrix: true,
			approximate:  true,
		},
		{
			fn:           "irate",
			isTestMatrix: true,
			approximate:  true,
		},
		{
			fn:          "ln",
			approximate: true,
		},
		{
			fn:          "log10",
			approximate: true,
		},
		{
			fn:          "log2",
			approximate: true,
		},
		{
			fn:           "max_over_time",
			isTestMatrix: true,
		},
		{
			fn:           "min_over_time",
			isTestMatrix: true,
		},
		{
			fn: "minute",
		},
		{
			fn: "month",
		},
		{
			fn:           "rate",
			isTestMatrix: true,
			approximate:  true,
		},
		{
			fn:           "resets",
			isTestMatrix: true,
		},
		{
			fn: "sort",
		},
		{
			fn: "sort_desc",
		},
		{
			fn:          "sqrt",
			approximate: true,
		},
		{
			fn:           "stddev_over_time",
			isTestMatrix: true,
			approximate:  true,
		},
		{
			fn:           "stdvar_over_time",
			isTestMatrix: true,
			approximate:  true,
		},
		{
			fn:           "sum_over_time",
			isTestMatrix: true,
		},
		{
			fn: "timestamp",
		},
		{
			fn: "year",
		},
		{
			fn:    "clamp_max",
			fArgs: []string{"5"},
		},
		{
			fn:    "clamp_min",
			fArgs: []string{"5"},
		},
		{
			fn:           "predict_linear",
			isTestMatrix: true,
			approximate:  true,
			fArgs:        []string{"1"},
		},
		{
			fn:    "round",
			fArgs: []string{"20"},
		},
		{
			fn:           "holt_winters",
			isTestMatrix: true,
			fArgs:        []string{"0.5", "0.7"},
			approximate:  true,
		},
	} {
		t.Run(tc.fn, func(t *testing.T) {
			baseQuery, err := engine.NewRangeQuery(
				shardAwareQueryable,
				nil,
				mkQuery(tpl, tc.fn, tc.isTestMatrix, tc.fArgs),
				start,
				end,
				step,
			)
			require.NoError(t, err)
			defer baseQuery.Close()

			shardQuery, err := engine.NewRangeQuery(
				shardAwareQueryable,
				nil,
				mkQuery(shardTpl, tc.fn, tc.isTestMatrix, tc.fArgs),
				start,
				end,
				step,
			)
			require.NoError(t, err)
			defer shardQuery.Close()

			baseResult := baseQuery.Exec(ctx)
			shardResult := shardQuery.Exec(ctx)
			// Fail cleanly on evaluation errors instead of panicking on the
			// type assertion below (Value is not a Matrix when Err is set).
			require.NoError(t, baseResult.Err)
			require.NoError(t, shardResult.Err)

			t.Logf("base: %+v", baseResult)
			t.Logf("shard: %+v", shardResult)

			if !tc.approximate {
				require.Equal(t, baseResult, shardResult)
				return
			}

			// Some functions yield tiny differences when sharded due to combining
			// floating point calculations, so compare values rounded to 1e-6.
			baseMatrix, ok := baseResult.Value.(promql.Matrix)
			require.True(t, ok, "base result is %T, want promql.Matrix", baseResult.Value)
			shardMatrix, ok := shardResult.Value.(promql.Matrix)
			require.True(t, ok, "shard result is %T, want promql.Matrix", shardResult.Value)
			require.NotEmpty(t, baseMatrix)
			require.Len(t, shardMatrix, len(baseMatrix))

			for s, baseSeries := range baseMatrix {
				shardSeries := shardMatrix[s]
				require.Equal(t, baseSeries.Metric, shardSeries.Metric)
				require.Len(t, shardSeries.Points, len(baseSeries.Points))
				for i, basePt := range baseSeries.Points {
					shardPt := shardSeries.Points[i]
					require.Equal(t, basePt.T, shardPt.T)
					require.Equal(
						t,
						math.Round(basePt.V*1e6)/1e6,
						math.Round(shardPt.V*1e6)/1e6,
					)
				}
			}
		})
	}
}
// shardAwareQueryable builds a fresh testMatrix on every call. The matrix
// holds six bar1 series, all with baz="blip", distinguished by their bar/foo
// labels and their value generators. The context and the time bounds are ignored.
var shardAwareQueryable = storage.QueryableFunc(func(_ context.Context, _, _ int64) (storage.Querier, error) {
	// bar1Labels returns the full label set of one fixture series.
	bar1Labels := func(bar, foo string) labels.Labels {
		return labels.Labels{
			{Name: "__name__", Value: "bar1"},
			{Name: "baz", Value: "blip"},
			{Name: "bar", Value: bar},
			{Name: "foo", Value: foo},
		}
	}

	fixtures := []*promql.StorageSeries{
		newSeries(bar1Labels("blop", "barr"), factor(5)),
		newSeries(bar1Labels("blop", "bazz"), factor(7)),
		newSeries(bar1Labels("blap", "buzz"), factor(12)),
		newSeries(bar1Labels("blap", "bozz"), factor(11)),
		newSeries(bar1Labels("blop", "buzz"), factor(8)),
		newSeries(bar1Labels("blap", "bazz"), identity),
	}
	return &testMatrix{series: fixtures}, nil
})
// testMatrix is an in-memory test fixture. It serves both as the
// storage.Querier handed to the engine and as the storage.SeriesSet that
// Select returns.
type testMatrix struct {
	// series holds the series not yet handed out by At; At consumes from the front.
	series []*promql.StorageSeries
}

// Compile-time checks that the fixture satisfies the interfaces it is used as.
var (
	_ storage.Querier   = (*testMatrix)(nil)
	_ storage.SeriesSet = (*testMatrix)(nil)
)

// Copy returns a shallow copy of m. At reslices the copy's series field
// instead of writing to the shared backing array, so draining the copy
// leaves m untouched.
func (m *testMatrix) Copy() *testMatrix {
	cpy := *m
	return &cpy
}

// Next reports whether any series remain. Unlike a conventional SeriesSet,
// Next does not advance: At both returns the current series and advances.
// Callers must therefore call At exactly once per successful Next.
func (m *testMatrix) Next() bool { return len(m.series) != 0 }

// At returns the next remaining series and removes it from m.
// It panics if called when Next would return false.
func (m *testMatrix) At() storage.Series {
	res := m.series[0]
	m.series = m.series[1:]
	return res
}

// Err always returns nil; the fixture cannot fail mid-iteration.
func (m *testMatrix) Err() error { return nil }

// Warnings always returns nil.
func (m *testMatrix) Warnings() storage.Warnings { return nil }

// Select ignores the sort flag, the hints, and every matcher except the shard
// matcher recognized by astmapper.ShardFromMatchers. A shard matcher selects
// the splitByShard subset of m; otherwise Select returns a copy of all series.
// NOTE(review): because non-shard matchers are ignored, selectors such as
// foo{...} in the tests still return the bar1 fixtures.
func (m *testMatrix) Select(_ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
	s, _, err := astmapper.ShardFromMatchers(matchers)
	if err != nil {
		return storage.ErrSeriesSet(err)
	}
	if s != nil {
		return splitByShard(s.Shard, s.Of, m)
	}
	return m.Copy()
}

// LabelValues is unused by these tests and returns no values.
func (m *testMatrix) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
	return nil, nil, nil
}

// LabelNames is unused by these tests and returns no names.
func (m *testMatrix) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
	return nil, nil, nil
}

// Close releases nothing; the fixture holds no resources.
func (m *testMatrix) Close() error { return nil }
// newSeries builds a storage series labelled with metric (sorted in place).
//
// It holds one sample per step, from one step before start through end
// inclusive. Sample timestamps are in milliseconds. Each value is
// generator(timestamp), with the timestamp passed as float64.
func newSeries(metric labels.Labels, generator func(float64) float64) *promql.StorageSeries {
	sort.Sort(metric)

	lastSecond := end.Unix()
	var pts []promql.Point
	for cur := start.Add(-step); cur.Unix() <= lastSecond; cur = cur.Add(step) {
		ms := cur.Unix() * 1e3
		pts = append(pts, promql.Point{T: ms, V: generator(float64(ms))})
	}

	series := promql.Series{Metric: metric, Points: pts}
	return promql.NewStorageSeries(series)
}
// identity is a value generator that returns its argument unchanged.
func identity(x float64) float64 { return x }
// factor returns a stateful generator that ignores its argument and yields
// f, 2*f, 3*f, ... on successive calls. Each call to factor starts a new,
// independent counter.
func factor(f float64) func(float64) float64 {
	var calls float64
	return func(_ float64) float64 {
		calls++
		return calls * f
	}
}
// var identity(t int64) float64 {
// return float64(t)
// }
// splitByShard returns a new testMatrix with the series of testMatrices whose
// position i satisfies i%shardTotal == shardIndex. For example, with 6 series
// and 3 shards, each shard receives 2 series.
//
// Each returned series is a copy: its samples are read into a fresh slice, and
// its labels are a copy of the source labels plus
// __cortex_shard__="<shardIndex>_of_<shardTotal>", sorted.
func splitByShard(shardIndex, shardTotal int, testMatrices *testMatrix) *testMatrix {
	shardLabel := labels.Label{
		Name:  "__cortex_shard__",
		Value: fmt.Sprintf("%d_of_%d", shardIndex, shardTotal),
	}

	out := &testMatrix{}
	// The iterator is recycled across series via Iterator(iter).
	var iter chunkenc.Iterator
	for idx, src := range testMatrices.series {
		if idx%shardTotal == shardIndex {
			iter = src.Iterator(iter)
			var samples []promql.Point
			for iter.Next() != chunkenc.ValNone {
				ts, val := iter.At()
				samples = append(samples, promql.Point{T: ts, V: val})
			}

			metric := append(src.Labels().Copy(), shardLabel)
			sort.Sort(metric)
			out.series = append(out.series, promql.NewStorageSeries(promql.Series{
				Metric: metric,
				Points: samples,
			}))
		}
	}
	return out
}