Bytes aggregations (#2150)

* Add bytes_rate and bytes_over_time.

Still need to work on tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add more tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* lint.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* More lint fixes.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* More comments.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add documentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Address review feedback.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix bad conflict resolution.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
k20
Cyril Tovena 5 years ago committed by GitHub
parent 635dd0add5
commit 18f1b0640d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docs/logql.md
  2. 6
      pkg/logql/ast.go
  3. 12
      pkg/logql/engine.go
  4. 48
      pkg/logql/engine_test.go
  5. 48
      pkg/logql/evaluator.go
  6. 7
      pkg/logql/expr.y
  7. 297
      pkg/logql/expr.y.go
  8. 63
      pkg/logql/functions.go
  9. 56
      pkg/logql/lex.go
  10. 118
      pkg/logql/parser_test.go
  11. 32
      pkg/logql/range_vector.go
  12. 13
      pkg/logql/range_vector_test.go
  13. 99
      pkg/logql/series_extractor.go
  14. 159
      pkg/logql/series_extractor_test.go
  15. 9
      pkg/logql/shardmapper.go
  16. 58
      pkg/logql/shardmapper_test.go
  17. 4
      pkg/logql/test_utils.go
  18. 4
      pkg/promtail/client/config_test.go
  19. 5
      pkg/querier/queryrange/downstreamer_test.go
  20. 3
      pkg/querier/queryrange/querysharding_test.go

@ -107,9 +107,11 @@ transform it into an instance vector.
The currently supported functions for operating over are:
- `rate`: calculate the number of entries per second
- `rate`: calculates the number of entries per second
- `count_over_time`: counts the entries for each log stream within the given
range.
- `bytes_rate`: calculates the number of bytes per second for each stream.
- `bytes_over_time`: counts the amount of bytes used by each log stream for a given range.
> `count_over_time({job="mysql"}[5m])`

@ -198,8 +198,10 @@ const (
OpTypeTopK = "topk"
// range vector ops
OpTypeCountOverTime = "count_over_time"
OpTypeRate = "rate"
OpRangeTypeCount = "count_over_time"
OpRangeTypeRate = "rate"
OpRangeTypeBytes = "bytes_over_time"
OpRangeTypeBytesRate = "bytes_rate"
// binops - logical/set
OpTypeOr = "or"

@ -313,15 +313,3 @@ type groupedAggregation struct {
heap vectorByValueHeap
reverseHeap vectorByReverseValueHeap
}
// rate calculate the per-second rate of log lines.
func rate(selRange time.Duration) func(ts int64, samples []promql.Point) float64 {
return func(ts int64, samples []promql.Point) float64 {
return float64(len(samples)) / selRange.Seconds()
}
}
// count counts the amount of log lines.
func count(ts int64, samples []promql.Point) float64 {
return float64(len(samples))
}

@ -1159,6 +1159,54 @@ func TestEngine_RangeQuery(t *testing.T) {
},
},
},
{
`bytes_rate({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Stream{
{logproto.Stream{
Labels: `{app="foo"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(45, 0), Line: "0123456789"}, // 10 bytes / 30s for the first point.
{Timestamp: time.Unix(60, 0), Line: ""},
{Timestamp: time.Unix(75, 0), Line: ""},
{Timestamp: time.Unix(90, 0), Line: ""},
{Timestamp: time.Unix(105, 0), Line: ""},
},
}},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(30, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{{T: 60 * 1000, V: 10. / 30.}, {T: 75 * 1000, V: 0}, {T: 90 * 1000, V: 0}, {T: 105 * 1000, V: 0}, {T: 120 * 1000, V: 0}},
},
},
},
{
`bytes_over_time({app="foo"}[30s])`, time.Unix(60, 0), time.Unix(120, 0), 15 * time.Second, 0, logproto.FORWARD, 10,
[][]logproto.Stream{
{logproto.Stream{
Labels: `{app="foo"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(45, 0), Line: "01234"}, // 5 bytes
{Timestamp: time.Unix(60, 0), Line: ""},
{Timestamp: time.Unix(75, 0), Line: ""},
{Timestamp: time.Unix(90, 0), Line: ""},
{Timestamp: time.Unix(105, 0), Line: "0123"}, // 4 bytes
},
}},
},
[]SelectParams{
{&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(30, 0), End: time.Unix(120, 0), Limit: 0, Selector: `{app="foo"}`}},
},
promql.Matrix{
promql.Series{
Metric: labels.Labels{{Name: "app", Value: "foo"}},
Points: []promql.Point{{T: 60 * 1000, V: 5.}, {T: 75 * 1000, V: 0}, {T: 90 * 1000, V: 0}, {T: 105 * 1000, V: 4.}, {T: 120 * 1000, V: 4.}},
},
},
},
} {
test := test
t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) {

@ -378,28 +378,42 @@ func rangeAggEvaluator(
expr *rangeAggregationExpr,
q Params,
) (StepEvaluator, error) {
vecIter := newRangeVectorIterator(entryIter, expr.left.interval.Nanoseconds(), q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano())
var fn RangeVectorAggregator
switch expr.operation {
case OpTypeRate:
fn = rate(expr.left.interval)
case OpTypeCountOverTime:
fn = count
agg, err := expr.aggregator()
if err != nil {
return nil, err
}
extractor, err := expr.extractor()
if err != nil {
return nil, err
}
return rangeVectorEvaluator{
iter: newRangeVectorIterator(
newSeriesIterator(entryIter, extractor),
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
),
agg: agg,
}, nil
}
return newStepEvaluator(func() (bool, int64, promql.Vector) {
next := vecIter.Next()
if !next {
return false, 0, promql.Vector{}
}
ts, vec := vecIter.At(fn)
return true, ts, vec
// rangeVectorEvaluator steps through a RangeVectorIterator, applying agg
// to the samples collected within each step's window. It is returned as a
// StepEvaluator by rangeAggEvaluator.
type rangeVectorEvaluator struct {
agg RangeVectorAggregator
iter RangeVectorIterator
}
}, vecIter.Close)
// Next advances the underlying iterator one step. It reports whether a
// step was produced, along with the step's timestamp and the vector
// obtained by applying the configured aggregator to the window.
func (r rangeVectorEvaluator) Next() (bool, int64, promql.Vector) {
	if !r.iter.Next() {
		// Iteration is exhausted: signal the caller with an empty vector.
		return false, 0, promql.Vector{}
	}
	ts, vec := r.iter.At(r.agg)
	return true, ts, vec
}
// Close releases the resources held by the underlying iterator.
func (r rangeVectorEvaluator) Close() error {
	return r.iter.Close()
}
// binOpExpr explicitly does not handle when both legs are literals as
// it makes the type system simpler and these are reduced in mustNewBinOpExpr
func binOpStepEvaluator(

@ -52,6 +52,7 @@ import (
%token <duration> DURATION
%token <val> MATCHERS LABELS EQ NEQ RE NRE OPEN_BRACE CLOSE_BRACE OPEN_BRACKET CLOSE_BRACKET COMMA DOT PIPE_MATCH PIPE_EXACT
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE
// Operators are listed with increasing precedence.
%left <binOp> OR
@ -166,8 +167,10 @@ vectorOp:
;
rangeOp:
COUNT_OVER_TIME { $$ = OpTypeCountOverTime }
| RATE { $$ = OpTypeRate }
COUNT_OVER_TIME { $$ = OpRangeTypeCount }
| RATE { $$ = OpRangeTypeRate }
| BYTES_OVER_TIME { $$ = OpRangeTypeBytes }
| BYTES_RATE { $$ = OpRangeTypeBytesRate }
;

@ -69,15 +69,17 @@ const STDDEV = 57375
const STDVAR = 57376
const BOTTOMK = 57377
const TOPK = 57378
const OR = 57379
const AND = 57380
const UNLESS = 57381
const ADD = 57382
const SUB = 57383
const MUL = 57384
const DIV = 57385
const MOD = 57386
const POW = 57387
const BYTES_OVER_TIME = 57379
const BYTES_RATE = 57380
const OR = 57381
const AND = 57382
const UNLESS = 57383
const ADD = 57384
const SUB = 57385
const MUL = 57386
const DIV = 57387
const MOD = 57388
const POW = 57389
var exprToknames = [...]string{
"$end",
@ -116,6 +118,8 @@ var exprToknames = [...]string{
"STDVAR",
"BOTTOMK",
"TOPK",
"BYTES_OVER_TIME",
"BYTES_RATE",
"OR",
"AND",
"UNLESS",
@ -132,7 +136,7 @@ const exprEofCode = 1
const exprErrCode = 2
const exprInitialStackSize = 16
//line pkg/logql/expr.y:183
//line pkg/logql/expr.y:186
//line yacctab:1
var exprExca = [...]int{
@ -142,8 +146,6 @@ var exprExca = [...]int{
-1, 3,
1, 2,
23, 2,
37, 2,
38, 2,
39, 2,
40, 2,
41, 2,
@ -151,10 +153,10 @@ var exprExca = [...]int{
43, 2,
44, 2,
45, 2,
46, 2,
47, 2,
-2, 0,
-1, 44,
37, 2,
38, 2,
-1, 46,
39, 2,
40, 2,
41, 2,
@ -162,55 +164,58 @@ var exprExca = [...]int{
43, 2,
44, 2,
45, 2,
46, 2,
47, 2,
-2, 0,
}
const exprPrivate = 57344
const exprLast = 199
const exprLast = 202
var exprAct = [...]int{
52, 4, 37, 100, 48, 3, 78, 14, 43, 36,
112, 51, 44, 53, 54, 11, 31, 32, 33, 34,
35, 36, 108, 6, 53, 54, 97, 17, 18, 19,
20, 22, 23, 21, 24, 25, 26, 27, 82, 109,
109, 15, 16, 11, 111, 110, 11, 33, 34, 35,
36, 80, 85, 81, 6, 84, 79, 68, 17, 18,
19, 20, 22, 23, 21, 24, 25, 26, 27, 83,
50, 98, 15, 16, 73, 104, 67, 86, 103, 66,
90, 91, 56, 55, 89, 99, 95, 96, 88, 102,
29, 30, 31, 32, 33, 34, 35, 36, 106, 91,
107, 28, 29, 30, 31, 32, 33, 34, 35, 36,
45, 2, 38, 114, 87, 113, 92, 94, 47, 101,
49, 42, 49, 41, 10, 42, 9, 41, 13, 8,
39, 40, 5, 69, 39, 40, 12, 105, 71, 57,
58, 59, 60, 61, 62, 63, 64, 65, 92, 7,
46, 70, 38, 1, 72, 0, 0, 42, 0, 41,
0, 42, 0, 41, 0, 0, 39, 40, 0, 93,
39, 40, 38, 69, 0, 38, 0, 94, 0, 0,
0, 42, 0, 41, 42, 0, 41, 0, 0, 0,
39, 40, 0, 39, 40, 74, 75, 76, 77,
54, 4, 39, 102, 50, 3, 80, 14, 45, 38,
114, 53, 46, 55, 56, 11, 33, 34, 35, 36,
37, 38, 110, 6, 55, 56, 99, 17, 18, 21,
22, 24, 25, 23, 26, 27, 28, 29, 19, 20,
84, 111, 111, 15, 16, 11, 113, 112, 11, 35,
36, 37, 38, 82, 87, 83, 6, 86, 81, 70,
17, 18, 21, 22, 24, 25, 23, 26, 27, 28,
29, 19, 20, 85, 100, 52, 15, 16, 75, 88,
58, 106, 57, 93, 105, 47, 2, 101, 97, 98,
92, 104, 31, 32, 33, 34, 35, 36, 37, 38,
108, 93, 109, 30, 31, 32, 33, 34, 35, 36,
37, 38, 91, 90, 89, 116, 59, 60, 61, 62,
63, 64, 65, 66, 67, 40, 115, 10, 69, 94,
96, 68, 103, 49, 44, 51, 43, 51, 44, 9,
43, 73, 94, 41, 42, 13, 71, 41, 42, 8,
107, 44, 5, 43, 72, 40, 12, 74, 7, 48,
41, 42, 1, 95, 44, 0, 43, 0, 0, 0,
40, 0, 0, 41, 42, 96, 71, 0, 0, 44,
0, 43, 40, 76, 77, 78, 79, 0, 41, 42,
0, 44, 0, 43, 0, 0, 0, 0, 0, 0,
41, 42,
}
var exprPact = [...]int{
1, -1000, 64, 173, -1000, -1000, 1, -1000, -1000, -1000,
-1000, 116, 48, -11, -1000, 77, 76, -1000, -1000, -1000,
-1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, 1, 1,
1, 1, 1, 1, 1, 1, 1, 74, -1000, -1000,
-1000, -1000, -1000, 34, 150, 64, 136, 59, -1000, 185,
29, 32, 47, 33, 30, -1000, -1000, 52, -24, -24,
5, 5, -36, -36, -36, -36, -1000, -1000, -1000, -1000,
-1000, -1000, 118, -1000, 109, 83, 79, 75, 146, 170,
29, 3, 53, 1, 115, 115, -1000, -1000, -1000, -1000,
-1000, 73, -1000, -1000, -1000, 110, 114, 0, 1, -1,
22, -1000, 21, -1000, -1000, -1000, -1000, -13, -1000, 111,
-1000, -1000, 0, -1000, -1000,
1, -1000, 64, 180, -1000, -1000, 1, -1000, -1000, -1000,
-1000, 131, 53, -11, -1000, 76, 74, -1000, -1000, -1000,
-1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000, -1000,
1, 1, 1, 1, 1, 1, 1, 1, 1, 126,
-1000, -1000, -1000, -1000, -1000, 36, 153, 64, 139, 63,
-1000, 173, 31, 34, 51, 35, 32, -1000, -1000, 52,
-26, -26, 5, 5, -38, -38, -38, -38, -1000, -1000,
-1000, -1000, -1000, -1000, 133, -1000, 109, 108, 107, 85,
140, 168, 31, 3, 56, 1, 128, 128, -1000, -1000,
-1000, -1000, -1000, 79, -1000, -1000, -1000, 123, 127, 0,
1, -1, 24, -1000, 23, -1000, -1000, -1000, -1000, -13,
-1000, 122, -1000, -1000, 0, -1000, -1000,
}
var exprPgo = [...]int{
0, 153, 110, 2, 0, 3, 5, 1, 6, 4,
150, 149, 136, 132, 129, 128, 126, 124,
0, 162, 85, 2, 0, 3, 5, 1, 6, 4,
159, 158, 156, 152, 149, 145, 139, 127,
}
var exprR1 = [...]int{
@ -220,7 +225,7 @@ var exprR1 = [...]int{
13, 13, 10, 10, 9, 9, 9, 9, 16, 16,
16, 16, 16, 16, 16, 16, 16, 17, 17, 17,
15, 15, 15, 15, 15, 15, 15, 15, 15, 12,
12, 5, 5, 4, 4,
12, 12, 12, 5, 5, 4, 4,
}
var exprR2 = [...]int{
@ -230,37 +235,37 @@ var exprR2 = [...]int{
3, 3, 1, 3, 3, 3, 3, 3, 3, 3,
3, 3, 3, 3, 3, 3, 3, 1, 2, 2,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
1, 1, 3, 4, 4,
1, 1, 1, 1, 3, 4, 4,
}
var exprChk = [...]int{
-1000, -1, -2, -6, -7, -13, 22, -11, -14, -16,
-17, 14, -12, -15, 6, 40, 41, 26, 27, 28,
29, 32, 30, 31, 33, 34, 35, 36, 37, 38,
39, 40, 41, 42, 43, 44, 45, -3, 2, 20,
21, 13, 11, -7, -6, -2, -10, 2, -9, 4,
22, 22, -4, 24, 25, 6, 6, -2, -2, -2,
-2, -2, -2, -2, -2, -2, 5, 2, 23, 23,
15, 2, 18, 15, 10, 11, 12, 13, -8, -6,
22, -7, 6, 22, 22, 22, -9, 5, 5, 5,
5, -3, 2, 23, 7, -6, -8, 23, 18, -7,
-5, 4, -5, 5, 2, 23, -4, -7, 23, 18,
23, 23, 23, 4, -4,
-17, 14, -12, -15, 6, 42, 43, 26, 27, 37,
38, 28, 29, 32, 30, 31, 33, 34, 35, 36,
39, 40, 41, 42, 43, 44, 45, 46, 47, -3,
2, 20, 21, 13, 11, -7, -6, -2, -10, 2,
-9, 4, 22, 22, -4, 24, 25, 6, 6, -2,
-2, -2, -2, -2, -2, -2, -2, -2, 5, 2,
23, 23, 15, 2, 18, 15, 10, 11, 12, 13,
-8, -6, 22, -7, 6, 22, 22, 22, -9, 5,
5, 5, 5, -3, 2, 23, 7, -6, -8, 23,
18, -7, -5, 4, -5, 5, 2, 23, -4, -7,
23, 18, 23, 23, 23, 4, -4,
}
var exprDef = [...]int{
0, -2, 1, -2, 3, 9, 0, 4, 5, 6,
7, 0, 0, 0, 47, 0, 0, 59, 60, 50,
51, 52, 53, 54, 55, 56, 57, 58, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 13, 25,
26, 27, 28, 3, -2, 0, 0, 0, 32, 0,
0, 0, 0, 0, 0, 48, 49, 38, 39, 40,
41, 42, 43, 44, 45, 46, 10, 12, 8, 11,
29, 30, 0, 31, 0, 0, 0, 0, 0, 0,
0, 3, 47, 0, 0, 0, 33, 34, 35, 36,
37, 0, 18, 19, 14, 0, 0, 20, 0, 3,
0, 61, 0, 15, 17, 16, 22, 3, 21, 0,
63, 64, 23, 62, 24,
7, 0, 0, 0, 47, 0, 0, 59, 60, 61,
62, 50, 51, 52, 53, 54, 55, 56, 57, 58,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
13, 25, 26, 27, 28, 3, -2, 0, 0, 0,
32, 0, 0, 0, 0, 0, 0, 48, 49, 38,
39, 40, 41, 42, 43, 44, 45, 46, 10, 12,
8, 11, 29, 30, 0, 31, 0, 0, 0, 0,
0, 0, 0, 3, 47, 0, 0, 0, 33, 34,
35, 36, 37, 0, 18, 19, 14, 0, 0, 20,
0, 3, 0, 63, 0, 15, 17, 16, 22, 3,
21, 0, 65, 66, 23, 64, 24,
}
var exprTok1 = [...]int{
@ -272,7 +277,7 @@ var exprTok2 = [...]int{
12, 13, 14, 15, 16, 17, 18, 19, 20, 21,
22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
32, 33, 34, 35, 36, 37, 38, 39, 40, 41,
42, 43, 44, 45,
42, 43, 44, 45, 46, 47,
}
var exprTok3 = [...]int{
0,
@ -617,360 +622,372 @@ exprdefault:
case 1:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:65
//line pkg/logql/expr.y:66
{
exprlex.(*lexer).expr = exprDollar[1].Expr
}
case 2:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:68
//line pkg/logql/expr.y:69
{
exprVAL.Expr = exprDollar[1].LogExpr
}
case 3:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:69
//line pkg/logql/expr.y:70
{
exprVAL.Expr = exprDollar[1].MetricExpr
}
case 4:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:73
//line pkg/logql/expr.y:74
{
exprVAL.MetricExpr = exprDollar[1].RangeAggregationExpr
}
case 5:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:74
//line pkg/logql/expr.y:75
{
exprVAL.MetricExpr = exprDollar[1].VectorAggregationExpr
}
case 6:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:75
//line pkg/logql/expr.y:76
{
exprVAL.MetricExpr = exprDollar[1].BinOpExpr
}
case 7:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:76
//line pkg/logql/expr.y:77
{
exprVAL.MetricExpr = exprDollar[1].LiteralExpr
}
case 8:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:77
//line pkg/logql/expr.y:78
{
exprVAL.MetricExpr = exprDollar[2].MetricExpr
}
case 9:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:81
//line pkg/logql/expr.y:82
{
exprVAL.LogExpr = newMatcherExpr(exprDollar[1].Selector)
}
case 10:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:82
//line pkg/logql/expr.y:83
{
exprVAL.LogExpr = NewFilterExpr(exprDollar[1].LogExpr, exprDollar[2].Filter, exprDollar[3].str)
}
case 11:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:83
//line pkg/logql/expr.y:84
{
exprVAL.LogExpr = exprDollar[2].LogExpr
}
case 14:
exprDollar = exprS[exprpt-2 : exprpt+1]
//line pkg/logql/expr.y:89
//line pkg/logql/expr.y:90
{
exprVAL.LogRangeExpr = newLogRange(exprDollar[1].LogExpr, exprDollar[2].duration)
}
case 15:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:90
//line pkg/logql/expr.y:91
{
exprVAL.LogRangeExpr = addFilterToLogRangeExpr(exprDollar[1].LogRangeExpr, exprDollar[2].Filter, exprDollar[3].str)
}
case 16:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:91
//line pkg/logql/expr.y:92
{
exprVAL.LogRangeExpr = exprDollar[2].LogRangeExpr
}
case 19:
exprDollar = exprS[exprpt-4 : exprpt+1]
//line pkg/logql/expr.y:96
//line pkg/logql/expr.y:97
{
exprVAL.RangeAggregationExpr = newRangeAggregationExpr(exprDollar[3].LogRangeExpr, exprDollar[1].RangeOp)
}
case 20:
exprDollar = exprS[exprpt-4 : exprpt+1]
//line pkg/logql/expr.y:100
//line pkg/logql/expr.y:101
{
exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[3].MetricExpr, exprDollar[1].VectorOp, nil, nil)
}
case 21:
exprDollar = exprS[exprpt-5 : exprpt+1]
//line pkg/logql/expr.y:101
//line pkg/logql/expr.y:102
{
exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[4].MetricExpr, exprDollar[1].VectorOp, exprDollar[2].Grouping, nil)
}
case 22:
exprDollar = exprS[exprpt-5 : exprpt+1]
//line pkg/logql/expr.y:102
//line pkg/logql/expr.y:103
{
exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[3].MetricExpr, exprDollar[1].VectorOp, exprDollar[5].Grouping, nil)
}
case 23:
exprDollar = exprS[exprpt-6 : exprpt+1]
//line pkg/logql/expr.y:104
//line pkg/logql/expr.y:105
{
exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[5].MetricExpr, exprDollar[1].VectorOp, nil, &exprDollar[3].str)
}
case 24:
exprDollar = exprS[exprpt-7 : exprpt+1]
//line pkg/logql/expr.y:105
//line pkg/logql/expr.y:106
{
exprVAL.VectorAggregationExpr = mustNewVectorAggregationExpr(exprDollar[5].MetricExpr, exprDollar[1].VectorOp, exprDollar[7].Grouping, &exprDollar[3].str)
}
case 25:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:109
//line pkg/logql/expr.y:110
{
exprVAL.Filter = labels.MatchRegexp
}
case 26:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:110
//line pkg/logql/expr.y:111
{
exprVAL.Filter = labels.MatchEqual
}
case 27:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:111
//line pkg/logql/expr.y:112
{
exprVAL.Filter = labels.MatchNotRegexp
}
case 28:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:112
//line pkg/logql/expr.y:113
{
exprVAL.Filter = labels.MatchNotEqual
}
case 29:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:116
//line pkg/logql/expr.y:117
{
exprVAL.Selector = exprDollar[2].Matchers
}
case 30:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:117
//line pkg/logql/expr.y:118
{
exprVAL.Selector = exprDollar[2].Matchers
}
case 31:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:118
//line pkg/logql/expr.y:119
{
}
case 32:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:122
//line pkg/logql/expr.y:123
{
exprVAL.Matchers = []*labels.Matcher{exprDollar[1].Matcher}
}
case 33:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:123
//line pkg/logql/expr.y:124
{
exprVAL.Matchers = append(exprDollar[1].Matchers, exprDollar[3].Matcher)
}
case 34:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:127
//line pkg/logql/expr.y:128
{
exprVAL.Matcher = mustNewMatcher(labels.MatchEqual, exprDollar[1].str, exprDollar[3].str)
}
case 35:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:128
//line pkg/logql/expr.y:129
{
exprVAL.Matcher = mustNewMatcher(labels.MatchNotEqual, exprDollar[1].str, exprDollar[3].str)
}
case 36:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:129
//line pkg/logql/expr.y:130
{
exprVAL.Matcher = mustNewMatcher(labels.MatchRegexp, exprDollar[1].str, exprDollar[3].str)
}
case 37:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:130
//line pkg/logql/expr.y:131
{
exprVAL.Matcher = mustNewMatcher(labels.MatchNotRegexp, exprDollar[1].str, exprDollar[3].str)
}
case 38:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:139
//line pkg/logql/expr.y:140
{
exprVAL.BinOpExpr = mustNewBinOpExpr("or", exprDollar[1].Expr, exprDollar[3].Expr)
}
case 39:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:140
//line pkg/logql/expr.y:141
{
exprVAL.BinOpExpr = mustNewBinOpExpr("and", exprDollar[1].Expr, exprDollar[3].Expr)
}
case 40:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:141
//line pkg/logql/expr.y:142
{
exprVAL.BinOpExpr = mustNewBinOpExpr("unless", exprDollar[1].Expr, exprDollar[3].Expr)
}
case 41:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:142
//line pkg/logql/expr.y:143
{
exprVAL.BinOpExpr = mustNewBinOpExpr("+", exprDollar[1].Expr, exprDollar[3].Expr)
}
case 42:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:143
//line pkg/logql/expr.y:144
{
exprVAL.BinOpExpr = mustNewBinOpExpr("-", exprDollar[1].Expr, exprDollar[3].Expr)
}
case 43:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:144
//line pkg/logql/expr.y:145
{
exprVAL.BinOpExpr = mustNewBinOpExpr("*", exprDollar[1].Expr, exprDollar[3].Expr)
}
case 44:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:145
//line pkg/logql/expr.y:146
{
exprVAL.BinOpExpr = mustNewBinOpExpr("/", exprDollar[1].Expr, exprDollar[3].Expr)
}
case 45:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:146
//line pkg/logql/expr.y:147
{
exprVAL.BinOpExpr = mustNewBinOpExpr("%", exprDollar[1].Expr, exprDollar[3].Expr)
}
case 46:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:147
//line pkg/logql/expr.y:148
{
exprVAL.BinOpExpr = mustNewBinOpExpr("^", exprDollar[1].Expr, exprDollar[3].Expr)
}
case 47:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:151
//line pkg/logql/expr.y:152
{
exprVAL.LiteralExpr = mustNewLiteralExpr(exprDollar[1].str, false)
}
case 48:
exprDollar = exprS[exprpt-2 : exprpt+1]
//line pkg/logql/expr.y:152
//line pkg/logql/expr.y:153
{
exprVAL.LiteralExpr = mustNewLiteralExpr(exprDollar[2].str, false)
}
case 49:
exprDollar = exprS[exprpt-2 : exprpt+1]
//line pkg/logql/expr.y:153
//line pkg/logql/expr.y:154
{
exprVAL.LiteralExpr = mustNewLiteralExpr(exprDollar[2].str, true)
}
case 50:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:157
//line pkg/logql/expr.y:158
{
exprVAL.VectorOp = OpTypeSum
}
case 51:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:158
//line pkg/logql/expr.y:159
{
exprVAL.VectorOp = OpTypeAvg
}
case 52:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:159
//line pkg/logql/expr.y:160
{
exprVAL.VectorOp = OpTypeCount
}
case 53:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:160
//line pkg/logql/expr.y:161
{
exprVAL.VectorOp = OpTypeMax
}
case 54:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:161
//line pkg/logql/expr.y:162
{
exprVAL.VectorOp = OpTypeMin
}
case 55:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:162
//line pkg/logql/expr.y:163
{
exprVAL.VectorOp = OpTypeStddev
}
case 56:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:163
//line pkg/logql/expr.y:164
{
exprVAL.VectorOp = OpTypeStdvar
}
case 57:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:164
//line pkg/logql/expr.y:165
{
exprVAL.VectorOp = OpTypeBottomK
}
case 58:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:165
//line pkg/logql/expr.y:166
{
exprVAL.VectorOp = OpTypeTopK
}
case 59:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:169
//line pkg/logql/expr.y:170
{
exprVAL.RangeOp = OpTypeCountOverTime
exprVAL.RangeOp = OpRangeTypeCount
}
case 60:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:170
//line pkg/logql/expr.y:171
{
exprVAL.RangeOp = OpTypeRate
exprVAL.RangeOp = OpRangeTypeRate
}
case 61:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:175
//line pkg/logql/expr.y:172
{
exprVAL.Labels = []string{exprDollar[1].str}
exprVAL.RangeOp = OpRangeTypeBytes
}
case 62:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:173
{
exprVAL.RangeOp = OpRangeTypeBytesRate
}
case 63:
exprDollar = exprS[exprpt-1 : exprpt+1]
//line pkg/logql/expr.y:178
{
exprVAL.Labels = []string{exprDollar[1].str}
}
case 64:
exprDollar = exprS[exprpt-3 : exprpt+1]
//line pkg/logql/expr.y:176
//line pkg/logql/expr.y:179
{
exprVAL.Labels = append(exprDollar[1].Labels, exprDollar[3].str)
}
case 63:
case 65:
exprDollar = exprS[exprpt-4 : exprpt+1]
//line pkg/logql/expr.y:180
//line pkg/logql/expr.y:183
{
exprVAL.Grouping = &grouping{without: false, groups: exprDollar[3].Labels}
}
case 64:
case 66:
exprDollar = exprS[exprpt-4 : exprpt+1]
//line pkg/logql/expr.y:181
//line pkg/logql/expr.y:184
{
exprVAL.Grouping = &grouping{without: true, groups: exprDollar[3].Labels}
}

@ -0,0 +1,63 @@
package logql
import (
"fmt"
"time"
"github.com/prometheus/prometheus/promql"
)
const unsupportedErr = "unsupported range vector aggregation operation: %s"
// extractor returns the SampleExtractor matching this range aggregation:
// line-count extraction for rate/count_over_time, byte-size extraction for
// bytes_rate/bytes_over_time. An error is returned for any other operation.
func (r rangeAggregationExpr) extractor() (SampleExtractor, error) {
	op := r.operation
	if op == OpRangeTypeRate || op == OpRangeTypeCount {
		return extractCount, nil
	}
	if op == OpRangeTypeBytes || op == OpRangeTypeBytesRate {
		return extractBytes, nil
	}
	return nil, fmt.Errorf(unsupportedErr, op)
}
// aggregator returns the RangeVectorAggregator that reduces a window of
// samples for this range aggregation, or an error when the operation is
// not a supported range vector operation.
func (r rangeAggregationExpr) aggregator() (RangeVectorAggregator, error) {
	switch op := r.operation; op {
	case OpRangeTypeRate:
		return rateLogs(r.left.interval), nil
	case OpRangeTypeBytesRate:
		return rateLogBytes(r.left.interval), nil
	case OpRangeTypeCount:
		return countOverTime, nil
	case OpRangeTypeBytes:
		return sumOverTime, nil
	}
	return nil, fmt.Errorf(unsupportedErr, r.operation)
}
// rateLogs returns an aggregator that calculates the per-second rate of
// log lines over the given range duration.
func rateLogs(selRange time.Duration) func(samples []promql.Point) float64 {
	// Compute the window length once: the returned closure is invoked for
	// every evaluation step, and selRange never changes.
	seconds := selRange.Seconds()
	return func(samples []promql.Point) float64 {
		return float64(len(samples)) / seconds
	}
}
// rateLogBytes returns an aggregator that calculates the per-second rate
// of log bytes over the given range duration.
func rateLogBytes(selRange time.Duration) func(samples []promql.Point) float64 {
	// Hoist the invariant conversion out of the closure; it would otherwise
	// be recomputed on every evaluation step.
	seconds := selRange.Seconds()
	return func(samples []promql.Point) float64 {
		return sumOverTime(samples) / seconds
	}
}
// countOverTime counts the number of log lines in the window.
func countOverTime(samples []promql.Point) float64 {
	n := len(samples)
	return float64(n)
}
// sumOverTime sums the sample values in the window.
func sumOverTime(samples []promql.Point) float64 {
	total := 0.0
	for i := range samples {
		total += samples[i].V
	}
	return total
}

@ -9,33 +9,35 @@ import (
)
var tokens = map[string]int{
",": COMMA,
".": DOT,
"{": OPEN_BRACE,
"}": CLOSE_BRACE,
"=": EQ,
"!=": NEQ,
"=~": RE,
"!~": NRE,
"|=": PIPE_EXACT,
"|~": PIPE_MATCH,
"(": OPEN_PARENTHESIS,
")": CLOSE_PARENTHESIS,
"by": BY,
"without": WITHOUT,
OpTypeCountOverTime: COUNT_OVER_TIME,
"[": OPEN_BRACKET,
"]": CLOSE_BRACKET,
OpTypeRate: RATE,
OpTypeSum: SUM,
OpTypeAvg: AVG,
OpTypeMax: MAX,
OpTypeMin: MIN,
OpTypeCount: COUNT,
OpTypeStddev: STDDEV,
OpTypeStdvar: STDVAR,
OpTypeBottomK: BOTTOMK,
OpTypeTopK: TOPK,
",": COMMA,
".": DOT,
"{": OPEN_BRACE,
"}": CLOSE_BRACE,
"=": EQ,
"!=": NEQ,
"=~": RE,
"!~": NRE,
"|=": PIPE_EXACT,
"|~": PIPE_MATCH,
"(": OPEN_PARENTHESIS,
")": CLOSE_PARENTHESIS,
"by": BY,
"without": WITHOUT,
"[": OPEN_BRACKET,
"]": CLOSE_BRACKET,
OpRangeTypeRate: RATE,
OpRangeTypeCount: COUNT_OVER_TIME,
OpRangeTypeBytesRate: BYTES_RATE,
OpRangeTypeBytes: BYTES_OVER_TIME,
OpTypeSum: SUM,
OpTypeAvg: AVG,
OpTypeMax: MAX,
OpTypeMin: MIN,
OpTypeCount: COUNT,
OpTypeStddev: STDDEV,
OpTypeStdvar: STDVAR,
OpTypeBottomK: BOTTOMK,
OpTypeTopK: TOPK,
// binops
OpTypeOr: OR,

@ -107,6 +107,26 @@ func TestParse(t *testing.T) {
operation: "count_over_time",
},
},
{
in: `bytes_over_time({ foo !~ "bar" }[12m])`,
exp: &rangeAggregationExpr{
left: &logRange{
left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}},
interval: 12 * time.Minute,
},
operation: OpRangeTypeBytes,
},
},
{
in: `bytes_rate({ foo !~ "bar" }[12m])`,
exp: &rangeAggregationExpr{
left: &logRange{
left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchNotRegexp, "foo", "bar")}},
interval: 12 * time.Minute,
},
operation: OpRangeTypeBytesRate,
},
},
{
in: `rate({ foo !~ "bar" }[5h])`,
exp: &rangeAggregationExpr{
@ -355,7 +375,31 @@ func TestParse(t *testing.T) {
match: "flap",
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
},
{
in: `bytes_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])`,
exp: newRangeAggregationExpr(
&logRange{
left: &filterExpr{
left: &filterExpr{
left: &filterExpr{
left: &filterExpr{
left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
ty: labels.MatchEqual,
match: "baz",
},
ty: labels.MatchRegexp,
match: "blip",
},
ty: labels.MatchNotEqual,
match: "flip",
},
ty: labels.MatchNotRegexp,
match: "flap",
},
interval: 5 * time.Minute,
}, OpRangeTypeBytes),
},
{
in: `sum(count_over_time(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (foo)`,
@ -379,7 +423,37 @@ func TestParse(t *testing.T) {
match: "flap",
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
groups: []string{"foo"},
},
nil),
},
{
in: `sum(bytes_rate(({foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap")[5m])) by (foo)`,
exp: mustNewVectorAggregationExpr(newRangeAggregationExpr(
&logRange{
left: &filterExpr{
left: &filterExpr{
left: &filterExpr{
left: &filterExpr{
left: &matchersExpr{matchers: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")}},
ty: labels.MatchEqual,
match: "baz",
},
ty: labels.MatchRegexp,
match: "blip",
},
ty: labels.MatchNotEqual,
match: "flip",
},
ty: labels.MatchNotRegexp,
match: "flap",
},
interval: 5 * time.Minute,
}, OpRangeTypeBytesRate),
"sum",
&grouping{
without: false,
@ -409,7 +483,7 @@ func TestParse(t *testing.T) {
match: "flap",
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"topk",
&grouping{
without: true,
@ -441,7 +515,7 @@ func TestParse(t *testing.T) {
match: "flap",
},
interval: 5 * time.Minute,
}, OpTypeRate),
}, OpRangeTypeRate),
"sum",
&grouping{
without: false,
@ -474,7 +548,7 @@ func TestParse(t *testing.T) {
match: "flap",
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
},
{
in: `sum(count_over_time({foo="bar"}[5m] |= "baz" |~ "blip" != "flip" !~ "flap")) by (foo)`,
@ -498,7 +572,7 @@ func TestParse(t *testing.T) {
match: "flap",
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -528,7 +602,7 @@ func TestParse(t *testing.T) {
match: "flap",
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"topk",
&grouping{
without: true,
@ -560,7 +634,7 @@ func TestParse(t *testing.T) {
match: "flap",
},
interval: 5 * time.Minute,
}, OpTypeRate),
}, OpRangeTypeRate),
"sum",
&grouping{
without: false,
@ -632,7 +706,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -648,7 +722,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -665,7 +739,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -693,7 +767,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -709,7 +783,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -726,7 +800,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -753,7 +827,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -771,7 +845,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -787,7 +861,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,
@ -818,7 +892,7 @@ func TestParse(t *testing.T) {
ty: labels.MatchEqual,
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
newRangeAggregationExpr(
&logRange{
left: &matchersExpr{
@ -827,7 +901,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime)), OpTypeSum, &grouping{groups: []string{"job"}}, nil),
}, OpRangeTypeCount)), OpTypeSum, &grouping{groups: []string{"job"}}, nil),
},
{
in: `sum by (job) (
@ -849,7 +923,7 @@ func TestParse(t *testing.T) {
ty: labels.MatchEqual,
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
newRangeAggregationExpr(
&logRange{
left: &matchersExpr{
@ -858,7 +932,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime)), OpTypeSum, &grouping{groups: []string{"job"}}, nil),
}, OpRangeTypeCount)), OpTypeSum, &grouping{groups: []string{"job"}}, nil),
mustNewLiteralExpr("100", false),
),
},
@ -876,7 +950,7 @@ func TestParse(t *testing.T) {
},
},
interval: 5 * time.Minute,
}, OpTypeCountOverTime),
}, OpRangeTypeCount),
"sum",
&grouping{
without: false,

@ -6,14 +6,12 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/grafana/loki/pkg/iter"
)
// RangeVectorAggregator aggregates samples for a given range of samples.
// It receives the current milliseconds timestamp and the list of point within
// the range.
type RangeVectorAggregator func(int64, []promql.Point) float64
type RangeVectorAggregator func([]promql.Point) float64
// RangeVectorIterator iterates through a range of samples.
// To fetch the current vector use `At` with a `RangeVectorAggregator`.
@ -24,21 +22,21 @@ type RangeVectorIterator interface {
}
type rangeVectorIterator struct {
iter iter.PeekingEntryIterator
iter SeriesIterator
selRange, step, end, current int64
window map[string]*promql.Series
metrics map[string]labels.Labels
}
func newRangeVectorIterator(
it iter.EntryIterator,
it SeriesIterator,
selRange, step, start, end int64) *rangeVectorIterator {
// forces at least one step.
if step == 0 {
step = 1
}
return &rangeVectorIterator{
iter: iter.NewPeekingIterator(it),
iter: it,
step: step,
end: end,
selRange: selRange,
@ -93,38 +91,38 @@ func (r *rangeVectorIterator) popBack(newStart int64) {
// load the next sample range window.
func (r *rangeVectorIterator) load(start, end int64) {
for lbs, entry, hasNext := r.iter.Peek(); hasNext; lbs, entry, hasNext = r.iter.Peek() {
if entry.Timestamp.UnixNano() > end {
for sample, hasNext := r.iter.Peek(); hasNext; sample, hasNext = r.iter.Peek() {
if sample.TimestampNano > end {
// not consuming the iterator as this belongs to another range.
return
}
// the lower bound of the range is not inclusive
if entry.Timestamp.UnixNano() <= start {
if sample.TimestampNano <= start {
_ = r.iter.Next()
continue
}
// adds the sample.
var series *promql.Series
var ok bool
series, ok = r.window[lbs]
series, ok = r.window[sample.Labels]
if !ok {
var metric labels.Labels
if metric, ok = r.metrics[lbs]; !ok {
if metric, ok = r.metrics[sample.Labels]; !ok {
var err error
metric, err = parser.ParseMetric(lbs)
metric, err = parser.ParseMetric(sample.Labels)
if err != nil {
continue
}
r.metrics[lbs] = metric
r.metrics[sample.Labels] = metric
}
series = getSeries()
series.Metric = metric
r.window[lbs] = series
r.window[sample.Labels] = series
}
p := promql.Point{
T: entry.Timestamp.UnixNano(),
V: 1,
T: sample.TimestampNano,
V: sample.Value,
}
series.Points = append(series.Points, p)
_ = r.iter.Next()
@ -138,7 +136,7 @@ func (r *rangeVectorIterator) At(aggregator RangeVectorAggregator) (int64, promq
for _, series := range r.window {
result = append(result, promql.Sample{
Point: promql.Point{
V: aggregator(ts, series.Points),
V: aggregator(series.Points),
T: ts,
},
Metric: series.Metric,

@ -44,6 +44,13 @@ func newEntryIterator() iter.EntryIterator {
}, logproto.FORWARD)
}
// newfakeSeriesIterator returns a SeriesIterator over the fake entry
// iterator, emitting one count sample (value 1) per log entry.
// It delegates to newSeriesIterator instead of duplicating its
// construction logic, so both stay in sync.
func newfakeSeriesIterator() SeriesIterator {
	return newSeriesIterator(newEntryIterator(), extractCount)
}
func newPoint(t time.Time, v float64) promql.Point {
return promql.Point{T: t.UnixNano() / 1e+6, V: v}
}
@ -123,7 +130,7 @@ func Test_RangeVectorIterator(t *testing.T) {
time.Unix(10, 0), time.Unix(100, 0),
},
{
(50 * time.Second).Nanoseconds(), // all step are overlaping
(50 * time.Second).Nanoseconds(), // all step are overlapping
(10 * time.Second).Nanoseconds(),
[]promql.Vector{
[]promql.Sample{
@ -144,12 +151,12 @@ func Test_RangeVectorIterator(t *testing.T) {
t.Run(
fmt.Sprintf("logs[%s] - step: %s", time.Duration(tt.selRange), time.Duration(tt.step)),
func(t *testing.T) {
it := newRangeVectorIterator(newEntryIterator(), tt.selRange,
it := newRangeVectorIterator(newfakeSeriesIterator(), tt.selRange,
tt.step, tt.start.UnixNano(), tt.end.UnixNano())
i := 0
for it.Next() {
ts, v := it.At(count)
ts, v := it.At(countOverTime)
require.ElementsMatch(t, tt.expectedVectors[i], v)
require.Equal(t, tt.expectedTs[i].UnixNano()/1e+6, ts)
i++

@ -0,0 +1,99 @@
package logql
import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
var (
extractBytes = bytesSampleExtractor{}
extractCount = countSampleExtractor{}
)
// SeriesIterator iterates over a stream of logs and returns one Sample per
// log entry.
type SeriesIterator interface {
	// Close releases the underlying resources.
	Close() error
	// Next advances the iterator to the next entry.
	Next() bool
	// Peek returns the sample at the current position without consuming it.
	// The second return value is false once the iterator is exhausted.
	Peek() (Sample, bool)
}
// Sample is a single series sample extracted from a log entry.
type Sample struct {
	Labels        string  // series labels in string form, e.g. `{app="foo"}`
	Value         float64 // extracted value: 1 for counts, line length for bytes
	TimestampNano int64   // entry timestamp in nanoseconds since the Unix epoch
}
// seriesIterator adapts a peeking entry iterator into a SeriesIterator by
// running every log entry through a SampleExtractor.
type seriesIterator struct {
	iter    iter.PeekingEntryIterator
	sampler SampleExtractor // converts entries into samples; may skip entries
	updated bool            // true when cur caches the sample for the current position
	cur     Sample          // cached sample, valid only while updated is true
}
// newSeriesIterator wraps an entry iterator so that each log entry is turned
// into a Sample by the given extractor.
func newSeriesIterator(it iter.EntryIterator, sampler SampleExtractor) SeriesIterator {
	s := &seriesIterator{
		sampler: sampler,
		iter:    iter.NewPeekingIterator(it),
	}
	return s
}
// Close closes the underlying entry iterator.
func (e *seriesIterator) Close() error {
	return e.iter.Close()
}
// Next advances the underlying iterator and invalidates the sample cached by
// the previous Peek.
func (e *seriesIterator) Next() bool {
	e.updated = false
	return e.iter.Next()
}
// Peek returns the sample at the current position without consuming it,
// caching the result so repeated calls are cheap. Entries rejected by the
// extractor are consumed and skipped. The second return value is false once
// the iterator is exhausted.
func (e *seriesIterator) Peek() (Sample, bool) {
	if e.updated {
		// The sample for this position was already computed.
		return e.cur, true
	}
	for {
		lbs, entry, ok := e.iter.Peek()
		if !ok {
			return Sample{}, false
		}
		// Try to turn the current entry into a sample.
		if sample, ok := e.sampler.From(lbs, entry); ok {
			e.cur = sample
			e.updated = true
			return e.cur, true
		}
		// The extractor rejected this entry: consume it and retry with the next one.
		if !e.iter.Next() {
			return Sample{}, false
		}
	}
}
// SampleExtractor transforms a log entry into a sample.
// In case of failure the second return value will be false and the entry is
// skipped by the series iterator.
type SampleExtractor interface {
	From(labels string, e logproto.Entry) (Sample, bool)
}
// countSampleExtractor emits a sample of value 1 for every log entry,
// used for entry-count based aggregations.
type countSampleExtractor struct{}

// From always succeeds, mapping the entry to a sample of value 1.
func (countSampleExtractor) From(lbs string, entry logproto.Entry) (Sample, bool) {
	sample := Sample{
		Value:         1.,
		Labels:        lbs,
		TimestampNano: entry.Timestamp.UnixNano(),
	}
	return sample, true
}
// bytesSampleExtractor emits, for every log entry, a sample whose value is
// the length of the log line in bytes.
type bytesSampleExtractor struct{}

// From always succeeds, mapping the entry to a sample holding the line size.
func (bytesSampleExtractor) From(lbs string, entry logproto.Entry) (Sample, bool) {
	sample := Sample{
		Value:         float64(len(entry.Line)),
		Labels:        lbs,
		TimestampNano: entry.Timestamp.UnixNano(),
	}
	return sample, true
}

@ -0,0 +1,159 @@
package logql
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
// Test_seriesIterator_Peek verifies that Peek yields the expected sequence of
// samples — and the exhaustion signal — for the count and bytes extractors,
// over forward and backward stream iterators, and that entries rejected by
// the extractor are skipped.
func Test_seriesIterator_Peek(t *testing.T) {
	// expectation pairs the ok flag returned by Peek with the sample it
	// should produce; ok == false marks iterator exhaustion.
	type expectation struct {
		ok     bool
		sample Sample
	}
	for _, test := range []struct {
		name         string
		it           SeriesIterator
		expectations []expectation
	}{
		{
			// count extractor: one sample of value 1 per entry.
			"count",
			newSeriesIterator(iter.NewStreamIterator(newStream(5, identity, `{app="foo"}`)), extractCount),
			[]expectation{
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 1}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 1}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 1}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(3, 0).UnixNano(), Value: 1}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(4, 0).UnixNano(), Value: 1}},
				{false, Sample{}},
			},
		},
		{
			// bytes extractor on empty lines: samples of value 0.
			"bytes empty",
			newSeriesIterator(
				iter.NewStreamIterator(
					newStream(
						3,
						func(i int64) logproto.Entry {
							return logproto.Entry{
								Timestamp: time.Unix(i, 0),
							}
						},
						`{app="foo"}`,
					),
				),
				extractBytes,
			),
			[]expectation{
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 0}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 0}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 0}},
				{false, Sample{}},
			},
		},
		{
			// bytes extractor: value is the line length ("foo" -> 3).
			"bytes",
			newSeriesIterator(
				iter.NewStreamIterator(
					newStream(
						3,
						func(i int64) logproto.Entry {
							return logproto.Entry{
								Timestamp: time.Unix(i, 0),
								Line:      "foo",
							}
						},
						`{app="foo"}`,
					),
				),
				extractBytes,
			),
			[]expectation{
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}},
				{false, Sample{}},
			},
		},
		{
			// bytes extractor over a BACKWARD multi-stream iterator: each
			// stream's line length is reported per stream ("foo" -> 3, "barr" -> 4).
			"bytes backward",
			newSeriesIterator(
				iter.NewStreamsIterator(context.Background(),
					[]logproto.Stream{
						newStream(
							3,
							func(i int64) logproto.Entry {
								return logproto.Entry{
									Timestamp: time.Unix(i, 0),
									Line:      "foo",
								}
							},
							`{app="foo"}`,
						),
						newStream(
							3,
							func(i int64) logproto.Entry {
								return logproto.Entry{
									Timestamp: time.Unix(i, 0),
									Line:      "barr",
								}
							},
							`{app="barr"}`,
						),
					},
					logproto.BACKWARD,
				),
				extractBytes,
			),
			[]expectation{
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: 0, Value: 3}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 3}},
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 3}},
				{true, Sample{Labels: `{app="barr"}`, TimestampNano: 0, Value: 4}},
				{true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 4}},
				{true, Sample{Labels: `{app="barr"}`, TimestampNano: time.Unix(2, 0).UnixNano(), Value: 4}},
				{false, Sample{}},
			},
		},
		{
			// fakeSampler rejects the zero-timestamp entry, so Peek must skip
			// it and surface only the second entry.
			"skip first",
			newSeriesIterator(iter.NewStreamIterator(newStream(2, identity, `{app="foo"}`)), fakeSampler{}),
			[]expectation{
				{true, Sample{Labels: `{app="foo"}`, TimestampNano: time.Unix(1, 0).UnixNano(), Value: 10}},
				{false, Sample{}},
			},
		},
	} {
		t.Run(test.name, func(t *testing.T) {
			for _, e := range test.expectations {
				sample, ok := test.it.Peek()
				require.Equal(t, e.ok, ok)
				if !e.ok {
					// Exhausted: nothing left to compare or advance past.
					continue
				}
				require.Equal(t, e.sample, sample)
				test.it.Next()
			}
			require.NoError(t, test.it.Close())
		})
	}
}
// fakeSampler is a SampleExtractor test double: it rejects entries at the
// zero timestamp and yields a sample of value 10 for every other entry.
type fakeSampler struct{}

func (fakeSampler) From(lbs string, entry logproto.Entry) (Sample, bool) {
	if ts := entry.Timestamp.UnixNano(); ts != 0 {
		return Sample{
			Labels:        lbs,
			TimestampNano: ts,
			Value:         10,
		}, true
	}
	// Zero-timestamp entries carry no sample.
	return Sample{}, false
}

@ -279,9 +279,10 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *sh
func (m ShardMapper) mapRangeAggregationExpr(expr *rangeAggregationExpr, r *shardRecorder) SampleExpr {
switch expr.operation {
case OpTypeCountOverTime, OpTypeRate:
case OpRangeTypeCount, OpRangeTypeRate, OpRangeTypeBytesRate, OpRangeTypeBytes:
// count_over_time(x) -> count_over_time(x, shard=1) ++ count_over_time(x, shard=2)...
// rate(x) -> rate(x, shard=1) ++ rate(x, shard=2)...
// same goes for bytes_rate and bytes_over_time
return m.mapSampleExpr(expr, r)
default:
return expr
@ -323,8 +324,10 @@ var shardableOps = map[string]bool{
OpTypeCount: true,
// range vector ops
OpTypeCountOverTime: true,
OpTypeRate: true,
OpRangeTypeCount: true,
OpRangeTypeRate: true,
OpRangeTypeBytes: true,
OpRangeTypeBytesRate: true,
// binops - arith
OpTypeAdd: true,

@ -61,7 +61,7 @@ func TestMapSampleExpr(t *testing.T) {
}{
{
in: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -78,7 +78,7 @@ func TestMapSampleExpr(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -96,7 +96,7 @@ func TestMapSampleExpr(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -254,7 +254,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -272,7 +272,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -296,7 +296,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeCountOverTime,
operation: OpRangeTypeCount,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -314,7 +314,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeCountOverTime,
operation: OpRangeTypeCount,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -344,7 +344,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -366,7 +366,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -396,7 +396,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -414,7 +414,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -445,7 +445,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -463,7 +463,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -494,7 +494,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -516,7 +516,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -550,7 +550,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -572,7 +572,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -601,7 +601,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -623,7 +623,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -663,7 +663,7 @@ func TestMapping(t *testing.T) {
},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -687,7 +687,7 @@ func TestMapping(t *testing.T) {
},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -721,7 +721,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -739,7 +739,7 @@ func TestMapping(t *testing.T) {
Of: 2,
},
SampleExpr: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -775,7 +775,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -797,7 +797,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -839,7 +839,7 @@ func TestMapping(t *testing.T) {
},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -863,7 +863,7 @@ func TestMapping(t *testing.T) {
},
operation: OpTypeSum,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -893,7 +893,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{
@ -915,7 +915,7 @@ func TestMapping(t *testing.T) {
grouping: &grouping{},
operation: OpTypeCount,
left: &rangeAggregationExpr{
operation: OpTypeRate,
operation: OpRangeTypeRate,
left: &logRange{
left: &matchersExpr{
matchers: []*labels.Matcher{

@ -103,7 +103,7 @@ type MockDownstreamer struct {
func (m MockDownstreamer) Downstreamer() Downstreamer { return m }
func (d MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery) ([]Result, error) {
func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery) ([]Result, error) {
results := make([]Result, 0, len(queries))
for _, query := range queries {
params := NewLiteralParams(
@ -116,7 +116,7 @@ func (d MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu
query.Params.Limit(),
query.Shards.Encode(),
)
res, err := d.Query(params).Exec(ctx)
res, err := m.Query(params).Exec(ctx)
if err != nil {
return nil, err
}

@ -41,7 +41,7 @@ func Test_Config(t *testing.T) {
clientDefaultConfig,
Config{
URL: flagext.URLValue{
u,
URL: u,
},
BackoffConfig: util.BackoffConfig{
MaxBackoff: 5 * time.Minute,
@ -75,7 +75,7 @@ func Test_Config(t *testing.T) {
require.NoError(t, err)
if !reflect.DeepEqual(tc.expectedConfig, clientConfig) {
t.Errorf("Configs does not match, expected: %v, recieved: %v", tc.expectedConfig, clientConfig)
t.Errorf("Configs does not match, expected: %v, received: %v", tc.expectedConfig, clientConfig)
}
}
}

@ -7,6 +7,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/grafana/loki/pkg/logproto"
@ -14,7 +16,6 @@ import (
"github.com/grafana/loki/pkg/logql/stats"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
)
func testSampleStreams() []queryrange.SampleStream {
@ -193,7 +194,7 @@ func TestDownstreamHandler(t *testing.T) {
ensureParallelism(t, in, in.parallelism)
}
// Consumes the locks in an instance, making sure they're all available. Does not replace them and thus instance is unusuable after. This is a cleanup test to ensure internal state
// Consumes the locks in an instance, making sure they're all available. Does not replace them and thus instance is unusable after. This is a cleanup test to ensure internal state
func ensureParallelism(t *testing.T, in *instance, n int) {
for i := 0; i < n; i++ {
select {

@ -10,10 +10,11 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/stretchr/testify/require"
)
var (

Loading…
Cancel
Save