@ -1377,7 +1377,7 @@ func (ev *evaluator) rangeEval(ctx context.Context, prepSeries func(labels.Label
return mat , warnings
}
func ( ev * evaluator ) rangeEvalAgg ( ctx context . Context , aggExpr * parser . AggregateExpr , sortedGrouping [ ] string , inputMatrix Matrix , param float64 ) ( Matrix , annotations . Annotations ) {
func ( ev * evaluator ) rangeEvalAgg ( ctx context . Context , aggExpr * parser . AggregateExpr , sortedGrouping [ ] string , inputMatrix Matrix , params * fParams ) ( Matrix , annotations . Annotations ) {
// Keep a copy of the original point slice so that it can be returned to the pool.
origMatrix := slices . Clone ( inputMatrix )
defer func ( ) {
@ -1387,7 +1387,7 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate
}
} ( )
var warning s annotations . Annotations
var anno s annotations . Annotations
enh := & EvalNodeHelper { enableDelayedNameRemoval : ev . enableDelayedNameRemoval }
tempNumSamples := ev . currentSamples
@ -1417,46 +1417,43 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate
}
groups := make ( [ ] groupedAggregation , groupCount )
var k int64
var ratio float64
var seriess map [ uint64 ] Series
switch aggExpr . Op {
case parser . TOPK , parser . BOTTOMK , parser . LIMITK :
if ! convertibleToInt64 ( param ) {
ev . errorf ( "Scalar value %v overflows int64" , param )
}
k = int64 ( param )
if k > int64 ( len ( inputMatrix ) ) {
k = int64 ( len ( inputMatrix ) )
}
if k < 1 {
return nil , warnings
// Return early if all k values are less than one.
if params . Max ( ) < 1 {
return nil , annos
}
seriess = make ( map [ uint64 ] Series , len ( inputMatrix ) ) // Output series by series hash.
seriess = make ( map [ uint64 ] Series , len ( inputMatrix ) )
case parser . LIMIT_RATIO :
if math . IsNaN ( param ) {
ev . errorf ( "Ratio value %v is NaN" , param )
// Return early if all r values are zero.
if params . Max ( ) == 0 && params . Min ( ) == 0 {
return nil , annos
}
switch {
case param == 0 :
return nil , warnings
case param < - 1.0 :
ratio = - 1.0
warnings . Add ( annotations . NewInvalidRatioWarning ( param , ratio , aggExpr . Param . PositionRange ( ) ) )
case param > 1.0 :
ratio = 1.0
warnings . Add ( annotations . NewInvalidRatioWarning ( param , ratio , aggExpr . Param . PositionRange ( ) ) )
default :
ratio = param
if params . Max ( ) > 1.0 {
annos . Add ( annotations . NewInvalidRatioWarning ( params . Max ( ) , 1.0 , aggExpr . Param . PositionRange ( ) ) )
}
if params . Min ( ) < - 1.0 {
annos . Add ( annotations . NewInvalidRatioWarning ( params . Min ( ) , - 1.0 , aggExpr . Param . PositionRange ( ) ) )
}
seriess = make ( map [ uint64 ] Series , len ( inputMatrix ) ) // Output series by series hash.
seriess = make ( map [ uint64 ] Series , len ( inputMatrix ) )
case parser . QUANTILE :
if math . IsNaN ( param ) || param < 0 || param > 1 {
warnings . Add ( annotations . NewInvalidQuantileWarning ( param , aggExpr . Param . PositionRange ( ) ) )
if params . HasAnyNaN ( ) {
annos . Add ( annotations . NewInvalidQuantileWarning ( math . NaN ( ) , aggExpr . Param . PositionRange ( ) ) )
}
if params . Max ( ) > 1 {
annos . Add ( annotations . NewInvalidQuantileWarning ( params . Max ( ) , aggExpr . Param . PositionRange ( ) ) )
}
if params . Min ( ) < 0 {
annos . Add ( annotations . NewInvalidQuantileWarning ( params . Min ( ) , aggExpr . Param . PositionRange ( ) ) )
}
}
for ts := ev . startTimestamp ; ts <= ev . endTimestamp ; ts += ev . interval {
fParam := params . Next ( )
if err := contextDone ( ctx , "expression evaluation" ) ; err != nil {
ev . error ( err )
}
@ -1468,17 +1465,17 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate
var ws annotations . Annotations
switch aggExpr . Op {
case parser . TOPK , parser . BOTTOMK , parser . LIMITK , parser . LIMIT_RATIO :
result , ws = ev . aggregationK ( aggExpr , k , ratio , inputMatrix , seriesToResult , groups , enh , seriess )
result , ws = ev . aggregationK ( aggExpr , fParam , inputMatrix , seriesToResult , groups , enh , seriess )
// If this could be an instant query, shortcut so as not to change sort order.
if ev . end Timestamp == ev . start Timestamp {
warning s. Merge ( ws )
return result , warning s
if ev . start Timestamp == ev . end Timestamp {
anno s. Merge ( ws )
return result , anno s
}
default :
ws = ev . aggregation ( aggExpr , p aram, inputMatrix , result , seriesToResult , groups , enh )
ws = ev . aggregation ( aggExpr , fP aram, inputMatrix , result , seriesToResult , groups , enh )
}
warning s. Merge ( ws )
anno s. Merge ( ws )
if ev . currentSamples > ev . maxSamples {
ev . error ( ErrTooManySamples ( env ) )
@ -1503,7 +1500,7 @@ func (ev *evaluator) rangeEvalAgg(ctx context.Context, aggExpr *parser.Aggregate
}
result = result [ : dst ]
}
return result , warning s
return result , anno s
}
// evalSeries generates a Matrix between ev.startTimestamp and ev.endTimestamp (inclusive), each point spaced ev.interval apart, from series given offset.
@ -1681,18 +1678,14 @@ func (ev *evaluator) eval(ctx context.Context, expr parser.Expr) (parser.Value,
var warnings annotations . Annotations
originalNumSamples := ev . currentSamples
// param is the number k for topk/bottomk, or q for quantile.
var fParam float64
if param != nil {
val , ws := ev . eval ( ctx , param )
warnings . Merge ( ws )
fParam = val . ( Matrix ) [ 0 ] . Floats [ 0 ] . F
}
fp , ws := newFParams ( ctx , ev , param )
warnings . Merge ( ws )
// Now fetch the data to be aggregated.
val , ws := ev . eval ( ctx , e . Expr )
warnings . Merge ( ws )
inputMatrix := val . ( Matrix )
result , ws := ev . rangeEvalAgg ( ctx , e , sortedGrouping , inputMatrix , fParam )
result , ws := ev . rangeEvalAgg ( ctx , e , sortedGrouping , inputMatrix , fp )
warnings . Merge ( ws )
ev . currentSamples = originalNumSamples + result . TotalSamples ( )
ev . samplesStats . UpdatePeak ( ev . currentSamples )
@ -3269,7 +3262,7 @@ func (ev *evaluator) aggregation(e *parser.AggregateExpr, q float64, inputMatrix
// seriesToResult maps inputMatrix indexes to groups indexes.
// For an instant query, returns a Matrix in descending order for topk or ascending for bottomk, or without any order for limitk / limit_ratio.
// For a range query, aggregates output in the seriess map.
func ( ev * evaluator ) aggregationK ( e * parser . AggregateExpr , k int64 , r float64 , inputMatrix Matrix , seriesToResult [ ] int , groups [ ] groupedAggregation , enh * EvalNodeHelper , seriess map [ uint64 ] Series ) ( Matrix , annotations . Annotations ) {
func ( ev * evaluator ) aggregationK ( e * parser . AggregateExpr , fParam float64 , inputMatrix Matrix , seriesToResult [ ] int , groups [ ] groupedAggregation , enh * EvalNodeHelper , seriess map [ uint64 ] Series ) ( Matrix , annotations . Annotations ) {
op := e . Op
var s Sample
var annos annotations . Annotations
@ -3278,6 +3271,14 @@ func (ev *evaluator) aggregationK(e *parser.AggregateExpr, k int64, r float64, i
for i := range groups {
groups [ i ] . seen = false
}
// advanceRemainingSeries discards any values at the current timestamp `ts`
// for the remaining input series. In range queries, if these values are not
// consumed now, they will no longer be accessible in the next evaluation step.
advanceRemainingSeries := func ( ts int64 , startIdx int ) {
for i := startIdx ; i < len ( inputMatrix ) ; i ++ {
_ , _ , _ = ev . nextValues ( ts , & inputMatrix [ i ] )
}
}
seriesLoop :
for si := range inputMatrix {
@ -3287,6 +3288,42 @@ seriesLoop:
}
s = Sample { Metric : inputMatrix [ si ] . Metric , F : f , H : h , DropName : inputMatrix [ si ] . DropName }
var k int64
var r float64
switch op {
case parser . TOPK , parser . BOTTOMK , parser . LIMITK :
if ! convertibleToInt64 ( fParam ) {
ev . errorf ( "Scalar value %v overflows int64" , fParam )
}
k = int64 ( fParam )
if k > int64 ( len ( inputMatrix ) ) {
k = int64 ( len ( inputMatrix ) )
}
if k < 1 {
if enh . Ts != ev . endTimestamp {
advanceRemainingSeries ( enh . Ts , si + 1 )
}
return nil , annos
}
case parser . LIMIT_RATIO :
if math . IsNaN ( fParam ) {
ev . errorf ( "Ratio value %v is NaN" , fParam )
}
switch {
case fParam == 0 :
if enh . Ts != ev . endTimestamp {
advanceRemainingSeries ( enh . Ts , si + 1 )
}
return nil , annos
case fParam < - 1.0 :
r = - 1.0
case fParam > 1.0 :
r = 1.0
default :
r = fParam
}
}
group := & groups [ seriesToResult [ si ] ]
// Initialize this group if it's the first time we've seen it.
if ! group . seen {
@ -3377,6 +3414,10 @@ seriesLoop:
group . groupAggrComplete = true
groupsRemaining --
if groupsRemaining == 0 {
// Process other values in the series before breaking the loop in case of range query.
if enh . Ts != ev . endTimestamp {
advanceRemainingSeries ( enh . Ts , si + 1 )
}
break seriesLoop
}
}