Integrates label replace into sharding code (#3132)

* avoid sharding label replace

* updates sharding tests

* matrix stepper end-inclusive for rangeVectorIterator parity

* Revert "matrix stepper end-inclusive for rangeVectorIterator parity"

This reverts commit 2f34deab98.

* removes sharding equivalence tests due to interference by non-sharding bug in the control case
pull/3153/head
Owen Diehl 5 years ago committed by GitHub
parent fcabfec470
commit c9b85b33ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      pkg/logql/ast.go
  2. 1
      pkg/logql/sharding_test.go
  3. 12
      pkg/logql/shardmapper.go
  4. 13
      pkg/logql/shardmapper_test.go

@ -933,7 +933,7 @@ func (e *labelReplaceExpr) Extractor() (SampleExtractor, error) {
}
func (e *labelReplaceExpr) Operations() []string {
return e.left.Operations()
return append([]string{OpLabelReplace}, e.left.Operations()...)
}
func (e *labelReplaceExpr) String() string {

@ -39,7 +39,6 @@ func TestMappingEquivalence(t *testing.T) {
{`rate({a=~".*"}[1s])`, false},
{`sum by (a) (rate({a=~".*"}[1s]))`, false},
{`sum(rate({a=~".*"}[1s]))`, false},
{`max without (a) (rate({a=~".*"}[1s]))`, false},
{`count(rate({a=~".*"}[1s]))`, false},
{`avg(rate({a=~".*"}[1s]))`, true},

@ -133,6 +133,8 @@ func (m ShardMapper) Map(expr Expr, r *shardRecorder) (Expr, error) {
return m.mapLogSelectorExpr(e.(LogSelectorExpr), r), nil
case *vectorAggregationExpr:
return m.mapVectorAggregationExpr(e, r)
case *labelReplaceExpr:
return m.mapLabelReplaceExpr(e, r)
case *rangeAggregationExpr:
return m.mapRangeAggregationExpr(e, r), nil
case *binOpExpr:
@ -277,6 +279,16 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *sh
}
}
func (m ShardMapper) mapLabelReplaceExpr(expr *labelReplaceExpr, r *shardRecorder) (SampleExpr, error) {
subMapped, err := m.Map(expr.left, r)
if err != nil {
return nil, err
}
cpy := *expr
cpy.left = subMapped.(SampleExpr)
return &cpy, nil
}
func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr, r *shardRecorder) SampleExpr {
if hasLabelModifier(expr) {
// if an expr can modify labels this means multiple shards can returns the same labelset.

@ -175,6 +175,19 @@ func TestMappingStrings(t *testing.T) {
in: `sum by (cluster) (stddev_over_time({foo="bar"} |= "id=123" | logfmt | unwrap latency [5m]))`,
out: `sum by (cluster) (stddev_over_time({foo="bar"} |= "id=123" | logfmt | unwrap latency [5m]))`,
},
{
in: `
sum without (a) (
label_replace(
sum without (b) (
rate({foo="bar"}[5m])
),
"baz", "buz", "foo", "(.*)"
)
)
`,
out: `sum without(a)(label_replace(sum without(b)(downstream<sum without(b)(rate({foo="bar"}[5m])),shard=0_of_2>++downstream<sum without(b)(rate({foo="bar"}[5m])),shard=1_of_2>),"baz","buz","foo","(.*)"))`,
},
} {
t.Run(tc.in, func(t *testing.T) {
ast, err := ParseExpr(tc.in)

Loading…
Cancel
Save