Map rpc Status back to user error (#11449)

**What this PR does / why we need it**:
Errors forwarded from the querier to the frontend would be reported with
the wrong error code and message. This change fixes this.


**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a4b0)
pull/11394/head
Karsten Jeschkies 2 years ago committed by GitHub
parent 9a3bcba174
commit b18078827e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 33
      integration/client/client.go
  2. 5
      integration/loki_micro_services_test.go
  3. 8
      pkg/querier/queryrange/marshal.go
  4. 5
      pkg/querier/worker/util.go
  5. 31
      pkg/util/server/error.go
  6. 27
      pkg/util/server/error_test.go

@ -662,6 +662,39 @@ func (c *Client) Series(ctx context.Context, matcher string) ([]map[string]strin
return values.Data, nil
}
func (c *Client) Stats(ctx context.Context, query string) ([]map[string]int, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()
v := url.Values{}
v.Set("query", query)
u, err := url.Parse(c.baseURL)
if err != nil {
panic(err)
}
u.Path = "/loki/api/v1/index/stats"
u.RawQuery = v.Encode()
buf, statusCode, err := c.run(ctx, u.String())
if err != nil {
return nil, err
}
if statusCode/100 != 2 {
return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf)))
}
var values struct {
Data []map[string]int `json:"data"`
}
if err := json.Unmarshal(buf, &values); err != nil {
return nil, err
}
return values.Data, nil
}
type TailResult struct {
Response loghttp.TailResponse
Err error

@ -146,6 +146,11 @@ func TestMicroServicesIngestQuery(t *testing.T) {
assert.ElementsMatch(t, []map[string]string{{"job": "fake"}}, resp)
})
t.Run("stats error", func(t *testing.T) {
_, err := cliQueryFrontend.Series(context.Background(), `{job="fake"}|= "search"`)
require.ErrorContains(t, err, "status code 400: only label matchers are supported")
})
t.Run("per-request-limits", func(t *testing.T) {
queryLimitsPolicy := client.InjectHeadersOption(map[string][]string{querylimits.HTTPHeaderQueryLimitsKey: {`{"maxQueryLength": "1m"}`}})
cliQueryFrontendLimited := client.New(tenantID, "", tQueryFrontend.HTTPURL(), queryLimitsPolicy)

@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
"github.com/grafana/loki/pkg/util/httpreq"
"github.com/grafana/loki/pkg/util/querylimits"
"github.com/grafana/loki/pkg/util/server"
)
const (
@ -247,6 +248,13 @@ func QueryResponseWrap(res queryrangebase.Response) (*QueryResponse, error) {
return p, nil
}
// QueryResponseWrapError wraps an error in the QueryResponse protobuf.
func QueryResponseWrapError(err error) *QueryResponse {
return &QueryResponse{
Status: server.WrapError(err),
}
}
func (Codec) QueryRequestUnwrap(ctx context.Context, req *QueryRequest) (queryrangebase.Request, context.Context, error) {
if req == nil {
return nil, ctx, nil

@ -139,10 +139,7 @@ func handleQueryRequest(ctx context.Context, request *queryrange.QueryRequest, h
// This block covers any errors that are not gRPC errors and will include all query errors.
// It's important to map non-retryable errors to a non 5xx status code so they will not be retried.
code, err := server.ClientHTTPStatusAndError(err)
return &queryrange.QueryResponse{
Status: status.New(codes.Code(code), err.Error()).Proto(),
}
return queryrange.QueryResponseWrapError(err)
}
response, err := queryrange.QueryResponseWrap(resp)

@ -9,7 +9,9 @@ import (
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/promql"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/gogo/googleapis/google/rpc"
"github.com/gogo/status"
"github.com/grafana/loki/pkg/logqlmodel"
storage_errors "github.com/grafana/loki/pkg/storage/errors"
@ -46,13 +48,20 @@ func ClientHTTPStatusAndError(err error) (int, error) {
return http.StatusGatewayTimeout, errors.New(ErrDeadlineExceeded)
}
s, isRPC := status.FromError(err)
if s, isRPC := status.FromError(err); isRPC {
if s.Code() == codes.DeadlineExceeded {
return http.StatusGatewayTimeout, errors.New(ErrDeadlineExceeded)
} else if int(s.Code())/100 == 4 || int(s.Code())/100 == 5 {
return int(s.Code()), errors.New(s.Message())
}
return http.StatusInternalServerError, err
}
switch {
case errors.Is(err, context.Canceled) ||
(errors.As(err, &promErr) && errors.Is(promErr.Err, context.Canceled)):
return StatusClientClosedRequest, errors.New(ErrClientCanceled)
case errors.Is(err, context.DeadlineExceeded) ||
(isRPC && s.Code() == codes.DeadlineExceeded):
case errors.Is(err, context.DeadlineExceeded):
return http.StatusGatewayTimeout, errors.New(ErrDeadlineExceeded)
case errors.As(err, &queryErr):
return http.StatusBadRequest, err
@ -67,3 +76,17 @@ func ClientHTTPStatusAndError(err error) (int, error) {
return http.StatusInternalServerError, err
}
}
// WrapError wraps an error in a protobuf status.
func WrapError(err error) *rpc.Status {
if s, ok := status.FromError(err); ok {
return s.Proto()
}
code, err := ClientHTTPStatusAndError(err)
return status.New(codes.Code(code), err.Error()).Proto()
}
func UnwrapError(s *rpc.Status) error {
return status.ErrorProto(s)
}

@ -9,13 +9,12 @@ import (
"net/http/httptest"
"testing"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/gogo/status"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"github.com/grafana/loki/pkg/logqlmodel"
storage_errors "github.com/grafana/loki/pkg/storage/errors"
@ -32,9 +31,9 @@ func Test_writeError(t *testing.T) {
}{
{"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest},
{"cancelled multi", util.MultiError{context.Canceled, context.Canceled}, ErrClientCanceled, StatusClientClosedRequest},
{"rpc cancelled", status.New(codes.Canceled, context.Canceled.Error()).Err(), "rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError},
{"rpc cancelled multi", util.MultiError{status.New(codes.Canceled, context.Canceled.Error()).Err(), status.New(codes.Canceled, context.Canceled.Error()).Err()}, "2 errors: rpc error: code = Canceled desc = context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError},
{"mixed context and rpc cancelled", util.MultiError{context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()}, "2 errors: context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError},
{"rpc cancelled", status.Error(codes.Canceled, context.Canceled.Error()), "rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError},
{"rpc cancelled multi", util.MultiError{status.Error(codes.Canceled, context.Canceled.Error()), status.Error(codes.Canceled, context.Canceled.Error())}, "2 errors: rpc error: code = Canceled desc = context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError},
{"mixed context and rpc cancelled", util.MultiError{context.Canceled, status.Error(codes.Canceled, context.Canceled.Error())}, "2 errors: context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError},
{"mixed context, rpc cancelled and another", util.MultiError{errors.New("standard error"), context.Canceled, status.New(codes.Canceled, context.Canceled.Error()).Err()}, "3 errors: standard error; context canceled; rpc error: code = Canceled desc = context canceled", http.StatusInternalServerError},
{"cancelled storage", promql.ErrStorage{Err: context.Canceled}, ErrClientCanceled, StatusClientClosedRequest},
{"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest},
@ -56,9 +55,19 @@ func Test_writeError(t *testing.T) {
WriteError(tt.err, rec)
require.Equal(t, tt.expectedStatus, rec.Result().StatusCode)
b, err := io.ReadAll(rec.Result().Body)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
require.Equal(t, tt.msg, string(b[:len(b)-1]))
})
t.Run(tt.name+"-roundtrip", func(t *testing.T) {
status := WrapError(tt.err)
unwrappedErr := UnwrapError(status)
rec := httptest.NewRecorder()
WriteError(unwrappedErr, rec)
require.Equal(t, tt.expectedStatus, rec.Result().StatusCode)
b, err := io.ReadAll(rec.Result().Body)
require.NoError(t, err)
require.Equal(t, tt.msg, string(b[:len(b)-1]))
})
}

Loading…
Cancel
Save