From 8b0d33e5b20475318a1a72857ed8a9947b13e1aa Mon Sep 17 00:00:00 2001 From: Neeraj Gartia <80708727+NeerajGartia21@users.noreply.github.com> Date: Sun, 11 May 2025 19:10:31 +0530 Subject: [PATCH] promql: support variable scalar parameter in aggregations in range queries (#16404) This fixes the regression introduced in https://github.com/prometheus/prometheus/issues/15971 while preserving the performance improvements. Signed-off-by: Neeraj Gartia --- promql/engine.go | 131 +++++++++++++------- promql/promqltest/testdata/aggregators.test | 20 ++- promql/promqltest/testdata/limit.test | 22 +++- promql/value.go | 66 ++++++++++ 4 files changed, 186 insertions(+), 53 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index a2738fdc1e..b5fec3153e 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1377,7 +1377,7 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label return mat, warnings } -func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, param float64) (Matrix, annotations.Annotations) { +func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.AggregateExpr, sortedGrouping []string, inputMatrix Matrix, params *fParams) (Matrix, annotations.Annotations) { // Keep a copy of the original point slice so that it can be returned to the pool. 
origMatrix := slices.Clone(inputMatrix) defer func() { @@ -1387,7 +1387,7 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate } }() - var warnings annotations.Annotations + var annos annotations.Annotations enh := &EvalNodeHelper{enableDelayedNameRemoval: ev.enableDelayedNameRemoval} tempNumSamples := ev.currentSamples @@ -1417,46 +1417,43 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate } groups := make([]groupedAggregation, groupCount) - var k int64 - var ratio float64 var seriess map[uint64]Series + switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK, parser.LIMITK: - if !convertibleToInt64(param) { - ev.errorf("Scalar value %v overflows int64", param) - } - k = int64(param) - if k > int64(len(inputMatrix)) { - k = int64(len(inputMatrix)) - } - if k < 1 { - return nil, warnings + // Return early if all k values are less than one. + if params.Max() < 1 { + return nil, annos } - seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. + seriess = make(map[uint64]Series, len(inputMatrix)) + case parser.LIMIT_RATIO: - if math.IsNaN(param) { - ev.errorf("Ratio value %v is NaN", param) + // Return early if all r values are zero. + if params.Max() == 0 && params.Min() == 0 { + return nil, annos } - switch { - case param == 0: - return nil, warnings - case param < -1.0: - ratio = -1.0 - warnings.Add(annotations.NewInvalidRatioWarning(param, ratio, aggExpr.Param.PositionRange())) - case param > 1.0: - ratio = 1.0 - warnings.Add(annotations.NewInvalidRatioWarning(param, ratio, aggExpr.Param.PositionRange())) - default: - ratio = param + if params.Max() > 1.0 { + annos.Add(annotations.NewInvalidRatioWarning(params.Max(), 1.0, aggExpr.Param.PositionRange())) + } + if params.Min() < -1.0 { + annos.Add(annotations.NewInvalidRatioWarning(params.Min(), -1.0, aggExpr.Param.PositionRange())) } - seriess = make(map[uint64]Series, len(inputMatrix)) // Output series by series hash. 
+ seriess = make(map[uint64]Series, len(inputMatrix)) + case parser.QUANTILE: - if math.IsNaN(param) || param < 0 || param > 1 { - warnings.Add(annotations.NewInvalidQuantileWarning(param, aggExpr.Param.PositionRange())) + if params.HasAnyNaN() { + annos.Add(annotations.NewInvalidQuantileWarning(math.NaN(), aggExpr.Param.PositionRange())) + } + if params.Max() > 1 { + annos.Add(annotations.NewInvalidQuantileWarning(params.Max(), aggExpr.Param.PositionRange())) + } + if params.Min() < 0 { + annos.Add(annotations.NewInvalidQuantileWarning(params.Min(), aggExpr.Param.PositionRange())) } } for ts := ev.startTimestamp; ts <= ev.endTimestamp; ts += ev.interval { + fParam := params.Next() if err := contextDone(ctx, "expression evaluation"); err != nil { ev.error(err) } @@ -1468,17 +1465,17 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate var ws annotations.Annotations switch aggExpr.Op { case parser.TOPK, parser.BOTTOMK, parser.LIMITK, parser.LIMIT_RATIO: - result, ws = ev.aggregationK(aggExpr, k, ratio, inputMatrix, seriesToResult, groups, enh, seriess) + result, ws = ev.aggregationK(aggExpr, fParam, inputMatrix, seriesToResult, groups, enh, seriess) // If this could be an instant query, shortcut so as not to change sort order. 
- if ev.endTimestamp == ev.startTimestamp { - warnings.Merge(ws) - return result, warnings + if ev.startTimestamp == ev.endTimestamp { + annos.Merge(ws) + return result, annos } default: - ws = ev.aggregation(aggExpr, param, inputMatrix, result, seriesToResult, groups, enh) + ws = ev.aggregation(aggExpr, fParam, inputMatrix, result, seriesToResult, groups, enh) } - warnings.Merge(ws) + annos.Merge(ws) if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) @@ -1503,7 +1500,7 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate } result = result[:dst] } - return result, warnings + return result, annos } // evalSeries generates a Matrix between ev.startTimestamp and ev.endTimestamp (inclusive), each point spaced ev.interval apart, from series given offset. @@ -1681,18 +1678,14 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value, var warnings annotations.Annotations originalNumSamples := ev.currentSamples // param is the number k for topk/bottomk, or q for quantile. - var fParam float64 - if param != nil { - val, ws := ev.eval(ctx, param) - warnings.Merge(ws) - fParam = val.(Matrix)[0].Floats[0].F - } + fp, ws := newFParams(ctx, ev, param) + warnings.Merge(ws) // Now fetch the data to be aggregated. val, ws := ev.eval(ctx, e.Expr) warnings.Merge(ws) inputMatrix := val.(Matrix) - result, ws := ev.rangeEvalAgg(ctx, e, sortedGrouping, inputMatrix, fParam) + result, ws := ev.rangeEvalAgg(ctx, e, sortedGrouping, inputMatrix, fp) warnings.Merge(ws) ev.currentSamples = originalNumSamples + result.TotalSamples() ev.samplesStats.UpdatePeak(ev.currentSamples) @@ -3269,7 +3262,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix // seriesToResult maps inputMatrix indexes to groups indexes. // For an instant query, returns a Matrix in descending order for topk or ascending for bottomk, or without any order for limitk / limit_ratio. 
// For a range query, aggregates output in the seriess map. -func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int64, r float64, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { +func (ev *evaluator) aggregationK(e *parser.AggregateExpr, fParam float64, inputMatrix Matrix, seriesToResult []int, groups []groupedAggregation, enh *EvalNodeHelper, seriess map[uint64]Series) (Matrix, annotations.Annotations) { op := e.Op var s Sample var annos annotations.Annotations @@ -3278,6 +3271,14 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int64, r float64, i for i := range groups { groups[i].seen = false } + // advanceRemainingSeries discards any values at the current timestamp `ts` + // for the remaining input series. In range queries, if these values are not + // consumed now, they will no longer be accessible in the next evaluation step. + advanceRemainingSeries := func(ts int64, startIdx int) { + for i := startIdx; i < len(inputMatrix); i++ { + _, _, _ = ev.nextValues(ts, &inputMatrix[i]) + } + } seriesLoop: for si := range inputMatrix { @@ -3287,6 +3288,42 @@ seriesLoop: } s = Sample{Metric: inputMatrix[si].Metric, F: f, H: h, DropName: inputMatrix[si].DropName} + var k int64 + var r float64 + switch op { + case parser.TOPK, parser.BOTTOMK, parser.LIMITK: + if !convertibleToInt64(fParam) { + ev.errorf("Scalar value %v overflows int64", fParam) + } + k = int64(fParam) + if k > int64(len(inputMatrix)) { + k = int64(len(inputMatrix)) + } + if k < 1 { + if enh.Ts != ev.endTimestamp { + advanceRemainingSeries(enh.Ts, si+1) + } + return nil, annos + } + case parser.LIMIT_RATIO: + if math.IsNaN(fParam) { + ev.errorf("Ratio value %v is NaN", fParam) + } + switch { + case fParam == 0: + if enh.Ts != ev.endTimestamp { + advanceRemainingSeries(enh.Ts, si+1) + } + return nil, annos + case fParam < -1.0: + r = -1.0 + case fParam > 1.0: + r = 1.0 + 
default: + r = fParam + } + } + group := &groups[seriesToResult[si]] // Initialize this group if it's the first time we've seen it. if !group.seen { @@ -3377,6 +3414,10 @@ seriesLoop: group.groupAggrComplete = true groupsRemaining-- if groupsRemaining == 0 { + // Process other values in the series before breaking the loop in case of range query. + if enh.Ts != ev.endTimestamp { + advanceRemainingSeries(enh.Ts, si+1) + } break seriesLoop } } diff --git a/promql/promqltest/testdata/aggregators.test b/promql/promqltest/testdata/aggregators.test index 1e3ab79a35..b8ebdc55c6 100644 --- a/promql/promqltest/testdata/aggregators.test +++ b/promql/promqltest/testdata/aggregators.test @@ -274,7 +274,7 @@ load 5m http_requests{job="app-server", instance="1", group="canary"} 0+80x10 http_requests_histogram{job="app-server", instance="2", group="canary"} {{schema:0 sum:10 count:10}}x11 http_requests_histogram{job="api-server", instance="3", group="production"} {{schema:0 sum:20 count:20}}x11 - foo 3+0x10 + foo 1+1x9 3 eval_ordered instant at 50m topk(3, http_requests) http_requests{group="canary", instance="1", job="app-server"} 800 @@ -340,6 +340,13 @@ eval_ordered instant at 50m topk(scalar(foo), http_requests) http_requests{group="canary", instance="0", job="app-server"} 700 http_requests{group="production", instance="1", job="app-server"} 600 +# Bug #15971. +eval range from 0m to 50m step 5m count(topk(scalar(foo), http_requests)) + {} 1 2 3 4 5 6 7 8 9 9 3 + +eval range from 0m to 50m step 5m count(bottomk(scalar(foo), http_requests)) + {} 1 2 3 4 5 6 7 8 9 9 3 + # Tests for histogram: should ignore histograms. 
eval_info instant at 50m topk(100, http_requests_histogram) #empty @@ -447,7 +454,7 @@ load 10s data{test="uneven samples",point="b"} 1 data{test="uneven samples",point="c"} 4 data_histogram{test="histogram sample", point="c"} {{schema:2 count:4 sum:10 buckets:[1 0 0 0 1 0 0 1 1]}} - foo .8 + foo 0 1 0 1 0 1 0.8 eval instant at 1m quantile without(point)(0.8, data) {test="two samples"} 0.8 @@ -475,11 +482,18 @@ eval instant at 1m quantile without(point)((scalar(foo)), data) {test="three samples"} 1.6 {test="uneven samples"} 2.8 -eval_warn instant at 1m quantile without(point)(NaN, data) +eval instant at 1m quantile without(point)(NaN, data) + expect warn msg: PromQL warning: quantile value should be between 0 and 1, got NaN {test="two samples"} NaN {test="three samples"} NaN {test="uneven samples"} NaN +# Bug #15971. +eval range from 0m to 1m step 10s quantile without(point) (scalar(foo), data) + {test="two samples"} 0 1 0 1 0 1 0.8 + {test="three samples"} 0 2 0 2 0 2 1.6 + {test="uneven samples"} 0 4 0 4 0 4 2.8 + # Tests for group. clear diff --git a/promql/promqltest/testdata/limit.test b/promql/promqltest/testdata/limit.test index e6dd007af4..484760cc85 100644 --- a/promql/promqltest/testdata/limit.test +++ b/promql/promqltest/testdata/limit.test @@ -11,6 +11,8 @@ load 5m http_requests{job="api-server", instance="3", group="canary"} 0+60x10 http_requests{job="api-server", instance="histogram_1", group="canary"} {{schema:0 sum:10 count:10}}x11 http_requests{job="api-server", instance="histogram_2", group="canary"} {{schema:0 sum:20 count:20}}x11 + foo 1+1x10 + bar 0 1 0 -1 0 1 0 -1 0 1 0 eval instant at 50m count(limitk by (group) (0, http_requests)) # empty @@ -69,6 +71,10 @@ eval instant at 50m count(limitk(1000, http_requests{instance=~"histogram_[0-9]" eval range from 0 to 50m step 5m count(limitk(1000, http_requests{instance=~"histogram_[0-9]"})) {} 2+0x10 +# Bug #15971. 
+eval range from 0m to 50m step 5m count(limitk(scalar(foo), http_requests)) + {} 1 2 3 4 5 6 7 8 8 8 8 + # limit_ratio eval range from 0 to 50m step 5m count(limit_ratio(0.0, http_requests)) # empty @@ -105,11 +111,13 @@ eval range from 0 to 50m step 5m count(limit_ratio(-1.0, http_requests) and http {} 8+0x10 # Capped to 1.0 -> all samples. -eval_warn range from 0 to 50m step 5m count(limit_ratio(1.1, http_requests) and http_requests) +eval range from 0 to 50m step 5m count(limit_ratio(1.1, http_requests) and http_requests) + expect warn msg: PromQL warning: ratio value should be between -1 and 1, got 1.1, capping to 1 {} 8+0x10 # Capped to -1.0 -> all samples. -eval_warn range from 0 to 50m step 5m count(limit_ratio(-1.1, http_requests) and http_requests) +eval range from 0 to 50m step 5m count(limit_ratio(-1.1, http_requests) and http_requests) + expect warn msg: PromQL warning: ratio value should be between -1 and 1, got -1.1, capping to -1 {} 8+0x10 # Verify that limit_ratio(value) and limit_ratio(1.0-value) return the "complement" of each other. @@ -137,12 +145,12 @@ eval range from 0 to 50m step 5m count(limit_ratio(0.8, http_requests) or limit_ eval range from 0 to 50m step 5m count(limit_ratio(0.8, http_requests) and limit_ratio(-0.2, http_requests)) # empty -# Complement below for [some_ratio, 1.0 - some_ratio], some_ratio derived from time(), +# Complement below for [some_ratio, - (1.0 - some_ratio)], some_ratio derived from time(), # using a small prime number to avoid rounded ratio values, and a small set of them. 
-eval range from 0 to 50m step 5m count(limit_ratio(time() % 17/17, http_requests) or limit_ratio(1.0 - (time() % 17/17), http_requests)) +eval range from 0 to 50m step 5m count(limit_ratio(time() % 17/17, http_requests) or limit_ratio( - (1.0 - (time() % 17/17)), http_requests)) {} 8+0x10 -eval range from 0 to 50m step 5m count(limit_ratio(time() % 17/17, http_requests) and limit_ratio(1.0 - (time() % 17/17), http_requests)) +eval range from 0 to 50m step 5m count(limit_ratio(time() % 17/17, http_requests) and limit_ratio( - (1.0 - (time() % 17/17)), http_requests)) # empty # Poor man's normality check: ok (loaded samples follow a nice linearity over labels and time). @@ -156,3 +164,7 @@ eval instant at 50m limit_ratio(1, http_requests{instance="histogram_1"}) eval range from 0 to 50m step 5m limit_ratio(1, http_requests{instance="histogram_1"}) {__name__="http_requests", group="canary", instance="histogram_1", job="api-server"} {{count:10 sum:10}}x10 + +# Bug #15971. +eval range from 0m to 50m step 5m count(limit_ratio(scalar(bar), http_requests)) + {} _ 8 _ 8 _ 8 _ 8 _ 8 _ diff --git a/promql/value.go b/promql/value.go index f19c0b5b58..dc59b9e9cc 100644 --- a/promql/value.go +++ b/promql/value.go @@ -14,6 +14,7 @@ package promql import ( + "context" "encoding/json" "errors" "fmt" @@ -533,3 +534,68 @@ func (ssi *storageSeriesIterator) Next() chunkenc.ValueType { func (ssi *storageSeriesIterator) Err() error { return nil } + +type fParams struct { + series Series + constValue float64 + isConstant bool + minValue float64 + maxValue float64 + hasAnyNaN bool +} + +// newFParams evaluates the expression and returns an fParams object, +// which holds the parameter values (constant or series) along with min, max, and NaN info. 
+func newFParams(ctx context.Context, ev *evaluator, expr parser.Expr) (*fParams, annotations.Annotations) {
+	if expr == nil {
+		return &fParams{}, nil
+	}
+	// Only a bare number literal is treated as constant; Next then never consumes the series.
+	_, constParam := expr.(*parser.NumberLiteral)
+	val, ws := ev.eval(ctx, expr)
+	mat, ok := val.(Matrix)
+	if !ok || len(mat) == 0 {
+		return &fParams{}, ws
+	}
+	// Seed the min/max fold with its identity elements. Finite sentinels such as
+	// math.MaxFloat64 would be reported by Min for a series holding only +Inf.
+	fp := &fParams{
+		series:     mat[0],
+		isConstant: constParam,
+		minValue:   math.Inf(1),
+		maxValue:   math.Inf(-1),
+	}
+
+	if constParam {
+		fp.constValue = fp.series.Floats[0].F
+		fp.minValue, fp.maxValue = fp.constValue, fp.constValue
+		fp.hasAnyNaN = math.IsNaN(fp.constValue)
+		return fp, ws
+	}
+
+	for _, v := range fp.series.Floats {
+		fp.maxValue = math.Max(fp.maxValue, v.F)
+		fp.minValue = math.Min(fp.minValue, v.F)
+		fp.hasAnyNaN = fp.hasAnyNaN || math.IsNaN(v.F)
+	}
+	return fp, ws
+}
+
+// Max, Min and HasAnyNaN expose the statistics recorded by newFParams.
+func (fp *fParams) Max() float64    { return fp.maxValue }
+func (fp *fParams) Min() float64    { return fp.minValue }
+func (fp *fParams) HasAnyNaN() bool { return fp.hasAnyNaN }
+
+// Next returns the constant value, or pops and returns the first remaining
+// value of the series. It returns 0 once the series has no values left.
+func (fp *fParams) Next() float64 {
+	if fp.isConstant {
+		return fp.constValue
+	}
+	if len(fp.series.Floats) == 0 {
+		return 0
+	}
+	val := fp.series.Floats[0].F
+	fp.series.Floats = fp.series.Floats[1:]
+	return val
+}