diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md
index 330540d7b4..60c1c74d60 100644
--- a/docs/sources/configuration/_index.md
+++ b/docs/sources/configuration/_index.md
@@ -2421,6 +2421,14 @@ ruler_remote_write_sigv4_config:
 # remote client id as key.
 [ruler_remote_write_config: ]
+# Timeout for a remote rule evaluation. Defaults to the value of
+# 'querier.query-timeout'.
+[ruler_remote_evaluation_timeout: <duration>]
+
+# Maximum size (in bytes) of the allowable response size from a remote rule
+# evaluation. Set to 0 to allow any response size (default).
+[ruler_remote_evaluation_max_response_size: <int>]
+
 # Deletion mode. Can be one of 'disabled', 'filter-only', or
 # 'filter-and-delete'. When set to 'filter-only' or 'filter-and-delete', and if
 # retention_enabled is true, then the log entry deletion API endpoints are
diff --git a/docs/sources/operations/scalability.md b/docs/sources/operations/scalability.md
index f426a3db21..c7c5ab1ca0 100644
--- a/docs/sources/operations/scalability.md
+++ b/docs/sources/operations/scalability.md
@@ -27,3 +27,62 @@ The query scheduler process itself can be started via the `-target=query-schedul
 In compute-constrained environments, garbage collection can become a significant performance factor. Frequently-run garbage collection interferes with running the application by using CPU resources. The use of memory ballast can mitigate the issue. Memory ballast allocates extra, but unused virtual memory in order to inflate the quantity of live heap space. Garbage collection is triggered by the growth of heap space usage. The inflated quantity of heap space reduces the perceived growth, so garbage collection occurs less frequently. Configure memory ballast using the ballast_bytes configuration option.
+
+## Remote rule evaluation
+
+_This feature was first proposed in [`LID-0002`](https://github.com/grafana/loki/pull/8129), which contains the design decisions
+that informed the implementation._
+
+By default, the `ruler` component embeds a query engine to evaluate rules. This generally works fine, except when rules
+are complex or have to process a large amount of data regularly. Poor `ruler` performance manifests as recording rule metrics
+with gaps or as missed alerts. This situation can be detected by alerting on the `cortex_prometheus_rule_group_iterations_missed_total`
+metric whenever it has a non-zero value; an example alerting rule is shown below.
+
+A solution to this problem is to externalize rule evaluation from the `ruler` process. The `ruler`'s embedded query engine
+is single-threaded, meaning that rules are not split, sharded, or otherwise accelerated like regular Loki queries. The `query-frontend`
+component exists explicitly for this purpose and, when combined with a number of `querier` instances, can massively
+improve rule evaluation performance and lead to fewer missed iterations.
+
+It is generally recommended to create a `query-frontend` deployment and `querier` pool that are separate from your existing ones,
+which handle ad hoc queries via Grafana, `logcli`, or the API. Rules should be given priority over ad hoc queries because they produce
+metrics or alerts that may be crucial to the reliable operation of your service; if you use the same `query-frontend` and `querier` pool
+for both, your rules will be executed with the same priority as ad hoc queries, which could lead to unpredictable performance.
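+
+As mentioned above, missed iterations can be surfaced by alerting on `cortex_prometheus_rule_group_iterations_missed_total`.
+The following alerting rule is only a sketch: the group name, the `user` label, and the threshold are illustrative and should be
+adapted to your environment.
+
+```yaml
+groups:
+  - name: loki-ruler-health
+    rules:
+      - alert: LokiRulerMissedIterations
+        # fires when any tenant's rule groups have skipped one or more evaluations over the last 5 minutes
+        expr: sum by (user) (increase(cortex_prometheus_rule_group_iterations_missed_total[5m])) > 0
+        for: 10m
+        labels:
+          severity: warning
+        annotations:
+          summary: "Loki ruler is missing rule evaluations for tenant {{ $labels.user }}"
+```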
+
+To enable remote rule evaluation, set the following configuration options:
+
+```yaml
+ruler:
+  evaluation:
+    mode: remote
+    query_frontend:
+      address: dns:///<hostname>:<port>
+```
+
+See the [ruler configuration](/configuration/#ruler) section for further configuration options.
+
+When you enable remote rule evaluation, the `ruler` component becomes a gRPC client of the `query-frontend` service;
+this results in far lower `ruler` resource usage because the majority of the work has been externalized.
+The LogQL queries coming from the `ruler` will be executed against the given `query-frontend` service.
+Requests will be load-balanced across all `query-frontend` IPs if the `dns:///` prefix is used.
+
+> **Note:** Queries that fail to execute are _not_ retried.
+
+### Limits & Observability
+
+Remote rule evaluation can be tuned with the following options:
+
+- `ruler_remote_evaluation_timeout`: maximum allowable execution time for a rule evaluation
+- `ruler_remote_evaluation_max_response_size`: maximum allowable response size over the gRPC connection from the `query-frontend` to the `ruler`
+
+Both of these can be specified globally in the [`limits_config`](/configuration/#limits_config) section
+or on a [per-tenant basis](/configuration/#runtime-configuration-file).
+
+Remote rule evaluation exposes a number of metrics:
+
+- `loki_ruler_remote_eval_request_duration_seconds`: time taken for a rule evaluation (histogram)
+- `loki_ruler_remote_eval_response_bytes`: number of bytes in a rule evaluation response (histogram)
+- `loki_ruler_remote_eval_response_samples`: number of samples in a rule evaluation response (histogram)
+- `loki_ruler_remote_eval_success_total`: successful rule evaluations (counter)
+- `loki_ruler_remote_eval_failure_total`: unsuccessful rule evaluations, labeled by reason (counter)
+
+Each of these metrics is per-tenant, so cardinality must be taken into consideration.
\ No newline at end of file
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index f385a8fd86..d854f8ddad 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -1004,9 +1004,14 @@ func (t *Loki) initRuleEvaluator() (services.Service, error) {
 			break
 		}
 
-		evaluator, err = ruler.NewLocalEvaluator(&t.Cfg.Ruler.Evaluation, engine, logger)
+		evaluator, err = ruler.NewLocalEvaluator(engine, logger)
 	case ruler.EvalModeRemote:
-		evaluator, err = ruler.NewRemoteEvaluator(&t.Cfg.Ruler.Evaluation, logger)
+		qfClient, e := ruler.DialQueryFrontend(&t.Cfg.Ruler.Evaluation.QueryFrontend)
+		if e != nil {
+			return nil, fmt.Errorf("failed to dial query frontend for remote rule evaluation: %w", e)
+		}
+
+		evaluator, err = ruler.NewRemoteEvaluator(qfClient, t.Overrides, logger, prometheus.DefaultRegisterer)
 	default:
 		err = fmt.Errorf("unknown rule evaluation mode %q", mode)
 	}
diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go
index 2242fe817d..f05515c522 100644
--- a/pkg/ruler/compat.go
+++ b/pkg/ruler/compat.go
@@ -48,11 +48,14 @@ type RulesLimits interface {
 	RulerRemoteWriteQueueMaxBackoff(userID string) time.Duration
 	RulerRemoteWriteQueueRetryOnRateLimit(userID string) bool
 	RulerRemoteWriteSigV4Config(userID string) *sigv4.SigV4Config
+
+	RulerRemoteEvaluationTimeout(userID string) time.Duration
+	RulerRemoteEvaluationMaxResponseSize(userID string) int64
 }
 
-// engineQueryFunc returns a new query function using the rules.EngineQueryFunc function
+// queryFunc returns a new query function using the rules.EngineQueryFunc function
 // and passing an altered timestamp.
-func engineQueryFunc(evaluator Evaluator, overrides RulesLimits, checker readyChecker, userID string) rules.QueryFunc { +func queryFunc(evaluator Evaluator, overrides RulesLimits, checker readyChecker, userID string) rules.QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { // check if storage instance is ready; if not, fail the rule evaluation; // we do this to prevent an attempt to append new samples before the WAL appender is ready @@ -131,8 +134,8 @@ func MultiTenantRuleManager(cfg Config, evaluator Evaluator, overrides RulesLimi registry.configureTenantStorage(userID) logger = log.With(logger, "user", userID) - queryFunc := engineQueryFunc(evaluator, overrides, registry, userID) - memStore := NewMemStore(userID, queryFunc, newMemstoreMetrics(reg), 5*time.Minute, log.With(logger, "subcomponent", "MemStore")) + queryFn := queryFunc(evaluator, overrides, registry, userID) + memStore := NewMemStore(userID, queryFn, newMemstoreMetrics(reg), 5*time.Minute, log.With(logger, "subcomponent", "MemStore")) // GroupLoader builds a cache of the rules as they're loaded by the // manager.This is used to back the memstore @@ -141,7 +144,7 @@ func MultiTenantRuleManager(cfg Config, evaluator Evaluator, overrides RulesLimi mgr := rules.NewManager(&rules.ManagerOptions{ Appendable: registry, Queryable: memStore, - QueryFunc: queryFunc, + QueryFunc: queryFn, Context: user.InjectOrgID(ctx, userID), ExternalURL: cfg.ExternalURL.URL, NotifyFunc: ruler.SendAlerts(notifier, cfg.ExternalURL.URL.String()), diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index 759d4d67ca..c1d95511bd 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -46,10 +46,10 @@ func TestNonMetricQuery(t *testing.T) { require.Nil(t, err) engine := logql.NewEngine(logql.EngineOpts{}, &FakeQuerier{}, overrides, log.Logger) - eval, err := NewLocalEvaluator(&EvaluationConfig{Mode: EvalModeLocal}, engine, log.Logger) + eval, err := NewLocalEvaluator(engine, log.Logger) require.NoError(t, err) - queryFunc := engineQueryFunc(eval, overrides, fakeChecker{}, "fake") + queryFunc := queryFunc(eval, overrides, fakeChecker{}, "fake") _, err = queryFunc(context.TODO(), `{job="nginx"}`, time.Now()) require.Error(t, err, "rule result is not a vector or scalar") diff --git a/pkg/ruler/evaluator_local.go b/pkg/ruler/evaluator_local.go index dc92a2a3c1..fed0f2f02e 100644 --- a/pkg/ruler/evaluator_local.go +++ b/pkg/ruler/evaluator_local.go @@ -15,20 +15,16 @@ import ( const EvalModeLocal = "local" type LocalEvaluator struct { - cfg *EvaluationConfig engine *logql.Engine logger log.Logger } -func NewLocalEvaluator(cfg *EvaluationConfig, engine *logql.Engine, logger log.Logger) (*LocalEvaluator, error) { - if cfg == nil { - return nil, fmt.Errorf("given config is nil") - } +func NewLocalEvaluator(engine *logql.Engine, logger log.Logger) (*LocalEvaluator, error) { if engine == nil { return nil, fmt.Errorf("given engine is nil") } - return &LocalEvaluator{cfg: cfg, engine: engine, logger: logger}, nil + return &LocalEvaluator{engine: engine, logger: logger}, nil } func (l *LocalEvaluator) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) { diff --git a/pkg/ruler/evaluator_remote.go b/pkg/ruler/evaluator_remote.go index 6647dd156f..13c742612a 100644 --- a/pkg/ruler/evaluator_remote.go +++ b/pkg/ruler/evaluator_remote.go @@ -11,6 +11,7 @@ import ( "encoding/json" "flag" "fmt" + "io" "net/http" "net/textproto" "net/url" @@ -23,8 +24,10 @@ import ( 
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" "google.golang.org/grpc" @@ -35,6 +38,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/querier/series" "github.com/grafana/loki/pkg/util/build" + "github.com/grafana/loki/pkg/util/httpreq" "github.com/grafana/loki/pkg/util/spanlogger" ) @@ -42,45 +46,127 @@ const ( keepAlive = time.Second * 10 keepAliveTimeout = time.Second * 5 - serviceConfig = `{"loadBalancingPolicy": "round_robin"}` - + serviceConfig = `{"loadBalancingPolicy": "round_robin"}` queryEndpointPath = "/loki/api/v1/query" + mimeTypeFormPost = "application/x-www-form-urlencoded" - mimeTypeFormPost = "application/x-www-form-urlencoded" + EvalModeRemote = "remote" ) -const EvalModeRemote = "remote" +var ( + userAgent = fmt.Sprintf("loki-ruler/%s", build.Version) +) -var userAgent = fmt.Sprintf("loki-ruler/%s", build.Version) +type metrics struct { + reqDurationSecs *prometheus.HistogramVec + responseSizeBytes *prometheus.HistogramVec + responseSizeSamples *prometheus.HistogramVec -type RemoteEvaluator struct { - rq *remoteQuerier - logger log.Logger + successfulEvals *prometheus.CounterVec + failedEvals *prometheus.CounterVec } -func NewRemoteEvaluator(cfg *EvaluationConfig, logger log.Logger) (*RemoteEvaluator, error) { - qfClient, err := dialQueryFrontend(cfg.QueryFrontend) - if err != nil { - return nil, fmt.Errorf("failed to dial query frontend for remote rule evaluation: %w", err) - } +type RemoteEvaluator struct { + client httpgrpc.HTTPClient + overrides RulesLimits + logger log.Logger + metrics *metrics +} + +func NewRemoteEvaluator(client httpgrpc.HTTPClient, overrides RulesLimits, logger log.Logger, registerer prometheus.Registerer) (*RemoteEvaluator, error) { return &RemoteEvaluator{ - rq: newRemoteQuerier(qfClient, logger, WithOrgIDMiddleware), - logger: logger, + client: client, + overrides: overrides, + logger: logger, + metrics: newMetrics(registerer), }, nil } +func newMetrics(registerer prometheus.Registerer) *metrics { + reqDurationSecs := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki", + Subsystem: "ruler_remote_eval", + Name: "request_duration_seconds", + // 0.005000, 0.015000, 0.045000, 0.135000, 0.405000, 1.215000, 3.645000, 10.935000, 32.805000 + Buckets: prometheus.ExponentialBuckets(0.005, 3, 9), + }, []string{"user"}) + responseSizeBytes := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki", + Subsystem: "ruler_remote_eval", + Name: "response_bytes", + // 32, 128, 512, 2K, 8K, 32K, 128K, 512K, 2M, 8M + Buckets: prometheus.ExponentialBuckets(32, 4, 10), + }, []string{"user"}) + responseSizeSamples := prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "loki", + Subsystem: "ruler_remote_eval", + Name: "response_samples", + // 1, 4, 16, 64, 256, 1024, 4096, 16384, 65536, 262144 + Buckets: prometheus.ExponentialBuckets(1, 4, 10), + }, []string{"user"}) + + successfulEvals := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Subsystem: "ruler_remote_eval", + Name: "success_total", + }, []string{"user"}) + failedEvals := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + 
Subsystem: "ruler_remote_eval", + Name: "failure_total", + }, []string{"reason", "user"}) + + registerer.MustRegister( + reqDurationSecs, + responseSizeBytes, + responseSizeSamples, + successfulEvals, + failedEvals, + ) + + return &metrics{ + reqDurationSecs: reqDurationSecs, + responseSizeBytes: responseSizeBytes, + responseSizeSamples: responseSizeSamples, + + successfulEvals: successfulEvals, + failedEvals: failedEvals, + } +} + +type queryResponse struct { + res *logqlmodel.Result + err error +} + func (r *RemoteEvaluator) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) { - res, err := r.rq.Query(ctx, qs, now) + orgID, err := user.ExtractOrgID(ctx) if err != nil { - return nil, fmt.Errorf("failed to perform remote evaluation of query %q: %w", qs, err) + return nil, fmt.Errorf("failed to retrieve tenant ID from context: %w", err) } - return res, err + ch := make(chan queryResponse, 1) + + timeout := r.overrides.RulerRemoteEvaluationTimeout(orgID) + tCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + go r.Query(tCtx, ch, orgID, qs, now) + + for { + select { + case <-tCtx.Done(): + r.metrics.failedEvals.WithLabelValues("timeout", orgID).Inc() + return nil, fmt.Errorf("remote rule evaluation exceeded deadline of %fs (defined by ruler_remote_evaluation_timeout): %w", timeout.Seconds(), tCtx.Err()) + case res := <-ch: + return res.res, res.err + } + } } -// dialQueryFrontend creates and initializes a new httpgrpc.HTTPClient taking a QueryFrontendConfig configuration. -func dialQueryFrontend(cfg QueryFrontendConfig) (httpgrpc.HTTPClient, error) { +// DialQueryFrontend creates and initializes a new httpgrpc.HTTPClient taking a QueryFrontendConfig configuration. +func DialQueryFrontend(cfg *QueryFrontendConfig) (httpgrpc.HTTPClient, error) { tlsDialOptions, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled) if err != nil { return nil, err @@ -115,35 +201,16 @@ func dialQueryFrontend(cfg QueryFrontendConfig) (httpgrpc.HTTPClient, error) { // Middleware provides a mechanism to inspect outgoing remote querier requests. type Middleware func(ctx context.Context, req *httpgrpc.HTTPRequest) error -// remoteQuerier executes read operations against a httpgrpc.HTTPClient. -type remoteQuerier struct { - client httpgrpc.HTTPClient - middlewares []Middleware - logger log.Logger -} - -// newRemoteQuerier creates and initializes a new remoteQuerier instance. -func newRemoteQuerier( - client httpgrpc.HTTPClient, - logger log.Logger, - middlewares ...Middleware, -) *remoteQuerier { - return &remoteQuerier{ - client: client, - middlewares: middlewares, - logger: logger, - } -} - // Query performs a query for the given time. 
-func (q *remoteQuerier) Query(ctx context.Context, qs string, t time.Time) (*logqlmodel.Result, error) { - logger, ctx := spanlogger.NewWithLogger(ctx, q.logger, "ruler.remoteEvaluation.Query") +func (r *RemoteEvaluator) Query(ctx context.Context, ch chan<- queryResponse, orgID, qs string, t time.Time) { + logger, ctx := spanlogger.NewWithLogger(ctx, r.logger, "ruler.remoteEvaluation.Query") defer logger.Span.Finish() - return q.query(ctx, qs, t, logger) + res, err := r.query(ctx, orgID, qs, t, logger) + ch <- queryResponse{res, err} } -func (q *remoteQuerier) query(ctx context.Context, query string, ts time.Time, logger log.Logger) (*logqlmodel.Result, error) { +func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts time.Time, logger log.Logger) (*logqlmodel.Result, error) { args := make(url.Values) args.Set("query", query) args.Set("direction", "forward") @@ -161,36 +228,67 @@ func (q *remoteQuerier) query(ctx context.Context, query string, ts time.Time, l {Key: textproto.CanonicalMIMEHeaderKey("User-Agent"), Values: []string{userAgent}}, {Key: textproto.CanonicalMIMEHeaderKey("Content-Type"), Values: []string{mimeTypeFormPost}}, {Key: textproto.CanonicalMIMEHeaderKey("Content-Length"), Values: []string{strconv.Itoa(len(body))}}, + {Key: textproto.CanonicalMIMEHeaderKey(string(httpreq.QueryTagsHTTPHeader)), Values: []string{"ruler"}}, + {Key: textproto.CanonicalMIMEHeaderKey(user.OrgIDHeaderName), Values: []string{orgID}}, }, } - for _, mdw := range q.middlewares { - if err := mdw(ctx, &req); err != nil { - return nil, err - } + start := time.Now() + resp, err := r.client.Handle(ctx, &req) + + instrument.ObserveWithExemplar(ctx, r.metrics.reqDurationSecs.WithLabelValues(orgID), time.Since(start).Seconds()) + + if resp != nil { + instrument.ObserveWithExemplar(ctx, r.metrics.responseSizeBytes.WithLabelValues(orgID), float64(len(resp.Body))) } - start := time.Now() - resp, err := q.client.Handle(ctx, &req) + log := log.With(logger, "query_hash", hash, "query", query, "instant", ts, "response_time", time.Since(start).String()) + if err != nil { - level.Warn(logger).Log("msg", "failed to remotely evaluate query expression", "err", err, "query_hash", hash, "qs", query, "ts", ts, "response_time", time.Since(start).Seconds()) - return nil, err + r.metrics.failedEvals.WithLabelValues("error", orgID).Inc() + + level.Warn(log).Log("msg", "failed to evaluate rule", "err", err) + return nil, fmt.Errorf("rule evaluation failed: %w", err) } + + fullBody := resp.Body + // created a limited reader to avoid logging the entire response body should it be very large + limitedBody := io.LimitReader(bytes.NewReader(fullBody), 1024) + + // TODO(dannyk): consider retrying if the rule has a very high interval, or the rule is very sensitive to missing samples + // i.e. 
critical alerts or recording rules producing crucial RemoteEvaluatorMetrics series if resp.Code/100 != 2 { - return nil, fmt.Errorf("unexpected response status code %d: %s", resp.Code, string(resp.Body)) + r.metrics.failedEvals.WithLabelValues("upstream_error", orgID).Inc() + + level.Warn(log).Log("msg", "rule evaluation failed with non-2xx response", "response_code", resp.Code, "response_body", limitedBody) + return nil, fmt.Errorf("unsuccessful/unexpected response - status code %d", resp.Code) } - level.Debug(logger).Log("msg", "query expression successfully evaluated", "query_hash", hash, "qs", query, "ts", ts, "response_time", time.Since(start).Seconds()) - return decodeResponse(resp) + maxSize := r.overrides.RulerRemoteEvaluationMaxResponseSize(orgID) + if maxSize > 0 && int64(len(fullBody)) >= maxSize { + r.metrics.failedEvals.WithLabelValues("max_size", orgID).Inc() + + level.Error(log).Log("msg", "rule evaluation exceeded max size", "max_size", maxSize, "response_size", len(fullBody)) + return nil, fmt.Errorf("%d bytes exceeds response size limit of %d (defined by ruler_remote_evaluation_max_response_size)", len(resp.Body), maxSize) + } + + level.Debug(log).Log("msg", "rule evaluation succeeded") + r.metrics.successfulEvals.WithLabelValues(orgID).Inc() + + return r.decodeResponse(ctx, resp, orgID) } -func decodeResponse(resp *httpgrpc.HTTPResponse) (*logqlmodel.Result, error) { +func (r *RemoteEvaluator) decodeResponse(ctx context.Context, resp *httpgrpc.HTTPResponse, orgID string) (*logqlmodel.Result, error) { + fullBody := resp.Body + // created a limited reader to avoid logging the entire response body should it be very large + limitedBody := io.LimitReader(bytes.NewReader(fullBody), 1024) + var decoded loghttp.QueryResponse - if err := json.NewDecoder(bytes.NewReader(resp.Body)).Decode(&decoded); err != nil { - return nil, err + if err := json.NewDecoder(bytes.NewReader(fullBody)).Decode(&decoded); err != nil { + return nil, fmt.Errorf("unexpected body encoding, not valid JSON: %w, body: %s", err, limitedBody) } - if decoded.Status == "error" { - return nil, fmt.Errorf("query response error: %s", decoded.Status) + if decoded.Status != loghttp.QueryStatusSuccess { + return nil, fmt.Errorf("query response error: status %q, body: %s", decoded.Status, limitedBody) } switch decoded.Data.ResultType { @@ -205,6 +303,8 @@ func decodeResponse(resp *httpgrpc.HTTPResponse) (*logqlmodel.Result, error) { }) } + instrument.ObserveWithExemplar(ctx, r.metrics.responseSizeSamples.WithLabelValues(orgID), float64(len(res))) + return &logqlmodel.Result{ Statistics: decoded.Data.Statistics, Data: res, @@ -215,28 +315,15 @@ func decodeResponse(resp *httpgrpc.HTTPResponse) (*logqlmodel.Result, error) { res.T = scalar.Timestamp.Unix() res.V = float64(scalar.Value) + instrument.ObserveWithExemplar(ctx, r.metrics.responseSizeSamples.WithLabelValues(orgID), 1) + return &logqlmodel.Result{ Statistics: decoded.Data.Statistics, Data: res, }, nil default: - return nil, fmt.Errorf("unsupported result type %s", decoded.Data.ResultType) - } -} - -// WithOrgIDMiddleware attaches 'X-Scope-OrgID' header value to the outgoing request by inspecting the passed context. -// In case the expression to evaluate corresponds to a federated rule, the ExtractTenantIDs function will take care -// of normalizing and concatenating source tenants by separating them with a '|' character. 
-func WithOrgIDMiddleware(ctx context.Context, req *httpgrpc.HTTPRequest) error { - orgID, err := user.ExtractOrgID(ctx) - if err != nil { - return err + return nil, fmt.Errorf("unsupported result type: %q", decoded.Data.ResultType) } - req.Headers = append(req.Headers, &httpgrpc.Header{ - Key: textproto.CanonicalMIMEHeaderKey(user.OrgIDHeaderName), - Values: []string{orgID}, - }) - return nil } // QueryFrontendConfig defines query-frontend transport configuration. diff --git a/pkg/ruler/evaluator_remote_test.go b/pkg/ruler/evaluator_remote_test.go new file mode 100644 index 0000000000..c309cdf7ef --- /dev/null +++ b/pkg/ruler/evaluator_remote_test.go @@ -0,0 +1,418 @@ +package ruler + +import ( + "context" + "crypto/rand" + "encoding/json" + "fmt" + "net/http" + "testing" + "time" + + "github.com/grafana/dskit/flagext" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/util/log" + "github.com/grafana/loki/pkg/validation" +) + +type mockClient struct { + handleFn func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) +} + +func (m mockClient) Handle(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + if m.handleFn == nil { + return nil, fmt.Errorf("no handle function set") + } + + return m.handleFn(ctx, in, opts...) +} + +func TestRemoteEvalQueryTimeout(t *testing.T) { + const timeout = 200 * time.Millisecond + + defaultLimits := defaultLimitsTestConfig() + defaultLimits.RulerRemoteEvaluationTimeout = timeout + + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + cli := mockClient{ + handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + // sleep for slightly longer than the timeout + time.Sleep(timeout + (100 * time.Millisecond)) + return &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Headers: nil, + Body: []byte("will not get here, will timeout before"), + }, nil + }, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + _, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now()) + require.Error(t, err) + // cannot use require.ErrorIs(t, err, context.DeadlineExceeded) because the original error is not wrapped correctly + require.ErrorContains(t, err, "context deadline exceeded") +} + +func TestRemoteEvalMaxResponseSize(t *testing.T) { + const maxSize = 2 * 1024 * 1024 // 2MiB + const exceededSize = maxSize + 1 + + defaultLimits := defaultLimitsTestConfig() + defaultLimits.RulerRemoteEvaluationMaxResponseSize = maxSize + + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + cli := mockClient{ + handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + // generate a response of random bytes that's just too big for the max response size + var resp = make([]byte, exceededSize) + _, err = rand.Read(resp) + require.NoError(t, err) + + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: nil, + Body: resp, + }, nil + 
}, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + _, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now()) + require.Error(t, err) + // cannot use require.ErrorIs(t, err, context.DeadlineExceeded) because the original error is not wrapped correctly + require.ErrorContains(t, err, fmt.Sprintf("%d bytes exceeds response size limit of %d", exceededSize, maxSize)) +} + +func TestRemoteEvalScalar(t *testing.T) { + defaultLimits := defaultLimitsTestConfig() + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + var ( + now = time.Now() + value = 19 + ) + + cli := mockClient{ + handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + // this is somewhat bleeding the abstraction, but it's more idiomatic/readable than constructing + // the expected JSON response by hand + resp := loghttp.QueryResponse{ + Status: loghttp.QueryStatusSuccess, + Data: loghttp.QueryResponseData{ + ResultType: loghttp.ResultTypeScalar, + Result: loghttp.Scalar{ + Value: model.SampleValue(value), + Timestamp: model.TimeFromUnixNano(now.UnixNano()), + }, + }, + } + + out, err := json.Marshal(resp) + require.NoError(t, err) + + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: nil, + Body: out, + }, nil + }, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + res, err := ev.Eval(ctx, "19", now) + require.NoError(t, err) + require.IsType(t, promql.Scalar{}, res.Data) + require.EqualValues(t, value, res.Data.(promql.Scalar).V) + // we lose nanosecond precision due to promql types + require.Equal(t, now.Unix(), res.Data.(promql.Scalar).T) +} + +// TestRemoteEvalEmptyScalarResponse validates that an empty scalar response is valid and does not cause an error +func TestRemoteEvalEmptyScalarResponse(t *testing.T) { + defaultLimits := defaultLimitsTestConfig() + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + cli := mockClient{ + handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + // this is somewhat bleeding the abstraction, but it's more idiomatic/readable than constructing + // the expected JSON response by hand + resp := loghttp.QueryResponse{ + Status: loghttp.QueryStatusSuccess, + Data: loghttp.QueryResponseData{ + ResultType: loghttp.ResultTypeScalar, + Result: loghttp.Scalar{}, + }, + } + + out, err := json.Marshal(resp) + require.NoError(t, err) + + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: nil, + Body: out, + }, nil + }, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + res, err := ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now()) + require.NoError(t, err) + require.Empty(t, res.Data) +} + +// TestRemoteEvalEmptyVectorResponse validates that an empty vector response is valid and does not cause an error +func TestRemoteEvalVectorResponse(t *testing.T) { + defaultLimits := defaultLimitsTestConfig() + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + now := time.Now() + value := 35891 + + cli := mockClient{ + 
handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + // this is somewhat bleeding the abstraction, but it's more idiomatic/readable than constructing + // the expected JSON response by hand + resp := loghttp.QueryResponse{ + Status: loghttp.QueryStatusSuccess, + Data: loghttp.QueryResponseData{ + ResultType: loghttp.ResultTypeVector, + Result: loghttp.Vector{ + { + Timestamp: model.TimeFromUnixNano(now.UnixNano()), + Value: model.SampleValue(value), + Metric: map[model.LabelName]model.LabelValue{ + model.LabelName("foo"): model.LabelValue("bar"), + }, + }, + { + Timestamp: model.TimeFromUnixNano(now.UnixNano()), + Value: model.SampleValue(value), + Metric: map[model.LabelName]model.LabelValue{ + model.LabelName("bar"): model.LabelValue("baz"), + }, + }, + }, + }, + } + + out, err := json.Marshal(resp) + require.NoError(t, err) + + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: nil, + Body: out, + }, nil + }, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + res, err := ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", now) + require.NoError(t, err) + require.Len(t, res.Data, 2) + require.IsType(t, promql.Vector{}, res.Data) + vector := res.Data.(promql.Vector) + require.EqualValues(t, now.UnixMilli(), vector[0].T) + require.EqualValues(t, value, vector[0].V) + require.EqualValues(t, map[string]string{ + "foo": "bar", + }, vector[0].Metric.Map()) +} + +// TestRemoteEvalEmptyVectorResponse validates that an empty vector response is valid and does not cause an error +func TestRemoteEvalEmptyVectorResponse(t *testing.T) { + defaultLimits := defaultLimitsTestConfig() + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + cli := mockClient{ + handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + // this is somewhat bleeding the abstraction, but it's more idiomatic/readable than constructing + // the expected JSON response by hand + resp := loghttp.QueryResponse{ + Status: loghttp.QueryStatusSuccess, + Data: loghttp.QueryResponseData{ + ResultType: loghttp.ResultTypeVector, + Result: loghttp.Vector{}, + }, + } + + out, err := json.Marshal(resp) + require.NoError(t, err) + + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: nil, + Body: out, + }, nil + }, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + _, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now()) + require.NoError(t, err) +} + +func TestRemoteEvalErrorResponse(t *testing.T) { + defaultLimits := defaultLimitsTestConfig() + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + var respErr = fmt.Errorf("some error occurred") + + cli := mockClient{ + handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + return nil, respErr + }, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + _, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now()) + require.ErrorContains(t, err, "rule evaluation failed") + 
require.ErrorContains(t, err, respErr.Error()) +} + +func TestRemoteEvalNon2xxResponse(t *testing.T) { + defaultLimits := defaultLimitsTestConfig() + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + const httpErr = http.StatusInternalServerError + + cli := mockClient{ + handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + return &httpgrpc.HTTPResponse{ + Code: httpErr, + }, nil + }, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + _, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now()) + require.ErrorContains(t, err, fmt.Sprintf("unsuccessful/unexpected response - status code %d", httpErr)) +} + +func TestRemoteEvalNonJSONResponse(t *testing.T) { + defaultLimits := defaultLimitsTestConfig() + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + cli := mockClient{ + handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Body: []byte("this is not json"), + }, nil + }, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + _, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now()) + require.ErrorContains(t, err, "unexpected body encoding, not valid JSON") +} + +func TestRemoteEvalUnsupportedResultResponse(t *testing.T) { + defaultLimits := defaultLimitsTestConfig() + limits, err := validation.NewOverrides(defaultLimits, nil) + require.NoError(t, err) + + cli := mockClient{ + handleFn: func(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + // this is somewhat bleeding the abstraction, but it's more idiomatic/readable than constructing + // the expected JSON response by hand + resp := loghttp.QueryResponse{ + Status: loghttp.QueryStatusSuccess, + Data: loghttp.QueryResponseData{ + // stream responses are not yet supported + ResultType: loghttp.ResultTypeStream, + Result: loghttp.Streams{}, + }, + } + + out, err := json.Marshal(resp) + require.NoError(t, err) + + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: nil, + Body: out, + }, nil + }, + } + + ev, err := NewRemoteEvaluator(cli, limits, log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "test") + + _, err = ev.Eval(ctx, "sum(rate({foo=\"bar\"}[5m]))", time.Now()) + require.ErrorContains(t, err, fmt.Sprintf("unsupported result type: %q", loghttp.ResultTypeStream)) +} + +func defaultLimitsTestConfig() validation.Limits { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + return limits +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 6ebf8fbcd5..3b18ae9918 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -145,6 +145,10 @@ type Limits struct { RulerRemoteWriteConfig map[string]config.RemoteWriteConfig `yaml:"ruler_remote_write_config,omitempty" json:"ruler_remote_write_config,omitempty" doc:"description=Configures global and per-tenant limits for remote write clients. 
A map with remote client id as key."` + // TODO(dannyk): possible enhancement is to align this with rule group interval + RulerRemoteEvaluationTimeout time.Duration `yaml:"ruler_remote_evaluation_timeout" json:"ruler_remote_evaluation_timeout" doc:"description=Timeout for a remote rule evaluation. Defaults to the value of 'querier.query-timeout'."` + RulerRemoteEvaluationMaxResponseSize int64 `yaml:"ruler_remote_evaluation_max_response_size" json:"ruler_remote_evaluation_max_response_size" doc:"description=Maximum size (in bytes) of the allowable response size from a remote rule evaluation. Set to 0 to allow any response size (default)."` + // Global and per tenant deletion mode DeletionMode string `yaml:"deletion_mode" json:"deletion_mode"` @@ -626,6 +630,22 @@ func (o *Overrides) RulerRemoteWriteConfig(userID string, id string) *config.Rem return nil } +// RulerRemoteEvaluationTimeout returns the duration after which to timeout a remote rule evaluation request for a given user. +func (o *Overrides) RulerRemoteEvaluationTimeout(userID string) time.Duration { + // if not defined, use the base query timeout + timeout := o.getOverridesForUser(userID).RulerRemoteEvaluationTimeout + if timeout <= 0 { + return time.Duration(o.getOverridesForUser(userID).QueryTimeout) + } + + return timeout +} + +// RulerRemoteEvaluationMaxResponseSize returns the maximum allowable response size from a remote rule evaluation for a given user. +func (o *Overrides) RulerRemoteEvaluationMaxResponseSize(userID string) int64 { + return o.getOverridesForUser(userID).RulerRemoteEvaluationMaxResponseSize +} + // RetentionPeriod returns the retention period for a given user. func (o *Overrides) RetentionPeriod(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).RetentionPeriod)
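
The per-tenant limits wired up above (`ruler_remote_evaluation_timeout` and `ruler_remote_evaluation_max_response_size`) can also be set for individual tenants through Loki's runtime configuration (overrides) file rather than globally. A minimal sketch, assuming the standard `overrides:` layout of the runtime configuration file; the tenant name and the values are illustrative:

```yaml
overrides:
  tenant-a:
    ruler_remote_evaluation_timeout: 1m
    ruler_remote_evaluation_max_response_size: 10485760 # 10 MiB
```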