diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 9997b637f1..d57d813f1f 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -227,7 +227,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log validatedSamplesSize := 0 validatedSamplesCount := 0 - validationContext := d.validator.getValidationContextFor(userID) + validationContext := d.validator.getValidationContextForTime(time.Now(), userID) for _, stream := range req.Streams { // Truncate first so subsequent steps have consistent line lengths diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 7eeb8d5389..77b7a93975 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -146,7 +146,7 @@ func Benchmark_SortLabelsOnPush(b *testing.B) { d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck request := makeWriteRequest(10, 10) - vCtx := d.validator.getValidationContextFor("123") + vCtx := d.validator.getValidationContextForTime(testTime, "123") for n := 0; n < b.N; n++ { stream := request.Streams[0] stream.Labels = `{buzz="f", a="b"}` diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index c279c74625..f69dc5c18e 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -13,6 +13,10 @@ import ( "github.com/grafana/loki/pkg/validation" ) +const ( + timeFormat = time.RFC3339 +) + type Validator struct { Limits } @@ -39,8 +43,7 @@ type validationContext struct { userID string } -func (v Validator) getValidationContextFor(userID string) validationContext { - now := time.Now() +func (v Validator) getValidationContextForTime(now time.Time, userID string) validationContext { return validationContext{ userID: userID, rejectOldSample: v.RejectOldSamples(userID), @@ -57,16 +60,21 @@ func (v Validator) getValidationContextFor(userID string) validationContext { // ValidateEntry returns an error if the entry is invalid func (v Validator) ValidateEntry(ctx validationContext, labels string, entry logproto.Entry) error { ts := entry.Timestamp.UnixNano() + + // Makes time string on the error message formatted consistently. + formatedEntryTime := entry.Timestamp.Format(timeFormat) + formatedRejectMaxAgeTime := time.Unix(0, ctx.rejectOldSampleMaxAge).Format(timeFormat) + if ctx.rejectOldSample && ts < ctx.rejectOldSampleMaxAge { validation.DiscardedSamples.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.GreaterThanMaxSampleAge, ctx.userID).Add(float64(len(entry.Line))) - return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, entry.Timestamp) + return httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, labels, formatedEntryTime, formatedRejectMaxAgeTime) } if ts > ctx.creationGracePeriod { validation.DiscardedSamples.WithLabelValues(validation.TooFarInFuture, ctx.userID).Inc() validation.DiscardedBytes.WithLabelValues(validation.TooFarInFuture, ctx.userID).Add(float64(len(entry.Line))) - return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, entry.Timestamp) + return httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, labels, formatedEntryTime) } if maxSize := ctx.maxLineSize; maxSize != 0 && len(entry.Line) > maxSize { diff --git a/pkg/distributor/validator_test.go b/pkg/distributor/validator_test.go index af3625e347..1426cfc681 100644 --- a/pkg/distributor/validator_test.go +++ b/pkg/distributor/validator_test.go @@ -59,14 +59,20 @@ func TestValidator_ValidateEntry(t *testing.T) { }, }, logproto.Entry{Timestamp: testTime.Add(-time.Hour * 5), Line: "test"}, - httpgrpc.Errorf(http.StatusBadRequest, validation.GreaterThanMaxSampleAgeErrorMsg, testStreamLabels, testTime.Add(-time.Hour*5)), + httpgrpc.Errorf( + http.StatusBadRequest, + validation.GreaterThanMaxSampleAgeErrorMsg, + testStreamLabels, + testTime.Add(-time.Hour*5).Format(timeFormat), + testTime.Add(-1*time.Hour).Format(timeFormat), // same as RejectOldSamplesMaxAge + ), }, { "test too new", "test", nil, logproto.Entry{Timestamp: testTime.Add(time.Hour * 5), Line: "test"}, - httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5)), + httpgrpc.Errorf(http.StatusBadRequest, validation.TooFarInFutureErrorMsg, testStreamLabels, testTime.Add(time.Hour*5).Format(timeFormat)), }, { "line too long", @@ -89,7 +95,7 @@ func TestValidator_ValidateEntry(t *testing.T) { v, err := NewValidator(o) assert.NoError(t, err) - err = v.ValidateEntry(v.getValidationContextFor(tt.userID), testStreamLabels, tt.entry) + err = v.ValidateEntry(v.getValidationContextForTime(testTime, tt.userID), testStreamLabels, tt.entry) assert.Equal(t, tt.expected, err) }) } @@ -190,7 +196,7 @@ func TestValidator_ValidateLabels(t *testing.T) { v, err := NewValidator(o) assert.NoError(t, err) - err = v.ValidateLabels(v.getValidationContextFor(tt.userID), mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels}) + err = v.ValidateLabels(v.getValidationContextForTime(testTime, tt.userID), mustParseLabels(tt.labels), logproto.Stream{Labels: tt.labels}) assert.Equal(t, tt.expected, err) }) } diff --git a/pkg/validation/validate.go b/pkg/validation/validate.go index d8fb7903af..e38f1bd337 100644 --- a/pkg/validation/validate.go +++ b/pkg/validation/validate.go @@ -34,7 +34,7 @@ const ( TooFarBehind = "too_far_behind" // GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age` GreaterThanMaxSampleAge = "greater_than_max_sample_age" - GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v" + GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v, oldest acceptable timestamp is: %v" // TooFarInFuture is a reason for discarding log lines which are newer than the current time + `creation_grace_period` TooFarInFuture = "too_far_in_future" TooFarInFutureErrorMsg = "entry for stream '%s' has timestamp too new: %v"