Split by range of instant queries (#5662)

* Split by range on instant queries (POC v3)

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>

* Handle uneven split by duration
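
For example (mirroring the uneven-split test cases below), with a 2s split interval a 3s range is mapped to one full split `[2s]` plus the remainder `[1s] offset 2s`.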

* Register SplitByRangeMiddleware in roundtripper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! Register SplitByRangeMiddleware in roundtripper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! fixup! Register SplitByRangeMiddleware in roundtripper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Remove rewrite if range aggr has label extraction stage

In case a range aggregation has a generic label extraction stage, such
as `| json` or `| logfmt`, and no group-by, we cannot split it, because
the downstream queries would otherwise produce too many series.
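
Illustration (the `duration` label here is hypothetical): `max_over_time({app="foo"} | json | unwrap duration [1m])` is left unsplit, while the grouped variant `max_over_time({app="foo"} | json | unwrap duration [1m]) by (status)` can still be split, since the grouping bounds the resulting label set.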

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix linting

* Implement range splitting for rate() and bytes_rate()

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix linting

* Calculate offset of downstream queries correctly

if the outer query range contains an offset as well.
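
For illustration, with a 1s split interval, `rate({a=~".+"}[2s] offset 2s)` yields the downstream ranges `[1s] offset 2s` and `[1s] offset 3s`: the outer offset is added to each split's own offset.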

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix linting

* Add optimization by moving the outer label grouping downstream

* Add label grouping downstream optimization to rate and bytes_rate expressions
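
As a sketch, with a 1m split interval, `sum by (a) (bytes_over_time({job="bar"}[2m]))` becomes roughly `sum by (a) (sum without (downstream<sum by (a) (bytes_over_time({job="bar"}[1m]))> ++ downstream<sum by (a) (bytes_over_time({job="bar"}[1m] offset 1m))>))`, so each downstream query already collapses its streams to the `a` label.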

* Add changelog entry

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Simplify types in rangemapper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! Simplify types in rangemapper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Check in Map function if query is splittable by range

Since this is the main function of the mapper, we can ensure here that
only supported vector/range aggregations are handled.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Some code cleanups and variable renaming

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Extract duplicate code in range aggr mapping into function

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add topk to supported splittable vector aggregations

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Check if query is splittable by range before calling Map()

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add more function comments

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Rename RangeVectorMapper to RangeMapper

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix incorrect import due to rebase

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add equivalence test cases with `logfmt` pipeline stage

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Remove limitation of pushing down vector aggr only if grouping is present

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Remove TestRangeMappingEquivalenceMockMapper test

This test is essentially the same as Test_SplitRangeVectorMapping,
just using a different representation of the result.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! Remove limitation of pushing down vector aggr only if grouping is present

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* fixup! fixup! Remove limitation of pushing down vector aggr only if grouping is present

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Fix linter errors

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Better naming of variable

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Split SplitRangeVectorMapping test into two

so that the noop-query cases are tested separately

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

Co-authored-by: Christian Haudum <christian.haudum@gmail.com>
Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
Branch: pull/5828/head
Author: Susana Ferreira (committed via GitHub)
Commit: 0bce8d95a2 (parent: cc3a8e4415)
7 changed files:

  1. pkg/logql/downstream_test.go (202 changed lines)
  2. pkg/logql/rangemapper.go (332 changed lines)
  3. pkg/logql/rangemapper_test.go (1057 changed lines)
  4. pkg/logql/test_utils.go (2 changed lines)
  5. pkg/querier/queryrange/roundtrip.go (1 changed line)
  6. pkg/querier/queryrange/split_by_range.go (120 changed lines)
  7. pkg/querier/queryrange/split_by_range_test.go (181 changed lines)

@@ -99,7 +99,207 @@ func TestMappingEquivalence(t *testing.T) {
	}
}

func TestRangeMappingEquivalence(t *testing.T) {
	var (
		shards   = 3
		nStreams = 60
		rounds   = 20
		streams  = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"})
		start    = time.Unix(0, 0)
		end      = time.Unix(0, int64(time.Second*time.Duration(rounds)))
		step     = time.Second
		interval = time.Duration(0)
		limit    = 100
	)

	for _, tc := range []struct {
		query           string
		splitByInterval time.Duration
	}{
		// Range vector aggregators
		{`bytes_over_time({a=~".+"}[2s])`, time.Second},
		{`count_over_time({a=~".+"}[2s])`, time.Second},
		{`sum_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
		{`max_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
		{`max_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second},
		{`max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second},
		{`min_over_time({a=~".+"} | unwrap b [2s])`, time.Second},
		{`min_over_time({a=~".+"} | unwrap b [2s]) by (a)`, time.Second},
		{`min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a)`, time.Second},
		{`rate({a=~".+"}[2s])`, time.Second},
		{`bytes_rate({a=~".+"}[2s])`, time.Second},
		// sum
		{`sum(bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`sum(count_over_time({a=~".+"}[2s]))`, time.Second},
		{`sum(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`sum(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`sum(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`sum(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`sum(rate({a=~".+"}[2s]))`, time.Second},
		{`sum(bytes_rate({a=~".+"}[2s]))`, time.Second},
		// sum by
		{`sum by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`sum by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
		{`sum by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`sum by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`sum by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`sum by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`sum by (a) (rate({a=~".+"}[2s]))`, time.Second},
		{`sum by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},
		// count
		{`count(bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`count(count_over_time({a=~".+"}[2s]))`, time.Second},
		{`count(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`count(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`count(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`count(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`count(rate({a=~".+"}[2s]))`, time.Second},
		{`count(bytes_rate({a=~".+"}[2s]))`, time.Second},
		// count by
		{`count by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`count by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
		{`count by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`count by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`count by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`count by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`count by (a) (rate({a=~".+"}[2s]))`, time.Second},
		{`count by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},
		// max
		{`max(bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`max(count_over_time({a=~".+"}[2s]))`, time.Second},
		{`max(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`max(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`max(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`max(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`max(rate({a=~".+"}[2s]))`, time.Second},
		{`max(bytes_rate({a=~".+"}[2s]))`, time.Second},
		// max by
		{`max by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`max by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
		{`max by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`max by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`max by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`max by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`max by (a) (rate({a=~".+"}[2s]))`, time.Second},
		{`max by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},
		// min
		{`min(bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`min(count_over_time({a=~".+"}[2s]))`, time.Second},
		{`min(sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min(max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min(max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`min(max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`min(min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min(min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`min(min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`min(rate({a=~".+"}[2s]))`, time.Second},
		{`min(bytes_rate({a=~".+"}[2s]))`, time.Second},
		// min by
		{`min by (a) (bytes_over_time({a=~".+"}[2s]))`, time.Second},
		{`min by (a) (count_over_time({a=~".+"}[2s]))`, time.Second},
		{`min by (a) (sum_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min by (a) (max_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`min by (a) (max_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]))`, time.Second},
		{`min by (a) (min_over_time({a=~".+"} | unwrap b [2s]) by (a))`, time.Second},
		{`min by (a) (min_over_time({a=~".+"} | logfmt | unwrap line [2s]) by (a))`, time.Second},
		{`min by (a) (rate({a=~".+"}[2s]))`, time.Second},
		{`min by (a) (bytes_rate({a=~".+"}[2s]))`, time.Second},
		// Binary operations
		{`bytes_over_time({a=~".+"}[3s]) + count_over_time({a=~".+"}[5s])`, time.Second},
		{`sum(count_over_time({a=~".+"}[3s]) * count(sum_over_time({a=~".+"} | unwrap b [5s])))`, time.Second},
		// Multi vector aggregator layer queries
		{`sum(max(bytes_over_time({a=~".+"}[3s])))`, time.Second},
		{`sum(min by (a)(max(sum by (b) (count_over_time({a=~".+"} [2s])))))`, time.Second},
		// Non-splittable vector aggregators
		// TODO: Fix topk
		//{`topk(2, count_over_time({a=~".+"}[2s]))`, time.Second},
		{`avg(count_over_time({a=~".+"}[2s]))`, time.Second},
		// Uneven split times
		{`bytes_over_time({a=~".+"}[3s])`, 2 * time.Second},
		{`count_over_time({a=~".+"}[5s])`, 2 * time.Second},
		// range with offset
		{`rate({a=~".+"}[2s] offset 2s)`, time.Second},
	} {
		q := NewMockQuerier(
			shards,
			streams,
		)
		opts := EngineOpts{}
		regularEngine := NewEngine(opts, q, NoLimits, log.NewNopLogger())
		downstreamEngine := NewDownstreamEngine(opts, MockDownstreamer{regularEngine}, nilMetrics, NoLimits, log.NewNopLogger())

		t.Run(tc.query, func(t *testing.T) {
			ctx := user.InjectOrgID(context.Background(), "fake")

			params := NewLiteralParams(
				tc.query,
				start,
				end,
				step,
				interval,
				logproto.FORWARD,
				uint32(limit),
				nil,
			)

			// Regular engine
			qry := regularEngine.Query(params)
			res, err := qry.Exec(ctx)
			require.Nil(t, err)

			// Downstream engine - split by range
			rangeMapper, err := NewRangeMapper(tc.splitByInterval)
			require.Nil(t, err)
			noop, rangeExpr, err := rangeMapper.Parse(tc.query)
			require.Nil(t, err)
			require.False(t, noop, "downstream engine cannot execute noop")

			rangeQry := downstreamEngine.Query(params, rangeExpr)
			rangeRes, err := rangeQry.Exec(ctx)
			require.Nil(t, err)

			require.Equal(t, res.Data, rangeRes.Data)
		})
	}
}

// approximatelyEquals ensures two responses are approximately equal,
// up to 6 decimals precision per sample
func approximatelyEquals(t *testing.T, as, bs promql.Matrix) {
	require.Equal(t, len(as), len(bs))

@@ -0,0 +1,332 @@
package logql

import (
	"fmt"
	"time"

	"github.com/go-kit/log/level"
	"github.com/pkg/errors"

	"github.com/grafana/loki/pkg/logql/syntax"
	util_log "github.com/grafana/loki/pkg/util/log"
)

var splittableVectorOp = map[string]struct{}{
	syntax.OpTypeSum:   {},
	syntax.OpTypeCount: {},
	syntax.OpTypeMax:   {},
	syntax.OpTypeMin:   {},
	syntax.OpTypeAvg:   {},
	syntax.OpTypeTopK:  {},
}

var splittableRangeVectorOp = map[string]struct{}{
	syntax.OpRangeTypeRate:      {},
	syntax.OpRangeTypeBytesRate: {},
	syntax.OpRangeTypeBytes:     {},
	syntax.OpRangeTypeCount:     {},
	syntax.OpRangeTypeSum:       {},
	syntax.OpRangeTypeMax:       {},
	syntax.OpRangeTypeMin:       {},
}

type RangeMapper struct {
	splitByInterval time.Duration
}

func NewRangeMapper(interval time.Duration) (RangeMapper, error) {
	if interval <= 0 {
		return RangeMapper{}, fmt.Errorf("cannot create RangeMapper with splitByInterval <= 0; got %s", interval)
	}
	return RangeMapper{
		splitByInterval: interval,
	}, nil
}

// Parse returns (noop, parsed expression, error)
func (m RangeMapper) Parse(query string) (bool, syntax.Expr, error) {
	origExpr, err := syntax.ParseSampleExpr(query)
	if err != nil {
		return true, nil, err
	}

	if !isSplittableByRange(origExpr) {
		return true, origExpr, nil
	}

	modExpr, err := m.Map(origExpr, nil)
	if err != nil {
		return true, nil, err
	}

	return origExpr.String() == modExpr.String(), modExpr, err
}

func (m RangeMapper) Map(expr syntax.SampleExpr, vectorAggrPushdown *syntax.VectorAggregationExpr) (syntax.SampleExpr, error) {
	// immediately clone the passed expr to avoid mutating the original
	expr, err := clone(expr)
	if err != nil {
		return nil, err
	}

	switch e := expr.(type) {
	case *syntax.VectorAggregationExpr:
		return m.mapVectorAggregationExpr(e)
	case *syntax.RangeAggregationExpr:
		return m.mapRangeAggregationExpr(e, vectorAggrPushdown), nil
	case *syntax.BinOpExpr:
		lhsMapped, err := m.Map(e.SampleExpr, vectorAggrPushdown)
		if err != nil {
			return nil, err
		}
		rhsMapped, err := m.Map(e.RHS, vectorAggrPushdown)
		if err != nil {
			return nil, err
		}
		e.SampleExpr = lhsMapped
		e.RHS = rhsMapped
		return e, nil
	default:
		return nil, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T)", expr, m)
	}
}

// getRangeInterval returns the interval in the range vector.
// Note that this function must not be called with a BinOpExpr as argument,
// as it returns only the range of the RHS.
// Example: expression `count_over_time({app="foo"}[10m])` returns 10m
func getRangeInterval(expr syntax.SampleExpr) time.Duration {
	var rangeInterval time.Duration
	expr.Walk(func(e interface{}) {
		switch concrete := e.(type) {
		case *syntax.RangeAggregationExpr:
			rangeInterval = concrete.Left.Interval
		}
	})
	return rangeInterval
}

// hasLabelExtractionStage returns true if an expression contains a stage for label extraction,
// such as `| json` or `| logfmt`, that would result in an exploding amount of series in downstream queries.
func hasLabelExtractionStage(expr syntax.SampleExpr) bool {
	found := false
	expr.Walk(func(e interface{}) {
		switch concrete := e.(type) {
		case *syntax.LabelParserExpr:
			// It will **not** return true for `regexp`, `unpack` and `pattern`, since these label extraction
			// stages can control how many labels, and therefore the resulting amount of series, are extracted.
			if concrete.Op == syntax.OpParserTypeJSON || concrete.Op == syntax.OpParserTypeLogfmt {
				found = true
			}
		}
	})
	return found
}

// sumOverFullRange returns an expression that sums up the individual downstream queries (preserving labels)
// and divides the result by the full range in seconds to calculate a rate value.
// The operation defines the range aggregation operation of the downstream queries.
// Example:
// rate({app="foo"}[2m])
// => (sum without (count_over_time({app="foo"}[1m]) ++ count_over_time({app="foo"}[1m] offset 1m)) / 120)
func (m RangeMapper) sumOverFullRange(expr *syntax.RangeAggregationExpr, overrideDownstream *syntax.VectorAggregationExpr, operation string, rangeInterval time.Duration) syntax.SampleExpr {
	var downstreamExpr syntax.SampleExpr = &syntax.RangeAggregationExpr{
		Left:      expr.Left,
		Operation: operation,
	}
	// Optimization: in case overrideDownstream exists, the downstream expression can be optimized with the grouping
	// and operation of the overrideDownstream expression in order to reduce the returned streams' label set.
	if overrideDownstream != nil {
		downstreamExpr = &syntax.VectorAggregationExpr{
			Left:      downstreamExpr,
			Grouping:  overrideDownstream.Grouping,
			Operation: overrideDownstream.Operation,
		}
	}
	return &syntax.BinOpExpr{
		SampleExpr: &syntax.VectorAggregationExpr{
			Left: m.mapConcatSampleExpr(downstreamExpr, rangeInterval),
			Grouping: &syntax.Grouping{
				Without: true,
			},
			Operation: syntax.OpTypeSum,
		},
		RHS:  &syntax.LiteralExpr{Val: rangeInterval.Seconds()},
		Op:   syntax.OpTypeDiv,
		Opts: &syntax.BinOpOptions{},
	}
}

// vectorAggrWithRangeDownstreams returns an expression that aggregates a concat sample expression of multiple range
// aggregations. If a vector aggregation is pushed down, the downstream queries of the concat sample expression are
// wrapped in the vector aggregation of the parent node.
// Examples:
// min(bytes_over_time({job="bar"}[2m]))
// => min without (bytes_over_time({job="bar"}[1m]) ++ bytes_over_time({job="bar"}[1m] offset 1m))
// min by (app) (bytes_over_time({job="bar"}[2m]))
// => min without (min by (app) (bytes_over_time({job="bar"}[1m])) ++ min by (app) (bytes_over_time({job="bar"}[1m] offset 1m)))
func (m RangeMapper) vectorAggrWithRangeDownstreams(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr, op string, rangeInterval time.Duration) syntax.SampleExpr {
	grouping := expr.Grouping
	if expr.Grouping == nil {
		grouping = &syntax.Grouping{
			Without: true,
		}
	}
	var downstream syntax.SampleExpr = expr
	if vectorAggrPushdown != nil {
		downstream = vectorAggrPushdown
	}
	return &syntax.VectorAggregationExpr{
		Left:      m.mapConcatSampleExpr(downstream, rangeInterval),
		Grouping:  grouping,
		Operation: op,
	}
}

// appendDownstream adds expression expr with the range interval `interval` and offset `offset` to the downstreams list.
// Returns the updated downstream ConcatSampleExpr.
func appendDownstream(downstreams *ConcatSampleExpr, expr syntax.SampleExpr, interval time.Duration, offset time.Duration) *ConcatSampleExpr {
	sampleExpr, _ := clone(expr)
	sampleExpr.Walk(func(e interface{}) {
		switch concrete := e.(type) {
		case *syntax.RangeAggregationExpr:
			concrete.Left.Interval = interval
			if offset != 0 {
				concrete.Left.Offset += offset
			}
		}
	})
	downstreams = &ConcatSampleExpr{
		DownstreamSampleExpr: DownstreamSampleExpr{
			SampleExpr: sampleExpr,
		},
		next: downstreams,
	}
	return downstreams
}

// mapConcatSampleExpr transforms expr into multiple downstream subexpressions, split by the range interval
// and shifted by increasing offsets.
// rangeInterval should be greater than m.splitByInterval; otherwise the resulting expression
// will have an unnecessary aggregation operation.
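// Example (illustrative): with splitByInterval=4m, an inner range of 10m is mapped to
// downstream<expr[4m]> ++ downstream<expr[4m] offset 4m> ++ downstream<expr[2m] offset 8m>,
// i.e. two full splits plus the 2m remainder at the trailing offset.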
func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration) syntax.SampleExpr {
	splitCount := int(rangeInterval / m.splitByInterval)
	if splitCount == 0 {
		return expr
	}

	var split int
	var downstreams *ConcatSampleExpr
	for split = 0; split < splitCount; split++ {
		downstreams = appendDownstream(downstreams, expr, m.splitByInterval, time.Duration(split)*m.splitByInterval)
	}

	// Add the remainder offset interval
	if rangeInterval%m.splitByInterval != 0 {
		offset := time.Duration(split) * m.splitByInterval
		downstreams = appendDownstream(downstreams, expr, rangeInterval-offset, offset)
	}

	return downstreams
}

func (m RangeMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr) (syntax.SampleExpr, error) {
	rangeInterval := getRangeInterval(expr)

	// in case the interval is smaller than the configured split interval,
	// don't split it.
	// TODO: what if there is another internal expr with an interval that can be split?
	if rangeInterval <= m.splitByInterval {
		return expr, nil
	}

	// In order to minimize the amount of streams on the downstream query,
	// we can push down the outer vector aggregation to the downstream query.
	// This does not work for `count()` and `topk()`, though.
	// We also do not want to push down if the inner expression is a binary operation.
	// TODO: Currently, the last inner expression's grouping is sent downstream.
	// Which grouping should be sent downstream?
	var vectorAggrPushdown *syntax.VectorAggregationExpr
	if _, ok := expr.Left.(*syntax.BinOpExpr); !ok && expr.Operation != syntax.OpTypeCount && expr.Operation != syntax.OpTypeTopK {
		vectorAggrPushdown = expr
	}

	// Split the vector aggregation's inner expression
	lhsMapped, err := m.Map(expr.Left, vectorAggrPushdown)
	if err != nil {
		return nil, err
	}

	return &syntax.VectorAggregationExpr{
		Left:      lhsMapped,
		Grouping:  expr.Grouping,
		Params:    expr.Params,
		Operation: expr.Operation,
	}, nil
}

// mapRangeAggregationExpr maps expr into a new SampleExpr with multiple downstream subqueries split by range interval.
// Optimization: in order to reduce the number of streams returned by the downstream queries, in case a range
// aggregation expression is aggregated by a vector aggregation expression with a label grouping, the downstream
// expression can be exactly the same as the initial query, concatenated by a `sum` operation. If this is the case,
// overrideDownstream contains the initial query, which will be the downstream expression with a split range interval.
// Example: `sum by (a) (bytes_over_time({app="foo"}[2m]))`
// is mapped to `sum by (a) (sum without (downstream<sum by (a) (bytes_over_time({app="foo"}[1m]))> ++ downstream<sum by (a) (bytes_over_time({app="foo"}[1m] offset 1m))>))`
func (m RangeMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, vectorAggrPushdown *syntax.VectorAggregationExpr) syntax.SampleExpr {
	rangeInterval := getRangeInterval(expr)

	// in case the interval is smaller than the configured split interval,
	// don't split it.
	if rangeInterval <= m.splitByInterval {
		return expr
	}

	// We cannot execute downstream queries that would potentially produce a huge amount of series
	// and therefore would very likely fail.
	if expr.Grouping == nil && hasLabelExtractionStage(expr) {
		return expr
	}

	switch expr.Operation {
	case syntax.OpRangeTypeBytes, syntax.OpRangeTypeCount, syntax.OpRangeTypeSum:
		return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeSum, rangeInterval)
	case syntax.OpRangeTypeMax:
		return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMax, rangeInterval)
	case syntax.OpRangeTypeMin:
		return m.vectorAggrWithRangeDownstreams(expr, vectorAggrPushdown, syntax.OpTypeMin, rangeInterval)
	case syntax.OpRangeTypeRate:
		return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeCount, rangeInterval)
	case syntax.OpRangeTypeBytesRate:
		return m.sumOverFullRange(expr, vectorAggrPushdown, syntax.OpRangeTypeBytes, rangeInterval)
	default:
		// this should not be reachable.
		// If an operation is splittable it should have an optimization listed.
		level.Warn(util_log.Logger).Log(
			"msg", "unexpected range aggregation expression",
			"operation", expr.Operation,
		)
		return expr
	}
}

// isSplittableByRange returns whether it is possible to optimize the given sample expression
func isSplittableByRange(expr syntax.SampleExpr) bool {
	switch e := expr.(type) {
	case *syntax.VectorAggregationExpr:
		_, ok := splittableVectorOp[e.Operation]
		return ok && isSplittableByRange(e.Left)
	case *syntax.BinOpExpr:
		return isSplittableByRange(e.SampleExpr) && isSplittableByRange(e.RHS)
	case *syntax.LabelReplaceExpr:
		return isSplittableByRange(e.Left)
	case *syntax.RangeAggregationExpr:
		_, ok := splittableRangeVectorOp[e.Operation]
		return ok
	default:
		return false
	}
}

// clone returns a copy of the given sample expression.
// This is needed whenever we want to modify the existing query tree.
func clone(expr syntax.SampleExpr) (syntax.SampleExpr, error) {
	return syntax.ParseSampleExpr(expr.String())
}
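
For orientation, a minimal usage sketch of the mapper (hypothetical caller; error handling elided; mirrors how the middleware below drives it):

	mapper, _ := logql.NewRangeMapper(time.Minute)
	// noop is true when the query cannot be split or is unchanged by the mapping
	noop, mappedExpr, _ := mapper.Parse(`sum(bytes_over_time({app="foo"}[3m]))`)
	if !noop {
		// mappedExpr now concatenates three 1m subqueries (offsets 0, 1m, 2m)
		// that a DownstreamEngine can execute in parallel before re-aggregating.
		_ = mappedExpr.String()
	}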

[Diff for pkg/logql/rangemapper_test.go (1,057 changed lines) suppressed because it is too large.]

@@ -261,7 +261,7 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream
 	for j := 0; j <= nEntries; j++ {
 		stream.Entries = append(stream.Entries, logproto.Entry{
 			Timestamp: time.Unix(0, int64(j*int(time.Second))),
-			Line:      fmt.Sprintf("line number: %d", j),
+			Line:      fmt.Sprintf("stream=stderr level=debug line=%d", j),
 		})
 	}

@@ -478,6 +478,7 @@ func NewInstantMetricTripperware(
 	if cfg.ShardedQueries {
 		queryRangeMiddleware = append(queryRangeMiddleware,
+			NewSplitByRangeMiddleware(log, limits, nil),
 			NewQueryShardMiddleware(
 				log,
 				schema.Configs,

@@ -0,0 +1,120 @@
package queryrange

import (
	"context"
	"fmt"
	"net/http"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/tenant"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/weaveworks/common/httpgrpc"

	"github.com/grafana/loki/pkg/loghttp"
	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
	util_log "github.com/grafana/loki/pkg/util/log"
	"github.com/grafana/loki/pkg/util/marshal"
	"github.com/grafana/loki/pkg/util/validation"
)

type splitByRange struct {
	logger log.Logger
	next   queryrangebase.Handler
	limits Limits
	ng     *logql.DownstreamEngine
}

// NewSplitByRangeMiddleware creates a new Middleware that splits instant metric queries by the range interval.
func NewSplitByRangeMiddleware(logger log.Logger, limits Limits, metrics *logql.ShardingMetrics) queryrangebase.Middleware {
	return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
		return &splitByRange{
			logger: log.With(logger, "middleware", "InstantQuery.splitByRangeVector"),
			next:   next,
			limits: limits,
			ng:     logql.NewDownstreamEngine(logql.EngineOpts{}, DownstreamHandler{next}, metrics, limits, logger),
		}
	})
}

func (s *splitByRange) Do(ctx context.Context, request queryrangebase.Request) (queryrangebase.Response, error) {
	logger := util_log.WithContext(ctx, s.logger)

	tenants, err := tenant.TenantIDs(ctx)
	if err != nil {
		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
	}

	interval := validation.SmallestPositiveNonZeroDurationPerTenant(tenants, s.limits.QuerySplitDuration)
	// if no interval configured, continue to the next middleware
	if interval == 0 {
		return s.next.Do(ctx, request)
	}

	mapper, err := logql.NewRangeMapper(interval)
	if err != nil {
		return nil, err
	}

	noop, parsed, err := mapper.Parse(request.GetQuery())
	if err != nil {
		level.Warn(logger).Log("msg", "failed mapping AST", "err", err.Error(), "query", request.GetQuery())
		return nil, err
	}
	level.Debug(logger).Log("msg", "mapped instant query", "interval", interval.String(), "noop", noop, "original", request.GetQuery(), "mapped", parsed.String())

	if noop {
		// the query cannot be split, so continue
		return s.next.Do(ctx, request)
	}

	params, err := paramsFromRequest(request)
	if err != nil {
		return nil, err
	}

	if _, ok := request.(*LokiInstantRequest); !ok {
		return nil, fmt.Errorf("expected *LokiInstantRequest")
	}

	query := s.ng.Query(params, parsed)

	res, err := query.Exec(ctx)
	if err != nil {
		return nil, err
	}

	value, err := marshal.NewResultValue(res.Data)
	if err != nil {
		return nil, err
	}

	switch res.Data.Type() {
	case parser.ValueTypeMatrix:
		return &LokiPromResponse{
			Response: &queryrangebase.PrometheusResponse{
				Status: loghttp.QueryStatusSuccess,
				Data: queryrangebase.PrometheusData{
					ResultType: loghttp.ResultTypeMatrix,
					Result:     toProtoMatrix(value.(loghttp.Matrix)),
				},
			},
			Statistics: res.Statistics,
		}, nil
	case parser.ValueTypeVector:
		return &LokiPromResponse{
			Statistics: res.Statistics,
			Response: &queryrangebase.PrometheusResponse{
				Status: loghttp.QueryStatusSuccess,
				Data: queryrangebase.PrometheusData{
					ResultType: loghttp.ResultTypeVector,
					Result:     toProtoVector(value.(loghttp.Vector)),
				},
			},
		}, nil
	default:
		return nil, fmt.Errorf("unexpected downstream response type (%T)", res.Data.Type())
	}
}

@@ -0,0 +1,181 @@
package queryrange

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/stretchr/testify/require"
	"github.com/weaveworks/common/user"

	"github.com/grafana/loki/pkg/loghttp"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)

func Test_RangeVectorSplit(t *testing.T) {
	srm := NewSplitByRangeMiddleware(log.NewNopLogger(), fakeLimits{
		maxSeries: 10000,
		splits: map[string]time.Duration{
			"tenant": time.Minute,
		},
	}, nilShardingMetrics)

	ctx := user.InjectOrgID(context.TODO(), "tenant")

	for _, tc := range []struct {
		in         queryrangebase.Request
		subQueries []queryrangebase.RequestResponse
		expected   queryrangebase.Response
	}{
		{
			in: &LokiInstantRequest{
				Query:  `sum(bytes_over_time({app="foo"}[3m]))`,
				TimeTs: time.Unix(1, 0),
				Path:   "/loki/api/v1/query",
			},
			subQueries: []queryrangebase.RequestResponse{
				subQueryRequestResponse(`sum(bytes_over_time({app="foo"}[1m]))`, 1),
				subQueryRequestResponse(`sum(bytes_over_time({app="foo"}[1m] offset 1m0s))`, 2),
				subQueryRequestResponse(`sum(bytes_over_time({app="foo"}[1m] offset 2m0s))`, 3),
			},
			expected: expectedMergedResponse(1 + 2 + 3),
		},
		{
			in: &LokiInstantRequest{
				Query:  `sum by (bar) (bytes_over_time({app="foo"}[3m]))`,
				TimeTs: time.Unix(1, 0),
				Path:   "/loki/api/v1/query",
			},
			subQueries: []queryrangebase.RequestResponse{
				subQueryRequestResponse(`sum by(bar)(bytes_over_time({app="foo"}[1m]))`, 10),
				subQueryRequestResponse(`sum by(bar)(bytes_over_time({app="foo"}[1m] offset 1m0s))`, 20),
				subQueryRequestResponse(`sum by(bar)(bytes_over_time({app="foo"}[1m] offset 2m0s))`, 30),
			},
			expected: expectedMergedResponse(10 + 20 + 30),
		},
		{
			in: &LokiInstantRequest{
				Query:  `sum(count_over_time({app="foo"}[3m]))`,
				TimeTs: time.Unix(1, 0),
				Path:   "/loki/api/v1/query",
			},
			subQueries: []queryrangebase.RequestResponse{
				subQueryRequestResponse(`sum(count_over_time({app="foo"}[1m]))`, 1),
				subQueryRequestResponse(`sum(count_over_time({app="foo"}[1m] offset 1m0s))`, 1),
				subQueryRequestResponse(`sum(count_over_time({app="foo"}[1m] offset 2m0s))`, 1),
			},
			expected: expectedMergedResponse(1 + 1 + 1),
		},
		{
			in: &LokiInstantRequest{
				Query:  `sum by (bar) (count_over_time({app="foo"}[3m]))`,
				TimeTs: time.Unix(1, 0),
				Path:   "/loki/api/v1/query",
			},
			subQueries: []queryrangebase.RequestResponse{
				subQueryRequestResponse(`sum by(bar)(count_over_time({app="foo"}[1m]))`, 0),
				subQueryRequestResponse(`sum by(bar)(count_over_time({app="foo"}[1m] offset 1m0s))`, 0),
				subQueryRequestResponse(`sum by(bar)(count_over_time({app="foo"}[1m] offset 2m0s))`, 0),
			},
			expected: expectedMergedResponse(0 + 0 + 0),
		},
		{
			in: &LokiInstantRequest{
				Query:  `sum(sum_over_time({app="foo"} | unwrap bar [3m]))`,
				TimeTs: time.Unix(1, 0),
				Path:   "/loki/api/v1/query",
			},
			subQueries: []queryrangebase.RequestResponse{
				subQueryRequestResponse(`sum(sum_over_time({app="foo"} | unwrap bar[1m]))`, 1),
				subQueryRequestResponse(`sum(sum_over_time({app="foo"} | unwrap bar[1m] offset 1m0s))`, 2),
				subQueryRequestResponse(`sum(sum_over_time({app="foo"} | unwrap bar[1m] offset 2m0s))`, 3),
			},
			expected: expectedMergedResponse(1 + 2 + 3),
		},
		{
			in: &LokiInstantRequest{
				Query:  `sum by (bar) (sum_over_time({app="foo"} | unwrap bar [3m]))`,
				TimeTs: time.Unix(1, 0),
				Path:   "/loki/api/v1/query",
			},
			subQueries: []queryrangebase.RequestResponse{
				subQueryRequestResponse(`sum by(bar)(sum_over_time({app="foo"} | unwrap bar[1m]))`, 1),
				subQueryRequestResponse(`sum by(bar)(sum_over_time({app="foo"} | unwrap bar[1m] offset 1m0s))`, 2),
				subQueryRequestResponse(`sum by(bar)(sum_over_time({app="foo"} | unwrap bar[1m] offset 2m0s))`, 3),
			},
			expected: expectedMergedResponse(1 + 2 + 3),
		},
	} {
		tc := tc
		t.Run(tc.in.GetQuery(), func(t *testing.T) {
			resp, err := srm.Wrap(queryrangebase.HandlerFunc(
				func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
					// Assert subquery request
					for _, reqResp := range tc.subQueries {
						if req.GetQuery() == reqResp.Request.GetQuery() {
							require.Equal(t, reqResp.Request, req)
							// return the test data subquery response
							return reqResp.Response, nil
						}
					}
					return nil, fmt.Errorf("subquery request %q not found", req.GetQuery())
				})).Do(ctx, tc.in)
			require.NoError(t, err)
			require.Equal(t, tc.expected, resp.(*LokiPromResponse).Response)
		})
	}
}

// subQueryRequestResponse returns a RequestResponse containing the expected subquery instant request
// and a mocked response carrying the given sample value.
func subQueryRequestResponse(expectedSubQuery string, sampleValue float64) queryrangebase.RequestResponse {
	return queryrangebase.RequestResponse{
		Request: &LokiInstantRequest{
			Query:  expectedSubQuery,
			TimeTs: time.Unix(1, 0),
			Path:   "/loki/api/v1/query",
		},
		Response: &LokiPromResponse{
			Response: &queryrangebase.PrometheusResponse{
				Status: loghttp.QueryStatusSuccess,
				Data: queryrangebase.PrometheusData{
					ResultType: loghttp.ResultTypeVector,
					Result: []queryrangebase.SampleStream{
						{
							Labels: []logproto.LabelAdapter{
								{Name: "app", Value: "foo"},
							},
							Samples: []logproto.LegacySample{
								{TimestampMs: 1000, Value: sampleValue},
							},
						},
					},
				},
			},
		},
	}
}

// expectedMergedResponse returns the expected middleware Prometheus response,
// with the sample value set to expectedSampleValue.
func expectedMergedResponse(expectedSampleValue float64) *queryrangebase.PrometheusResponse {
	return &queryrangebase.PrometheusResponse{
		Status: loghttp.QueryStatusSuccess,
		Data: queryrangebase.PrometheusData{
			ResultType: loghttp.ResultTypeVector,
			Result: []queryrangebase.SampleStream{
				{
					Labels: []logproto.LabelAdapter{},
					Samples: []logproto.LegacySample{
						{TimestampMs: 1000, Value: expectedSampleValue},
					},
				},
			},
		},
	}
}