fix: Adjust with offset in last, first and quantile over time queries. (#15915)

**What this PR does / why we need it**:
Query sharding for `last_over_time`, `first_over_time` and `quantile_over_time` did not adjust the start and end of the underlying range query by the `offset` given in the query.

This change introduces tests and fixes the adjustment.
pull/15984/head
Karsten Jeschkies 1 year ago committed by GitHub
parent 57c27d9408
commit 83339cbde0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 7
      pkg/logql/downstream.go
  2. 14
      pkg/logql/downstream_test.go
  3. 29
      pkg/logql/first_last_over_time.go
  4. 28
      pkg/logql/quantile_over_time_sketch.go
  5. 4
      pkg/logql/range_vector_test.go
  6. 2
      pkg/logql/shardmapper.go

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -304,6 +305,7 @@ func (e *QuantileSketchMergeExpr) Walk(f syntax.WalkFn) {
// MergeFirstOverTimeExpr is a sample expression that holds the downstream
// (per-shard) sample expressions produced when a range aggregation is sharded,
// together with the offset of the original range expression.
// NOTE(review): presumably built for first_over_time queries only — confirm
// against the shard mapper case that constructs it.
type MergeFirstOverTimeExpr struct {
syntax.SampleExpr
// downstreams are the sharded expressions whose results are merged.
downstreams []DownstreamSampleExpr
// offset is copied from the sharded range expression (expr.Left.Offset) and
// handed to the merge step evaluator, which shifts step timestamps by it.
offset time.Duration
}
func (e MergeFirstOverTimeExpr) String() string {
@ -332,6 +334,7 @@ func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
// MergeLastOverTimeExpr is a sample expression that holds the downstream
// (per-shard) sample expressions produced when a range aggregation is sharded,
// together with the offset of the original range expression.
// NOTE(review): presumably built for last_over_time queries only — confirm
// against the shard mapper case that constructs it.
type MergeLastOverTimeExpr struct {
syntax.SampleExpr
// downstreams are the sharded expressions whose results are merged.
downstreams []DownstreamSampleExpr
// offset is copied from the sharded range expression (expr.Left.Offset) and
// passed to NewMergeLastOverTimeStepEvaluator, which shifts step timestamps
// by it.
offset time.Duration
}
func (e MergeLastOverTimeExpr) String() string {
@ -590,7 +593,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
}
}
return NewMergeFirstOverTimeStepEvaluator(params, xs), nil
return NewMergeFirstOverTimeStepEvaluator(params, xs, e.offset), nil
case *MergeLastOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))
@ -625,7 +628,7 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}
return NewMergeLastOverTimeStepEvaluator(params, xs), nil
return NewMergeLastOverTimeStepEvaluator(params, xs, e.offset), nil
case *CountMinSketchEvalExpr:
queries := make([]DownstreamQuery, len(e.downstreams))

@ -64,6 +64,7 @@ func TestMappingEquivalence(t *testing.T) {
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s])`, true, nil},
{`avg_over_time({a=~".+"} | logfmt | drop level | unwrap value [1s]) without (stream)`, true, nil},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true, []string{ShardQuantileOverTime}},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s] offset 2s)`, true, []string{ShardQuantileOverTime}},
{
`
(quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1)
@ -75,8 +76,12 @@ func TestMappingEquivalence(t *testing.T) {
},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, false, []string{ShardFirstOverTime}},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s] offset -2s) by (a)`, false, []string{ShardFirstOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, false, []string{ShardLastOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s] offset -2s) by (a)`, false, []string{ShardLastOverTime}},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
@ -190,6 +195,7 @@ func TestMappingEquivalenceSketches(t *testing.T) {
}{
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s] offset 2s) by (a)`, 0.02},
} {
q := NewMockQuerier(
shards,
@ -241,8 +247,8 @@ func TestMappingEquivalenceSketches(t *testing.T) {
// plus set step and interval to 0
params, err := NewLiteralParams(
tc.query,
time.Unix(1, 0),
time.Unix(1, 0),
time.Unix(10, 0),
time.Unix(10, 0),
0,
0,
logproto.FORWARD,
@ -294,7 +300,7 @@ func TestApproxTopkSketches(t *testing.T) {
shardedQuery string
regularQuery string
realtiveError float64
//cardinalityEstimate int
// cardinalityEstimate int
}{
// Note:our data generation results in less spread between topk things for 10k streams than for 100k streams
// if we have 1k streams, we can get much more accurate results for topk 10 than topk 100
@ -304,7 +310,7 @@ func TestApproxTopkSketches(t *testing.T) {
shardedQuery: `approx_topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
regularQuery: `topk(3, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.0012,
//cardinalityEstimate: 3,
// cardinalityEstimate: 3,
},
{
labelShards: 10,

@ -16,6 +16,15 @@ func newFirstWithTimestampIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end, offset int64,
) RangeVectorIterator {
// forces at least one step.
if step == 0 {
step = 1
}
if offset != 0 {
start = start - offset
end = end - offset
}
inner := &batchRangeVectorIterator{
iter: it,
step: step,
@ -70,6 +79,15 @@ func newLastWithTimestampIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end, offset int64,
) RangeVectorIterator {
// forces at least one step.
if step == 0 {
step = 1
}
if offset != 0 {
start = start - offset
end = end - offset
}
inner := &batchRangeVectorIterator{
iter: it,
step: step,
@ -127,6 +145,7 @@ type mergeOverTimeStepEvaluator struct {
step time.Duration
matrices []promql.Matrix
merge func(promql.Vector, int, int, promql.Series) promql.Vector
offset time.Duration
}
// Next returns the first or last element within one step of each matrix.
@ -170,6 +189,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) {
// inRange returns true if t is in step range of ts.
func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
// The time stamp needs to be adjusted because the original datapoint at t is
// from a shifted query.
ts -= e.offset.Milliseconds()
// special case instant queries
if e.step.Milliseconds() == 0 {
return true
@ -181,7 +204,7 @@ func (*mergeOverTimeStepEvaluator) Close() error { return nil }
func (*mergeOverTimeStepEvaluator) Error() error { return nil }
func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator {
func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix, offset time.Duration) StepEvaluator {
if len(m) == 0 {
return EmptyEvaluator[SampleVector]{}
}
@ -199,6 +222,7 @@ func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEv
step: step,
matrices: m,
merge: mergeFirstOverTime,
offset: offset,
}
}
@ -218,7 +242,7 @@ func mergeFirstOverTime(vec promql.Vector, pos int, nSeries int, series promql.S
return vec
}
func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator {
func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix, offset time.Duration) StepEvaluator {
if len(m) == 0 {
return EmptyEvaluator[SampleVector]{}
}
@ -236,6 +260,7 @@ func NewMergeLastOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEva
step: step,
matrices: m,
merge: mergeLastOverTime,
offset: offset,
}
}

@ -185,6 +185,15 @@ func newQuantileSketchIterator(
it iter.PeekingSampleIterator,
selRange, step, start, end, offset int64,
) RangeVectorIterator {
// forces at least one step.
if step == 0 {
step = 1
}
if offset != 0 {
start = start - offset
end = end - offset
}
inner := &batchRangeVectorIterator{
iter: it,
step: step,
@ -302,23 +311,20 @@ func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluat
// QuantileSketchMatrixStepEvaluator steps through a matrix of quantile sketch
// vectors, ie t-digest or DDSketch structures per time step.
type QuantileSketchMatrixStepEvaluator struct {
start, end, ts time.Time
step time.Duration
m ProbabilisticQuantileMatrix
end, ts time.Time
step time.Duration
m ProbabilisticQuantileMatrix
}
func NewQuantileSketchMatrixStepEvaluator(m ProbabilisticQuantileMatrix, params Params) *QuantileSketchMatrixStepEvaluator {
var (
start = params.Start()
end = params.End()
step = params.Step()
step = params.Step()
)
return &QuantileSketchMatrixStepEvaluator{
start: start,
end: end,
ts: start.Add(-step), // will be corrected on first Next() call
step: step,
m: m,
end: params.End(),
ts: params.Start().Add(-step), // will be corrected on first Next() call
step: step,
m: m,
}
}

@ -67,7 +67,6 @@ func newPoint(t time.Time, v float64) promql.FPoint {
}
func Benchmark_RangeVectorIteratorCompare(b *testing.B) {
// no overlap test case.
buildStreamingIt := func() (RangeVectorIterator, error) {
tt := struct {
@ -183,7 +182,6 @@ func Benchmark_RangeVectorIteratorCompare(b *testing.B) {
}
}
})
}
func Benchmark_RangeVectorIterator(b *testing.B) {
@ -214,7 +212,6 @@ func Benchmark_RangeVectorIterator(b *testing.B) {
i++
}
}
}
func Test_RangeVectorIterator_InstantQuery(t *testing.T) {
@ -445,6 +442,7 @@ func Test_RangeVectorIterator(t *testing.T) {
time.Unix(110, 0), time.Unix(120, 0),
},
{
// TODO: use this test case
(5 * time.Second).Nanoseconds(), // no overlap
(30 * time.Second).Nanoseconds(),
(10 * time.Second).Nanoseconds(),

@ -593,6 +593,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
return &MergeFirstOverTimeExpr{
downstreams: downstreams,
offset: expr.Left.Offset,
}, bytesPerShard, nil
case syntax.OpRangeTypeLast:
if !m.lastOverTimeSharding {
@ -623,6 +624,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
return &MergeLastOverTimeExpr{
downstreams: downstreams,
offset: expr.Left.Offset,
}, bytesPerShard, nil
default:
// don't shard if there's not an appropriate optimization

Loading…
Cancel
Save