From b18078827ea320bf6d9494f2eef8b315074934c6 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Wed, 13 Dec 2023 15:43:18 +0100 Subject: [PATCH] Map rpc Status back to user error (#11449) **What this PR does / why we need it**: Errors forwarded from the querier to the frontend would be reported with the wrong error code and message. This change fixes this. **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) --- integration/client/client.go | 33 +++++++++++++++++++++++++ integration/loki_micro_services_test.go | 5 ++++ pkg/querier/queryrange/marshal.go | 8 ++++++ pkg/querier/worker/util.go | 5 +--- pkg/util/server/error.go | 31 ++++++++++++++++++++--- pkg/util/server/error_test.go | 27 +++++++++++++------- 6 files changed, 92 insertions(+), 17 deletions(-) diff --git a/integration/client/client.go b/integration/client/client.go index f293ad81dd..dcf2c036dc 100644 --- a/integration/client/client.go +++ b/integration/client/client.go @@ -662,6 +662,39 @@ func (c *Client) Series(ctx context.Context, matcher string) ([]map[string]strin return values.Data, nil } +func (c *Client) Stats(ctx context.Context, query string) ([]map[string]int, error) { + ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout) + defer cancelFunc() + + v := url.Values{} + v.Set("query", query) + + u, err := url.Parse(c.baseURL) + if err != nil { + panic(err) + } + u.Path = "/loki/api/v1/index/stats" + u.RawQuery = v.Encode() + + buf, statusCode, err := c.run(ctx, u.String()) + if err != nil { + return nil, err + } + + if statusCode/100 != 2 { + return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf))) + } + + var values struct { + Data []map[string]int `json:"data"` + } + if err := json.Unmarshal(buf, &values); err != nil { + return nil, err + } + + return values.Data, nil +} + type TailResult struct { Response loghttp.TailResponse Err error diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go index d85a3ae4a2..5111223eaf 100644 --- a/integration/loki_micro_services_test.go +++ b/integration/loki_micro_services_test.go @@ -146,6 +146,11 @@ func TestMicroServicesIngestQuery(t *testing.T) { assert.ElementsMatch(t, []map[string]string{{"job": "fake"}}, resp) }) + t.Run("stats error", func(t *testing.T) { + _, err := cliQueryFrontend.Series(context.Background(), `{job="fake"}|= "search"`) + require.ErrorContains(t, err, "status code 400: only label matchers are supported") + }) + t.Run("per-request-limits", func(t *testing.T) { queryLimitsPolicy := client.InjectHeadersOption(map[string][]string{querylimits.HTTPHeaderQueryLimitsKey: {`{"maxQueryLength": "1m"}`}}) cliQueryFrontendLimited := client.New(tenantID, "", tQueryFrontend.HTTPURL(), queryLimitsPolicy) diff --git a/pkg/querier/queryrange/marshal.go b/pkg/querier/queryrange/marshal.go index 5b227a2efa..6f6e997865 100644 --- a/pkg/querier/queryrange/marshal.go +++ b/pkg/querier/queryrange/marshal.go @@ -27,6 +27,7 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/util/httpreq" "github.com/grafana/loki/pkg/util/querylimits" + "github.com/grafana/loki/pkg/util/server" ) const ( @@ -247,6 +248,13 @@ func QueryResponseWrap(res queryrangebase.Response) (*QueryResponse, error) { return p, nil } +// QueryResponseWrapError wraps an error in the QueryResponse protobuf. +func QueryResponseWrapError(err error) *QueryResponse { + return &QueryResponse{ + Status: server.WrapError(err), + } +} + func (Codec) QueryRequestUnwrap(ctx context.Context, req *QueryRequest) (queryrangebase.Request, context.Context, error) { if req == nil { return nil, ctx, nil diff --git a/pkg/querier/worker/util.go b/pkg/querier/worker/util.go index 7de49d1790..812236809a 100644 --- a/pkg/querier/worker/util.go +++ b/pkg/querier/worker/util.go @@ -139,10 +139,7 @@ func handleQueryRequest(ctx context.Context, request *queryrange.QueryRequest, h // This block covers any errors that are not gRPC errors and will include all query errors. // It's important to map non-retryable errors to a non 5xx status code so they will not be retried. - code, err := server.ClientHTTPStatusAndError(err) - return &queryrange.QueryResponse{ - Status: status.New(codes.Code(code), err.Error()).Proto(), - } + return queryrange.QueryResponseWrapError(err) } response, err := queryrange.QueryResponseWrap(resp) diff --git a/pkg/util/server/error.go b/pkg/util/server/error.go index df2beaea2b..fc04218d5a 100644 --- a/pkg/util/server/error.go +++ b/pkg/util/server/error.go @@ -9,7 +9,9 @@ import ( "github.com/grafana/dskit/user" "github.com/prometheus/prometheus/promql" "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + + "github.com/gogo/googleapis/google/rpc" + "github.com/gogo/status" "github.com/grafana/loki/pkg/logqlmodel" storage_errors "github.com/grafana/loki/pkg/storage/errors" @@ -46,13 +48,20 @@ func ClientHTTPStatusAndError(err error) (int, error) { return http.StatusGatewayTimeout, errors.New(ErrDeadlineExceeded) } - s, isRPC := status.FromError(err) + if s, isRPC := status.FromError(err); isRPC { + if s.Code() == codes.DeadlineExceeded { + return http.StatusGatewayTimeout, errors.New(ErrDeadlineExceeded) + } else if int(s.Code())/100 == 4 || int(s.Code())/100 == 5 { + return int(s.Code()), errors.New(s.Message()) + } + return http.StatusInternalServerError, err + } + switch { case errors.Is(err, context.Canceled) || (errors.As(err, &promErr) && errors.Is(promErr.Err, context.Canceled)): return StatusClientClosedRequest, errors.New(ErrClientCanceled) - case errors.Is(err, context.DeadlineExceeded) || - (isRPC && s.Code() == codes.DeadlineExceeded): + case errors.Is(err, context.DeadlineExceeded): return http.StatusGatewayTimeout, errors.New(ErrDeadlineExceeded) case errors.As(err, &queryErr): return http.StatusBadRequest, err @@ -67,3 +76,17 @@ func ClientHTTPStatusAndError(err error) (int, error) { return http.StatusInternalServerError, err } } + +// WrapError wraps an error in a protobuf status. +func WrapError(err error) *rpc.Status { + if s, ok := status.FromError(err); ok { + return s.Proto() + } + + code, err := ClientHTTPStatusAndError(err) + return status.New(codes.Code(code), err.Error()).Proto() +} + +func UnwrapError(s *rpc.Status) error { + return status.ErrorProto(s) +} diff --git a/pkg/util/server/error_test.go b/pkg/util/server/error_test.go index 1b8132ff65..1fe15b0322 100644 --- a/pkg/util/server/error_test.go +++ b/pkg/util/server/error_test.go @@ -9,13 +9,12 @@ import ( "net/http/httptest" "testing" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" - + "github.com/gogo/status" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/user" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" "github.com/grafana/loki/pkg/logqlmodel" storage_errors "github.com/grafana/loki/pkg/storage/errors" @@ -32,9 +31,9 @@ func Test_writeError(t *testing.T) { }{ {"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest}, {"cancelled multi", util.MultiError{context.Canceled, context.Canceled}, ErrClientCanceled, StatusClientClosedRequest}, - {"rpc cancelled", status.New(codes.Canceled, context.Canceled.Error()).Err(), "rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError}, - {"rpc cancelled multi", util.MultiError{status.New(codes.Canceled, context.Canceled.Error()).Err(), status.New(codes.Canceled, context.Canceled.Error()).Err()}, "2 errors: rpc error: code = Canceled desc = context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError}, - {"mixed context and rpc cancelled", util.MultiError{context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()}, "2 errors: context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError}, + {"rpc cancelled", status.Error(codes.Canceled, context.Canceled.Error()), "rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError}, + {"rpc cancelled multi", util.MultiError{status.Error(codes.Canceled, context.Canceled.Error()), status.Error(codes.Canceled, context.Canceled.Error())}, "2 errors: rpc error: code = Canceled desc = context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError}, + {"mixed context and rpc cancelled", util.MultiError{context.Canceled, status.Error(codes.Canceled, context.Canceled.Error())}, "2 errors: context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError}, {"mixed context, rpc cancelled and another", util.MultiError{errors.New("standard error"), context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()}, "3 errors: standard error; context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError}, {"cancelled storage", promql.ErrStorage{Err: context.Canceled}, ErrClientCanceled, StatusClientClosedRequest}, {"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest}, @@ -56,9 +55,19 @@ func Test_writeError(t *testing.T) { WriteError(tt.err, rec) require.Equal(t, tt.expectedStatus, rec.Result().StatusCode) b, err := io.ReadAll(rec.Result().Body) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + require.Equal(t, tt.msg, string(b[:len(b)-1])) + }) + + t.Run(tt.name+"-roundtrip", func(t *testing.T) { + status := WrapError(tt.err) + unwrappedErr := UnwrapError(status) + + rec := httptest.NewRecorder() + WriteError(unwrappedErr, rec) + require.Equal(t, tt.expectedStatus, rec.Result().StatusCode) + b, err := io.ReadAll(rec.Result().Body) + require.NoError(t, err) require.Equal(t, tt.msg, string(b[:len(b)-1])) }) }