introduces "entry too far behind" instrumentation for unordered writes (#4592)

pull/4595/head
Owen Diehl 5 years ago committed by GitHub
parent de7ab3fde2
commit cd80bc5e7e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      pkg/chunkenc/interface.go
  2. 12
      pkg/chunkenc/interface_test.go
  3. 17
      pkg/ingester/stream.go
  4. 2
      pkg/ingester/stream_test.go
  5. 1
      pkg/validation/validate.go

@ -17,11 +17,16 @@ import (
var (
ErrChunkFull = errors.New("chunk full")
ErrOutOfOrder = errors.New("entry out of order")
ErrTooFarBehind = errors.New("entry too far behind")
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid chunk checksum")
)
func IsOutOfOrderErr(err error) bool {
return err == ErrOutOfOrder || err == ErrTooFarBehind
}
// Encoding is the identifier for a chunk encoding.
type Encoding byte

@ -1,6 +1,10 @@
package chunkenc
import "testing"
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestParseEncoding(t *testing.T) {
tests := []struct {
@ -24,3 +28,9 @@ func TestParseEncoding(t *testing.T) {
})
}
}
func TestIsOutOfOrderErr(t *testing.T) {
for _, err := range []error{ErrOutOfOrder, ErrTooFarBehind} {
require.Equal(t, true, IsOutOfOrderErr(err))
}
}

@ -209,8 +209,12 @@ func (s *stream) Push(
var rateLimitedSamples, rateLimitedBytes int
defer func() {
if outOfOrderSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderSamples))
validation.DiscardedBytes.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderBytes))
name := validation.OutOfOrder
if s.unorderedWrites {
name = validation.TooFarBehind
}
validation.DiscardedSamples.WithLabelValues(name, s.tenant).Add(float64(outOfOrderSamples))
validation.DiscardedBytes.WithLabelValues(name, s.tenant).Add(float64(outOfOrderBytes))
}
if rateLimitedSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
@ -253,12 +257,12 @@ func (s *stream) Push(
// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrOutOfOrder})
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind})
outOfOrderSamples++
outOfOrderBytes += len(entries[i].Line)
} else if err := chunk.chunk.Append(&entries[i]); err != nil {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
if err == chunkenc.ErrOutOfOrder {
if chunkenc.IsOutOfOrderErr(err) {
outOfOrderSamples++
outOfOrderBytes += len(entries[i].Line)
}
@ -324,11 +328,12 @@ func (s *stream) Push(
if len(failedEntriesWithError) > 0 {
lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
_, ok := lastEntryWithErr.e.(*validation.ErrStreamRateLimit)
if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && !ok {
outOfOrder := chunkenc.IsOutOfOrderErr(lastEntryWithErr.e)
if !outOfOrder && !ok {
return bytesAdded, lastEntryWithErr.e
}
var statusCode int
if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
if outOfOrder {
statusCode = http.StatusBadRequest
}
if ok {

@ -77,7 +77,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
var expected bytes.Buffer
for i := 0; i < tc.expectErrs; i++ {
fmt.Fprintf(&expected,
"entry with timestamp %s ignored, reason: 'entry out of order' for stream: {foo=\"bar\"},\n",
"entry with timestamp %s ignored, reason: 'entry too far behind' for stream: {foo=\"bar\"},\n",
time.Unix(int64(i), 0).String(),
)
}

@ -31,6 +31,7 @@ const (
// rather than the overall ingestion rate limit.
StreamRateLimit = "per_stream_rate_limit"
OutOfOrder = "out_of_order"
TooFarBehind = "too_far_behind"
// GreaterThanMaxSampleAge is a reason for discarding log lines which are older than the current time - `reject_old_samples_max_age`
GreaterThanMaxSampleAge = "greater_than_max_sample_age"
GreaterThanMaxSampleAgeErrorMsg = "entry for stream '%s' has timestamp too old: %v"

Loading…
Cancel
Save