fix(query sharding): Generalize avg -> sum/count sharding using existing binop mapper (#12599)

pull/12600/head
Travis Patterson 1 year ago committed by GitHub
parent 91dab51152
commit 11e768726f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 38
      pkg/logql/shardmapper.go
  2. 24
      pkg/logql/shardmapper_test.go

@ -240,33 +240,21 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
case syntax.OpTypeAvg:
// avg(x) -> sum(x)/count(x), which is parallelizable
lhs, lhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: expr.Left,
Grouping: expr.Grouping,
Operation: syntax.OpTypeSum,
}, r, false)
if err != nil {
return nil, 0, err
}
rhs, rhsBytesPerShard, err := m.mapVectorAggregationExpr(&syntax.VectorAggregationExpr{
Left: expr.Left,
Grouping: expr.Grouping,
Operation: syntax.OpTypeCount,
}, r, false)
if err != nil {
return nil, 0, err
binOp := &syntax.BinOpExpr{
SampleExpr: &syntax.VectorAggregationExpr{
Left: expr.Left,
Grouping: expr.Grouping,
Operation: syntax.OpTypeSum,
},
RHS: &syntax.VectorAggregationExpr{
Left: expr.Left,
Grouping: expr.Grouping,
Operation: syntax.OpTypeCount,
},
Op: syntax.OpTypeDiv,
}
// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64(max(int(lhsBytesPerShard), int(rhsBytesPerShard)))
return &syntax.BinOpExpr{
SampleExpr: lhs,
RHS: rhs,
Op: syntax.OpTypeDiv,
}, bytesPerShard, nil
return m.mapBinOpExpr(binOp, r, topLevel)
case syntax.OpTypeCount:
if syntax.ReducesLabels(expr.Left) {
// skip sharding optimizations at this level. If labels are reduced,

@ -231,6 +231,18 @@ func TestMappingStrings(t *testing.T) {
)
)`,
},
{
in: `avg by(foo_extracted) (quantile_over_time(0.95, {foo="baz"} | logfmt | unwrap foo_extracted | __error__="" [5m]))`,
out: `(
sum by (foo_extracted) (
downstream<sumby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=0_of_2>++downstream<sumby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=1_of_2>
)
/
sum by (foo_extracted) (
downstream<countby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=0_of_2>++downstream<countby(foo_extracted)(quantile_over_time(0.95,{foo="baz"}|logfmt|unwrapfoo_extracted|__error__=""[5m])),shard=1_of_2>
)
)`,
},
{
in: `count(rate({foo="bar"} | json | keep foo [5m]))`,
out: `count(
@ -283,12 +295,12 @@ func TestMappingStrings(t *testing.T) {
},
{
in: `sum without (a) (
label_replace(
sum without (b) (
rate({foo="bar"}[5m])
),
"baz", "buz", "foo", "(.*)"
)
label_replace(
sum without (b) (
rate({foo="bar"}[5m])
),
"baz", "buz", "foo", "(.*)"
)
)`,
out: `sum without(a) (
label_replace(

Loading…
Cancel
Save