diff --git a/compact.go b/compact.go
index 7799b640f5..dd66bfd963 100644
--- a/compact.go
+++ b/compact.go
@@ -87,10 +87,22 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
 }
 
 type compactorOptions struct {
-	maxBlockRange uint64
+	// blockRanges holds the compaction time ranges in ascending order, in the unit of block timestamps.
+	blockRanges []int64
 }
 
 func newCompactor(dir string, r prometheus.Registerer, l log.Logger, opts *compactorOptions) *compactor {
+	if opts.blockRanges == nil {
+		// Block durations are handled in milliseconds, so the defaults are
+		// converted from time.Duration (nanoseconds) to milliseconds.
+		opts.blockRanges = []int64{
+			int64(2 * time.Hour / time.Millisecond),
+			int64(6 * time.Hour / time.Millisecond),
+			int64(24 * time.Hour / time.Millisecond),
+			int64(72 * time.Hour / time.Millisecond),  // 3d
+			int64(216 * time.Hour / time.Millisecond), // 9d
+		}
+	}
 	return &compactor{
 		dir:  dir,
 		opts: opts,
@@ -133,37 +145,102 @@ func (c *compactor) Plan() ([][]string, error) {
 		return dms[i].meta.MinTime < dms[j].meta.MinTime
 	})
 
-	if len(dms) == 0 {
+	if len(dms) <= 1 {
 		return nil, nil
 	}
 
-	sliceDirs := func(i, j int) [][]string {
+	sliceDirs := func(dms []dirMeta) [][]string {
+		if len(dms) == 0 {
+			return nil
+		}
 		var res []string
-		for k := i; k < j; k++ {
-			res = append(res, dms[k].dir)
+		for _, dm := range dms {
+			res = append(res, dm.dir)
 		}
 		return [][]string{res}
 	}
 
-	// Then we care about compacting multiple blocks, starting with the oldest.
-	for i := 0; i < len(dms)-compactionBlocksLen+1; i++ {
-		if c.match(dms[i : i+3]) {
-			return sliceDirs(i, i+compactionBlocksLen), nil
+	return sliceDirs(c.selectDirs(dms)), nil
+}
+
+// selectDirs returns the blocks from ds that should be compacted next, based on
+// the compactor's configured block ranges.
+func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
+	return selectRecurse(ds, c.opts.blockRanges)
+}
+
+// selectRecurse splits dms by the largest interval and picks the last group
+// holding more than one block. If that group has more than two blocks, it is
+// narrowed down with the smaller intervals as long as the narrower selection
+// still has at least two blocks. With no intervals left, dms is returned as is.
+func selectRecurse(dms []dirMeta, intervals []int64) []dirMeta {
+	if len(intervals) == 0 {
+		return dms
+	}
+
+	// Get the blocks by the max interval
+	blocks := splitByRange(dms, intervals[len(intervals)-1])
+	dirs := []dirMeta{}
+	for i := len(blocks) - 1; i >= 0; i-- {
+		// Walk the groups from last to first (newest first when dms is sorted by
+		// MinTime) and take the first one that holds more than one block.
+		if len(blocks[i]) > 1 {
+			dirs = blocks[i]
+			break
 		}
 	}
 
-	return nil, nil
+	// If there are too many blocks, see if a smaller interval will catch them.
+	// E.g., if we have 0-20, 60-80, 80-100; all fall under 0-240, but we'd rather compact 60-100
+	// than all at once.
+	if len(dirs) > 2 {
+		smallerDirs := selectRecurse(dirs, intervals[:len(intervals)-1])
+		if len(smallerDirs) > 1 {
+			return smallerDirs
+		}
+	}
+
+	return dirs
 }
 
-func (c *compactor) match(dirs []dirMeta) bool {
-	g := dirs[0].meta.Compaction.Generation
-
-	for _, d := range dirs {
-		if d.meta.Compaction.Generation != g {
-			return false
-		}
-	}
-	return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange
-}
+// splitByRange groups the blocks in ds by the tr-aligned time range that their
+// MinTime falls into. ds is expected to be sorted by MinTime. Blocks that do
+// not fit into their aligned range, as judged by intervalContains (misaligned
+// or longer than tr), are skipped. Empty groups are never returned, and an
+// empty ds yields an empty result.
+func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
+	splitDirs := [][]dirMeta{}
+
+	var (
+		group []dirMeta
+		t0    int64
+	)
+	for _, dir := range ds {
+		// Start of the aligned range holding the block's MinTime. Go's % takes the
+		// sign of the dividend, so negative timestamps are floored explicitly.
+		rem := dir.meta.MinTime % tr
+		start := dir.meta.MinTime - rem
+		if rem < 0 {
+			start -= tr
+		}
+		if !intervalContains(start, start+tr, dir.meta.MinTime) || !intervalContains(start, start+tr, dir.meta.MaxTime) {
+			continue
+		}
+
+		// A block in a different range closes the group collected so far.
+		if len(group) > 0 && start != t0 {
+			splitDirs = append(splitDirs, group)
+			group = nil
+		}
+		t0 = start
+		group = append(group, dir)
+	}
+
+	if len(group) > 0 {
+		splitDirs = append(splitDirs, group)
+	}
+
+	return splitDirs
+}
 
 func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
diff --git a/compact_test.go b/compact_test.go
new file mode 100644
index 0000000000..cfe2494b06
--- /dev/null
+++ b/compact_test.go
@@ -0,0 +1,238 @@
+package tsdb
+
+import (
+	"sort"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestCompactionSelect verifies which blocks selectDirs picks for compaction.
+func TestCompactionSelect(t *testing.T) {
+	opts := &compactorOptions{
+		blockRanges: []int64{
+			20,
+			60,
+			240,
+			720,
+			2160,
+		},
+	}
+
+	type dirMetaSimple struct {
+		dir string
+		tr  []int64
+	}
+
+	cases := []struct {
+		blocks  []dirMetaSimple
+		planned [][]string
+	}{
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+			},
+			planned: nil,
+		},
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+				{
+					dir: "2",
+					tr:  []int64{20, 40},
+				},
+				{
+					dir: "3",
+					tr:  []int64{40, 60},
+				},
+			},
+			planned: [][]string{{"1", "2", "3"}},
+		},
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+				{
+					dir: "2",
+					tr:  []int64{20, 40},
+				},
+				{
+					dir: "3",
+					tr:  []int64{40, 60},
+				},
+				{
+					dir: "4",
+					tr:  []int64{60, 120},
+				},
+				{
+					dir: "5",
+					tr:  []int64{120, 180},
+				},
+			},
+			planned: [][]string{{"1", "2", "3"}}, // We still need 0-60 to compact 0-240
+		},
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+				{
+					dir: "2",
+					tr:  []int64{20, 40},
+				},
+				{
+					dir: "3",
+					tr:  []int64{40, 60},
+				},
+				{
+					dir: "4",
+					tr:  []int64{60, 120},
+				},
+				{
+					dir: "5",
+					tr:  []int64{120, 180},
+				},
+				{
+					dir: "6",
+					tr:  []int64{720, 960},
+				},
+				{
+					dir: "7",
+					tr:  []int64{1200, 1440},
+				},
+			},
+			planned: [][]string{{"6", "7"}},
+		},
+		{
+			blocks: []dirMetaSimple{
+				{
+					dir: "1",
+					tr:  []int64{0, 20},
+				},
+				{
+					dir: "2",
+					tr:  []int64{60, 80},
+				},
+				{
+					dir: "3",
+					tr:  []int64{80, 100},
+				},
+			},
+			planned: [][]string{{"2", "3"}},
+		},
+	}
+
+	c := &compactor{
+		opts: opts,
+	}
+	sliceDirs := func(dms []dirMeta) [][]string {
+		if len(dms) == 0 {
+			return nil
+		}
+		var res []string
+		for _, dm := range dms {
+			res = append(res, dm.dir)
+		}
+		return [][]string{res}
+	}
+
+	dmFromSimple := func(dms []dirMetaSimple) []dirMeta {
+		dirs := make([]dirMeta, 0, len(dms))
+		for _, dir := range dms {
+			dirs = append(dirs, dirMeta{
+				dir: dir.dir,
+				meta: &BlockMeta{
+					MinTime: dir.tr[0],
+					MaxTime: dir.tr[1],
+				},
+			})
+		}
+
+		return dirs
+	}
+
+	for _, tc := range cases {
+		require.Equal(t, tc.planned, sliceDirs(c.selectDirs(dmFromSimple(tc.blocks))))
+	}
+}
+
+// TestSplitByRange compares splitByRange against a simple map-based reference.
+func TestSplitByRange(t *testing.T) {
+	splitterFunc := func(ds []dirMeta, tr int64) [][]dirMeta {
+		rMap := make(map[int64][]dirMeta)
+		for _, dir := range ds {
+			t0 := dir.meta.MinTime - dir.meta.MinTime%tr
+			if intervalContains(t0, t0+tr, dir.meta.MinTime) && intervalContains(t0, t0+tr, dir.meta.MaxTime) {
+				rMap[t0] = append(rMap[t0], dir)
+			}
+		}
+		res := make([][]dirMeta, 0, len(rMap))
+		for _, v := range rMap {
+			res = append(res, v)
+		}
+
+		sort.Slice(res, func(i, j int) bool {
+			return res[i][0].meta.MinTime < res[j][0].meta.MinTime
+		})
+
+		return res
+	}
+
+	cases := []struct {
+		trange int64
+		ranges [][]int64
+	}{
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 60}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}, {30, 60}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}, {60, 90}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}, {20, 30}, {90, 120}},
+		},
+		{
+			trange: 60,
+			ranges: [][]int64{{0, 10}, {59, 60}, {60, 120}, {120, 180}, {190, 200}, {200, 210}, {220, 239}},
+		},
+		{
+			// A block crossing a range boundary is skipped without leaving an empty group.
+			trange: 60,
+			ranges: [][]int64{{0, 100}, {60, 70}},
+		},
+	}
+
+	for _, c := range cases {
+		blocks := make([]dirMeta, 0, len(c.ranges))
+		for _, r := range c.ranges {
+			blocks = append(blocks, dirMeta{
+				meta: &BlockMeta{
+					MinTime: r[0],
+					MaxTime: r[1],
+				},
+			})
+		}
+
+		require.Equal(t, splitterFunc(blocks, c.trange), splitByRange(blocks, c.trange))
+	}
+}
diff --git a/db.go b/db.go
index 17f3e5f5f6..9bcd688c8c 100644
--- a/db.go
+++ b/db.go
@@ -46,7 +46,6 @@ var DefaultOptions = &Options{
 	WALFlushInterval:  5 * time.Second,
 	RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
 	MinBlockDuration:  3 * 60 * 60 * 1000,       // 2 hours in milliseconds
-	MaxBlockDuration:  24 * 60 * 60 * 1000,      // 1 days in milliseconds
 	NoLockfile:        false,
 }
 
@@ -62,9 +61,6 @@ type Options struct {
 	// It's the minimum duration of any persisted block.
 	MinBlockDuration uint64
 
-	// The maximum timestamp range of compacted blocks.
-	MaxBlockDuration uint64
-
 	// NoLockfile disables creation and consideration of a lock file.
 	NoLockfile bool
 }
@@ -227,9 +223,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
 		db.lockf = &lockf
 	}
 
-	db.compactor = newCompactor(dir, r, l, &compactorOptions{
-		maxBlockRange: opts.MaxBlockDuration,
-	})
+	db.compactor = newCompactor(dir, r, l, &compactorOptions{})
 
 	if err := db.reloadBlocks(); err != nil {
 		return nil, err