package logql import ( "sync" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" promql_parser "github.com/prometheus/prometheus/promql/parser" "github.com/grafana/loki/pkg/iter" ) // RangeVectorAggregator aggregates samples for a given range of samples. // It receives the current milliseconds timestamp and the list of point within // the range. type RangeVectorAggregator func([]promql.Point) float64 // RangeVectorIterator iterates through a range of samples. // To fetch the current vector use `At` with a `RangeVectorAggregator`. type RangeVectorIterator interface { Next() bool At(aggregator RangeVectorAggregator) (int64, promql.Vector) Close() error Error() error } type rangeVectorIterator struct { iter iter.PeekingSampleIterator selRange, step, end, current int64 window map[string]*promql.Series metrics map[string]labels.Labels at []promql.Sample } func newRangeVectorIterator( it iter.PeekingSampleIterator, selRange, step, start, end int64) *rangeVectorIterator { // forces at least one step. if step == 0 { step = 1 } return &rangeVectorIterator{ iter: it, step: step, end: end, selRange: selRange, current: start - step, // first loop iteration will set it to start window: map[string]*promql.Series{}, metrics: map[string]labels.Labels{}, } } func (r *rangeVectorIterator) Next() bool { // slides the range window to the next position r.current = r.current + r.step if r.current > r.end { return false } rangeEnd := r.current rangeStart := r.current - r.selRange // load samples r.popBack(rangeStart) r.load(rangeStart, rangeEnd) return true } func (r *rangeVectorIterator) Close() error { return r.iter.Close() } func (r *rangeVectorIterator) Error() error { return r.iter.Error() } // popBack removes all entries out of the current window from the back. func (r *rangeVectorIterator) popBack(newStart int64) { // possible improvement: if there is no overlap we can just remove all. for fp := range r.window { lastPoint := 0 remove := false for i, p := range r.window[fp].Points { if p.T <= newStart { lastPoint = i remove = true continue } break } if remove { r.window[fp].Points = r.window[fp].Points[lastPoint+1:] } if len(r.window[fp].Points) == 0 { s := r.window[fp] delete(r.window, fp) putSeries(s) } } } // load the next sample range window. func (r *rangeVectorIterator) load(start, end int64) { for lbs, sample, hasNext := r.iter.Peek(); hasNext; lbs, sample, hasNext = r.iter.Peek() { if sample.Timestamp > end { // not consuming the iterator as this belong to another range. return } // the lower bound of the range is not inclusive if sample.Timestamp <= start { _ = r.iter.Next() continue } // adds the sample. var series *promql.Series var ok bool series, ok = r.window[lbs] if !ok { var metric labels.Labels if metric, ok = r.metrics[lbs]; !ok { var err error metric, err = promql_parser.ParseMetric(lbs) if err != nil { _ = r.iter.Next() continue } r.metrics[lbs] = metric } series = getSeries() series.Metric = metric r.window[lbs] = series } p := promql.Point{ T: sample.Timestamp, V: sample.Value, } series.Points = append(series.Points, p) _ = r.iter.Next() } } func (r *rangeVectorIterator) At(aggregator RangeVectorAggregator) (int64, promql.Vector) { if r.at == nil { r.at = make([]promql.Sample, 0, len(r.window)) } r.at = r.at[:0] // convert ts from nano to milli seconds as the iterator work with nanoseconds ts := r.current / 1e+6 for _, series := range r.window { r.at = append(r.at, promql.Sample{ Point: promql.Point{ V: aggregator(series.Points), T: ts, }, Metric: series.Metric, }) } return ts, r.at } var seriesPool sync.Pool func getSeries() *promql.Series { if r := seriesPool.Get(); r != nil { s := r.(*promql.Series) s.Points = s.Points[:0] return s } return &promql.Series{ Points: make([]promql.Point, 0, 1024), } } func putSeries(s *promql.Series) { seriesPool.Put(s) }