From 587a6d20e938f4f58e5a49563a3c267762cf89eb Mon Sep 17 00:00:00 2001 From: Shantanu Alshi Date: Tue, 23 Apr 2024 21:02:06 +0530 Subject: [PATCH] feat: Detected labels from store (#12441) --- pkg/ingester/ingester.go | 1 - pkg/ingester/ingester_test.go | 123 ++++++++++++ pkg/ingester/instance.go | 26 ++- pkg/ingester/instance_test.go | 49 +++++ pkg/logql/metrics.go | 42 +++- pkg/loki/modules.go | 3 +- pkg/querier/ingester_querier.go | 17 +- pkg/querier/ingester_querier_test.go | 81 ++++---- pkg/querier/multi_tenant_querier.go | 6 +- pkg/querier/querier.go | 94 +++++++-- pkg/querier/querier_mock_test.go | 10 + pkg/querier/querier_test.go | 290 ++++++++++++++++++++++++++- pkg/querier/queryrange/codec.go | 12 -- pkg/querier/queryrange/roundtrip.go | 43 +++- 14 files changed, 714 insertions(+), 83 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index b0d197623d..e99de0d153 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1395,7 +1395,6 @@ func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.Detected if err != nil { return nil, err } - level.Info(i.logger).Log("msg", matchers) } labelMap, err := instance.LabelsWithValues(ctx, *req.Start, matchers...) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 94fd5700c6..6722e6cfcc 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -784,6 +784,129 @@ func Test_InMemoryLabels(t *testing.T) { require.Equal(t, []string{"bar", "foo"}, res.Values) } +func TestIngester_GetDetectedLabels(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "test") + + ingesterConfig := defaultIngesterTestConfig(t) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + store := &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Push labels + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + { + Labels: `{foo="bar",bar="baz2"}`, + }, + { + Labels: `{foo="bar1",bar="baz3"}`, + }, + { + Labels: `{foo="foo1",bar="baz1"}`, + }, + { + Labels: `{foo="foo",bar="baz1"}`, + }, + }, + } + for i := 0; i < 10; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + } + + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + res, err := i.GetDetectedLabels(ctx, &logproto.DetectedLabelsRequest{ + Start: &[]time.Time{time.Now().Add(11 * time.Nanosecond)}[0], + End: nil, + Query: "", + }) + + require.NoError(t, err) + fooValues, ok := res.Labels["foo"] + require.True(t, ok) + barValues, ok := res.Labels["bar"] + require.True(t, ok) + require.Equal(t, 4, len(fooValues.Values)) + require.Equal(t, 3, len(barValues.Values)) +} + +func TestIngester_GetDetectedLabelsWithQuery(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "test") + + ingesterConfig := defaultIngesterTestConfig(t) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + store := &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Push labels + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + { + Labels: `{foo="bar",bar="baz2"}`, + }, + { + Labels: `{foo="bar1",bar="baz3"}`, + }, + { + Labels: `{foo="foo1",bar="baz4"}`, + }, + }, + } + for i := 0; i < 10; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + } + + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + res, err := i.GetDetectedLabels(ctx, &logproto.DetectedLabelsRequest{ + Start: &[]time.Time{time.Now().Add(11 * time.Nanosecond)}[0], + End: nil, + Query: `{foo="bar"}`, + }) + + require.NoError(t, err) + fooValues, ok := res.Labels["foo"] + require.True(t, ok) + barValues, ok := res.Labels["bar"] + require.True(t, ok) + require.Equal(t, 1, len(fooValues.Values)) + require.Equal(t, 2, len(barValues.Values)) +} + func Test_DedupeIngester(t *testing.T) { var ( requests = int64(400) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 7e9112fad1..eb98f8a39b 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -588,9 +588,31 @@ type UniqueValues map[string]struct{} // LabelsWithValues returns the label names with all the unique values depending on the request func (i *instance) LabelsWithValues(ctx context.Context, startTime time.Time, matchers ...*labels.Matcher) (map[string]UniqueValues, error) { - // TODO (shantanu): Figure out how to get the label names from index directly when no matchers are given. - labelMap := make(map[string]UniqueValues) + if len(matchers) == 0 { + labelsFromIndex, err := i.index.LabelNames(startTime, nil) + if err != nil { + return nil, err + } + + for _, label := range labelsFromIndex { + values, err := i.index.LabelValues(startTime, label, nil) + if err != nil { + return nil, err + } + existingValues, exists := labelMap[label] + if !exists { + existingValues = make(map[string]struct{}) + } + for _, v := range values { + existingValues[v] = struct{}{} + } + labelMap[label] = existingValues + } + + return labelMap, nil + } + err := i.forMatchingStreams(ctx, startTime, matchers, nil, func(s *stream) error { for _, label := range s.labels { v, exists := labelMap[label.Name] diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 0cd5838251..acc5864fc5 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -1480,6 +1480,55 @@ func insertData(t *testing.T, instance *instance) { } } +func TestInstance_LabelsWithValues(t *testing.T) { + instance, currentTime, _ := setupTestStreams(t) + start := []time.Time{currentTime.Add(11 * time.Nanosecond)}[0] + m, err := labels.NewMatcher(labels.MatchEqual, "app", "test") + require.NoError(t, err) + + t.Run("label names with no matchers returns all detected labels", func(t *testing.T) { + var matchers []*labels.Matcher + res, err := instance.LabelsWithValues(context.Background(), start, matchers...) + completeResponse := map[string]UniqueValues{ + "app": map[string]struct{}{ + "test": {}, + "test2": {}, + }, + "job": map[string]struct{}{ + "varlogs": {}, + "varlogs2": {}, + }, + } + require.NoError(t, err) + require.Equal(t, completeResponse, res) + }) + + t.Run("label names with matcher returns response with matching detected labels", func(t *testing.T) { + matchers := []*labels.Matcher{m} + res, err := instance.LabelsWithValues(context.Background(), start, matchers...) + responseWithMatchingLabel := map[string]UniqueValues{ + "app": map[string]struct{}{ + "test": {}, + }, + "job": map[string]struct{}{ + "varlogs": {}, + "varlogs2": {}, + }, + } + require.NoError(t, err) + require.Equal(t, responseWithMatchingLabel, res) + }) + + t.Run("label names matchers and no start time returns a empty response", func(t *testing.T) { + matchers := []*labels.Matcher{m} + var st time.Time + res, err := instance.LabelsWithValues(context.Background(), st, matchers...) + + require.NoError(t, err) + require.Equal(t, map[string]UniqueValues{}, res) + }) +} + type fakeQueryServer func(*logproto.QueryResponse) error func (f fakeQueryServer) Send(res *logproto.QueryResponse) error { diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index ed8405fc4e..052446c6b5 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -580,6 +580,44 @@ func extractShard(shards []string) *astmapper.ShardAnnotation { return &shard } -func RecordDetectedLabelsQueryMetrics(_ context.Context, _ log.Logger, _ time.Time, _ time.Time, _ string, _ string, _ logql_stats.Result) { - // TODO(shantanu) log metrics here +func RecordDetectedLabelsQueryMetrics(ctx context.Context, log log.Logger, start time.Time, end time.Time, query string, status string, stats logql_stats.Result) { + var ( + logger = fixLogger(ctx, log) + latencyType = latencyTypeFast + queryType = QueryTypeVolume + ) + + // Tag throughput metric by latency type based on a threshold. + // Latency below the threshold is fast, above is slow. + if stats.Summary.ExecTime > slowQueryThresholdSecond { + latencyType = latencyTypeSlow + } + + rangeType := "range" + + level.Info(logger).Log( + "api", "detected_labels", + "latency", latencyType, + "query_type", queryType, + "query", query, + "query_hash", util.HashedQuery(query), + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), + "range_type", rangeType, + "length", end.Sub(start), + "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), + "status", status, + "splits", stats.Summary.Splits, + "total_entries", stats.Summary.TotalEntriesReturned, + // cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats + //"cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested, + //"cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound, + //"cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored, + //"cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), + //"cache_volume_results_query_length_served", stats.Caches.VolumeResult.CacheQueryLengthServed(), + ) + + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index ab7bff17bc..1473a5616f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -889,7 +889,8 @@ func (t *Loki) setupAsyncStore() error { } func (t *Loki) initIngesterQuerier() (_ services.Service, err error) { - t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace) + logger := log.With(util_log.Logger, "component", "querier") + t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace, logger) if err != nil { return nil, err } diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go index 6ac1db113f..e99fe6882d 100644 --- a/pkg/querier/ingester_querier.go +++ b/pkg/querier/ingester_querier.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "golang.org/x/exp/slices" "github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume" @@ -41,23 +43,25 @@ type IngesterQuerier struct { ring ring.ReadRing pool *ring_client.Pool extraQueryDelay time.Duration + logger log.Logger } -func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string) (*IngesterQuerier, error) { +func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) { factory := func(addr string) (ring_client.PoolClient, error) { return client.New(clientCfg, addr) } - return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace) + return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace, logger) } // newIngesterQuerier creates a new IngesterQuerier and allows to pass a custom ingester client factory // used for testing purposes -func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string) (*IngesterQuerier, error) { +func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) { iq := IngesterQuerier{ ring: ring, pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace), extraQueryDelay: extraQueryDelay, + logger: logger, } err := services.StartAndAwaitRunning(context.Background(), iq.pool) @@ -364,12 +368,17 @@ func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.Detec }) if err != nil { + level.Error(q.logger).Log("msg", "error getting detected labels", "err", err) return nil, err } labelMap := make(map[string][]string) for _, resp := range ingesterResponses { - thisIngester := resp.response.(*logproto.LabelToValuesResponse) + thisIngester, ok := resp.response.(*logproto.LabelToValuesResponse) + if !ok { + level.Warn(q.logger).Log("msg", "Cannot convert response to LabelToValuesResponse in detectedlabels", + "response", resp) + } for label, thisIngesterValues := range thisIngester.Labels { var combinedValues []string diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index d2cb00d82e..713c170f7d 100644 --- a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "go.uber.org/atomic" "google.golang.org/grpc/codes" @@ -104,13 +105,8 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { } else { ingesterClient.On(testData.method, mock.Anything, mock.Anything, mock.Anything).Return(testData.retVal, nil).Run(runFn) } - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock(ringIngesters, 1), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - ) + + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock(ringIngesters, 1), ingesterClient) require.NoError(t, err) wg.Add(3) @@ -204,13 +200,7 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { } else { ingesterClient.On(testData.method, mock.Anything, mock.Anything, mock.Anything).Return(testData.retVal, nil).Run(runFn) } - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock(ringIngesters, 1), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - ) + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock(ringIngesters, 1), ingesterClient) require.NoError(t, err) wg.Add(3) @@ -302,13 +292,7 @@ func TestQuerier_tailDisconnectedIngesters(t *testing.T) { ingesterClient := newQuerierClientMock() ingesterClient.On("Tail", mock.Anything, &req, mock.Anything).Return(newTailClientMock(), nil) - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock(testData.ringIngesters, 0), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - ) + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock(testData.ringIngesters, 0), ingesterClient) require.NoError(t, err) actualClients, err := ingesterQuerier.TailDisconnectedIngesters(context.Background(), &req, testData.connectedIngestersAddr) @@ -365,13 +349,7 @@ func TestIngesterQuerier_Volume(t *testing.T) { ingesterClient := newQuerierClientMock() ingesterClient.On("GetVolume", mock.Anything, mock.Anything, mock.Anything).Return(ret, nil) - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - ) + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), ingesterClient) require.NoError(t, err) volumes, err := ingesterQuerier.Volume(context.Background(), "", 0, 1, 10, nil, "labels") @@ -386,13 +364,7 @@ func TestIngesterQuerier_Volume(t *testing.T) { ingesterClient := newQuerierClientMock() ingesterClient.On("GetVolume", mock.Anything, mock.Anything, mock.Anything).Return(nil, status.Error(codes.Unimplemented, "something bad")) - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - ) + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), ingesterClient) require.NoError(t, err) volumes, err := ingesterQuerier.Volume(context.Background(), "", 0, 1, 10, nil, "labels") @@ -401,3 +373,42 @@ func TestIngesterQuerier_Volume(t *testing.T) { require.Equal(t, []logproto.Volume(nil), volumes.Volumes) }) } + +func TestIngesterQuerier_DetectedLabels(t *testing.T) { + t.Run("it returns all unique detected labels from all ingesters", func(t *testing.T) { + req := logproto.DetectedLabelsRequest{} + + ingesterClient := newQuerierClientMock() + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "foo": {Values: []string{"abc", "abc", "ghi"}}, + "bar": {Values: []string{"cgi", "def"}}, + "all-ids": {Values: []string{"1", "3", "3", "3"}}, + }}, nil) + + readRingMock := newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0) + ingesterQuerier, err := newTestIngesterQuerier(readRingMock, ingesterClient) + require.NoError(t, err) + + detectedLabels, err := ingesterQuerier.DetectedLabel(context.Background(), &req) + require.NoError(t, err) + + require.Equal(t, &logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "all-ids": {Values: []string{"1", "3"}}, + "bar": {Values: []string{"cgi", "def"}}, + "cluster": {Values: []string{"ingester"}}, + "foo": {Values: []string{"abc", "ghi"}}, + }}, detectedLabels) + }) +} + +func newTestIngesterQuerier(readRingMock *readRingMock, ingesterClient *querierClientMock) (*IngesterQuerier, error) { + return newIngesterQuerier( + mockIngesterClientConfig(), + readRingMock, + mockQuerierConfig().ExtraQueryDelay, + newIngesterClientMockFactory(ingesterClient), + constants.Loki, + log.NewNopLogger(), + ) +} diff --git a/pkg/querier/multi_tenant_querier.go b/pkg/querier/multi_tenant_querier.go index 5489720003..654ae7c2de 100644 --- a/pkg/querier/multi_tenant_querier.go +++ b/pkg/querier/multi_tenant_querier.go @@ -284,7 +284,6 @@ func (q *MultiTenantQuerier) DetectedFields(ctx context.Context, req *logproto.D } func (q *MultiTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) { - // TODO(shantanu) tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, err @@ -294,7 +293,10 @@ func (q *MultiTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.D return q.Querier.DetectedLabels(ctx, req) } - //resp := make([]*logproto.DetectedLabels, len(tenantIDs)) + level.Debug(q.logger).Log( + "msg", "detected labels requested for multiple tenants, but not yet supported. returning static labels", + "tenantIDs", strings.Join(tenantIDs, ","), + ) return &logproto.DetectedLabelsResponse{ DetectedLabels: []*logproto.DetectedLabel{ diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index b2e61b1bdb..bee850fd82 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -909,12 +909,27 @@ func (q *SingleTenantQuerier) Volume(ctx context.Context, req *logproto.VolumeRe return seriesvolume.Merge(responses, req.Limit), nil } +// DetectedLabels fetches labels and values from store and ingesters and filters them by relevance criteria as per logs app. func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) { - var ingesterLabels *logproto.LabelToValuesResponse + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } var detectedLabels []*logproto.DetectedLabel + staticLabels := map[string]struct{}{"cluster": {}, "namespace": {}, "instance": {}, "pod": {}} + // Enforce the query timeout while querying backends + queryTimeout := q.limits.QueryTimeout(ctx, userID) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout)) + defer cancel() g, ctx := errgroup.WithContext(ctx) - ingesterQueryInterval, _ := q.buildQueryIntervals(*req.Start, *req.End) + + if *req.Start, *req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, *req.Start, *req.End); err != nil { + return nil, err + } + ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(*req.Start, *req.End) + + var ingesterLabels *logproto.LabelToValuesResponse if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { g.Go(func() error { var err error @@ -923,7 +938,33 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. splitReq.End = &ingesterQueryInterval.end ingesterLabels, err = q.ingesterQuerier.DetectedLabel(ctx, &splitReq) - level.Info(q.logger).Log("msg", ingesterLabels) + return err + }) + } + + storeLabelsMap := make(map[string][]string) + if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil { + var matchers []*labels.Matcher + if req.Query != "" { + matchers, err = syntax.ParseMatchers(req.Query, true) + if err != nil { + return nil, err + } + } + g.Go(func() error { + var err error + start := model.TimeFromUnixNano(storeQueryInterval.start.UnixNano()) + end := model.TimeFromUnixNano(storeQueryInterval.end.UnixNano()) + storeLabels, err := q.store.LabelNamesForMetricName(ctx, userID, start, end, "logs") + for _, label := range storeLabels { + values, err := q.store.LabelValuesForMetricName(ctx, userID, start, end, "logs", label, matchers...) + if err != nil { + return err + } + if q.isLabelRelevant(label, values, staticLabels) { + storeLabelsMap[label] = values + } + } return err }) } @@ -932,18 +973,43 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. return nil, err } - if ingesterLabels == nil { + if ingesterLabels == nil && len(storeLabelsMap) == 0 { return &logproto.DetectedLabelsResponse{ DetectedLabels: []*logproto.DetectedLabel{}, }, nil } - for label, values := range ingesterLabels.Labels { - if q.isLabelRelevant(label, values) { - detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(values.Values))}) + // append static labels before so they are in sorted order + for l := range staticLabels { + if values, present := ingesterLabels.Labels[l]; present { + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: l, Cardinality: uint64(len(values.Values))}) } } + if ingesterLabels != nil { + for label, values := range ingesterLabels.Labels { + if q.isLabelRelevant(label, values.Values, staticLabels) { + combinedValues := values.Values + storeValues, storeHasLabel := storeLabelsMap[label] + if storeHasLabel { + combinedValues = append(combinedValues, storeValues...) + } + + slices.Sort(combinedValues) + uniqueValues := slices.Compact(combinedValues) + // TODO(shantanu): There's a bug here. Unique values can go above 50. Will need a bit of refactoring + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqueValues))}) + delete(storeLabelsMap, label) + } + } + } + + for label, values := range storeLabelsMap { + slices.Sort(values) + uniqueValues := slices.Compact(values) + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqueValues))}) + } + return &logproto.DetectedLabelsResponse{ DetectedLabels: detectedLabels, }, nil @@ -965,13 +1031,13 @@ func (q *SingleTenantQuerier) Patterns(ctx context.Context, req *logproto.QueryP return res, err } -func (q *SingleTenantQuerier) isLabelRelevant(label string, values *logproto.UniqueLabelValues) bool { - staticLabels := []string{"pod", "namespace", "cluster", "instance"} - cardinality := len(values.Values) - // TODO(shantanu) make these values configurable - if !slices.Contains(staticLabels, label) && - (cardinality < 1 || cardinality > 50) || - containsAllIDTypes(values.Values) { +// isLabelRelevant returns if the label is relevant for logs app. A label is relevant if it is not of any numeric, UUID or GUID type +// It is also not relevant to return if the values are less than 1 or beyond 50. +func (q *SingleTenantQuerier) isLabelRelevant(label string, values []string, staticLabels map[string]struct{}) bool { + cardinality := len(values) + _, isStaticLabel := staticLabels[label] + if isStaticLabel || (cardinality < 2 || cardinality > 50) || + containsAllIDTypes(values) { return false } diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 3d5edc50b8..6d025a9e0d 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -111,6 +111,16 @@ func (c *querierClientMock) GetChunkIDs(ctx context.Context, in *logproto.GetChu return res.(*logproto.GetChunkIDsResponse), args.Error(1) } +func (c *querierClientMock) GetDetectedLabels(ctx context.Context, in *logproto.DetectedLabelsRequest, opts ...grpc.CallOption) (*logproto.LabelToValuesResponse, error) { + args := c.Called(ctx, in, opts) + res := args.Get(0) + if res == nil { + return (*logproto.LabelToValuesResponse)(nil), args.Error(1) + } + return res.(*logproto.LabelToValuesResponse), args.Error(1) + +} + func (c *querierClientMock) GetVolume(ctx context.Context, in *logproto.VolumeRequest, opts ...grpc.CallOption) (*logproto.VolumeResponse, error) { args := c.Called(ctx, in, opts) res := args.Get(0) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index df87a72df3..e6c228f049 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -5,6 +5,7 @@ import ( "errors" "io" "net/http" + "strconv" "testing" "time" @@ -19,6 +20,8 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" @@ -1148,6 +1151,13 @@ func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*quer }, }, }, nil) + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.DetectedLabelsResponse{ + DetectedLabels: []*logproto.DetectedLabel{ + {Label: "pod", Cardinality: 1}, + {Label: "namespace", Cardinality: 3}, + {Label: "customerId", Cardinality: 200}, + }, + }, nil) store := newStoreMock() store.On("SelectLogs", mock.Anything, mock.Anything).Return(mockStreamIterator(0, 1), nil) @@ -1351,7 +1361,7 @@ func TestQuerier_SelectSamplesWithDeletes(t *testing.T) { } func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.PoolFactory, ring ring.ReadRing, dg *mockDeleteGettter, store storage.Store, limits *validation.Overrides) (*SingleTenantQuerier, error) { - iq, err := newIngesterQuerier(clientCfg, ring, cfg.ExtraQueryDelay, clientFactory, constants.Loki) + iq, err := newIngesterQuerier(clientCfg, ring, cfg.ExtraQueryDelay, clientFactory, constants.Loki, util_log.Logger) if err != nil { return nil, err } @@ -1373,44 +1383,306 @@ func TestQuerier_isLabelRelevant(t *testing.T) { for _, tc := range []struct { name string label string - values *logproto.UniqueLabelValues + values []string expected bool }{ { label: "uuidv4 values are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}, expected: false, }, { label: "guid values are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"57808f62-f117-4a22-84a0-bc3282c7f106", "5076e837-cd8d-4dd7-95ff-fecb087dccf6", "2e2a6554-1744-4399-b89a-88ae79c27096", "d3c31248-ec0c-4bc4-b11c-8fb1cfb42e62"}}, + values: []string{"57808f62-f117-4a22-84a0-bc3282c7f106", "5076e837-cd8d-4dd7-95ff-fecb087dccf6", "2e2a6554-1744-4399-b89a-88ae79c27096", "d3c31248-ec0c-4bc4-b11c-8fb1cfb42e62"}, expected: false, }, { label: "integer values are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"1", "2", "3", "4"}}, + values: []string{"1", "2", "3", "4"}, expected: false, }, { label: "string values are relevant", - values: &logproto.UniqueLabelValues{Values: []string{"ingester", "querier", "query-frontend", "index-gateway"}}, + values: []string{"ingester", "querier", "query-frontend", "index-gateway"}, expected: true, }, { label: "guid with braces are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"{E9550CF7-58D9-48B9-8845-D9800C651AAC}", "{1617921B-1749-4FF0-A058-31AFB5D98149}", "{C119D92E-A4B9-48A3-A92C-6CA8AA8A6CCC}", "{228AAF1D-2DE7-4909-A4E9-246A7FA9D988}"}}, + values: []string{"{E9550CF7-58D9-48B9-8845-D9800C651AAC}", "{1617921B-1749-4FF0-A058-31AFB5D98149}", "{C119D92E-A4B9-48A3-A92C-6CA8AA8A6CCC}", "{228AAF1D-2DE7-4909-A4E9-246A7FA9D988}"}, expected: false, }, { label: "float values are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"1.2", "2.5", "3.3", "4.1"}}, + values: []string{"1.2", "2.5", "3.3", "4.1"}, expected: false, }, } { t.Run(tc.name, func(t *testing.T) { querier := &SingleTenantQuerier{cfg: mockQuerierConfig()} - assert.Equal(t, tc.expected, querier.isLabelRelevant(tc.label, tc.values)) + assert.Equal(t, tc.expected, querier.isLabelRelevant(tc.label, tc.values, map[string]struct{}{"host": {}, "cluster": {}, "namespace": {}, "instance": {}, "pod": {}})) }) } } + +func TestQuerier_DetectedLabels(t *testing.T) { + manyValues := []string{} + now := time.Now() + for i := 0; i < 60; i++ { + manyValues = append(manyValues, "a"+strconv.Itoa(i)) + } + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + ctx := user.InjectOrgID(context.Background(), "test") + + conf := mockQuerierConfig() + conf.IngesterQueryStoreMaxLookback = 0 + + request := logproto.DetectedLabelsRequest{ + Start: &now, + End: &now, + Query: "", + } + + t.Run("when both store and ingester responses are present, a combined response is returned", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "ingesterLabel": {Values: []string{"abc", "def", "ghi", "abc"}}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{"storeLabel"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "storeLabel", mock.Anything). + Return([]string{"val1", "val2"}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + calls := ingesterClient.GetMockedCallsByMethod("GetDetectedLabels") + assert.Equal(t, 1, len(calls)) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 3) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "storeLabel", Cardinality: 2}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "ingesterLabel", Cardinality: 3}) + }) + + t.Run("when both store and ingester responses are present, duplicates are removed", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "ingesterLabel": {Values: []string{"abc", "def", "ghi", "abc"}}, + "commonLabel": {Values: []string{"abc", "def", "ghi", "abc"}}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{"storeLabel", "commonLabel"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "storeLabel", mock.Anything). + Return([]string{"val1", "val2"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "commonLabel", mock.Anything). + Return([]string{"def", "xyz", "lmo", "abc"}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + calls := ingesterClient.GetMockedCallsByMethod("GetDetectedLabels") + assert.Equal(t, 1, len(calls)) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 4) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "storeLabel", Cardinality: 2}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "ingesterLabel", Cardinality: 3}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "commonLabel", Cardinality: 5}) + }) + + t.Run("returns a response when ingester data is empty", func(t *testing.T) { + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&logproto.LabelToValuesResponse{}, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{"storeLabel1", "storeLabel2"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "storeLabel1", mock.Anything). + Return([]string{"val1", "val2"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "storeLabel2", mock.Anything). + Return([]string{"val1", "val2"}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 2) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "storeLabel1", Cardinality: 2}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "storeLabel2", Cardinality: 2}) + }) + + t.Run("returns a response when store data is empty", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "ingesterLabel": {Values: []string{"abc", "def", "ghi", "abc"}}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 2) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "cluster", Cardinality: 1}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "ingesterLabel", Cardinality: 3}) + }) + + t.Run("id types like uuids, guids and numbers are not relevant detected labels", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "all-ints": {Values: []string{"1", "2", "3", "4"}}, + "all-floats": {Values: []string{"1.2", "2.3", "3.4", "4.5"}}, + "all-uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 0) + }) + + t.Run("labels with more than required cardinality are not relevant", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "less-than-m-values": {Values: []string{"val1"}}, + "more-than-n-values": {Values: manyValues}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 0) + }) + + t.Run("static labels are always returned no matter their cardinality or value types", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"val1"}}, + "namespace": {Values: manyValues}, + "pod": {Values: []string{"1", "2", "3", "4"}}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{}, nil) + request := logproto.DetectedLabelsRequest{ + Start: &now, + End: &now, + Query: "", + } + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 3) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "cluster", Cardinality: 1}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "pod", Cardinality: 4}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "namespace", Cardinality: 60}) + }) +} diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 874313d498..f73eef10d5 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -269,18 +269,6 @@ type DetectedLabelsRequest struct { logproto.DetectedLabelsRequest } -// NewDetectedLabelsRequest creates a new request for detected labels -func NewDetectedLabelsRequest(start, end time.Time, query, path string) *DetectedLabelsRequest { - return &DetectedLabelsRequest{ - DetectedLabelsRequest: logproto.DetectedLabelsRequest{ - Start: &start, - End: &end, - Query: query, - }, - path: path, - } -} - func (r *DetectedLabelsRequest) AsProto() *logproto.DetectedLabelsRequest { return &r.DetectedLabelsRequest } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index e5b9db82cd..ff7c4ba4db 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -253,6 +253,19 @@ func NewMiddleware( return nil, nil, err } + detectedLabelsTripperware, err := NewDetectedLabelsTripperware( + cfg, + engineOpts, + log, + limits, + schema, + metrics, + indexStatsTripperware, + metricsNamespace) + + if err != nil { + return nil, nil, err + } return base.MiddlewareFunc(func(next base.Handler) base.Handler { var ( metricRT = metricsTripperware.Wrap(next) @@ -264,13 +277,41 @@ func NewMiddleware( statsRT = indexStatsTripperware.Wrap(next) seriesVolumeRT = seriesVolumeTripperware.Wrap(next) detectedFieldsRT = detectedFieldsTripperware.Wrap(next) - detectedLabelsRT = next // TODO(shantanu): add middlewares + detectedLabelsRT = detectedLabelsTripperware.Wrap(next) ) return newRoundTripper(log, next, limitedRT, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, statsRT, seriesVolumeRT, detectedFieldsRT, detectedLabelsRT, limits) }), StopperWrapper{resultsCache, statsCache, volumeCache}, nil } +func NewDetectedLabelsTripperware(cfg Config, opts logql.EngineOpts, logger log.Logger, l Limits, schema config.SchemaConfig, metrics *Metrics, mw base.Middleware, namespace string) (base.Middleware, error) { + return base.MiddlewareFunc(func(next base.Handler) base.Handler { + statsHandler := mw.Wrap(next) + + queryRangeMiddleware := []base.Middleware{ + StatsCollectorMiddleware(), + NewLimitsMiddleware(l), + NewQuerySizeLimiterMiddleware(schema.Configs, opts, logger, l, statsHandler), + base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), + } + + // The sharding middleware takes care of enforcing this limit for both shardable and non-shardable queries. + // If we are not using sharding, we enforce the limit by adding this middleware after time splitting. + queryRangeMiddleware = append(queryRangeMiddleware, + NewQuerierSizeLimiterMiddleware(schema.Configs, opts, logger, l, statsHandler), + ) + + if cfg.MaxRetries > 0 { + queryRangeMiddleware = append( + queryRangeMiddleware, base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics), + base.NewRetryMiddleware(logger, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, namespace), + ) + } + + return NewLimitedRoundTripper(next, l, schema.Configs, queryRangeMiddleware...) + }), nil +} + type roundTripper struct { logger log.Logger