From fdcc12da4db1ff5643ca6d66945c1e4dcc73f3fb Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Thu, 5 Jun 2025 11:30:39 -0600 Subject: [PATCH] feat: query persisted patterns (#17980) Signed-off-by: Trevor Whitney --- docs/sources/shared/configuration.md | 4 + pkg/logproto/extensions.go | 85 +++++++++ pkg/logproto/extensions_test.go | 124 ++++++++++++ pkg/loki/modules.go | 4 + pkg/pattern/flush.go | 5 +- pkg/pattern/ingester.go | 7 + pkg/querier/http.go | 177 ++++++++++++++++- pkg/querier/http_test.go | 230 +++++++++++++++++++++++ pkg/querier/intervals.go | 104 ++++++++++ pkg/querier/pattern/querier.go | 48 +++++ pkg/querier/querier.go | 110 ++--------- pkg/querier/querier_mock_test.go | 85 ++++++++- pkg/querier/querier_test.go | 28 +-- pkg/querier/queryrange/codec.go | 44 ++++- pkg/querier/queryrange/roundtrip.go | 54 +++++- pkg/querier/queryrange/roundtrip_test.go | 1 + 16 files changed, 989 insertions(+), 121 deletions(-) create mode 100644 pkg/querier/intervals.go diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 8ac10dfcb5..783491f20f 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -863,6 +863,10 @@ pattern_ingester: # CLI flag: -pattern-ingester.max-allowed-line-length [max_allowed_line_length: | default = 3000] + # How long to retain patterns in the pattern ingester after they are pushed. + # CLI flag: -pattern-ingester.retain-for + [retain_for: | default = 3h] + # The index_gateway block configures the Loki index gateway server, responsible # for serving index queries without the need to constantly interact with the # object store. 
diff --git a/pkg/logproto/extensions.go b/pkg/logproto/extensions.go index 956e841b2b..63d447342a 100644 --- a/pkg/logproto/extensions.go +++ b/pkg/logproto/extensions.go @@ -1,9 +1,12 @@ package logproto import ( + "fmt" + "slices" "sort" "strings" "sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it. + "time" "github.com/cespare/xxhash/v2" "github.com/dustin/go-humanize" @@ -11,7 +14,9 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" + "github.com/grafana/loki/v3/pkg/util/constants" ) // This is the separator define in the Prometheus Labels.Hash function. @@ -189,3 +194,83 @@ func (m *ShardsResponse) Merge(other *ShardsResponse) { m.ChunkGroups = append(m.ChunkGroups, other.ChunkGroups...) m.Statistics.Merge(other.Statistics) } + +func (m *QueryPatternsRequest) GetSampleQuery() (string, error) { + expr, err := syntax.ParseExpr(m.Query) + if err != nil { + return "", err + } + + // Extract matchers from the expression + var matchers []*labels.Matcher + switch e := expr.(type) { + case *syntax.MatchersExpr: + matchers = e.Mts + case syntax.LogSelectorExpr: + matchers = e.Matchers() + default: + // Cannot extract matchers + return "", nil + } + + // Find service_name from matchers + var serviceName string + var serviceMatcher labels.MatchType + for i, m := range matchers { + if m.Name == "service_name" { + matchers = slices.Delete(matchers, i, i+1) + serviceName = m.Value + serviceMatcher = m.Type + break + } + } + + if serviceName == "" { + serviceName = ".+" + serviceMatcher = labels.MatchRegexp + } + + // Build LogQL query for persisted patterns + logqlQuery := buildPatternLogQLQuery(serviceName, serviceMatcher, matchers, m.Step) + + return logqlQuery, nil +} + +func buildPatternLogQLQuery(serviceName string, serviceMatcher 
labels.MatchType, matchers []*labels.Matcher, step int64) string { + // Use step duration for sum_over_time window + stepDuration := max(time.Duration(step)*time.Millisecond, 10*time.Second) + + if len(matchers) == 0 { + return buildPatterLogQLQueryString(serviceName, serviceMatcher.String(), "", stepDuration.String()) + } + + stringBuilder := strings.Builder{} + for i, matcher := range matchers { + stringBuilder.WriteString(matcher.String()) + if i < len(matchers)-1 { + stringBuilder.WriteString(" | ") + } + } + + return buildPatterLogQLQueryString(serviceName, serviceMatcher.String(), stringBuilder.String(), stepDuration.String()) +} + +func buildPatterLogQLQueryString(serviceName, serviceMatcher, matchers, step string) string { + decodePatternTransform := `label_format decoded_pattern=` + "`{{urldecode .detected_pattern}}`" + + matchAndTransform := "" + if matchers == "" { + matchAndTransform = decodePatternTransform + } else { + matchAndTransform = fmt.Sprintf(`%s | %s`, matchers, decodePatternTransform) + + } + return fmt.Sprintf( + `sum by (decoded_pattern, %s) (sum_over_time({__pattern__%s"%s"} | logfmt | %s | unwrap count [%s]))`, + constants.LevelLabel, + serviceMatcher, + serviceName, + matchAndTransform, + step, + ) +} diff --git a/pkg/logproto/extensions_test.go b/pkg/logproto/extensions_test.go index d1c96c76bb..c36f0a5a31 100644 --- a/pkg/logproto/extensions_test.go +++ b/pkg/logproto/extensions_test.go @@ -40,3 +40,127 @@ func TestShard_SpaceFor(t *testing.T) { }) } } + +func TestQueryPatternsRequest_GetSampleQuery(t *testing.T) { + tests := []struct { + name string + query string + step int64 + expected string + expectedError bool + }{ + { + name: "simple selector with service_name", + query: `{service_name="test-service"}`, + step: 60000, // 1 minute in milliseconds + expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` + + "label_format decoded_pattern=`{{urldecode .detected_pattern}}` " + + 
`| unwrap count [1m0s]))`, + }, + { + name: "selector with service_name and additional labels", + query: `{service_name="test-service", env="prod", cluster="us-east-1"}`, + step: 300000, // 5 minutes in milliseconds + expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` + + "env=\"prod\" | cluster=\"us-east-1\" | " + + "label_format decoded_pattern=`{{urldecode .detected_pattern}}` " + + `| unwrap count [5m0s]))`, + }, + { + name: "selector with service_name and additional labels and match types", + query: `{service_name="test-service", env=~"prod", cluster!="us-east-1", foo!~"bar"}`, + step: 300000, // 5 minutes in milliseconds + expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` + + "env=~\"prod\" | cluster!=\"us-east-1\" | foo!~\"bar\" | " + + "label_format decoded_pattern=`{{urldecode .detected_pattern}}` " + + `| unwrap count [5m0s]))`, + }, + { + name: "small step gets minimum 10s window", + query: `{service_name="test-service"}`, + step: 5000, // 5 seconds in milliseconds + expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` + + "label_format decoded_pattern=`{{urldecode .detected_pattern}}` " + + `| unwrap count [10s]))`, + }, + { + name: "simple regex selector with service_name", + query: `{service_name=~"test-service"}`, + step: 10000, // 10 seconds in milliseconds + expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__=~"test-service"} | logfmt | ` + + "label_format decoded_pattern=`{{urldecode .detected_pattern}}` " + + `| unwrap count [10s]))`, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + req := &QueryPatternsRequest{ + Query: tc.query, + Step: tc.step, + } + + result, err := req.GetSampleQuery() + + if tc.expectedError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.expected, result) + } 
+ }) + } +} + +func TestQueryPatternsRequest_GetSampleQuery_NoServiceName(t *testing.T) { + tests := []struct { + name string + query string + expected string + expectedError bool + }{ + { + name: "no service_name label", + query: `{env="prod", cluster="us-east-1"}`, + expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__=~".+"} | logfmt | ` + + "env=\"prod\" | cluster=\"us-east-1\" | " + + "label_format decoded_pattern=`{{urldecode .detected_pattern}}` " + + `| unwrap count [1m0s]))`, + }, + { + name: "no service_name label, mixed match types", + query: `{env!="prod", cluster=~"us-east-1", app!~"foo"}`, + expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__=~".+"} | logfmt | ` + + "env!=\"prod\" | cluster=~\"us-east-1\" | app!~\"foo\" | " + + "label_format decoded_pattern=`{{urldecode .detected_pattern}}` " + + `| unwrap count [1m0s]))`, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + req := &QueryPatternsRequest{ + Query: tc.query, + Step: 60000, + } + + result, err := req.GetSampleQuery() + if tc.expectedError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tc.expected, result) + } + }) + } +} + +func TestQueryPatternsRequest_GetSampleQuery_InvalidQuery(t *testing.T) { + req := &QueryPatternsRequest{ + Query: `{invalid query syntax`, + Step: 60000, + } + + _, err := req.GetSampleQuery() + require.Error(t, err) +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 7b6ceb30d4..14360fd0de 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -563,6 +563,10 @@ func (t *Loki) initQuerier() (services.Service, error) { if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 { t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod } + + // Use Pattern ingester RetainFor value to determine when to query pattern ingesters + t.Cfg.Querier.QueryPatternIngestersWithin = t.Cfg.Pattern.RetainFor + // Querier 
worker's max concurrent must be the same as the querier setting t.Cfg.Worker.MaxConcurrent = t.Cfg.Querier.MaxConcurrent deleteStore, err := t.deleteRequestsClient("querier", t.Overrides) diff --git a/pkg/pattern/flush.go b/pkg/pattern/flush.go index 6f2eb94158..f94193dcf1 100644 --- a/pkg/pattern/flush.go +++ b/pkg/pattern/flush.go @@ -2,7 +2,6 @@ package pattern import ( "fmt" - "time" "github.com/go-kit/log/level" "github.com/prometheus/common/model" @@ -10,8 +9,6 @@ import ( "github.com/grafana/loki/v3/pkg/util" ) -const retainSampleFor = 3 * time.Hour - func (i *Ingester) initFlushQueues() { // i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) for j := 0; j < i.cfg.ConcurrentFlushes; j++ { @@ -66,7 +63,7 @@ func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) { _ = instance.streams.ForEach(func(s *stream) (bool, error) { if mayRemoveStreams { instance.streams.WithLock(func() { - if s.prune(retainSampleFor) { + if s.prune(i.cfg.RetainFor) { instance.removeStream(s) } }) diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 6a59e908dc..3e7630d44d 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -45,6 +45,7 @@ type Config struct { TeeConfig TeeConfig `yaml:"tee_config,omitempty" doc:"description=Configures the pattern tee which forwards requests to the pattern ingester."` ConnectionTimeout time.Duration `yaml:"connection_timeout"` MaxAllowedLineLength int `yaml:"max_allowed_line_length,omitempty" doc:"description=The maximum length of log lines that can be used for pattern detection."` + RetainFor time.Duration `yaml:"retain_for,omitempty" doc:"description=How long to retain patterns in the pattern ingester after they are pushed."` // For testing. 
factory ring_client.PoolFactory `yaml:"-"` @@ -100,6 +101,12 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { drain.DefaultConfig().MaxAllowedLineLength, "The maximum length of log lines that can be used for pattern detection.", ) + fs.DurationVar( + &cfg.RetainFor, + "pattern-ingester.retain-for", + 3*time.Hour, + "How long to retain patterns in the pattern ingester after they are pushed.", + ) } type TeeConfig struct { diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 5b0fc4e929..099d55c8c7 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -2,9 +2,11 @@ package querier import ( "context" + "fmt" "net/http" "slices" "strconv" + "sync" "time" "github.com/go-kit/log" @@ -15,10 +17,13 @@ import ( "github.com/grafana/dskit/user" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/objstore" "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" "github.com/grafana/loki/v3/pkg/engine" "github.com/grafana/loki/v3/pkg/loghttp" @@ -28,6 +33,7 @@ import ( "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" querier_limits "github.com/grafana/loki/v3/pkg/querier/limits" + "github.com/grafana/loki/v3/pkg/querier/pattern" "github.com/grafana/loki/v3/pkg/querier/queryrange" index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/tracing" @@ -393,19 +399,184 @@ func (q *QuerierAPI) DetectedFieldsHandler(ctx context.Context, req *logproto.De return resp, nil } +type asyncPatternResponses struct { + responses []*logproto.QueryPatternsResponse + lock sync.Mutex +} + +// currently, the only contention is when adding, however if that behavior changes we may need to lock in the read methods as well +func (r *asyncPatternResponses) add(resp 
*logproto.QueryPatternsResponse) { + if resp == nil { + return + } + + r.lock.Lock() + defer r.lock.Unlock() + r.responses = append(r.responses, resp) +} + +func (r *asyncPatternResponses) get() []*logproto.QueryPatternsResponse { + return r.responses +} + +func (r *asyncPatternResponses) len() int { + return len(r.responses) +} + func (q *QuerierAPI) PatternsHandler(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) { - resp, err := q.querier.Patterns(ctx, req) - if err != nil { + // Calculate query intervals for ingester vs store + ingesterQueryInterval, storeQueryInterval := BuildQueryIntervalsWithLookback(q.cfg, req.Start, req.End, q.cfg.QueryPatternIngestersWithin) + + responses := asyncPatternResponses{} + g, ctx := errgroup.WithContext(ctx) + + // Query pattern ingesters for recent data + if ingesterQueryInterval != nil && !q.cfg.QueryStoreOnly && q.querier != nil { + g.Go(func() error { + splitReq := *req + splitReq.Start = ingesterQueryInterval.start + splitReq.End = ingesterQueryInterval.end + + resp, err := q.querier.Patterns(ctx, &splitReq) + if err != nil { + return err + } + responses.add(resp) + return nil + }) + } + + // Query store for older data by converting to LogQL query + if storeQueryInterval != nil && !q.cfg.QueryIngesterOnly && q.engineV1 != nil { + g.Go(func() error { + storeReq := *req + storeReq.Start = storeQueryInterval.start + storeReq.End = storeQueryInterval.end + resp, err := q.queryStoreForPatterns(ctx, &storeReq) + if err != nil { + return err + } + responses.add(resp) + return nil + }) + } + + if err := g.Wait(); err != nil { return nil, err } - if resp == nil { // Some stores don't implement this + + // Merge responses + if responses.len() == 0 { return &logproto.QueryPatternsResponse{ Series: []*logproto.PatternSeries{}, }, nil } + + return pattern.MergePatternResponses(responses.get()), nil +} + +func (q *QuerierAPI) queryStoreForPatterns(ctx context.Context, req 
*logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) { + params, err := queryrange.ParamsFromRequest(req) + if err != nil { + return nil, err + } + + // Patterns are persisted as logfmt'd strings, so we need to run the query using the LogQL engine + in order to extract the metric values from them. + query := q.engineV1.Query(params) + res, err := query.Exec(ctx) + if err != nil { + return nil, err + } + + result := make(map[string]map[string]map[int64]float64) // level -> pattern -> timestamp -> value + switch v := res.Data.(type) { + case promql.Vector: + for _, s := range v { + lvl, pattern := getLevelAndPattern(s.Metric) + if pattern == "" { + continue + } + + if result[lvl] == nil { + result[lvl] = make(map[string]map[int64]float64) + } + + if result[lvl][pattern] == nil { + result[lvl][pattern] = make(map[int64]float64) + } + result[lvl][pattern][s.T] = s.F + } + case promql.Matrix: + for _, s := range v { + lvl, pattern := getLevelAndPattern(s.Metric) + if pattern == "" { + continue + } + + if result[lvl] == nil { + result[lvl] = make(map[string]map[int64]float64) + } + + if result[lvl][pattern] == nil { + result[lvl][pattern] = make(map[int64]float64) + } + for _, f := range s.Floats { + result[lvl][pattern][f.T] = f.F + } + } + default: + return nil, fmt.Errorf("unexpected type (%s) when querying store for patterns. 
Expected %s or %s", res.Data.Type(), parser.ValueTypeMatrix, parser.ValueTypeVector) + } + + // Convert to pattern response format + resp := &logproto.QueryPatternsResponse{ + Series: make([]*logproto.PatternSeries, 0, len(result)), + } + + for lvl, patterns := range result { + for pattern, samples := range patterns { + series := &logproto.PatternSeries{ + Pattern: pattern, + Level: lvl, + Samples: make([]*logproto.PatternSample, 0, len(samples)), + } + + // Convert samples map to slice and sort by timestamp + timestamps := make([]int64, 0, len(samples)) + for ts := range samples { + timestamps = append(timestamps, ts) + } + slices.Sort(timestamps) + + level.Debug(q.logger).Log( + "msg", "earliest pattern sample from store", + "timestamp", timestamps[0], + ) + for _, ts := range timestamps { + series.Samples = append(series.Samples, &logproto.PatternSample{ + Timestamp: model.Time(ts), + Value: int64(samples[ts]), + }) + } + + resp.Series = append(resp.Series, series) + } + } + return resp, nil } +func getLevelAndPattern(metric labels.Labels) (string, string) { + lvl := metric.Get(constants.LevelLabel) + if lvl == "" { + lvl = constants.LogLevelUnknown + } + + pattern := metric.Get("decoded_pattern") + return lvl, pattern +} + func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, expr syntax.Expr, limit uint32) error { tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { diff --git a/pkg/querier/http_test.go b/pkg/querier/http_test.go index 916196f2d5..09392a987a 100644 --- a/pkg/querier/http_test.go +++ b/pkg/querier/http_test.go @@ -421,3 +421,233 @@ func setupAPI(t *testing.T, querier *querierMock, enableMetricAggregation bool) api := NewQuerierAPI(Config{}, querier, limits, nil, nil, log.NewNopLogger()) return api } + +func TestPatternsHandler(t *testing.T) { + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + ctx := user.InjectOrgID(context.Background(), "test") + + now := time.Now() + + // Mock 
pattern responses + mockPatternResponse := func(patterns []string) *logproto.QueryPatternsResponse { + resp := &logproto.QueryPatternsResponse{ + Series: make([]*logproto.PatternSeries, 0), + } + for i, pattern := range patterns { + resp.Series = append(resp.Series, &logproto.PatternSeries{ + Pattern: pattern, + Level: constants.LogLevelInfo, + Samples: []*logproto.PatternSample{ + { + Timestamp: model.Time(now.Unix() * 1000), + Value: int64((i + 1) * 100), + }, + }, + }) + } + return resp + } + + tests := []struct { + name string + start, end time.Time + queryIngestersWithin time.Duration + queryPatternIngestersWithin time.Duration + ingesterQueryStoreMaxLookback time.Duration + setupQuerier func() Querier + patternsFromStore []string + expectedPatterns []string + expectedError error + queryStoreOnly bool + queryIngesterOnly bool + }{ + { + name: "query both ingester and store when time range overlaps and ingesterQueryStoreMaxLookback is not set", + start: now.Add(-2 * time.Hour), + end: now, + queryIngestersWithin: 3 * time.Hour, + setupQuerier: func() Querier { + q := &querierMock{} + q.On("Patterns", mock.Anything, mock.MatchedBy(func(req *logproto.QueryPatternsRequest) bool { + // Should query recent data only (within queryIngestersWithin) + return req.Start.After(now.Add(-3*time.Hour)) && req.End.Equal(now) + })).Return(mockPatternResponse([]string{"pattern1", "pattern2"}), nil) + return q + }, + patternsFromStore: []string{"pattern3", "pattern4"}, + expectedPatterns: []string{"pattern1", "pattern2", "pattern3", "pattern4"}, + }, + { + name: "query only store when time range is older than ingester lookback", + start: now.Add(-10 * time.Hour), + end: now.Add(-4 * time.Hour), + queryIngestersWithin: 3 * time.Hour, + setupQuerier: func() Querier { + // Should not be called + q := &querierMock{} + return q + }, + patternsFromStore: []string{"old_pattern1", "old_pattern2"}, + expectedPatterns: []string{"old_pattern1", "old_pattern2"}, + }, + { + name: "query only 
ingester when time range is recent and ingesterQueryStoreMaxLookback is set", + start: now.Add(-30 * time.Minute), + end: now, + queryIngestersWithin: 3 * time.Hour, + ingesterQueryStoreMaxLookback: 1 * time.Hour, + setupQuerier: func() Querier { + q := &querierMock{} + q.On("Patterns", mock.Anything, mock.MatchedBy(func(req *logproto.QueryPatternsRequest) bool { + return req.Start.Equal(now.Add(-30*time.Minute)) && req.End.Equal(now) + })).Return(mockPatternResponse([]string{"recent_pattern1", "recent_pattern2"}), nil) + return q + }, + patternsFromStore: []string{"old_pattern1", "old_pattern2"}, + expectedPatterns: []string{"recent_pattern1", "recent_pattern2"}, + }, + { + name: "query store only when configured", + start: now.Add(-2 * time.Hour), + end: now, + queryStoreOnly: true, + setupQuerier: func() Querier { + // Should not be called + return newQuerierMock() + }, + patternsFromStore: []string{"store_only_pattern"}, + expectedPatterns: []string{"store_only_pattern"}, + }, + { + name: "query ingester only when configured", + start: now.Add(-2 * time.Hour), + end: now, + queryIngesterOnly: true, + queryIngestersWithin: 3 * time.Hour, + setupQuerier: func() Querier { + q := &querierMock{} + q.On("Patterns", mock.Anything, mock.Anything).Return(mockPatternResponse([]string{"ingester_only_pattern"}), nil) + return q + }, + patternsFromStore: []string{"store_only_pattern"}, + expectedPatterns: []string{"ingester_only_pattern"}, + }, + { + name: "returns empty response when no patterns found", + start: now.Add(-2 * time.Hour), + end: now, + queryIngestersWithin: 3 * time.Hour, + setupQuerier: func() Querier { + q := &querierMock{} + q.On("Patterns", mock.Anything, mock.Anything).Return(&logproto.QueryPatternsResponse{Series: []*logproto.PatternSeries{}}, nil) + return q + }, + }, + { + name: "query uses pattern-specific lookback when configured", + start: now.Add(-2 * time.Hour), + end: now.Add(-90 * time.Minute), // Query ends before pattern ingester lookback + 
queryIngestersWithin: 3 * time.Hour, + queryPatternIngestersWithin: 1 * time.Hour, // Pattern ingester only looks back 1 hour + setupQuerier: func() Querier { + // Should not be called since query is entirely outside pattern ingester lookback + return newQuerierMock() + }, + patternsFromStore: []string{"pattern_from_store"}, + expectedPatterns: []string{"pattern_from_store"}, + }, + { + name: "merges patterns from both sources correctly", + start: now.Add(-2 * time.Hour), + end: now, + queryIngestersWithin: 3 * time.Hour, + setupQuerier: func() Querier { + q := &querierMock{} + q.On("Patterns", mock.Anything, mock.Anything).Return(&logproto.QueryPatternsResponse{ + Series: []*logproto.PatternSeries{ + { + Pattern: "common_pattern", + Samples: []*logproto.PatternSample{ + {Timestamp: model.Time(now.Add(-30*time.Minute).Unix() * 1000), Value: 100}, + }, + }, + { + Pattern: "ingester_pattern", + Samples: []*logproto.PatternSample{ + {Timestamp: model.Time(now.Add(-15*time.Minute).Unix() * 1000), Value: 200}, + }, + }, + }, + }, nil) + return q + }, + patternsFromStore: []string{"common_pattern", "store_pattern"}, + expectedPatterns: []string{"common_pattern", "ingester_pattern", "store_pattern"}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + conf := mockQuerierConfig() + conf.QueryIngestersWithin = tc.queryIngestersWithin + if tc.queryPatternIngestersWithin != 0 { + conf.QueryPatternIngestersWithin = tc.queryPatternIngestersWithin + } else { + conf.QueryPatternIngestersWithin = tc.queryIngestersWithin + } + conf.IngesterQueryStoreMaxLookback = tc.ingesterQueryStoreMaxLookback + conf.QueryStoreOnly = tc.queryStoreOnly + conf.QueryIngesterOnly = tc.queryIngesterOnly + + querier := tc.setupQuerier() + + // Create a mock engine that returns the expected patterns from store + engineMock := newMockEngineWithPatterns(tc.patternsFromStore) + + api := &QuerierAPI{ + cfg: conf, + querier: querier, + limits: limits, + engineV1: engineMock, + 
logger: log.NewNopLogger(), + } + + req := &logproto.QueryPatternsRequest{ + Query: `{service_name="test-service"}`, + Start: tc.start, + End: tc.end, + Step: time.Minute.Milliseconds(), + } + + resp, err := api.PatternsHandler(ctx, req) + + if tc.expectedError != nil { + require.Error(t, err) + require.Equal(t, tc.expectedError, err) + return + } + + require.NoError(t, err) + require.NotNil(t, resp) + + // Collect all patterns from response + foundPatterns := make(map[string]bool) + for _, series := range resp.Series { + foundPatterns[series.Pattern] = true + } + + // Check expected patterns from ingester + for _, pattern := range tc.expectedPatterns { + require.True(t, foundPatterns[pattern], "Expected pattern %s, not found", pattern) + } + + require.Equal(t, len(tc.expectedPatterns), len(resp.Series), "Unexpected number of patterns in response") + + // Verify mocks were called as expected (if it's a mock) + if q, ok := querier.(*querierMock); ok { + q.AssertExpectations(t) + } + }) + } +} diff --git a/pkg/querier/intervals.go b/pkg/querier/intervals.go new file mode 100644 index 0000000000..6e10efab1f --- /dev/null +++ b/pkg/querier/intervals.go @@ -0,0 +1,104 @@ +package querier + +import "time" + +type QueryInterval struct { + start, end time.Time +} + +func BuildQueryIntervalsWithLookback(cfg Config, queryStart, queryEnd time.Time, queryIngestersWithin time.Duration) (*QueryInterval, *QueryInterval) { + // limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries. + limitQueryInterval := cfg.IngesterQueryStoreMaxLookback != 0 + + // ingesterMLB having -1 means query ingester for whole duration. + ingesterMLB := calculateIngesterMaxLookbackPeriod(cfg, queryIngestersWithin) + + // query ingester for whole duration. + if ingesterMLB == -1 { + i := &QueryInterval{ + start: queryStart, + end: queryEnd, + } + + if limitQueryInterval { + // query only ingesters. 
+ return i, nil + } + + // query both stores and ingesters without limiting the query interval. + return i, i + } + + ingesterQueryWithinRange := isWithinIngesterMaxLookbackPeriod(ingesterMLB, queryEnd) + + // see if there is an overlap between ingester query interval and actual query interval, if not just do the store query. + if !ingesterQueryWithinRange { + return nil, &QueryInterval{ + start: queryStart, + end: queryEnd, + } + } + + ingesterOldestStartTime := time.Now().Add(-ingesterMLB) + + // if there is an overlap and we are not limiting the query interval then do both store and ingester query for whole query interval. + if !limitQueryInterval { + i := &QueryInterval{ + start: queryStart, + end: queryEnd, + } + return i, i + } + + // since we are limiting the query interval, check if the query touches just the ingesters, if yes then query just the ingesters. + if ingesterOldestStartTime.Before(queryStart) { + return &QueryInterval{ + start: queryStart, + end: queryEnd, + }, nil + } + + // limit the start of ingester query interval to ingesterOldestStartTime. + ingesterQueryInterval := &QueryInterval{ + start: ingesterOldestStartTime, + end: queryEnd, + } + + // limit the start of the store query interval to ingesterOldestStartTime. + storeQueryInterval := &QueryInterval{ + start: queryStart, + end: ingesterOldestStartTime, + } + + // query touches only ingester query interval so do not do store query. + if storeQueryInterval.start.After(storeQueryInterval.end) { + storeQueryInterval = nil + } + + return ingesterQueryInterval, storeQueryInterval +} + +func calculateIngesterMaxLookbackPeriod(cfg Config, queryIngestersWithin time.Duration) time.Duration { + mlb := time.Duration(-1) + if cfg.IngesterQueryStoreMaxLookback != 0 { + // IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range. 
+ mlb = cfg.IngesterQueryStoreMaxLookback + } else if queryIngestersWithin != 0 { + mlb = queryIngestersWithin + } + + return mlb +} + +func isWithinIngesterMaxLookbackPeriod(maxLookback time.Duration, queryEnd time.Time) bool { + // if no lookback limits are configured, always consider this within the range of the lookback period + if maxLookback <= 0 { + return true + } + + // find the first instance that we would want to query the ingester from... + ingesterOldestStartTime := time.Now().Add(-maxLookback) + + // ...and if the query range ends before that, don't query the ingester + return queryEnd.After(ingesterOldestStartTime) +} diff --git a/pkg/querier/pattern/querier.go b/pkg/querier/pattern/querier.go index 34bd9f7e86..542c67517e 100644 --- a/pkg/querier/pattern/querier.go +++ b/pkg/querier/pattern/querier.go @@ -2,6 +2,7 @@ package pattern import ( "context" + "sort" "github.com/grafana/loki/v3/pkg/logproto" ) @@ -9,3 +10,50 @@ import ( type PatterQuerier interface { Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) } + +func MergePatternResponses(responses []*logproto.QueryPatternsResponse) *logproto.QueryPatternsResponse { + if len(responses) == 0 { + return &logproto.QueryPatternsResponse{ + Series: []*logproto.PatternSeries{}, + } + } + + if len(responses) == 1 { + return responses[0] + } + + // Merge patterns by pattern string + patternMap := make(map[string]*logproto.PatternSeries) + + for _, resp := range responses { + if resp == nil { + continue + } + + for _, series := range resp.Series { + existing, exists := patternMap[series.Pattern] + if !exists { + patternMap[series.Pattern] = series + continue + } + + // Merge samples + existing.Samples = append(existing.Samples, series.Samples...) 
+ } + } + + // Sort samples within each series by timestamp + result := &logproto.QueryPatternsResponse{ + Series: make([]*logproto.PatternSeries, 0, len(patternMap)), + } + + for _, series := range patternMap { + // Sort samples by timestamp + sort.Slice(series.Samples, func(i, j int) bool { + return series.Samples[i].Timestamp < series.Samples[j].Timestamp + }) + result.Series = append(result.Series, series) + } + + return result +} diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 67e933bf7c..3346bd47a6 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -49,23 +49,21 @@ import ( var tracer = otel.Tracer("pkg/querier") -type interval struct { - start, end time.Time -} - // Config for a querier. type Config struct { - TailMaxDuration time.Duration `yaml:"tail_max_duration"` - ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"` - QueryIngestersWithin time.Duration `yaml:"query_ingesters_within,omitempty"` - IngesterQueryStoreMaxLookback time.Duration `yaml:"-"` - Engine logql.EngineOpts `yaml:"engine,omitempty"` - MaxConcurrent int `yaml:"max_concurrent"` - QueryStoreOnly bool `yaml:"query_store_only"` - QueryIngesterOnly bool `yaml:"query_ingester_only"` - MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"` - PerRequestLimitsEnabled bool `yaml:"per_request_limits_enabled"` - QueryPartitionIngesters bool `yaml:"query_partition_ingesters" category:"experimental"` + TailMaxDuration time.Duration `yaml:"tail_max_duration"` + ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"` + QueryIngestersWithin time.Duration `yaml:"query_ingesters_within,omitempty"` + Engine logql.EngineOpts `yaml:"engine,omitempty"` + MaxConcurrent int `yaml:"max_concurrent"` + QueryStoreOnly bool `yaml:"query_store_only"` + QueryIngesterOnly bool `yaml:"query_ingester_only"` + MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"` + PerRequestLimitsEnabled bool `yaml:"per_request_limits_enabled"` + 
QueryPartitionIngesters bool `yaml:"query_partition_ingesters" category:"experimental"` + + IngesterQueryStoreMaxLookback time.Duration `yaml:"-"` + QueryPatternIngestersWithin time.Duration `yaml:"-"` } // RegisterFlags register flags. @@ -289,92 +287,20 @@ func (q *SingleTenantQuerier) isWithinIngesterMaxLookbackPeriod(maxLookback time return queryEnd.After(ingesterOldestStartTime) } -func (q *SingleTenantQuerier) calculateIngesterMaxLookbackPeriod() time.Duration { +func (q *SingleTenantQuerier) calculateIngesterMaxLookbackPeriod(queryIngestersWithin time.Duration) time.Duration { mlb := time.Duration(-1) if q.cfg.IngesterQueryStoreMaxLookback != 0 { // IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range. mlb = q.cfg.IngesterQueryStoreMaxLookback - } else if q.cfg.QueryIngestersWithin != 0 { - mlb = q.cfg.QueryIngestersWithin + } else if queryIngestersWithin != 0 { + mlb = queryIngestersWithin } return mlb } -func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) { - // limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries. - limitQueryInterval := false - // ingesterMLB having -1 means query ingester for whole duration. - if q.cfg.IngesterQueryStoreMaxLookback != 0 { - // IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range. - limitQueryInterval = true - } - - ingesterMLB := q.calculateIngesterMaxLookbackPeriod() - - // query ingester for whole duration. - if ingesterMLB == -1 { - i := &interval{ - start: queryStart, - end: queryEnd, - } - - if limitQueryInterval { - // query only ingesters. - return i, nil - } - - // query both stores and ingesters without limiting the query interval. 
- return i, i - } - - ingesterQueryWithinRange := q.isWithinIngesterMaxLookbackPeriod(ingesterMLB, queryEnd) - - // see if there is an overlap between ingester query interval and actual query interval, if not just do the store query. - if !ingesterQueryWithinRange { - return nil, &interval{ - start: queryStart, - end: queryEnd, - } - } - - ingesterOldestStartTime := time.Now().Add(-ingesterMLB) - - // if there is an overlap and we are not limiting the query interval then do both store and ingester query for whole query interval. - if !limitQueryInterval { - i := &interval{ - start: queryStart, - end: queryEnd, - } - return i, i - } - - // since we are limiting the query interval, check if the query touches just the ingesters, if yes then query just the ingesters. - if ingesterOldestStartTime.Before(queryStart) { - return &interval{ - start: queryStart, - end: queryEnd, - }, nil - } - - // limit the start of ingester query interval to ingesterOldestStartTime. - ingesterQueryInterval := &interval{ - start: ingesterOldestStartTime, - end: queryEnd, - } - - // limit the end of ingester query interval to ingesterOldestStartTime. - storeQueryInterval := &interval{ - start: queryStart, - end: ingesterOldestStartTime, - } - - // query touches only ingester query interval so do not do store query. - if storeQueryInterval.start.After(storeQueryInterval.end) { - storeQueryInterval = nil - } - - return ingesterQueryInterval, storeQueryInterval +func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*QueryInterval, *QueryInterval) { + return BuildQueryIntervalsWithLookback(q.cfg, queryStart, queryEnd, q.cfg.QueryIngestersWithin) } // Label does the heavy lifting for a Label query. 
diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 0b412cd3c6..0648a20409 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -15,6 +15,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql" + "github.com/stretchr/testify/mock" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" grpc_metadata "google.golang.org/grpc/metadata" @@ -792,10 +794,91 @@ func (e *engineMock) Query(p logql.Params) logql.Query { type queryMock struct { result logqlmodel.Result + err error } func (q queryMock) Exec(_ context.Context) (logqlmodel.Result, error) { - return q.result, nil + return q.result, q.err +} + +// mockPatternQuerier implements pattern.PatterQuerier interface for testing +type mockPatternQuerier struct { + mock.Mock +} + +func newMockPatternQuerier() *mockPatternQuerier { + return &mockPatternQuerier{} +} + +func (m *mockPatternQuerier) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) { + args := m.Called(ctx, req) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*logproto.QueryPatternsResponse), args.Error(1) +} + +// Helper types and functions for pattern tests +type patternSample struct { + pattern string + timestamp int64 + value int64 +} + +// newMockEngineWithPatterns creates a mock engine that returns patterns when queried +func newMockEngineWithPatterns(patterns []string) logql.Engine { + engine := &engineMock{} + + // Create a result with the patterns + matrix := promql.Matrix{} + for _, pattern := range patterns { + matrix = append(matrix, promql.Series{ + Metric: labels.Labels{ + {Name: "service_name", Value: "test-service"}, + {Name: "decoded_pattern", Value: pattern}, + }, + Floats: []promql.FPoint{ + {T: time.Now().UnixMilli(), F: 100}, + }, + }) 
+ } + + result := logqlmodel.Result{ + Data: matrix, + } + + // Mock the Query method to return a query that returns our result + engine.On("Query", mock.Anything).Return(&queryMock{result: result, err: nil}) + + return engine +} + +// newMockEngineWithPatternsAndTimestamps creates a mock engine that returns patterns with specific timestamps +func newMockEngineWithPatternsAndTimestamps(patternSamples []patternSample) logql.Engine { + engine := &engineMock{} + + // Create a result with the patterns + matrix := promql.Matrix{} + for _, ps := range patternSamples { + matrix = append(matrix, promql.Series{ + Metric: labels.Labels{ + {Name: "service_name", Value: "test-service"}, + {Name: "decoded_pattern", Value: ps.pattern}, + }, + Floats: []promql.FPoint{ + {T: ps.timestamp, F: float64(ps.value)}, + }, + }) + } + + result := logqlmodel.Result{ + Data: matrix, + } + + // Mock the Query method to return a query that returns our result + engine.On("Query", mock.Anything).Return(&queryMock{result: result, err: nil}) + + return engine } type mockTenantLimits map[string]*validation.Limits diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index cd47aed6e6..de0ad88853 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -424,19 +424,19 @@ func TestQuerier_buildQueryIntervals(t *testing.T) { // For simplicity it is always assumed that ingesterQueryStoreMaxLookback and queryIngestersWithin both would be set upto 11 hours so // overlappingQuery has range of last 11 hours while nonOverlappingQuery has range older than last 11 hours. // We would test the cases below with both the queries. 
- overlappingQuery := interval{ + overlappingQuery := QueryInterval{ start: time.Now().Add(-6 * time.Hour), end: time.Now(), } - nonOverlappingQuery := interval{ + nonOverlappingQuery := QueryInterval{ start: time.Now().Add(-24 * time.Hour), end: time.Now().Add(-12 * time.Hour), } type response struct { - ingesterQueryInterval *interval - storeQueryInterval *interval + ingesterQueryInterval *QueryInterval + storeQueryInterval *QueryInterval } compareResponse := func(t *testing.T, expectedResponse, actualResponse response) { @@ -477,11 +477,11 @@ func TestQuerier_buildQueryIntervals(t *testing.T) { name: "ingesterQueryStoreMaxLookback set to 1h", ingesterQueryStoreMaxLookback: time.Hour, overlappingQueryExpectedResponse: response{ // query ingesters for last 1h and store until last 1h. - ingesterQueryInterval: &interval{ + ingesterQueryInterval: &QueryInterval{ start: time.Now().Add(-time.Hour), end: overlappingQuery.end, }, - storeQueryInterval: &interval{ + storeQueryInterval: &QueryInterval{ start: overlappingQuery.start, end: time.Now().Add(-time.Hour), }, @@ -505,11 +505,11 @@ func TestQuerier_buildQueryIntervals(t *testing.T) { ingesterQueryStoreMaxLookback: time.Hour, queryIngestersWithin: 2 * time.Hour, overlappingQueryExpectedResponse: response{ // query ingesters for last 1h and store until last 1h. - ingesterQueryInterval: &interval{ + ingesterQueryInterval: &QueryInterval{ start: time.Now().Add(-time.Hour), end: overlappingQuery.end, }, - storeQueryInterval: &interval{ + storeQueryInterval: &QueryInterval{ start: overlappingQuery.start, end: time.Now().Add(-time.Hour), }, @@ -523,11 +523,11 @@ func TestQuerier_buildQueryIntervals(t *testing.T) { ingesterQueryStoreMaxLookback: 2 * time.Hour, queryIngestersWithin: time.Hour, overlappingQueryExpectedResponse: response{ // query ingesters for last 2h and store until last 2h. 
- ingesterQueryInterval: &interval{ + ingesterQueryInterval: &QueryInterval{ start: time.Now().Add(-2 * time.Hour), end: overlappingQuery.end, }, - storeQueryInterval: &interval{ + storeQueryInterval: &QueryInterval{ start: overlappingQuery.start, end: time.Now().Add(-2 * time.Hour), }, @@ -624,18 +624,18 @@ func TestQuerier_calculateIngesterMaxLookbackPeriod(t *testing.T) { QueryIngestersWithin: tc.queryIngestersWithin, }} - assert.Equal(t, tc.expected, querier.calculateIngesterMaxLookbackPeriod()) + assert.Equal(t, tc.expected, querier.calculateIngesterMaxLookbackPeriod(querier.cfg.QueryIngestersWithin)) }) } } func TestQuerier_isWithinIngesterMaxLookbackPeriod(t *testing.T) { - overlappingQuery := interval{ + overlappingQuery := QueryInterval{ start: time.Now().Add(-6 * time.Hour), end: time.Now(), } - nonOverlappingQuery := interval{ + nonOverlappingQuery := QueryInterval{ start: time.Now().Add(-24 * time.Hour), end: time.Now().Add(-12 * time.Hour), } @@ -696,7 +696,7 @@ func TestQuerier_isWithinIngesterMaxLookbackPeriod(t *testing.T) { QueryIngestersWithin: tc.queryIngestersWithin, }} - lookbackPeriod := querier.calculateIngesterMaxLookbackPeriod() + lookbackPeriod := querier.calculateIngesterMaxLookbackPeriod(querier.cfg.QueryIngestersWithin) assert.Equal(t, tc.overlappingWithinRange, querier.isWithinIngesterMaxLookbackPeriod(lookbackPeriod, overlappingQuery.end)) assert.Equal(t, tc.nonOverlappingWithinRange, querier.isWithinIngesterMaxLookbackPeriod(lookbackPeriod, nonOverlappingQuery.end)) }) diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 34d8900d29..cefec67192 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/logqlmodel" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" + "github.com/grafana/loki/v3/pkg/querier/pattern" "github.com/grafana/loki/v3/pkg/querier/plan" 
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" @@ -333,10 +334,7 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (quer return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } - disableCacheReq := false - if strings.ToLower(strings.TrimSpace(r.Header.Get(cacheControlHeader))) == noCacheVal { - disableCacheReq = true - } + disableCacheReq := strings.ToLower(strings.TrimSpace(r.Header.Get(cacheControlHeader))) == noCacheVal switch op := getOperation(r.URL.Path); op { case QueryRangeOp: @@ -1597,6 +1595,19 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase }, Headers: headers, }, nil + case *QueryPatternsResponse: + resp0 := responses[0].(*QueryPatternsResponse) + + logprotoResps := make([]*logproto.QueryPatternsResponse, 0, len(responses)) + for _, r := range responses { + logprotoResps = append(logprotoResps, r.(*QueryPatternsResponse).Response) + } + + mergedPatterns := pattern.MergePatternResponses(logprotoResps) + return &QueryPatternsResponse{ + Response: &logproto.QueryPatternsResponse{Series: mergedPatterns.Series}, + Headers: resp0.Headers, + }, nil default: return nil, fmt.Errorf("unknown response type (%T) in merging responses", responses[0]) } @@ -1799,8 +1810,31 @@ func ParamsFromRequest(req queryrangebase.Request) (logql.Params, error) { return ¶msDetectedLabelsWrapper{ DetectedLabelsRequest: r, }, nil + case *logproto.QueryPatternsRequest: + // We turn a QueryPatternsRequest into a LokiRequest when querying the store + // for persisted patterns, which we store as a specially crafted log line in the actual chunks. + // We need to leverage the logic of the query engine to properly extract them correctly. 
+ query, err := r.GetSampleQuery() + if err != nil { + return nil, err + } + + expr, err := syntax.ParseExpr(query) + if err != nil { + return nil, err + } + + return ¶msRangeWrapper{ + LokiRequest: &LokiRequest{ + Query: expr.String(), + Step: r.GetStep(), + StartTs: r.GetStart(), + EndTs: r.GetEnd(), + Plan: &plan.QueryPlan{AST: expr}, + }, + }, nil default: - return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, *DetectedFieldsRequest, got (%T)", r) + return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, *DetectedFieldsRequest, *QueryPatternsRequest got (%T)", r) } } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 233f8d3005..c7032b2ca7 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -258,6 +258,36 @@ func NewMiddleware( return nil, nil, err } + // Pattern middleware is just a metric middleware with sharding disabled. + // At some point hopefully we can implement sharding for pattern queries and remove this. 
+ patternConfig := Config{ + Config: base.Config{ + AlignQueriesWithStep: cfg.AlignQueriesWithStep, + ResultsCacheConfig: cfg.ResultsCacheConfig, + CacheResults: cfg.CacheResults, + MaxRetries: cfg.MaxRetries, + ShardedQueries: false, + ShardAggregations: []string{}, + }, + Transformer: cfg.Transformer, + CacheIndexStatsResults: cfg.CacheIndexStatsResults, + StatsCacheConfig: cfg.StatsCacheConfig, + CacheVolumeResults: cfg.CacheVolumeResults, + VolumeCacheConfig: cfg.VolumeCacheConfig, + CacheInstantMetricResults: cfg.CacheInstantMetricResults, + InstantMetricCacheConfig: cfg.InstantMetricCacheConfig, + InstantMetricQuerySplitAlign: cfg.InstantMetricQuerySplitAlign, + CacheSeriesResults: cfg.CacheSeriesResults, + SeriesCacheConfig: cfg.SeriesCacheConfig, + CacheLabelResults: cfg.CacheLabelResults, + LabelsCacheConfig: cfg.LabelsCacheConfig, + } + patternTripperware, err := NewMetricTripperware(patternConfig, engineOpts, log, limits, schema, codec, iqo, resultsCache, + cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware, metricsNamespace) + if err != nil { + return nil, nil, err + } + return base.MiddlewareFunc(func(next base.Handler) base.Handler { var ( metricRT = metricsTripperware.Wrap(next) @@ -270,6 +300,7 @@ func NewMiddleware( seriesVolumeRT = seriesVolumeTripperware.Wrap(next) detectedFieldsRT = detectedFieldsTripperware.Wrap(next) detectedLabelsRT = detectedLabelsTripperware.Wrap(next) + patternRT = patternTripperware.Wrap(next) ) return newRoundTripper( @@ -285,6 +316,7 @@ func NewMiddleware( seriesVolumeRT, detectedFieldsRT, detectedLabelsRT, + patternRT, limits, ) }), StopperWrapper{resultsCache, statsCache, volumeCache}, nil @@ -340,7 +372,7 @@ func NewDetectedLabelsCardinalityFilter(rt queryrangebase.Handler) queryrangebas type roundTripper struct { logger log.Logger - next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels base.Handler + next, limited, 
log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels, pattern base.Handler limits Limits } @@ -348,7 +380,7 @@ type roundTripper struct { // newRoundTripper creates a new queryrange roundtripper func newRoundTripper( logger log.Logger, - next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels base.Handler, + next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels, pattern base.Handler, limits Limits, ) roundTripper { return roundTripper{ @@ -364,6 +396,7 @@ func newRoundTripper( seriesVolume: seriesVolume, detectedFields: detectedFields, detectedLabels: detectedLabels, + pattern: pattern, next: next, } } @@ -524,6 +557,23 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response, "start", op.Start, ) return r.detectedLabels.Do(ctx, req) + case *logproto.QueryPatternsRequest: + expr, err := syntax.ParseExpr(req.GetQuery()) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + groups, err := syntax.MatcherGroups(expr) + if err != nil { + level.Warn(logger).Log("msg", "unexpected matcher groups error in roundtripper", "err", err) + } + + for _, g := range groups { + if err := validateMatchers(ctx, r.limits, g.Matchers); err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + } + return r.pattern.Do(ctx, req) default: return r.next.Do(ctx, req) } diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index eba906355a..97e2f8f612 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -993,6 +993,7 @@ func TestPostQueries(t *testing.T) { handler, handler, handler, + handler, fakeLimits{}, ).Do(ctx, lreq) require.NoError(t, err)