chore(engine): improve performance of matching window calculation (#19013)

Currently range aggregation linearly scans through all the possible time windows (a 24h query with a 1m step results in 1440 windows) to find the matching ones. This is quite expensive as this matching is done for each row.

This PR updates the matching code to use arithmetic which runs in constant time for cases where step >= range and for overlapping windows (step < range) it uses binary search.

Co-authored-by: Ashwanth Goli <iamashwanth@gmail.com>
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/18951/head^2
Ashwanth 4 months ago committed by GitHub
parent 6a953bb492
commit d950d4075d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 216
      pkg/engine/executor/range_aggregation.go
  2. 326
      pkg/engine/executor/range_aggregation_test.go

@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"slices"
"sort"
"time"
"github.com/apache/arrow-go/v18/arrow"
@ -24,6 +26,22 @@ type rangeAggregationOptions struct {
step time.Duration // step used for range queries
}
// window is a time interval where start is exclusive and end is inclusive
// Refer to [logql.batchRangeVectorIterator].
type window struct {
start, end time.Time
}
// Contains returns if the timestamp t is within the bounds of the window.
// The window start is exclusive, the window end is inclusive.
func (w window) Contains(t time.Time) bool {
return t.After(w.start) && !t.After(w.end)
}
// timestampMatchingWindowsFunc resolves matching range interval windows for a specific timestamp.
// The list can be empty if the timestamp is out of bounds or does not match any of the range windows.
type timestampMatchingWindowsFunc func(time.Time) []window
// rangeAggregationPipeline is a pipeline that performs aggregations over a time window.
//
// 1. It reads from the input pipelines
@ -37,8 +55,8 @@ type rangeAggregationPipeline struct {
inputsExhausted bool // indicates if all inputs are exhausted
aggregator *aggregator
matchingTimeWindows func(t time.Time) []time.Time // function to find matching time windows for a given timestamp
evaluator expressionEvaluator // used to evaluate column expressions
windowsForTimestamp timestampMatchingWindowsFunc // function to find matching time windows for a given timestamp
evaluator expressionEvaluator // used to evaluate column expressions
opts rangeAggregationOptions
}
@ -53,21 +71,10 @@ func newRangeAggregationPipeline(inputs []Pipeline, evaluator expressionEvaluato
}
func (r *rangeAggregationPipeline) init() {
windows := []struct {
// lower bound is not inclusive
// refer to [logql.batchRangeVectorIterator]
startTs time.Time
endTs time.Time
}{}
windows := []window{}
cur := r.opts.startTs
for cur.Compare(r.opts.endTs) <= 0 {
windows = append(windows, struct {
startTs time.Time
endTs time.Time
}{
startTs: cur.Add(-r.opts.rangeInterval),
endTs: cur,
})
windows = append(windows, window{start: cur.Add(-r.opts.rangeInterval), end: cur})
if r.opts.step == 0 {
break
@ -77,26 +84,8 @@ func (r *rangeAggregationPipeline) init() {
cur = cur.Add(r.opts.step)
}
var (
lowerbound = r.opts.startTs.Add(-r.opts.rangeInterval)
upperbound = r.opts.endTs
)
r.matchingTimeWindows = func(t time.Time) []time.Time {
if t.Compare(lowerbound) <= 0 || t.Compare(upperbound) > 0 {
return nil // out of range
}
var ret []time.Time
for _, window := range windows {
if t.Compare(window.startTs) > 0 && t.Compare(window.endTs) <= 0 {
ret = append(ret, window.endTs)
}
}
return ret
}
f := newMatcherFactoryFromOpts(r.opts)
r.windowsForTimestamp = f.createMatcher(windows)
r.aggregator = newAggregator(r.opts.partitionBy, len(windows))
}
@ -181,7 +170,7 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro
tsCol := vec.ToArray().(*array.Timestamp)
for row := range int(record.NumRows()) {
windows := r.matchingTimeWindows(tsCol.Value(row).ToTime(arrow.Nanosecond))
windows := r.windowsForTimestamp(tsCol.Value(row).ToTime(arrow.Nanosecond))
if len(windows) == 0 {
continue // out of range, skip this row
}
@ -192,8 +181,8 @@ func (r *rangeAggregationPipeline) read(ctx context.Context) (arrow.Record, erro
labelValues[col] = arr.Value(row)
}
for _, ts := range windows {
r.aggregator.Add(ts, 1, labelValues)
for _, w := range windows {
r.aggregator.Add(w.end, 1, labelValues)
}
}
}
@ -225,3 +214,154 @@ func (r *rangeAggregationPipeline) Inputs() []Pipeline {
func (r *rangeAggregationPipeline) Transport() Transport {
return Local
}
func newMatcherFactoryFromOpts(opts rangeAggregationOptions) *matcherFactory {
return &matcherFactory{
start: opts.startTs,
step: opts.step,
interval: opts.rangeInterval,
bounds: window{
start: opts.startTs.Add(-opts.rangeInterval),
end: opts.endTs,
},
}
}
type matcherFactory struct {
start time.Time
step time.Duration
interval time.Duration
bounds window
}
func (f *matcherFactory) createMatcher(windows []window) timestampMatchingWindowsFunc {
switch {
case f.step == 0:
// For instant queries, step == 0, meaning that all samples fall into the one and same step.
// A sample timestamp will always match the only time window available, unless the timestamp it out of range.
return f.createExactMatcher(windows)
case f.step == f.interval:
// If the step is equal to the range interval (e.g. when used $__auto in Grafana), then a sample timestamp matches exactly one time window.
return f.createAlignedMatcher(windows)
case f.step > f.interval:
// If the step is greater than the range interval, then a sample timestamp matches either one time window or no time window (and will be discarded).
return f.createGappedMatcher(windows)
case f.step < f.interval:
// If the step is smaller than the range interval, then a sample timestamp matches either one or multiple time windows.
return f.createOverlappingMatcher(windows)
default:
panic("invalid step and range interval")
}
}
// createExactMatcher is used for instant queries.
// The function returns a matcher that always returns the first aggregation window from the given windows if the timestamp is not out of range.
// It is expected that len(windows) is exactly 1, but it is not enforced.
//
// steps |---------x-------|
// interval |---------x-------|
func (f *matcherFactory) createExactMatcher(windows []window) timestampMatchingWindowsFunc {
return func(t time.Time) []window {
if !f.bounds.Contains(t) {
return nil // out of range
}
if len(windows) == 0 {
return nil
}
return []window{windows[0]}
}
}
// createAlignedMatcher is used for range queries.
// The function returns a matcher that always returns exactly one aggregation window that matches the timestamp if the timestamp is not out of range.
//
// steps |-----|---x-|-----|
// interval |-----|
// interval |---x-|
// interval |-----|
func (f *matcherFactory) createAlignedMatcher(windows []window) timestampMatchingWindowsFunc {
startNs := f.start.UnixNano()
stepNs := f.step.Nanoseconds()
return func(t time.Time) []window {
if !f.bounds.Contains(t) {
return nil // out of range
}
tNs := t.UnixNano()
// valid timestamps for window i: t > startNs + (i-1) * intervalNs && t <= startNs + i * intervalNs
windowIndex := (tNs - startNs + stepNs - 1) / stepNs // subtract 1ns because we are calculating 0-based indexes
return []window{windows[windowIndex]}
}
}
// createGappedMatcher is used for range queries.
// The function returns a matcher that either returns exactly one aggregation window that matches the timestamp, or none,
// if the timestamp is out of bounds or within bounds, but is within a "gap" between the end of an interval and the beginning of the next interval.
//
// steps |-----|---x-|-----|
// interval |--|
// interval |x-|
// interval |--|
func (f *matcherFactory) createGappedMatcher(windows []window) timestampMatchingWindowsFunc {
startNs := f.start.UnixNano()
stepNs := f.step.Nanoseconds()
return func(t time.Time) []window {
if !f.bounds.Contains(t) {
return nil // out of range
}
tNs := t.UnixNano()
// For gapped windows, window i covers: (start + i*step - interval, start + i*step]
windowIndex := (tNs - startNs + stepNs - 1) / stepNs // subtract 1ns because we are calculating 0-based indexes
matchingWindow := windows[windowIndex]
// Verify the timestamp is within the window (not in a gap)
if tNs > matchingWindow.start.UnixNano() {
return []window{matchingWindow}
}
return nil // timestamp is in a gap
}
}
// createOverlappingMatcher is used for range queries.
// The function returns a matcher that returns one or more aggregation windows that match the timestamp, if the timestamp is not out of range.
//
// steps |-----|---x-|-----|
// interval |x-------|
// interval |------x-|
// interval |--------|
func (f *matcherFactory) createOverlappingMatcher(windows []window) timestampMatchingWindowsFunc {
return func(t time.Time) []window {
if !f.bounds.Contains(t) {
return nil // out of range
}
// Find the last window that could contain the timestamp.
// We need to find the last window where t > window.startTs
// so search for the first window where t <= window.startTs
firstOOBIndex := sort.Search(len(windows), func(i int) bool {
return t.Compare(windows[i].start) <= 0
})
windowIndex := firstOOBIndex - 1
if windowIndex < 0 {
return nil
}
// Iterate backwards from last matching window to find all matches
var result []window
for _, window := range slices.Backward(windows[:windowIndex+1]) {
if t.Compare(window.start) > 0 && t.Compare(window.end) <= 0 {
result = append(result, window)
} else if t.Compare(window.end) > 0 {
// we've gone past all possible matches
break
}
}
return result
}
}

@ -312,3 +312,329 @@ func TestRangeAggregationPipeline(t *testing.T) {
require.ElementsMatch(t, expect, rows)
})
}
func TestMatcher(t *testing.T) {
t.Run("exactMatcher", func(t *testing.T) {
opts := rangeAggregationOptions{
startTs: time.Unix(1000, 0),
endTs: time.Unix(1000, 0),
rangeInterval: 1000 * time.Second, // covers time range from 0 - 1000
step: 0, // instant query
}
// Create a single window for instant query
windows := []window{
{start: time.Unix(0, 0), end: time.Unix(1000, 0)},
}
f := newMatcherFactoryFromOpts(opts)
matcher := f.createExactMatcher(windows)
tests := []struct {
name string
timestamp time.Time
expected []window
}{
{
name: "timestamp exactly at lowerbound (exclusive boundary)",
timestamp: f.bounds.start,
expected: nil, // should return nil as lowerbound is exclusive
},
{
name: "timestamp greater than upperbound",
timestamp: f.bounds.end.Add(1 * time.Nanosecond),
expected: nil, // should return nil as lowerbound is exclusive
},
{
name: "timestamp exactly at upperbound (inclusive boundary)",
timestamp: f.bounds.end,
expected: []window{windows[0]}, // should return window as upperbound is inclusive
},
{
name: "timestamp just after lowerbound",
timestamp: f.bounds.start.Add(1 * time.Nanosecond),
expected: []window{windows[0]},
},
{
name: "timestamp just before upperbound",
timestamp: f.bounds.end.Add(-1 * time.Nanosecond),
expected: []window{windows[0]},
},
{
name: "timestamp within range",
timestamp: time.Unix(500, 0),
expected: []window{windows[0]},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := matcher(tt.timestamp)
if tt.expected == nil {
require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp)
} else {
requireEqualWindows(t, tt.expected, result)
}
})
}
t.Run("empty windows slice", func(t *testing.T) {
windows := []window{}
matcher := f.createExactMatcher(windows)
// Should return nil for any timestamp when windows is empty
result := matcher(time.Unix(998, 0))
require.Nil(t, result)
})
t.Run("multiple windows (should return first)", func(t *testing.T) {
windows := []window{
{start: f.bounds.start, end: f.start},
{start: f.bounds.start.Add(100 * time.Second), end: f.start.Add(100 * time.Second)},
{start: f.bounds.start.Add(200 * time.Second), end: f.start.Add(200 * time.Second)},
}
matcher := f.createExactMatcher(windows)
// Should always return the first window for valid timestamps
result := matcher(time.Unix(998, 0))
require.Equal(t, []window{windows[0]}, result)
})
})
t.Run("alignedMatcher", func(t *testing.T) {
opts := rangeAggregationOptions{
startTs: time.Unix(100, 0),
endTs: time.Unix(300, 0),
rangeInterval: 100 * time.Second,
step: 100 * time.Second, // step == rangeInterval
}
// Create windows that align with lower/upper bounds and step
windows := []window{
{start: time.Unix(0, 0), end: time.Unix(100, 0)},
{start: time.Unix(100, 0), end: time.Unix(200, 0)},
{start: time.Unix(200, 0), end: time.Unix(300, 0)},
}
f := newMatcherFactoryFromOpts(opts)
matcher := f.createAlignedMatcher(windows)
tests := []struct {
name string
timestamp time.Time
expected []window
}{
{
name: "timestamp exactly at lowerbound (exclusive boundary)",
timestamp: f.bounds.start,
expected: nil, // should return nil as lowerbound is exclusive
},
{
name: "timestamp greater than upperbound",
timestamp: f.bounds.end.Add(1 * time.Nanosecond),
expected: nil, // should return nil as lowerbound is exclusive
},
{
name: "timestamp exactly at upperbound (inclusive boundary)",
timestamp: f.bounds.end,
expected: []window{windows[2]}, // should return window as upperbound is inclusive
},
{
name: "timestamp just after lowerbound",
timestamp: f.bounds.start.Add(1 * time.Nanosecond),
expected: []window{windows[0]},
},
{
name: "timestamp just before upperbound",
timestamp: f.bounds.end.Add(-1 * time.Nanosecond),
expected: []window{windows[2]},
},
{
name: "timestamp within range of window 1 (100-200]",
timestamp: time.Unix(150, 0),
expected: []window{windows[1]},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := matcher(tt.timestamp)
if tt.expected == nil {
require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp)
} else {
requireEqualWindows(t, tt.expected, result)
}
})
}
})
t.Run("gappedMatcher", func(t *testing.T) {
opts := rangeAggregationOptions{
startTs: time.Unix(100, 0),
endTs: time.Unix(300, 0),
rangeInterval: 80 * time.Second,
step: 100 * time.Second, // step > rangeInterval
}
// Create windows that align with lower/upper bounds and step
windows := []window{
{start: time.Unix(20, 0), end: time.Unix(100, 0)},
{start: time.Unix(120, 0), end: time.Unix(200, 0)},
{start: time.Unix(220, 0), end: time.Unix(300, 0)},
}
f := newMatcherFactoryFromOpts(opts)
matcher := f.createGappedMatcher(windows)
tests := []struct {
name string
timestamp time.Time
expected []window
}{
{
name: "timestamp exactly at lowerbound (exclusive boundary)",
timestamp: f.bounds.start,
expected: nil, // should return nil as lowerbound is exclusive
},
{
name: "timestamp greater than upperbound",
timestamp: f.bounds.end.Add(1 * time.Nanosecond),
expected: nil, // should return nil as lowerbound is exclusive
},
{
name: "timestamp in gap",
timestamp: time.Unix(110, 0),
expected: nil, // should return nil as the timestamp is in the "gap" between windows[0].end and windows[1].start
},
{
name: "timestamp in gap at exactly start of window 1",
timestamp: time.Unix(120, 0),
expected: nil, // lower bound is exclusive
},
{
name: "timestamp exactly at end of window 0",
timestamp: time.Unix(100, 0),
expected: []window{windows[0]}, // should return window as upperbound is inclusive
},
{
name: "timestamp exactly at end of window 1",
timestamp: time.Unix(200, 0),
expected: []window{windows[1]}, // should return window as upperbound is inclusive
},
{
name: "timestamp exactly at end of window 2",
timestamp: time.Unix(300, 0),
expected: []window{windows[2]}, // should return window as upperbound is inclusive
},
{
name: "timestamp just before upperbound",
timestamp: f.bounds.end.Add(-1 * time.Nanosecond),
expected: []window{windows[2]},
},
{
name: "timestamp within range of window 1 (100-200]",
timestamp: time.Unix(150, 0),
expected: []window{windows[1]},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := matcher(tt.timestamp)
if tt.expected == nil {
require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp)
} else {
requireEqualWindows(t, tt.expected, result)
}
})
}
})
t.Run("overlappingMatcher", func(t *testing.T) {
opts := rangeAggregationOptions{
startTs: time.Unix(100, 0),
endTs: time.Unix(300, 0),
rangeInterval: 120 * time.Second,
step: 100 * time.Second, // step < rangeInterval
}
// Create windows that align with lower/upper bounds and step
windows := []window{
{start: time.Unix(-20, 0), end: time.Unix(100, 0)},
{start: time.Unix(80, 0), end: time.Unix(200, 0)},
{start: time.Unix(180, 0), end: time.Unix(300, 0)},
}
f := newMatcherFactoryFromOpts(opts)
matcher := f.createOverlappingMatcher(windows)
tests := []struct {
name string
timestamp time.Time
expected []window
}{
{
name: "timestamp exactly at lowerbound (exclusive boundary)",
timestamp: f.bounds.start,
expected: nil, // should return nil as lowerbound is exclusive
},
{
name: "timestamp exactly at upperbound (inclusive boundary)",
timestamp: f.bounds.end,
expected: []window{windows[2]}, // should return window as upperbound is inclusive
},
{
name: "timestamp exactly at start of window 1",
timestamp: time.Unix(80, 0),
expected: []window{windows[0]},
},
{
name: "timestamp exactly at overlap of window 0 and window 1",
timestamp: time.Unix(90, 0),
expected: []window{windows[0], windows[1]},
},
{
name: "timestamp exactly at end of window 0",
timestamp: time.Unix(100, 0),
expected: []window{windows[0], windows[1]},
},
{
name: "timestamp just before upperbound",
timestamp: f.bounds.end.Add(-1 * time.Nanosecond),
expected: []window{windows[2]},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := matcher(tt.timestamp)
if tt.expected == nil {
require.Nil(t, result, "timestamp %v should not match any window", tt.timestamp)
} else {
requireEqualWindows(t, tt.expected, result)
}
})
}
})
}
// requireEqualWindows asserts that two slices of window structs contain the same elements.
func requireEqualWindows(t *testing.T, expected, actual []window) {
t.Helper()
slices.SortStableFunc(expected, func(a, b window) int { return a.end.Compare(b.end) })
slices.SortStableFunc(actual, func(a, b window) int { return a.end.Compare(b.end) })
require.Equal(t, len(expected), len(actual), "window slices should have the same length")
for i := 0; i < len(expected); i++ {
require.Equal(t, expected[i].start.UnixNano(), actual[i].start.UnixNano(),
"window[%d] start time mismatch: expected %s, actual %s", i, expected[i].start, actual[i].start)
require.Equal(t, expected[i].end.UnixNano(), actual[i].end.UnixNano(),
"window[%d] end time mismatch: expected %s, actual %s", i, expected[i].end, actual[i].end)
}
}

Loading…
Cancel
Save