chore: Remove unnecessary errors from splitter. (#16109)

**What this PR does / why we need it**:
Originally, the splitter middleware had to parse the query and thus could return an error. However, parsing is now done earlier, so the splitter no longer returns an error.
pull/16130/head
Karsten Jeschkies 1 year ago committed by GitHub
parent 9c4081026f
commit 8e37d5f817
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 9
      pkg/querier/queryrange/limits.go
  2. 6
      pkg/querier/queryrange/querysharding.go
  3. 14
      pkg/querier/queryrange/split_by_interval.go
  4. 20
      pkg/querier/queryrange/split_by_interval_test.go
  5. 34
      pkg/querier/queryrange/splitters.go

@ -48,9 +48,7 @@ const (
limErrQuerierTooManyBytesShardableTmpl = "shard query is too large to execute on a single querier: (query: %s, limit: %s); consider adding more specific stream selectors or reduce the time range of the query"
)
var (
ErrMaxQueryParalellism = fmt.Errorf("querying is disabled, please contact your Loki operator")
)
var ErrMaxQueryParalellism = fmt.Errorf("querying is disabled, please contact your Loki operator")
type Limits queryrange_limits.Limits
@ -480,9 +478,7 @@ func (s *SemaphoreWithTiming) Acquire(ctx context.Context, n int64) (time.Durati
}
func (rt limitedRoundTripper) Do(c context.Context, request queryrangebase.Request) (queryrangebase.Response, error) {
var (
ctx, cancel = context.WithCancel(c)
)
ctx, cancel := context.WithCancel(c)
defer func() {
cancel()
}()
@ -523,7 +519,6 @@ func (rt limitedRoundTripper) Do(c context.Context, request queryrangebase.Reque
// Note: It is the responsibility of the caller to run
// the handler in parallel.
elapsed, err := semWithTiming.Acquire(ctx, int64(1))
if err != nil {
return nil, fmt.Errorf("could not acquire work: %w", err)
}

@ -162,11 +162,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrangebase.Request) (que
return nil, err
}
maxRVDuration, maxOffset, err := maxRangeVectorAndOffsetDuration(params.GetExpression())
if err != nil {
level.Warn(spLogger).Log("err", err.Error(), "msg", "failed to get range-vector and offset duration so skipped AST mapper for request")
return ast.next.Do(ctx, r)
}
maxRVDuration, maxOffset := maxRangeVectorAndOffsetDuration(params.GetExpression())
conf, err := ast.confs.GetConf(int64(model.Time(r.GetStart().UnixMilli()).Add(-maxRVDuration).Add(-maxOffset)), int64(model.Time(r.GetEnd().UnixMilli()).Add(-maxOffset)))
// cannot shard with this timerange

@ -195,10 +195,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que
return h.next.Do(ctx, r)
}
intervals, err := h.splitter.split(time.Now().UTC(), tenantIDs, r, interval)
if err != nil {
return nil, err
}
intervals := h.splitter.split(time.Now().UTC(), tenantIDs, r, interval)
h.metrics.splits.Observe(float64(len(intervals)))
@ -260,13 +257,14 @@ func maxRangeVectorAndOffsetDurationFromQueryString(q string) (time.Duration, ti
if err != nil {
return 0, 0, err
}
return maxRangeVectorAndOffsetDuration(parsed)
dur, offset := maxRangeVectorAndOffsetDuration(parsed)
return dur, offset, nil
}
// maxRangeVectorAndOffsetDuration returns the maximum range vector and offset duration within a LogQL query.
func maxRangeVectorAndOffsetDuration(expr syntax.Expr) (time.Duration, time.Duration, error) {
func maxRangeVectorAndOffsetDuration(expr syntax.Expr) (time.Duration, time.Duration) {
if _, ok := expr.(syntax.SampleExpr); !ok {
return 0, 0, nil
return 0, 0
}
var maxRVDuration, maxOffset time.Duration
@ -280,5 +278,5 @@ func maxRangeVectorAndOffsetDuration(expr syntax.Expr) (time.Duration, time.Dura
}
}
})
return maxRVDuration, maxOffset, nil
return maxRVDuration, maxOffset
}

@ -500,8 +500,7 @@ func Test_splitQuery(t *testing.T) {
intervals.splitter = newDefaultSplitter(fakeLimits{}, nil)
}
splits, err := intervals.splitter.split(refTime, []string{tenantID}, req, intervals.splitInterval)
require.NoError(t, err)
splits := intervals.splitter.split(refTime, []string{tenantID}, req, intervals.splitInterval)
assertSplits(t, want, splits)
})
}
@ -738,8 +737,7 @@ func Test_splitRecentMetadataQuery(t *testing.T) {
intervals.splitter = newDefaultSplitter(fakeLimits{}, nil)
}
splits, err := intervals.splitter.split(refTime, []string{tenantID}, req, intervals.splitInterval)
require.NoError(t, err)
splits := intervals.splitter.split(refTime, []string{tenantID}, req, intervals.splitInterval)
assertSplits(t, want, splits)
})
}
@ -1136,7 +1134,7 @@ func Test_splitMetricQuery(t *testing.T) {
},
expected: []queryrangebase.Request{
&LokiRequest{
StartTs: time.Date(2023, 1, 15, 7, 05, 30, 0, time.UTC), // start time is aligned down to step of 15s
StartTs: time.Date(2023, 1, 15, 7, 0o5, 30, 0, time.UTC), // start time is aligned down to step of 15s
EndTs: time.Date(2023, 1, 15, 7, 29, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
@ -1349,8 +1347,7 @@ func Test_splitMetricQuery(t *testing.T) {
ms = tc.splitter.(*metricQuerySplitter)
}
splits, err := ms.split(refTime, []string{tenantID}, tc.input, tc.splitInterval)
require.NoError(t, err)
splits := ms.split(refTime, []string{tenantID}, tc.input, tc.splitInterval)
if !assert.Equal(t, tc.expected, splits) {
t.Logf("expected and actual do not match\n")
defer t.Fail()
@ -1660,7 +1657,8 @@ func Test_seriesvolume_splitByInterval_Do(t *testing.T) {
{Name: `{foo="bar"}`, Volume: 38},
{Name: `{bar="baz"}`, Volume: 28},
},
Limit: 2},
Limit: 2,
},
Headers: nil,
}, nil
})
@ -1700,7 +1698,8 @@ func Test_seriesvolume_splitByInterval_Do(t *testing.T) {
{Name: `{foo="bar"}`, Volume: 38},
{Name: `{fizz="buzz"}`, Volume: 28},
},
Limit: 1},
Limit: 1,
},
Headers: nil,
}, nil
})
@ -1740,7 +1739,8 @@ func Test_seriesvolume_splitByInterval_Do(t *testing.T) {
{Name: `{foo="bar"}`, Volume: 38},
{Name: `{bar="baz"}`, Volume: 28},
},
Limit: 2},
Limit: 2,
},
Headers: nil,
}, nil
})

@ -16,7 +16,7 @@ import (
)
type splitter interface {
split(execTime time.Time, tenantIDs []string, request queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error)
split(execTime time.Time, tenantIDs []string, request queryrangebase.Request, interval time.Duration) []queryrangebase.Request
}
type defaultSplitter struct {
@ -28,7 +28,7 @@ func newDefaultSplitter(limits Limits, iqo util.IngesterQueryOptions) *defaultSp
return &defaultSplitter{limits, iqo}
}
func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req queryrangebase.Request, interval time.Duration) []queryrangebase.Request {
var (
reqs []queryrangebase.Request
factory func(start, end time.Time)
@ -126,7 +126,7 @@ func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req quer
}
default:
level.Warn(util_log.Logger).Log("msg", fmt.Sprintf("splitter: unsupported request type: %T", req))
return nil, nil
return nil
}
var (
@ -175,7 +175,7 @@ func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req quer
// query only overlaps ingester query window or recent metadata query window, nothing more to do
if start.After(end) || start.Equal(end) {
return reqs, nil
return reqs
}
// copy the splits, reset the results
@ -191,7 +191,7 @@ func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req quer
// move the ingester or recent metadata splits to the end to maintain correct order
reqs = append(reqs, splitsBeforeRebound...)
return reqs, nil
return reqs
}
type metricQuerySplitter struct {
@ -205,15 +205,12 @@ func newMetricQuerySplitter(limits Limits, iqo util.IngesterQueryOptions) *metri
// reduceSplitIntervalForRangeVector reduces the split interval for a range query based on the duration of the range vector.
// Large range vector durations will not be split into smaller intervals because it can cause the queries to be slow by over-processing data.
func (s *metricQuerySplitter) reduceSplitIntervalForRangeVector(r *LokiRequest, interval time.Duration) (time.Duration, error) {
maxRange, _, err := maxRangeVectorAndOffsetDuration(r.Plan.AST)
if err != nil {
return 0, err
}
func (s *metricQuerySplitter) reduceSplitIntervalForRangeVector(r *LokiRequest, interval time.Duration) time.Duration {
maxRange, _ := maxRangeVectorAndOffsetDuration(r.Plan.AST)
if maxRange > interval {
return maxRange, nil
return maxRange
}
return interval, nil
return interval
}
// Round up to the step before the next interval boundary.
@ -229,15 +226,12 @@ func (s *metricQuerySplitter) nextIntervalBoundary(t time.Time, step int64, inte
return time.Unix(0, target)
}
func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r queryrangebase.Request, interval time.Duration) ([]queryrangebase.Request, error) {
func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r queryrangebase.Request, interval time.Duration) []queryrangebase.Request {
var reqs []queryrangebase.Request
lokiReq := r.(*LokiRequest)
interval, err := s.reduceSplitIntervalForRangeVector(lokiReq, interval)
if err != nil {
return nil, err
}
interval = s.reduceSplitIntervalForRangeVector(lokiReq, interval)
start, end := s.alignStartEnd(r.GetStep(), lokiReq.StartTs, lokiReq.EndTs)
@ -262,7 +256,7 @@ func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r qu
if lokiReq.Step >= interval.Milliseconds() {
util.ForInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, factory)
return reqs, nil
return reqs
}
var (
@ -291,7 +285,7 @@ func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r qu
// query only overlaps ingester query window, nothing more to do
if start.After(end) || start.Equal(end) {
return reqs, nil
return reqs
}
// copy the splits, reset the results
@ -308,7 +302,7 @@ func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r qu
// move the ingester splits to the end to maintain correct order
reqs = append(reqs, ingesterSplits...)
return reqs, nil
return reqs
}
func (s *metricQuerySplitter) alignStartEnd(step int64, start, end time.Time) (time.Time, time.Time) {

Loading…
Cancel
Save