Bring back RED metrics for querier when processing scheduler requests. (#11097)

**What this PR does / why we need it**:
A previous change removed the RED metrics for the querier. This adds
them back as part of a middleware.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a4b0)
pull/11086/head^2
Karsten Jeschkies 2 years ago committed by GitHub
parent 92fa3e3e5c
commit 33b7e51342
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      pkg/loki/loki.go
  2. 11
      pkg/loki/modules.go
  3. 23
      pkg/querier/queryrange/codec.go
  4. 51
      pkg/querier/queryrange/instrument.go

@ -332,7 +332,8 @@ type Loki struct {
HTTPAuthMiddleware middleware.Interface
Codec Codec
Codec Codec
Metrics *server.Metrics
}
// New makes a new Loki.

@ -142,7 +142,9 @@ func (t *Loki) initServer() (services.Service, error) {
// Loki handles signals on its own.
DisableSignalHandling(&t.Cfg.Server)
serv, err := server.New(t.Cfg.Server)
t.Metrics = server.NewServerMetrics(t.Cfg.Server)
serv, err := server.NewWithMetrics(t.Cfg.Server, t.Metrics)
if err != nil {
return nil, err
}
@ -515,12 +517,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
internalHandler := queryrangebase.MergeMiddlewares(
serverutil.RecoveryMiddleware,
queryrange.Instrument{
QueryHandlerMetrics: queryrange.NewQueryHandlerMetrics(
prometheus.DefaultRegisterer,
t.Cfg.MetricsNamespace,
),
},
queryrange.Instrument{Metrics: t.Metrics},
).Wrap(handler)
svc, err := querier.InitWorkerService(

@ -497,6 +497,10 @@ func (Codec) DecodeHTTPGrpcRequest(ctx context.Context, r *httpgrpc.HTTPRequest)
// DecodeHTTPGrpcResponse decodes an httpgrp.HTTPResponse to queryrangebase.Response.
func (Codec) DecodeHTTPGrpcResponse(r *httpgrpc.HTTPResponse, req queryrangebase.Request) (queryrangebase.Response, error) {
if r.Code/100 != 2 {
return nil, httpgrpc.Errorf(int(r.Code), string(r.Body))
}
headers := make(http.Header)
for _, header := range r.Headers {
headers[header.Key] = header.Values
@ -702,6 +706,25 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
}
}
func (c Codec) Path(r queryrangebase.Request) string {
switch request := r.(type) {
case *LokiRequest:
return "loki/api/v1/query_range"
case *LokiSeriesRequest:
return "loki/api/v1/series"
case *LabelRequest:
return request.Path() // NOTE: this could be either /label or /label/{name}/values endpoint. So forward the original path as it is.
case *LokiInstantRequest:
return "/loki/api/v1/query"
case *logproto.IndexStatsRequest:
return "/loki/api/v1/index/stats"
case *logproto.VolumeRequest:
return "/loki/api/v1/index/volume_range"
}
return "other"
}
func (p RequestProtobufCodec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http.Request, error) {
req, err := p.Codec.EncodeRequest(ctx, r)
if err != nil {

@ -2,34 +2,24 @@ package queryrange
import (
"context"
"fmt"
"strconv"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/server"
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)
const (
gRPC = "gRPC"
method = "GET"
)
type QueryHandlerMetrics struct {
InflightRequests *prometheus.GaugeVec
}
func NewQueryHandlerMetrics(registerer prometheus.Registerer, metricsNamespace string) *QueryHandlerMetrics {
return &QueryHandlerMetrics{
InflightRequests: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "inflight_requests",
Help: "Current number of inflight requests.",
}, []string{"method", "route"}),
}
}
type Instrument struct {
*QueryHandlerMetrics
*server.Metrics
}
var _ queryrangebase.Middleware = Instrument{}
@ -37,11 +27,28 @@ var _ queryrangebase.Middleware = Instrument{}
// Wrap implements the queryrangebase.Middleware
func (i Instrument) Wrap(next queryrangebase.Handler) queryrangebase.Handler {
return queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
route := fmt.Sprintf("%T", r)
inflight := i.InflightRequests.WithLabelValues(gRPC, route)
route := DefaultCodec.Path(r)
route = middleware.MakeLabelValue(route)
inflight := i.InflightRequests.WithLabelValues(method, route)
inflight.Inc()
defer inflight.Dec()
return next.Do(ctx, r)
begin := time.Now()
result, err := next.Do(ctx, r)
i.observe(ctx, route, err, time.Since(begin))
return result, err
})
}
func (i Instrument) observe(ctx context.Context, route string, err error, duration time.Duration) {
respStatus := "200"
if err != nil {
if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
respStatus = strconv.Itoa(int(errResp.Code))
} else {
respStatus = "500"
}
}
instrument.ObserveWithExemplar(ctx, i.RequestDuration.WithLabelValues(method, route, respStatus, "false"), duration.Seconds())
}

Loading…
Cancel
Save