diff --git a/pkg/engine/executor/range_aggregation.go b/pkg/engine/executor/range_aggregation.go index 1774d9ab25..2987853bdf 100644 --- a/pkg/engine/executor/range_aggregation.go +++ b/pkg/engine/executor/range_aggregation.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "slices" + "sort" "time" "github.com/apache/arrow-go/v18/arrow" @@ -24,6 +26,22 @@ type rangeAggregationOptions struct { step time.Duration // step used for range queries } +// window is a time interval where start is exclusive and end is inclusive +// Refer to [logql.batchRangeVectorIterator]. +type window struct { + start, end time.Time +} + +// Contains returns if the timestamp t is within the bounds of the window. +// The window start is exclusive, the window end is inclusive. +func (w window) Contains(t time.Time) bool { + return t.After(w.start) && !t.After(w.end) +} + +// timestampMatchingWindowsFunc resolves matching range interval windows for a specific timestamp. +// The list can be empty if the timestamp is out of bounds or does not match any of the range windows. +type timestampMatchingWindowsFunc func(time.Time) []window + // rangeAggregationPipeline is a pipeline that performs aggregations over a time window. // // 1. It reads from the input pipelines @@ -37,8 +55,8 @@ type rangeAggregationPipeline struct { inputsExhausted bool // indicates if all inputs are exhausted aggregator *aggregator - matchingTimeWindows func(t time.Time) []time.Time // function to find matching time windows for a given timestamp - evaluator expressionEvaluator // used to evaluate column expressions + windowsForTimestamp timestampMatchingWindowsFunc // function to find matching time windows for a given timestamp + evaluator expressionEvaluator // used to evaluate column expressions opts rangeAggregationOptions } @@ -53,21 +71,10 @@ func newRangeAggregationPipeline(inputs []Pipeline, evaluator expressionEvaluato } func (r *rangeAggregationPipeline) init() { - windows := []struct { - // lower bound is not inclusive - // refer to [logql.batchRangeVectorIterator] - startTs time.Time - endTs time.Time - }{} + windows := []window{} cur := r.opts.startTs for cur.Compare(r.opts.endTs) <= 0 { - windows = append(windows, struct { - startTs time.Time - endTs time.Time - }{ - startTs: cur.Add(-r.opts.rangeInterval), - endTs: cur, - }) + windows = append(windows, window{start: cur.Add(-r.opts.rangeInterval), end: cur}) if r.opts.step == 0 { break @@ -77,26 +84,8 @@ func (r *rangeAggregationPipeline) init() { cur = cur.Add(r.opts.step) } - var ( - lowerbound = r.opts.startTs.Add(-r.opts.rangeInterval) - upperbound = r.opts.endTs - ) - - r.matchingTimeWindows = func(t time.Time) []time.Time { - if t.Compare(lowerbound) <= 0 || t.Compare(upperbound) > 0 { - return nil // out of range - } - - var ret []time.Time - for _, window := range windows { - if t.Compare(window.startTs) > 0 && t.Compare(window.endTs) <= 0 { - ret = append(ret, window.endTs) - } - } - - return ret - } - + f := newMatcherFactoryFromOpts(r.opts) + r.windowsForTimestamp = f.createMatcher(windows) r.aggregator = newAggregator(r.opts.partitionBy, len(windows)) } @@ -181,7 +170,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro tsCol := vec.ToArray().(*array.Timestamp) for row := range int(record.NumRows()) { - windows := r.matchingTimeWindows(tsCol.Value(row).ToTime(arrow.Nanosecond)) + windows := r.windowsForTimestamp(tsCol.Value(row).ToTime(arrow.Nanosecond)) if len(windows) == 0 { continue // out of range, skip this row } @@ -192,8 +181,8 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro labelValues[col] = arr.Value(row) } - for _, ts := range windows { - r.aggregator.Add(ts, 1, labelValues) + for _, w := range windows { + r.aggregator.Add(w.end, 1, labelValues) } } } @@ -225,3 +214,154 @@ func (r *rangeAggregationPipeline) Inputs() []Pipeline { func (r *rangeAggregationPipeline) Transport() Transport { return Local } + +func newMatcherFactoryFromOpts(opts rangeAggregationOptions) *matcherFactory { + return &matcherFactory{ + start: opts.startTs, + step: opts.step, + interval: opts.rangeInterval, + bounds: window{ + start: opts.startTs.Add(-opts.rangeInterval), + end: opts.endTs, + }, + } +} + +type matcherFactory struct { + start time.Time + step time.Duration + interval time.Duration + bounds window +} + +func (f *matcherFactory) createMatcher(windows []window) timestampMatchingWindowsFunc { + switch { + case f.step == 0: + // For instant queries, step == 0, meaning that all samples fall into the one and same step. + // A sample timestamp will always match the only time window available, unless the timestamp it out of range. + return f.createExactMatcher(windows) + case f.step == f.interval: + // If the step is equal to the range interval (e.g. when used $__auto in Grafana), then a sample timestamp matches exactly one time window. + return f.createAlignedMatcher(windows) + case f.step > f.interval: + // If the step is greater than the range interval, then a sample timestamp matches either one time window or no time window (and will be discarded). + return f.createGappedMatcher(windows) + case f.step < f.interval: + // If the step is smaller than the range interval, then a sample timestamp matches either one or multiple time windows. + return f.createOverlappingMatcher(windows) + default: + panic("invalid step and range interval") + } +} + +// createExactMatcher is used for instant queries. +// The function returns a matcher that always returns the first aggregation window from the given windows if the timestamp is not out of range. +// It is expected that len(windows) is exactly 1, but it is not enforced. +// +// steps |---------x-------| +// interval |---------x-------| +func (f *matcherFactory) createExactMatcher(windows []window) timestampMatchingWindowsFunc { + return func(t time.Time) []window { + if !f.bounds.Contains(t) { + return nil // out of range + } + if len(windows) == 0 { + return nil + } + return []window{windows[0]} + } +} + +// createAlignedMatcher is used for range queries. +// The function returns a matcher that always returns exactly one aggregation window that matches the timestamp if the timestamp is not out of range. +// +// steps |-----|---x-|-----| +// interval |-----| +// interval |---x-| +// interval |-----| +func (f *matcherFactory) createAlignedMatcher(windows []window) timestampMatchingWindowsFunc { + startNs := f.start.UnixNano() + stepNs := f.step.Nanoseconds() + + return func(t time.Time) []window { + if !f.bounds.Contains(t) { + return nil // out of range + } + + tNs := t.UnixNano() + // valid timestamps for window i: t > startNs + (i-1) * intervalNs && t <= startNs + i * intervalNs + windowIndex := (tNs - startNs + stepNs - 1) / stepNs // subtract 1ns because we are calculating 0-based indexes + return []window{windows[windowIndex]} + } +} + +// createGappedMatcher is used for range queries. +// The function returns a matcher that either returns exactly one aggregation window that matches the timestamp, or none, +// if the timestamp is out of bounds or within bounds, but is within a "gap" between the end of an interval and the beginning of the next interval. +// +// steps |-----|---x-|-----| +// interval |--| +// interval |x-| +// interval |--| +func (f *matcherFactory) createGappedMatcher(windows []window) timestampMatchingWindowsFunc { + startNs := f.start.UnixNano() + stepNs := f.step.Nanoseconds() + + return func(t time.Time) []window { + if !f.bounds.Contains(t) { + return nil // out of range + } + + tNs := t.UnixNano() + // For gapped windows, window i covers: (start + i*step - interval, start + i*step] + windowIndex := (tNs - startNs + stepNs - 1) / stepNs // subtract 1ns because we are calculating 0-based indexes + matchingWindow := windows[windowIndex] + + // Verify the timestamp is within the window (not in a gap) + if tNs > matchingWindow.start.UnixNano() { + return []window{matchingWindow} + } + + return nil // timestamp is in a gap + } +} + +// createOverlappingMatcher is used for range queries. +// The function returns a matcher that returns one or more aggregation windows that match the timestamp, if the timestamp is not out of range. +// +// steps |-----|---x-|-----| +// interval |x-------| +// interval |------x-| +// interval |--------| +func (f *matcherFactory) createOverlappingMatcher(windows []window) timestampMatchingWindowsFunc { + return func(t time.Time) []window { + if !f.bounds.Contains(t) { + return nil // out of range + } + + // Find the last window that could contain the timestamp. + // We need to find the last window where t > window.startTs + // so search for the first window where t <= window.startTs + firstOOBIndex := sort.Search(len(windows), func(i int) bool { + return t.Compare(windows[i].start) <= 0 + }) + + windowIndex := firstOOBIndex - 1 + if windowIndex < 0 { + return nil + } + + // Iterate backwards from last matching window to find all matches + var result []window + for _, window := range slices.Backward(windows[:windowIndex+1]) { + if t.Compare(window.start) > 0 && t.Compare(window.end) <= 0 { + result = append(result, window) + } else if t.Compare(window.end) > 0 { + // we've gone past all possible matches + break + } + } + + return result + } +} diff --git a/pkg/engine/executor/range_aggregation_test.go b/pkg/engine/executor/range_aggregation_test.go index 15e692dddb..2708b70d65 100644 --- a/pkg/engine/executor/range_aggregation_test.go +++ b/pkg/engine/executor/range_aggregation_test.go @@ -312,3 +312,329 @@ func TestRangeAggregationPipeline(t *testing.T) { require.ElementsMatch(t, expect, rows) }) } + +func TestMatcher(t *testing.T) { + t.Run("exactMatcher", func(t *testing.T) { + opts := rangeAggregationOptions{ + startTs: time.Unix(1000, 0), + endTs: time.Unix(1000, 0), + rangeInterval: 1000 * time.Second, // covers time range from 0 - 1000 + step: 0, // instant query + } + + // Create a single window for instant query + windows := []window{ + {start: time.Unix(0, 0), end: time.Unix(1000, 0)}, + } + + f := newMatcherFactoryFromOpts(opts) + matcher := f.createExactMatcher(windows) + + tests := []struct { + name string + timestamp time.Time + expected []window + }{ + { + name: "timestamp exactly at lowerbound (exclusive boundary)", + timestamp: f.bounds.start, + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp greater than upperbound", + timestamp: f.bounds.end.Add(1 * time.Nanosecond), + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp exactly at upperbound (inclusive boundary)", + timestamp: f.bounds.end, + expected: []window{windows[0]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp just after lowerbound", + timestamp: f.bounds.start.Add(1 * time.Nanosecond), + expected: []window{windows[0]}, + }, + { + name: "timestamp just before upperbound", + timestamp: f.bounds.end.Add(-1 * time.Nanosecond), + expected: []window{windows[0]}, + }, + { + name: "timestamp within range", + timestamp: time.Unix(500, 0), + expected: []window{windows[0]}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matcher(tt.timestamp) + + if tt.expected == nil { + require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp) + } else { + requireEqualWindows(t, tt.expected, result) + } + }) + } + + t.Run("empty windows slice", func(t *testing.T) { + windows := []window{} + matcher := f.createExactMatcher(windows) + + // Should return nil for any timestamp when windows is empty + result := matcher(time.Unix(998, 0)) + require.Nil(t, result) + }) + + t.Run("multiple windows (should return first)", func(t *testing.T) { + windows := []window{ + {start: f.bounds.start, end: f.start}, + {start: f.bounds.start.Add(100 * time.Second), end: f.start.Add(100 * time.Second)}, + {start: f.bounds.start.Add(200 * time.Second), end: f.start.Add(200 * time.Second)}, + } + matcher := f.createExactMatcher(windows) + + // Should always return the first window for valid timestamps + result := matcher(time.Unix(998, 0)) + require.Equal(t, []window{windows[0]}, result) + }) + }) + + t.Run("alignedMatcher", func(t *testing.T) { + opts := rangeAggregationOptions{ + startTs: time.Unix(100, 0), + endTs: time.Unix(300, 0), + rangeInterval: 100 * time.Second, + step: 100 * time.Second, // step == rangeInterval + } + + // Create windows that align with lower/upper bounds and step + windows := []window{ + {start: time.Unix(0, 0), end: time.Unix(100, 0)}, + {start: time.Unix(100, 0), end: time.Unix(200, 0)}, + {start: time.Unix(200, 0), end: time.Unix(300, 0)}, + } + + f := newMatcherFactoryFromOpts(opts) + matcher := f.createAlignedMatcher(windows) + + tests := []struct { + name string + timestamp time.Time + expected []window + }{ + { + name: "timestamp exactly at lowerbound (exclusive boundary)", + timestamp: f.bounds.start, + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp greater than upperbound", + timestamp: f.bounds.end.Add(1 * time.Nanosecond), + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp exactly at upperbound (inclusive boundary)", + timestamp: f.bounds.end, + expected: []window{windows[2]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp just after lowerbound", + timestamp: f.bounds.start.Add(1 * time.Nanosecond), + expected: []window{windows[0]}, + }, + { + name: "timestamp just before upperbound", + timestamp: f.bounds.end.Add(-1 * time.Nanosecond), + expected: []window{windows[2]}, + }, + { + name: "timestamp within range of window 1 (100-200]", + timestamp: time.Unix(150, 0), + expected: []window{windows[1]}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matcher(tt.timestamp) + + if tt.expected == nil { + require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp) + } else { + requireEqualWindows(t, tt.expected, result) + } + }) + } + }) + + t.Run("gappedMatcher", func(t *testing.T) { + opts := rangeAggregationOptions{ + startTs: time.Unix(100, 0), + endTs: time.Unix(300, 0), + rangeInterval: 80 * time.Second, + step: 100 * time.Second, // step > rangeInterval + } + + // Create windows that align with lower/upper bounds and step + windows := []window{ + {start: time.Unix(20, 0), end: time.Unix(100, 0)}, + {start: time.Unix(120, 0), end: time.Unix(200, 0)}, + {start: time.Unix(220, 0), end: time.Unix(300, 0)}, + } + + f := newMatcherFactoryFromOpts(opts) + matcher := f.createGappedMatcher(windows) + + tests := []struct { + name string + timestamp time.Time + expected []window + }{ + { + name: "timestamp exactly at lowerbound (exclusive boundary)", + timestamp: f.bounds.start, + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp greater than upperbound", + timestamp: f.bounds.end.Add(1 * time.Nanosecond), + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp in gap", + timestamp: time.Unix(110, 0), + expected: nil, // should return nil as the timestamp is in the "gap" between windows[0].end and windows[1].start + }, + { + name: "timestamp in gap at exactly start of window 1", + timestamp: time.Unix(120, 0), + expected: nil, // lower bound is exclusive + }, + { + name: "timestamp exactly at end of window 0", + timestamp: time.Unix(100, 0), + expected: []window{windows[0]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp exactly at end of window 1", + timestamp: time.Unix(200, 0), + expected: []window{windows[1]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp exactly at end of window 2", + timestamp: time.Unix(300, 0), + expected: []window{windows[2]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp just before upperbound", + timestamp: f.bounds.end.Add(-1 * time.Nanosecond), + expected: []window{windows[2]}, + }, + { + name: "timestamp within range of window 1 (100-200]", + timestamp: time.Unix(150, 0), + expected: []window{windows[1]}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matcher(tt.timestamp) + + if tt.expected == nil { + require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp) + } else { + requireEqualWindows(t, tt.expected, result) + } + }) + } + }) + + t.Run("overlappingMatcher", func(t *testing.T) { + opts := rangeAggregationOptions{ + startTs: time.Unix(100, 0), + endTs: time.Unix(300, 0), + rangeInterval: 120 * time.Second, + step: 100 * time.Second, // step < rangeInterval + } + + // Create windows that align with lower/upper bounds and step + windows := []window{ + {start: time.Unix(-20, 0), end: time.Unix(100, 0)}, + {start: time.Unix(80, 0), end: time.Unix(200, 0)}, + {start: time.Unix(180, 0), end: time.Unix(300, 0)}, + } + + f := newMatcherFactoryFromOpts(opts) + matcher := f.createOverlappingMatcher(windows) + + tests := []struct { + name string + timestamp time.Time + expected []window + }{ + { + name: "timestamp exactly at lowerbound (exclusive boundary)", + timestamp: f.bounds.start, + expected: nil, // should return nil as lowerbound is exclusive + }, + { + name: "timestamp exactly at upperbound (inclusive boundary)", + timestamp: f.bounds.end, + expected: []window{windows[2]}, // should return window as upperbound is inclusive + }, + { + name: "timestamp exactly at start of window 1", + timestamp: time.Unix(80, 0), + expected: []window{windows[0]}, + }, + { + name: "timestamp exactly at overlap of window 0 and window 1", + timestamp: time.Unix(90, 0), + expected: []window{windows[0], windows[1]}, + }, + { + name: "timestamp exactly at end of window 0", + timestamp: time.Unix(100, 0), + expected: []window{windows[0], windows[1]}, + }, + { + name: "timestamp just before upperbound", + timestamp: f.bounds.end.Add(-1 * time.Nanosecond), + expected: []window{windows[2]}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := matcher(tt.timestamp) + + if tt.expected == nil { + require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp) + } else { + requireEqualWindows(t, tt.expected, result) + } + }) + } + }) +} + +// requireEqualWindows asserts that two slices of window structs contain the same elements. +func requireEqualWindows(t *testing.T, expected, actual []window) { + t.Helper() + + slices.SortStableFunc(expected, func(a, b window) int { return a.end.Compare(b.end) }) + slices.SortStableFunc(actual, func(a, b window) int { return a.end.Compare(b.end) }) + + require.Equal(t, len(expected), len(actual), "window slices should have the same length") + + for i := 0; i < len(expected); i++ { + require.Equal(t, expected[i].start.UnixNano(), actual[i].start.UnixNano(), + "window[%d] start time mismatch: expected %s, actual %s", i, expected[i].start, actual[i].start) + require.Equal(t, expected[i].end.UnixNano(), actual[i].end.UnixNano(), + "window[%d] end time mismatch: expected %s, actual %s", i, expected[i].end, actual[i].end) + } +}