@ -16,31 +16,33 @@ import (
"github.com/grafana/loki/pkg/logql/vector"
)
// RangeVectorAggregator aggregates samples for a given range of samples.
// Batch RangeVectorAggregator aggregates samples for a given range of samples.
// It receives the current milliseconds timestamp and the list of point within
// the range.
type RangeVectorAggregator func ( [ ] promql . Point ) float64
type BatchRangeVectorAggregator func ( [ ] promql . Point ) float64
// RangeStreamingAgg streaming aggregates sample for each sample
type RangeStreamingAgg interface {
// agg func works inside the Next func of RangeVectorIterator, agg used to agg each sample.
// agg will calculate the intermediate result after streaming agg each sample and try to save an aggregate value instead of keeping all samples.
agg ( sample promql . Point )
// at func works inside the At func of RangeVectorIterator, get the intermediate result of agg func to provide the final value for At func of RangeVectorIterator
at ( ) float64
}
// RangeVectorIterator iterates through a range of samples.
// To fetch the current vector use `At` with a `RangeVectorAggregator`.
// To fetch the current vector use `At` with a `Batch RangeVectorAggregator` or `RangeStreamingAgg `.
type RangeVectorIterator interface {
Next ( ) bool
At ( aggregator RangeVectorAggregator ) ( int64 , promql . Vector )
At ( ) ( int64 , promql . Vector )
Close ( ) error
Error ( ) error
}
type rangeVectorIterator struct {
iter iter . PeekingSampleIterator
selRange , step , end , current , offset int64
window map [ string ] * promql . Series
metrics map [ string ] labels . Labels
at [ ] promql . Sample
}
func newRangeVectorIterator (
it iter . PeekingSampleIterator ,
selRange , step , start , end , offset int64 ) * rangeVectorIterator {
expr * syntax . RangeAggregationExpr ,
selRange , step , start , end , offset int64 ) ( RangeVectorIterator , error ) {
// forces at least one step.
if step == 0 {
step = 1
@ -49,19 +51,55 @@ func newRangeVectorIterator(
start = start - offset
end = end - offset
}
return & rangeVectorIterator {
var overlap bool
if selRange >= step && start != end {
overlap = true
}
if ! overlap {
_ , err := streamingAggregator ( expr )
if err != nil {
return nil , err
}
return & streamRangeVectorIterator {
iter : it ,
step : step ,
end : end ,
selRange : selRange ,
metrics : map [ string ] labels . Labels { } ,
r : expr ,
current : start - step , // first loop iteration will set it to start
offset : offset ,
} , nil
}
vectorAggregator , err := aggregator ( expr )
if err != nil {
return nil , err
}
return & batchRangeVectorIterator {
iter : it ,
step : step ,
end : end ,
selRange : selRange ,
metrics : map [ string ] labels . Labels { } ,
window : map [ string ] * promql . Series { } ,
agg : vectorAggregator ,
current : start - step , // first loop iteration will set it to start
offset : offset ,
window : map [ string ] * promql . Series { } ,
metrics : map [ string ] labels . Labels { } ,
}
} , nil
}
//batch
type batchRangeVectorIterator struct {
iter iter . PeekingSampleIterator
selRange , step , end , current , offset int64
window map [ string ] * promql . Series
metrics map [ string ] labels . Labels
at [ ] promql . Sample
agg BatchRangeVectorAggregator
}
func ( r * rangeVectorIterator ) Next ( ) bool {
func ( r * batchR angeVectorIterator) Next ( ) bool {
// slides the range window to the next position
r . current = r . current + r . step
if r . current > r . end {
@ -75,16 +113,16 @@ func (r *rangeVectorIterator) Next() bool {
return true
}
func ( r * r angeVectorIterator) Close ( ) error {
func ( r * batchR angeVectorIterator) Close ( ) error {
return r . iter . Close ( )
}
func ( r * r angeVectorIterator) Error ( ) error {
func ( r * batchR angeVectorIterator) Error ( ) error {
return r . iter . Error ( )
}
// popBack removes all entries out of the current window from the back.
func ( r * r angeVectorIterator) popBack ( newStart int64 ) {
func ( r * batchR angeVectorIterator) popBack ( newStart int64 ) {
// possible improvement: if there is no overlap we can just remove all.
for fp := range r . window {
lastPoint := 0
@ -109,7 +147,7 @@ func (r *rangeVectorIterator) popBack(newStart int64) {
}
// load the next sample range window.
func ( r * r angeVectorIterator) load ( start , end int64 ) {
func ( r * batchR angeVectorIterator) load ( start , end int64 ) {
for lbs , sample , hasNext := r . iter . Peek ( ) ; hasNext ; lbs , sample , hasNext = r . iter . Peek ( ) {
if sample . Timestamp > end {
// not consuming the iterator as this belong to another range.
@ -148,8 +186,7 @@ func (r *rangeVectorIterator) load(start, end int64) {
_ = r . iter . Next ( )
}
}
func ( r * rangeVectorIterator ) At ( aggregator RangeVectorAggregator ) ( int64 , promql . Vector ) {
func ( r * batchRangeVectorIterator ) At ( ) ( int64 , promql . Vector ) {
if r . at == nil {
r . at = make ( [ ] promql . Sample , 0 , len ( r . window ) )
}
@ -159,7 +196,7 @@ func (r *rangeVectorIterator) At(aggregator RangeVectorAggregator) (int64, promq
for _ , series := range r . window {
r . at = append ( r . at , promql . Sample {
Point : promql . Point {
V : aggregator ( series . Points ) ,
V : r . agg ( series . Points ) ,
T : ts ,
} ,
Metric : series . Metric ,
@ -185,7 +222,7 @@ func putSeries(s *promql.Series) {
seriesPool . Put ( s )
}
func aggregator ( r * syntax . RangeAggregationExpr ) ( RangeVectorAggregator , error ) {
func aggregator ( r * syntax . RangeAggregationExpr ) ( Batch RangeVectorAggregator, error ) {
switch r . Operation {
case syntax . OpRangeTypeRate :
return rateLogs ( r . Left . Interval , r . Left . Unwrap != nil ) , nil
@ -466,3 +503,353 @@ func last(samples []promql.Point) float64 {
func one ( samples [ ] promql . Point ) float64 {
return 1.0
}
// streaming range agg
type streamRangeVectorIterator struct {
iter iter . PeekingSampleIterator
selRange , step , end , current , offset int64
windowRangeAgg map [ string ] RangeStreamingAgg
r * syntax . RangeAggregationExpr
metrics map [ string ] labels . Labels
at [ ] promql . Sample
agg BatchRangeVectorAggregator
}
func ( r * streamRangeVectorIterator ) Next ( ) bool {
// slides the range window to the next position
r . current = r . current + r . step
if r . current > r . end {
return false
}
rangeEnd := r . current
rangeStart := rangeEnd - r . selRange
// load samples
r . windowRangeAgg = make ( map [ string ] RangeStreamingAgg , 0 )
r . metrics = map [ string ] labels . Labels { }
r . load ( rangeStart , rangeEnd )
return true
}
func ( r * streamRangeVectorIterator ) Close ( ) error {
return r . iter . Close ( )
}
func ( r * streamRangeVectorIterator ) Error ( ) error {
return r . iter . Error ( )
}
// load the next sample range window.
func ( r * streamRangeVectorIterator ) load ( start , end int64 ) {
for lbs , sample , hasNext := r . iter . Peek ( ) ; hasNext ; lbs , sample , hasNext = r . iter . Peek ( ) {
if sample . Timestamp > end {
// not consuming the iterator as this belong to another range.
return
}
// the lower bound of the range is not inclusive
if sample . Timestamp <= start {
_ = r . iter . Next ( )
continue
}
// adds the sample.
var rangeAgg RangeStreamingAgg
var ok bool
rangeAgg , ok = r . windowRangeAgg [ lbs ]
if ! ok {
var metric labels . Labels
if _ , ok = r . metrics [ lbs ] ; ! ok {
var err error
metric , err = promql_parser . ParseMetric ( lbs )
if err != nil {
_ = r . iter . Next ( )
continue
}
r . metrics [ lbs ] = metric
}
// never err here ,we have check error at evaluator.go rangeAggEvaluator() func
rangeAgg , _ = streamingAggregator ( r . r )
r . windowRangeAgg [ lbs ] = rangeAgg
}
p := promql . Point {
T : sample . Timestamp ,
V : sample . Value ,
}
rangeAgg . agg ( p )
_ = r . iter . Next ( )
}
}
func ( r * streamRangeVectorIterator ) At ( ) ( int64 , promql . Vector ) {
if r . at == nil {
r . at = make ( [ ] promql . Sample , 0 , len ( r . windowRangeAgg ) )
}
r . at = r . at [ : 0 ]
// convert ts from nano to milli seconds as the iterator work with nanoseconds
ts := r . current / 1e+6 + r . offset / 1e+6
for lbs , rangeAgg := range r . windowRangeAgg {
r . at = append ( r . at , promql . Sample {
Point : promql . Point {
V : rangeAgg . at ( ) ,
T : ts ,
} ,
Metric : r . metrics [ lbs ] ,
} )
}
return ts , r . at
}
func streamingAggregator ( r * syntax . RangeAggregationExpr ) ( RangeStreamingAgg , error ) {
switch r . Operation {
case syntax . OpRangeTypeRate :
return newRateLogs ( r . Left . Interval , r . Left . Unwrap != nil ) , nil
case syntax . OpRangeTypeRateCounter :
return & RateCounterOverTime { selRange : r . Left . Interval , samples : make ( [ ] promql . Point , 0 ) } , nil
case syntax . OpRangeTypeCount :
return & CountOverTime { } , nil
case syntax . OpRangeTypeBytesRate :
return & RateLogBytesOverTime { selRange : r . Left . Interval } , nil
case syntax . OpRangeTypeBytes , syntax . OpRangeTypeSum :
return & SumOverTime { } , nil
case syntax . OpRangeTypeAvg :
return & AvgOverTime { } , nil
case syntax . OpRangeTypeMax :
return & MaxOverTime { } , nil
case syntax . OpRangeTypeMin :
return & MinOverTime { } , nil
case syntax . OpRangeTypeStddev :
return & StddevOverTime { } , nil
case syntax . OpRangeTypeStdvar :
return & StdvarOverTime { } , nil
case syntax . OpRangeTypeQuantile :
return & QuantileOverTime { q : * r . Params , values : make ( vector . HeapByMaxValue , 0 ) } , nil
case syntax . OpRangeTypeFirst :
return & FirstOverTime { } , nil
case syntax . OpRangeTypeLast :
return & LastOverTime { } , nil
case syntax . OpRangeTypeAbsent :
return & OneOverTime { } , nil
default :
return nil , fmt . Errorf ( syntax . UnsupportedErr , r . Operation )
}
}
func newRateLogs ( selRange time . Duration , computeValues bool ) RangeStreamingAgg {
return & RateLogsOverTime {
selRange : selRange ,
computeValues : computeValues ,
}
}
// rateLogs calculates the per-second rate of log lines or values extracted
// from log lines
type RateLogsOverTime struct {
selRange time . Duration
val float64
count float64
computeValues bool
}
func ( a * RateLogsOverTime ) agg ( sample promql . Point ) {
a . count ++
a . val += sample . V
}
func ( a * RateLogsOverTime ) at ( ) float64 {
if ! a . computeValues {
return a . count / a . selRange . Seconds ( )
}
return a . val / a . selRange . Seconds ( )
}
// rateCounter calculates the per-second rate of values extracted from log lines
// and treat them like a "counter" metric.
type RateCounterOverTime struct {
samples [ ] promql . Point
selRange time . Duration
}
func ( a * RateCounterOverTime ) agg ( sample promql . Point ) {
a . samples = append ( a . samples , sample )
}
func ( a * RateCounterOverTime ) at ( ) float64 {
return extrapolatedRate ( a . samples , a . selRange , true , true )
}
// rateLogBytes calculates the per-second rate of log bytes.
type RateLogBytesOverTime struct {
sum float64
selRange time . Duration
}
func ( a * RateLogBytesOverTime ) agg ( sample promql . Point ) {
a . sum += sample . V
}
func ( a * RateLogBytesOverTime ) at ( ) float64 {
return a . sum / a . selRange . Seconds ( )
}
type CountOverTime struct {
count float64
}
func ( a * CountOverTime ) agg ( sample promql . Point ) {
a . count ++
}
func ( a * CountOverTime ) at ( ) float64 {
return a . count
}
type SumOverTime struct {
sum float64
}
func ( a * SumOverTime ) agg ( sample promql . Point ) {
a . sum += sample . V
}
func ( a * SumOverTime ) at ( ) float64 {
return a . sum
}
type AvgOverTime struct {
mean , count float64
}
func ( a * AvgOverTime ) agg ( sample promql . Point ) {
a . count ++
if math . IsInf ( a . mean , 0 ) {
if math . IsInf ( sample . V , 0 ) && ( a . mean > 0 ) == ( sample . V > 0 ) {
// The `mean` and `v.V` values are `Inf` of the same sign. They
// can't be subtracted, but the value of `mean` is correct
// already.
return
}
if ! math . IsInf ( sample . V , 0 ) && ! math . IsNaN ( sample . V ) {
// At this stage, the mean is an infinite. If the added
// value is neither an Inf or a Nan, we can keep that mean
// value.
// This is required because our calculation below removes
// the mean value, which would look like Inf += x - Inf and
// end up as a NaN.
return
}
}
a . mean += sample . V / a . count - a . mean / a . count
}
func ( a * AvgOverTime ) at ( ) float64 {
return a . mean
}
type MaxOverTime struct {
max float64
}
func ( a * MaxOverTime ) agg ( sample promql . Point ) {
if sample . V > a . max || math . IsNaN ( a . max ) {
a . max = sample . V
}
}
func ( a * MaxOverTime ) at ( ) float64 {
return a . max
}
type MinOverTime struct {
min float64
}
func ( a * MinOverTime ) agg ( sample promql . Point ) {
if sample . V < a . min || math . IsNaN ( a . min ) {
a . min = sample . V
}
}
func ( a * MinOverTime ) at ( ) float64 {
return a . min
}
type StdvarOverTime struct {
aux , count , mean float64
}
func ( a * StdvarOverTime ) agg ( sample promql . Point ) {
a . count ++
delta := sample . V - a . mean
a . mean += delta / a . count
a . aux += delta * ( sample . V - a . mean )
}
func ( a * StdvarOverTime ) at ( ) float64 {
return a . aux / a . count
}
type StddevOverTime struct {
aux , count , mean float64
}
func ( a * StddevOverTime ) agg ( sample promql . Point ) {
a . count ++
delta := sample . V - a . mean
a . mean += delta / a . count
a . aux += delta * ( sample . V - a . mean )
}
func ( a * StddevOverTime ) at ( ) float64 {
return math . Sqrt ( a . aux / a . count )
}
type QuantileOverTime struct {
q float64
values vector . HeapByMaxValue
}
func ( a * QuantileOverTime ) agg ( sample promql . Point ) {
a . values = append ( a . values , promql . Sample { Point : promql . Point { V : sample . V } } )
}
func ( a * QuantileOverTime ) at ( ) float64 {
return quantile ( a . q , a . values )
}
type FirstOverTime struct {
v float64
hasData bool
}
func ( a * FirstOverTime ) agg ( sample promql . Point ) {
if a . hasData {
return
}
a . v = sample . V
a . hasData = true
}
func ( a * FirstOverTime ) at ( ) float64 {
return a . v
}
type LastOverTime struct {
v float64
}
func ( a * LastOverTime ) agg ( sample promql . Point ) {
a . v = sample . V
}
func ( a * LastOverTime ) at ( ) float64 {
return a . v
}
type OneOverTime struct {
}
func ( a * OneOverTime ) agg ( sample promql . Point ) {
}
func ( a * OneOverTime ) at ( ) float64 {
return 1.0
}