package querier

import (
	"context"
	"fmt"
	"net/http"
	"slices"
	"strconv"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/httpgrpc"
	"github.com/grafana/dskit/middleware"
	"github.com/grafana/dskit/tenant"
	"github.com/grafana/dskit/user"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/promql/parser"
	"github.com/thanos-io/objstore"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/sync/errgroup"

	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
	"github.com/grafana/loki/v3/pkg/engine"
	"github.com/grafana/loki/v3/pkg/loghttp"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/grafana/loki/v3/pkg/logqlmodel"
	"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
	querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
	"github.com/grafana/loki/v3/pkg/querier/pattern"
	"github.com/grafana/loki/v3/pkg/querier/queryrange"
	index_stats "github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
	"github.com/grafana/loki/v3/pkg/tracing"
	"github.com/grafana/loki/v3/pkg/util/constants"
	"github.com/grafana/loki/v3/pkg/util/httpreq"
	utillog "github.com/grafana/loki/v3/pkg/util/log"
	serverutil "github.com/grafana/loki/v3/pkg/util/server"
	"github.com/grafana/loki/v3/pkg/util/spanlogger"
	util_validation "github.com/grafana/loki/v3/pkg/util/validation"
)

type QueryResponse struct {
	ResultType parser.ValueType `json:"resultType"`
	Result     parser.Value     `json:"result"`
}

// nolint
// QuerierAPI defines HTTP handler functions for the querier.
type QuerierAPI struct {
	querier Querier
	cfgV1   Config        // current/legacy engine config
	cfgV2   engine.Config // new/next generation engine config

	engineV1 logql.Engine // current/legacy query engine
	engineV2 logql.Engine // new/next generation query engine

	limits querier_limits.Limits
	logger log.Logger
}
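
// Illustrative only, not a guarantee of the wire format: with the json tags above, a
// marshaled QueryResponse is shaped like the Prometheus-style payload
//
//	{"resultType":"matrix","result":[...]}
//
// where the rendering of "result" depends on the concrete parser.Value.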

// NewQuerierAPI returns an instance of the QuerierAPI.
func NewQuerierAPI(v1Cfg Config, v2Cfg engine.Config, ms metastore.Metastore, querier Querier, limits querier_limits.Limits, store objstore.Bucket, reg prometheus.Registerer, logger log.Logger) *QuerierAPI {
	q := &QuerierAPI{
		cfgV1:    v1Cfg,
		cfgV2:    v2Cfg,
		limits:   limits,
		querier:  querier,
		engineV1: logql.NewEngine(v1Cfg.Engine, querier, limits, logger),
		logger:   logger,
	}
	if v2Cfg.Enable {
		q.engineV2 = engine.NewBasic(v2Cfg.Executor, ms, store, limits, reg, logger)
	}
	return q
}

// RangeQueryHandler is a http.HandlerFunc for range queries and legacy log queries
func (q *QuerierAPI) RangeQueryHandler(ctx context.Context, req *queryrange.LokiRequest) (logqlmodel.Result, error) {
	var result logqlmodel.Result
	logger := utillog.WithContext(ctx, q.logger)

	if err := q.validateMaxEntriesLimits(ctx, req.Plan.AST, req.Limit); err != nil {
		return result, err
	}

	params, err := queryrange.ParamsFromRequest(req)
	if err != nil {
		return result, err
	}

	if q.cfgV2.Enable && hasDataObjectsAvailable(q.cfgV2, params.Start(), params.End()) {
		query := q.engineV2.Query(params)
		result, err = query.Exec(ctx)
		if err == nil {
			return result, err
		}
		if !errors.Is(err, engine.ErrNotSupported) {
			level.Error(logger).Log("msg", "query execution failed with new query engine", "err", err)
			return result, errors.Wrap(err, "failed with new execution engine")
		}
		level.Warn(logger).Log("msg", "falling back to legacy query engine", "err", err)
	}

	query := q.engineV1.Query(params)
	return query.Exec(ctx)
}

func hasDataObjectsAvailable(config engine.Config, start, end time.Time) bool {
	// Data objects in object storage lag behind 20-30 minutes.
	// We are generous and only enable v2 engine queries that end earlier than DataObjStorageLag ago (default 1h),
	// to ensure data objects are available.
	v2Start, v2End := config.ValidQueryRange()
	return end.Before(v2End) && start.After(v2Start)
}

// InstantQueryHandler is a http.HandlerFunc for instant queries.
func (q *QuerierAPI) InstantQueryHandler(ctx context.Context, req *queryrange.LokiInstantRequest) (logqlmodel.Result, error) {
	// do not allow log selector expression (aka log query) as instant query
	if _, ok := req.Plan.AST.(syntax.SampleExpr); !ok {
		return logqlmodel.Result{}, logqlmodel.ErrUnsupportedSyntaxForInstantQuery
	}

	if err := q.validateMaxEntriesLimits(ctx, req.Plan.AST, req.Limit); err != nil {
		return logqlmodel.Result{}, err
	}

	params, err := queryrange.ParamsFromRequest(req)
	if err != nil {
		return logqlmodel.Result{}, err
	}

	if q.cfgV2.Enable && hasDataObjectsAvailable(q.cfgV2, params.Start(), params.End()) {
		query := q.engineV2.Query(params)
		result, err := query.Exec(ctx)
		if err == nil {
			return result, err
		}
		logger := utillog.WithContext(ctx, q.logger)
		if !errors.Is(err, engine.ErrNotSupported) {
			level.Error(logger).Log("msg", "query execution failed with new query engine", "err", err)
			return result, errors.Wrap(err, "failed with new execution engine")
		}
		level.Warn(logger).Log("msg", "falling back to legacy query engine", "err", err)
	}

	query := q.engineV1.Query(params)
	return query.Exec(ctx)
}
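
// engineForWindow is an illustrative helper, not part of the upstream API: it spells
// out the engine-selection rule used by RangeQueryHandler and InstantQueryHandler
// above, under the same assumptions (v2 only when enabled and the query window is
// fully covered by available data objects; callers still fall back to v1 on
// engine.ErrNotSupported).
func (q *QuerierAPI) engineForWindow(start, end time.Time) logql.Engine {
	if q.cfgV2.Enable && hasDataObjectsAvailable(q.cfgV2, start, end) {
		return q.engineV2
	}
	return q.engineV1
}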

// LabelHandler is a http.HandlerFunc for handling label queries.
func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
	timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeLabels))
	defer timer.ObserveDuration()

	start := time.Now()
	statsCtx, ctx := stats.NewContext(ctx)

	resp, err := q.querier.Label(ctx, req)
	if err != nil {
		return nil, err
	}

	if resp != nil && q.metricAggregationEnabled(ctx) {
		resp.Values = q.filterAggregatedMetricsLabel(resp.Values)
	}

	queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
	resLength := 0
	if resp != nil {
		resLength = len(resp.Values)
	}

	statResult := statsCtx.Result(time.Since(start), queueTime, resLength)

	sp := trace.SpanFromContext(ctx)
	sp.SetAttributes(tracing.KeyValuesToOTelAttributes(statResult.KVList())...)

	status, _ := serverutil.ClientHTTPStatusAndError(err)
	logql.RecordLabelQueryMetrics(ctx, utillog.Logger, *req.Start, *req.End, req.Name, req.Query, strconv.Itoa(status), statResult)

	return resp, err
}

func (q *QuerierAPI) metricAggregationEnabled(ctx context.Context) bool {
	orgID, _ := user.ExtractOrgID(ctx)
	return q.limits.MetricAggregationEnabled(orgID)
}

// SeriesHandler returns the list of time series that match a certain label set.
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers
func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, stats.Result, error) {
	timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeSeries))
	defer timer.ObserveDuration()

	start := time.Now()
	statsCtx, ctx := stats.NewContext(ctx)

	// Filtering out aggregated metrics is quicker when we can add a matcher for it, i.e. __aggregated_metric__="".
	// However, that only works for non-empty matcher sets, so we still need to filter the response below.
	aggMetricsRequestedInAnyGroup := false
	if q.metricAggregationEnabled(ctx) {
		var grpsWithAggMetricsFilter []string
		var err error
		grpsWithAggMetricsFilter, aggMetricsRequestedInAnyGroup, err = q.filterAggregatedMetrics(req.GetGroups())
		if err != nil {
			return nil, stats.Result{}, err
		}
		req.Groups = grpsWithAggMetricsFilter
	}

	resp, err := q.querier.Series(ctx, req)

	queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
	resLength := 0
	if resp != nil {
		resLength = len(resp.Series)

		// filter the response to catch the empty matcher case
		if !aggMetricsRequestedInAnyGroup && q.metricAggregationEnabled(ctx) {
			resp = q.filterAggregatedMetricsFromSeriesResp(resp)
		}
	}

	statResult := statsCtx.Result(time.Since(start), queueTime, resLength)

	sp := trace.SpanFromContext(ctx)
	sp.SetAttributes(tracing.KeyValuesToOTelAttributes(statResult.KVList())...)

	status, _ := serverutil.ClientHTTPStatusAndError(err)
	logql.RecordSeriesQueryMetrics(ctx, utillog.Logger, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult)

	return resp, statResult, err
}
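
// Illustrative note, not from upstream docs: the group rewrite performed by
// filterAggregatedMetrics (defined further below) turns a request group such as
//
//	{job="foo"}
//
// into (assuming the label constants resolve to __aggregated_metric__ and __pattern__)
//
//	{job="foo", __aggregated_metric__="", __pattern__=""}
//
// so the store excludes aggregated-metric and pattern series up front. A group that
// already matches on either label is left untouched, and the response-side filtering
// is skipped for the whole request.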

// IndexStatsHandler queries the index for the data statistics related to a query
func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQuery) (*logproto.IndexStatsResponse, error) {
	timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeStats))
	defer timer.ObserveDuration()

	start := time.Now()
	statsCtx, ctx := stats.NewContext(ctx)

	// TODO(karsten): we might want to change IndexStats to receive a logproto.IndexStatsRequest instead
	resp, err := q.querier.IndexStats(ctx, req)
	if resp == nil {
		// Some stores don't implement this
		resp = &index_stats.Stats{}
	}

	queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
	statResult := statsCtx.Result(time.Since(start), queueTime, 1)

	sp := trace.SpanFromContext(ctx)
	sp.SetAttributes(tracing.KeyValuesToOTelAttributes(statResult.KVList())...)

	status, _ := serverutil.ClientHTTPStatusAndError(err)
	logql.RecordStatsQueryMetrics(ctx, utillog.Logger, req.Start, req.End, req.Query, strconv.Itoa(status), statResult)

	return resp, err
}

func (q *QuerierAPI) IndexShardsHandler(ctx context.Context, req *loghttp.RangeQuery, targetBytesPerShard uint64) (*logproto.ShardsResponse, error) {
	timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeShards))
	defer timer.ObserveDuration()

	start := time.Now()
	statsCtx, ctx := stats.NewContext(ctx)

	resp, err := q.querier.IndexShards(ctx, req, targetBytesPerShard)

	queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)

	resLength := 0
	if resp != nil {
		resLength = len(resp.Shards)
		stats.JoinResults(ctx, resp.Statistics)
	}

	statResult := statsCtx.Result(time.Since(start), queueTime, resLength)

	sp := trace.SpanFromContext(ctx)
	sp.SetAttributes(tracing.KeyValuesToOTelAttributes(statResult.KVList())...)

	status, _ := serverutil.ClientHTTPStatusAndError(err)
	logql.RecordShardsQueryMetrics(
		ctx, utillog.Logger, req.Start, req.End, req.Query, targetBytesPerShard, strconv.Itoa(status), resLength, statResult,
	)

	return resp, err
}

// TODO(trevorwhitney): add test for the handler split

// VolumeHandler queries the index label volumes related to the passed matchers and given time range.
// Returns either N values, where N is the time range divided by the step, or a single value for the
// whole time range, depending on the request.
func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) {
	timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeVolume))
	defer timer.ObserveDuration()

	start := time.Now()
	statsCtx, ctx := stats.NewContext(ctx)

	resp, err := q.querier.Volume(ctx, req)
	if err != nil {
		return nil, err
	}
	if resp == nil {
		// Some stores don't implement this
		return &logproto.VolumeResponse{Volumes: []logproto.Volume{}}, nil
	}

	queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
	statResult := statsCtx.Result(time.Since(start), queueTime, 1)

	sp := trace.SpanFromContext(ctx)
	sp.SetAttributes(tracing.KeyValuesToOTelAttributes(statResult.KVList())...)

	status, _ := serverutil.ClientHTTPStatusAndError(err)
	logql.RecordVolumeQueryMetrics(ctx, utillog.Logger, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)

	return resp, nil
}
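
// Worked example of the result shape described above (values are illustrative, not
// taken from the Loki docs): a volume request over a 1h range with step=5m yields
// 60m / 5m = 12 per-step values, while a request whose step spans the whole range
// yields a single aggregated value.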

// filterAggregatedMetrics adds a matcher to exclude aggregated metrics and patterns unless explicitly requested
func (q *QuerierAPI) filterAggregatedMetrics(groups []string) ([]string, bool, error) {
	// cannot add filter to an empty matcher set
	if len(groups) == 0 {
		return groups, false, nil
	}

	noAggMetrics, err := labels.NewMatcher(
		labels.MatchEqual,
		constants.AggregatedMetricLabel, "",
	)
	if err != nil {
		return nil, false, err
	}

	noPatterns, err := labels.NewMatcher(
		labels.MatchEqual,
		constants.PatternLabel, "",
	)
	if err != nil {
		return nil, false, err
	}

	newGroups := make([]string, 0, len(groups)+1)
	aggMetricsRequestedInAnyGroup := false
	for _, group := range groups {
		grp, err := syntax.ParseMatchers(group, false)
		if err != nil {
			return nil, false, err
		}

		aggMetricsRequested := false
		patternsRequested := false
		for _, m := range grp {
			if m.Name == constants.AggregatedMetricLabel {
				aggMetricsRequested = true
				aggMetricsRequestedInAnyGroup = true
			}
			if m.Name == constants.PatternLabel {
				patternsRequested = true
				aggMetricsRequestedInAnyGroup = true
			}
		}

		if !aggMetricsRequested {
			grp = append(grp, noAggMetrics)
		}
		if !patternsRequested {
			grp = append(grp, noPatterns)
		}

		newGroups = append(newGroups, syntax.MatchersString(grp))
	}

	return newGroups, aggMetricsRequestedInAnyGroup, nil
}

func (q *QuerierAPI) filterAggregatedMetricsFromSeriesResp(resp *logproto.SeriesResponse) *logproto.SeriesResponse {
	for i := 0; i < len(resp.Series); i++ {
		keys := make([]string, 0, len(resp.Series[i].Labels))
		for _, label := range resp.Series[i].Labels {
			keys = append(keys, label.Key)
		}

		if slices.Contains(keys, constants.AggregatedMetricLabel) || slices.Contains(keys, constants.PatternLabel) {
			resp.Series = slices.Delete(resp.Series, i, i+1)
			i--
		}
	}

	return resp
}

func (q *QuerierAPI) filterAggregatedMetricsLabel(labels []string) []string {
	newLabels := make([]string, 0, len(labels))

	for _, label := range labels {
		if label == constants.AggregatedMetricLabel || label == constants.PatternLabel {
			continue
		}
		newLabels = append(newLabels, label)
	}

	return newLabels
}

func (q *QuerierAPI) DetectedFieldsHandler(ctx context.Context, req *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
	resp, err := q.querier.DetectedFields(ctx, req)
	if err != nil {
		return nil, err
	}
	if resp == nil {
		// Some stores don't implement this
		level.Debug(spanlogger.FromContext(ctx, q.logger)).Log(
			"msg", "queried store for detected fields that does not support it, no response from querier.DetectedFields",
		)
		return &logproto.DetectedFieldsResponse{
			Fields: []*logproto.DetectedField{},
			Limit:  req.GetLimit(),
		}, nil
	}
	return resp, nil
}

type asyncPatternResponses struct {
	responses []*logproto.QueryPatternsResponse
	lock      sync.Mutex
}

// currently, the only contention is when adding; however, if that behavior changes we may need to lock in the read methods as well
func (r *asyncPatternResponses) add(resp *logproto.QueryPatternsResponse) {
	if resp == nil {
		return
	}

	r.lock.Lock()
	defer r.lock.Unlock()
	r.responses = append(r.responses, resp)
}

func (r *asyncPatternResponses) get() []*logproto.QueryPatternsResponse {
	return r.responses
}

func (r *asyncPatternResponses) len() int {
	return len(r.responses)
}
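
// Illustrative note (inferred from the comment on add above, not stated upstream):
// add is the only method invoked concurrently, from the errgroup goroutines in
// PatternsHandler below; get and len run only after errgroup.Wait returns, which
// establishes a happens-before edge, so they may read r.responses without the lock:
//
//	g.Go(func() error { responses.add(resp); return nil }) // concurrent: must lock
//	if err := g.Wait(); err != nil { return nil, err }     // synchronizes
//	merged := responses.get()                              // safe unlocked read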

func (q *QuerierAPI) PatternsHandler(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
	// Calculate query intervals for ingester vs store
	ingesterQueryInterval, storeQueryInterval := BuildQueryIntervalsWithLookback(q.cfgV1, req.Start, req.End, q.cfgV1.QueryPatternIngestersWithin)

	responses := asyncPatternResponses{}
	g, ctx := errgroup.WithContext(ctx)

	// Query pattern ingesters for recent data
	if ingesterQueryInterval != nil && !q.cfgV1.QueryStoreOnly && q.querier != nil {
		g.Go(func() error {
			splitReq := *req
			splitReq.Start = ingesterQueryInterval.start
			splitReq.End = ingesterQueryInterval.end

			resp, err := q.querier.Patterns(ctx, &splitReq)
			if err != nil {
				return err
			}
			responses.add(resp)
			return nil
		})
	}

	// Query store for older data by converting to a LogQL query
	if storeQueryInterval != nil && !q.cfgV1.QueryIngesterOnly && q.engineV1 != nil {
		tenantIDs, err := tenant.TenantIDs(ctx)
		if err != nil {
			return nil, err
		}

		// Only query the store if pattern persistence is enabled for at least one tenant
		patternPersistenceEnabled := false
		for _, tenantID := range tenantIDs {
			if q.limits.PatternPersistenceEnabled(tenantID) {
				patternPersistenceEnabled = true
				break
			}
		}

		if patternPersistenceEnabled {
			g.Go(func() error {
				storeReq := *req
				storeReq.Start = storeQueryInterval.start
				storeReq.End = storeQueryInterval.end

				resp, err := q.queryStoreForPatterns(ctx, &storeReq)
				if err != nil {
					return err
				}
				responses.add(resp)
				return nil
			})
		}
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Merge responses
	if responses.len() == 0 {
		return &logproto.QueryPatternsResponse{
			Series: []*logproto.PatternSeries{},
		}, nil
	}

	return pattern.MergePatternResponses(responses.get()), nil
}
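
// Illustrative split, with assumed values rather than config defaults: given
// query_pattern_ingesters_within = 3h and a request covering [now-6h, now],
// BuildQueryIntervalsWithLookback yields roughly
//
//	ingester interval: [now-3h, now]    -> q.querier.Patterns(...)
//	store interval:    [now-6h, now-3h) -> q.queryStoreForPatterns(...)
//
// Both halves run concurrently in the errgroup above and the partial responses
// are merged by pattern.MergePatternResponses.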
Expected %s or %s", res.Data.Type(), parser.ValueTypeMatrix, parser.ValueTypeVector) } // Convert to pattern response format resp := &logproto.QueryPatternsResponse{ Series: make([]*logproto.PatternSeries, 0, len(result)), } for lvl, patterns := range result { for pattern, samples := range patterns { series := &logproto.PatternSeries{ Pattern: pattern, Level: lvl, Samples: make([]*logproto.PatternSample, 0, len(samples)), } // Convert samples map to slice and sort by timestamp timestamps := make([]int64, 0, len(samples)) for ts := range samples { timestamps = append(timestamps, ts) } slices.Sort(timestamps) level.Debug(q.logger).Log( "msg", "earliest pattern sample from store", "timestamp", timestamps[0], ) for _, ts := range timestamps { series.Samples = append(series.Samples, &logproto.PatternSample{ Timestamp: model.Time(ts), Value: int64(samples[ts]), }) } resp.Series = append(resp.Series, series) } } return resp, nil } func getLevelAndPattern(metric labels.Labels) (string, string) { lvl := metric.Get(constants.LevelLabel) if lvl == "" { lvl = constants.LogLevelUnknown } pattern := metric.Get("decoded_pattern") return lvl, pattern } func (q *QuerierAPI) validateMaxEntriesLimits(ctx context.Context, expr syntax.Expr, limit uint32) error { tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { return httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } // entry limit does not apply to metric queries. if _, ok := expr.(syntax.SampleExpr); ok { return nil } maxEntriesCapture := func(id string) int { return q.limits.MaxEntriesLimitPerQuery(ctx, id) } maxEntriesLimit := util_validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, maxEntriesCapture) if int(limit) > maxEntriesLimit && maxEntriesLimit != 0 { return httpgrpc.Errorf(http.StatusBadRequest, "max entries limit per query exceeded, limit > max_entries_limit_per_query (%d > %d)", limit, maxEntriesLimit) } return nil } // DetectedLabelsHandler returns a response for detected labels func (q *QuerierAPI) DetectedLabelsHandler(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) { resp, err := q.querier.DetectedLabels(ctx, req) if err != nil { return nil, err } return resp, nil } // WrapQuerySpanAndTimeout applies a context deadline and a span logger to a query call. // // The timeout is based on the per-tenant query timeout configuration. func WrapQuerySpanAndTimeout(call string, limits querier_limits.Limits) middleware.Interface { return middleware.Func(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { ctx, sp := tracer.Start(req.Context(), call) defer sp.End() log := spanlogger.FromContext(req.Context(), utillog.Logger) defer log.Finish() tenants, err := tenant.TenantIDs(ctx) if err != nil { level.Error(log).Log("msg", "couldn't fetch tenantID", "err", err) serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w) return } timeoutCapture := func(id string) time.Duration { return limits.QueryTimeout(ctx, id) } timeout := util_validation.SmallestPositiveNonZeroDurationPerTenant(tenants, timeoutCapture) newCtx, cancel := context.WithTimeoutCause(ctx, timeout, errors.New("query timeout reached")) defer cancel() newReq := req.WithContext(newCtx) next.ServeHTTP(w, newReq) }) }) }