From e0ca67dd4563e41c57b2f1409ef235b76b2a1a6e Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 11 Jul 2024 10:06:28 +0200 Subject: [PATCH] fix(query engine): Include lines with ts equal to end timestamp of the query range when executing range aggregations (#13448) **Background** When performing range vector aggregations, such as `count_over_time({env="dev"}[1h])`, the query range is divided into multiple steps at which the aggregation operation (e.g. counting the log lines) is evaluated. Each step starts at `current step - step interval` and ends at `current step`, as depicted in the following chart. The select range for the logs is extended by the `step interval` into the past, in order to select logs for calculating the first step. ![screenshot_20240711_092352](https://github.com/grafana/loki/assets/281260/9ca6eaf5-148e-4743-aefa-6ff7071d64ad) However, the select range for logs is `start` inclusive and `end` exclusive (written as `[start, end)`), but the evaluation of the steps for the range aggregation is `start` exclusive and `end` inclusive (written as `(start, end]`). This leads to the problem that the very first timestamp at the beginning of the select range and the very last timestamp at the end of the select range are not included in the range aggregation. The "missing" last timestamp is not a problem, because a) in an instant query it is not supposed to be included anyway because of the `[start, end)` inclusivity of the query range and b) in a range query the last point of the previous step will be part of the next step evaluation. **Issue** The missing first timestamp, however, gets problematic when executing an instant query and the log timestamps are exactly at the start of the query range. This can happen when the query is split in the query frontend into multiple smaller time ranges, e.g. `1h`, `30m`, ... Since the sub queries are executed independently on the queriers, all logs that have a timestamp exactly a multiple of the split interval, e.g. 00:00, 01:00, 02:00, ... for a 1h interval, are dismissed and therefore missing in the query result over the full time range of the original query. **Fix** In order to avoid the missing logs that have a timestamp a multiple of the split interval in instant queries, we need to adjust the query range for logs to also include the `end` timestamp (written as `[start, end]`). This is done by adding a "leap nanosecond" to the `end` timestamp of the log select range. This ensures that the included `end` timestamp of the step evaluation is also included in the log selection. --- Signed-off-by: Christian Haudum --- pkg/logql/engine_test.go | 28 ++++--------------- pkg/logql/evaluator.go | 18 ++++++++---- .../shipper/indexshipper/tsdb/bounds.go | 13 ++++++--- .../shipper/indexshipper/tsdb/bounds_test.go | 6 ++++ .../indexshipper/tsdb/multi_file_index.go | 2 +- 5 files changed, 34 insertions(+), 33 deletions(-) diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 274276a02c..b409422c1c 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -167,7 +167,7 @@ func TestEngine_LogsRateUnwrap(t *testing.T) { } } -func TestEngine_LogsInstantQuery(t *testing.T) { +func TestEngine_InstantQuery(t *testing.T) { t.Parallel() for _, test := range []struct { qs string @@ -182,26 +182,6 @@ func TestEngine_LogsInstantQuery(t *testing.T) { expected interface{} }{ - { - `{app="foo"}`, time.Unix(30, 0), logproto.FORWARD, 10, - [][]logproto.Stream{ - {newStream(testSize, identity, `{app="foo"}`)}, - }, - []SelectLogParams{ - {&logproto.QueryRequest{Direction: logproto.FORWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 10, Selector: `{app="foo"}`}}, - }, - logqlmodel.Streams([]logproto.Stream{newStream(10, identity, `{app="foo"}`)}), - }, - { - `{app="bar"} |= "foo" |~ ".+bar"`, time.Unix(30, 0), logproto.BACKWARD, 30, - [][]logproto.Stream{ - {newStream(testSize, identity, `{app="bar"}`)}, - }, - []SelectLogParams{ - {&logproto.QueryRequest{Direction: logproto.BACKWARD, Start: time.Unix(0, 0), End: time.Unix(30, 0), Limit: 30, Selector: `{app="bar"}|="foo"|~".+bar"`}}, - }, - logqlmodel.Streams([]logproto.Stream{newStream(30, identity, `{app="bar"}`)}), - }, { `rate({app="foo"} |~".+bar" [1m])`, time.Unix(60, 0), logproto.BACKWARD, 10, [][]logproto.Series{ @@ -975,7 +955,6 @@ func TestEngine_LogsInstantQuery(t *testing.T) { } { test := test t.Run(fmt.Sprintf("%s %s", test.qs, test.direction), func(t *testing.T) { - t.Parallel() eng := NewEngine(EngineOpts{}, newQuerierRecorder(t, test.data, test.params), NoLimits, log.NewNopLogger()) @@ -2755,6 +2734,11 @@ func (q *querierRecorder) SelectSamples(_ context.Context, p SelectSampleParams) } func paramsID(p interface{}) string { + switch params := p.(type) { + case SelectLogParams: + case SelectSampleParams: + return fmt.Sprintf("%d", params.Plan.Hash()) + } b, err := json.Marshal(p) if err != nil { panic(err) diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index cdf05829c2..1216efedd7 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -331,9 +331,12 @@ func (ev *DefaultEvaluator) NewStepEvaluator( nextEvFactory = SampleEvaluatorFunc(func(ctx context.Context, _ SampleEvaluatorFactory, _ syntax.SampleExpr, _ Params) (StepEvaluator, error) { it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{ &logproto.SampleQueryRequest{ - Start: q.Start().Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset), - End: q.End().Add(-rangExpr.Left.Offset), - Selector: e.String(), // intentionally send the vector for reducing labels. + // extend startTs backwards by step + Start: q.Start().Add(-rangExpr.Left.Interval).Add(-rangExpr.Left.Offset), + // add leap nanosecond to endTs to include lines exactly at endTs. range iterators work on start exclusive, end inclusive ranges + End: q.End().Add(-rangExpr.Left.Offset).Add(time.Nanosecond), + // intentionally send the vector for reducing labels. + Selector: e.String(), Shards: q.Shards(), Plan: &plan.QueryPlan{ AST: expr, @@ -351,9 +354,12 @@ func (ev *DefaultEvaluator) NewStepEvaluator( case *syntax.RangeAggregationExpr: it, err := ev.querier.SelectSamples(ctx, SelectSampleParams{ &logproto.SampleQueryRequest{ - Start: q.Start().Add(-e.Left.Interval).Add(-e.Left.Offset), - End: q.End().Add(-e.Left.Offset), - Selector: expr.String(), + // extend startTs backwards by step + Start: q.Start().Add(-e.Left.Interval).Add(-e.Left.Offset), + // add leap nanosecond to endTs to include lines exactly at endTs. range iterators work on start exclusive, end inclusive ranges + End: q.End().Add(-e.Left.Offset).Add(time.Nanosecond), + // intentionally send the vector for reducing labels. + Selector: e.String(), Shards: q.Shards(), Plan: &plan.QueryPlan{ AST: expr, diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/bounds.go b/pkg/storage/stores/shipper/indexshipper/tsdb/bounds.go index f3a9cd6921..1b4423a476 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/bounds.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/bounds.go @@ -6,6 +6,7 @@ import ( "github.com/prometheus/common/model" ) +// TODO(chaudum): Replace with new v1.Interval struct type Bounded interface { Bounds() (model.Time, model.Time) } @@ -34,9 +35,13 @@ func newBounds(mint, maxt model.Time) bounds { return bounds{mint: mint, maxt: m func (b bounds) Bounds() (model.Time, model.Time) { return b.mint, b.maxt } -func Overlap(a, b Bounded) bool { - aFrom, aThrough := a.Bounds() - bFrom, bThrough := b.Bounds() +// Overlap checks whether the given chunk or index bounds +// overlap with the bounds of a query range. +// chunk/index bounds are defined as [from, through] +// query bounds are defined as [from, through) +func Overlap(chk, qry Bounded) bool { + chkFrom, chkThrough := chk.Bounds() + qryFrom, qryThrough := qry.Bounds() - return aFrom < bThrough && aThrough > bFrom + return chkFrom < qryThrough && chkThrough >= qryFrom } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/bounds_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/bounds_test.go index c9222f3efc..de6825f0fc 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/bounds_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/bounds_test.go @@ -28,6 +28,12 @@ func TestOverlap(t *testing.T) { // ensure [start,end) inclusivity works as expected a: newBounds(1, 5), b: newBounds(5, 6), + overlap: true, + }, + { + // ensure [start,end) inclusivity works as expected + a: newBounds(5, 6), + b: newBounds(1, 5), overlap: false, }, } { diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go index 0c60448430..23bab83e41 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go @@ -115,7 +115,7 @@ func (i *MultiIndex) forMatchingIndices(ctx context.Context, from, through model queryBounds := newBounds(from, through) return i.iter.For(ctx, i.maxParallel, func(ctx context.Context, idx Index) error { - if Overlap(queryBounds, idx) { + if Overlap(idx, queryBounds) { if i.filterer != nil { // TODO(owen-d): Find a nicer way