diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index 7b7445483e..c79d8676da 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -3,7 +3,6 @@ package iter import ( "container/heap" "context" - "fmt" "io" "sync" "time" @@ -11,6 +10,7 @@ import ( "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/stats" + "github.com/grafana/loki/pkg/util" ) // EntryIterator iterates over entries in time-order. @@ -285,7 +285,7 @@ func (i *heapIterator) Error() error { case 1: return i.errs[0] default: - return fmt.Errorf("Multiple errors: %+v", i.errs) + return util.MultiError(i.errs) } } @@ -502,7 +502,6 @@ func NewReversedIter(it EntryIterator, limit uint32, preload bool) (EntryIterato entriesWithLabels: make([]entryWithLabels, 0, 1024), limit: limit, }, it.Error() - if err != nil { return nil, err } @@ -578,7 +577,6 @@ func NewEntryReversedIter(it EntryIterator) (EntryIterator, error) { iter: it, buf: entryBufferPool.Get().(*entryBuffer), }, it.Error() - if err != nil { return nil, err } diff --git a/pkg/iter/sample_iterator.go b/pkg/iter/sample_iterator.go index a2ab973c38..a2ec592d43 100644 --- a/pkg/iter/sample_iterator.go +++ b/pkg/iter/sample_iterator.go @@ -3,12 +3,12 @@ package iter import ( "container/heap" "context" - "fmt" "io" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/stats" + "github.com/grafana/loki/pkg/util" ) // SampleIterator iterates over samples in time-order. @@ -154,7 +154,6 @@ type heapSampleIterator struct { // NewHeapSampleIterator returns a new iterator which uses a heap to merge together // entries for multiple iterators. func NewHeapSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator { - return &heapSampleIterator{ stats: stats.GetChunkData(ctx), is: is, @@ -268,7 +267,7 @@ func (i *heapSampleIterator) Error() error { case 1: return i.errs[0] default: - return fmt.Errorf("Multiple errors: %+v", i.errs) + return util.MultiError(i.errs) } } diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 0f7c1c5ac4..5d01e5e5d9 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -20,12 +20,13 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/stats" + "github.com/grafana/loki/pkg/util" ) var ( testSize = int64(300) ErrMock = errors.New("mock error") - ErrMockMultiple = errors.New("Multiple errors: [mock error mock error]") + ErrMockMultiple = util.MultiError{ErrMock, ErrMock} ) func TestEngine_LogsInstantQuery(t *testing.T) { diff --git a/pkg/logql/evaluator.go b/pkg/logql/evaluator.go index 0dff7d8e4b..db178505fd 100644 --- a/pkg/logql/evaluator.go +++ b/pkg/logql/evaluator.go @@ -3,7 +3,6 @@ package logql import ( "container/heap" "context" - "fmt" "math" "sort" "time" @@ -15,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util" ) type QueryRangeType string @@ -614,7 +614,7 @@ func binOpStepEvaluator( case 1: return errs[0] default: - return fmt.Errorf("Multiple errors: %+v", errs) + return util.MultiError(errs) } }) } diff --git a/pkg/logql/sharding.go b/pkg/logql/sharding.go index f315000893..78cfbed482 100644 --- a/pkg/logql/sharding.go +++ b/pkg/logql/sharding.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logql/stats" + "github.com/grafana/loki/pkg/util" ) /* @@ -44,7 +45,6 @@ func NewShardedEngine(opts EngineOpts, downstreamable Downstreamable, metrics *S metrics: metrics, limits: limits, } - } // Query constructs a Query @@ -173,7 +173,6 @@ func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []Downstre } return results, nil - } type errorQuerier struct{} @@ -181,6 +180,7 @@ type errorQuerier struct{} func (errorQuerier) SelectLogs(ctx context.Context, p SelectLogParams) (iter.EntryIterator, error) { return nil, errors.New("Unimplemented") } + func (errorQuerier) SelectSamples(ctx context.Context, p SelectSampleParams) (iter.SampleIterator, error) { return nil, errors.New("Unimplemented") } @@ -276,7 +276,6 @@ func (ev *DownstreamEvaluator) Iterator( Params: params, Shards: shards, }}) - if err != nil { return nil, err } @@ -333,7 +332,6 @@ func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { vec = append(vec, cur...) } return done, ts, vec - }, func() (lastErr error) { for _, eval := range evaluators { @@ -356,7 +354,7 @@ func ConcatEvaluator(evaluators []StepEvaluator) (StepEvaluator, error) { case 1: return errs[0] default: - return fmt.Errorf("Multiple errors: %+v", errs) + return util.MultiError(errs) } }, ) @@ -394,5 +392,4 @@ func ResultIterator(res Result, params Params) (iter.EntryIterator, error) { return nil, fmt.Errorf("unexpected type (%s) for ResultIterator; expected %s", res.Data.Type(), ValueTypeStreams) } return iter.NewStreamsIterator(context.Background(), streams, params.Direction()), nil - } diff --git a/pkg/util/errors.go b/pkg/util/errors.go index 0a07d87039..d2d924ded3 100644 --- a/pkg/util/errors.go +++ b/pkg/util/errors.go @@ -2,6 +2,7 @@ package util import ( "bytes" + "errors" "fmt" "google.golang.org/grpc/codes" @@ -50,6 +51,16 @@ func (es MultiError) Err() error { return es } +// Is tells if all errors are the same as the target error. +func (es MultiError) Is(target error) bool { + for _, err := range es { + if !errors.Is(err, target) { + return false + } + } + return true +} + // IsConnCanceled returns true, if error is from a closed gRPC connection. // copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646 func IsConnCanceled(err error) bool { diff --git a/pkg/util/server/error_test.go b/pkg/util/server/error_test.go index 5ea16c7fa0..af85c6dac5 100644 --- a/pkg/util/server/error_test.go +++ b/pkg/util/server/error_test.go @@ -15,6 +15,7 @@ import ( "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/util" ) func Test_writeError(t *testing.T) { @@ -26,6 +27,7 @@ func Test_writeError(t *testing.T) { expectedStatus int }{ {"cancelled", context.Canceled, ErrClientCanceled, StatusClientClosedRequest}, + {"cancelled multi", util.MultiError{context.Canceled, context.Canceled}, ErrClientCanceled, StatusClientClosedRequest}, {"orgid", user.ErrNoOrgID, user.ErrNoOrgID.Error(), http.StatusBadRequest}, {"deadline", context.DeadlineExceeded, ErrDeadlineExceeded, http.StatusGatewayTimeout}, {"parse error", logql.ParseError{}, "parse error : ", http.StatusBadRequest}, @@ -33,6 +35,7 @@ func Test_writeError(t *testing.T) { {"internal", errors.New("foo"), "foo", http.StatusInternalServerError}, {"query error", chunk.ErrQueryMustContainMetricName, chunk.ErrQueryMustContainMetricName.Error(), http.StatusBadRequest}, {"wrapped query error", fmt.Errorf("wrapped: %w", chunk.ErrQueryMustContainMetricName), "wrapped: " + chunk.ErrQueryMustContainMetricName.Error(), http.StatusBadRequest}, + {"multi mixed", util.MultiError{context.Canceled, context.DeadlineExceeded}, "2 errors: context canceled; context deadline exceeded", http.StatusInternalServerError}, } { t.Run(tt.name, func(t *testing.T) { rec := httptest.NewRecorder()