|
|
|
@ -330,11 +330,15 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func expectCheckpoint(t *testing.T, walDir string, shouldExist bool, max time.Duration) { |
|
|
|
|
once := make(chan struct{}, 1) |
|
|
|
|
once <- struct{}{} |
|
|
|
|
|
|
|
|
|
deadline := time.After(max) |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-deadline: |
|
|
|
|
require.Fail(t, "timeout while waiting for checkpoint existence:", shouldExist) |
|
|
|
|
case <-once: // Trick to ensure we check immediately before deferring to ticker.
|
|
|
|
|
default: |
|
|
|
|
<-time.After(max / 10) // check 10x over the duration
|
|
|
|
|
} |
|
|
|
@ -637,7 +641,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) { |
|
|
|
|
|
|
|
|
|
if waitForCheckpoint { |
|
|
|
|
// Ensure we have checkpointed now
|
|
|
|
|
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*2) // give a bit of buffer
|
|
|
|
|
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10) // give a bit of buffer
|
|
|
|
|
|
|
|
|
|
// Add some more data after the checkpoint
|
|
|
|
|
tmp := end |
|
|
|
|