chore(logging): Add entry's timestamp when rejected with `too far behind` (#12933)

Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
pull/12946/head
Kaviraj Kanagaraj 1 year ago committed by GitHub
parent 5ada92b190
commit b05c4f7288
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      pkg/chunkenc/interface.go
  2. 4
      pkg/chunkenc/interface_test.go
  3. 2
      pkg/ingester/stream.go
  4. 3
      pkg/ingester/stream_test.go

@ -24,6 +24,10 @@ var (
)
type errTooFarBehind struct {
// original timestmap of the entry itself.
entryTs time.Time
// cutoff is the oldest acceptable timstamp of the `stream` that entry belongs to.
cutoff time.Time
}
@ -32,12 +36,12 @@ func IsErrTooFarBehind(err error) bool {
return ok
}
func ErrTooFarBehind(cutoff time.Time) error {
return &errTooFarBehind{cutoff: cutoff}
func ErrTooFarBehind(entryTs, cutoff time.Time) error {
return &errTooFarBehind{entryTs: entryTs, cutoff: cutoff}
}
func (m *errTooFarBehind) Error() string {
return "entry too far behind, oldest acceptable timestamp is: " + m.cutoff.Format(time.RFC3339)
return fmt.Sprintf("entry too far behind, entry timestamp is: %s, oldest acceptable timestamp is: %s", m.entryTs.Format(time.RFC3339), m.cutoff.Format(time.RFC3339))
}
func IsOutOfOrderErr(err error) bool {

@ -31,7 +31,9 @@ func TestParseEncoding(t *testing.T) {
}
func TestIsOutOfOrderErr(t *testing.T) {
for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind(time.Now())} {
now := time.Now()
for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind(now, now)} {
require.Equal(t, true, IsOutOfOrderErr(err))
}
}

@ -394,7 +394,7 @@ func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWh
// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2)
if !isReplay && s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(cutoff)})
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(entries[i].Timestamp, cutoff)})
s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels))
outOfOrderSamples++
outOfOrderBytes += lineBytes

@ -84,8 +84,9 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
var expected bytes.Buffer
for i := 0; i < tc.expectErrs; i++ {
fmt.Fprintf(&expected,
"entry with timestamp %s ignored, reason: 'entry too far behind, oldest acceptable timestamp is: %s',\n",
"entry with timestamp %s ignored, reason: 'entry too far behind, entry timestamp is: %s, oldest acceptable timestamp is: %s',\n",
time.Unix(int64(i), 0).String(),
newLines[i].Timestamp.Format(time.RFC3339),
time.Unix(int64(numLogs), 0).Format(time.RFC3339),
)
}

Loading…
Cancel
Save