From d950d4075d2eaec3348b85a6fdef70cdb717318c Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Tue, 2 Sep 2025 19:16:11 +0530 Subject: [PATCH] chore(engine): improve performance of matching window calculation (#19013) Currently range aggregation linearly scans through all the possible time windows (a 24h query with a 1m step results in 1440 windows) to find the matching ones. This is quite expensive as this matching is done for each row. This PR updates the matching code to use arithmetic which runs in constant time for cases where step >= range and for overlapping windows (step < range) it uses binary search. Co-authored-by: Ashwanth Goli Signed-off-by: Christian Haudum --- pkg/engine/executor/range_aggregation.go | 216 ++++++++++-- pkg/engine/executor/range_aggregation_test.go | 326 ++++++++++++++++++ 2 files changed, 504 insertions(+), 38 deletions(-) diff --git a/pkg/engine/executor/range_aggregation.go b/pkg/engine/executor/range_aggregation.go index 1774d9ab25..2987853bdf 100644 --- a/pkg/engine/executor/range_aggregation.go +++ b/pkg/engine/executor/range_aggregation.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "slices" + "sort" "time" "github.com/apache/arrow-go/v18/arrow" @@ -24,6 +26,22 @@ type rangeAggregationOptions struct { step time.Duration // step used for range queries } +// window is a time interval where start is exclusive and end is inclusive +// Refer to [logql.batchRangeVectorIterator]. +type window struct { + start, end time.Time +} + +// Contains returns if the timestamp t is within the bounds of the window. +// The window start is exclusive, the window end is inclusive. +func (w window) Contains(t time.Time) bool { + return t.After(w.start) && !t.After(w.end) +} + +// timestampMatchingWindowsFunc resolves matching range interval windows for a specific timestamp. +// The list can be empty if the timestamp is out of bounds or does not match any of the range windows. 
+type timestampMatchingWindowsFunc func(time.Time) []window + // rangeAggregationPipeline is a pipeline that performs aggregations over a time window. // // 1. It reads from the input pipelines @@ -37,8 +55,8 @@ type rangeAggregationPipeline struct { inputsExhausted bool // indicates if all inputs are exhausted aggregator *aggregator - matchingTimeWindows func(t time.Time) []time.Time // function to find matching time windows for a given timestamp - evaluator expressionEvaluator // used to evaluate column expressions + windowsForTimestamp timestampMatchingWindowsFunc // function to find matching time windows for a given timestamp + evaluator expressionEvaluator // used to evaluate column expressions opts rangeAggregationOptions } @@ -53,21 +71,10 @@ func newRangeAggregationPipeline(inputs []Pipeline, evaluator expressionEvaluato } func (r *rangeAggregationPipeline) init() { - windows := []struct { - // lower bound is not inclusive - // refer to [logql.batchRangeVectorIterator] - startTs time.Time - endTs time.Time - }{} + windows := []window{} cur := r.opts.startTs for cur.Compare(r.opts.endTs) <= 0 { - windows = append(windows, struct { - startTs time.Time - endTs time.Time - }{ - startTs: cur.Add(-r.opts.rangeInterval), - endTs: cur, - }) + windows = append(windows, window{start: cur.Add(-r.opts.rangeInterval), end: cur}) if r.opts.step == 0 { break @@ -77,26 +84,8 @@ func (r *rangeAggregationPipeline) init() { cur = cur.Add(r.opts.step) } - var ( - lowerbound = r.opts.startTs.Add(-r.opts.rangeInterval) - upperbound = r.opts.endTs - ) - - r.matchingTimeWindows = func(t time.Time) []time.Time { - if t.Compare(lowerbound) <= 0 || t.Compare(upperbound) > 0 { - return nil // out of range - } - - var ret []time.Time - for _, window := range windows { - if t.Compare(window.startTs) > 0 && t.Compare(window.endTs) <= 0 { - ret = append(ret, window.endTs) - } - } - - return ret - } - + f := newMatcherFactoryFromOpts(r.opts) + r.windowsForTimestamp = 
f.createMatcher(windows) r.aggregator = newAggregator(r.opts.partitionBy, len(windows)) } @@ -181,7 +170,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro tsCol := vec.ToArray().(*array.Timestamp) for row := range int(record.NumRows()) { - windows := r.matchingTimeWindows(tsCol.Value(row).ToTime(arrow.Nanosecond)) + windows := r.windowsForTimestamp(tsCol.Value(row).ToTime(arrow.Nanosecond)) if len(windows) == 0 { continue // out of range, skip this row } @@ -192,8 +181,8 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro labelValues[col] = arr.Value(row) } - for _, ts := range windows { - r.aggregator.Add(ts, 1, labelValues) + for _, w := range windows { + r.aggregator.Add(w.end, 1, labelValues) } } } @@ -225,3 +214,154 @@ func (r *rangeAggregationPipeline) Inputs() []Pipeline { func (r *rangeAggregationPipeline) Transport() Transport { return Local } + +func newMatcherFactoryFromOpts(opts rangeAggregationOptions) *matcherFactory { + return &matcherFactory{ + start: opts.startTs, + step: opts.step, + interval: opts.rangeInterval, + bounds: window{ + start: opts.startTs.Add(-opts.rangeInterval), + end: opts.endTs, + }, + } +} + +type matcherFactory struct { + start time.Time + step time.Duration + interval time.Duration + bounds window +} + +func (f *matcherFactory) createMatcher(windows []window) timestampMatchingWindowsFunc { + switch { + case f.step == 0: + // For instant queries, step == 0, meaning that all samples fall into one and the same step. + // A sample timestamp will always match the only time window available, unless the timestamp is out of range. + return f.createExactMatcher(windows) + case f.step == f.interval: + // If the step is equal to the range interval (e.g. when using $__auto in Grafana), then a sample timestamp matches exactly one time window. 
+ return f.createAlignedMatcher(windows) + case f.step > f.interval: + // If the step is greater than the range interval, then a sample timestamp matches either one time window or no time window (and will be discarded). + return f.createGappedMatcher(windows) + case f.step < f.interval: + // If the step is smaller than the range interval, then a sample timestamp matches either one or multiple time windows. + return f.createOverlappingMatcher(windows) + default: + panic("invalid step and range interval") + } +} + +// createExactMatcher is used for instant queries. +// The function returns a matcher that always returns the first aggregation window from the given windows if the timestamp is not out of range. +// It is expected that len(windows) is exactly 1, but it is not enforced. +// +// steps |---------x-------| +// interval |---------x-------| +func (f *matcherFactory) createExactMatcher(windows []window) timestampMatchingWindowsFunc { + return func(t time.Time) []window { + if !f.bounds.Contains(t) { + return nil // out of range + } + if len(windows) == 0 { + return nil + } + return []window{windows[0]} + } +} + +// createAlignedMatcher is used for range queries. +// The function returns a matcher that always returns exactly one aggregation window that matches the timestamp if the timestamp is not out of range. 
+// +// steps |-----|---x-|-----| +// interval |-----| +// interval |---x-| +// interval |-----| +func (f *matcherFactory) createAlignedMatcher(windows []window) timestampMatchingWindowsFunc { + startNs := f.start.UnixNano() + stepNs := f.step.Nanoseconds() + + return func(t time.Time) []window { + if !f.bounds.Contains(t) { + return nil // out of range + } + + tNs := t.UnixNano() + // valid timestamps for window i (step == interval): t > startNs + (i-1) * stepNs && t <= startNs + i * stepNs + windowIndex := (tNs - startNs + stepNs - 1) / stepNs // ceiling division: adding stepNs-1 maps the inclusive window end onto its own 0-based index. NOTE(review): if endTs is not on the start + i*step grid, a timestamp after the last window end still passes the bounds check and windowIndex exceeds len(windows)-1; confirm endTs is always aligned + return []window{windows[windowIndex]} + } +} + +// createGappedMatcher is used for range queries. +// The function returns a matcher that either returns exactly one aggregation window that matches the timestamp, or none, +// if the timestamp is out of bounds or within bounds, but is within a "gap" between the end of an interval and the beginning of the next interval. +// +// steps |-----|---x-|-----| +// interval |--| +// interval |x-| +// interval |--| +func (f *matcherFactory) createGappedMatcher(windows []window) timestampMatchingWindowsFunc { + startNs := f.start.UnixNano() + stepNs := f.step.Nanoseconds() + + return func(t time.Time) []window { + if !f.bounds.Contains(t) { + return nil // out of range + } + + tNs := t.UnixNano() + // For gapped windows, window i covers: (start + i*step - interval, start + i*step] + windowIndex := (tNs - startNs + stepNs - 1) / stepNs // ceiling division: adding stepNs-1 maps the inclusive window end onto its own 0-based index. NOTE(review): if endTs is not on the start + i*step grid, a timestamp after the last window end still passes the bounds check and windowIndex exceeds len(windows)-1; confirm endTs is always aligned + matchingWindow := windows[windowIndex] + + // Verify the timestamp is within the window (not in a gap) + if tNs > matchingWindow.start.UnixNano() { + return []window{matchingWindow} + } + + return nil // timestamp is in a gap + } +} + +// createOverlappingMatcher is used for range queries. +// The function returns a matcher that returns one or more aggregation windows that match the timestamp, if the timestamp is not out of range. 
+// +// steps |-----|---x-|-----| +// interval |x-------| +// interval |------x-| +// interval |--------| +func (f *matcherFactory) createOverlappingMatcher(windows []window) timestampMatchingWindowsFunc { + return func(t time.Time) []window { + if !f.bounds.Contains(t) { + return nil // out of range + } + + // Find the last window that could contain the timestamp. + // We need to find the last window where t > window.start, + // so search for the first window where t <= window.start. + firstOOBIndex := sort.Search(len(windows), func(i int) bool { + return t.Compare(windows[i].start) <= 0 + }) + + windowIndex := firstOOBIndex - 1 + if windowIndex < 0 { + return nil + } + + // Iterate backwards from last matching window to find all matches + var result []window + for _, window := range slices.Backward(windows[:windowIndex+1]) { + if t.Compare(window.start) > 0 && t.Compare(window.end) <= 0 { + result = append(result, window) + } else if t.Compare(window.end) > 0 { + // we've gone past all possible matches + break + } + } + + return result + } +} diff --git a/pkg/engine/executor/range_aggregation_test.go b/pkg/engine/executor/range_aggregation_test.go index 15e692dddb..2708b70d65 100644 --- a/pkg/engine/executor/range_aggregation_test.go +++ b/pkg/engine/executor/range_aggregation_test.go @@ -312,3 +312,329 @@ func TestRangeAggregationPipeline(t *testing.T) { require.ElementsMatch(t, expect, rows) }) } + +func TestMatcher(t *testing.T) { + t.Run("exactMatcher", func(t *testing.T) { + opts := rangeAggregationOptions{ + startTs: time.Unix(1000, 0), + endTs: time.Unix(1000, 0), + rangeInterval: 1000 * time.Second, // covers time range from 0 - 1000 + step: 0, // instant query + } + + // Create a single window for instant query + windows := []window{ + {start: time.Unix(0, 0), end: time.Unix(1000, 0)}, + } + + f := newMatcherFactoryFromOpts(opts) + matcher := f.createExactMatcher(windows) + + tests := []struct { + name string + timestamp time.Time + expected []window + 
}{ + { + name: "timestamp exactly at lowerbound (exclusive boundary)", + timestamp: f.bounds.start, + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp greater than upperbound", + timestamp: f.bounds.end.Add(1 * time.Nanosecond), + expected: nil, // should return nil as the timestamp is past the upperbound + }, + { + name: "timestamp exactly at upperbound (inclusive boundary)", + timestamp: f.bounds.end, + expected: []window{windows[0]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp just after lowerbound", + timestamp: f.bounds.start.Add(1 * time.Nanosecond), + expected: []window{windows[0]}, + }, + { + name: "timestamp just before upperbound", + timestamp: f.bounds.end.Add(-1 * time.Nanosecond), + expected: []window{windows[0]}, + }, + { + name: "timestamp within range", + timestamp: time.Unix(500, 0), + expected: []window{windows[0]}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matcher(tt.timestamp) + + if tt.expected == nil { + require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp) + } else { + requireEqualWindows(t, tt.expected, result) + } + }) + } + + t.Run("empty windows slice", func(t *testing.T) { + windows := []window{} + matcher := f.createExactMatcher(windows) + + // Should return nil for any timestamp when windows is empty + result := matcher(time.Unix(998, 0)) + require.Nil(t, result) + }) + + t.Run("multiple windows (should return first)", func(t *testing.T) { + windows := []window{ + {start: f.bounds.start, end: f.start}, + {start: f.bounds.start.Add(100 * time.Second), end: f.start.Add(100 * time.Second)}, + {start: f.bounds.start.Add(200 * time.Second), end: f.start.Add(200 * time.Second)}, + } + matcher := f.createExactMatcher(windows) + + // Should always return the first window for valid timestamps + result := matcher(time.Unix(998, 0)) + require.Equal(t, []window{windows[0]}, result) + }) + }) + + 
t.Run("alignedMatcher", func(t *testing.T) { + opts := rangeAggregationOptions{ + startTs: time.Unix(100, 0), + endTs: time.Unix(300, 0), + rangeInterval: 100 * time.Second, + step: 100 * time.Second, // step == rangeInterval + } + + // Create windows that align with lower/upper bounds and step + windows := []window{ + {start: time.Unix(0, 0), end: time.Unix(100, 0)}, + {start: time.Unix(100, 0), end: time.Unix(200, 0)}, + {start: time.Unix(200, 0), end: time.Unix(300, 0)}, + } + + f := newMatcherFactoryFromOpts(opts) + matcher := f.createAlignedMatcher(windows) + + tests := []struct { + name string + timestamp time.Time + expected []window + }{ + { + name: "timestamp exactly at lowerbound (exclusive boundary)", + timestamp: f.bounds.start, + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp greater than upperbound", + timestamp: f.bounds.end.Add(1 * time.Nanosecond), + expected: nil, // should return nil as the timestamp is past the upperbound + }, + { + name: "timestamp exactly at upperbound (inclusive boundary)", + timestamp: f.bounds.end, + expected: []window{windows[2]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp just after lowerbound", + timestamp: f.bounds.start.Add(1 * time.Nanosecond), + expected: []window{windows[0]}, + }, + { + name: "timestamp just before upperbound", + timestamp: f.bounds.end.Add(-1 * time.Nanosecond), + expected: []window{windows[2]}, + }, + { + name: "timestamp within range of window 1 (100-200]", + timestamp: time.Unix(150, 0), + expected: []window{windows[1]}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matcher(tt.timestamp) + + if tt.expected == nil { + require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp) + } else { + requireEqualWindows(t, tt.expected, result) + } + }) + } + }) + + t.Run("gappedMatcher", func(t *testing.T) { + opts := rangeAggregationOptions{ + startTs: time.Unix(100, 0), + 
endTs: time.Unix(300, 0), + rangeInterval: 80 * time.Second, + step: 100 * time.Second, // step > rangeInterval + } + + // Create windows that align with lower/upper bounds and step + windows := []window{ + {start: time.Unix(20, 0), end: time.Unix(100, 0)}, + {start: time.Unix(120, 0), end: time.Unix(200, 0)}, + {start: time.Unix(220, 0), end: time.Unix(300, 0)}, + } + + f := newMatcherFactoryFromOpts(opts) + matcher := f.createGappedMatcher(windows) + + tests := []struct { + name string + timestamp time.Time + expected []window + }{ + { + name: "timestamp exactly at lowerbound (exclusive boundary)", + timestamp: f.bounds.start, + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp greater than upperbound", + timestamp: f.bounds.end.Add(1 * time.Nanosecond), + expected: nil, // should return nil as the timestamp is past the upperbound + }, + { + name: "timestamp in gap", + timestamp: time.Unix(110, 0), + expected: nil, // should return nil as the timestamp is in the "gap" between windows[0].end and windows[1].start + }, + { + name: "timestamp in gap at exactly start of window 1", + timestamp: time.Unix(120, 0), + expected: nil, // lower bound is exclusive + }, + { + name: "timestamp exactly at end of window 0", + timestamp: time.Unix(100, 0), + expected: []window{windows[0]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp exactly at end of window 1", + timestamp: time.Unix(200, 0), + expected: []window{windows[1]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp exactly at end of window 2", + timestamp: time.Unix(300, 0), + expected: []window{windows[2]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp just before upperbound", + timestamp: f.bounds.end.Add(-1 * time.Nanosecond), + expected: []window{windows[2]}, + }, + { + name: "timestamp within range of window 1 (100-200]", + timestamp: time.Unix(150, 0), + expected: []window{windows[1]}, + }, + 
} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matcher(tt.timestamp) + + if tt.expected == nil { + require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp) + } else { + requireEqualWindows(t, tt.expected, result) + } + }) + } + }) + + t.Run("overlappingMatcher", func(t *testing.T) { + opts := rangeAggregationOptions{ + startTs: time.Unix(100, 0), + endTs: time.Unix(300, 0), + rangeInterval: 120 * time.Second, + step: 100 * time.Second, // step < rangeInterval + } + + // Create windows that align with lower/upper bounds and step + windows := []window{ + {start: time.Unix(-20, 0), end: time.Unix(100, 0)}, + {start: time.Unix(80, 0), end: time.Unix(200, 0)}, + {start: time.Unix(180, 0), end: time.Unix(300, 0)}, + } + + f := newMatcherFactoryFromOpts(opts) + matcher := f.createOverlappingMatcher(windows) + + tests := []struct { + name string + timestamp time.Time + expected []window + }{ + { + name: "timestamp exactly at lowerbound (exclusive boundary)", + timestamp: f.bounds.start, + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp exactly at upperbound (inclusive boundary)", + timestamp: f.bounds.end, + expected: []window{windows[2]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp exactly at start of window 1", + timestamp: time.Unix(80, 0), + expected: []window{windows[0]}, + }, + { + name: "timestamp exactly at overlap of window 0 and window 1", + timestamp: time.Unix(90, 0), + expected: []window{windows[0], windows[1]}, + }, + { + name: "timestamp exactly at end of window 0", + timestamp: time.Unix(100, 0), + expected: []window{windows[0], windows[1]}, + }, + { + name: "timestamp just before upperbound", + timestamp: f.bounds.end.Add(-1 * time.Nanosecond), + expected: []window{windows[2]}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matcher(tt.timestamp) + + if tt.expected == nil { + 
require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp) + } else { + requireEqualWindows(t, tt.expected, result) + } + }) + } + }) +} + +// requireEqualWindows asserts that two slices of window structs contain the same elements. +func requireEqualWindows(t *testing.T, expected, actual []window) { + t.Helper() + + slices.SortStableFunc(expected, func(a, b window) int { return a.end.Compare(b.end) }) + slices.SortStableFunc(actual, func(a, b window) int { return a.end.Compare(b.end) }) + + require.Equal(t, len(expected), len(actual), "window slices should have the same length") + + for i := 0; i < len(expected); i++ { + require.Equal(t, expected[i].start.UnixNano(), actual[i].start.UnixNano(), + "window[%d] start time mismatch: expected %s, actual %s", i, expected[i].start, actual[i].start) + require.Equal(t, expected[i].end.UnixNano(), actual[i].end.UnixNano(), + "window[%d] end time mismatch: expected %s, actual %s", i, expected[i].end, actual[i].end) + } +}