Decouple logql engine/AST from execution context (#1605)

* logql engine is an interface

* [wip] begins agnostic logql evaluator work

* decouples logql AST from execution context

* healthcheck comments
pull/1608/head
Owen Diehl 5 years ago committed by GitHub
parent 576e479090
commit b7d23f41bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 50
      pkg/logql/ast.go
  2. 366
      pkg/logql/engine.go
  3. 14
      pkg/logql/engine_test.go
  4. 327
      pkg/logql/evaluator.go
  5. 6
      pkg/querier/http.go
  6. 10
      pkg/querier/querier.go

@ -3,6 +3,7 @@ package logql
import (
"bytes"
"context"
"errors"
"fmt"
"regexp"
"strconv"
@ -212,30 +213,46 @@ const (
type SampleExpr interface {
// Selector is the LogQL selector to apply when retrieving logs.
Selector() LogSelectorExpr
// Evaluator returns a `StepEvaluator` that can evaluate the expression step by step
Evaluator() StepEvaluator
// Close all resources used.
Close() error
}
// StepEvaluator evaluate a single step of a query.
type StepEvaluator interface {
Next() (bool, int64, promql.Vector)
// Close all resources used.
Close() error
}
type stepEvaluator struct {
fn func() (bool, int64, promql.Vector)
close func() error
}
// StepEvaluatorFn is a function to chain multiple `StepEvaluator`.
type StepEvaluatorFn func() (bool, int64, promql.Vector)
func newStepEvaluator(fn func() (bool, int64, promql.Vector), close func() error) (StepEvaluator, error) {
if fn == nil {
return nil, errors.New("nil step evaluator fn")
}
// Next implements `StepEvaluator`
func (s StepEvaluatorFn) Next() (bool, int64, promql.Vector) {
return s()
if close == nil {
close = func() error { return nil }
}
return &stepEvaluator{
fn: fn,
close: close,
}, nil
}
func (e *stepEvaluator) Next() (bool, int64, promql.Vector) {
return e.fn()
}
func (e *stepEvaluator) Close() error {
return e.close()
}
type rangeAggregationExpr struct {
left *logRange
operation string
iterator RangeVectorIterator
}
func newRangeAggregationExpr(left *logRange, operation string) SampleExpr {
@ -245,13 +262,6 @@ func newRangeAggregationExpr(left *logRange, operation string) SampleExpr {
}
}
func (e *rangeAggregationExpr) Close() error {
if e.iterator == nil {
return nil
}
return e.iterator.Close()
}
func (e *rangeAggregationExpr) Selector() LogSelectorExpr {
return e.left.left
}
@ -296,10 +306,6 @@ func mustNewVectorAggregationExpr(left SampleExpr, operation string, gr *groupin
}
}
func (v *vectorAggregationExpr) Close() error {
return v.left.Close()
}
func (v *vectorAggregationExpr) Selector() LogSelectorExpr {
return v.left.Selector()
}

@ -1,9 +1,7 @@
package logql
import (
"container/heap"
"context"
"math"
"sort"
"time"
@ -12,7 +10,6 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
@ -60,18 +57,32 @@ func (opts *EngineOpts) applyDefault() {
}
}
// Engine is the LogQL engine.
type Engine struct {
timeout time.Duration
maxLookBackPeriod time.Duration
// Engine interface used to construct queries
type Engine interface {
NewRangeQuery(qs string, start, end time.Time, step time.Duration, direction logproto.Direction, limit uint32) Query
NewInstantQuery(qs string, ts time.Time, direction logproto.Direction, limit uint32) Query
}
// engine is the LogQL engine.
type engine struct {
timeout time.Duration
evaluator Evaluator
}
// NewEngine creates a new LogQL engine.
func NewEngine(opts EngineOpts) *Engine {
func NewEngine(opts EngineOpts, q Querier) Engine {
if q == nil {
panic("nil Querier")
}
opts.applyDefault()
return &Engine{
timeout: opts.Timeout,
maxLookBackPeriod: opts.MaxLookBackPeriod,
return &engine{
timeout: opts.Timeout,
evaluator: &defaultEvaluator{
querier: q,
maxLookBackPeriod: opts.MaxLookBackPeriod,
},
}
}
@ -82,24 +93,16 @@ type Query interface {
}
type query struct {
querier Querier
qs string
start, end time.Time
step time.Duration
direction logproto.Direction
limit uint32
ng *Engine
}
LiteralParams
func (q *query) isInstant() bool {
return q.start == q.end && q.step == 0
ng *engine
}
// Exec Implements `Query`
func (q *query) Exec(ctx context.Context) (promql.Value, error) {
var queryType string
if q.isInstant() {
if IsInstant(q) {
queryType = "instant"
} else {
queryType = "range"
@ -110,55 +113,57 @@ func (q *query) Exec(ctx context.Context) (promql.Value, error) {
}
// NewRangeQuery creates a new LogQL range query.
func (ng *Engine) NewRangeQuery(
q Querier,
func (ng *engine) NewRangeQuery(
qs string,
start, end time.Time, step time.Duration,
direction logproto.Direction, limit uint32) Query {
return &query{
querier: q,
qs: qs,
start: start,
end: end,
step: step,
direction: direction,
limit: limit,
ng: ng,
LiteralParams: LiteralParams{
qs: qs,
start: start,
end: end,
step: step,
direction: direction,
limit: limit,
},
ng: ng,
}
}
// NewInstantQuery creates a new LogQL instant query.
func (ng *Engine) NewInstantQuery(
q Querier,
func (ng *engine) NewInstantQuery(
qs string,
ts time.Time,
direction logproto.Direction, limit uint32) Query {
return &query{
querier: q,
qs: qs,
start: ts,
end: ts,
step: 0,
direction: direction,
limit: limit,
ng: ng,
LiteralParams: LiteralParams{
qs: qs,
start: ts,
end: ts,
step: 0,
direction: direction,
limit: limit,
},
ng: ng,
}
}
func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) {
func (ng *engine) exec(ctx context.Context, q *query) (promql.Value, error) {
log, ctx := spanlogger.New(ctx, "Engine.exec")
defer log.Finish()
ctx, cancel := context.WithTimeout(ctx, ng.timeout)
defer cancel()
if q.qs == "1+1" {
if q.isInstant() {
qs := q.String()
// This is a legacy query used for health checking. Not the best practice, but it works.
if qs == "1+1" {
if IsInstant(q) {
return promql.Vector{}, nil
}
return promql.Matrix{}, nil
}
expr, err := ParseExpr(q.qs)
expr, err := ParseExpr(qs)
if err != nil {
return nil, err
}
@ -172,26 +177,10 @@ func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) {
switch e := expr.(type) {
case SampleExpr:
if err := ng.setupIterators(ctx, e, q); err != nil {
return nil, err
}
return ng.evalSample(e, q), nil
return ng.evalSample(ctx, e, q)
case LogSelectorExpr:
params := SelectParams{
QueryRequest: &logproto.QueryRequest{
Start: q.start,
End: q.end,
Limit: q.limit,
Direction: q.direction,
Selector: e.String(),
},
}
// instant query, we look back to find logs near the requested ts.
if q.isInstant() {
params.Start = params.Start.Add(-ng.maxLookBackPeriod)
}
iter, err := q.querier.Select(ctx, params)
iter, err := ng.evaluator.Iterator(ctx, e, q)
if err != nil {
return nil, err
}
@ -202,44 +191,20 @@ func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) {
return nil, nil
}
// setupIterators walk through the AST tree and build iterators required to eval samples.
func (ng *Engine) setupIterators(ctx context.Context, expr SampleExpr, q *query) error {
if expr == nil {
return nil
}
switch e := expr.(type) {
case *vectorAggregationExpr:
return ng.setupIterators(ctx, e.left, q)
case *rangeAggregationExpr:
iter, err := q.querier.Select(ctx, SelectParams{
&logproto.QueryRequest{
Start: q.start.Add(-e.left.interval),
End: q.end,
Limit: 0,
Direction: logproto.FORWARD,
Selector: e.Selector().String(),
},
})
if err != nil {
return err
}
e.iterator = newRangeVectorIterator(iter, e.left.interval.Nanoseconds(), q.step.Nanoseconds(),
q.start.UnixNano(), q.end.UnixNano())
}
return nil
}
// evalSample evaluate a sampleExpr
func (ng *Engine) evalSample(expr SampleExpr, q *query) promql.Value {
defer helpers.LogError("closing SampleExpr", expr.Close)
func (ng *engine) evalSample(ctx context.Context, expr SampleExpr, q *query) (promql.Value, error) {
stepEvaluator := expr.Evaluator()
stepEvaluator, err := ng.evaluator.Evaluator(ctx, expr, q)
defer helpers.LogError("closing SampleExpr", stepEvaluator.Close)
if err != nil {
return nil, err
}
seriesIndex := map[uint64]*promql.Series{}
next, ts, vec := stepEvaluator.Next()
if q.isInstant() {
if IsInstant(q) {
sort.Slice(vec, func(i, j int) bool { return labels.Compare(vec[i].Metric, vec[j].Metric) < 0 })
return vec
return vec, nil
}
for next {
for _, p := range vec {
@ -270,7 +235,7 @@ func (ng *Engine) evalSample(expr SampleExpr, q *query) promql.Value {
}
result := promql.Matrix(series)
sort.Sort(result)
return result
return result, nil
}
func readStreams(i iter.EntryIterator, size uint32) (Streams, error) {
@ -304,211 +269,6 @@ type groupedAggregation struct {
reverseHeap vectorByReverseValueHeap
}
// Evaluator implements `SampleExpr` for a vectorAggregationExpr
// this is copied and adapted from Prometheus vector aggregation code.
func (v *vectorAggregationExpr) Evaluator() StepEvaluator {
nextEvaluator := v.left.Evaluator()
return StepEvaluatorFn(func() (bool, int64, promql.Vector) {
next, ts, vec := nextEvaluator.Next()
if !next {
return false, 0, promql.Vector{}
}
result := map[uint64]*groupedAggregation{}
if v.operation == OpTypeTopK || v.operation == OpTypeBottomK {
if v.params < 1 {
return next, ts, promql.Vector{}
}
}
for _, s := range vec {
metric := s.Metric
var (
groupingKey uint64
)
if v.grouping.without {
groupingKey, _ = metric.HashWithoutLabels(make([]byte, 0, 1024), v.grouping.groups...)
} else {
groupingKey, _ = metric.HashForLabels(make([]byte, 0, 1024), v.grouping.groups...)
}
group, ok := result[groupingKey]
// Add a new group if it doesn't exist.
if !ok {
var m labels.Labels
if v.grouping.without {
lb := labels.NewBuilder(metric)
lb.Del(v.grouping.groups...)
lb.Del(labels.MetricName)
m = lb.Labels()
} else {
m = make(labels.Labels, 0, len(v.grouping.groups))
for _, l := range metric {
for _, n := range v.grouping.groups {
if l.Name == n {
m = append(m, l)
break
}
}
}
sort.Sort(m)
}
result[groupingKey] = &groupedAggregation{
labels: m,
value: s.V,
mean: s.V,
groupCount: 1,
}
inputVecLen := len(vec)
resultSize := v.params
if v.params > inputVecLen {
resultSize = inputVecLen
}
if v.operation == OpTypeStdvar || v.operation == OpTypeStddev {
result[groupingKey].value = 0.0
} else if v.operation == OpTypeTopK {
result[groupingKey].heap = make(vectorByValueHeap, 0, resultSize)
heap.Push(&result[groupingKey].heap, &promql.Sample{
Point: promql.Point{V: s.V},
Metric: s.Metric,
})
} else if v.operation == OpTypeBottomK {
result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 0, resultSize)
heap.Push(&result[groupingKey].reverseHeap, &promql.Sample{
Point: promql.Point{V: s.V},
Metric: s.Metric,
})
}
continue
}
switch v.operation {
case OpTypeSum:
group.value += s.V
case OpTypeAvg:
group.groupCount++
group.mean += (s.V - group.mean) / float64(group.groupCount)
case OpTypeMax:
if group.value < s.V || math.IsNaN(group.value) {
group.value = s.V
}
case OpTypeMin:
if group.value > s.V || math.IsNaN(group.value) {
group.value = s.V
}
case OpTypeCount:
group.groupCount++
case OpTypeStddev, OpTypeStdvar:
group.groupCount++
delta := s.V - group.mean
group.mean += delta / float64(group.groupCount)
group.value += delta * (s.V - group.mean)
case OpTypeTopK:
if len(group.heap) < v.params || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) {
if len(group.heap) == v.params {
heap.Pop(&group.heap)
}
heap.Push(&group.heap, &promql.Sample{
Point: promql.Point{V: s.V},
Metric: s.Metric,
})
}
case OpTypeBottomK:
if len(group.reverseHeap) < v.params || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) {
if len(group.reverseHeap) == v.params {
heap.Pop(&group.reverseHeap)
}
heap.Push(&group.reverseHeap, &promql.Sample{
Point: promql.Point{V: s.V},
Metric: s.Metric,
})
}
default:
panic(errors.Errorf("expected aggregation operator but got %q", v.operation))
}
}
vec = vec[:0]
for _, aggr := range result {
switch v.operation {
case OpTypeAvg:
aggr.value = aggr.mean
case OpTypeCount:
aggr.value = float64(aggr.groupCount)
case OpTypeStddev:
aggr.value = math.Sqrt(aggr.value / float64(aggr.groupCount))
case OpTypeStdvar:
aggr.value = aggr.value / float64(aggr.groupCount)
case OpTypeTopK:
// The heap keeps the lowest value on top, so reverse it.
sort.Sort(sort.Reverse(aggr.heap))
for _, v := range aggr.heap {
vec = append(vec, promql.Sample{
Metric: v.Metric,
Point: promql.Point{
T: ts,
V: v.V,
},
})
}
continue // Bypass default append.
case OpTypeBottomK:
// The heap keeps the lowest value on top, so reverse it.
sort.Sort(sort.Reverse(aggr.reverseHeap))
for _, v := range aggr.reverseHeap {
vec = append(vec, promql.Sample{
Metric: v.Metric,
Point: promql.Point{
T: ts,
V: v.V,
},
})
}
continue // Bypass default append.
default:
}
vec = append(vec, promql.Sample{
Metric: aggr.labels,
Point: promql.Point{
T: ts,
V: aggr.value,
},
})
}
return next, ts, vec
})
}
// Evaluator implements `SampleExpr` for a rangeAggregationExpr
func (e *rangeAggregationExpr) Evaluator() StepEvaluator {
var fn RangeVectorAggregator
switch e.operation {
case OpTypeRate:
fn = rate(e.left.interval)
case OpTypeCountOverTime:
fn = count
}
return StepEvaluatorFn(func() (bool, int64, promql.Vector) {
next := e.iterator.Next()
if !next {
return false, 0, promql.Vector{}
}
ts, vec := e.iterator.At(fn)
return true, ts, vec
})
}
// rate calculate the per-second rate of log lines.
func rate(selRange time.Duration) func(ts int64, samples []promql.Point) float64 {
return func(ts int64, samples []promql.Point) float64 {

@ -18,7 +18,6 @@ var testSize = int64(300)
func TestEngine_NewInstantQuery(t *testing.T) {
t.Parallel()
eng := NewEngine(EngineOpts{})
for _, test := range []struct {
qs string
ts time.Time
@ -291,7 +290,8 @@ func TestEngine_NewInstantQuery(t *testing.T) {
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()
q := eng.NewInstantQuery(newQuerierRecorder(test.streams, test.params), test.qs, test.ts, test.direction, test.limit)
eng := NewEngine(EngineOpts{}, newQuerierRecorder(test.streams, test.params))
q := eng.NewInstantQuery(test.qs, test.ts, test.direction, test.limit)
res, err := q.Exec(context.Background())
if err != nil {
t.Fatal(err)
@ -303,7 +303,6 @@ func TestEngine_NewInstantQuery(t *testing.T) {
func TestEngine_NewRangeQuery(t *testing.T) {
t.Parallel()
eng := NewEngine(EngineOpts{})
for _, test := range []struct {
qs string
start time.Time
@ -680,7 +679,9 @@ func TestEngine_NewRangeQuery(t *testing.T) {
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {
t.Parallel()
q := eng.NewRangeQuery(newQuerierRecorder(test.streams, test.params), test.qs, test.start, test.end, test.step, test.direction, test.limit)
eng := NewEngine(EngineOpts{}, newQuerierRecorder(test.streams, test.params))
q := eng.NewRangeQuery(test.qs, test.start, test.end, test.step, test.direction, test.limit)
res, err := q.Exec(context.Background())
if err != nil {
t.Fatal(err)
@ -709,10 +710,9 @@ var result promql.Value
func benchmarkRangeQuery(testsize int64, b *testing.B) {
b.ReportAllocs()
eng := NewEngine(EngineOpts{})
eng := NewEngine(EngineOpts{}, getLocalQuerier(testsize))
start := time.Unix(0, 0)
end := time.Unix(testsize, 0)
querier := getLocalQuerier(testsize)
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, test := range []struct {
@ -741,7 +741,7 @@ func benchmarkRangeQuery(testsize int64, b *testing.B) {
{`bottomk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD},
{`bottomk(3,rate(({app=~"foo|bar"} |~".+bar")[1m])) without (app)`, logproto.FORWARD},
} {
q := eng.NewRangeQuery(querier, test.qs, start, end, 60*time.Second, test.direction, 1000)
q := eng.NewRangeQuery(test.qs, start, end, 60*time.Second, test.direction, 1000)
res, err := q.Exec(context.Background())
if err != nil {
b.Fatal(err)

@ -0,0 +1,327 @@
package logql
import (
"container/heap"
"context"
"math"
"sort"
"time"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
)
// Params details the parameters associated with a loki request
type Params interface {
String() string
Start() time.Time
End() time.Time
Step() time.Duration
Limit() uint32
Direction() logproto.Direction
}
// LiteralParams is a concrete Params implementation backed by plain fields.
type LiteralParams struct {
	qs         string
	start, end time.Time
	step       time.Duration
	direction  logproto.Direction
	limit      uint32
}

// String implements Params.
func (lp LiteralParams) String() string { return lp.qs }

// Start implements Params.
func (lp LiteralParams) Start() time.Time { return lp.start }

// End implements Params.
func (lp LiteralParams) End() time.Time { return lp.end }

// Step implements Params.
func (lp LiteralParams) Step() time.Duration { return lp.step }

// Limit implements Params.
func (lp LiteralParams) Limit() uint32 { return lp.limit }

// Direction implements Params.
func (lp LiteralParams) Direction() logproto.Direction { return lp.direction }
// IsInstant reports whether the given Params describe an instant query:
// a zero-step query whose start and end timestamps coincide.
func IsInstant(q Params) bool {
	return q.Step() == 0 && q.Start() == q.End()
}
// Evaluator is an interface for iterating over data at different nodes in the AST.
// It decouples the AST from the execution context that supplies the data.
type Evaluator interface {
	// Evaluator returns a StepEvaluator for a given SampleExpr.
	Evaluator(context.Context, SampleExpr, Params) (StepEvaluator, error)
	// Iterator returns the iter.EntryIterator for a given LogSelectorExpr.
	Iterator(context.Context, LogSelectorExpr, Params) (iter.EntryIterator, error)
}
// defaultEvaluator implements Evaluator on top of a Querier.
// maxLookBackPeriod is only applied to instant log queries, where it
// widens the search window backwards from the requested timestamp
// (see Iterator).
type defaultEvaluator struct {
	maxLookBackPeriod time.Duration
	querier           Querier
}
// Iterator implements Evaluator. It builds the entry iterator that serves
// a log selector query from the underlying Querier.
func (ev *defaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr, q Params) (iter.EntryIterator, error) {
	req := logproto.QueryRequest{
		Selector:  expr.String(),
		Limit:     q.Limit(),
		Start:     q.Start(),
		End:       q.End(),
		Direction: q.Direction(),
	}
	// An instant query has no natural range: look back from the requested
	// timestamp so logs near it can be found.
	if IsInstant(q) {
		req.Start = req.Start.Add(-ev.maxLookBackPeriod)
	}
	return ev.querier.Select(ctx, SelectParams{QueryRequest: &req})
}
// Evaluator implements the Evaluator interface by dispatching on the
// concrete SampleExpr type.
func (ev *defaultEvaluator) Evaluator(ctx context.Context, expr SampleExpr, q Params) (StepEvaluator, error) {
	switch concrete := expr.(type) {
	case *rangeAggregationExpr:
		return ev.rangeAggEvaluator(ctx, concrete, q)
	case *vectorAggregationExpr:
		return ev.vectorAggEvaluator(ctx, concrete, q)
	default:
		return nil, errors.Errorf("unexpected type (%T): %v", concrete, concrete)
	}
}
// vectorAggEvaluator builds a StepEvaluator for a vector aggregation
// (sum, avg, max, min, count, stddev, stdvar, topk, bottomk) over the step
// vectors produced by the child expression's evaluator.
// This is copied and adapted from the Prometheus vector aggregation code.
func (ev *defaultEvaluator) vectorAggEvaluator(ctx context.Context, expr *vectorAggregationExpr, q Params) (StepEvaluator, error) {
	// Build the evaluator for the inner expression first; each step below
	// aggregates one of its output vectors.
	nextEvaluator, err := ev.Evaluator(ctx, expr.left, q)
	if err != nil {
		return nil, err
	}
	return newStepEvaluator(func() (bool, int64, promql.Vector) {
		next, ts, vec := nextEvaluator.Next()
		if !next {
			return false, 0, promql.Vector{}
		}
		result := map[uint64]*groupedAggregation{}
		// topk/bottomk with a non-positive k can never select anything:
		// short-circuit this step with an empty vector.
		if expr.operation == OpTypeTopK || expr.operation == OpTypeBottomK {
			if expr.params < 1 {
				return next, ts, promql.Vector{}
			}
		}
		for _, s := range vec {
			metric := s.Metric
			var (
				groupingKey uint64
			)
			// Hash only the labels relevant to the by/without clause so
			// samples with the same grouping land in the same bucket.
			if expr.grouping.without {
				groupingKey, _ = metric.HashWithoutLabels(make([]byte, 0, 1024), expr.grouping.groups...)
			} else {
				groupingKey, _ = metric.HashForLabels(make([]byte, 0, 1024), expr.grouping.groups...)
			}
			group, ok := result[groupingKey]
			// Add a new group if it doesn't exist.
			if !ok {
				var m labels.Labels
				if expr.grouping.without {
					lb := labels.NewBuilder(metric)
					lb.Del(expr.grouping.groups...)
					lb.Del(labels.MetricName)
					m = lb.Labels()
				} else {
					// by(): keep only the listed labels.
					m = make(labels.Labels, 0, len(expr.grouping.groups))
					for _, l := range metric {
						for _, n := range expr.grouping.groups {
							if l.Name == n {
								m = append(m, l)
								break
							}
						}
					}
					sort.Sort(m)
				}
				result[groupingKey] = &groupedAggregation{
					labels:     m,
					value:      s.V,
					mean:       s.V,
					groupCount: 1,
				}
				// topk/bottomk heaps never need to hold more than the
				// smaller of k and the input vector length.
				inputVecLen := len(vec)
				resultSize := expr.params
				if expr.params > inputVecLen {
					resultSize = inputVecLen
				}
				if expr.operation == OpTypeStdvar || expr.operation == OpTypeStddev {
					// The running sum of squared deltas starts at zero.
					result[groupingKey].value = 0.0
				} else if expr.operation == OpTypeTopK {
					result[groupingKey].heap = make(vectorByValueHeap, 0, resultSize)
					heap.Push(&result[groupingKey].heap, &promql.Sample{
						Point:  promql.Point{V: s.V},
						Metric: s.Metric,
					})
				} else if expr.operation == OpTypeBottomK {
					result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 0, resultSize)
					heap.Push(&result[groupingKey].reverseHeap, &promql.Sample{
						Point:  promql.Point{V: s.V},
						Metric: s.Metric,
					})
				}
				continue
			}
			// Fold this sample into its existing group.
			switch expr.operation {
			case OpTypeSum:
				group.value += s.V
			case OpTypeAvg:
				// Incremental mean update.
				group.groupCount++
				group.mean += (s.V - group.mean) / float64(group.groupCount)
			case OpTypeMax:
				if group.value < s.V || math.IsNaN(group.value) {
					group.value = s.V
				}
			case OpTypeMin:
				if group.value > s.V || math.IsNaN(group.value) {
					group.value = s.V
				}
			case OpTypeCount:
				group.groupCount++
			case OpTypeStddev, OpTypeStdvar:
				// Online mean/variance accumulation; value holds the sum
				// of squared deltas, finalized after the loop.
				group.groupCount++
				delta := s.V - group.mean
				group.mean += delta / float64(group.groupCount)
				group.value += delta * (s.V - group.mean)
			case OpTypeTopK:
				// Keep the k largest: the heap root is the smallest retained
				// value and is evicted when a larger sample arrives.
				if len(group.heap) < expr.params || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) {
					if len(group.heap) == expr.params {
						heap.Pop(&group.heap)
					}
					heap.Push(&group.heap, &promql.Sample{
						Point:  promql.Point{V: s.V},
						Metric: s.Metric,
					})
				}
			case OpTypeBottomK:
				// Mirror of topk using the reverse-ordered heap.
				if len(group.reverseHeap) < expr.params || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) {
					if len(group.reverseHeap) == expr.params {
						heap.Pop(&group.reverseHeap)
					}
					heap.Push(&group.reverseHeap, &promql.Sample{
						Point:  promql.Point{V: s.V},
						Metric: s.Metric,
					})
				}
			default:
				panic(errors.Errorf("expected aggregation operator but got %q", expr.operation))
			}
		}
		// Reuse the input vector's backing array for the output samples.
		vec = vec[:0]
		for _, aggr := range result {
			// Finalize each group according to the operation.
			switch expr.operation {
			case OpTypeAvg:
				aggr.value = aggr.mean
			case OpTypeCount:
				aggr.value = float64(aggr.groupCount)
			case OpTypeStddev:
				aggr.value = math.Sqrt(aggr.value / float64(aggr.groupCount))
			case OpTypeStdvar:
				aggr.value = aggr.value / float64(aggr.groupCount)
			case OpTypeTopK:
				// The heap keeps the lowest value on top, so reverse it.
				sort.Sort(sort.Reverse(aggr.heap))
				for _, v := range aggr.heap {
					vec = append(vec, promql.Sample{
						Metric: v.Metric,
						Point: promql.Point{
							T: ts,
							V: v.V,
						},
					})
				}
				continue // Bypass default append.
			case OpTypeBottomK:
				// The heap keeps the lowest value on top, so reverse it.
				sort.Sort(sort.Reverse(aggr.reverseHeap))
				for _, v := range aggr.reverseHeap {
					vec = append(vec, promql.Sample{
						Metric: v.Metric,
						Point: promql.Point{
							T: ts,
							V: v.V,
						},
					})
				}
				continue // Bypass default append.
			default:
			}
			vec = append(vec, promql.Sample{
				Metric: aggr.labels,
				Point: promql.Point{
					T: ts,
					V: aggr.value,
				},
			})
		}
		return next, ts, vec
	}, nextEvaluator.Close)
}
// rangeAggEvaluator builds a StepEvaluator for a range aggregation
// (rate, count_over_time) by streaming entries for the underlying log
// range and folding each window with the operation's aggregator.
func (ev *defaultEvaluator) rangeAggEvaluator(ctx context.Context, expr *rangeAggregationExpr, q Params) (StepEvaluator, error) {
	// Resolve the aggregation function up front so an unsupported operation
	// fails fast with an error instead of leaving fn nil (which would panic
	// at evaluation time), and so no iterator is opened needlessly.
	var fn RangeVectorAggregator
	switch expr.operation {
	case OpTypeRate:
		fn = rate(expr.left.interval)
	case OpTypeCountOverTime:
		fn = count
	default:
		return nil, errors.Errorf("unsupported range aggregation operation %s", expr.operation)
	}
	// Widen the selection window backwards by the range interval so the
	// first step has a full window of samples.
	entryIter, err := ev.querier.Select(ctx, SelectParams{
		&logproto.QueryRequest{
			Start:     q.Start().Add(-expr.left.interval),
			End:       q.End(),
			Limit:     0, // metric queries are unlimited
			Direction: logproto.FORWARD,
			Selector:  expr.Selector().String(),
		},
	})
	if err != nil {
		return nil, err
	}
	vecIter := newRangeVectorIterator(entryIter, expr.left.interval.Nanoseconds(), q.Step().Nanoseconds(),
		q.Start().UnixNano(), q.End().UnixNano())
	return newStepEvaluator(func() (bool, int64, promql.Vector) {
		next := vecIter.Next()
		if !next {
			return false, 0, promql.Vector{}
		}
		ts, vec := vecIter.At(fn)
		return true, ts, vec
	}, vecIter.Close)
}

@ -41,7 +41,7 @@ func (q *Querier) RangeQueryHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
query := q.engine.NewRangeQuery(q, request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@ -65,7 +65,7 @@ func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, httpgrpc.Errorf(http.StatusBadRequest, err.Error()).Error(), http.StatusBadRequest)
return
}
query := q.engine.NewInstantQuery(q, request.Query, request.Ts, request.Direction, request.Limit)
query := q.engine.NewInstantQuery(request.Query, request.Ts, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@ -111,7 +111,7 @@ func (q *Querier) LogQueryHandler(w http.ResponseWriter, r *http.Request) {
return
}
query := q.engine.NewRangeQuery(q, request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
query := q.engine.NewRangeQuery(request.Query, request.Start, request.End, request.Step, request.Direction, request.Limit)
result, err := query.Exec(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)

@ -60,7 +60,7 @@ type Querier struct {
ring ring.ReadRing
pool *cortex_client.Pool
store storage.Store
engine *logql.Engine
engine logql.Engine
limits *validation.Overrides
}
@ -76,14 +76,16 @@ func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.
// newQuerier creates a new Querier and allows to pass a custom ingester client factory
// used for testing purposes
func newQuerier(cfg Config, clientCfg client.Config, clientFactory cortex_client.Factory, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
return &Querier{
querier := Querier{
cfg: cfg,
ring: ring,
pool: cortex_client.NewPool(clientCfg.PoolConfig, ring, clientFactory, util.Logger),
store: store,
engine: logql.NewEngine(cfg.Engine),
limits: limits,
}, nil
}
querier.engine = logql.NewEngine(cfg.Engine, &querier)
return &querier, nil
}
type responseFromIngesters struct {

Loading…
Cancel
Save