|
|
|
|
@ -14,6 +14,7 @@ |
|
|
|
|
package tsdb |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"fmt" |
|
|
|
|
"io/ioutil" |
|
|
|
|
"math" |
|
|
|
|
"math/rand" |
|
|
|
|
@ -21,6 +22,7 @@ import ( |
|
|
|
|
"path/filepath" |
|
|
|
|
"sort" |
|
|
|
|
"testing" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/oklog/ulid" |
|
|
|
|
"github.com/pkg/errors" |
|
|
|
|
@ -781,6 +783,99 @@ func TestTombstoneClean(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TestTombstoneCleanFail tests that a failing TombstoneClean doesn't leave any blocks behind.
|
|
|
|
|
// When TombstoneClean errors the original block that should be rebuilt doesn't get deleted so
|
|
|
|
|
// if TombstoneClean leaves any blocks behind these will overlap.
|
|
|
|
|
func TestTombstoneCleanFail(t *testing.T) { |
|
|
|
|
|
|
|
|
|
db, close := openTestDB(t, nil) |
|
|
|
|
defer close() |
|
|
|
|
|
|
|
|
|
var expectedBlockDirs []string |
|
|
|
|
|
|
|
|
|
// Create some empty blocks pending for compaction.
|
|
|
|
|
// totalBlocks should be >=2 so we have enough blocks to trigger compaction failure.
|
|
|
|
|
totalBlocks := 2 |
|
|
|
|
for i := 0; i < totalBlocks; i++ { |
|
|
|
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano())) |
|
|
|
|
uid := ulid.MustNew(ulid.Now(), entropy) |
|
|
|
|
meta := &BlockMeta{ |
|
|
|
|
Version: 2, |
|
|
|
|
ULID: uid, |
|
|
|
|
} |
|
|
|
|
blockDir := filepath.Join(db.Dir(), uid.String()) |
|
|
|
|
block := createEmptyBlock(t, blockDir, meta) |
|
|
|
|
|
|
|
|
|
// Add some some fake tombstones to trigger the compaction.
|
|
|
|
|
tomb := memTombstones{} |
|
|
|
|
tomb[0] = Intervals{{0, 1}} |
|
|
|
|
block.tombstones = tomb |
|
|
|
|
|
|
|
|
|
db.blocks = append(db.blocks, block) |
|
|
|
|
expectedBlockDirs = append(expectedBlockDirs, blockDir) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Initialize the mockCompactorFailing with a room for a single compaction iteration.
|
|
|
|
|
// mockCompactorFailing will fail on the second iteration so we can check if the cleanup works as expected.
|
|
|
|
|
db.compactor = &mockCompactorFailing{ |
|
|
|
|
t: t, |
|
|
|
|
blocks: db.blocks, |
|
|
|
|
max: totalBlocks + 1, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// The compactor should trigger a failure here.
|
|
|
|
|
testutil.NotOk(t, db.CleanTombstones()) |
|
|
|
|
|
|
|
|
|
// Now check that the CleanTombstones didn't leave any blocks behind after a failure.
|
|
|
|
|
actualBlockDirs, err := blockDirs(db.dir) |
|
|
|
|
testutil.Ok(t, err) |
|
|
|
|
testutil.Equals(t, expectedBlockDirs, actualBlockDirs) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// mockCompactorFailing creates a new empty block on every write and fails when reached the max allowed total.
|
|
|
|
|
type mockCompactorFailing struct { |
|
|
|
|
t *testing.T |
|
|
|
|
blocks []*Block |
|
|
|
|
max int |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (*mockCompactorFailing) Plan(dir string) ([]string, error) { |
|
|
|
|
return nil, nil |
|
|
|
|
} |
|
|
|
|
func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { |
|
|
|
|
if len(c.blocks) >= c.max { |
|
|
|
|
return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
entropy := rand.New(rand.NewSource(time.Now().UnixNano())) |
|
|
|
|
uid := ulid.MustNew(ulid.Now(), entropy) |
|
|
|
|
meta := &BlockMeta{ |
|
|
|
|
Version: 2, |
|
|
|
|
ULID: uid, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
block := createEmptyBlock(c.t, filepath.Join(dest, meta.ULID.String()), meta) |
|
|
|
|
c.blocks = append(c.blocks, block) |
|
|
|
|
|
|
|
|
|
// Now check that all expected blocks are actually persisted on disk.
|
|
|
|
|
// This way we make sure that the we have some blocks that are supposed to be removed.
|
|
|
|
|
var expectedBlocks []string |
|
|
|
|
for _, b := range c.blocks { |
|
|
|
|
expectedBlocks = append(expectedBlocks, filepath.Join(dest, b.Meta().ULID.String())) |
|
|
|
|
} |
|
|
|
|
actualBlockDirs, err := blockDirs(dest) |
|
|
|
|
testutil.Ok(c.t, err) |
|
|
|
|
|
|
|
|
|
testutil.Equals(c.t, expectedBlocks, actualBlockDirs) |
|
|
|
|
|
|
|
|
|
return block.Meta().ULID, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (*mockCompactorFailing) Compact(dest string, dirs ...string) (ulid.ULID, error) { |
|
|
|
|
return ulid.ULID{}, nil |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestDB_Retention(t *testing.T) { |
|
|
|
|
db, close := openTestDB(t, nil) |
|
|
|
|
defer close() |
|
|
|
|
|