Querier: prevent unnecessary calls to ingesters (#5984)

* Restrict gRPC calls to ingesters when requesting series if there is no overlap within the query_ingesters_within value

* Add tests for isWithinIngesterMaxLookbackPeriod

* Adding tests for calculateIngesterMaxLookbackPeriod

* Only send query for range overlapping with ingesters

* Restrict gRPC calls to ingesters when requesting series if there is no overlap within the query_ingesters_within value

* Add tests for isWithinIngesterMaxLookbackPeriod

* Adding tests for calculateIngesterMaxLookbackPeriod

* Only send query for range overlapping with ingesters

* Add test for when to query ingester and storage

* Fix deadlock in Series when just queriying the ingesters

* Refactored slightly for better readability

* Added CHANGELOG entry

* Fix typo

* Rephrase test cases names

* Move block to outer loop

Co-authored-by: Salva Corts <salva.corts@grafana.com>
pull/5719/head^2
Danny Kopping 4 years ago committed by GitHub
parent 337659602e
commit 3fa6cc9fde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 77
      pkg/querier/querier.go
  3. 322
      pkg/querier/querier_test.go

@ -1,4 +1,5 @@
## Main
* [5984](https://github.com/grafana/loki/pull/5984) **dannykopping** and **salvacorts**: Querier: prevent unnecessary calls to ingesters.
* [5899](https://github.com/grafana/loki/pull/5899) **simonswine**: Update go image to 1.17.9.
* [5888](https://github.com/grafana/loki/pull/5888) **Papawy** Fix common config net interface name overwritten by ring common config
* [5799](https://github.com/grafana/loki/pull/5799) **cyriltovena** Fix deduping issues when multiple entries with the same timestamp exist.

@ -237,19 +237,42 @@ func (q *SingleTenantQuerier) deletesForUser(ctx context.Context, startT, endT t
return deletes, nil
}
func (q *SingleTenantQuerier) isWithinIngesterMaxLookbackPeriod(maxLookback time.Duration, queryEnd time.Time) bool {
// if no lookback limits are configured, always consider this within the range of the lookback period
if maxLookback <= 0 {
return true
}
// find the first instance that we would want to query the ingester from...
ingesterOldestStartTime := time.Now().Add(-maxLookback)
// ...and if the query range ends before that, don't query the ingester
return queryEnd.After(ingesterOldestStartTime)
}
func (q *SingleTenantQuerier) calculateIngesterMaxLookbackPeriod() time.Duration {
mlb := time.Duration(-1)
if q.cfg.IngesterQueryStoreMaxLookback != 0 {
// IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range.
mlb = q.cfg.IngesterQueryStoreMaxLookback
} else if q.cfg.QueryIngestersWithin != 0 {
mlb = q.cfg.QueryIngestersWithin
}
return mlb
}
func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {
// limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries.
limitQueryInterval := false
// ingesterMLB having -1 means query ingester for whole duration.
ingesterMLB := time.Duration(-1)
if q.cfg.IngesterQueryStoreMaxLookback != 0 {
// IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range.
limitQueryInterval = true
ingesterMLB = q.cfg.IngesterQueryStoreMaxLookback
} else if q.cfg.QueryIngestersWithin != 0 {
ingesterMLB = q.cfg.QueryIngestersWithin
}
ingesterMLB := q.calculateIngesterMaxLookbackPeriod()
// query ingester for whole duration.
if ingesterMLB == -1 {
i := &interval{
@ -266,15 +289,18 @@ func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time
return i, i
}
ingesterQueryWithinRange := q.isWithinIngesterMaxLookbackPeriod(ingesterMLB, queryEnd)
// see if there is an overlap between ingester query interval and actual query interval, if not just do the store query.
ingesterOldestStartTime := time.Now().Add(-ingesterMLB)
if queryEnd.Before(ingesterOldestStartTime) {
if !ingesterQueryWithinRange {
return nil, &interval{
start: queryStart,
end: queryEnd,
}
}
ingesterOldestStartTime := time.Now().Add(-ingesterMLB)
// if there is an overlap and we are not limiting the query interval then do both store and ingester query for whole query interval.
if !limitQueryInterval {
i := &interval{
@ -327,17 +353,25 @@ func (q *SingleTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequ
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(q.cfg.QueryTimeout))
defer cancel()
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(*req.Start, *req.End)
var ingesterValues [][]string
if !q.cfg.QueryStoreOnly {
ingesterValues, err = q.ingesterQuerier.Label(ctx, req)
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
timeFramedReq := *req
timeFramedReq.Start = &ingesterQueryInterval.start
timeFramedReq.End = &ingesterQueryInterval.end
ingesterValues, err = q.ingesterQuerier.Label(ctx, &timeFramedReq)
if err != nil {
return nil, err
}
}
var storeValues []string
if !q.cfg.QueryIngesterOnly {
from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
from := model.TimeFromUnixNano(storeQueryInterval.start.UnixNano())
through := model.TimeFromUnixNano(storeQueryInterval.end.UnixNano())
if req.Values {
storeValues, err = q.store.LabelValuesForMetricName(ctx, userID, from, through, "logs", req.Name)
if err != nil {
@ -446,13 +480,17 @@ func (q *SingleTenantQuerier) awaitSeries(ctx context.Context, req *logproto.Ser
series := make(chan [][]logproto.SeriesIdentifier, 2)
errs := make(chan error, 2)
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(req.Start, req.End)
// fetch series from ingesters and store concurrently
if q.cfg.QueryStoreOnly {
series <- [][]logproto.SeriesIdentifier{}
} else {
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
timeFramedReq := *req
timeFramedReq.Start = ingesterQueryInterval.start
timeFramedReq.End = ingesterQueryInterval.end
go func() {
// fetch series identifiers from ingesters
resps, err := q.ingesterQuerier.Series(ctx, req)
resps, err := q.ingesterQuerier.Series(ctx, &timeFramedReq)
if err != nil {
errs <- err
return
@ -460,17 +498,24 @@ func (q *SingleTenantQuerier) awaitSeries(ctx context.Context, req *logproto.Ser
series <- resps
}()
} else {
// If only queriying the store or the query range does not overlap with the ingester max lookback period (defined by `query_ingesters_within`)
// then don't call out to the ingesters, and send an empty result back to the channel
series <- [][]logproto.SeriesIdentifier{}
}
if !q.cfg.QueryIngesterOnly {
if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
go func() {
storeValues, err := q.seriesForMatchers(ctx, req.Start, req.End, req.GetGroups(), req.Shards)
storeValues, err := q.seriesForMatchers(ctx, storeQueryInterval.start, storeQueryInterval.end, req.GetGroups(), req.Shards)
if err != nil {
errs <- err
return
}
series <- [][]logproto.SeriesIdentifier{storeValues}
}()
} else {
// If we are not querying the store, send an empty result back to the channel
series <- [][]logproto.SeriesIdentifier{}
}
var sets [][]logproto.SeriesIdentifier

@ -686,6 +686,328 @@ func TestQuerier_buildQueryIntervals(t *testing.T) {
}
}
func TestQuerier_calculateIngesterMaxLookbackPeriod(t *testing.T) {
for _, tc := range []struct {
name string
ingesterQueryStoreMaxLookback time.Duration
queryIngestersWithin time.Duration
expected time.Duration
}{
{
name: "defaults are set; infinite lookback period if no values are set",
expected: -1,
},
{
name: "only setting ingesterQueryStoreMaxLookback",
ingesterQueryStoreMaxLookback: time.Hour,
expected: time.Hour,
},
{
name: "setting both ingesterQueryStoreMaxLookback and queryIngestersWithin; ingesterQueryStoreMaxLookback takes precedence",
ingesterQueryStoreMaxLookback: time.Hour,
queryIngestersWithin: time.Minute,
expected: time.Hour,
},
{
name: "only setting queryIngestersWithin",
queryIngestersWithin: time.Minute,
expected: time.Minute,
},
} {
t.Run(tc.name, func(t *testing.T) {
querier := SingleTenantQuerier{cfg: Config{
IngesterQueryStoreMaxLookback: tc.ingesterQueryStoreMaxLookback,
QueryIngestersWithin: tc.queryIngestersWithin,
}}
assert.Equal(t, tc.expected, querier.calculateIngesterMaxLookbackPeriod())
})
}
}
func TestQuerier_isWithinIngesterMaxLookbackPeriod(t *testing.T) {
overlappingQuery := interval{
start: time.Now().Add(-6 * time.Hour),
end: time.Now(),
}
nonOverlappingQuery := interval{
start: time.Now().Add(-24 * time.Hour),
end: time.Now().Add(-12 * time.Hour),
}
for _, tc := range []struct {
name string
ingesterQueryStoreMaxLookback time.Duration
queryIngestersWithin time.Duration
overlappingWithinRange bool
nonOverlappingWithinRange bool
}{
{
name: "default values, query ingesters and store for whole duration",
overlappingWithinRange: true,
nonOverlappingWithinRange: true,
},
{
name: "ingesterQueryStoreMaxLookback set to 1h",
ingesterQueryStoreMaxLookback: time.Hour,
overlappingWithinRange: true,
nonOverlappingWithinRange: false,
},
{
name: "ingesterQueryStoreMaxLookback set to 10h",
ingesterQueryStoreMaxLookback: 10 * time.Hour,
overlappingWithinRange: true,
nonOverlappingWithinRange: false,
},
{
name: "ingesterQueryStoreMaxLookback set to 1h and queryIngestersWithin set to 16h, ingesterQueryStoreMaxLookback takes precedence",
ingesterQueryStoreMaxLookback: time.Hour,
queryIngestersWithin: 16 * time.Hour, // if used, this would put the nonOverlapping query in range
overlappingWithinRange: true,
nonOverlappingWithinRange: false,
},
{
name: "ingesterQueryStoreMaxLookback set to -1, query just ingesters",
ingesterQueryStoreMaxLookback: -1,
overlappingWithinRange: true,
nonOverlappingWithinRange: true,
},
{
name: "queryIngestersWithin set to 1h",
queryIngestersWithin: time.Hour,
overlappingWithinRange: true,
nonOverlappingWithinRange: false,
},
{
name: "queryIngestersWithin set to 10h",
queryIngestersWithin: 10 * time.Hour,
overlappingWithinRange: true,
nonOverlappingWithinRange: false,
},
} {
t.Run(tc.name, func(t *testing.T) {
querier := SingleTenantQuerier{cfg: Config{
IngesterQueryStoreMaxLookback: tc.ingesterQueryStoreMaxLookback,
QueryIngestersWithin: tc.queryIngestersWithin,
}}
lookbackPeriod := querier.calculateIngesterMaxLookbackPeriod()
assert.Equal(t, tc.overlappingWithinRange, querier.isWithinIngesterMaxLookbackPeriod(lookbackPeriod, overlappingQuery.end))
assert.Equal(t, tc.nonOverlappingWithinRange, querier.isWithinIngesterMaxLookbackPeriod(lookbackPeriod, nonOverlappingQuery.end))
})
}
}
func TestQuerier_RequestingIngesters(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test")
requestMapping := map[string]struct {
ingesterMethod string
storeMethod string
}{
"SelectLogs": {
ingesterMethod: "Query",
storeMethod: "SelectLogs",
},
"SelectSamples": {
ingesterMethod: "QuerySample",
storeMethod: "SelectSamples",
},
"LabelValuesForMetricName": {
ingesterMethod: "Label",
storeMethod: "LabelValuesForMetricName",
},
"LabelNamesForMetricName": {
ingesterMethod: "Label",
storeMethod: "LabelNamesForMetricName",
},
"Series": {
ingesterMethod: "Series",
storeMethod: "Series",
},
}
tests := []struct {
desc string
start, end time.Time
setIngesterQueryStoreMaxLookback bool
expectedCallsStore int
expectedCallsIngesters int
}{
{
desc: "Data in storage and ingesters",
start: time.Now().Add(-time.Hour * 2),
end: time.Now(),
expectedCallsStore: 1,
expectedCallsIngesters: 1,
},
{
desc: "Data in ingesters (IngesterQueryStoreMaxLookback not set)",
start: time.Now().Add(-time.Minute * 15),
end: time.Now(),
expectedCallsStore: 1,
expectedCallsIngesters: 1,
},
{
desc: "Data only in storage",
start: time.Now().Add(-time.Hour * 2),
end: time.Now().Add(-time.Hour * 1),
expectedCallsStore: 1,
expectedCallsIngesters: 0,
},
{
desc: "Data in ingesters (IngesterQueryStoreMaxLookback set)",
start: time.Now().Add(-time.Minute * 15),
end: time.Now(),
setIngesterQueryStoreMaxLookback: true,
expectedCallsStore: 0,
expectedCallsIngesters: 1,
},
}
requests := []struct {
name string
do func(querier *SingleTenantQuerier, start, end time.Time) error
}{
{
name: "SelectLogs",
do: func(querier *SingleTenantQuerier, start, end time.Time) error {
_, err := querier.SelectLogs(ctx, logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Selector: "{type=\"test\", fail=\"yes\"} |= \"foo\"",
Limit: 10,
Start: start,
End: end,
Direction: logproto.FORWARD,
},
})
return err
},
},
{
name: "SelectSamples",
do: func(querier *SingleTenantQuerier, start, end time.Time) error {
_, err := querier.SelectSamples(ctx, logql.SelectSampleParams{
SampleQueryRequest: &logproto.SampleQueryRequest{
Selector: "count_over_time({foo=\"bar\"}[5m])",
Start: start,
End: end,
},
})
return err
},
},
{
name: "LabelValuesForMetricName",
do: func(querier *SingleTenantQuerier, start, end time.Time) error {
_, err := querier.Label(ctx, &logproto.LabelRequest{
Name: "type",
Values: true,
Start: &start,
End: &end,
})
return err
},
},
{
name: "LabelNamesForMetricName",
do: func(querier *SingleTenantQuerier, start, end time.Time) error {
_, err := querier.Label(ctx, &logproto.LabelRequest{
Values: false,
Start: &start,
End: &end,
})
return err
},
},
{
name: "Series",
do: func(querier *SingleTenantQuerier, start, end time.Time) error {
_, err := querier.Series(ctx, &logproto.SeriesRequest{
Start: start,
End: end,
})
return err
},
},
}
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
conf := mockQuerierConfig()
conf.QueryIngestersWithin = time.Minute * 30
if tc.setIngesterQueryStoreMaxLookback {
conf.IngesterQueryStoreMaxLookback = conf.QueryIngestersWithin
}
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
for _, request := range requests {
t.Run(request.name, func(t *testing.T) {
ingesterClient, store, querier, err := setupIngesterQuerierMocks(conf, limits)
require.NoError(t, err)
err = request.do(querier, tc.start, tc.end)
require.NoError(t, err)
callsIngesters := ingesterClient.GetMockedCallsByMethod(requestMapping[request.name].ingesterMethod)
assert.Equal(t, tc.expectedCallsIngesters, len(callsIngesters))
callsStore := store.GetMockedCallsByMethod(requestMapping[request.name].storeMethod)
assert.Equal(t, tc.expectedCallsStore, len(callsStore))
})
}
})
}
}
func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*querierClientMock, *storeMock, *SingleTenantQuerier, error) {
queryClient := newQueryClientMock()
queryClient.On("Recv").Return(mockQueryResponse([]logproto.Stream{mockStream(1, 1)}), nil)
querySampleClient := newQuerySampleClientMock()
querySampleClient.On("Recv").Return(mockQueryResponse([]logproto.Stream{mockStream(1, 1)}), nil)
ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).Return(queryClient, nil)
ingesterClient.On("QuerySample", mock.Anything, mock.Anything, mock.Anything).Return(querySampleClient, nil)
ingesterClient.On("Label", mock.Anything, mock.Anything, mock.Anything).Return(mockLabelResponse([]string{"bar"}), nil)
ingesterClient.On("Series", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.SeriesResponse{
Series: []logproto.SeriesIdentifier{
{
Labels: map[string]string{"bar": "1"},
},
},
}, nil)
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).Return(mockStreamIterator(0, 1), nil)
store.On("SelectSamples", mock.Anything, mock.Anything).Return(mockSampleIterator(querySampleClient), nil)
store.On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"1", "2", "3"}, nil)
store.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"foo"}, nil)
store.On("Series", mock.Anything, mock.Anything).Return([]logproto.SeriesIdentifier{
{Labels: map[string]string{"foo": "1"}},
}, nil)
querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
if err != nil {
return nil, nil, nil, err
}
return ingesterClient, store, querier, nil
}
type fakeTimeLimits struct {
maxQueryLookback time.Duration
maxQueryLength time.Duration

Loading…
Cancel
Save