Fixes 500 in the querier when returning multiple errors. (#3348)

This correctly implement the `Is` error interface and uses the `util.MultiError`
everywhere we need it.

This way we can correcltly access if this was a cancellation which should not translate into 500.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/3388/head
Cyril Tovena 5 years ago committed by GitHub
parent 99bfd1cc5e
commit e21d6edc10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/iter/entry_iterator.go
  2. 5
      pkg/iter/sample_iterator.go
  3. 3
      pkg/logql/engine_test.go
  4. 4
      pkg/logql/evaluator.go
  5. 9
      pkg/logql/sharding.go
  6. 11
      pkg/util/errors.go
  7. 3
      pkg/util/server/error_test.go

@ -3,7 +3,6 @@ package iter
import (
"container/heap"
"context"
"fmt"
"io"
"sync"
"time"
@ -11,6 +10,7 @@ import (
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
)
// EntryIterator iterates over entries in time-order.
@ -285,7 +285,7 @@ func (i *heapIterator) Error() error {
case 1:
return i.errs[0]
default:
return fmt.Errorf("Multiple errors: %+v", i.errs)
return util.MultiError(i.errs)
}
}
@ -502,7 +502,6 @@ func NewReversedIter(it EntryIterator, limit uint32, preload bool) (EntryIterato
entriesWithLabels: make([]entryWithLabels, 0, 1024),
limit: limit,
}, it.Error()
if err != nil {
return nil, err
}
@ -578,7 +577,6 @@ func NewEntryReversedIter(it EntryIterator) (EntryIterator, error) {
iter: it,
buf: entryBufferPool.Get().(*entryBuffer),
}, it.Error()
if err != nil {
return nil, err
}

@ -3,12 +3,12 @@ package iter
import (
"container/heap"
"context"
"fmt"
"io"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
)
// SampleIterator iterates over samples in time-order.
@ -154,7 +154,6 @@ type heapSampleIterator struct {
// NewHeapSampleIterator returns a new iterator which uses a heap to merge together
// entries for multiple iterators.
func NewHeapSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator {
return &heapSampleIterator{
stats: stats.GetChunkData(ctx),
is: is,
@ -268,7 +267,7 @@ func (i *heapSampleIterator) Error() error {
case 1:
return i.errs[0]
default:
return fmt.Errorf("Multiple errors: %+v", i.errs)
return util.MultiError(i.errs)
}
}

@ -20,12 +20,13 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
)
var (
testSize = int64(300)
ErrMock = errors.New("mock error")
ErrMockMultiple = errors.New("Multiple errors: [mock error mock error]")
ErrMockMultiple = util.MultiError{ErrMock, ErrMock}
)
func TestEngine_LogsInstantQuery(t *testing.T) {

@ -3,7 +3,6 @@ package logql
import (
"container/heap"
"context"
"fmt"
"math"
"sort"
"time"
@ -15,6 +14,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util"
)
type QueryRangeType string
@ -614,7 +614,7 @@ func binOpStepEvaluator(
case 1:
return errs[0]
default:
return fmt.Errorf("Multiple errors: %+v", errs)
return util.MultiError(errs)
}
})
}

@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util"
)
/*
@ -44,7 +45,6 @@ func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *S
metrics: metrics,
limits: limits,
}
}
// Query constructs a Query
@ -173,7 +173,6 @@ func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []Downstre
}
return results, nil
}
type errorQuerier struct{}
@ -181,6 +180,7 @@ type errorQuerier struct{}
func (errorQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) {
return nil, errors.New("Unimplemented")
}
func (errorQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) {
return nil, errors.New("Unimplemented")
}
@ -276,7 +276,6 @@ func (ev *DownstreamEvaluator) Iterator(
Params: params,
Shards: shards,
}})
if err != nil {
return nil, err
}
@ -333,7 +332,6 @@ func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) {
vec = append(vec, cur...)
}
return done, ts, vec
},
func() (lastErr error) {
for _, eval := range evaluators {
@ -356,7 +354,7 @@ func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) {
case 1:
return errs[0]
default:
return fmt.Errorf("Multiple errors: %+v", errs)
return util.MultiError(errs)
}
},
)
@ -394,5 +392,4 @@ func ResultIterator(res Result, params Params) (iter.EntryIterator, error) {
return nil, fmt.Errorf("unexpected type (%s) for ResultIterator; expected %s", res.Data.Type(), ValueTypeStreams)
}
return iter.NewStreamsIterator(context.Background(), streams, params.Direction()), nil
}

@ -2,6 +2,7 @@ package util
import (
"bytes"
"errors"
"fmt"
"google.golang.org/grpc/codes"
@ -50,6 +51,16 @@ func (es MultiError) Err() error {
return es
}
// Is tells if all errors are the same as the target error.
func (es MultiError) Is(target error) bool {
for _, err := range es {
if !errors.Is(err, target) {
return false
}
}
return true
}
// IsConnCanceled returns true, if error is from a closed gRPC connection.
// copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646
func IsConnCanceled(err error) bool {

@ -15,6 +15,7 @@ import (
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
)
func Test_writeError(t *testing.T) {
@ -26,6 +27,7 @@ func Test_writeError(t *testing.T) {
expectedStatus int
}{
{"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest},
{"cancelled multi", util.MultiError{context.Canceled, context.Canceled}, ErrClientCanceled, StatusClientClosedRequest},
{"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest},
{"deadline", context.DeadlineExceeded, ErrDeadlineExceeded, http.StatusGatewayTimeout},
{"parse error", logql.ParseError{}, "parse error : ", http.StatusBadRequest},
@ -33,6 +35,7 @@ func Test_writeError(t *testing.T) {
{"internal", errors.New("foo"), "foo", http.StatusInternalServerError},
{"query error", chunk.ErrQueryMustContainMetricName, chunk.ErrQueryMustContainMetricName.Error(), http.StatusBadRequest},
{"wrapped query error", fmt.Errorf("wrapped: %w", chunk.ErrQueryMustContainMetricName), "wrapped: " + chunk.ErrQueryMustContainMetricName.Error(), http.StatusBadRequest},
{"multi mixed", util.MultiError{context.Canceled, context.DeadlineExceeded}, "2 errors: context canceled; context deadline exceeded", http.StatusInternalServerError},
} {
t.Run(tt.name, func(t *testing.T) {
rec := httptest.NewRecorder()

Loading…
Cancel
Save