Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/logql/sharding_test.go

110 lines
2.9 KiB

Feature/querysharding ii (#1927) * [wip] sharding evaluator/ast * [wip] continues experimenting with ast mapping * refactoring in preparation for binops * evaluators can pass state to other evaluators * compiler alignment * Evaluator method renamed to StepEvaluator * chained evaluator impl * tidying up sharding code * handling for ConcatSampleExpr * downstream iterator * structure for downstreaming asts * outlines sharding optimizations * work on sharding mapper * ast sharding optimizations * test for different logrange positions * shard mapper tests * stronger ast sharding & tests * shardmapper tests for string->string * removes sharding evaluator code * removes unused ctx arg * Revert "removes sharding evaluator code" This reverts commit 55d41b9519da9496e9471f13a5048d903ea04aaa. * interfaces for downstreaming, type conversions * sharding plumbing on frontend * type alignment in queryrange to downstream sharded queriers * downstreaming support for sharding incl storage code * removes chainedevaluator * comment alignment * storage shard injection * speccing out testware for sharding equivalence * [wip] shared engine refactor * sorting streams, sharding eval fixes * downstream evaluator embeds defaultevaluator * other pkgs adopt logql changes * metrics & logs use same middleware instantiation process * wires up shardingware * middleware per metrics/logfilter * empty step populating StepEvaluator promql.Matrix adapter * sharding metrics * log/span injection into sharded engine * sharding metrics avoids multiple instantiation * downstreamhandler tracing * sharding parameterized libsonnet * removes querier replicas * default 32 concurrency for workers * jsonnet correct level override * unquote true in yaml * lowercase error + downstreamEvaluator defaults to embedded defaultEvaluator * makes shardRecorder private * logs query on failed parse * refactors engine to be multi-use, minimizes logger injection, generalizes Query methods, removes Engine interface * basic tests for querysharding mware * [wip] concurrent evaluator * integrates stat propagation into sharding evaluator * splitby histogram * extends le bounds for bytes processed * byte throughput histogram buckets to 40gb * chunk duration mixin * fixes merge w/ field rename * derives logger in sharded engine via ctx & logs some downstream evaluators * moves sharded engine to top, adds comments * logs failed merge results in stats ctx * snapshotting stats merge logic is done more effectively * per query concurrency controlled via downstreamer * unexports decodereq * queryrange testware * downstreamer tests * pr requests
6 years ago
package logql
import (
"context"
"math"
"testing"
"time"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
)
var nilMetrics = NewShardingMetrics(nil)
func TestMappingEquivalence(t *testing.T) {
var (
shards = 3
nStreams = 60
rounds = 20
streams = randomStreams(nStreams, rounds, shards, []string{"a", "b", "c", "d"})
start = time.Unix(0, 0)
end = time.Unix(0, int64(time.Second*time.Duration(rounds)))
step = time.Second
interval = time.Duration(0)
limit = 100
)
for _, tc := range []struct {
query string
approximate bool
}{
{`1`, false},
{`1 + 1`, false},
{`{a="1"}`, false},
{`{a="1"} |= "number: 10"`, false},
{`rate({a=~".*"}[1s])`, false},
{`sum by (a) (rate({a=~".*"}[1s]))`, false},
{`max without (a) (rate({a=~".*"}[1s]))`, false},
{`count(rate({a=~".*"}[1s]))`, false},
{`avg(rate({a=~".*"}[1s]))`, true},
{`1 + sum by (cluster) (rate({a=~".*"}[1s]))`, false},
{`sum(max(rate({a=~".*"}[1s])))`, false},
{`max(count(rate({a=~".*"}[1s])))`, false},
{`max(sum by (cluster) (rate({a=~".*"}[1s]))) / count(rate({a=~".*"}[1s]))`, false},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
// We could sort them as stated, but it doesn't seem worth the performance hit.
// {`topk(3, rate({a=~".*"}[1s]))`, false},
} {
q := NewMockQuerier(
shards,
streams,
)
opts := EngineOpts{}
regular := NewEngine(opts, q)
sharded := NewShardedEngine(opts, MockDownstreamer{regular}, nilMetrics)
t.Run(tc.query, func(t *testing.T) {
params := NewLiteralParams(
tc.query,
start,
end,
step,
interval,
logproto.FORWARD,
uint32(limit),
nil,
)
qry := regular.Query(params)
shardedQry := sharded.Query(params, shards)
res, err := qry.Exec(context.Background())
require.Nil(t, err)
shardedRes, err := shardedQry.Exec(context.Background())
require.Nil(t, err)
if tc.approximate {
approximatelyEquals(t, res.Data.(promql.Matrix), shardedRes.Data.(promql.Matrix))
} else {
require.Equal(t, res.Data, shardedRes.Data)
}
})
}
}
// approximatelyEquals ensures two responses are approximately equal, up to 6 decimals precision per sample
func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
require.Equal(t, len(as), len(bs))
for i := 0; i < len(as); i++ {
a := as[i]
b := bs[i]
require.Equal(t, a.Metric, b.Metric)
require.Equal(t, len(a.Points), len(b.Points))
for j := 0; j < len(a.Points); j++ {
aSample := &a.Points[j]
aSample.V = math.Round(aSample.V*1e6) / 1e6
bSample := &b.Points[j]
bSample.V = math.Round(bSample.V*1e6) / 1e6
}
require.Equal(t, a, b)
}
}