diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 3722438f70..6b1d63254b 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -301,7 +301,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log validatedLineSize := 0 validatedLineCount := 0 - var validationErrors util.MultiError + var validationErrors util.GroupedErrors validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID) func() { @@ -376,7 +376,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log }() var validationErr error - if validationErrors != nil { + if validationErrors.Err() != nil { validationErr = httpgrpc.Errorf(http.StatusBadRequest, validationErrors.Error()) } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 72287de05f..b17fe7f7d5 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -72,7 +72,7 @@ func TestDistributor(t *testing.T) { streams: 1, maxLineSize: 1, expectedResponse: success, - expectedError: httpgrpc.Errorf(http.StatusBadRequest, "100 errors: %s", strings.Repeat(fmt.Sprintf(validation.LineTooLongErrorMsg+"; ", 1, "{foo=\"bar\"}", 10), 99)), + expectedError: httpgrpc.Errorf(http.StatusBadRequest, "100 errors like: %s", fmt.Sprintf(validation.LineTooLongErrorMsg, 1, "{foo=\"bar\"}", 10)), }, { lines: 100, @@ -87,7 +87,7 @@ func TestDistributor(t *testing.T) { mangleLabels: 1, maxLineSize: 1, expectedResponse: success, - expectedError: httpgrpc.Errorf(http.StatusBadRequest, "11 errors: %s; %s", fmt.Sprintf(validation.InvalidLabelsErrorMsg, "{ab\"", "1:4: parse error: unterminated quoted string"), strings.Repeat(fmt.Sprintf(validation.LineTooLongErrorMsg+"; ", 1, "{foo=\"bar\"}", 10), 9)), + expectedError: httpgrpc.Errorf(http.StatusBadRequest, "1 errors like: %s; 10 errors like: %s", fmt.Sprintf(validation.InvalidLabelsErrorMsg, "{ab\"", "1:4: parse error: unterminated quoted string"), fmt.Sprintf(validation.LineTooLongErrorMsg, 1, "{foo=\"bar\"}", 10)), }, } { t.Run(fmt.Sprintf("[%d](lines=%v)", i, tc.lines), func(t *testing.T) { @@ -112,11 +112,7 @@ func TestDistributor(t *testing.T) { response, err := distributors[i%len(distributors)].Push(ctx, &request) assert.Equal(t, tc.expectedResponse, response) - if tc.expectedError != nil { - assert.Contains(t, err.Error(), tc.expectedError.Error()) - } else { - assert.NoError(t, err) - } + assert.Equal(t, tc.expectedError, err) }) } } diff --git a/pkg/util/errors.go b/pkg/util/errors.go index be82e197ea..3483f38b07 100644 --- a/pkg/util/errors.go +++ b/pkg/util/errors.go @@ -100,6 +100,33 @@ func (es MultiError) IsDeadlineExceeded() bool { return true } +type GroupedErrors struct { + MultiError +} + +func (es GroupedErrors) Error() string { + mapErrs := make(map[string]int, len(es.MultiError)) + for _, err := range es.MultiError { + mapErrs[err.Error()]++ + } + + var idx int + var buf bytes.Buffer + uniqueErrs := len(mapErrs) + for err, n := range mapErrs { + if idx != 0 { + buf.WriteString("; ") + } + if uniqueErrs > 1 || n > 1 { + _, _ = fmt.Fprintf(&buf, "%d errors like: ", n) + } + buf.WriteString(err) + idx++ + } + + return buf.String() +} + // IsConnCanceled returns true, if error is from a closed gRPC connection. // copied from https://github.com/etcd-io/etcd/blob/7f47de84146bdc9225d2080ec8678ca8189a2d2b/clientv3/client.go#L646 func IsConnCanceled(err error) bool {