Improve error message for stream rate limit. (#4207)

* Improve error message for stream rate limit.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Move stream rate limit error message to validation package.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Use ByteSize for number values in stream rate limit error so we can
print human readable rate limits and log line byte lengths.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Fix lint issues.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Don't need to type assert twice.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Compare error contents instead of comparing the errors for equality
directly, since the rate limit error is an RPC error.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Don't call Limit on hot path just to log the value in errors.

Signed-off-by: Callum Styan <callumstyan@gmail.com>

* Make comment about rate limit clearer.

Signed-off-by: Callum Styan <callumstyan@gmail.com>
pull/4396/head
Callum Styan 4 years ago committed by GitHub
parent de0c469161
commit 25163470b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      pkg/ingester/stream.go
  2. 11
      pkg/ingester/stream_test.go
  3. 17
      pkg/validation/validate.go

@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/validation"
)
@ -49,8 +50,7 @@ var (
)
var (
ErrEntriesExist = errors.New("duplicate push - entries already exist")
ErrStreamRateLimit = errors.New("stream rate limit exceeded")
ErrEntriesExist = errors.New("duplicate push - entries already exist")
)
func init() {
@ -218,6 +218,11 @@ func (s *stream) Push(
}
}()
// This call uses a mutex under the hood, cache the result since we're checking the limit
// on each entry in the push (hot path) and we only use this value when logging entries
// over the rate limit.
limit := s.limiter.lim.Limit()
// Don't fail on the first append error - if samples are sent out of order,
// we still want to append the later ones.
for i := range entries {
@ -240,7 +245,7 @@ func (s *stream) Push(
// Check if this should be rate limited.
now := time.Now()
if !s.limiter.AllowN(now, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], ErrStreamRateLimit})
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[i].Line))}})
rateLimitedSamples++
rateLimitedBytes += len(entries[i].Line)
continue
@ -318,14 +323,15 @@ func (s *stream) Push(
if len(failedEntriesWithError) > 0 {
lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && lastEntryWithErr.e != ErrStreamRateLimit {
_, ok := lastEntryWithErr.e.(*validation.ErrStreamRateLimit)
if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && !ok {
return bytesAdded, lastEntryWithErr.e
}
var statusCode int
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
statusCode = http.StatusBadRequest
}
if lastEntryWithErr.e == ErrStreamRateLimit {
if ok {
statusCode = http.StatusTooManyRequests
}
// Return a http status 4xx request response with all failed entries.

@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/validation"
)
@ -329,13 +330,13 @@ func TestPushRateLimit(t *testing.T) {
NilMetrics,
)
// Counter should be 2 now since the first line will be deduped.
_, err = s.Push(context.Background(), []logproto.Entry{
entries := []logproto.Entry{
{Timestamp: time.Unix(1, 0), Line: "aaaaaaaaaa"},
{Timestamp: time.Unix(1, 0), Line: "aaaaaaaaab"},
}, recordPool.GetRecord(), 0)
require.Contains(t, err.Error(), ErrStreamRateLimit.Error())
require.Contains(t, err.Error(), "total ignored: 1 out of 2")
}
// Counter should be 2 now since the first line will be deduped.
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0)
require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[1].Line))}).Error())
}
func iterEq(t *testing.T, exp []logproto.Entry, got iter.EntryIterator) {

@ -1,7 +1,11 @@
package validation
import (
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/util/flagext"
)
const (
@ -47,6 +51,19 @@ const (
DuplicateLabelNamesErrorMsg = "stream '%s' has duplicate label name: '%s'"
)
// ErrStreamRateLimit is an error type carrying the details of a per-stream
// rate limit violation. Its fields are the values interpolated into the
// message produced by Error.
type ErrStreamRateLimit struct {
// RateLimit is the per-stream limit, rendered as "<limit>/sec" in the message.
RateLimit flagext.ByteSize
// Labels is the label string identifying the stream that was rate limited.
Labels string
// Bytes is the size reported in the message as the amount being ingested
// (the ingester push path sets it to the byte length of the rejected line).
Bytes flagext.ByteSize
}
// Error implements the error interface. It renders a human-readable message
// containing the configured per-stream rate limit, the labels of the affected
// stream, and the size of the data that was being ingested, followed by advice
// on how to resolve the problem. RateLimit and Bytes are formatted through
// their String methods.
func (e *ErrStreamRateLimit) Error() string {
	return fmt.Sprintf("Per stream rate limit exceeded (limit: %s/sec) while attempting to ingest for stream '%s' totaling %s, consider splitting a stream via additional labels or contact your Loki administrator to see if the limit can be increased",
		e.RateLimit.String(),
		e.Labels,
		e.Bytes.String())
}
// MutatedSamples is a metric of the total number of lines mutated, by reason.
var MutatedSamples = prometheus.NewCounterVec(
prometheus.CounterOpts{

Loading…
Cancel
Save