ignore validity window during wal replay (#4596)

pull/4599/head
Owen Diehl 4 years ago committed by GitHub
parent 45113cb375
commit 633b4cab53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      pkg/ingester/stream.go
  2. 45
      pkg/ingester/stream_test.go

@ -182,7 +182,8 @@ func (s *stream) Push(
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
if counter > 0 && counter <= s.entryCt {
isReplay := counter > 0
if isReplay && counter <= s.entryCt {
var byteCt int
for _, e := range entries {
byteCt += len(e.Line)
@ -256,7 +257,7 @@ func (s *stream) Push(
}
// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
if !isReplay && s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind})
outOfOrderSamples++
outOfOrderBytes += len(entries[i].Line)

@ -339,6 +339,51 @@ func TestPushRateLimit(t *testing.T) {
require.Contains(t, err.Error(), (&validation.ErrStreamRateLimit{RateLimit: l.PerStreamRateLimit, Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[1].Line))}).Error())
}
func TestReplayAppendIgnoresValidityWindow(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
cfg := defaultConfig()
cfg.MaxChunkAge = time.Minute
s := newStream(
cfg,
limiter,
"fake",
model.Fingerprint(0),
labels.Labels{
{Name: "foo", Value: "bar"},
},
true,
NilMetrics,
)
base := time.Now()
entries := []logproto.Entry{
{Timestamp: base, Line: "1"},
}
// Push a first entry (it doesn't matter if we look like we're replaying or not)
_, err = s.Push(context.Background(), entries, nil, 1)
require.Nil(t, err)
// Create a sample outside the validity window
entries = []logproto.Entry{
{Timestamp: base.Add(-time.Hour), Line: "2"},
}
// Pretend it's not a replay, ensure we error
_, err = s.Push(context.Background(), entries, recordPool.GetRecord(), 0)
require.NotNil(t, err)
// Now pretend it's a replay. The same write should succeed.
_, err = s.Push(context.Background(), entries, nil, 2)
require.Nil(t, err)
}
func iterEq(t *testing.T, exp []logproto.Entry, got iter.EntryIterator) {
var i int
for got.Next() {

Loading…
Cancel
Save