feat: query persisted patterns (#17980)

Signed-off-by: Trevor Whitney <trevorjwhitney@gmail.com>
Trevor Whitney 3 weeks ago committed by GitHub
parent fd7321c88b
commit fdcc12da4d
1. docs/sources/shared/configuration.md — 4 changes
2. pkg/logproto/extensions.go — 85 changes
3. pkg/logproto/extensions_test.go — 124 changes
4. pkg/loki/modules.go — 4 changes
5. pkg/pattern/flush.go — 5 changes
6. pkg/pattern/ingester.go — 7 changes
7. pkg/querier/http.go — 177 changes
8. pkg/querier/http_test.go — 230 changes
9. pkg/querier/intervals.go — 104 changes
10. pkg/querier/pattern/querier.go — 48 changes
11. pkg/querier/querier.go — 110 changes
12. pkg/querier/querier_mock_test.go — 85 changes
13. pkg/querier/querier_test.go — 28 changes
14. pkg/querier/queryrange/codec.go — 44 changes
15. pkg/querier/queryrange/roundtrip.go — 54 changes
16. pkg/querier/queryrange/roundtrip_test.go — 1 change

@@ -863,6 +863,10 @@ pattern_ingester:
# CLI flag: -pattern-ingester.max-allowed-line-length
[max_allowed_line_length: <int> | default = 3000]

+# How long to retain patterns in the pattern ingester after they are pushed.
+# CLI flag: -pattern-ingester.retain-for
+[retain_for: <duration> | default = 3h]
+
# The index_gateway block configures the Loki index gateway server, responsible
# for serving index queries without the need to constantly interact with the
# object store.

@@ -1,9 +1,12 @@
package logproto

import (
+"fmt"
+"slices"
"sort"
"strings"
"sync/atomic" //lint:ignore faillint we can't use go.uber.org/atomic with a protobuf struct without wrapping it.
+"time"

"github.com/cespare/xxhash/v2"
"github.com/dustin/go-humanize"
@@ -11,7 +14,9 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

+"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
+"github.com/grafana/loki/v3/pkg/util/constants"
)
// This is the separator defined in the Prometheus Labels.Hash function.
@@ -189,3 +194,83 @@ func (m *ShardsResponse) Merge(other *ShardsResponse) {
m.ChunkGroups = append(m.ChunkGroups, other.ChunkGroups...)
m.Statistics.Merge(other.Statistics)
}
func (m *QueryPatternsRequest) GetSampleQuery() (string, error) {
expr, err := syntax.ParseExpr(m.Query)
if err != nil {
return "", err
}
// Extract matchers from the expression
var matchers []*labels.Matcher
switch e := expr.(type) {
case *syntax.MatchersExpr:
matchers = e.Mts
case syntax.LogSelectorExpr:
matchers = e.Matchers()
default:
// Cannot extract matchers
return "", nil
}
// Find service_name from matchers
var serviceName string
var serviceMatcher labels.MatchType
for i, m := range matchers {
if m.Name == "service_name" {
matchers = slices.Delete(matchers, i, i+1)
serviceName = m.Value
serviceMatcher = m.Type
break
}
}
if serviceName == "" {
serviceName = ".+"
serviceMatcher = labels.MatchRegexp
}
// Build LogQL query for persisted patterns
logqlQuery := buildPatternLogQLQuery(serviceName, serviceMatcher, matchers, m.Step)
return logqlQuery, nil
}
func buildPatternLogQLQuery(serviceName string, serviceMatcher labels.MatchType, matchers []*labels.Matcher, step int64) string {
// Use step duration for sum_over_time window
stepDuration := max(time.Duration(step)*time.Millisecond, 10*time.Second)
if len(matchers) == 0 {
return buildPatterLogQLQueryString(serviceName, serviceMatcher.String(), "", stepDuration.String())
}
stringBuilder := strings.Builder{}
for i, matcher := range matchers {
stringBuilder.WriteString(matcher.String())
if i < len(matchers)-1 {
stringBuilder.WriteString(" | ")
}
}
return buildPatterLogQLQueryString(serviceName, serviceMatcher.String(), stringBuilder.String(), stepDuration.String())
}
func buildPatterLogQLQueryString(serviceName, serviceMatcher, matchers, step string) string {
decodePatternTransform := `label_format decoded_pattern=` + "`{{urldecode .detected_pattern}}`"
matchAndTransform := ""
if matchers == "" {
matchAndTransform = decodePatternTransform
} else {
matchAndTransform = fmt.Sprintf(`%s | %s`, matchers, decodePatternTransform)
}
return fmt.Sprintf(
`sum by (decoded_pattern, %s) (sum_over_time({__pattern__%s"%s"} | logfmt | %s | unwrap count [%s]))`,
constants.LevelLabel,
serviceMatcher,
serviceName,
matchAndTransform,
step,
)
}
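For illustration, a minimal Example-style sketch of the conversion above; the request values are invented, and the output shape matches the test cases below:

func ExampleQueryPatternsRequest_GetSampleQuery() {
    // Step is given in milliseconds, as on the wire. GetSampleQuery lifts
    // service_name into the __pattern__ selector, turns the remaining
    // matchers into label filters, and uses the step (floored at 10s) as
    // the sum_over_time window.
    req := &QueryPatternsRequest{
        Query: `{service_name="checkout", env="prod"}`,
        Step:  (5 * time.Minute).Milliseconds(),
    }
    sampleQuery, err := req.GetSampleQuery()
    if err != nil {
        panic(err)
    }
    fmt.Println(sampleQuery)
    // Output:
    // sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="checkout"} | logfmt | env="prod" | label_format decoded_pattern=`{{urldecode .detected_pattern}}` | unwrap count [5m0s]))
}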

@@ -40,3 +40,127 @@ func TestShard_SpaceFor(t *testing.T) {
})
}
}
func TestQueryPatternsRequest_GetSampleQuery(t *testing.T) {
tests := []struct {
name string
query string
step int64
expected string
expectedError bool
}{
{
name: "simple selector with service_name",
query: `{service_name="test-service"}`,
step: 60000, // 1 minute in milliseconds
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` +
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
`| unwrap count [1m0s]))`,
},
{
name: "selector with service_name and additional labels",
query: `{service_name="test-service", env="prod", cluster="us-east-1"}`,
step: 300000, // 5 minutes in milliseconds
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` +
"env=\"prod\" | cluster=\"us-east-1\" | " +
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
`| unwrap count [5m0s]))`,
},
{
name: "selector with service_name and additional labels and match types",
query: `{service_name="test-service", env=~"prod", cluster!="us-east-1", foo!~"bar"}`,
step: 300000, // 5 minutes in milliseconds
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` +
"env=~\"prod\" | cluster!=\"us-east-1\" | foo!~\"bar\" | " +
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
`| unwrap count [5m0s]))`,
},
{
name: "small step gets minimum 10s window",
query: `{service_name="test-service"}`,
step: 5000, // 5 seconds in milliseconds
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__="test-service"} | logfmt | ` +
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
`| unwrap count [10s]))`,
},
{
name: "simple regex selector with service_name",
query: `{service_name=~"test-service"}`,
step: 10000, // 10 seconds in milliseconds
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__=~"test-service"} | logfmt | ` +
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
`| unwrap count [10s]))`,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
req := &QueryPatternsRequest{
Query: tc.query,
Step: tc.step,
}
result, err := req.GetSampleQuery()
if tc.expectedError {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tc.expected, result)
}
})
}
}
func TestQueryPatternsRequest_GetSampleQuery_NoServiceName(t *testing.T) {
tests := []struct {
name string
query string
expected string
expectedError bool
}{
{
name: "no service_name label",
query: `{env="prod", cluster="us-east-1"}`,
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__=~".+"} | logfmt | ` +
"env=\"prod\" | cluster=\"us-east-1\" | " +
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
`| unwrap count [1m0s]))`,
},
{
name: "no service_name label, mixed match types",
query: `{env!="prod", cluster=~"us-east-1", app!~"foo"}`,
expected: `sum by (decoded_pattern, detected_level) (sum_over_time({__pattern__=~".+"} | logfmt | ` +
"env!=\"prod\" | cluster=~\"us-east-1\" | app!~\"foo\" | " +
"label_format decoded_pattern=`{{urldecode .detected_pattern}}` " +
`| unwrap count [1m0s]))`,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
req := &QueryPatternsRequest{
Query: tc.query,
Step: 60000,
}
result, err := req.GetSampleQuery()
if tc.expectedError {
require.Error(t, err)
} else {
require.NoError(t, err)
require.Equal(t, tc.expected, result)
}
})
}
}
func TestQueryPatternsRequest_GetSampleQuery_InvalidQuery(t *testing.T) {
req := &QueryPatternsRequest{
Query: `{invalid query syntax`,
Step: 60000,
}
_, err := req.GetSampleQuery()
require.Error(t, err)
}

@@ -563,6 +563,10 @@ func (t *Loki) initQuerier() (services.Service, error) {
if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod
}
+
+// Use Pattern ingester RetainFor value to determine when to query pattern ingesters
+t.Cfg.Querier.QueryPatternIngestersWithin = t.Cfg.Pattern.RetainFor
+
// Querier worker's max concurrent must be the same as the querier setting
t.Cfg.Worker.MaxConcurrent = t.Cfg.Querier.MaxConcurrent

deleteStore, err := t.deleteRequestsClient("querier", t.Overrides)

@@ -2,7 +2,6 @@ package pattern

import (
"fmt"
-"time"

"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
@@ -10,8 +9,6 @@ import (
"github.com/grafana/loki/v3/pkg/util"
)

-const retainSampleFor = 3 * time.Hour
-
func (i *Ingester) initFlushQueues() {
// i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
@@ -66,7 +63,7 @@ func (i *Ingester) sweepInstance(instance *instance, _, mayRemoveStreams bool) {
_ = instance.streams.ForEach(func(s *stream) (bool, error) {
if mayRemoveStreams {
instance.streams.WithLock(func() {
-if s.prune(retainSampleFor) {
+if s.prune(i.cfg.RetainFor) {
instance.removeStream(s)
}
})

@@ -45,6 +45,7 @@ type Config struct {
TeeConfig TeeConfig `yaml:"tee_config,omitempty" doc:"description=Configures the pattern tee which forwards requests to the pattern ingester."`
ConnectionTimeout time.Duration `yaml:"connection_timeout"`
MaxAllowedLineLength int `yaml:"max_allowed_line_length,omitempty" doc:"description=The maximum length of log lines that can be used for pattern detection."`
+RetainFor time.Duration `yaml:"retain_for,omitempty" doc:"description=How long to retain patterns in the pattern ingester after they are pushed."`

// For testing.
factory ring_client.PoolFactory `yaml:"-"`
@@ -100,6 +101,12 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) {
drain.DefaultConfig().MaxAllowedLineLength,
"The maximum length of log lines that can be used for pattern detection.",
)
+fs.DurationVar(
+&cfg.RetainFor,
+"pattern-ingester.retain-for",
+3*time.Hour,
+"How long to retain patterns in the pattern ingester after they are pushed.",
+)
}

type TeeConfig struct {

@@ -2,9 +2,11 @@ package querier

import (
"context"
+"fmt"
"net/http"
"slices"
"strconv"
+"sync"
"time"

"github.com/go-kit/log"
@@ -15,10 +17,13 @@ import (
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
+"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
+"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/objstore"
"go.opentelemetry.io/otel/trace"
+"golang.org/x/sync/errgroup"

"github.com/grafana/loki/v3/pkg/engine"
"github.com/grafana/loki/v3/pkg/loghttp"
@@ -28,6 +33,7 @@ import (
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
+"github.com/grafana/loki/v3/pkg/querier/pattern"
"github.com/grafana/loki/v3/pkg/querier/queryrange"
index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
"github.com/grafana/loki/v3/pkg/tracing"
@@ -393,19 +399,184 @@ func (q *QuerierAPI) DetectedFieldsHandler(ctx context.Context, req *logproto.De
return resp, nil
}
type asyncPatternResponses struct {
responses []*logproto.QueryPatternsResponse
lock sync.Mutex
}
// Currently the only contention is when adding; however, if that behavior changes we may need to lock in the read methods as well.
func (r *asyncPatternResponses) add(resp *logproto.QueryPatternsResponse) {
if resp == nil {
return
}
r.lock.Lock()
defer r.lock.Unlock()
r.responses = append(r.responses, resp)
}
func (r *asyncPatternResponses) get() []*logproto.QueryPatternsResponse {
return r.responses
}
func (r *asyncPatternResponses) len() int {
return len(r.responses)
}
func (q *QuerierAPI) PatternsHandler(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
-resp, err := q.querier.Patterns(ctx, req)
-if err != nil {
+// Calculate query intervals for ingester vs store
+ingesterQueryInterval, storeQueryInterval := BuildQueryIntervalsWithLookback(q.cfg, req.Start, req.End, q.cfg.QueryPatternIngestersWithin)
responses := asyncPatternResponses{}
g, ctx := errgroup.WithContext(ctx)
// Query pattern ingesters for recent data
if ingesterQueryInterval != nil && !q.cfg.QueryStoreOnly && q.querier != nil {
g.Go(func() error {
splitReq := *req
splitReq.Start = ingesterQueryInterval.start
splitReq.End = ingesterQueryInterval.end
resp, err := q.querier.Patterns(ctx, &splitReq)
if err != nil {
return err
}
responses.add(resp)
return nil
})
}
// Query store for older data by converting to LogQL query
if storeQueryInterval != nil && !q.cfg.QueryIngesterOnly && q.engineV1 != nil {
g.Go(func() error {
storeReq := *req
storeReq.Start = storeQueryInterval.start
storeReq.End = storeQueryInterval.end
resp, err := q.queryStoreForPatterns(ctx, &storeReq)
if err != nil {
return err
}
responses.add(resp)
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
-if resp == nil { // Some stores don't implement this
+// Merge responses
+if responses.len() == 0 {
return &logproto.QueryPatternsResponse{
Series: []*logproto.PatternSeries{},
}, nil
}
return pattern.MergePatternResponses(responses.get()), nil
}
func (q *QuerierAPI) queryStoreForPatterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
params, err := queryrange.ParamsFromRequest(req)
if err != nil {
return nil, err
}
// Patterns are persisted as logfmt'd strings, so we need to run the query through the LogQL engine
// in order to extract the metric values from them.
query := q.engineV1.Query(params)
res, err := query.Exec(ctx)
if err != nil {
return nil, err
}
result := make(map[string]map[string]map[int64]float64) // level -> pattern -> timestamp -> value
switch v := res.Data.(type) {
case promql.Vector:
for _, s := range v {
lvl, pattern := getLevelAndPattern(s.Metric)
if pattern == "" {
continue
}
if result[lvl] == nil {
result[lvl] = make(map[string]map[int64]float64)
}
if result[lvl][pattern] == nil {
result[lvl][pattern] = make(map[int64]float64)
}
result[lvl][pattern][s.T] = s.F
}
case promql.Matrix:
for _, s := range v {
lvl, pattern := getLevelAndPattern(s.Metric)
if pattern == "" {
continue
}
if result[lvl] == nil {
result[lvl] = make(map[string]map[int64]float64)
}
if result[lvl][pattern] == nil {
result[lvl][pattern] = make(map[int64]float64)
}
for _, f := range s.Floats {
result[lvl][pattern][f.T] = f.F
}
}
default:
return nil, fmt.Errorf("unexpected type (%s) when querying store for patterns. Expected %s or %s", res.Data.Type(), parser.ValueTypeMatrix, parser.ValueTypeVector)
}
// Convert to pattern response format
resp := &logproto.QueryPatternsResponse{
Series: make([]*logproto.PatternSeries, 0, len(result)),
}
for lvl, patterns := range result {
for pattern, samples := range patterns {
series := &logproto.PatternSeries{
Pattern: pattern,
Level: lvl,
Samples: make([]*logproto.PatternSample, 0, len(samples)),
}
// Convert samples map to slice and sort by timestamp
timestamps := make([]int64, 0, len(samples))
for ts := range samples {
timestamps = append(timestamps, ts)
}
slices.Sort(timestamps)
level.Debug(q.logger).Log(
"msg", "earliest pattern sample from store",
"timestamp", timestamps[0],
)
for _, ts := range timestamps {
series.Samples = append(series.Samples, &logproto.PatternSample{
Timestamp: model.Time(ts),
Value: int64(samples[ts]),
})
}
resp.Series = append(resp.Series, series)
}
}
return resp, nil
}
func getLevelAndPattern(metric labels.Labels) (string, string) {
lvl := metric.Get(constants.LevelLabel)
if lvl == "" {
lvl = constants.LogLevelUnknown
}
pattern := metric.Get("decoded_pattern")
return lvl, pattern
}
func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, expr syntax.Expr, limit uint32) error {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {

@@ -421,3 +421,233 @@ func setupAPI(t *testing.T, querier *querierMock, enableMetricAggregation bool)
api := NewQuerierAPI(Config{}, querier, limits, nil, nil, log.NewNopLogger())
return api
}
func TestPatternsHandler(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")
now := time.Now()
// Mock pattern responses
mockPatternResponse := func(patterns []string) *logproto.QueryPatternsResponse {
resp := &logproto.QueryPatternsResponse{
Series: make([]*logproto.PatternSeries, 0),
}
for i, pattern := range patterns {
resp.Series = append(resp.Series, &logproto.PatternSeries{
Pattern: pattern,
Level: constants.LogLevelInfo,
Samples: []*logproto.PatternSample{
{
Timestamp: model.Time(now.Unix() * 1000),
Value: int64((i + 1) * 100),
},
},
})
}
return resp
}
tests := []struct {
name string
start, end time.Time
queryIngestersWithin time.Duration
queryPatternIngestersWithin time.Duration
ingesterQueryStoreMaxLookback time.Duration
setupQuerier func() Querier
patternsFromStore []string
expectedPatterns []string
expectedError error
queryStoreOnly bool
queryIngesterOnly bool
}{
{
name: "query both ingester and store when time range overlaps and ingesterQueryStoreMaxLookback is not set",
start: now.Add(-2 * time.Hour),
end: now,
queryIngestersWithin: 3 * time.Hour,
setupQuerier: func() Querier {
q := &querierMock{}
q.On("Patterns", mock.Anything, mock.MatchedBy(func(req *logproto.QueryPatternsRequest) bool {
// Should query recent data only (within queryIngestersWithin)
return req.Start.After(now.Add(-3*time.Hour)) && req.End.Equal(now)
})).Return(mockPatternResponse([]string{"pattern1", "pattern2"}), nil)
return q
},
patternsFromStore: []string{"pattern3", "pattern4"},
expectedPatterns: []string{"pattern1", "pattern2", "pattern3", "pattern4"},
},
{
name: "query only store when time range is older than ingester lookback",
start: now.Add(-10 * time.Hour),
end: now.Add(-4 * time.Hour),
queryIngestersWithin: 3 * time.Hour,
setupQuerier: func() Querier {
// Should not be called
q := &querierMock{}
return q
},
patternsFromStore: []string{"old_pattern1", "old_pattern2"},
expectedPatterns: []string{"old_pattern1", "old_pattern2"},
},
{
name: "query only ingester when time range is recent and ingesterQueryStoreMaxLookback is set",
start: now.Add(-30 * time.Minute),
end: now,
queryIngestersWithin: 3 * time.Hour,
ingesterQueryStoreMaxLookback: 1 * time.Hour,
setupQuerier: func() Querier {
q := &querierMock{}
q.On("Patterns", mock.Anything, mock.MatchedBy(func(req *logproto.QueryPatternsRequest) bool {
return req.Start.Equal(now.Add(-30*time.Minute)) && req.End.Equal(now)
})).Return(mockPatternResponse([]string{"recent_pattern1", "recent_pattern2"}), nil)
return q
},
patternsFromStore: []string{"old_pattern1", "old_pattern2"},
expectedPatterns: []string{"recent_pattern1", "recent_pattern2"},
},
{
name: "query store only when configured",
start: now.Add(-2 * time.Hour),
end: now,
queryStoreOnly: true,
setupQuerier: func() Querier {
// Should not be called
return newQuerierMock()
},
patternsFromStore: []string{"store_only_pattern"},
expectedPatterns: []string{"store_only_pattern"},
},
{
name: "query ingester only when configured",
start: now.Add(-2 * time.Hour),
end: now,
queryIngesterOnly: true,
queryIngestersWithin: 3 * time.Hour,
setupQuerier: func() Querier {
q := &querierMock{}
q.On("Patterns", mock.Anything, mock.Anything).Return(mockPatternResponse([]string{"ingester_only_pattern"}), nil)
return q
},
patternsFromStore: []string{"store_only_pattern"},
expectedPatterns: []string{"ingester_only_pattern"},
},
{
name: "returns empty response when no patterns found",
start: now.Add(-2 * time.Hour),
end: now,
queryIngestersWithin: 3 * time.Hour,
setupQuerier: func() Querier {
q := &querierMock{}
q.On("Patterns", mock.Anything, mock.Anything).Return(&logproto.QueryPatternsResponse{Series: []*logproto.PatternSeries{}}, nil)
return q
},
},
{
name: "query uses pattern-specific lookback when configured",
start: now.Add(-2 * time.Hour),
end: now.Add(-90 * time.Minute), // Query ends before pattern ingester lookback
queryIngestersWithin: 3 * time.Hour,
queryPatternIngestersWithin: 1 * time.Hour, // Pattern ingester only looks back 1 hour
setupQuerier: func() Querier {
// Should not be called since query is entirely outside pattern ingester lookback
return newQuerierMock()
},
patternsFromStore: []string{"pattern_from_store"},
expectedPatterns: []string{"pattern_from_store"},
},
{
name: "merges patterns from both sources correctly",
start: now.Add(-2 * time.Hour),
end: now,
queryIngestersWithin: 3 * time.Hour,
setupQuerier: func() Querier {
q := &querierMock{}
q.On("Patterns", mock.Anything, mock.Anything).Return(&logproto.QueryPatternsResponse{
Series: []*logproto.PatternSeries{
{
Pattern: "common_pattern",
Samples: []*logproto.PatternSample{
{Timestamp: model.Time(now.Add(-30*time.Minute).Unix() * 1000), Value: 100},
},
},
{
Pattern: "ingester_pattern",
Samples: []*logproto.PatternSample{
{Timestamp: model.Time(now.Add(-15*time.Minute).Unix() * 1000), Value: 200},
},
},
},
}, nil)
return q
},
patternsFromStore: []string{"common_pattern", "store_pattern"},
expectedPatterns: []string{"common_pattern", "ingester_pattern", "store_pattern"},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
conf := mockQuerierConfig()
conf.QueryIngestersWithin = tc.queryIngestersWithin
if tc.queryPatternIngestersWithin != 0 {
conf.QueryPatternIngestersWithin = tc.queryPatternIngestersWithin
} else {
conf.QueryPatternIngestersWithin = tc.queryIngestersWithin
}
conf.IngesterQueryStoreMaxLookback = tc.ingesterQueryStoreMaxLookback
conf.QueryStoreOnly = tc.queryStoreOnly
conf.QueryIngesterOnly = tc.queryIngesterOnly
querier := tc.setupQuerier()
// Create a mock engine that returns the expected patterns from store
engineMock := newMockEngineWithPatterns(tc.patternsFromStore)
api := &QuerierAPI{
cfg: conf,
querier: querier,
limits: limits,
engineV1: engineMock,
logger: log.NewNopLogger(),
}
req := &logproto.QueryPatternsRequest{
Query: `{service_name="test-service"}`,
Start: tc.start,
End: tc.end,
Step: time.Minute.Milliseconds(),
}
resp, err := api.PatternsHandler(ctx, req)
if tc.expectedError != nil {
require.Error(t, err)
require.Equal(t, tc.expectedError, err)
return
}
require.NoError(t, err)
require.NotNil(t, resp)
// Collect all patterns from response
foundPatterns := make(map[string]bool)
for _, series := range resp.Series {
foundPatterns[series.Pattern] = true
}
// Check expected patterns from ingester
for _, pattern := range tc.expectedPatterns {
require.True(t, foundPatterns[pattern], "Expected pattern %s, not found", pattern)
}
require.Equal(t, len(tc.expectedPatterns), len(resp.Series), "Unexpected number of patterns in response")
// Verify mocks were called as expected (if it's a mock)
if q, ok := querier.(*querierMock); ok {
q.AssertExpectations(t)
}
})
}
}

@@ -0,0 +1,104 @@
package querier
import "time"
type QueryInterval struct {
start, end time.Time
}
func BuildQueryIntervalsWithLookback(cfg Config, queryStart, queryEnd time.Time, queryIngestersWithin time.Duration) (*QueryInterval, *QueryInterval) {
// limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries.
limitQueryInterval := cfg.IngesterQueryStoreMaxLookback != 0
// ingesterMLB having -1 means query ingester for whole duration.
ingesterMLB := calculateIngesterMaxLookbackPeriod(cfg, queryIngestersWithin)
// query ingester for whole duration.
if ingesterMLB == -1 {
i := &QueryInterval{
start: queryStart,
end: queryEnd,
}
if limitQueryInterval {
// query only ingesters.
return i, nil
}
// query both stores and ingesters without limiting the query interval.
return i, i
}
ingesterQueryWithinRange := isWithinIngesterMaxLookbackPeriod(ingesterMLB, queryEnd)
// see if there is an overlap between ingester query interval and actual query interval, if not just do the store query.
if !ingesterQueryWithinRange {
return nil, &QueryInterval{
start: queryStart,
end: queryEnd,
}
}
ingesterOldestStartTime := time.Now().Add(-ingesterMLB)
// if there is an overlap and we are not limiting the query interval then do both store and ingester query for whole query interval.
if !limitQueryInterval {
i := &QueryInterval{
start: queryStart,
end: queryEnd,
}
return i, i
}
// since we are limiting the query interval, check if the query touches just the ingesters, if yes then query just the ingesters.
if ingesterOldestStartTime.Before(queryStart) {
return &QueryInterval{
start: queryStart,
end: queryEnd,
}, nil
}
// limit the start of ingester query interval to ingesterOldestStartTime.
ingesterQueryInterval := &QueryInterval{
start: ingesterOldestStartTime,
end: queryEnd,
}
// limit the end of store query interval to ingesterOldestStartTime.
storeQueryInterval := &QueryInterval{
start: queryStart,
end: ingesterOldestStartTime,
}
// query touches only ingester query interval so do not do store query.
if storeQueryInterval.start.After(storeQueryInterval.end) {
storeQueryInterval = nil
}
return ingesterQueryInterval, storeQueryInterval
}
func calculateIngesterMaxLookbackPeriod(cfg Config, queryIngestersWithin time.Duration) time.Duration {
mlb := time.Duration(-1)
if cfg.IngesterQueryStoreMaxLookback != 0 {
// IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range.
mlb = cfg.IngesterQueryStoreMaxLookback
} else if queryIngestersWithin != 0 {
mlb = queryIngestersWithin
}
return mlb
}
func isWithinIngesterMaxLookbackPeriod(maxLookback time.Duration, queryEnd time.Time) bool {
// if no lookback limits are configured, always consider this within the range of the lookback period
if maxLookback <= 0 {
return true
}
// find the first instance that we would want to query the ingester from...
ingesterOldestStartTime := time.Now().Add(-maxLookback)
// ...and if the query range ends before that, don't query the ingester
return queryEnd.After(ingesterOldestStartTime)
}
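A minimal sketch of how the split behaves, assuming the Config fields used above (times and values are illustrative):

cfg := Config{IngesterQueryStoreMaxLookback: time.Hour}
now := time.Now()

// A 6h query with a 1h ingester lookback is split in two:
ing, store := BuildQueryIntervalsWithLookback(cfg, now.Add(-6*time.Hour), now, 3*time.Hour)
// ing covers (now-1h, now] and goes to the (pattern) ingesters;
// store covers [now-6h, now-1h] and goes to the object store.
// With no lookback configured at all, both intervals span the full range.
_, _ = ing, store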

@@ -2,6 +2,7 @@ package pattern

import (
"context"
+"sort"

"github.com/grafana/loki/v3/pkg/logproto"
)
@@ -9,3 +10,50 @@ import (
type PatterQuerier interface {
Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error)
}
func MergePatternResponses(responses []*logproto.QueryPatternsResponse) *logproto.QueryPatternsResponse {
if len(responses) == 0 {
return &logproto.QueryPatternsResponse{
Series: []*logproto.PatternSeries{},
}
}
if len(responses) == 1 {
return responses[0]
}
// Merge patterns by pattern string
patternMap := make(map[string]*logproto.PatternSeries)
for _, resp := range responses {
if resp == nil {
continue
}
for _, series := range resp.Series {
existing, exists := patternMap[series.Pattern]
if !exists {
patternMap[series.Pattern] = series
continue
}
// Merge samples
existing.Samples = append(existing.Samples, series.Samples...)
}
}
// Sort samples within each series by timestamp
result := &logproto.QueryPatternsResponse{
Series: make([]*logproto.PatternSeries, 0, len(patternMap)),
}
for _, series := range patternMap {
// Sort samples by timestamp
sort.Slice(series.Samples, func(i, j int) bool {
return series.Samples[i].Timestamp < series.Samples[j].Timestamp
})
result.Series = append(result.Series, series)
}
return result
}
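For example, merging an ingester response with a store response that share a pattern (a sketch with invented sample values):

a := &logproto.QueryPatternsResponse{Series: []*logproto.PatternSeries{
    {Pattern: "<_> connected", Samples: []*logproto.PatternSample{{Timestamp: 2000, Value: 5}}},
}}
b := &logproto.QueryPatternsResponse{Series: []*logproto.PatternSeries{
    {Pattern: "<_> connected", Samples: []*logproto.PatternSample{{Timestamp: 1000, Value: 3}}},
    {Pattern: "<_> disconnected", Samples: []*logproto.PatternSample{{Timestamp: 1000, Value: 1}}},
}}

merged := MergePatternResponses([]*logproto.QueryPatternsResponse{a, b})
// merged.Series has two entries: "<_> connected" carries both samples,
// sorted by timestamp, and "<_> disconnected" is kept as-is.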

@@ -49,23 +49,21 @@ import (
var tracer = otel.Tracer("pkg/querier")

-type interval struct {
-start, end time.Time
-}
-
// Config for a querier.
type Config struct {
TailMaxDuration time.Duration `yaml:"tail_max_duration"`
ExtraQueryDelay time.Duration `yaml:"extra_query_delay,omitempty"`
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within,omitempty"`
-IngesterQueryStoreMaxLookback time.Duration `yaml:"-"`
Engine logql.EngineOpts `yaml:"engine,omitempty"`
MaxConcurrent int `yaml:"max_concurrent"`
QueryStoreOnly bool `yaml:"query_store_only"`
QueryIngesterOnly bool `yaml:"query_ingester_only"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
PerRequestLimitsEnabled bool `yaml:"per_request_limits_enabled"`
QueryPartitionIngesters bool `yaml:"query_partition_ingesters" category:"experimental"`
+
+IngesterQueryStoreMaxLookback time.Duration `yaml:"-"`
+QueryPatternIngestersWithin time.Duration `yaml:"-"`
}

// RegisterFlags register flags.
@@ -289,92 +287,20 @@ func (q *SingleTenantQuerier) isWithinIngesterMaxLookbackPeriod(maxLookback time
return queryEnd.After(ingesterOldestStartTime)
}

-func (q *SingleTenantQuerier) calculateIngesterMaxLookbackPeriod() time.Duration {
+func (q *SingleTenantQuerier) calculateIngesterMaxLookbackPeriod(queryIngestersWithin time.Duration) time.Duration {
mlb := time.Duration(-1)
if q.cfg.IngesterQueryStoreMaxLookback != 0 {
// IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range.
mlb = q.cfg.IngesterQueryStoreMaxLookback
-} else if q.cfg.QueryIngestersWithin != 0 {
-mlb = q.cfg.QueryIngestersWithin
+} else if queryIngestersWithin != 0 {
+mlb = queryIngestersWithin
}
return mlb
}

-func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval, *interval) {
-// limitQueryInterval is a flag for whether store queries should be limited to start time of ingester queries.
-limitQueryInterval := false
-// ingesterMLB having -1 means query ingester for whole duration.
-if q.cfg.IngesterQueryStoreMaxLookback != 0 {
-// IngesterQueryStoreMaxLookback takes the precedence over QueryIngestersWithin while also limiting the store query range.
-limitQueryInterval = true
-}
-ingesterMLB := q.calculateIngesterMaxLookbackPeriod()
-// query ingester for whole duration.
-if ingesterMLB == -1 {
-i := &interval{
-start: queryStart,
-end: queryEnd,
-}
-if limitQueryInterval {
-// query only ingesters.
-return i, nil
-}
-// query both stores and ingesters without limiting the query interval.
-return i, i
-}
-ingesterQueryWithinRange := q.isWithinIngesterMaxLookbackPeriod(ingesterMLB, queryEnd)
-// see if there is an overlap between ingester query interval and actual query interval, if not just do the store query.
-if !ingesterQueryWithinRange {
-return nil, &interval{
-start: queryStart,
-end: queryEnd,
-}
-}
-ingesterOldestStartTime := time.Now().Add(-ingesterMLB)
-// if there is an overlap and we are not limiting the query interval then do both store and ingester query for whole query interval.
-if !limitQueryInterval {
-i := &interval{
-start: queryStart,
-end: queryEnd,
-}
-return i, i
-}
-// since we are limiting the query interval, check if the query touches just the ingesters, if yes then query just the ingesters.
-if ingesterOldestStartTime.Before(queryStart) {
-return &interval{
-start: queryStart,
-end: queryEnd,
-}, nil
-}
-// limit the start of ingester query interval to ingesterOldestStartTime.
-ingesterQueryInterval := &interval{
-start: ingesterOldestStartTime,
-end: queryEnd,
-}
-// limit the end of ingester query interval to ingesterOldestStartTime.
-storeQueryInterval := &interval{
-start: queryStart,
-end: ingesterOldestStartTime,
-}
-// query touches only ingester query interval so do not do store query.
-if storeQueryInterval.start.After(storeQueryInterval.end) {
-storeQueryInterval = nil
-}
-return ingesterQueryInterval, storeQueryInterval
+func (q *SingleTenantQuerier) buildQueryIntervals(queryStart, queryEnd time.Time) (*QueryInterval, *QueryInterval) {
+return BuildQueryIntervalsWithLookback(q.cfg, queryStart, queryEnd, q.cfg.QueryIngestersWithin)
}

// Label does the heavy lifting for a Label query.

@@ -15,6 +15,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
+"github.com/prometheus/prometheus/promql"
+"github.com/stretchr/testify/mock"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
grpc_metadata "google.golang.org/grpc/metadata"
@@ -792,10 +794,91 @@ func (e *engineMock) Query(p logql.Params) logql.Query {
type queryMock struct {
result logqlmodel.Result
+err error
}

func (q queryMock) Exec(_ context.Context) (logqlmodel.Result, error) {
-return q.result, nil
+return q.result, q.err
}
// mockPatternQuerier implements pattern.PatterQuerier interface for testing
type mockPatternQuerier struct {
mock.Mock
}
func newMockPatternQuerier() *mockPatternQuerier {
return &mockPatternQuerier{}
}
func (m *mockPatternQuerier) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
args := m.Called(ctx, req)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*logproto.QueryPatternsResponse), args.Error(1)
}
// Helper types and functions for pattern tests
type patternSample struct {
pattern string
timestamp int64
value int64
}
// newMockEngineWithPatterns creates a mock engine that returns patterns when queried
func newMockEngineWithPatterns(patterns []string) logql.Engine {
engine := &engineMock{}
// Create a result with the patterns
matrix := promql.Matrix{}
for _, pattern := range patterns {
matrix = append(matrix, promql.Series{
Metric: labels.Labels{
{Name: "service_name", Value: "test-service"},
{Name: "decoded_pattern", Value: pattern},
},
Floats: []promql.FPoint{
{T: time.Now().UnixMilli(), F: 100},
},
})
}
result := logqlmodel.Result{
Data: matrix,
}
// Mock the Query method to return a query that returns our result
engine.On("Query", mock.Anything).Return(&queryMock{result: result, err: nil})
return engine
}
// newMockEngineWithPatternsAndTimestamps creates a mock engine that returns patterns with specific timestamps
func newMockEngineWithPatternsAndTimestamps(patternSamples []patternSample) logql.Engine {
engine := &engineMock{}
// Create a result with the patterns
matrix := promql.Matrix{}
for _, ps := range patternSamples {
matrix = append(matrix, promql.Series{
Metric: labels.Labels{
{Name: "service_name", Value: "test-service"},
{Name: "decoded_pattern", Value: ps.pattern},
},
Floats: []promql.FPoint{
{T: ps.timestamp, F: float64(ps.value)},
},
})
}
result := logqlmodel.Result{
Data: matrix,
}
// Mock the Query method to return a query that returns our result
engine.On("Query", mock.Anything).Return(&queryMock{result: result, err: nil})
return engine
}

type mockTenantLimits map[string]*validation.Limits

@@ -424,19 +424,19 @@ func TestQuerier_buildQueryIntervals(t *testing.T) {
// For simplicity it is always assumed that ingesterQueryStoreMaxLookback and queryIngestersWithin both would be set upto 11 hours so
// overlappingQuery has range of last 11 hours while nonOverlappingQuery has range older than last 11 hours.
// We would test the cases below with both the queries.
-overlappingQuery := interval{
+overlappingQuery := QueryInterval{
start: time.Now().Add(-6 * time.Hour),
end: time.Now(),
}

-nonOverlappingQuery := interval{
+nonOverlappingQuery := QueryInterval{
start: time.Now().Add(-24 * time.Hour),
end: time.Now().Add(-12 * time.Hour),
}

type response struct {
-ingesterQueryInterval *interval
-storeQueryInterval *interval
+ingesterQueryInterval *QueryInterval
+storeQueryInterval *QueryInterval
}

compareResponse := func(t *testing.T, expectedResponse, actualResponse response) {
@@ -477,11 +477,11 @@ func TestQuerier_buildQueryIntervals(t *testing.T) {
name: "ingesterQueryStoreMaxLookback set to 1h",
ingesterQueryStoreMaxLookback: time.Hour,
overlappingQueryExpectedResponse: response{ // query ingesters for last 1h and store until last 1h.
-ingesterQueryInterval: &interval{
+ingesterQueryInterval: &QueryInterval{
start: time.Now().Add(-time.Hour),
end: overlappingQuery.end,
},
-storeQueryInterval: &interval{
+storeQueryInterval: &QueryInterval{
start: overlappingQuery.start,
end: time.Now().Add(-time.Hour),
},
@@ -505,11 +505,11 @@ func TestQuerier_buildQueryIntervals(t *testing.T) {
ingesterQueryStoreMaxLookback: time.Hour,
queryIngestersWithin: 2 * time.Hour,
overlappingQueryExpectedResponse: response{ // query ingesters for last 1h and store until last 1h.
-ingesterQueryInterval: &interval{
+ingesterQueryInterval: &QueryInterval{
start: time.Now().Add(-time.Hour),
end: overlappingQuery.end,
},
-storeQueryInterval: &interval{
+storeQueryInterval: &QueryInterval{
start: overlappingQuery.start,
end: time.Now().Add(-time.Hour),
},
@@ -523,11 +523,11 @@ func TestQuerier_buildQueryIntervals(t *testing.T) {
ingesterQueryStoreMaxLookback: 2 * time.Hour,
queryIngestersWithin: time.Hour,
overlappingQueryExpectedResponse: response{ // query ingesters for last 2h and store until last 2h.
-ingesterQueryInterval: &interval{
+ingesterQueryInterval: &QueryInterval{
start: time.Now().Add(-2 * time.Hour),
end: overlappingQuery.end,
},
-storeQueryInterval: &interval{
+storeQueryInterval: &QueryInterval{
start: overlappingQuery.start,
end: time.Now().Add(-2 * time.Hour),
},
@@ -624,18 +624,18 @@ func TestQuerier_calculateIngesterMaxLookbackPeriod(t *testing.T) {
QueryIngestersWithin: tc.queryIngestersWithin,
}}

-assert.Equal(t, tc.expected, querier.calculateIngesterMaxLookbackPeriod())
+assert.Equal(t, tc.expected, querier.calculateIngesterMaxLookbackPeriod(querier.cfg.QueryIngestersWithin))
})
}
}

func TestQuerier_isWithinIngesterMaxLookbackPeriod(t *testing.T) {
-overlappingQuery := interval{
+overlappingQuery := QueryInterval{
start: time.Now().Add(-6 * time.Hour),
end: time.Now(),
}

-nonOverlappingQuery := interval{
+nonOverlappingQuery := QueryInterval{
start: time.Now().Add(-24 * time.Hour),
end: time.Now().Add(-12 * time.Hour),
}
@@ -696,7 +696,7 @@ func TestQuerier_isWithinIngesterMaxLookbackPeriod(t *testing.T) {
QueryIngestersWithin: tc.queryIngestersWithin,
}}

-lookbackPeriod := querier.calculateIngesterMaxLookbackPeriod()
+lookbackPeriod := querier.calculateIngesterMaxLookbackPeriod(querier.cfg.QueryIngestersWithin)
assert.Equal(t, tc.overlappingWithinRange, querier.isWithinIngesterMaxLookbackPeriod(lookbackPeriod, overlappingQuery.end))
assert.Equal(t, tc.nonOverlappingWithinRange, querier.isWithinIngesterMaxLookbackPeriod(lookbackPeriod, nonOverlappingQuery.end))
})

@@ -32,6 +32,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
+"github.com/grafana/loki/v3/pkg/querier/pattern"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
@@ -333,10 +334,7 @@ func (Codec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (quer
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}

-disableCacheReq := false
-if strings.ToLower(strings.TrimSpace(r.Header.Get(cacheControlHeader))) == noCacheVal {
-disableCacheReq = true
-}
+disableCacheReq := strings.ToLower(strings.TrimSpace(r.Header.Get(cacheControlHeader))) == noCacheVal

switch op := getOperation(r.URL.Path); op {
case QueryRangeOp:
@@ -1597,6 +1595,19 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase
},
Headers: headers,
}, nil
case *QueryPatternsResponse:
resp0 := responses[0].(*QueryPatternsResponse)
logprotoResps := make([]*logproto.QueryPatternsResponse, 0, len(responses))
for _, r := range responses {
logprotoResps = append(logprotoResps, r.(*QueryPatternsResponse).Response)
}
mergedPatterns := pattern.MergePatternResponses(logprotoResps)
return &QueryPatternsResponse{
Response: &logproto.QueryPatternsResponse{Series: mergedPatterns.Series},
Headers: resp0.Headers,
}, nil
default:
return nil, fmt.Errorf("unknown response type (%T) in merging responses", responses[0])
}
@@ -1799,8 +1810,31 @@ func ParamsFromRequest(req queryrangebase.Request) (logql.Params, error) {
return &paramsDetectedLabelsWrapper{
DetectedLabelsRequest: r,
}, nil
case *logproto.QueryPatternsRequest:
// We turn a QueryPatternsRequest into a LokiRequest when querying the store
// for persisted patterns, which we store as a specially crafted log line in the actual chunks.
// We need to leverage the logic of the query engine to extract them correctly.
query, err := r.GetSampleQuery()
if err != nil {
return nil, err
}
expr, err := syntax.ParseExpr(query)
if err != nil {
return nil, err
}
return &paramsRangeWrapper{
LokiRequest: &LokiRequest{
Query: expr.String(),
Step: r.GetStep(),
StartTs: r.GetStart(),
EndTs: r.GetEnd(),
Plan: &plan.QueryPlan{AST: expr},
},
}, nil
default:
-return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, *DetectedFieldsRequest, got (%T)", r)
+return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, *DetectedFieldsRequest, *QueryPatternsRequest got (%T)", r)
}
}
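Sketch of the conversion this enables (request values invented; relies on GetSampleQuery and the ParamsFromRequest case shown above):

req := &logproto.QueryPatternsRequest{
    Query: `{service_name="checkout"}`,
    Start: time.Now().Add(-6 * time.Hour),
    End:   time.Now(),
    Step:  time.Minute.Milliseconds(),
}

// The pattern selector is rewritten into the sum_over_time sample query and
// wrapped as an ordinary range query, so the engine reads patterns back out
// of the stored chunks.
params, err := ParamsFromRequest(req)
if err != nil {
    // handle error
}
_ = params // carries the generated LogQL expression, step, and time range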

@@ -258,6 +258,36 @@ func NewMiddleware(
return nil, nil, err
}
// Pattern middleware is just a metric middleware with sharding disabled.
// At some point hopefully we can implement sharding for pattern queries and remove this.
patternConfig := Config{
Config: base.Config{
AlignQueriesWithStep: cfg.AlignQueriesWithStep,
ResultsCacheConfig: cfg.ResultsCacheConfig,
CacheResults: cfg.CacheResults,
MaxRetries: cfg.MaxRetries,
ShardedQueries: false,
ShardAggregations: []string{},
},
Transformer: cfg.Transformer,
CacheIndexStatsResults: cfg.CacheIndexStatsResults,
StatsCacheConfig: cfg.StatsCacheConfig,
CacheVolumeResults: cfg.CacheVolumeResults,
VolumeCacheConfig: cfg.VolumeCacheConfig,
CacheInstantMetricResults: cfg.CacheInstantMetricResults,
InstantMetricCacheConfig: cfg.InstantMetricCacheConfig,
InstantMetricQuerySplitAlign: cfg.InstantMetricQuerySplitAlign,
CacheSeriesResults: cfg.CacheSeriesResults,
SeriesCacheConfig: cfg.SeriesCacheConfig,
CacheLabelResults: cfg.CacheLabelResults,
LabelsCacheConfig: cfg.LabelsCacheConfig,
}
patternTripperware, err := NewMetricTripperware(patternConfig, engineOpts, log, limits, schema, codec, iqo, resultsCache,
cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware, metricsNamespace)
if err != nil {
return nil, nil, err
}
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
var (
metricRT = metricsTripperware.Wrap(next)
@@ -270,6 +300,7 @@ func NewMiddleware(
seriesVolumeRT = seriesVolumeTripperware.Wrap(next)
detectedFieldsRT = detectedFieldsTripperware.Wrap(next)
detectedLabelsRT = detectedLabelsTripperware.Wrap(next)
+patternRT = patternTripperware.Wrap(next)
)

return newRoundTripper(
@@ -285,6 +316,7 @@ func NewMiddleware(
seriesVolumeRT,
detectedFieldsRT,
detectedLabelsRT,
+patternRT,
limits,
)
}), StopperWrapper{resultsCache, statsCache, volumeCache}, nil
@@ -340,7 +372,7 @@ func NewDetectedLabelsCardinalityFilter(rt queryrangebase.Handler) queryrangebas
type roundTripper struct {
logger log.Logger

-next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels base.Handler
+next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels, pattern base.Handler

limits Limits
}
@@ -348,7 +380,7 @@ type roundTripper struct {
// newRoundTripper creates a new queryrange roundtripper
func newRoundTripper(
logger log.Logger,
-next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels base.Handler,
+next, limited, log, metric, series, labels, instantMetric, indexStats, seriesVolume, detectedFields, detectedLabels, pattern base.Handler,
limits Limits,
) roundTripper {
return roundTripper{
@@ -364,6 +396,7 @@ func newRoundTripper(
seriesVolume: seriesVolume,
detectedFields: detectedFields,
detectedLabels: detectedLabels,
+pattern: pattern,
next: next,
}
}
@@ -524,6 +557,23 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
"start", op.Start,
)
return r.detectedLabels.Do(ctx, req)
case *logproto.QueryPatternsRequest:
expr, err := syntax.ParseExpr(req.GetQuery())
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
groups, err := syntax.MatcherGroups(expr)
if err != nil {
level.Warn(logger).Log("msg", "unexpected matcher groups error in roundtripper", "err", err)
}
for _, g := range groups {
if err := validateMatchers(ctx, r.limits, g.Matchers); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
}
}
return r.pattern.Do(ctx, req)
default:
return r.next.Do(ctx, req)
}

@@ -993,6 +993,7 @@ func TestPostQueries(t *testing.T) {
handler,
handler,
handler,
+handler,
fakeLimits{},
).Do(ctx, lreq)
require.NoError(t, err)
