Improve error message if incoming logs timestamp is far too behind. (#5040)

* Improve error message if incoming logs timestamp is far too behind.

This is part of JSON response giving out as the response to HTTP /push endpoint.

Old message
```
{"code":400,"status":"error","message":"entry for stream '{foo=\"bar\"}' has timestamp too old: 1970-01-01 01:00:00.5 +0100 CET"}
```

New message
```
{"code":400,"status":"error","message":"entry for stream '{foo=\"bar\"}' has timestamp too old: 2021-12-28 01:48:45.5 +0100 CET, accepts timestamp from: 2021-12-29 09:48:45.737756651 +0100 CET"}
```

main rationale being, hard to know what is the closest timestamp that Loki is expecting without going through the config.
Also config is not straight forward (its `latest stream's timestamp - ingester.max-chunk-age/2`)

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Tweaks the error message

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Fix `distributor/validator_test`.

1. Make `time.Now()` mockable.
2. Make time format consistent across error messages.

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>

* Rename `getValidationContextFor` to `getValidationContextForTime`

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
pull/5049/head
Kaviraj 3 years ago committed by GitHub
parent c0bec07e0d
commit 564f833a30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      pkg/distributor/distributor.go
  2. 2
      pkg/distributor/distributor_test.go
  3. 16
      pkg/distributor/validator.go
  4. 14
      pkg/distributor/validator_test.go
  5. 2
      pkg/validation/validate.go

@ -227,7 +227,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
validatedSamplesSize := 0 validatedSamplesSize := 0
validatedSamplesCount := 0 validatedSamplesCount := 0
validationContext := d.validator.getValidationContextFor(userID) validationContext := d.validator.getValidationContextForTime(time.Now(), userID)
for _, stream := range req.Streams { for _, stream := range req.Streams {
// Truncate first so subsequent steps have consistent line lengths // Truncate first so subsequent steps have consistent line lengths

@ -146,7 +146,7 @@ func Benchmark_SortLabelsOnPush(b *testing.B) {
d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil })
defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck
request := makeWriteRequest(10, 10) request := makeWriteRequest(10, 10)
vCtx := d.validator.getValidationContextFor("123") vCtx := d.validator.getValidationContextForTime(testTime, "123")
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
stream := request.Streams[0] stream := request.Streams[0]
stream.Labels = `{buzz="f", a="b"}` stream.Labels = `{buzz="f", a="b"}`

@ -13,6 +13,10 @@ import (
"github.com/grafana/loki/pkg/validation" "github.com/grafana/loki/pkg/validation"
) )
const (
timeFormat = time.RFC3339
)
type Validator struct { type Validator struct {
Limits Limits
} }
@ -39,8 +43,7 @@ type validationContext struct {
userID string userID string
} }
func (v Validator) getValidationContextFor(userID string) validationContext { func (v Validator) getValidationContextForTime(now time.Time, userID string) validationContext {
now := time.Now()
return validationContext{ return validationContext{
userID: userID, userID: userID,
rejectOldSample: v.RejectOldSamples(userID), rejectOldSample: v.RejectOldSamples(userID),
@ -57,16 +60,21 @@ func (v Validator) getValidationContextFor(userID string) validationContext {
// ValidateEntry returns an error if the entry is invalid // ValidateEntry returns an error if the entry is invalid
func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error { func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error {
ts := entry.Timestamp.UnixNano() ts := entry.Timestamp.UnixNano()
// Makes time string on the error message formatted consistently.
formatedEntryTime := entry.Timestamp.Format(timeFormat)
formatedRejectMaxAgeTime := time.Unix(0, ctx.rejectOldSampleMaxAge).Format(timeFormat)
if ctx.rejectOldSample && ts < ctx.rejectOldSampleMaxAge { if ctx.rejectOldSample && ts < ctx.rejectOldSampleMaxAge {
validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc() validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line))) validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, entry.Timestamp) return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime)
} }
if ts > ctx.creationGracePeriod { if ts > ctx.creationGracePeriod {
validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc() validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line))) validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, entry.Timestamp) return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, formatedEntryTime)
} }
if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize { if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize {

@ -59,14 +59,20 @@ func TestValidator_ValidateEntry(t *testing.T) {
}, },
}, },
logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"}, logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, testStreamLabels, testTime.Add(-time.Hour*5)), httpgrpc.Errorf(
http.StatusBadRequest,
validation.GreaterThanMaxSampleAgeErrorMsg,
testStreamLabels,
testTime.Add(-time.Hour*5).Format(timeFormat),
testTime.Add(-1*time.Hour).Format(timeFormat), // same as RejectOldSamplesMaxAge
),
}, },
{ {
"test too new", "test too new",
"test", "test",
nil, nil,
logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"}, logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"},
httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5)), httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)),
}, },
{ {
"line too long", "line too long",
@ -89,7 +95,7 @@ func TestValidator_ValidateEntry(t *testing.T) {
v, err := NewValidator(o) v, err := NewValidator(o)
assert.NoError(t, err) assert.NoError(t, err)
err = v.ValidateEntry(v.getValidationContextFor(tt.userID), testStreamLabels, tt.entry) err = v.ValidateEntry(v.getValidationContextForTime(testTime, tt.userID), testStreamLabels, tt.entry)
assert.Equal(t, tt.expected, err) assert.Equal(t, tt.expected, err)
}) })
} }
@ -190,7 +196,7 @@ func TestValidator_ValidateLabels(t *testing.T) {
v, err := NewValidator(o) v, err := NewValidator(o)
assert.NoError(t, err) assert.NoError(t, err)
err = v.ValidateLabels(v.getValidationContextFor(tt.userID), mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels}) err = v.ValidateLabels(v.getValidationContextForTime(testTime, tt.userID), mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels})
assert.Equal(t, tt.expected, err) assert.Equal(t, tt.expected, err)
}) })
} }

@ -34,7 +34,7 @@ const (
TooFarBehind = "too_far_behind" TooFarBehind = "too_far_behind"
// GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age` // GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age`
GreaterThanMaxSampleAge = "greater_than_max_sample_age" GreaterThanMaxSampleAge = "greater_than_max_sample_age"
GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v" GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v, oldest acceptable timestamp is: %v"
// TooFarInFuture is a reason for discarding log lines which are newer than the current time + `creation_grace_period` // TooFarInFuture is a reason for discarding log lines which are newer than the current time + `creation_grace_period`
TooFarInFuture = "too_far_in_future" TooFarInFuture = "too_far_in_future"
TooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v" TooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v"

Loading…
Cancel
Save