Query ingesters for SeriesVolume in single binary (#9888)

This change is necessary to get recent SeriesVolume data from the
ingesters when running without an `AsyncStore` in single binary.
It changes the queriers to query both the ingester and the store,
rather than relying on the `AsyncStore` to query the ingesters.
pull/9878/head
Trevor Whitney 2 years ago committed by GitHub
parent cb2b3ca9ea
commit 5a570cb921
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 58
      pkg/querier/querier.go
  2. 100
      pkg/querier/querier_test.go

@ -802,12 +802,54 @@ func (q *SingleTenantQuerier) SeriesVolume(ctx context.Context, req *logproto.Vo
"limit", req.Limit,
)
return q.store.SeriesVolume(
ctx,
userID,
req.From,
req.Through,
req.Limit,
matchers...,
)
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(req.From.Time(), req.Through.Time())
queryIngesters := !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil
queryStore := !q.cfg.QueryIngesterOnly && storeQueryInterval != nil
numResponses := 0
if queryIngesters {
numResponses++
}
if queryStore {
numResponses++
}
responses := make([]*logproto.VolumeResponse, 0, numResponses)
if queryIngesters {
// Make a copy of the request before modifying
// because the initial request is used below to query stores
resp, err := q.ingesterQuerier.SeriesVolume(
ctx,
userID,
model.TimeFromUnix(ingesterQueryInterval.start.Unix()),
model.TimeFromUnix(ingesterQueryInterval.end.Unix()),
req.Limit,
matchers...,
)
if err != nil {
return nil, err
}
responses = append(responses, resp)
}
if queryStore {
resp, err := q.store.SeriesVolume(
ctx,
userID,
model.TimeFromUnix(storeQueryInterval.start.Unix()),
model.TimeFromUnix(storeQueryInterval.end.Unix()),
req.Limit,
matchers...,
)
if err != nil {
return nil, err
}
responses = append(responses, resp)
}
return seriesvolume.Merge(responses, req.Limit), nil
}

@ -966,8 +966,8 @@ func TestQuerier_RequestingIngesters(t *testing.T) {
}
}
func TestQuerier_LabeleVolumes(t *testing.T) {
t.Run("it returns label volumes from the store", func(t *testing.T) {
func TestQuerier_SeriesVolumes(t *testing.T) {
t.Run("it returns series volumes from the store", func(t *testing.T) {
ret := &logproto.VolumeResponse{Volumes: []logproto.Volume{
{Name: "foo", Volume: 38},
}}
@ -975,19 +975,105 @@ func TestQuerier_LabeleVolumes(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
ingesterClient := newQuerierClientMock()
store := newStoreMock()
store.On("SeriesVolume", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ret, nil)
querier := SingleTenantQuerier{
store: store,
limits: limits,
}
req := &logproto.VolumeRequest{From: 0, Through: 1000, Matchers: `{}`}
conf := mockQuerierConfig()
conf.QueryIngestersWithin = time.Minute * 30
conf.IngesterQueryStoreMaxLookback = conf.QueryIngestersWithin
querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)
now := time.Now()
from := model.TimeFromUnix(now.Add(-1 * time.Hour).Unix())
through := model.TimeFromUnix(now.Add(-35 * time.Minute).Unix())
req := &logproto.VolumeRequest{From: from, Through: through, Matchers: `{}`, Limit: 10}
ctx := user.InjectOrgID(context.Background(), "test")
resp, err := querier.SeriesVolume(ctx, req)
require.NoError(t, err)
require.Equal(t, []logproto.Volume{{Name: "foo", Volume: 38}}, resp.Volumes)
})
t.Run("it returns series volumes from the ingester", func(t *testing.T) {
ret := &logproto.VolumeResponse{Volumes: []logproto.Volume{
{Name: "foo", Volume: 38},
}}
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
ingesterClient := newQuerierClientMock()
ingesterClient.On("GetSeriesVolume", mock.Anything, mock.Anything, mock.Anything).Return(ret, nil)
store := newStoreMock()
conf := mockQuerierConfig()
conf.QueryIngestersWithin = time.Minute * 30
conf.IngesterQueryStoreMaxLookback = conf.QueryIngestersWithin
querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)
now := time.Now()
from := model.TimeFromUnix(now.Add(-15 * time.Minute).Unix())
through := model.TimeFromUnix(now.Unix())
req := &logproto.VolumeRequest{From: from, Through: through, Matchers: `{}`, Limit: 10}
ctx := user.InjectOrgID(context.Background(), "test")
resp, err := querier.SeriesVolume(ctx, req)
require.NoError(t, err)
require.Equal(t, []logproto.Volume{{Name: "foo", Volume: 38}}, resp.Volumes)
})
t.Run("it merges series volumes from the store and ingester", func(t *testing.T) {
ret := &logproto.VolumeResponse{Volumes: []logproto.Volume{
{Name: "foo", Volume: 38},
}}
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
ingesterClient := newQuerierClientMock()
ingesterClient.On("GetSeriesVolume", mock.Anything, mock.Anything, mock.Anything).Return(ret, nil)
store := newStoreMock()
store.On("SeriesVolume", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ret, nil)
conf := mockQuerierConfig()
conf.QueryIngestersWithin = time.Minute * 30
conf.IngesterQueryStoreMaxLookback = conf.QueryIngestersWithin
querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)
now := time.Now()
from := model.TimeFromUnix(now.Add(-time.Hour).Unix())
through := model.TimeFromUnix(now.Unix())
req := &logproto.VolumeRequest{From: from, Through: through, Matchers: `{}`, Limit: 10}
ctx := user.InjectOrgID(context.Background(), "test")
resp, err := querier.SeriesVolume(ctx, req)
require.NoError(t, err)
require.Equal(t, []logproto.Volume{{Name: "foo", Volume: 76}}, resp.Volumes)
})
}
func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*querierClientMock, *storeMock, *SingleTenantQuerier, error) {

Loading…
Cancel
Save