Implement offset modifier for range vector aggregation in LogQL (#3455)

* Implement offset modifier for range vector aggregation in LogQL

Implementation for https://github.com/grafana/loki/issues/2785. The offset modifier allows changing the time offset for range vectors in a query, for example to support selective time shifting in Grafana.

Signed-off-by: garrettlish <garrett.li.sh@gmail.com>

* add docs for offset modifier

* refine offset expr

* add offset unit test in engine.go

* remove duplicated offset shift

* add offset test in range vector test

Co-authored-by: garrettlish <garrettlish@163.com>
  1. docs/sources/logql/_index.md (9 changed lines)
  2. pkg/logql/ast.go (29 changed lines)
  3. pkg/logql/ast_test.go (24 changed lines)
  4. pkg/logql/engine_test.go (38 changed lines)
  5. pkg/logql/evaluator.go (28 changed lines)
  6. pkg/logql/expr.y (44 changed lines)
  7. pkg/logql/expr.y.go (1059 changed lines)
  8. pkg/logql/functions_test.go (16 changed lines)
  9. pkg/logql/lex.go (1 changed line)
  10. pkg/logql/parser_test.go (146 changed lines)
  11. pkg/logql/range_vector.go (21 changed lines)
  12. pkg/logql/range_vector_test.go (33 changed lines)

@ -76,6 +76,15 @@ The same rules that apply for [Prometheus Label Selectors](https://prometheus.io
**Important note:** The `=~` regex operator is fully anchored, meaning the regex must match against the *entire* string, including newlines. The regex `.` character does not match newlines by default. If you want the regex dot character to match newlines, you can use the single-line flag, like so: `(?s)search_term.+` matches `search_term\n`.
#### Offset modifier
The offset modifier allows changing the time offset for individual range vectors in a query.
For example, the following expression counts all the logs for the MySQL job in the interval between ten minutes ago and five minutes ago, rather than in the last five minutes. Note that the `offset` modifier must immediately follow the range vector selector.
```logql
count_over_time({job="mysql"}[5m] offset 5m) // GOOD
count_over_time({job="mysql"}[5m]) offset 5m // INVALID
```
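To make the arithmetic concrete, here is a minimal illustrative Go sketch (not part of this change) of the window that `count_over_time({job="mysql"}[5m] offset 5m)` evaluates at a given instant:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// count_over_time({job="mysql"}[5m] offset 5m) evaluated at 12:00:
	// the offset shifts the evaluation time back 5m, and the 5m range
	// then extends back from that shifted point.
	at := time.Date(2021, 3, 1, 12, 0, 0, 0, time.UTC)
	rng, off := 5*time.Minute, 5*time.Minute

	end := at.Add(-off)    // 11:55
	start := end.Add(-rng) // 11:50
	fmt.Printf("evaluated window: %s -> %s\n",
		start.Format("15:04"), end.Format("15:04"))
}
```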
### Log Pipeline
A log pipeline can be appended to a log stream selector to further process and filter log streams. It is usually composed of one or more expressions, each executed in sequence for each log line. If an expression filters out a log line, the pipeline stops at that point and starts processing the next line.

@ -499,6 +499,7 @@ func newUnwrapExpr(id string, operation string) *unwrapExpr {
type logRange struct {
left LogSelectorExpr
interval time.Duration
offset time.Duration
unwrap *unwrapExpr
}
@ -511,16 +512,41 @@ func (r logRange) String() string {
sb.WriteString(r.unwrap.String())
}
sb.WriteString(fmt.Sprintf("[%v]", model.Duration(r.interval)))
if r.offset != 0 {
offsetExpr := offsetExpr{offset: r.offset}
sb.WriteString(offsetExpr.String())
}
return sb.String()
}
func (r *logRange) Shardable() bool { return r.left.Shardable() }
func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr) *logRange {
func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr, o *offsetExpr) *logRange {
var offset time.Duration
if o != nil {
offset = o.offset
}
return &logRange{
left: left,
interval: interval,
unwrap: u,
offset: offset,
}
}
type offsetExpr struct {
offset time.Duration
}
func (o *offsetExpr) String() string {
var sb strings.Builder
sb.WriteString(fmt.Sprintf(" %s %s", OpOffset, o.offset.String()))
return sb.String()
}
func newOffsetExpr(offset time.Duration) *offsetExpr {
return &offsetExpr{
offset: offset,
}
}
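A minimal usage sketch, assuming it sits inside `pkg/logql` so it can reach the unexported constructors; it shows the offset carried on the `logRange` and rendered by `String()`:

```go
package logql

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/pkg/labels"
)

func Example_logRangeOffset() {
	r := newLogRange(
		newMatcherExpr([]*labels.Matcher{
			{Type: labels.MatchEqual, Name: "job", Value: "mysql"},
		}),
		5*time.Minute,
		nil,                          // no unwrap expression
		newOffsetExpr(5*time.Minute), // offset 5m
	)
	// time.Duration.String() renders 5*time.Minute as "5m0s".
	fmt.Println(r.String())
	// Output: {job="mysql"}[5m] offset 5m0s
}
```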
@ -582,6 +608,7 @@ const (
OpPipe = "|"
OpUnwrap = "unwrap"
OpOffset = "offset"
// conversion Op
OpConvBytes = "bytes"

@ -60,19 +60,27 @@ func Test_SampleExpr_String(t *testing.T) {
for _, tc := range []string{
`rate( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] offset 10m )`,
`sum without(a) ( rate ( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum by(a) (rate( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum(count_over_time({job="mysql"}[5m]))`,
`sum(count_over_time({job="mysql"}[5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | json [5m]))`,
`sum(count_over_time({job="mysql"} | json [5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | logfmt [5m]))`,
`sum(count_over_time({job="mysql"} | logfmt [5m] offset 10m))`,
`sum(count_over_time({job="mysql"} | unpack | json [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m] offset 10m))`,
`topk(10,sum(rate({region="us-east1"}[5m])) by (name))`,
`topk by (name)(10,sum(rate({region="us-east1"}[5m])))`,
`avg( rate( ( {job="nginx"} |= "GET" ) [10s] ) ) by (region)`,
`avg(min_over_time({job="nginx"} |= "GET" | unwrap foo[10s])) by (region)`,
`avg(min_over_time({job="nginx"} |= "GET" | unwrap foo[10s] offset 10m)) by (region)`,
`sum by (cluster) (count_over_time({job="mysql"}[5m]))`,
`sum by (cluster) (count_over_time({job="mysql"}[5m] offset 10m))`,
`sum by (cluster) (count_over_time({job="mysql"}[5m])) / sum by (cluster) (count_over_time({job="postgres"}[5m])) `,
`sum by (cluster) (count_over_time({job="mysql"}[5m] offset 10m)) / sum by (cluster) (count_over_time({job="postgres"}[5m] offset 10m)) `,
`
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
sum by (cluster) (count_over_time({job="postgres"}[5m])) /
@ -86,6 +94,8 @@ func Test_SampleExpr_String(t *testing.T) {
)`,
`stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m])`,
`stdvar_over_time({app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m] offset 10m)`,
`sum_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms|unwrap latency [5m])`,
`sum by (job) (
sum_over_time({namespace="tns"} |= "level=error" | json | foo=5 and bar<25ms | unwrap latency[5m])
@ -130,6 +140,20 @@ func Test_SampleExpr_String(t *testing.T) {
`10 / (5/2)`,
`10 / (count_over_time({job="postgres"}[5m])/2)`,
`{app="foo"} | json response_status="response.status.code", first_param="request.params[0]"`,
`label_replace(
sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m] offset 1h
)
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m] offset 1h)
),
"foo",
"$1",
"service",
"(.*):.*"
)
`,
} {
t.Run(tc, func(t *testing.T) {
expr, err := ParseExpr(tc)

@ -106,6 +106,16 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
promql.Vector{promql.Sample{Point: promql.Point{T: 60 * 1000, V: 6}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`count_over_time({app="foo"} |~".+bar" [1m] offset 30s)`, time.Unix(90, 0), logproto.BACKWARD, 10,
[][]logproto.Series{
{newSeries(testSize, factor(10, identity), `{app="foo"}`)}, // 10 , 20 , 30 .. 60 = 6 total
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `count_over_time({app="foo"}|~".+bar"[1m] offset 30s)`}},
},
promql.Vector{promql.Sample{Point: promql.Point{T: 90 * 1000, V: 6}, Metric: labels.Labels{labels.Label{Name: "app", Value: "foo"}}}},
},
{
`count_over_time(({app="foo"} |~".+bar")[5m])`, time.Unix(5*60, 0), logproto.BACKWARD, 10,
[][]logproto.Series{
@ -237,6 +247,34 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
},
},
},
{
`sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m] offset 30s)) by (namespace,app)`, time.Unix(90, 0), logproto.FORWARD, 100,
[][]logproto.Series{
{
newSeries(testSize, factor(10, identity), `{app="foo", namespace="a"}`),
newSeries(testSize, factor(10, identity), `{app="bar", namespace="b"}`),
},
},
[]SelectSampleParams{
{&logproto.SampleQueryRequest{Start: time.Unix(0, 0), End: time.Unix(60, 0), Selector: `sum by (namespace,app) (count_over_time({app=~"foo|bar"} |~".+bar" [1m] offset 30s)) `}},
},
promql.Vector{
promql.Sample{
Point: promql.Point{T: 90 * 1000, V: 6},
Metric: labels.Labels{
labels.Label{Name: "app", Value: "bar"},
labels.Label{Name: "namespace", Value: "b"},
},
},
promql.Sample{
Point: promql.Point{T: 90 * 1000, V: 6},
Metric: labels.Labels{
labels.Label{Name: "app", Value: "foo"},
labels.Label{Name: "namespace", Value: "a"},
},
},
},
},
{
`label_replace(
sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) by (namespace,app),

@ -58,13 +58,12 @@ func NewLiteralParams(
// LiteralParams impls Params
type LiteralParams struct {
qs string
start, end time.Time
step time.Duration
interval time.Duration
direction logproto.Direction
limit uint32
shards []string
qs string
start, end time.Time
step, interval time.Duration
direction logproto.Direction
limit uint32
shards []string
}
func (p LiteralParams) Copy() LiteralParams { return p }
@ -175,8 +174,8 @@ func (ev *DefaultEvaluator) StepEvaluator(
nextEv = SampleEvaluatorFunc(func(ctx context.Context, nextEvaluator SampleEvaluator, expr SampleExpr, p Params) (StepEvaluator, error) {
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-rangExpr.left.interval),
End: q.End(),
Start: q.Start().Add(-rangExpr.left.interval).Add(-rangExpr.left.offset),
End: q.End().Add(-rangExpr.left.offset),
Selector: e.String(), // intentionally send the vector for reducing labels.
Shards: q.Shards(),
},
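As a sanity check against the engine test earlier in this diff (instant query at t=90s with `[1m] offset 30s` fetches the window `[0s, 60s]`), here is a hypothetical helper mirroring this arithmetic; `selectWindow` is illustrative, not a Loki API:

```go
package main

import (
	"fmt"
	"time"
)

// selectWindow mirrors the Start/End arithmetic above: widen the window
// backwards by the range interval, then shift both ends back by the offset.
func selectWindow(qStart, qEnd time.Time, interval, offset time.Duration) (time.Time, time.Time) {
	return qStart.Add(-interval).Add(-offset), qEnd.Add(-offset)
}

func main() {
	start, end := selectWindow(
		time.Unix(90, 0), time.Unix(90, 0), // instant query at t=90s
		time.Minute,    // [1m]
		30*time.Second, // offset 30s
	)
	fmt.Println(start.Unix(), end.Unix()) // 0 60
}
```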
@ -184,15 +183,15 @@ func (ev *DefaultEvaluator) StepEvaluator(
if err != nil {
return nil, err
}
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), rangExpr, q)
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), rangExpr, q, rangExpr.left.offset)
})
}
return vectorAggEvaluator(ctx, nextEv, e, q)
case *rangeAggregationExpr:
it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{
&logproto.SampleQueryRequest{
Start: q.Start().Add(-e.left.interval),
End: q.End(),
Start: q.Start().Add(-e.left.interval).Add(-e.left.offset),
End: q.End().Add(-e.left.offset),
Selector: expr.String(),
Shards: q.Shards(),
},
@ -200,7 +199,7 @@ func (ev *DefaultEvaluator) StepEvaluator(
if err != nil {
return nil, err
}
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), e, q)
return rangeAggEvaluator(iter.NewPeekingSampleIterator(it), e, q, e.left.offset)
case *binOpExpr:
return binOpStepEvaluator(ctx, nextEv, e, q)
case *labelReplaceExpr:
@ -407,6 +406,7 @@ func rangeAggEvaluator(
it iter.PeekingSampleIterator,
expr *rangeAggregationExpr,
q Params,
o time.Duration,
) (StepEvaluator, error) {
agg, err := expr.aggregator()
if err != nil {
@ -416,7 +416,7 @@ func rangeAggEvaluator(
it,
expr.left.interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)
if expr.operation == OpRangeTypeAbsent {
return &absentRangeVectorEvaluator{

@ -50,6 +50,7 @@ import (
JSONExpression log.JSONExpression
JSONExpressionList []log.JSONExpression
UnwrapExpr *unwrapExpr
OffsetExpr *offsetExpr
}
%start root
@ -90,6 +91,7 @@ import (
%type <JSONExpressionList> jsonExpressionList
%type <UnwrapExpr> unwrapExpr
%type <UnitFilter> unitFilter
%type <OffsetExpr> offsetExpr
%token <bytes> BYTES
%token <str> IDENTIFIER STRING NUMBER
@ -98,7 +100,7 @@ import (
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
ABSENT_OVER_TIME LABEL_REPLACE UNPACK
ABSENT_OVER_TIME LABEL_REPLACE UNPACK OFFSET
// Operators are listed with increasing precedence.
%left <binOp> OR
@ -133,19 +135,31 @@ logExpr:
;
logRangeExpr:
selector RANGE { $$ = newLogRange(newMatcherExpr($1), $2, nil) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $4, nil) }
| selector RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2 , $3) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4 , $5) }
| selector unwrapExpr RANGE { $$ = newLogRange(newMatcherExpr($1), $3, $2 ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $5, $3 ) }
| selector pipelineExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil ) }
| selector pipelineExpr unwrapExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4) }
| selector RANGE pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, nil) }
| selector RANGE pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, $4 ) }
| OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS { $$ = $2 }
selector RANGE { $$ = newLogRange(newMatcherExpr($1), $2, nil, nil ) }
| selector RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($1), $2, nil, $3 ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $4, nil, nil ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($2), $4, nil, $5 ) }
| selector RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2, $3, nil ) }
| selector RANGE offsetExpr unwrapExpr { $$ = newLogRange(newMatcherExpr($1), $2, $4, $3 ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4, $5, nil ) }
| OPEN_PARENTHESIS selector CLOSE_PARENTHESIS RANGE offsetExpr unwrapExpr { $$ = newLogRange(newMatcherExpr($2), $4, $6, $5 ) }
| selector unwrapExpr RANGE { $$ = newLogRange(newMatcherExpr($1), $3, $2, nil ) }
| selector unwrapExpr RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($1), $3, $2, $4 ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newMatcherExpr($2), $5, $3, nil ) }
| OPEN_PARENTHESIS selector unwrapExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newMatcherExpr($2), $5, $3, $6 ) }
| selector pipelineExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil, nil ) }
| selector pipelineExpr RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $3, nil, $4 ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $5, nil, $6 ) }
| selector pipelineExpr unwrapExpr RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3, nil ) }
| selector pipelineExpr unwrapExpr RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $2), $4, $3, $5 ) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4, nil ) }
| OPEN_PARENTHESIS selector pipelineExpr unwrapExpr CLOSE_PARENTHESIS RANGE offsetExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($2), $3), $6, $4, $7 ) }
| selector RANGE pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, nil, nil) }
| selector RANGE offsetExpr pipelineExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $4), $2, nil, $3 ) }
| selector RANGE pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $3), $2, $4, nil ) }
| selector RANGE offsetExpr pipelineExpr unwrapExpr { $$ = newLogRange(newPipelineExpr(newMatcherExpr($1), $4), $2, $5, $3 ) }
| OPEN_PARENTHESIS logRangeExpr CLOSE_PARENTHESIS { $$ = $2 }
| logRangeExpr error
;
@ -363,6 +377,8 @@ rangeOp:
| ABSENT_OVER_TIME { $$ = OpRangeTypeAbsent }
;
offsetExpr:
OFFSET DURATION { $$ = newOffsetExpr( $2 ) }
;
labels:
IDENTIFIER { $$ = []string{ $1 } }
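A quick sketch of queries the extended grammar accepts, assuming the exported `ParseExpr` from this package (import path as of this Loki version); the offset must immediately follow the range in both plain and unwrapped range vectors:

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/logql"
)

func main() {
	for _, q := range []string{
		`count_over_time({job="mysql"}[5m] offset 5m)`,
		`sum_over_time({job="mysql"} |= "error" | unwrap latency [5m] offset 5m)`,
	} {
		expr, err := logql.ParseExpr(q)
		if err != nil {
			fmt.Println(q, "->", err)
			continue
		}
		fmt.Println(expr.String())
	}
}
```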

File diff for pkg/logql/expr.y.go suppressed because it is too large (parser code generated from expr.y).

@ -11,12 +11,14 @@ func Test_Extractor(t *testing.T) {
for _, tc := range []string{
`rate( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] )`,
`absent_over_time( ( {job="mysql"} |="error" !="timeout" ) [10s] offset 30s )`,
`sum without(a) ( rate ( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum by(a) (rate( ( {job="mysql"} |="error" !="timeout" ) [10s] ) )`,
`sum(count_over_time({job="mysql"}[5m]))`,
`sum(count_over_time({job="mysql"} | json [5m]))`,
`sum(count_over_time({job="mysql"} | logfmt [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m] offset 1h))`,
`topk(10,sum(rate({region="us-east1"}[5m])) by (name))`,
`topk by (name)(10,sum(rate({region="us-east1"}[5m])))`,
`avg( rate( ( {job="nginx"} |= "GET" ) [10s] ) ) by (region)`,
@ -78,6 +80,20 @@ func Test_Extractor(t *testing.T) {
"(.*):.*"
)
`,
`label_replace(
sum by (job) (
sum_over_time(
{namespace="tns"} |= "level=error" | json | avg=5 and bar<25ms | unwrap duration(latency) | __error__!~".*" [5m] offset 1h
)
/
count_over_time({namespace="tns"} | logfmt | label_format foo=bar[5m] offset 1h)
),
"foo",
"$1",
"service",
"(.*):.*"
)
`,
} {
t.Run(tc, func(t *testing.T) {
expr, err := ParseSampleExpr(tc)

@ -33,6 +33,7 @@ var tokens = map[string]int{
"[": OPEN_BRACKET,
"]": CLOSE_BRACKET,
OpLabelReplace: LABEL_REPLACE,
OpOffset: OFFSET,
// binops
OpTypeOr: OR,

@ -1293,9 +1293,10 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
nil),
nil, nil),
OpRangeTypeCount,
nil, nil,
nil,
nil,
),
},
{
@ -1359,7 +1360,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeStdvar, nil, nil,
),
},
@ -1389,7 +1391,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", OpConvDuration)),
newUnwrapExpr("foo", OpConvDuration),
nil),
OpRangeTypeStdvar, nil, nil,
),
},
@ -1410,7 +1413,30 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", OpConvBytes)),
newUnwrapExpr("foo", OpConvBytes),
nil),
OpRangeTypeSum, nil, nil,
),
},
{
in: `sum_over_time({namespace="tns"} |= "level=error" | json |foo>=5,bar<25ms| unwrap bytes(foo) [5m] offset 5m)`,
exp: newRangeAggregationExpr(
newLogRange(&pipelineExpr{
left: newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "namespace", Value: "tns"}}),
pipeline: MultiStageExpr{
newLineFilterExpr(nil, labels.MatchEqual, "level=error"),
newLabelParserExpr(OpParserTypeJSON, ""),
&labelFilterExpr{
LabelFilterer: log.NewAndLabelFilter(
log.NewNumericLabelFilter(log.LabelFilterGreaterThanOrEqual, "foo", 5),
log.NewDurationLabelFilter(log.LabelFilterLesserThan, "bar", 25*time.Millisecond),
),
},
},
},
5*time.Minute,
newUnwrapExpr("foo", OpConvBytes),
newOffsetExpr(5*time.Minute)),
OpRangeTypeSum, nil, nil,
),
},
@ -1431,7 +1457,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("latency", "")),
newUnwrapExpr("latency", ""),
nil),
OpRangeTypeSum, nil, nil,
),
},
@ -1452,7 +1479,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("latency", "")),
newUnwrapExpr("latency", ""),
nil),
OpRangeTypeSum, nil, nil,
),
},
@ -1466,7 +1494,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("bar", "")),
newUnwrapExpr("bar", ""),
nil),
OpRangeTypeStddev, nil, nil,
),
},
@ -1476,7 +1505,8 @@ func TestParse(t *testing.T) {
newLogRange(
newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "app", Value: "foo"}}),
5*time.Minute,
newUnwrapExpr("bar", "")),
newUnwrapExpr("bar", ""),
nil),
OpRangeTypeMin, nil, nil,
),
},
@ -1486,7 +1516,8 @@ func TestParse(t *testing.T) {
newLogRange(
newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "app", Value: "foo"}}),
5*time.Minute,
newUnwrapExpr("bar", "")),
newUnwrapExpr("bar", ""),
nil),
OpRangeTypeMin, &grouping{}, nil,
),
},
@ -1496,7 +1527,8 @@ func TestParse(t *testing.T) {
newLogRange(
newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "app", Value: "foo"}}),
5*time.Minute,
newUnwrapExpr("bar", "")),
newUnwrapExpr("bar", ""),
nil),
OpRangeTypeMax, &grouping{without: true}, nil,
),
},
@ -1506,7 +1538,19 @@ func TestParse(t *testing.T) {
newLogRange(
newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "app", Value: "foo"}}),
5*time.Minute,
newUnwrapExpr("bar", "")),
newUnwrapExpr("bar", ""),
nil),
OpRangeTypeMax, &grouping{without: true, groups: []string{"foo", "bar"}}, nil,
),
},
{
in: `max_over_time({app="foo"} | unwrap bar [5m] offset 5m) without (foo,bar)`,
exp: newRangeAggregationExpr(
newLogRange(
newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "app", Value: "foo"}}),
5*time.Minute,
newUnwrapExpr("bar", ""),
newOffsetExpr(5*time.Minute)),
OpRangeTypeMax, &grouping{without: true, groups: []string{"foo", "bar"}}, nil,
),
},
@ -1536,7 +1580,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeMax, nil, nil,
),
},
@ -1566,7 +1611,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeQuantile, nil, NewStringLabelFilter("0.99998"),
),
},
@ -1596,7 +1642,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter("0.99998"),
),
},
@ -1626,7 +1673,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "").addPostFilter(log.NewStringLabelFilter(mustNewMatcher(labels.MatchNotRegexp, log.ErrorLabel, ".+")))),
newUnwrapExpr("foo", "").addPostFilter(log.NewStringLabelFilter(mustNewMatcher(labels.MatchNotRegexp, log.ErrorLabel, ".+"))),
nil),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter("0.99998"),
),
},
@ -1660,7 +1708,47 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter("0.99998"),
),
OpTypeSum,
&grouping{without: true, groups: []string{"foo"}},
nil,
),
},
{
in: `sum without (foo) (
quantile_over_time(0.99998,{app="foo"} |= "bar" | json | latency >= 250ms or ( status_code < 500 and status_code > 200)
| line_format "blip{{ .foo }}blop {{.status_code}}" | label_format foo=bar,status_code="buzz{{.bar}}" | unwrap foo [5m] offset 5m
) by (namespace,instance)
)`,
exp: mustNewVectorAggregationExpr(
newRangeAggregationExpr(
newLogRange(&pipelineExpr{
left: newMatcherExpr([]*labels.Matcher{{Type: labels.MatchEqual, Name: "app", Value: "foo"}}),
pipeline: MultiStageExpr{
newLineFilterExpr(nil, labels.MatchEqual, "bar"),
newLabelParserExpr(OpParserTypeJSON, ""),
&labelFilterExpr{
LabelFilterer: log.NewOrLabelFilter(
log.NewDurationLabelFilter(log.LabelFilterGreaterThanOrEqual, "latency", 250*time.Millisecond),
log.NewAndLabelFilter(
log.NewNumericLabelFilter(log.LabelFilterLesserThan, "status_code", 500.0),
log.NewNumericLabelFilter(log.LabelFilterGreaterThan, "status_code", 200.0),
),
),
},
newLineFmtExpr("blip{{ .foo }}blop {{.status_code}}"),
newLabelFmtExpr([]log.LabelFmt{
log.NewRenameLabelFmt("foo", "bar"),
log.NewTemplateLabelFmt("status_code", "buzz{{.bar}}"),
}),
},
},
5*time.Minute,
newUnwrapExpr("foo", ""),
newOffsetExpr(5*time.Minute)),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter("0.99998"),
),
OpTypeSum,
@ -1698,7 +1786,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", OpConvDuration)),
newUnwrapExpr("foo", OpConvDuration),
nil),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter("0.99998"),
),
OpTypeSum,
@ -1736,7 +1825,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", OpConvDuration)),
newUnwrapExpr("foo", OpConvDuration),
nil),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter(".99998"),
),
OpTypeSum,
@ -1774,7 +1864,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", OpConvDurationSeconds)),
newUnwrapExpr("foo", OpConvDurationSeconds),
nil),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter(".99998"),
),
OpTypeSum,
@ -1812,7 +1903,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter("0.99998"),
),
OpTypeTopK,
@ -1859,7 +1951,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter("0.99998"),
),
OpTypeSum,
@ -1890,7 +1983,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeAvg, &grouping{without: false, groups: []string{"namespace", "instance"}}, nil,
),
OpTypeAvg,
@ -1944,7 +2038,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeQuantile, &grouping{without: false, groups: []string{"namespace", "instance"}}, NewStringLabelFilter("0.99998"),
),
OpTypeSum,
@ -1975,7 +2070,8 @@ func TestParse(t *testing.T) {
},
},
5*time.Minute,
newUnwrapExpr("foo", "")),
newUnwrapExpr("foo", ""),
nil),
OpRangeTypeAvg, &grouping{without: false, groups: []string{"namespace", "instance"}}, nil,
),
OpTypeAvg,

@ -25,26 +25,31 @@ type RangeVectorIterator interface {
}
type rangeVectorIterator struct {
iter iter.PeekingSampleIterator
selRange, step, end, current int64
window map[string]*promql.Series
metrics map[string]labels.Labels
at []promql.Sample
iter iter.PeekingSampleIterator
selRange, step, end, current, offset int64
window map[string]*promql.Series
metrics map[string]labels.Labels
at []promql.Sample
}
func newRangeVectorIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end int64) *rangeVectorIterator {
selRange, step, start, end, offset int64) *rangeVectorIterator {
// forces at least one step.
if step == 0 {
step = 1
}
if offset != 0 {
start = start - offset
end = end - offset
}
return &rangeVectorIterator{
iter: it,
step: step,
end: end,
selRange: selRange,
current: start - step, // first loop iteration will set it to start
offset: offset,
window: map[string]*promql.Series{},
metrics: map[string]labels.Labels{},
}
@ -57,7 +62,7 @@ func (r *rangeVectorIterator) Next() bool {
return false
}
rangeEnd := r.current
rangeStart := r.current - r.selRange
rangeStart := rangeEnd - r.selRange
// load samples
r.popBack(rangeStart)
r.load(rangeStart, rangeEnd)
@ -144,7 +149,7 @@ func (r *rangeVectorIterator) At(aggregator RangeVectorAggregator) (int64, promq
}
r.at = r.at[:0]
// convert ts from nanoseconds to milliseconds, as the iterator works with nanoseconds
ts := r.current / 1e+6
ts := r.current/1e+6 + r.offset/1e+6
for _, series := range r.window {
r.at = append(r.at, promql.Sample{
Point: promql.Point{
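The net effect, as a small arithmetic sketch with illustrative values: the iterator walks the shifted window internally, and `At()` adds the offset back so samples are reported at the original query timestamps:

```go
package main

import "fmt"

func main() {
	const (
		startNs  = int64(90_000_000_000) // query start: t=90s
		offsetNs = int64(30_000_000_000) // offset 30s
	)
	current := startNs - offsetNs // internal position: t=60s (shifted window)
	tsMs := current/1e6 + offsetNs/1e6
	fmt.Println(tsMs) // 90000 — reported back at the query's own timestamp
}
```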

@ -58,6 +58,7 @@ func Test_RangeVectorIterator(t *testing.T) {
tests := []struct {
selRange int64
step int64
offset int64
expectedVectors []promql.Vector
expectedTs []time.Time
start, end time.Time
@ -65,6 +66,7 @@ func Test_RangeVectorIterator(t *testing.T) {
{
(5 * time.Second).Nanoseconds(), // no overlap
(30 * time.Second).Nanoseconds(),
0,
[]promql.Vector{
[]promql.Sample{
{Point: newPoint(time.Unix(10, 0), 2), Metric: labelBar},
@ -86,6 +88,7 @@ func Test_RangeVectorIterator(t *testing.T) {
{
(35 * time.Second).Nanoseconds(), // will overlap by 5 sec
(30 * time.Second).Nanoseconds(),
0,
[]promql.Vector{
[]promql.Sample{
{Point: newPoint(time.Unix(10, 0), 4), Metric: labelBar},
@ -110,6 +113,7 @@ func Test_RangeVectorIterator(t *testing.T) {
{
(30 * time.Second).Nanoseconds(), // same range
(30 * time.Second).Nanoseconds(),
0,
[]promql.Vector{
[]promql.Sample{
{Point: newPoint(time.Unix(10, 0), 4), Metric: labelBar},
@ -131,6 +135,7 @@ func Test_RangeVectorIterator(t *testing.T) {
{
(50 * time.Second).Nanoseconds(), // all step are overlapping
(10 * time.Second).Nanoseconds(),
0,
[]promql.Vector{
[]promql.Sample{
{Point: newPoint(time.Unix(110, 0), 2), Metric: labelBar},
@ -144,14 +149,36 @@ func Test_RangeVectorIterator(t *testing.T) {
[]time.Time{time.Unix(110, 0), time.Unix(120, 0)},
time.Unix(110, 0), time.Unix(120, 0),
},
{
(5 * time.Second).Nanoseconds(), // no overlap
(30 * time.Second).Nanoseconds(),
(10 * time.Second).Nanoseconds(),
[]promql.Vector{
[]promql.Sample{
{Point: newPoint(time.Unix(20, 0), 2), Metric: labelBar},
{Point: newPoint(time.Unix(20, 0), 2), Metric: labelFoo},
},
[]promql.Sample{
{Point: newPoint(time.Unix(50, 0), 2), Metric: labelBar},
{Point: newPoint(time.Unix(50, 0), 2), Metric: labelFoo},
},
{},
[]promql.Sample{
{Point: newPoint(time.Unix(110, 0), 1), Metric: labelBar},
{Point: newPoint(time.Unix(110, 0), 1), Metric: labelFoo},
},
},
[]time.Time{time.Unix(20, 0), time.Unix(50, 0), time.Unix(80, 0), time.Unix(110, 0)},
time.Unix(20, 0), time.Unix(110, 0),
},
}
for _, tt := range tests {
t.Run(
fmt.Sprintf("logs[%s] - step: %s", time.Duration(tt.selRange), time.Duration(tt.step)),
fmt.Sprintf("logs[%s] - step: %s - offset: %s", time.Duration(tt.selRange), time.Duration(tt.step), time.Duration(tt.offset)),
func(t *testing.T) {
it := newRangeVectorIterator(newfakePeekingSampleIterator(), tt.selRange,
tt.step, tt.start.UnixNano(), tt.end.UnixNano())
tt.step, tt.start.UnixNano(), tt.end.UnixNano(), tt.offset)
i := 0
for it.Next() {
@ -173,7 +200,7 @@ func Test_RangeVectorIteratorBadLabels(t *testing.T) {
Samples: samples,
}))
it := newRangeVectorIterator(badIterator, (30 * time.Second).Nanoseconds(),
(30 * time.Second).Nanoseconds(), time.Unix(10, 0).UnixNano(), time.Unix(100, 0).UnixNano())
(30 * time.Second).Nanoseconds(), time.Unix(10, 0).UnixNano(), time.Unix(100, 0).UnixNano(), 0)
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
