feat: improve performance of `first_over_time` and `last_over_time` queries by sharding them (#11605)

Signed-off-by: Callum Styan <callumstyan@gmail.com>
Co-authored-by: Callum Styan <callumstyan@gmail.com>
pull/12958/head
Karsten Jeschkies 1 year ago committed by GitHub
parent 35e10d4004
commit f66172eed1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 129
      pkg/logql/downstream.go
  2. 6
      pkg/logql/downstream_test.go
  3. 10
      pkg/logql/engine.go
  4. 22
      pkg/logql/evaluator.go
  5. 13
      pkg/logql/explain.go
  6. 270
      pkg/logql/first_last_over_time.go
  7. 2
      pkg/logql/optimize.go
  8. 5
      pkg/logql/quantile_over_time_sketch.go
  9. 53
      pkg/logql/shardmapper.go
  10. 17
      pkg/logql/step_evaluator.go
  11. 13
      pkg/logql/syntax/ast.go
  12. 3
      pkg/logql/test_utils.go

@ -301,6 +301,62 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
}
}
// MergeFirstOverTimeExpr is the sharded form of a first_over_time range
// aggregation: each downstream leg evaluates first_over_time on one shard
// and the per-shard results are merged by the engine.
type MergeFirstOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
}
// String renders the expression, joining at most defaultMaxDepth downstream
// legs with " ++ " separators.
func (e MergeFirstOverTimeExpr) String() string {
	var parts []string
	for i, d := range e.downstreams {
		if i >= defaultMaxDepth {
			break
		}
		parts = append(parts, d.String())
	}
	return fmt.Sprintf("MergeFirstOverTime<%s>", strings.Join(parts, " ++ "))
}
// Walk visits this expression and then every downstream leg in order.
func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
	f(e)
	for i := range e.downstreams {
		e.downstreams[i].Walk(f)
	}
}
// MergeLastOverTimeExpr is the sharded form of a last_over_time range
// aggregation: each downstream leg evaluates last_over_time on one shard
// and the per-shard results are merged by the engine.
type MergeLastOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
}
// String renders the expression, joining at most defaultMaxDepth downstream
// legs with " ++ " separators.
func (e MergeLastOverTimeExpr) String() string {
	var parts []string
	for i, d := range e.downstreams {
		if i >= defaultMaxDepth {
			break
		}
		parts = append(parts, d.String())
	}
	return fmt.Sprintf("MergeLastOverTime<%s>", strings.Join(parts, " ++ "))
}
// Walk visits this expression and then every downstream leg in order.
func (e *MergeLastOverTimeExpr) Walk(f syntax.WalkFn) {
	f(e)
	for i := range e.downstreams {
		e.downstreams[i].Walk(f)
	}
}
// Downstreamable is implemented by types that can hand out a Downstreamer
// bound to the given request context.
type Downstreamable interface {
Downstreamer(context.Context) Downstreamer
}
@ -471,7 +527,80 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
}
inner := NewQuantileSketchMatrixStepEvaluator(matrix, params)
return NewQuantileSketchVectorStepEvaluator(inner, *e.quantile), nil
case *MergeFirstOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))
for i, d := range e.downstreams {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: d.SampleExpr,
},
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: qry.Params,
ShardsOverride: Shards{shard.Shard}.Encode(),
}
}
queries[i] = qry
}
acc := NewBufferedAccumulator(len(queries))
results, err := ev.Downstream(ctx, queries, acc)
if err != nil {
return nil, err
}
xs := make([]promql.Matrix, 0, len(queries))
for _, res := range results {
switch data := res.Data.(type) {
case promql.Matrix:
xs = append(xs, data)
default:
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}
return NewMergeFirstOverTimeStepEvaluator(params, xs), nil
case *MergeLastOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))
for i, d := range e.downstreams {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: d.SampleExpr,
},
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: qry.Params,
ShardsOverride: Shards{shard.Shard}.Encode(),
}
}
queries[i] = qry
}
acc := NewBufferedAccumulator(len(queries))
results, err := ev.Downstream(ctx, queries, acc)
if err != nil {
return nil, err
}
xs := make([]promql.Matrix, 0, len(queries))
for _, res := range results {
switch data := res.Data.(type) {
case promql.Matrix:
xs = append(xs, data)
default:
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}
return NewMergeLastOverTimeStepEvaluator(params, xs), nil
default:
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)
}

@ -65,6 +65,10 @@ func TestMappingEquivalence(t *testing.T) {
`,
false,
},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
@ -141,7 +145,7 @@ func TestMappingEquivalenceSketches(t *testing.T) {
query string
realtiveError float64
}{
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.03},
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02},
} {
q := NewMockQuerier(

@ -363,7 +363,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
}
defer util.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close)
next, ts, r := stepEvaluator.Next()
next, _, r := stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}
@ -373,7 +373,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
case SampleVector:
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
return q.JoinSampleVector(next, ts, vec, stepEvaluator, maxSeries)
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries)
case ProbabilisticQuantileVector:
return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params)
default:
@ -383,7 +383,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return nil, nil
}
func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
seriesIndex := map[uint64]*promql.Series{}
@ -431,7 +431,7 @@ func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluato
seriesIndex[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: ts,
T: p.T,
F: p.F,
})
}
@ -439,7 +439,7 @@ func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluato
if len(seriesIndex) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
}
next, ts, r = stepEvaluator.Next()
next, _, r = stepEvaluator.Next()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}

@ -595,6 +595,28 @@ func newRangeAggEvaluator(
return &QuantileSketchStepEvaluator{
iter: iter,
}, nil
case syntax.OpRangeTypeFirstWithTimestamp:
iter := newFirstWithTimestampIterator(
it,
expr.Left.Interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)
return &RangeVectorEvaluator{
iter: iter,
}, nil
case syntax.OpRangeTypeLastWithTimestamp:
iter := newLastWithTimestampIterator(
it,
expr.Left.Interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)
return &RangeVectorEvaluator{
iter: iter,
}, nil
default:
iter, err := newRangeVectorIterator(
it, expr,

@ -57,3 +57,16 @@ func (e *BinOpStepEvaluator) Explain(parent Node) {
func (i *VectorIterator) Explain(parent Node) {
parent.Childf("%f vectorIterator", i.val)
}
// Explain renders this node in the plan tree and recurses into the wrapped
// inner evaluator.
func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) {
	e.inner.Explain(parent.Child("QuantileSketchVector"))
}
// Explain renders this merge node in the plan tree.
//
// The evaluator is shared by the first_over_time and last_over_time merge
// paths, so a neutral label is used: the previous hard-coded
// "MergeFirstOverTime" was misleading for sharded last_over_time plans.
func (e *mergeOverTimeStepEvaluator) Explain(parent Node) {
	parent.Child("MergeOverTime")
}
// Explain implements StepEvaluator.
//
// The blank receiver type parameter replaces the previous
// `EmptyEvaluator[SampleVector]`, which did not reference the package-level
// SampleVector type at all — it declared a *new* type parameter named
// SampleVector, shadowing the real type and misleading readers.
func (EmptyEvaluator[_]) Explain(parent Node) {
	parent.Child("Empty")
}

@ -0,0 +1,270 @@
package logql
import (
"math"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/grafana/loki/v3/pkg/iter"
)
// newFirstWithTimestampIterator returns an iterator that yields, for each
// window of the range aggregation, the first value together with its
// original timestamp.
func newFirstWithTimestampIterator(
	it iter.PeekingSampleIterator,
	selRange, step, start, end, offset int64) RangeVectorIterator {
	return &firstWithTimestampBatchRangeVectorIterator{
		batchRangeVectorIterator: &batchRangeVectorIterator{
			iter:     it,
			step:     step,
			end:      end,
			selRange: selRange,
			metrics:  map[string]labels.Labels{},
			window:   map[string]*promql.Series{},
			agg:      nil, // aggregation is done in At, not by the inner iterator
			current:  start - step, // first loop iteration will set it to start
			offset:   offset,
		},
	}
}
// firstWithTimestampBatchRangeVectorIterator aggregates each window by
// keeping the first sample per series along with that sample's timestamp.
type firstWithTimestampBatchRangeVectorIterator struct {
*batchRangeVectorIterator
// at is a reusable scratch buffer for the samples emitted by At.
at []promql.Sample
}
// At aggregates the current window, emitting one sample per series: the
// series' first point, carrying that point's own timestamp rather than the
// step timestamp.
func (r *firstWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) {
	if r.at == nil {
		r.at = make([]promql.Sample, 0, len(r.window))
	}
	r.at = r.at[:0]
	// The iterator operates in nanoseconds; report the step timestamp in
	// milliseconds.
	ts := r.current/1e+6 + r.offset/1e+6
	for _, series := range r.window {
		first := r.agg(series.Floats)
		r.at = append(r.at, promql.Sample{
			F:      first.F,
			T:      first.T / int64(time.Millisecond),
			Metric: series.Metric,
		})
	}
	return ts, SampleVector(r.at)
}
// agg picks the first sample of a window; samples are assumed to be in time
// order. An empty window yields NaN at T=0.
func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint {
	if len(samples) > 0 {
		return samples[0]
	}
	return promql.FPoint{F: math.NaN(), T: 0}
}
// newLastWithTimestampIterator returns an iterator that yields, for each
// window of the range aggregation, the last value together with its
// original timestamp.
func newLastWithTimestampIterator(
	it iter.PeekingSampleIterator,
	selRange, step, start, end, offset int64) RangeVectorIterator {
	return &lastWithTimestampBatchRangeVectorIterator{
		batchRangeVectorIterator: &batchRangeVectorIterator{
			iter:     it,
			step:     step,
			end:      end,
			selRange: selRange,
			metrics:  map[string]labels.Labels{},
			window:   map[string]*promql.Series{},
			agg:      nil, // aggregation is done in At, not by the inner iterator
			current:  start - step, // first loop iteration will set it to start
			offset:   offset,
		},
	}
}
// lastWithTimestampBatchRangeVectorIterator aggregates each window by
// keeping the last sample per series along with that sample's timestamp.
type lastWithTimestampBatchRangeVectorIterator struct {
*batchRangeVectorIterator
// at is a reusable scratch buffer for the samples emitted by At.
at []promql.Sample
}
// At aggregates the current window, emitting one sample per series: the
// series' last point, carrying that point's own timestamp rather than the
// step timestamp.
func (r *lastWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) {
	if r.at == nil {
		r.at = make([]promql.Sample, 0, len(r.window))
	}
	r.at = r.at[:0]
	// The iterator operates in nanoseconds; report the step timestamp in
	// milliseconds.
	ts := r.current/1e+6 + r.offset/1e+6
	for _, series := range r.window {
		last := r.agg(series.Floats)
		r.at = append(r.at, promql.Sample{
			F:      last.F,
			T:      last.T / int64(time.Millisecond),
			Metric: series.Metric,
		})
	}
	return ts, SampleVector(r.at)
}
// agg picks the last sample of a window; samples are assumed to be in time
// order. An empty window yields NaN at T=0.
func (r *lastWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint {
	if n := len(samples); n > 0 {
		return samples[n-1]
	}
	return promql.FPoint{F: math.NaN(), T: 0}
}
// mergeOverTimeStepEvaluator merges the per-shard matrices produced by a
// sharded first_over_time/last_over_time query, one step at a time.
type mergeOverTimeStepEvaluator struct {
// start/end bound the evaluation; ts is the current step's timestamp.
start, end, ts time.Time
step time.Duration
// matrices holds one result matrix per downstream (shard) query.
matrices []promql.Matrix
// merge folds one series into the step's vector. The int arguments are the
// series' position within its matrix and the matrix's series count; set to
// mergeFirstOverTime or mergeLastOverTime.
merge func(promql.Vector, int, int, promql.Series) promql.Vector
}
// Next advances one step and returns the merged vector for that step: for
// each per-shard matrix it consumes the head points that fall into the
// step's range and folds them through e.merge (keeping the first or last
// sample per series).
func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
var (
vec promql.Vector
)
e.ts = e.ts.Add(e.step)
if e.ts.After(e.end) {
return false, 0, nil
}
// Milliseconds, to match promql sample timestamps.
ts := e.ts.UnixNano() / int64(time.Millisecond)
// Merge the head point of every series that falls into this step.
// NOTE(review): e.merge indexes the output vector with j/len(m), which
// assumes every matrix lists the same series in the same order — confirm
// against how the downstream queries are constructed.
for i, m := range e.matrices {
for j, series := range m {
if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) {
continue
}
vec = e.merge(vec, j, len(m), series)
e.pop(i, j)
}
}
// Align vector timestamps with the step boundary.
for i := range vec {
vec[i].T = ts
}
if len(vec) == 0 {
// Nothing fell into this step; keep iterating only if points remain.
return e.hasNext(), ts, SampleVector(vec)
}
return true, ts, SampleVector(vec)
}
// pop drops the head float of the s'th series in the r'th matrix, nilling
// the slice once the last point is consumed.
func (e *mergeOverTimeStepEvaluator) pop(r, s int) {
	floats := e.matrices[r][s].Floats
	if len(floats) <= 1 {
		e.matrices[r][s].Floats = nil
		return
	}
	e.matrices[r][s].Floats = floats[1:]
}
// inRange reports whether t lies within the step window ending at ts,
// i.e. ts-step <= t < ts (milliseconds).
func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
	lower := ts - e.step.Milliseconds()
	return lower <= t && t < ts
}
// hasNext reports whether any series in any matrix still has points left to
// consume.
func (e *mergeOverTimeStepEvaluator) hasNext() bool {
	for _, m := range e.matrices {
		for _, s := range m {
			if len(s.Floats) > 0 {
				return true
			}
		}
	}
	return false
}
// Close implements StepEvaluator; there are no resources to release.
func (*mergeOverTimeStepEvaluator) Close() error { return nil }
// Error implements StepEvaluator; the merge itself cannot fail.
func (*mergeOverTimeStepEvaluator) Error() error { return nil }
// NewMergeFirstOverTimeStepEvaluator builds a step evaluator that merges
// per-shard matrices by keeping the earliest sample per series and step.
// With no matrices it degrades to an evaluator that is immediately done.
func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator {
	if len(m) == 0 {
		return EmptyEvaluator[SampleVector]{}
	}
	return &mergeOverTimeStepEvaluator{
		start:    params.Start(),
		end:      params.End(),
		ts:       params.Start().Add(-params.Step()), // advanced to start on the first Next() call
		step:     params.Step(),
		matrices: m,
		merge:    mergeFirstOverTime,
	}
}
// mergeFirstOverTime folds one series into vec, keeping the sample with the
// smallest timestamp seen so far at position pos. While vec is shorter than
// nSeries the series is appended; callers are expected to present matrices
// with matching series ordering.
func mergeFirstOverTime(vec promql.Vector, pos int, nSeries int, series promql.Series) promql.Vector {
	head := series.Floats[0]
	if len(vec) < nSeries {
		return append(vec, promql.Sample{Metric: series.Metric, T: head.T, F: head.F})
	}
	if head.T < vec[pos].T {
		vec[pos].F = head.F
		vec[pos].T = head.T
	}
	return vec
}
// NewMergeLastOverTimeStepEvaluator builds a step evaluator that merges
// per-shard matrices by keeping the latest sample per series and step.
// With no matrices it degrades to an evaluator that is immediately done.
func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator {
	if len(m) == 0 {
		return EmptyEvaluator[SampleVector]{}
	}
	return &mergeOverTimeStepEvaluator{
		start:    params.Start(),
		end:      params.End(),
		ts:       params.Start().Add(-params.Step()), // advanced to start on the first Next() call
		step:     params.Step(),
		matrices: m,
		merge:    mergeLastOverTime,
	}
}
// mergeLastOverTime folds one series into vec, keeping the sample with the
// largest timestamp seen so far at position pos. While vec is shorter than
// nSeries the series is appended; callers are expected to present matrices
// with matching series ordering.
func mergeLastOverTime(vec promql.Vector, pos int, nSeries int, series promql.Series) promql.Vector {
	head := series.Floats[0]
	if len(vec) < nSeries {
		return append(vec, promql.Sample{Metric: series.Metric, T: head.T, F: head.F})
	}
	if head.T > vec[pos].T {
		vec[pos].F = head.F
		vec[pos].T = head.T
	}
	return vec
}

@ -8,7 +8,7 @@ func optimizeSampleExpr(expr syntax.SampleExpr) (syntax.SampleExpr, error) {
// we skip sharding AST for now, it's not easy to clone them since they are not part of the language.
expr.Walk(func(e syntax.Expr) {
switch e.(type) {
case *ConcatSampleExpr, DownstreamSampleExpr, *QuantileSketchEvalExpr, *QuantileSketchMergeExpr:
case *ConcatSampleExpr, DownstreamSampleExpr, *QuantileSketchEvalExpr, *QuantileSketchMergeExpr, *MergeFirstOverTimeExpr, *MergeLastOverTimeExpr:
skip = true
return
}

@ -448,8 +448,3 @@ func (e *QuantileSketchVectorStepEvaluator) Next() (bool, int64, StepResult) {
func (*QuantileSketchVectorStepEvaluator) Close() error { return nil }
func (*QuantileSketchVectorStepEvaluator) Error() error { return nil }
func (e *QuantileSketchVectorStepEvaluator) Explain(parent Node) {
b := parent.Child("QuantileSketchVector")
e.inner.Explain(b)
}

@ -471,6 +471,59 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
quantile: expr.Params,
}, bytesPerShard, nil
case syntax.OpRangeTypeFirst:
potentialConflict := syntax.ReducesLabels(expr)
if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) {
return m.mapSampleExpr(expr, r)
}
shards, bytesPerShard, err := m.shards.Shards(expr)
if err != nil {
return nil, 0, err
}
if len(shards) == 0 {
return noOp(expr, m.shards.Resolver())
}
downstreams := make([]DownstreamSampleExpr, 0, len(shards))
// This is the magic. We send a custom operation
expr.Operation = syntax.OpRangeTypeFirstWithTimestamp
for i := len(shards) - 1; i >= 0; i-- {
downstreams = append(downstreams, DownstreamSampleExpr{
shard: &shards[i],
SampleExpr: expr,
})
}
return &MergeFirstOverTimeExpr{
downstreams: downstreams,
}, bytesPerShard, nil
case syntax.OpRangeTypeLast:
potentialConflict := syntax.ReducesLabels(expr)
if !potentialConflict && (expr.Grouping == nil || expr.Grouping.Noop()) {
return m.mapSampleExpr(expr, r)
}
shards, bytesPerShard, err := m.shards.Shards(expr)
if err != nil {
return nil, 0, err
}
if len(shards) == 0 {
return noOp(expr, m.shards.Resolver())
}
downstreams := make([]DownstreamSampleExpr, 0, len(shards))
expr.Operation = syntax.OpRangeTypeLastWithTimestamp
for i := len(shards) - 1; i >= 0; i-- {
downstreams = append(downstreams, DownstreamSampleExpr{
shard: &shards[i],
SampleExpr: expr,
})
}
return &MergeLastOverTimeExpr{
downstreams: downstreams,
}, bytesPerShard, nil
default:
// don't shard if there's not an appropriate optimization
return noOp(expr, m.shards.Resolver())

@ -32,3 +32,20 @@ type StepEvaluator interface {
// Explain returns a print of the step evaluation tree
Explain(Node)
}
// EmptyEvaluator is a StepEvaluator that is exhausted from the start,
// carrying only the zero value of its result type R.
type EmptyEvaluator[R StepResult] struct {
// value is the (zero) result returned alongside the final Next().
value R
}
// Compile-time check that EmptyEvaluator satisfies StepEvaluator.
var _ StepEvaluator = EmptyEvaluator[SampleVector]{}
// Close implements StepEvaluator; there are no resources to release.
func (EmptyEvaluator[_]) Close() error { return nil }
// Error implements StepEvaluator; an empty evaluation cannot fail.
func (EmptyEvaluator[_]) Error() error { return nil }
// Next implements StepEvaluator. It always reports exhaustion immediately,
// returning the zero timestamp and the evaluator's (zero) result value.
func (e EmptyEvaluator[_]) Next() (ok bool, ts int64, r StepResult) {
return false, 0, e.value
}

@ -1242,7 +1242,9 @@ const (
// internal expressions not represented in LogQL. These are used to
// evaluate expressions differently resulting in intermediate formats
// that are not consumable by LogQL clients but are used for sharding.
OpRangeTypeQuantileSketch = "__quantile_sketch_over_time__"
OpRangeTypeQuantileSketch = "__quantile_sketch_over_time__"
OpRangeTypeFirstWithTimestamp = "__first_over_time_ts__"
OpRangeTypeLastWithTimestamp = "__last_over_time_ts__"
)
func IsComparisonOperator(op string) bool {
@ -1346,7 +1348,9 @@ func (e *RangeAggregationExpr) MatcherGroups() ([]MatcherRange, error) {
func (e RangeAggregationExpr) validate() error {
if e.Grouping != nil {
switch e.Operation {
case OpRangeTypeAvg, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeQuantileSketch, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeFirst, OpRangeTypeLast:
case OpRangeTypeAvg, OpRangeTypeStddev, OpRangeTypeStdvar, OpRangeTypeQuantile,
OpRangeTypeQuantileSketch, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeFirst,
OpRangeTypeLast, OpRangeTypeFirstWithTimestamp, OpRangeTypeLastWithTimestamp:
default:
return fmt.Errorf("grouping not allowed for %s aggregation", e.Operation)
}
@ -1355,7 +1359,8 @@ func (e RangeAggregationExpr) validate() error {
switch e.Operation {
case OpRangeTypeAvg, OpRangeTypeSum, OpRangeTypeMax, OpRangeTypeMin, OpRangeTypeStddev,
OpRangeTypeStdvar, OpRangeTypeQuantile, OpRangeTypeRate, OpRangeTypeRateCounter,
OpRangeTypeAbsent, OpRangeTypeFirst, OpRangeTypeLast, OpRangeTypeQuantileSketch:
OpRangeTypeAbsent, OpRangeTypeFirst, OpRangeTypeLast, OpRangeTypeQuantileSketch,
OpRangeTypeFirstWithTimestamp, OpRangeTypeLastWithTimestamp:
return nil
default:
return fmt.Errorf("invalid aggregation %s with unwrap", e.Operation)
@ -2216,6 +2221,8 @@ var shardableOps = map[string]bool{
// range vector ops
OpRangeTypeAvg: true,
OpRangeTypeCount: true,
OpRangeTypeFirst: true,
OpRangeTypeLast: true,
OpRangeTypeRate: true,
OpRangeTypeBytes: true,
OpRangeTypeBytesRate: true,

@ -277,8 +277,9 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string, valueFi
if valueField {
line = fmt.Sprintf("%s value=%f", line, r.Float64()*100.0)
}
nanos := r.Int63n(time.Second.Nanoseconds())
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Unix(0, int64(j*int(time.Second))),
Timestamp: time.Unix(0, int64(j*int(time.Second))+nanos),
Line: line,
})
}

Loading…
Cancel
Save