mirror of https://github.com/grafana/loki
- adds avg,min,max,top,bottomk,stddev,stdvar,count - updates api documentation - adds tests Improve yacc & go lexer to understand duration Remove support for regexp in all queries Clean up querier and logselectorpull/969/head
parent
8f04545c1e
commit
7f3db9d7f0
@ -0,0 +1,28 @@ |
||||
package logql |
||||
|
||||
import ( |
||||
"strings" |
||||
"testing" |
||||
) |
||||
|
||||
func Test_logSelectorExpr_String(t *testing.T) { |
||||
tests := []string{ |
||||
`{foo!~"bar"}`, |
||||
`{foo="bar", bar!="baz"}`, |
||||
`{foo="bar", bar!="baz"} != "bip" !~ ".+bop"`, |
||||
`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap"`, |
||||
} |
||||
|
||||
for _, tt := range tests { |
||||
tt := tt |
||||
t.Run(tt, func(t *testing.T) { |
||||
expr, err := ParseLogSelector(tt) |
||||
if err != nil { |
||||
t.Fatalf("failed to parse log selector: %s", err) |
||||
} |
||||
if expr.String() != strings.Replace(tt, " ", "", -1) { |
||||
t.Fatalf("error expected: %s got: %s", tt, expr.String()) |
||||
} |
||||
}) |
||||
} |
||||
} |
||||
@ -0,0 +1,502 @@ |
||||
package logql |
||||
|
||||
import ( |
||||
"container/heap" |
||||
"context" |
||||
"math" |
||||
"sort" |
||||
"time" |
||||
|
||||
"github.com/grafana/loki/pkg/helpers" |
||||
"github.com/grafana/loki/pkg/iter" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/pkg/errors" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/promauto" |
||||
"github.com/prometheus/prometheus/pkg/labels" |
||||
"github.com/prometheus/prometheus/promql" |
||||
) |
||||
|
||||
var ( |
||||
queryTime = promauto.NewHistogramVec(prometheus.HistogramOpts{ |
||||
Namespace: "logql", |
||||
Name: "query_duration_seconds", |
||||
Help: "LogQL query timings", |
||||
Buckets: prometheus.DefBuckets, |
||||
}, []string{"query_type"}) |
||||
) |
||||
|
||||
// ValueTypeStreams promql.ValueType for log streams
|
||||
const ValueTypeStreams = "streams" |
||||
|
||||
// Streams is promql.Value
|
||||
type Streams []*logproto.Stream |
||||
|
||||
// Type implements `promql.Value`
|
||||
func (Streams) Type() promql.ValueType { return ValueTypeStreams } |
||||
|
||||
// String implements `promql.Value`
|
||||
func (Streams) String() string { |
||||
return "" |
||||
} |
||||
|
||||
// EngineOpts is the list of options to use with the LogQL query engine.
|
||||
type EngineOpts struct { |
||||
// Timeout for queries execution
|
||||
Timeout time.Duration `yaml:"timeout"` |
||||
// MaxLookBackPeriod is the maximun amount of time to look back for log lines.
|
||||
// only used for instant log queries.
|
||||
MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"` |
||||
} |
||||
|
||||
func (opts *EngineOpts) applyDefault() { |
||||
if opts.Timeout == 0 { |
||||
opts.Timeout = 3 * time.Minute |
||||
} |
||||
if opts.MaxLookBackPeriod == 0 { |
||||
opts.MaxLookBackPeriod = 30 * time.Second |
||||
} |
||||
} |
||||
|
||||
// Engine is the LogQL engine.
|
||||
type Engine struct { |
||||
timeout time.Duration |
||||
maxLookBackPeriod time.Duration |
||||
} |
||||
|
||||
// NewEngine creates a new LogQL engine.
|
||||
func NewEngine(opts EngineOpts) *Engine { |
||||
opts.applyDefault() |
||||
return &Engine{ |
||||
timeout: opts.Timeout, |
||||
maxLookBackPeriod: opts.MaxLookBackPeriod, |
||||
} |
||||
} |
||||
|
||||
// Query is a LogQL query to be executed.
|
||||
type Query interface { |
||||
// Exec processes the query.
|
||||
Exec(ctx context.Context) (promql.Value, error) |
||||
} |
||||
|
||||
type query struct { |
||||
querier Querier |
||||
qs string |
||||
start, end time.Time |
||||
step time.Duration |
||||
direction logproto.Direction |
||||
limit uint32 |
||||
|
||||
ng *Engine |
||||
} |
||||
|
||||
func (q *query) isInstant() bool { |
||||
return q.start == q.end && q.step == 0 |
||||
} |
||||
|
||||
// Exec Implements `Query`
|
||||
func (q *query) Exec(ctx context.Context) (promql.Value, error) { |
||||
var queryType string |
||||
if q.isInstant() { |
||||
queryType = "instant" |
||||
} else { |
||||
queryType = "range" |
||||
} |
||||
timer := prometheus.NewTimer(queryTime.WithLabelValues(queryType)) |
||||
defer timer.ObserveDuration() |
||||
return q.ng.exec(ctx, q) |
||||
} |
||||
|
||||
// NewRangeQuery creates a new LogQL range query.
|
||||
func (ng *Engine) NewRangeQuery( |
||||
q Querier, |
||||
qs string, |
||||
start, end time.Time, step time.Duration, |
||||
direction logproto.Direction, limit uint32) Query { |
||||
return &query{ |
||||
querier: q, |
||||
qs: qs, |
||||
start: start, |
||||
end: end, |
||||
step: step, |
||||
direction: direction, |
||||
limit: limit, |
||||
ng: ng, |
||||
} |
||||
} |
||||
|
||||
// NewInstantQuery creates a new LogQL instant query.
|
||||
func (ng *Engine) NewInstantQuery( |
||||
q Querier, |
||||
qs string, |
||||
ts time.Time, |
||||
direction logproto.Direction, limit uint32) Query { |
||||
return &query{ |
||||
querier: q, |
||||
qs: qs, |
||||
start: ts, |
||||
end: ts, |
||||
step: 0, |
||||
direction: direction, |
||||
limit: limit, |
||||
ng: ng, |
||||
} |
||||
} |
||||
|
||||
func (ng *Engine) exec(ctx context.Context, q *query) (promql.Value, error) { |
||||
ctx, cancel := context.WithTimeout(ctx, ng.timeout) |
||||
defer cancel() |
||||
|
||||
expr, err := ParseExpr(q.qs) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
switch e := expr.(type) { |
||||
case SampleExpr: |
||||
if err := ng.setupIterators(ctx, e, q); err != nil { |
||||
return nil, err |
||||
} |
||||
return ng.evalSample(e, q), nil |
||||
|
||||
case LogSelectorExpr: |
||||
params := SelectParams{ |
||||
QueryRequest: &logproto.QueryRequest{ |
||||
Start: q.start, |
||||
End: q.end, |
||||
Limit: q.limit, |
||||
Direction: q.direction, |
||||
Selector: e.String(), |
||||
}, |
||||
} |
||||
// instant query, we look back to find logs near the requested ts.
|
||||
if q.isInstant() { |
||||
params.Start = params.Start.Add(-ng.maxLookBackPeriod) |
||||
} |
||||
iter, err := q.querier.Select(ctx, params) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer helpers.LogError("closing iterator", iter.Close) |
||||
return readStreams(iter, q.limit) |
||||
} |
||||
|
||||
return nil, nil |
||||
} |
||||
|
||||
// setupIterators walk through the AST tree and build iterators required to eval samples.
|
||||
func (ng *Engine) setupIterators(ctx context.Context, expr SampleExpr, q *query) error { |
||||
if expr == nil { |
||||
return nil |
||||
} |
||||
switch e := expr.(type) { |
||||
case *vectorAggregationExpr: |
||||
return ng.setupIterators(ctx, e.left, q) |
||||
case *rangeAggregationExpr: |
||||
iter, err := q.querier.Select(ctx, SelectParams{ |
||||
&logproto.QueryRequest{ |
||||
Start: q.start.Add(-e.left.interval), |
||||
End: q.end, |
||||
Limit: 0, |
||||
Direction: logproto.FORWARD, |
||||
Selector: e.Selector().String(), |
||||
}, |
||||
}) |
||||
e.iterator = newRangeVectorIterator(iter, e.left.interval.Nanoseconds(), q.step.Nanoseconds(), |
||||
q.start.UnixNano(), q.end.UnixNano()) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// evalSample evaluate a sampleExpr
|
||||
func (ng *Engine) evalSample(expr SampleExpr, q *query) promql.Value { |
||||
defer helpers.LogError("closing SampleExpr", expr.Close) |
||||
|
||||
stepEvaluator := expr.Evaluator() |
||||
seriesIndex := map[uint64]*promql.Series{} |
||||
|
||||
next, ts, vec := stepEvaluator.Next() |
||||
if q.isInstant() { |
||||
return vec |
||||
} |
||||
for next { |
||||
for _, p := range vec { |
||||
var ( |
||||
series *promql.Series |
||||
hash = p.Metric.Hash() |
||||
ok bool |
||||
) |
||||
|
||||
series, ok = seriesIndex[hash] |
||||
if !ok { |
||||
series = &promql.Series{ |
||||
Metric: p.Metric, |
||||
} |
||||
seriesIndex[hash] = series |
||||
} |
||||
series.Points = append(series.Points, promql.Point{ |
||||
T: ts, |
||||
V: p.V, |
||||
}) |
||||
} |
||||
next, ts, vec = stepEvaluator.Next() |
||||
} |
||||
|
||||
series := make([]promql.Series, 0, len(seriesIndex)) |
||||
for _, s := range seriesIndex { |
||||
series = append(series, *s) |
||||
} |
||||
result := promql.Matrix(series) |
||||
sort.Sort(result) |
||||
return result |
||||
} |
||||
|
||||
func readStreams(i iter.EntryIterator, size uint32) (Streams, error) { |
||||
streams := map[string]*logproto.Stream{} |
||||
respSize := uint32(0) |
||||
for ; respSize < size && i.Next(); respSize++ { |
||||
labels, entry := i.Labels(), i.Entry() |
||||
stream, ok := streams[labels] |
||||
if !ok { |
||||
stream = &logproto.Stream{ |
||||
Labels: labels, |
||||
} |
||||
streams[labels] = stream |
||||
} |
||||
stream.Entries = append(stream.Entries, entry) |
||||
} |
||||
|
||||
result := make([]*logproto.Stream, 0, len(streams)) |
||||
for _, stream := range streams { |
||||
result = append(result, stream) |
||||
} |
||||
return result, i.Error() |
||||
} |
||||
|
||||
type groupedAggregation struct { |
||||
labels labels.Labels |
||||
value float64 |
||||
mean float64 |
||||
groupCount int |
||||
heap vectorByValueHeap |
||||
reverseHeap vectorByReverseValueHeap |
||||
} |
||||
|
||||
// Evaluator implements `SampleExpr` for a vectorAggregationExpr
|
||||
// this is copied and adapted from Prometheus vector aggregation code.
|
||||
func (v *vectorAggregationExpr) Evaluator() StepEvaluator { |
||||
return StepEvaluatorFn(func() (bool, int64, promql.Vector) { |
||||
next, ts, vec := v.left.Evaluator().Next() |
||||
if !next { |
||||
return false, 0, promql.Vector{} |
||||
} |
||||
result := map[uint64]*groupedAggregation{} |
||||
if v.operation == OpTypeTopK || v.operation == OpTypeBottomK { |
||||
if v.params < 1 { |
||||
return next, ts, promql.Vector{} |
||||
} |
||||
|
||||
} |
||||
for _, s := range vec { |
||||
metric := s.Metric |
||||
|
||||
var ( |
||||
groupingKey uint64 |
||||
) |
||||
if v.grouping.without { |
||||
groupingKey = metric.HashWithoutLabels(v.grouping.groups...) |
||||
} else { |
||||
groupingKey = metric.HashForLabels(v.grouping.groups...) |
||||
} |
||||
group, ok := result[groupingKey] |
||||
// Add a new group if it doesn't exist.
|
||||
if !ok { |
||||
var m labels.Labels |
||||
|
||||
if v.grouping.without { |
||||
lb := labels.NewBuilder(metric) |
||||
lb.Del(v.grouping.groups...) |
||||
lb.Del(labels.MetricName) |
||||
m = lb.Labels() |
||||
} else { |
||||
m = make(labels.Labels, 0, len(v.grouping.groups)) |
||||
for _, l := range metric { |
||||
for _, n := range v.grouping.groups { |
||||
if l.Name == n { |
||||
m = append(m, l) |
||||
break |
||||
} |
||||
} |
||||
} |
||||
sort.Sort(m) |
||||
} |
||||
result[groupingKey] = &groupedAggregation{ |
||||
labels: m, |
||||
value: s.V, |
||||
mean: s.V, |
||||
groupCount: 1, |
||||
} |
||||
|
||||
inputVecLen := len(vec) |
||||
resultSize := v.params |
||||
if v.params > inputVecLen { |
||||
resultSize = inputVecLen |
||||
} |
||||
if v.operation == OpTypeStdvar || v.operation == OpTypeStddev { |
||||
result[groupingKey].value = 0.0 |
||||
} else if v.operation == OpTypeTopK { |
||||
result[groupingKey].heap = make(vectorByValueHeap, 0, resultSize) |
||||
heap.Push(&result[groupingKey].heap, &promql.Sample{ |
||||
Point: promql.Point{V: s.V}, |
||||
Metric: s.Metric, |
||||
}) |
||||
} else if v.operation == OpTypeBottomK { |
||||
result[groupingKey].reverseHeap = make(vectorByReverseValueHeap, 0, resultSize) |
||||
heap.Push(&result[groupingKey].reverseHeap, &promql.Sample{ |
||||
Point: promql.Point{V: s.V}, |
||||
Metric: s.Metric, |
||||
}) |
||||
} |
||||
continue |
||||
} |
||||
switch v.operation { |
||||
case OpTypeSum: |
||||
group.value += s.V |
||||
|
||||
case OpTypeAvg: |
||||
group.groupCount++ |
||||
group.mean += (s.V - group.mean) / float64(group.groupCount) |
||||
|
||||
case OpTypeMax: |
||||
if group.value < s.V || math.IsNaN(group.value) { |
||||
group.value = s.V |
||||
} |
||||
|
||||
case OpTypeMin: |
||||
if group.value > s.V || math.IsNaN(group.value) { |
||||
group.value = s.V |
||||
} |
||||
|
||||
case OpTypeCount: |
||||
group.groupCount++ |
||||
|
||||
case OpTypeStddev, OpTypeStdvar: |
||||
group.groupCount++ |
||||
delta := s.V - group.mean |
||||
group.mean += delta / float64(group.groupCount) |
||||
group.value += delta * (s.V - group.mean) |
||||
|
||||
case OpTypeTopK: |
||||
if len(group.heap) < v.params || group.heap[0].V < s.V || math.IsNaN(group.heap[0].V) { |
||||
if len(group.heap) == v.params { |
||||
heap.Pop(&group.heap) |
||||
} |
||||
heap.Push(&group.heap, &promql.Sample{ |
||||
Point: promql.Point{V: s.V}, |
||||
Metric: s.Metric, |
||||
}) |
||||
} |
||||
|
||||
case OpTypeBottomK: |
||||
if len(group.reverseHeap) < v.params || group.reverseHeap[0].V > s.V || math.IsNaN(group.reverseHeap[0].V) { |
||||
if len(group.reverseHeap) == v.params { |
||||
heap.Pop(&group.reverseHeap) |
||||
} |
||||
heap.Push(&group.reverseHeap, &promql.Sample{ |
||||
Point: promql.Point{V: s.V}, |
||||
Metric: s.Metric, |
||||
}) |
||||
} |
||||
default: |
||||
panic(errors.Errorf("expected aggregation operator but got %q", v.operation)) |
||||
} |
||||
} |
||||
vec = vec[:0] |
||||
for _, aggr := range result { |
||||
switch v.operation { |
||||
case OpTypeAvg: |
||||
aggr.value = aggr.mean |
||||
|
||||
case OpTypeCount: |
||||
aggr.value = float64(aggr.groupCount) |
||||
|
||||
case OpTypeStddev: |
||||
aggr.value = math.Sqrt(aggr.value / float64(aggr.groupCount)) |
||||
|
||||
case OpTypeStdvar: |
||||
aggr.value = aggr.value / float64(aggr.groupCount) |
||||
|
||||
case OpTypeTopK: |
||||
// The heap keeps the lowest value on top, so reverse it.
|
||||
sort.Sort(sort.Reverse(aggr.heap)) |
||||
for _, v := range aggr.heap { |
||||
vec = append(vec, promql.Sample{ |
||||
Metric: v.Metric, |
||||
Point: promql.Point{ |
||||
T: ts, |
||||
V: aggr.value, |
||||
}, |
||||
}) |
||||
} |
||||
continue // Bypass default append.
|
||||
|
||||
case OpTypeBottomK: |
||||
// The heap keeps the lowest value on top, so reverse it.
|
||||
sort.Sort(sort.Reverse(aggr.reverseHeap)) |
||||
for _, v := range aggr.reverseHeap { |
||||
vec = append(vec, promql.Sample{ |
||||
Metric: v.Metric, |
||||
Point: promql.Point{ |
||||
T: ts, |
||||
V: v.V, |
||||
}, |
||||
}) |
||||
} |
||||
continue // Bypass default append.
|
||||
default: |
||||
} |
||||
vec = append(vec, promql.Sample{ |
||||
Metric: aggr.labels, |
||||
Point: promql.Point{ |
||||
T: ts, |
||||
V: aggr.value, |
||||
}, |
||||
}) |
||||
} |
||||
return next, ts, vec |
||||
}) |
||||
} |
||||
|
||||
// Evaluator implements `SampleExpr` for a rangeAggregationExpr
|
||||
func (e *rangeAggregationExpr) Evaluator() StepEvaluator { |
||||
var fn RangeVectorAggregator |
||||
switch e.operation { |
||||
case OpTypeRate: |
||||
fn = rate(e.left.interval) |
||||
case OpTypeCountOverTime: |
||||
fn = count |
||||
} |
||||
return StepEvaluatorFn(func() (bool, int64, promql.Vector) { |
||||
next := e.iterator.Next() |
||||
if !next { |
||||
return false, 0, promql.Vector{} |
||||
} |
||||
ts, vec := e.iterator.At(fn) |
||||
return true, ts, vec |
||||
}) |
||||
} |
||||
|
||||
// rate calculate the per-second rate of log lines.
|
||||
func rate(selRange time.Duration) func(ts int64, samples []promql.Point) float64 { |
||||
return func(ts int64, samples []promql.Point) float64 { |
||||
return float64(len(samples)) / selRange.Seconds() |
||||
} |
||||
} |
||||
|
||||
// count counts the amount of log lines.
|
||||
func count(ts int64, samples []promql.Point) float64 { |
||||
return float64(len(samples)) |
||||
} |
||||
@ -0,0 +1,129 @@ |
||||
package logql |
||||
|
||||
import ( |
||||
"github.com/grafana/loki/pkg/iter" |
||||
"github.com/prometheus/prometheus/promql" |
||||
) |
||||
|
||||
// RangeVectorAggregator aggregates samples for a given range of samples.
|
||||
// It receives the current nano-seconds timestamp and the list of point within
|
||||
// the range.
|
||||
type RangeVectorAggregator func(int64, []promql.Point) float64 |
||||
|
||||
// RangeVectorIterator iterates through a range of samples.
|
||||
// To fetch the current vector use `At` with a `RangeVectorAggregator`.
|
||||
type RangeVectorIterator interface { |
||||
Next() bool |
||||
At(aggregator RangeVectorAggregator) (int64, promql.Vector) |
||||
Close() error |
||||
} |
||||
|
||||
type rangeVectorIterator struct { |
||||
iter iter.PeekingEntryIterator |
||||
selRange, step, end, current int64 |
||||
window map[string]*promql.Series |
||||
} |
||||
|
||||
func newRangeVectorIterator( |
||||
it iter.EntryIterator, |
||||
selRange, step, start, end int64) *rangeVectorIterator { |
||||
// forces at least one step.
|
||||
if step == 0 { |
||||
step = 1 |
||||
} |
||||
return &rangeVectorIterator{ |
||||
iter: iter.NewPeekingIterator(it), |
||||
step: step, |
||||
end: end, |
||||
selRange: selRange, |
||||
current: start - step, // first loop iteration will set it to start
|
||||
window: map[string]*promql.Series{}, |
||||
} |
||||
} |
||||
|
||||
func (r *rangeVectorIterator) Next() bool { |
||||
// slides the range window to the next position
|
||||
r.current = r.current + r.step |
||||
if r.current > r.end { |
||||
return false |
||||
} |
||||
rangeEnd := r.current |
||||
rangeStart := r.current - r.selRange |
||||
// load samples
|
||||
r.popBack(rangeStart) |
||||
r.load(rangeStart, rangeEnd) |
||||
return true |
||||
} |
||||
|
||||
func (r *rangeVectorIterator) Close() error { |
||||
return r.iter.Close() |
||||
} |
||||
|
||||
// popBack removes all entries out of the current window from the back.
|
||||
func (r *rangeVectorIterator) popBack(newStart int64) { |
||||
// possible improvement: if there is no overlap we can just remove all.
|
||||
for fp := range r.window { |
||||
lastPoint := 0 |
||||
for i, p := range r.window[fp].Points { |
||||
if p.T <= newStart { |
||||
lastPoint = i |
||||
continue |
||||
} |
||||
break |
||||
} |
||||
r.window[fp].Points = r.window[fp].Points[lastPoint+1:] |
||||
if len(r.window[fp].Points) == 0 { |
||||
delete(r.window, fp) |
||||
} |
||||
} |
||||
} |
||||
|
||||
// load the next sample range window.
|
||||
func (r *rangeVectorIterator) load(start, end int64) { |
||||
for lbs, entry, hasNext := r.iter.Peek(); hasNext; lbs, entry, hasNext = r.iter.Peek() { |
||||
if entry.Timestamp.UnixNano() > end { |
||||
// not consuming the iterator as this belong to another range.
|
||||
return |
||||
} |
||||
// the lower bound of the range is not inclusive
|
||||
if entry.Timestamp.UnixNano() <= start { |
||||
_ = r.iter.Next() |
||||
continue |
||||
} |
||||
// adds the sample.
|
||||
var series *promql.Series |
||||
var ok bool |
||||
series, ok = r.window[lbs] |
||||
if !ok { |
||||
series = &promql.Series{ |
||||
Points: []promql.Point{}, |
||||
} |
||||
r.window[lbs] = series |
||||
} |
||||
series.Points = append(series.Points, promql.Point{ |
||||
T: entry.Timestamp.UnixNano(), |
||||
V: 1, |
||||
}) |
||||
_ = r.iter.Next() |
||||
} |
||||
} |
||||
|
||||
func (r *rangeVectorIterator) At(aggregator RangeVectorAggregator) (int64, promql.Vector) { |
||||
result := make([]promql.Sample, 0, len(r.window)) |
||||
for lbs, series := range r.window { |
||||
labels, err := promql.ParseMetric(lbs) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
|
||||
result = append(result, promql.Sample{ |
||||
Point: promql.Point{ |
||||
V: aggregator(r.current, series.Points), |
||||
T: r.current, |
||||
}, |
||||
Metric: labels, |
||||
}) |
||||
|
||||
} |
||||
return r.current, result |
||||
} |
||||
@ -0,0 +1,138 @@ |
||||
package logql |
||||
|
||||
import ( |
||||
"fmt" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/grafana/loki/pkg/iter" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/prometheus/prometheus/promql" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
var entries = []logproto.Entry{ |
||||
{Timestamp: time.Unix(2, 0)}, |
||||
{Timestamp: time.Unix(5, 0)}, |
||||
{Timestamp: time.Unix(6, 0)}, |
||||
{Timestamp: time.Unix(10, 0)}, |
||||
{Timestamp: time.Unix(10, 1)}, |
||||
{Timestamp: time.Unix(11, 0)}, |
||||
{Timestamp: time.Unix(35, 0)}, |
||||
{Timestamp: time.Unix(35, 1)}, |
||||
{Timestamp: time.Unix(40, 0)}, |
||||
{Timestamp: time.Unix(100, 0)}, |
||||
{Timestamp: time.Unix(100, 1)}, |
||||
} |
||||
|
||||
var labelFoo, _ = promql.ParseMetric("{app=\"foo\"}") |
||||
var labelBar, _ = promql.ParseMetric("{app=\"bar\"}") |
||||
|
||||
func newEntryIterator() iter.EntryIterator { |
||||
return iter.NewHeapIterator([]iter.EntryIterator{ |
||||
iter.NewStreamIterator(&logproto.Stream{ |
||||
Labels: labelFoo.String(), |
||||
Entries: entries, |
||||
}), |
||||
iter.NewStreamIterator(&logproto.Stream{ |
||||
Labels: labelBar.String(), |
||||
Entries: entries, |
||||
}), |
||||
}, logproto.FORWARD) |
||||
} |
||||
|
||||
func newPoint(t time.Time, v float64) promql.Point { |
||||
return promql.Point{T: t.UnixNano(), V: v} |
||||
} |
||||
|
||||
func Test_RangeVectorIterator(t *testing.T) { |
||||
tests := []struct { |
||||
selRange int64 |
||||
step int64 |
||||
expectedVectors []promql.Vector |
||||
expectedTs []time.Time |
||||
}{ |
||||
{ |
||||
(5 * time.Second).Nanoseconds(), // no overlap
|
||||
(30 * time.Second).Nanoseconds(), |
||||
[]promql.Vector{ |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(10, 0), 2), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(10, 0), 2), Metric: labelFoo}, |
||||
}, |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(40, 0), 2), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(40, 0), 2), Metric: labelFoo}, |
||||
}, |
||||
{}, |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(100, 0), 1), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(100, 0), 1), Metric: labelFoo}, |
||||
}, |
||||
}, |
||||
[]time.Time{time.Unix(10, 0), time.Unix(40, 0), time.Unix(70, 0), time.Unix(100, 0)}, |
||||
}, |
||||
{ |
||||
(35 * time.Second).Nanoseconds(), // will overlap by 5 sec
|
||||
(30 * time.Second).Nanoseconds(), |
||||
[]promql.Vector{ |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(10, 0), 4), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(10, 0), 4), Metric: labelFoo}, |
||||
}, |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(40, 0), 7), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(40, 0), 7), Metric: labelFoo}, |
||||
}, |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(70, 0), 2), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(70, 0), 2), Metric: labelFoo}, |
||||
}, |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(100, 0), 1), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(100, 0), 1), Metric: labelFoo}, |
||||
}, |
||||
}, |
||||
[]time.Time{time.Unix(10, 0), time.Unix(40, 0), time.Unix(70, 0), time.Unix(100, 0)}, |
||||
}, |
||||
{ |
||||
(30 * time.Second).Nanoseconds(), // same range
|
||||
(30 * time.Second).Nanoseconds(), |
||||
[]promql.Vector{ |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(10, 0), 4), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(10, 0), 4), Metric: labelFoo}, |
||||
}, |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(40, 0), 5), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(40, 0), 5), Metric: labelFoo}, |
||||
}, |
||||
[]promql.Sample{}, |
||||
[]promql.Sample{ |
||||
{Point: newPoint(time.Unix(100, 0), 1), Metric: labelBar}, |
||||
{Point: newPoint(time.Unix(100, 0), 1), Metric: labelFoo}, |
||||
}, |
||||
}, |
||||
[]time.Time{time.Unix(10, 0), time.Unix(40, 0), time.Unix(70, 0), time.Unix(100, 0)}, |
||||
}, |
||||
} |
||||
|
||||
for _, tt := range tests { |
||||
t.Run( |
||||
fmt.Sprintf("logs[%s] - step: %s", time.Duration(tt.selRange), time.Duration(tt.step)), |
||||
func(t *testing.T) { |
||||
it := newRangeVectorIterator(newEntryIterator(), tt.selRange, |
||||
tt.step, time.Unix(10, 0).UnixNano(), time.Unix(100, 0).UnixNano()) |
||||
|
||||
i := 0 |
||||
for it.Next() { |
||||
ts, v := it.At(count) |
||||
require.ElementsMatch(t, tt.expectedVectors[i], v) |
||||
require.Equal(t, tt.expectedTs[i].UnixNano(), ts) |
||||
i++ |
||||
} |
||||
require.Equal(t, len(tt.expectedTs), i) |
||||
require.Equal(t, len(tt.expectedVectors), i) |
||||
}) |
||||
} |
||||
} |
||||
@ -0,0 +1,65 @@ |
||||
package logql |
||||
|
||||
import ( |
||||
"math" |
||||
|
||||
"github.com/prometheus/prometheus/promql" |
||||
) |
||||
|
||||
type vectorByValueHeap promql.Vector |
||||
|
||||
func (s vectorByValueHeap) Len() int { |
||||
return len(s) |
||||
} |
||||
|
||||
func (s vectorByValueHeap) Less(i, j int) bool { |
||||
if math.IsNaN(s[i].V) { |
||||
return true |
||||
} |
||||
return s[i].V < s[j].V |
||||
} |
||||
|
||||
func (s vectorByValueHeap) Swap(i, j int) { |
||||
s[i], s[j] = s[j], s[i] |
||||
} |
||||
|
||||
func (s *vectorByValueHeap) Push(x interface{}) { |
||||
*s = append(*s, *(x.(*promql.Sample))) |
||||
} |
||||
|
||||
func (s *vectorByValueHeap) Pop() interface{} { |
||||
old := *s |
||||
n := len(old) |
||||
el := old[n-1] |
||||
*s = old[0 : n-1] |
||||
return el |
||||
} |
||||
|
||||
type vectorByReverseValueHeap promql.Vector |
||||
|
||||
func (s vectorByReverseValueHeap) Len() int { |
||||
return len(s) |
||||
} |
||||
|
||||
func (s vectorByReverseValueHeap) Less(i, j int) bool { |
||||
if math.IsNaN(s[i].V) { |
||||
return true |
||||
} |
||||
return s[i].V > s[j].V |
||||
} |
||||
|
||||
func (s vectorByReverseValueHeap) Swap(i, j int) { |
||||
s[i], s[j] = s[j], s[i] |
||||
} |
||||
|
||||
func (s *vectorByReverseValueHeap) Push(x interface{}) { |
||||
*s = append(*s, *(x.(*promql.Sample))) |
||||
} |
||||
|
||||
func (s *vectorByReverseValueHeap) Pop() interface{} { |
||||
old := *s |
||||
n := len(old) |
||||
el := old[n-1] |
||||
*s = old[0 : n-1] |
||||
return el |
||||
} |
||||
Loading…
Reference in new issue