Sharding optimizations I: AST mapping (#1846)

* [wip] sharding evaluator/ast

* [wip] continues experimenting with ast mapping

* refactoring in preparation for binops

* evaluators can pass state to other evaluators

* compiler alignment

* Evaluator method renamed to StepEvaluator

* chained evaluator impl

* tidying up sharding code

* handling for ConcatSampleExpr

* downstream iterator

* structure for downstreaming asts

* outlines sharding optimizations

* work on sharding mapper

* ast sharding optimizations

* test for different logrange positions

* shard mapper tests

* stronger ast sharding & tests

* shardmapper tests for string->string

* removes sharding evaluator code

* removes unused ctx arg

* Update pkg/logql/evaluator.go

Co-Authored-By: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/1857/head
Owen Diehl 6 years ago committed by GitHub
parent b7e868ad5c
commit 7effeec642
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      pkg/logql/ast.go
  2. 26
      pkg/logql/astmapper.go
  3. 2
      pkg/logql/engine.go
  4. 91
      pkg/logql/evaluator.go
  5. 5
      pkg/logql/evaluator_test.go
  6. 38
      pkg/logql/parser_test.go
  7. 57
      pkg/logql/sharding.go
  8. 224
      pkg/logql/shardmapper.go
  9. 944
      pkg/logql/shardmapper_test.go

@ -182,15 +182,18 @@ func addFilterToLogRangeExpr(left *logRange, ty labels.MatchType, match string)
}
const (
OpTypeSum = "sum"
OpTypeAvg = "avg"
OpTypeMax = "max"
OpTypeMin = "min"
OpTypeCount = "count"
OpTypeStddev = "stddev"
OpTypeStdvar = "stdvar"
OpTypeBottomK = "bottomk"
OpTypeTopK = "topk"
// vector ops
OpTypeSum = "sum"
OpTypeAvg = "avg"
OpTypeMax = "max"
OpTypeMin = "min"
OpTypeCount = "count"
OpTypeStddev = "stddev"
OpTypeStdvar = "stdvar"
OpTypeBottomK = "bottomk"
OpTypeTopK = "topk"
// range vector ops
OpTypeCountOverTime = "count_over_time"
OpTypeRate = "rate"
@ -217,6 +220,8 @@ func IsLogicalBinOp(op string) bool {
type SampleExpr interface {
// Selector is the LogQL selector to apply when retrieving logs.
Selector() LogSelectorExpr
// Operations returns the list of operations used in this SampleExpr
Operations() []string
Expr
}
@ -244,6 +249,11 @@ func (e *rangeAggregationExpr) String() string {
return formatOperation(e.operation, nil, e.left.String())
}
// impl SampleExpr
func (e *rangeAggregationExpr) Operations() []string {
return []string{e.operation}
}
type grouping struct {
groups []string
without bool
@ -320,6 +330,11 @@ func (e *vectorAggregationExpr) String() string {
return formatOperation(e.operation, e.grouping, params...)
}
// impl SampleExpr
func (e *vectorAggregationExpr) Operations() []string {
return append(e.left.Operations(), e.operation)
}
type binOpExpr struct {
SampleExpr
RHS SampleExpr
@ -330,6 +345,12 @@ func (e *binOpExpr) String() string {
return fmt.Sprintf("%s %s %s", e.SampleExpr.String(), e.op, e.RHS.String())
}
// impl SampleExpr
func (e *binOpExpr) Operations() []string {
ops := append(e.SampleExpr.Operations(), e.RHS.Operations()...)
return append(ops, e.op)
}
func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr {
left, ok := lhs.(SampleExpr)
if !ok {
@ -386,7 +407,7 @@ func mustNewBinOpExpr(op string, lhs, rhs Expr) SampleExpr {
// This is because literals need match all labels, which is currently difficult to encode into StepEvaluators.
// Therefore, we ensure a binop can be reduced/simplified, maintaining the invariant that it does not have two literal legs.
func reduceBinOp(op string, left, right *literalExpr) *literalExpr {
merged := (&defaultEvaluator{}).mergeBinOp(
merged := mergeBinOp(
op,
&promql.Sample{Point: promql.Point{V: left.value}},
&promql.Sample{Point: promql.Point{V: right.value}},
@ -423,6 +444,7 @@ func (e *literalExpr) String() string {
// to facilitate sum types. We'll be type switching when evaluating them anyways
// and they will only be present in binary operation legs.
func (e *literalExpr) Selector() LogSelectorExpr { return e }
func (e *literalExpr) Operations() []string { return nil }
func (e *literalExpr) Filter() (LineFilter, error) { return nil, nil }
func (e *literalExpr) Matchers() []*labels.Matcher { return nil }

@ -0,0 +1,26 @@
package logql
import (
"fmt"
"github.com/pkg/errors"
)
// ASTMapper is the exported interface for mapping between multiple AST representations
type ASTMapper interface {
	// Map transforms the given Expr into a new (possibly annotated) Expr,
	// returning an error when the mapper does not support the node type.
	Map(Expr) (Expr, error)
}
// CloneExpr deep-copies a node by serializing it to its string form and
// feeding that back through the parser.
func CloneExpr(expr Expr) (Expr, error) {
	serialized := expr.String()
	return ParseExpr(serialized)
}
// badASTMapping returns an error describing an AST node whose mapped result
// had an unexpected concrete type.
func badASTMapping(expected string, got Expr) error {
	// Error strings are lowercase per Go convention (staticcheck ST1005).
	return fmt.Errorf("bad AST mapping: expected one type (%s), but got (%T)", expected, got)
}
// MapperUnsupportedType is a helper for signaling that a mapper does not support an Expr type
func MapperUnsupportedType(expr Expr, m ASTMapper) error {
	// note: trailing space removed from the format string
	return errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T)", expr, m)
}

@ -212,7 +212,7 @@ func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (pr
return ng.evalLiteral(ctx, lit, q)
}
stepEvaluator, err := ng.evaluator.Evaluator(ctx, expr, q)
stepEvaluator, err := ng.evaluator.StepEvaluator(ctx, ng.evaluator, expr, q)
if err != nil {
return nil, err
}

@ -69,12 +69,18 @@ func GetRangeType(q Params) QueryRangeType {
// Evaluator is an interface for iterating over data at different nodes in the AST
type Evaluator interface {
// Evaluator returns a StepEvaluator for a given SampleExpr
Evaluator(context.Context, SampleExpr, Params) (StepEvaluator, error)
// StepEvaluator returns a StepEvaluator for a given SampleExpr. It's explicitly passed another StepEvaluator// in order to enable arbitrary compuation of embedded expressions. This allows more modular & extensible
// StepEvaluator implementations which can be composed.
StepEvaluator(ctx context.Context, nextEvaluator Evaluator, expr SampleExpr, p Params) (StepEvaluator, error)
// Iterator returns the iter.EntryIterator for a given LogSelectorExpr
Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error)
}
// EvaluatorUnsupportedType is a helper for signaling that an evaluator does not support an Expr type
func EvaluatorUnsupportedType(expr Expr, ev Evaluator) error {
return errors.Errorf("unexpected expr type (%T) for Evaluator type (%T) ", expr, ev)
}
type defaultEvaluator struct {
maxLookBackPeriod time.Duration
querier Querier
@ -99,21 +105,43 @@ func (ev *defaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr,
}
func (ev *defaultEvaluator) Evaluator(ctx context.Context, expr SampleExpr, q Params) (StepEvaluator, error) {
func (ev *defaultEvaluator) StepEvaluator(
ctx context.Context,
nextEv Evaluator,
expr SampleExpr,
q Params,
) (StepEvaluator, error) {
switch e := expr.(type) {
case *vectorAggregationExpr:
return ev.vectorAggEvaluator(ctx, e, q)
return vectorAggEvaluator(ctx, nextEv, e, q)
case *rangeAggregationExpr:
return ev.rangeAggEvaluator(ctx, e, q)
entryIter, err := ev.querier.Select(ctx, SelectParams{
&logproto.QueryRequest{
Start: q.Start().Add(-e.left.interval),
End: q.End(),
Limit: 0,
Direction: logproto.FORWARD,
Selector: expr.Selector().String(),
},
})
if err != nil {
return nil, err
}
return rangeAggEvaluator(entryIter, e, q)
case *binOpExpr:
return ev.binOpEvaluator(ctx, e, q)
return binOpStepEvaluator(ctx, nextEv, e, q)
default:
return nil, errors.Errorf("unexpected type (%T): %v", e, e)
return nil, EvaluatorUnsupportedType(e, ev)
}
}
func (ev *defaultEvaluator) vectorAggEvaluator(ctx context.Context, expr *vectorAggregationExpr, q Params) (StepEvaluator, error) {
nextEvaluator, err := ev.Evaluator(ctx, expr.left, q)
func vectorAggEvaluator(
ctx context.Context,
ev Evaluator,
expr *vectorAggregationExpr,
q Params,
) (StepEvaluator, error) {
nextEvaluator, err := ev.StepEvaluator(ctx, ev, expr.left, q)
if err != nil {
return nil, err
}
@ -302,21 +330,11 @@ func (ev *defaultEvaluator) vectorAggEvaluator(ctx context.Context, expr *vector
}, nextEvaluator.Close)
}
func (ev *defaultEvaluator) rangeAggEvaluator(ctx context.Context, expr *rangeAggregationExpr, q Params) (StepEvaluator, error) {
entryIter, err := ev.querier.Select(ctx, SelectParams{
&logproto.QueryRequest{
Start: q.Start().Add(-expr.left.interval),
End: q.End(),
Limit: 0,
Direction: logproto.FORWARD,
Selector: expr.Selector().String(),
},
})
if err != nil {
return nil, err
}
func rangeAggEvaluator(
entryIter iter.EntryIterator,
expr *rangeAggregationExpr,
q Params,
) (StepEvaluator, error) {
vecIter := newRangeVectorIterator(entryIter, expr.left.interval.Nanoseconds(), q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano())
@ -341,8 +359,9 @@ func (ev *defaultEvaluator) rangeAggEvaluator(ctx context.Context, expr *rangeAg
// binOpExpr explicly does not handle when both legs are literals as
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func (ev *defaultEvaluator) binOpEvaluator(
func binOpStepEvaluator(
ctx context.Context,
ev Evaluator,
expr *binOpExpr,
q Params,
) (StepEvaluator, error) {
@ -352,26 +371,26 @@ func (ev *defaultEvaluator) binOpEvaluator(
// match a literal expr with all labels in the other leg
if lOk {
rhs, err := ev.Evaluator(ctx, expr.RHS, q)
rhs, err := ev.StepEvaluator(ctx, ev, expr.RHS, q)
if err != nil {
return nil, err
}
return ev.literalEvaluator(expr.op, leftLit, rhs, false)
return literalStepEvaluator(expr.op, leftLit, rhs, false)
}
if rOk {
lhs, err := ev.Evaluator(ctx, expr.SampleExpr, q)
lhs, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q)
if err != nil {
return nil, err
}
return ev.literalEvaluator(expr.op, rightLit, lhs, true)
return literalStepEvaluator(expr.op, rightLit, lhs, true)
}
// we have two non literal legs
lhs, err := ev.Evaluator(ctx, expr.SampleExpr, q)
lhs, err := ev.StepEvaluator(ctx, ev, expr.SampleExpr, q)
if err != nil {
return nil, err
}
rhs, err := ev.Evaluator(ctx, expr.RHS, q)
rhs, err := ev.StepEvaluator(ctx, ev, expr.RHS, q)
if err != nil {
return nil, err
}
@ -409,7 +428,7 @@ func (ev *defaultEvaluator) binOpEvaluator(
for _, pair := range pairs {
// merge
if merged := ev.mergeBinOp(expr.op, pair[0], pair[1]); merged != nil {
if merged := mergeBinOp(expr.op, pair[0], pair[1]); merged != nil {
results = append(results, *merged)
}
}
@ -425,7 +444,7 @@ func (ev *defaultEvaluator) binOpEvaluator(
})
}
func (ev *defaultEvaluator) mergeBinOp(op string, left, right *promql.Sample) *promql.Sample {
func mergeBinOp(op string, left, right *promql.Sample) *promql.Sample {
var merger func(left, right *promql.Sample) *promql.Sample
switch op {
@ -554,9 +573,9 @@ func (ev *defaultEvaluator) mergeBinOp(op string, left, right *promql.Sample) *p
}
// literalEvaluator merges a literal with a StepEvaluator. Since order matters in
// literalStepEvaluator merges a literal with a StepEvaluator. Since order matters in
// non commutative operations, inverted should be true when the literalExpr is not the left argument.
func (ev *defaultEvaluator) literalEvaluator(
func literalStepEvaluator(
op string,
lit *literalExpr,
eval StepEvaluator,
@ -578,7 +597,7 @@ func (ev *defaultEvaluator) literalEvaluator(
left, right = right, left
}
if merged := ev.mergeBinOp(
if merged := mergeBinOp(
op,
left,
right,

@ -9,9 +9,8 @@ import (
)
func TestDefaultEvaluator_DivideByZero(t *testing.T) {
ev := &defaultEvaluator{}
require.Equal(t, true, math.IsNaN(ev.mergeBinOp(OpTypeDiv,
require.Equal(t, true, math.IsNaN(mergeBinOp(OpTypeDiv,
&promql.Sample{
Point: promql.Point{T: 1, V: 1},
},
@ -20,7 +19,7 @@ func TestDefaultEvaluator_DivideByZero(t *testing.T) {
},
).Point.V))
require.Equal(t, true, math.IsNaN(ev.mergeBinOp(OpTypeMod,
require.Equal(t, true, math.IsNaN(mergeBinOp(OpTypeMod,
&promql.Sample{
Point: promql.Point{T: 1, V: 1},
},

@ -20,6 +20,44 @@ func TestParse(t *testing.T) {
exp Expr
err error
}{
{
// test [12h] before filter expr
in: `count_over_time({foo="bar"}[12h] |= "error")`,
exp: &rangeAggregationExpr{
operation: "count_over_time",
left: &logRange{
left: &filterExpr{
ty: labels.MatchEqual,
match: "error",
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
},
interval: 12 * time.Hour,
},
},
},
{
// test [12h] after filter expr
in: `count_over_time({foo="bar"} |= "error" [12h])`,
exp: &rangeAggregationExpr{
operation: "count_over_time",
left: &logRange{
left: &filterExpr{
ty: labels.MatchEqual,
match: "error",
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
},
interval: 12 * time.Hour,
},
},
},
{
in: `{foo="bar"}`,
exp: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},

@ -0,0 +1,57 @@
package logql
import (
"fmt"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
)
// DownstreamSampleExpr is a SampleExpr which signals downstream computation
type DownstreamSampleExpr struct {
	shard *astmapper.ShardAnnotation
	SampleExpr
}

// String renders the wrapped expression together with its shard annotation.
func (d DownstreamSampleExpr) String() string {
	inner := d.SampleExpr.String()
	return fmt.Sprintf("downstream<%s, shard=%s>", inner, d.shard)
}
// DownstreamLogSelectorExpr is a LogSelectorExpr which signals downstream computation
type DownstreamLogSelectorExpr struct {
	shard *astmapper.ShardAnnotation
	LogSelectorExpr
}

// String renders the wrapped selector together with its shard annotation.
func (d DownstreamLogSelectorExpr) String() string {
	inner := d.LogSelectorExpr.String()
	return fmt.Sprintf("downstream<%s, shard=%s>", inner, d.shard)
}
// ConcatSampleExpr is an expr for concatenating multiple SampleExpr
// Contract: The embedded SampleExprs within a linked list of ConcatSampleExprs must be of the
// same structure. This makes special implementations of SampleExpr.Associative() unnecessary.
type ConcatSampleExpr struct {
	SampleExpr
	next *ConcatSampleExpr
}

// String renders this node and, when present, the rest of the list joined by " ++ ".
func (c ConcatSampleExpr) String() string {
	rendered := c.SampleExpr.String()
	if c.next != nil {
		rendered = fmt.Sprintf("%s ++ %s", rendered, c.next.String())
	}
	return rendered
}
// ConcatLogSelectorExpr is an expr for concatenating multiple LogSelectorExpr
type ConcatLogSelectorExpr struct {
	LogSelectorExpr
	next *ConcatLogSelectorExpr
}

// String renders this node and, when present, the rest of the list joined by " ++ ".
func (c ConcatLogSelectorExpr) String() string {
	rendered := c.LogSelectorExpr.String()
	if c.next != nil {
		rendered = fmt.Sprintf("%s ++ %s", rendered, c.next.String())
	}
	return rendered
}

@ -0,0 +1,224 @@
package logql
import (
"fmt"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
)
// NewShardMapper returns a ShardMapper which splits mappable AST leaves into
// the given number of shards. It rejects shard counts below 2, since sharding
// into fewer pieces is meaningless.
func NewShardMapper(shards int) (ShardMapper, error) {
	if shards < 2 {
		// Error strings are lowercase per Go convention (staticcheck ST1005).
		return ShardMapper{}, fmt.Errorf("cannot create ShardMapper with <2 shards, received %d", shards)
	}
	return ShardMapper{shards}, nil
}
// ShardMapper is an ASTMapper which rewrites a query AST into an equivalent
// AST whose shardable leaves are annotated for per-shard downstream execution.
type ShardMapper struct {
	shards int // total number of shards each mappable leaf is split into
}
// Map recursively rewrites expr into a shard-aware equivalent. Log selectors
// and range aggregations are split per shard, vector aggregations are handled
// per operation, binary operations have each leg mapped independently (the
// receiver node is updated in place), and literals pass through untouched.
func (m ShardMapper) Map(expr Expr) (Expr, error) {
	switch e := expr.(type) {
	case *literalExpr:
		// literals carry no data to shard
		return e, nil
	case *matchersExpr, *filterExpr:
		return m.mapLogSelectorExpr(expr.(LogSelectorExpr)), nil
	case *vectorAggregationExpr:
		return m.mapVectorAggregationExpr(e)
	case *rangeAggregationExpr:
		return m.mapRangeAggregationExpr(e), nil
	case *binOpExpr:
		left, err := m.Map(e.SampleExpr)
		if err != nil {
			return nil, err
		}
		right, err := m.Map(e.RHS)
		if err != nil {
			return nil, err
		}
		leftSample, ok := left.(SampleExpr)
		if !ok {
			return nil, badASTMapping("SampleExpr", left)
		}
		rightSample, ok := right.(SampleExpr)
		if !ok {
			return nil, badASTMapping("SampleExpr", right)
		}
		e.SampleExpr = leftSample
		e.RHS = rightSample
		return e, nil
	default:
		return nil, MapperUnsupportedType(expr, m)
	}
}
// mapLogSelectorExpr wraps expr in one DownstreamLogSelectorExpr per shard and
// links them into a ConcatLogSelectorExpr list ordered shard 0..shards-1.
func (m ShardMapper) mapLogSelectorExpr(expr LogSelectorExpr) LogSelectorExpr {
	// Build the linked list back to front so the returned head is shard 0.
	var tail *ConcatLogSelectorExpr
	for shard := m.shards - 1; shard >= 0; shard-- {
		tail = &ConcatLogSelectorExpr{
			LogSelectorExpr: DownstreamLogSelectorExpr{
				shard: &astmapper.ShardAnnotation{
					Shard: shard,
					Of:    m.shards,
				},
				LogSelectorExpr: expr,
			},
			next: tail,
		}
	}
	return tail
}
// mapSampleExpr wraps expr in one DownstreamSampleExpr per shard and links
// them into a ConcatSampleExpr list ordered shard 0..shards-1.
func (m ShardMapper) mapSampleExpr(expr SampleExpr) SampleExpr {
	// Build the linked list back to front so the returned head is shard 0.
	var tail *ConcatSampleExpr
	for shard := m.shards - 1; shard >= 0; shard-- {
		tail = &ConcatSampleExpr{
			SampleExpr: DownstreamSampleExpr{
				shard: &astmapper.ShardAnnotation{
					Shard: shard,
					Of:    m.shards,
				},
				SampleExpr: expr,
			},
			next: tail,
		}
	}
	return tail
}
// technically, std{dev,var} are also parallelizable if there is no cross-shard merging
// in descendant nodes in the AST. This optimization is currently avoided for simplicity.
func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr) (SampleExpr, error) {
	// When the subtree contains any unshardable operation, keep this node
	// intact and attempt to shard a descendant instead.
	if !isShardable(expr.Operations()) {
		mapped, err := m.Map(expr.left)
		if err != nil {
			return nil, err
		}
		child, ok := mapped.(SampleExpr)
		if !ok {
			return nil, badASTMapping("SampleExpr", mapped)
		}
		return &vectorAggregationExpr{
			left:      child,
			grouping:  expr.grouping,
			params:    expr.params,
			operation: expr.operation,
		}, nil
	}

	switch expr.operation {
	case OpTypeSum:
		// sum(x) -> sum(sum(x, shard=1) ++ sum(x, shard=2)...)
		return &vectorAggregationExpr{
			left:      m.mapSampleExpr(expr),
			grouping:  expr.grouping,
			params:    expr.params,
			operation: expr.operation,
		}, nil

	case OpTypeAvg:
		// avg(x) -> sum(x)/count(x), with each leg mapped recursively
		lhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{
			left:      expr.left,
			grouping:  expr.grouping,
			operation: OpTypeSum,
		})
		if err != nil {
			return nil, err
		}
		rhs, err := m.mapVectorAggregationExpr(&vectorAggregationExpr{
			left:      expr.left,
			grouping:  expr.grouping,
			operation: OpTypeCount,
		})
		if err != nil {
			return nil, err
		}
		return &binOpExpr{
			SampleExpr: lhs,
			RHS:        rhs,
			op:         OpTypeDiv,
		}, nil

	case OpTypeCount:
		// count(x) -> sum(count(x, shard=1) ++ count(x, shard=2)...)
		return &vectorAggregationExpr{
			left:      m.mapSampleExpr(expr),
			grouping:  expr.grouping,
			operation: OpTypeSum,
		}, nil

	default:
		// Should be unreachable: every operation marked shardable ought to
		// have an optimization case above.
		level.Warn(util.Logger).Log(
			"msg", "unexpected operation which appears shardable, ignoring",
			"operation", expr.operation,
		)
		return expr, nil
	}
}
// mapRangeAggregationExpr shards mergeable range aggregations and leaves all
// other operations untouched.
func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr) SampleExpr {
	// count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)...
	// rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)...
	if expr.operation == OpTypeCountOverTime || expr.operation == OpTypeRate {
		return m.mapSampleExpr(expr)
	}
	return expr
}
// isShardable reports whether every operation in ops is listed as shardable.
func isShardable(ops []string) bool {
	for _, op := range ops {
		if !shardableOps[op] {
			return false
		}
	}
	return true
}
// shardableOps lists the operations which may be sharded.
// topk, bottomk, max, & min all must be concatenated and then evaluated in order to avoid
// potential data loss due to series distribution across shards.
// For example, grouping by `cluster` for a `max` operation may yield
// 2 results on the first shard and 10 results on the second. If we prematurely
// calculated `max`s on each shard, the shard/label combination with `2` may be
// discarded and some other combination with `11` may be reported falsely as the max.
//
// Explanation: this is my (owen-d) best understanding.
//
// For an operation to be shardable, first the sample-operation itself must be associative like (+, *) but not (%, /, ^).
// Secondly, if the operation is part of a vector aggregation expression or utilizes logical/set binary ops,
// the vector operation must be distributive over the sample-operation.
// This ensures that the vector merging operation can be applied repeatedly to data in different shards.
// references:
// https://en.wikipedia.org/wiki/Associative_property
// https://en.wikipedia.org/wiki/Distributive_property
var shardableOps = map[string]bool{
	// vector ops
	OpTypeSum: true,
	// avg is only marked as shardable because we remap it into sum/count.
	OpTypeAvg:   true,
	OpTypeCount: true,
	// range vector ops
	OpTypeCountOverTime: true,
	OpTypeRate:          true,
	// binops - arith
	OpTypeAdd: true,
	OpTypeMul: true,
}

@ -0,0 +1,944 @@
package logql
import (
"testing"
"time"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"
)
// TestStringer checks that a linked list of downstream log selectors renders
// as the expected "downstream<..., shard=i_of_n> ++ ..." string form.
func TestStringer(t *testing.T) {
	for _, tc := range []struct {
		in  Expr
		out string
	}{
		{
			// two shards of the same selector, concatenated
			in: &ConcatLogSelectorExpr{
				LogSelectorExpr: DownstreamLogSelectorExpr{
					shard: &astmapper.ShardAnnotation{
						Shard: 0,
						Of:    2,
					},
					LogSelectorExpr: &matchersExpr{
						matchers: []*labels.Matcher{
							mustNewMatcher(labels.MatchEqual, "foo", "bar"),
						},
					},
				},
				next: &ConcatLogSelectorExpr{
					LogSelectorExpr: DownstreamLogSelectorExpr{
						shard: &astmapper.ShardAnnotation{
							Shard: 1,
							Of:    2,
						},
						LogSelectorExpr: &matchersExpr{
							matchers: []*labels.Matcher{
								mustNewMatcher(labels.MatchEqual, "foo", "bar"),
							},
						},
					},
					next: nil,
				},
			},
			out: `downstream<{foo="bar"}, shard=0_of_2> ++ downstream<{foo="bar"}, shard=1_of_2>`,
		},
	} {
		t.Run(tc.out, func(t *testing.T) {
			require.Equal(t, tc.out, tc.in.String())
		})
	}
}
// TestMapSampleExpr checks that mapSampleExpr wraps a sample expression into
// one DownstreamSampleExpr per shard, linked via ConcatSampleExpr.
func TestMapSampleExpr(t *testing.T) {
	m, err := NewShardMapper(2)
	require.Nil(t, err)
	for _, tc := range []struct {
		in  SampleExpr
		out SampleExpr
	}{
		{
			// rate({foo="bar"}[1m]) -> shard 0 ++ shard 1
			in: &rangeAggregationExpr{
				operation: OpTypeRate,
				left: &logRange{
					left: &matchersExpr{
						matchers: []*labels.Matcher{
							mustNewMatcher(labels.MatchEqual, "foo", "bar"),
						},
					},
					interval: time.Minute,
				},
			},
			out: &ConcatSampleExpr{
				SampleExpr: DownstreamSampleExpr{
					shard: &astmapper.ShardAnnotation{
						Shard: 0,
						Of:    2,
					},
					SampleExpr: &rangeAggregationExpr{
						operation: OpTypeRate,
						left: &logRange{
							left: &matchersExpr{
								matchers: []*labels.Matcher{
									mustNewMatcher(labels.MatchEqual, "foo", "bar"),
								},
							},
							interval: time.Minute,
						},
					},
				},
				next: &ConcatSampleExpr{
					SampleExpr: DownstreamSampleExpr{
						shard: &astmapper.ShardAnnotation{
							Shard: 1,
							Of:    2,
						},
						SampleExpr: &rangeAggregationExpr{
							operation: OpTypeRate,
							left: &logRange{
								left: &matchersExpr{
									matchers: []*labels.Matcher{
										mustNewMatcher(labels.MatchEqual, "foo", "bar"),
									},
								},
								interval: time.Minute,
							},
						},
					},
					next: nil,
				},
			},
		},
	} {
		t.Run(tc.in.String(), func(t *testing.T) {
			require.Equal(t, tc.out, m.mapSampleExpr(tc.in))
		})
	}
}
// TestMappingStrings verifies end-to-end mapping at the string level: each
// input query is parsed, mapped with 2 shards, and serialized, and the result
// is compared against the expected sharded query string.
func TestMappingStrings(t *testing.T) {
	m, err := NewShardMapper(2)
	require.Nil(t, err)
	for _, tc := range []struct {
		in  string
		out string
	}{
		{
			in:  `sum(rate({foo="bar"}[1m]))`,
			out: `sum(downstream<sum(rate(({foo="bar"})[1m])), shard=0_of_2> ++ downstream<sum(rate(({foo="bar"})[1m])), shard=1_of_2>)`,
		},
		{
			in:  `max(count(rate({foo="bar"}[5m]))) / 2`,
			out: `max(sum(downstream<count(rate(({foo="bar"})[5m])), shard=0_of_2> ++ downstream<count(rate(({foo="bar"})[5m])), shard=1_of_2>)) / 2.000000`,
		},
		{
			in:  `topk(3, rate({foo="bar"}[5m]))`,
			out: `topk(3,downstream<rate(({foo="bar"})[5m]), shard=0_of_2> ++ downstream<rate(({foo="bar"})[5m]), shard=1_of_2>)`,
		},
		{
			// sum(max) cannot shard the max, so sharding happens below it
			in:  `sum(max(rate({foo="bar"}[5m])))`,
			out: `sum(max(downstream<rate(({foo="bar"})[5m]), shard=0_of_2> ++ downstream<rate(({foo="bar"})[5m]), shard=1_of_2>))`,
		},
		{
			in:  `{foo="bar"} |= "id=123"`,
			out: `downstream<{foo="bar"}|="id=123", shard=0_of_2> ++ downstream<{foo="bar"}|="id=123", shard=1_of_2>`,
		},
		{
			in:  `sum by (cluster) (rate({foo="bar"} |= "id=123" [5m]))`,
			out: `sum by(cluster)(downstream<sum by(cluster)(rate(({foo="bar"}|="id=123")[5m])), shard=0_of_2> ++ downstream<sum by(cluster)(rate(({foo="bar"}|="id=123")[5m])), shard=1_of_2>)`,
		},
	} {
		t.Run(tc.in, func(t *testing.T) {
			ast, err := ParseExpr(tc.in)
			require.Nil(t, err)
			mapped, err := m.Map(ast)
			require.Nil(t, err)
			require.Equal(t, tc.out, mapped.String())
		})
	}
}
func TestMapping(t *testing.T) {
m, err := NewShardMapper(2)
require.Nil(t, err)
for _, tc := range []struct {
in string
expr Expr
err error
}{
{
in: `{foo="bar"}`,
expr: &ConcatLogSelectorExpr{
LogSelectorExpr: DownstreamLogSelectorExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
LogSelectorExpr: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
},
next: &ConcatLogSelectorExpr{
LogSelectorExpr: DownstreamLogSelectorExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
LogSelectorExpr: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
},
next: nil,
},
},
},
{
in: `{foo="bar"} |= "error"`,
expr: &ConcatLogSelectorExpr{
LogSelectorExpr: DownstreamLogSelectorExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
LogSelectorExpr: &filterExpr{
match: "error",
ty: labels.MatchEqual,
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
},
},
next: &ConcatLogSelectorExpr{
LogSelectorExpr: DownstreamLogSelectorExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
LogSelectorExpr: &filterExpr{
match: "error",
ty: labels.MatchEqual,
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
},
},
next: nil,
},
},
},
{
in: `rate({foo="bar"}[5m])`,
expr: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: nil,
},
},
},
{
in: `count_over_time({foo="bar"}[5m])`,
expr: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeCountOverTime,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeCountOverTime,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: nil,
},
},
},
{
in: `sum(rate({foo="bar"}[5m]))`,
expr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeSum,
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: nil,
},
},
},
},
{
in: `topk(3, rate({foo="bar"}[5m]))`,
expr: &vectorAggregationExpr{
grouping: &grouping{},
params: 3,
operation: OpTypeTopK,
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: nil,
},
},
},
},
{
in: `max without (env) (rate({foo="bar"}[5m]))`,
expr: &vectorAggregationExpr{
grouping: &grouping{
without: true,
groups: []string{"env"},
},
operation: OpTypeMax,
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: nil,
},
},
},
},
{
in: `count(rate({foo="bar"}[5m]))`,
expr: &vectorAggregationExpr{
operation: OpTypeSum,
grouping: &grouping{},
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: nil,
},
},
},
},
{
in: `avg(rate({foo="bar"}[5m]))`,
expr: &binOpExpr{
op: OpTypeDiv,
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeSum,
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: nil,
},
},
},
RHS: &vectorAggregationExpr{
operation: OpTypeSum,
grouping: &grouping{},
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: nil,
},
},
},
},
},
{
in: `1 + sum by (cluster) (rate({foo="bar"}[5m]))`,
expr: &binOpExpr{
op: OpTypeAdd,
SampleExpr: &literalExpr{1},
RHS: &vectorAggregationExpr{
grouping: &grouping{
groups: []string{"cluster"},
},
operation: OpTypeSum,
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{
groups: []string{"cluster"},
},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{
groups: []string{"cluster"},
},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: nil,
},
},
},
},
},
// sum(max) should not shard the maxes
{
in: `sum(max(rate({foo="bar"}[5m])))`,
expr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeSum,
left: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeMax,
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
next: nil,
},
},
},
},
},
// max(count) should shard the count, but not the max
{
in: `max(count(rate({foo="bar"}[5m])))`,
expr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeMax,
left: &vectorAggregationExpr{
operation: OpTypeSum,
grouping: &grouping{},
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: nil,
},
},
},
},
},
{
in: `max(sum by (cluster) (rate({foo="bar"}[5m]))) / count(rate({foo="bar"}[5m]))`,
expr: &binOpExpr{
op: OpTypeDiv,
SampleExpr: &vectorAggregationExpr{
operation: OpTypeMax,
grouping: &grouping{},
left: &vectorAggregationExpr{
grouping: &grouping{
groups: []string{"cluster"},
},
operation: OpTypeSum,
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{
groups: []string{"cluster"},
},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{
groups: []string{"cluster"},
},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: nil,
},
},
},
},
RHS: &vectorAggregationExpr{
operation: OpTypeSum,
grouping: &grouping{},
left: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: &ConcatSampleExpr{
SampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &vectorAggregationExpr{
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, "foo", "bar"),
},
},
interval: 5 * time.Minute,
},
},
},
},
next: nil,
},
},
},
},
},
} {
		// Each case: parse the input query, run it through the shard mapper,
		// and require that the mapped AST matches the expected expression both
		// textually (String) and structurally (deep equality).
		// NOTE(review): tc.err is asserted for both the parse error and the
		// map error — this assumes a case expects at most one failure site;
		// confirm that is intended for any future error cases.
		t.Run(tc.in, func(t *testing.T) {
			ast, err := ParseExpr(tc.in)
			require.Equal(t, tc.err, err)
			mapped, err := m.Map(ast)
			require.Equal(t, tc.err, err)
			require.Equal(t, tc.expr.String(), mapped.String())
			require.Equal(t, tc.expr, mapped)
		})
}
}