From 5a570cb921545aa5aaff7532b2cad8799f151c33 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Tue, 11 Jul 2023 10:18:46 -0600 Subject: [PATCH] Query ingesters for SeriesVolume in single binary (#9888) This change is necessary to get recent SeriesVolume data from the ingesters when running without an `AsyncStore` in single binary. It changes the queriers to query both the ingester and the store, rather than relying on the `AsyncStore` to query the ingesters. --- pkg/querier/querier.go | 58 ++++++++++++++++++--- pkg/querier/querier_test.go | 100 +++++++++++++++++++++++++++++++++--- 2 files changed, 143 insertions(+), 15 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8087d3b7f7..cbbaf5294e 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -802,12 +802,54 @@ func (q *SingleTenantQuerier) SeriesVolume(ctx context.Context, req *logproto.Vo "limit", req.Limit, ) - return q.store.SeriesVolume( - ctx, - userID, - req.From, - req.Through, - req.Limit, - matchers..., - ) + ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(req.From.Time(), req.Through.Time()) + + queryIngesters := !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil + queryStore := !q.cfg.QueryIngesterOnly && storeQueryInterval != nil + + numResponses := 0 + if queryIngesters { + numResponses++ + } + if queryStore { + numResponses++ + } + responses := make([]*logproto.VolumeResponse, 0, numResponses) + + if queryIngesters { + // Make a copy of the request before modifying + // because the initial request is used below to query stores + + resp, err := q.ingesterQuerier.SeriesVolume( + ctx, + userID, + model.TimeFromUnix(ingesterQueryInterval.start.Unix()), + model.TimeFromUnix(ingesterQueryInterval.end.Unix()), + req.Limit, + matchers..., + ) + if err != nil { + return nil, err + } + + responses = append(responses, resp) + } + + if queryStore { + resp, err := q.store.SeriesVolume( + ctx, + userID, + model.TimeFromUnix(storeQueryInterval.start.Unix()), + model.TimeFromUnix(storeQueryInterval.end.Unix()), + req.Limit, + matchers..., + ) + if err != nil { + return nil, err + } + + responses = append(responses, resp) + } + + return seriesvolume.Merge(responses, req.Limit), nil } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 9488c03858..2bb842e8df 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -966,8 +966,8 @@ func TestQuerier_RequestingIngesters(t *testing.T) { } } -func TestQuerier_LabeleVolumes(t *testing.T) { - t.Run("it returns label volumes from the store", func(t *testing.T) { +func TestQuerier_SeriesVolumes(t *testing.T) { + t.Run("it returns series volumes from the store", func(t *testing.T) { ret := &logproto.VolumeResponse{Volumes: []logproto.Volume{ {Name: "foo", Volume: 38}, }} @@ -975,19 +975,105 @@ func TestQuerier_LabeleVolumes(t *testing.T) { limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) + ingesterClient := newQuerierClientMock() store := newStoreMock() store.On("SeriesVolume", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ret, nil) - querier := SingleTenantQuerier{ - store: store, - limits: limits, - } - req := &logproto.VolumeRequest{From: 0, Through: 1000, Matchers: `{}`} + conf := mockQuerierConfig() + conf.QueryIngestersWithin = time.Minute * 30 + conf.IngesterQueryStoreMaxLookback = conf.QueryIngestersWithin + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, limits) + require.NoError(t, err) + + now := time.Now() + from := model.TimeFromUnix(now.Add(-1 * time.Hour).Unix()) + through := model.TimeFromUnix(now.Add(-35 * time.Minute).Unix()) + req := &logproto.VolumeRequest{From: from, Through: through, Matchers: `{}`, Limit: 10} ctx := user.InjectOrgID(context.Background(), "test") resp, err := querier.SeriesVolume(ctx, req) require.NoError(t, err) require.Equal(t, []logproto.Volume{{Name: "foo", Volume: 38}}, resp.Volumes) }) + + t.Run("it returns series volumes from the ingester", func(t *testing.T) { + ret := &logproto.VolumeResponse{Volumes: []logproto.Volume{ + {Name: "foo", Volume: 38}, + }} + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("GetSeriesVolume", mock.Anything, mock.Anything, mock.Anything).Return(ret, nil) + + store := newStoreMock() + + conf := mockQuerierConfig() + conf.QueryIngestersWithin = time.Minute * 30 + conf.IngesterQueryStoreMaxLookback = conf.QueryIngestersWithin + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, limits) + require.NoError(t, err) + + now := time.Now() + from := model.TimeFromUnix(now.Add(-15 * time.Minute).Unix()) + through := model.TimeFromUnix(now.Unix()) + req := &logproto.VolumeRequest{From: from, Through: through, Matchers: `{}`, Limit: 10} + ctx := user.InjectOrgID(context.Background(), "test") + resp, err := querier.SeriesVolume(ctx, req) + require.NoError(t, err) + require.Equal(t, []logproto.Volume{{Name: "foo", Volume: 38}}, resp.Volumes) + }) + + t.Run("it merges series volumes from the store and ingester", func(t *testing.T) { + ret := &logproto.VolumeResponse{Volumes: []logproto.Volume{ + {Name: "foo", Volume: 38}, + }} + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("GetSeriesVolume", mock.Anything, mock.Anything, mock.Anything).Return(ret, nil) + + store := newStoreMock() + store.On("SeriesVolume", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ret, nil) + + conf := mockQuerierConfig() + conf.QueryIngestersWithin = time.Minute * 30 + conf.IngesterQueryStoreMaxLookback = conf.QueryIngestersWithin + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, limits) + require.NoError(t, err) + + now := time.Now() + from := model.TimeFromUnix(now.Add(-time.Hour).Unix()) + through := model.TimeFromUnix(now.Unix()) + req := &logproto.VolumeRequest{From: from, Through: through, Matchers: `{}`, Limit: 10} + ctx := user.InjectOrgID(context.Background(), "test") + resp, err := querier.SeriesVolume(ctx, req) + require.NoError(t, err) + require.Equal(t, []logproto.Volume{{Name: "foo", Volume: 76}}, resp.Volumes) + }) } func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*querierClientMock, *storeMock, *SingleTenantQuerier, error) {