diff --git a/block.go b/block.go
index 36db476801..4cb3c048fb 100644
--- a/block.go
+++ b/block.go
@@ -77,6 +77,7 @@ type BlockMetaCompaction struct {
 	Level int `json:"level"`
 	// ULIDs of all source head blocks that went into the block.
 	Sources []ulid.ULID `json:"sources,omitempty"`
+	Failed bool `json:"failed,omitempty"`
 }
 
 const (
@@ -142,10 +143,9 @@ type Block struct {
 	dir  string
 	meta BlockMeta
 
-	chunkr ChunkReader
-	indexr IndexReader
-
-	tombstones tombstoneReader
+	chunkr     ChunkReader
+	indexr     IndexReader
+	tombstones TombstoneReader
 }
 
 // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
@@ -245,6 +245,11 @@ func (pb *Block) Tombstones() (TombstoneReader, error) {
 	return blockTombstoneReader{TombstoneReader: pb.tombstones, b: pb}, nil
 }
 
+func (pb *Block) setCompactionFailed() error {
+	pb.meta.Compaction.Failed = true
+	return writeMetaFile(pb.dir, &pb.meta)
+}
+
 type blockIndexReader struct {
 	IndexReader
 	b *Block
@@ -284,13 +289,15 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 		return ErrClosing
 	}
 
-	pr := newPostingsReader(pb.indexr)
-	p, absent := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(pb.indexr, ms...)
+	if err != nil {
+		return errors.Wrap(err, "select series")
+	}
 
 	ir := pb.indexr
 
 	// Choose only valid postings which have chunks in the time-range.
-	stones := map[uint64]Intervals{}
+	stones := memTombstones{}
 
 	var lset labels.Labels
 	var chks []ChunkMeta
@@ -322,27 +329,42 @@ Outer:
 		return p.Err()
 	}
 
-	// Merge the current and new tombstones.
-	for k, v := range stones {
-		pb.tombstones.add(k, v[0])
+	err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
+		for _, iv := range ivs {
+			stones.add(id, iv)
+			pb.meta.Stats.NumTombstones++
+		}
+		return nil
+	})
+	if err != nil {
+		return err
 	}
+	pb.tombstones = stones
 
 	if err := writeTombstoneFile(pb.dir, pb.tombstones); err != nil {
 		return err
 	}
-
-	pb.meta.Stats.NumTombstones = uint64(len(pb.tombstones))
 	return writeMetaFile(pb.dir, &pb.meta)
 }
 
 // CleanTombstones will rewrite the block if there any tombstones to remove them
 // and returns if there was a re-write.
 func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
-	if len(pb.tombstones) == 0 {
+	numStones := 0
+
+	pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
+		for range ivs {
+			numStones++
+		}
+
+		return nil
+	})
+
+	if numStones == 0 {
 		return false, nil
 	}
 
-	if err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
+	if _, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime); err != nil {
 		return false, err
 	}
diff --git a/block_test.go b/block_test.go
index e75d4ac3f2..664f743f61 100644
--- a/block_test.go
+++ b/block_test.go
@@ -12,3 +12,43 @@
 // limitations under the License.
 package tsdb
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+)
+
+func TestSetCompactionFailed(t *testing.T) {
+	tmpdir, err := ioutil.TempDir("", "test-tsdb")
+	Ok(t, err)
+
+	b := createEmptyBlock(t, tmpdir)
+
+	Equals(t, false, b.meta.Compaction.Failed)
+	Ok(t, b.setCompactionFailed())
+	Equals(t, true, b.meta.Compaction.Failed)
+	Ok(t, b.Close())
+
+	b, err = OpenBlock(tmpdir, nil)
+	Ok(t, err)
+	Equals(t, true, b.meta.Compaction.Failed)
+}
+
+func createEmptyBlock(t *testing.T, dir string) *Block {
+	Ok(t, os.MkdirAll(dir, 0777))
+
+	Ok(t, writeMetaFile(dir, &BlockMeta{}))
+
+	ir, err := newIndexWriter(dir)
+	Ok(t, err)
+	Ok(t, ir.Close())
+
+	Ok(t, os.MkdirAll(chunkDir(dir), 0777))
+
+	Ok(t, writeTombstoneFile(dir, EmptyTombstoneReader()))
+
+	b, err := OpenBlock(dir, nil)
+	Ok(t, err)
+	return b
+}
diff --git a/compact.go b/compact.go
index 955ba3cafb..abc42aaea4 100644
--- a/compact.go
+++ b/compact.go
@@ -52,7 +52,7 @@ type Compactor interface {
 	Plan(dir string) ([]string, error)
 
 	// Write persists a Block into a directory.
-	Write(dest string, b BlockReader, mint, maxt int64) error
+	Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
 
 	// Compact runs compaction against the provided directories. Must
 	// only be called concurrently with results of Plan().
@@ -205,7 +205,15 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
 			continue
 		}
 
+	Outer:
 		for _, p := range parts {
+			// Do not select the range if it has a block whose compaction failed.
+			for _, dm := range p {
+				if dm.meta.Compaction.Failed {
+					continue Outer
+				}
+			}
+
 			mint := p[0].meta.MinTime
 			maxt := p[len(p)-1].meta.MaxTime
 			// Pick the range of blocks if it spans the full range (potentially with gaps)
@@ -297,6 +305,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 // provided directories.
 func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 	var blocks []BlockReader
+	var bs []*Block
 	var metas []*BlockMeta
 
 	for _, d := range dirs {
@@ -313,15 +322,30 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
 
 		metas = append(metas, meta)
 		blocks = append(blocks, b)
+		bs = append(bs, b)
 	}
 
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid := ulid.MustNew(ulid.Now(), entropy)
 
-	return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
+	err = c.write(dest, compactBlockMetas(uid, metas...), blocks...)
+	if err == nil {
+		return nil
+	}
+
+	var merr MultiError
+	merr.Add(err)
+
+	for _, b := range bs {
+		if err := b.setCompactionFailed(); err != nil {
+			merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir()))
+		}
+	}
+
+	return merr
 }
 
-func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error {
+func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid := ulid.MustNew(ulid.Now(), entropy)
 
@@ -333,7 +357,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) e
 	meta.Compaction.Level = 1
 	meta.Compaction.Sources = []ulid.ULID{uid}
 
-	return c.write(dest, meta, b)
+	return uid, c.write(dest, meta, b)
 }
 
 // instrumentedChunkWriter is used for level 1 compactions to record statistics
@@ -360,17 +384,21 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error {
 func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
 	level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
 
+	dir := filepath.Join(dest, meta.ULID.String())
+	tmp := dir + ".tmp"
+
 	defer func(t time.Time) {
 		if err != nil {
 			c.metrics.failed.Inc()
+			// TODO(gouthamve): Handle error how?
+			if err := os.RemoveAll(tmp); err != nil {
+				level.Error(c.logger).Log("msg", "failed to remove tmp folder after failed compaction", "err", err.Error())
+			}
 		}
 		c.metrics.ran.Inc()
 		c.metrics.duration.Observe(time.Since(t).Seconds())
 	}(time.Now())
 
-	dir := filepath.Join(dest, meta.ULID.String())
-	tmp := dir + ".tmp"
-
 	if err = os.RemoveAll(tmp); err != nil {
 		return err
 	}
@@ -418,7 +446,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	}
 
 	// Create an empty tombstones file.
-	if err := writeTombstoneFile(tmp, newEmptyTombstoneReader()); err != nil {
+	if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil {
 		return errors.Wrap(err, "write new tombstones file")
 	}
 
@@ -453,7 +481,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 // of the provided blocks. It returns meta information for the new block.
 func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
 	var (
-		set        compactionSet
+		set        ChunkSeriesSet
 		allSymbols = make(map[string]struct{}, 1<<16)
 		closers    = []io.Closer{}
 	)
@@ -589,7 +617,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		}
 	}
 
-	for l := range postings.m {
+	for _, l := range postings.sortedKeys() {
 		if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil {
 			return errors.Wrap(err, "write postings")
 		}
@@ -597,18 +625,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 	return nil
 }
 
-type compactionSet interface {
-	Next() bool
-	At() (labels.Labels, []ChunkMeta, Intervals)
-	Err() error
-}
-
 type compactionSeriesSet struct {
 	p          Postings
 	index      IndexReader
 	chunks     ChunkReader
 	tombstones TombstoneReader
-	series     SeriesSet
 
 	l         labels.Labels
 	c         []ChunkMeta
@@ -631,7 +652,11 @@ func (c *compactionSeriesSet) Next() bool {
 	}
 
 	var err error
-	c.intervals = c.tombstones.Get(c.p.At())
+	c.intervals, err = c.tombstones.Get(c.p.At())
+	if err != nil {
+		c.err = errors.Wrap(err, "get tombstones")
+		return false
+	}
 
 	if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
 		c.err = errors.Wrapf(err, "get series %d", c.p.At())
@@ -675,7 +700,7 @@ func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
 }
 
 type compactionMerger struct {
-	a, b compactionSet
+	a, b ChunkSeriesSet
 
 	aok, bok bool
 	l        labels.Labels
@@ -688,7 +713,7 @@ type compactionSeries struct {
 	chunks []*ChunkMeta
 }
 
-func newCompactionMerger(a, b compactionSet) (*compactionMerger, error) {
+func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
 	c := &compactionMerger{
 		a: a,
 		b: b,
diff --git a/compact_test.go b/compact_test.go
index d1650cd16a..a8ae48fce0 100644
--- a/compact_test.go
+++ b/compact_test.go
@@ -14,8 +14,13 @@
 package tsdb
 
 import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
 	"testing"
 
+	"github.com/go-kit/kit/log"
+	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
 )
 
@@ -157,17 +162,6 @@ func TestLeveledCompactor_plan(t *testing.T) {
 	}, nil)
 	require.NoError(t, err)
 
-	metaRange := func(name string, mint, maxt int64, stats *BlockStats) dirMeta {
-		meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
-		if stats != nil {
-			meta.Stats = *stats
-		}
-		return dirMeta{
-			dir:  name,
-			meta: meta,
-		}
-	}
-
 	cases := []struct {
 		metas    []dirMeta
 		expected []string
@@ -274,3 +268,85 @@ func TestLeveledCompactor_plan(t *testing.T) {
 		require.Equal(t, c.expected, res, "test case %d", i)
 	}
 }
+
+func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) {
+	compactor, err := NewLeveledCompactor(nil, nil, []int64{
+		20,
+		60,
+		240,
+		720,
+		2160,
+	}, nil)
+	Ok(t, err)
+
+	cases := []struct {
+		metas []dirMeta
+	}{
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+			},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 60, 80, nil),
+			},
+		},
+		{
+			metas: []dirMeta{
+				metaRange("1", 0, 20, nil),
+				metaRange("2", 20, 40, nil),
+				metaRange("3", 40, 60, nil),
+				metaRange("4", 60, 120, nil),
+				metaRange("5", 120, 180, nil),
+			},
+		},
+	}
+
+	for _, c := range cases {
+		c.metas[1].meta.Compaction.Failed = true
+		res, err := compactor.plan(c.metas)
+		Ok(t, err)
+
+		Equals(t, []string(nil), res)
+	}
+}
+
+func TestCompactionFailWillCleanUpTempDir(t *testing.T) {
+	compactor, err := NewLeveledCompactor(nil, log.NewNopLogger(), []int64{
+		20,
+		60,
+		240,
+		720,
+		2160,
+	}, nil)
+	Ok(t, err)
+
+	tmpdir, err := ioutil.TempDir("", "test")
+	Ok(t, err)
+
+	NotOk(t, compactor.write(tmpdir, &BlockMeta{}, erringBReader{}))
+	_, err = os.Stat(filepath.Join(tmpdir, BlockMeta{}.ULID.String()) + ".tmp")
+	Assert(t, os.IsNotExist(err), "directory is not cleaned up")
+}
+
+func metaRange(name string, mint, maxt int64, stats *BlockStats) dirMeta {
+	meta := &BlockMeta{MinTime: mint, MaxTime: maxt}
+	if stats != nil {
+		meta.Stats = *stats
+	}
+	return dirMeta{
+		dir:  name,
+		meta: meta,
+	}
+}
+
+type erringBReader struct{}
+
+func (erringBReader) Index() (IndexReader, error)          { return nil, errors.New("index") }
+func (erringBReader) Chunks() (ChunkReader, error)         { return nil, errors.New("chunks") }
+func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }
diff --git a/db.go b/db.go
index 47afb4c134..1264cff49c 100644
--- a/db.go
+++ b/db.go
@@ -283,16 +283,23 @@ func (db *DB) retentionCutoff() (bool, error) {
 	}
 
 	db.mtx.RLock()
-	defer db.mtx.RUnlock()
+	blocks := db.blocks[:]
+	db.mtx.RUnlock()
 
-	if len(db.blocks) == 0 {
+	if len(blocks) == 0 {
 		return false, nil
 	}
-	last := db.blocks[len(db.blocks)-1]
+	last := blocks[len(blocks)-1]
+
 	mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
 
+	dirs, err := retentionCutoffDirs(db.dir, mint)
+	if err != nil {
+		return false, err
+	}
 
-	return retentionCutoff(db.dir, mint)
+	// This will close the blocks and then delete their directories.
+	return len(dirs) > 0, db.reload(dirs...)
 }
 
 // Appender opens a new appender against the database.
@@ -350,7 +357,7 @@ func (db *DB) compact() (changes bool, err error) {
 			mint: mint,
 			maxt: maxt,
 		}
-		if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
+		if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
 			return changes, errors.Wrap(err, "persist head block")
 		}
 		changes = true
@@ -394,40 +401,37 @@
 	return changes, nil
 }
 
-// retentionCutoff deletes all directories of blocks in dir that are strictly
+// retentionCutoffDirs returns all directories of blocks in dir that are strictly
 // before mint.
-func retentionCutoff(dir string, mint int64) (bool, error) {
+func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
 	df, err := fileutil.OpenDir(dir)
 	if err != nil {
-		return false, errors.Wrapf(err, "open directory")
+		return nil, errors.Wrapf(err, "open directory")
 	}
 	defer df.Close()
 
 	dirs, err := blockDirs(dir)
 	if err != nil {
-		return false, errors.Wrapf(err, "list block dirs %s", dir)
+		return nil, errors.Wrapf(err, "list block dirs %s", dir)
 	}
 
-	changes := false
+	delDirs := []string{}
 
 	for _, dir := range dirs {
 		meta, err := readMetaFile(dir)
 		if err != nil {
-			return changes, errors.Wrapf(err, "read block meta %s", dir)
+			return nil, errors.Wrapf(err, "read block meta %s", dir)
 		}
 		// The first block we encounter marks that we crossed the boundary
 		// of deletable blocks.
 		if meta.MaxTime >= mint {
 			break
 		}
 
-		changes = true
-		if err := os.RemoveAll(dir); err != nil {
-			return changes, err
-		}
+		delDirs = append(delDirs, dir)
 	}
 
-	return changes, fileutil.Fsync(df)
+	return delDirs, nil
 }
 
 func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
@@ -621,7 +625,8 @@ func (db *DB) Snapshot(dir string) error {
 			return errors.Wrap(err, "error snapshotting headblock")
 		}
 	}
-	return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+	_, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+	return errors.Wrap(err, "snapshot head block")
 }
 
 // Querier returns a new querier over the data partition for the given time range.
diff --git a/db_test.go b/db_test.go
index b4bb0c9bce..ff20963013 100644
--- a/db_test.go
+++ b/db_test.go
@@ -27,17 +27,21 @@ import (
 )
 
 func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) {
-	tmpdir, _ := ioutil.TempDir("", "test")
+	tmpdir, err := ioutil.TempDir("", "test")
+	Ok(t, err)
 
-	db, err := Open(tmpdir, nil, nil, opts)
+	db, err = Open(tmpdir, nil, nil, opts)
 	require.NoError(t, err)
 
 	// Do not close the test database by default as it will deadlock on test failures.
 	return db, func() { os.RemoveAll(tmpdir) }
 }
 
-// Convert a SeriesSet into a form useable with reflect.DeepEqual.
-func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample {
+// query runs a matcher query against the querier and fully expands its data.
+func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sample {
+	ss, err := q.Select(matchers...)
+	Ok(t, err)
+
 	result := map[string][]sample{}
 
 	for ss.Next() {
@@ -49,12 +53,12 @@ func readSeriesSet(t testing.TB, ss SeriesSet) map[string][]sample {
 			t, v := it.At()
 			samples = append(samples, sample{t: t, v: v})
 		}
-		require.NoError(t, it.Err())
+		Ok(t, it.Err())
 
 		name := series.Labels().String()
 		result[name] = samples
 	}
-	require.NoError(t, ss.Err())
+	Ok(t, ss.Err())
 
 	return result
 }
@@ -70,7 +74,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 	querier, err := db.Querier(0, 1)
 	require.NoError(t, err)
 
-	seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
+	seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
 	require.Equal(t, seriesSet, map[string][]sample{})
 
 	require.NoError(t, querier.Close())
@@ -82,7 +86,7 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) {
 	require.NoError(t, err)
 	defer querier.Close()
 
-	seriesSet = readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
+	seriesSet = query(t, querier, labels.NewEqualMatcher("foo", "bar"))
 	require.Equal(t, seriesSet, map[string][]sample{`{foo="bar"}`: []sample{{t: 0, v: 0}}})
 }
 
@@ -102,7 +106,7 @@ func TestDataNotAvailableAfterRollback(t *testing.T) {
 	require.NoError(t, err)
 	defer querier.Close()
 
-	seriesSet := readSeriesSet(t, querier.Select(labels.NewEqualMatcher("foo", "bar")))
+	seriesSet := query(t, querier, labels.NewEqualMatcher("foo", "bar"))
 	require.Equal(t, seriesSet, map[string][]sample{})
 }
 
@@ -146,7 +150,7 @@ func TestDBAppenderAddRef(t *testing.T) {
 	q, err := db.Querier(0, 200)
 	require.NoError(t, err)
 
-	res := readSeriesSet(t, q.Select(labels.NewEqualMatcher("a", "b")))
+	res := query(t, q, labels.NewEqualMatcher("a", "b"))
 
 	require.Equal(t, map[string][]sample{
 		labels.FromStrings("a", "b").String(): []sample{
@@ -198,7 +202,8 @@ Outer:
 		q, err := db.Querier(0, numSamples)
 		require.NoError(t, err)
 
-		res := q.Select(labels.NewEqualMatcher("a", "b"))
+		res, err := q.Select(labels.NewEqualMatcher("a", "b"))
+		require.NoError(t, err)
 
 		expSamples := make([]sample, 0, len(c.remaint))
 		for _, ts := range c.remaint {
@@ -294,8 +299,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
 	q, err := db.Querier(0, 10)
 	require.NoError(t, err)
 
-	ss := q.Select(labels.NewEqualMatcher("a", "b"))
-	ssMap := readSeriesSet(t, ss)
+	ssMap := query(t, q, labels.NewEqualMatcher("a", "b"))
 
 	require.Equal(t, map[string][]sample{
 		labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}},
@@ -314,8 +318,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
 	q, err = db.Querier(0, 10)
 	require.NoError(t, err)
 
-	ss = q.Select(labels.NewEqualMatcher("a", "b"))
-	ssMap = readSeriesSet(t, ss)
+	ssMap = query(t, q, labels.NewEqualMatcher("a", "b"))
 
 	require.Equal(t, map[string][]sample{
 		labels.New(labels.Label{"a", "b"}).String(): []sample{{0, 1}, {10, 3}},
@@ -352,7 +355,9 @@ func TestDB_Snapshot(t *testing.T) {
 	defer querier.Close()
 
 	// sum values
-	seriesSet := querier.Select(labels.NewEqualMatcher("foo", "bar"))
+	seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar"))
+	require.NoError(t, err)
+
 	sum := 0.0
 	for seriesSet.Next() {
 		series := seriesSet.At().Iterator()
@@ -500,7 +505,8 @@ func TestDB_e2e(t *testing.T) {
 			q, err := db.Querier(mint, maxt)
 			require.NoError(t, err)
 
-			ss := q.Select(qry.ms...)
+			ss, err := q.Select(qry.ms...)
+			require.NoError(t, err)
 
 			result := map[string][]sample{}
 
@@ -526,7 +532,8 @@ func TestDB_e2e(t *testing.T) {
 }
 
 func TestWALFlushedOnDBClose(t *testing.T) {
-	tmpdir, _ := ioutil.TempDir("", "test")
+	tmpdir, err := ioutil.TempDir("", "test")
+	Ok(t, err)
 	defer os.RemoveAll(tmpdir)
 
 	db, err := Open(tmpdir, nil, nil, nil)
@@ -601,7 +608,8 @@ func TestTombstoneClean(t *testing.T) {
 		q, err := db.Querier(0, numSamples)
 		require.NoError(t, err)
 
-		res := q.Select(labels.NewEqualMatcher("a", "b"))
+		res, err := q.Select(labels.NewEqualMatcher("a", "b"))
+		require.NoError(t, err)
 
 		expSamples := make([]sample, 0, len(c.remaint))
 		for _, ts := range c.remaint {
@@ -637,7 +645,64 @@ func TestTombstoneClean(t *testing.T) {
 	}
 
 		for _, b := range db.blocks {
-			Equals(t, 0, len(b.tombstones))
+			Equals(t, emptyTombstoneReader, b.tombstones)
 		}
 	}
 }
+
+func TestDB_Retention(t *testing.T) {
+	tmpdir, _ := ioutil.TempDir("", "test")
+	defer os.RemoveAll(tmpdir)
+
+	db, err := Open(tmpdir, nil, nil, nil)
+	require.NoError(t, err)
+
+	lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}}
+
+	app := db.Appender()
+	_, err = app.Add(lbls, 0, 1)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	// Create a snapshot to make it create a block.
+	// TODO(gouthamve): Add a method to compact headblock.
+	snap, err := ioutil.TempDir("", "snap")
+	require.NoError(t, err)
+	require.NoError(t, db.Snapshot(snap))
+	require.NoError(t, db.Close())
+	defer os.RemoveAll(snap)
+
+	// Reopen DB from the snapshot.
+	db, err = Open(snap, nil, nil, nil)
+	require.NoError(t, err)
+
+	Equals(t, 1, len(db.blocks))
+
+	app = db.Appender()
+	_, err = app.Add(lbls, 100, 1)
+	require.NoError(t, err)
+	require.NoError(t, app.Commit())
+
+	// Snapshot again to create another block.
+	snap, err = ioutil.TempDir("", "snap")
+	require.NoError(t, err)
+	require.NoError(t, db.Snapshot(snap))
+	require.NoError(t, db.Close())
+	defer os.RemoveAll(snap)
+
+	// Reopen DB from the snapshot.
+	db, err = Open(snap, nil, nil, &Options{
+		RetentionDuration: 10,
+		BlockRanges:       []int64{50},
+	})
+	require.NoError(t, err)
+
+	Equals(t, 2, len(db.blocks))
+
+	// Now call retention.
+	changes, err := db.retentionCutoff()
+	Ok(t, err)
+	Assert(t, changes, "there should be changes")
+	Equals(t, 1, len(db.blocks))
+	Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify it's the right block.
+}
diff --git a/head.go b/head.go
index bd22148d16..d149cb1d9c 100644
--- a/head.go
+++ b/head.go
@@ -66,7 +66,7 @@ type Head struct {
 
 	postings *memPostings // postings lists for terms
 
-	tombstones tombstoneReader
+	tombstones memTombstones
 }
 
 type headMetrics struct {
@@ -186,7 +186,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
 		values:     map[string]stringset{},
 		symbols:    map[string]struct{}{},
 		postings:   newUnorderedMemPostings(),
-		tombstones: newEmptyTombstoneReader(),
+		tombstones: memTombstones{},
 	}
 	h.metrics = newHeadMetrics(h, r)
 
@@ -574,8 +574,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
 
 	ir := h.indexRange(mint, maxt)
 
-	pr := newPostingsReader(ir)
-	p, absent := pr.Select(ms...)
+	p, absent, err := PostingsForMatchers(ir, ms...)
+	if err != nil {
+		return errors.Wrap(err, "select series")
+	}
 
 	var stones []Stone
 
diff --git a/head_test.go b/head_test.go
index a265d5b343..c42c19b5e5 100644
--- a/head_test.go
+++ b/head_test.go
@@ -318,7 +318,7 @@ func TestHeadDeleteSimple(t *testing.T) {
 Outer:
 	for _, c := range cases {
 		// Reset the tombstones.
-		head.tombstones = newEmptyTombstoneReader()
+		head.tombstones = memTombstones{}
 
 		// Delete the ranges.
 		for _, r := range c.intervals {
@@ -328,7 +328,8 @@ Outer:
 		// Compare the result.
 		q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
 		require.NoError(t, err)
-		res := q.Select(labels.NewEqualMatcher("a", "b"))
+		res, err := q.Select(labels.NewEqualMatcher("a", "b"))
+		require.NoError(t, err)
 
 		expSamples := make([]sample, 0, len(c.remaint))
 		for _, ts := range c.remaint {
diff --git a/postings.go b/postings.go
index 2647f4dd8e..1ebc7c5769 100644
--- a/postings.go
+++ b/postings.go
@@ -50,6 +50,25 @@ func newUnorderedMemPostings() *memPostings {
 	}
 }
 
+// sortedKeys returns a list of sorted label keys of the postings.
+func (p *memPostings) sortedKeys() []labels.Label {
+	p.mtx.RLock()
+	keys := make([]labels.Label, 0, len(p.m))
+
+	for l := range p.m {
+		keys = append(keys, l)
+	}
+	p.mtx.RUnlock()
+
+	sort.Slice(keys, func(i, j int) bool {
+		if d := strings.Compare(keys[i].Name, keys[j].Name); d != 0 {
+			return d < 0
+		}
+		return keys[i].Value < keys[j].Value
+	})
+	return keys
+}
+
 // Postings returns an iterator over the postings list for s.
 func (p *memPostings) get(name, value string) Postings {
 	p.mtx.RLock()
@@ -165,6 +184,11 @@ func (e errPostings) Err() error { return e.err }
 
 var emptyPostings = errPostings{}
 
+// EmptyPostings returns a postings list that's always empty.
+func EmptyPostings() Postings {
+	return emptyPostings
+}
+
 // Intersect returns a new postings list over the intersection of the
 // input postings.
 func Intersect(its ...Postings) Postings {
diff --git a/querier.go b/querier.go
index ed8b64ceac..37672c7156 100644
--- a/querier.go
+++ b/querier.go
@@ -27,7 +27,7 @@ import (
 // time range.
 type Querier interface {
 	// Select returns a set of series that matches the given label matchers.
-	Select(...labels.Matcher) SeriesSet
+	Select(...labels.Matcher) (SeriesSet, error)
 
 	// LabelValues returns all potential values for a label name.
 	LabelValues(string) ([]string, error)
@@ -81,20 +81,29 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) {
 	return nil, fmt.Errorf("not implemented")
 }
 
-func (q *querier) Select(ms ...labels.Matcher) SeriesSet {
+func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) {
 	return q.sel(q.blocks, ms)
 }
 
-func (q *querier) sel(qs []Querier, ms []labels.Matcher) SeriesSet {
+func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
 	if len(qs) == 0 {
-		return nopSeriesSet{}
+		return EmptySeriesSet(), nil
 	}
 	if len(qs) == 1 {
 		return qs[0].Select(ms...)
 	}
 	l := len(qs) / 2
-	return newMergedSeriesSet(q.sel(qs[:l], ms), q.sel(qs[l:], ms))
+
+	a, err := q.sel(qs[:l], ms)
+	if err != nil {
+		return nil, err
+	}
+	b, err := q.sel(qs[l:], ms)
+	if err != nil {
+		return nil, err
+	}
+	return newMergedSeriesSet(a, b), nil
 }
 
 func (q *querier) Close() error {
@@ -141,20 +150,14 @@ type blockQuerier struct {
 	mint, maxt int64
 }
 
-func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
-	pr := newPostingsReader(q.index)
-
-	p, absent := pr.Select(ms...)
-
+func (q *blockQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
+	base, err := LookupChunkSeries(q.index, q.tombstones, ms...)
+	if err != nil {
+		return nil, err
+	}
 	return &blockSeriesSet{
 		set: &populatedChunkSeries{
-			set: &baseChunkSeries{
-				p:      p,
-				index:  q.index,
-				absent: absent,
-
-				tombstones: q.tombstones,
-			},
+			set:    base,
 			chunks: q.chunks,
 			mint:   q.mint,
 			maxt:   q.maxt,
@@ -162,7 +165,7 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 
 		mint: q.mint,
 		maxt: q.maxt,
-	}
+	}, nil
 }
 
 func (q *blockQuerier) LabelValues(name string) ([]string, error) {
@@ -196,16 +199,10 @@ func (q *blockQuerier) Close() error {
 	return merr.Err()
 }
 
-// postingsReader is used to select matching postings from an IndexReader.
-type postingsReader struct {
-	index IndexReader
-}
-
-func newPostingsReader(i IndexReader) *postingsReader {
-	return &postingsReader{index: i}
-}
-
-func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
+// PostingsForMatchers assembles a single postings iterator against the index reader
+// based on the given matchers. It returns a list of label names that must be manually
+// checked to not exist in series the postings list points to.
+func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, []string, error) {
 	var (
 		its    []Postings
 		absent []string
@@ -217,12 +214,13 @@ func (r *postingsReader) Select(ms ...labels.Matcher) (Postings, []string) {
 			absent = append(absent, m.Name())
 			continue
 		}
-		its = append(its, r.selectSingle(m))
+		it, err := postingsForMatcher(index, m)
+		if err != nil {
+			return nil, nil, err
+		}
+		its = append(its, it)
 	}
-
-	p := Intersect(its...)
-
-	return r.index.SortedPostings(p), absent
+	return index.SortedPostings(Intersect(its...)), absent, nil
 }
 
 // tuplesByPrefix uses binary search to find prefix matches within ts.
@@ -256,33 +254,33 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
 	return matches, nil
 }
 
-func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
+func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
 	// Fast-path for equal matching.
 	if em, ok := m.(*labels.EqualMatcher); ok {
-		it, err := r.index.Postings(em.Name(), em.Value())
+		it, err := index.Postings(em.Name(), em.Value())
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
-		return it
+		return it, nil
 	}
 
-	tpls, err := r.index.LabelValues(m.Name())
+	tpls, err := index.LabelValues(m.Name())
 	if err != nil {
-		return errPostings{err: err}
+		return nil, err
 	}
 
 	var res []string
 
 	if pm, ok := m.(*labels.PrefixMatcher); ok {
 		res, err = tuplesByPrefix(pm, tpls)
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
-
 	} else {
 		for i := 0; i < tpls.Len(); i++ {
 			vals, err := tpls.At(i)
 			if err != nil {
-				return errPostings{err: err}
+				return nil, err
 			}
 			if m.Matches(vals[0]) {
 				res = append(res, vals[0])
@@ -291,20 +289,20 @@ func (r *postingsReader) selectSingle(m labels.Matcher) Postings {
 	}
 
 	if len(res) == 0 {
-		return emptyPostings
+		return EmptyPostings(), nil
 	}
 
 	var rit []Postings
 
 	for _, v := range res {
-		it, err := r.index.Postings(m.Name(), v)
+		it, err := index.Postings(m.Name(), v)
 		if err != nil {
-			return errPostings{err: err}
+			return nil, err
 		}
 		rit = append(rit, it)
 	}
 
-	return Merge(rit...)
+	return Merge(rit...), nil
 }
 
 func mergeStrings(a, b []string) []string {
@@ -342,11 +340,12 @@ type SeriesSet interface {
 	Err() error
 }
 
-type nopSeriesSet struct{}
+var emptySeriesSet = errSeriesSet{}
 
-func (nopSeriesSet) Next() bool { return false }
-func (nopSeriesSet) At() Series { return nil }
-func (nopSeriesSet) Err() error { return nil }
+// EmptySeriesSet returns a series set that's always empty.
+func EmptySeriesSet() SeriesSet {
+	return emptySeriesSet
+}
 
 // mergedSeriesSet takes two series sets as a single series set. The input series sets
 // must be sorted and sequential in time, i.e. if they have the same label set,
@@ -418,7 +417,7 @@ func (s *mergedSeriesSet) Next() bool {
 	return true
 }
 
-type chunkSeriesSet interface {
+type ChunkSeriesSet interface {
 	Next() bool
 	At() (labels.Labels, []ChunkMeta, Intervals)
 	Err() error
@@ -438,6 +437,24 @@ type baseChunkSeries struct {
 	err       error
 }
 
+// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
+// over them. It drops chunks based on tombstones in the given reader.
+func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) {
+	if tr == nil {
+		tr = EmptyTombstoneReader()
+	}
+	p, absent, err := PostingsForMatchers(ir, ms...)
+	if err != nil {
+		return nil, err
+	}
+	return &baseChunkSeries{
+		p:          p,
+		index:      ir,
+		tombstones: tr,
+		absent:     absent,
+	}, nil
+}
+
 func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
 	return s.lset, s.chks, s.intervals
 }
@@ -448,6 +465,7 @@ func (s *baseChunkSeries) Next() bool {
 	var (
 		lset   labels.Labels
 		chunks []ChunkMeta
+		err    error
 	)
 Outer:
 	for s.p.Next() {
@@ -470,7 +488,11 @@ Outer:
 
 		s.lset = lset
 		s.chks = chunks
-		s.intervals = s.tombstones.Get(s.p.At())
+		s.intervals, err = s.tombstones.Get(s.p.At())
+		if err != nil {
+			s.err = errors.Wrap(err, "get tombstones")
+			return false
+		}
 
 		if len(s.intervals) > 0 {
 			// Only those chunks that are not entirely deleted.
@@ -496,7 +518,7 @@ Outer:
 // with known chunk references. It filters out chunks that do not fit the
 // given time range.
 type populatedChunkSeries struct {
-	set    chunkSeriesSet
+	set    ChunkSeriesSet
 	chunks ChunkReader
 
 	mint, maxt int64
@@ -553,7 +575,7 @@ func (s *populatedChunkSeries) Next() bool {
 
 // blockSeriesSet is a set of series from an inverted index query.
 type blockSeriesSet struct {
-	set chunkSeriesSet
+	set ChunkSeriesSet
 	err error
 	cur Series
 
diff --git a/querier_test.go b/querier_test.go
index b6f95bf7ce..301a2b0467 100644
--- a/querier_test.go
+++ b/querier_test.go
@@ -454,13 +454,14 @@ Outer:
 		querier := &blockQuerier{
 			index:      ir,
 			chunks:     cr,
-			tombstones: newEmptyTombstoneReader(),
+			tombstones: EmptyTombstoneReader(),
 
 			mint: c.mint,
 			maxt: c.maxt,
 		}
 
-		res := querier.Select(c.ms...)
+		res, err := querier.Select(c.ms...)
+		require.NoError(t, err)
 
 		for {
 			eok, rok := c.exp.Next(), res.Next()
@@ -505,7 +506,7 @@ func TestBlockQuerierDelete(t *testing.T) {
 			chunks [][]sample
 		}
 
-		tombstones tombstoneReader
+		tombstones TombstoneReader
 		queries    []query
 	}{
 		data: []struct {
@@ -553,13 +554,11 @@ func TestBlockQuerierDelete(t *testing.T) {
 				},
 			},
 		},
-		tombstones: newTombstoneReader(
-			map[uint64]Intervals{
-				1: Intervals{{1, 3}},
-				2: Intervals{{1, 3}, {6, 10}},
-				3: Intervals{{6, 10}},
-			},
-		),
+		tombstones: memTombstones{
+			1: Intervals{{1, 3}},
+			2: Intervals{{1, 3}, {6, 10}},
+			3: Intervals{{6, 10}},
+		},
 
 		queries: []query{
 			{
@@ -632,7 +631,8 @@ Outer:
 			maxt: c.maxt,
 		}
 
-		res := querier.Select(c.ms...)
+		res, err := querier.Select(c.ms...)
+		require.NoError(t, err)
 
 		for {
 			eok, rok := c.exp.Next(), res.Next()
@@ -734,7 +734,7 @@ func TestBaseChunkSeries(t *testing.T) {
 		bcs := &baseChunkSeries{
 			p:          newListPostings(tc.postings),
 			index:      mi,
-			tombstones: newEmptyTombstoneReader(),
+			tombstones: EmptyTombstoneReader(),
 		}
 
 		i := 0
@@ -1228,7 +1228,7 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
 	sel = func(sets []SeriesSet) SeriesSet {
 		if len(sets) == 0 {
-			return nopSeriesSet{}
+			return EmptySeriesSet()
 		}
 		if len(sets) == 1 {
 			return sets[0]
diff --git a/tombstones.go b/tombstones.go
index d43cd0bd0c..8ca089e617 100644
--- a/tombstones.go
+++ b/tombstones.go
@@ -35,12 +35,17 @@ const (
 
 // TombstoneReader gives access to tombstone intervals by series reference.
 type TombstoneReader interface {
-	Get(ref uint64) Intervals
+	// Get returns deletion intervals for the series with the given reference.
+	Get(ref uint64) (Intervals, error)
+
+	// Iter calls the given function for each encountered interval.
+	Iter(func(uint64, Intervals) error) error
+
+	// Close any underlying resources.
 	Close() error
 }
 
-func writeTombstoneFile(dir string, tr tombstoneReader) error {
+func writeTombstoneFile(dir string, tr TombstoneReader) error {
 	path := filepath.Join(dir, tombstoneFilename)
 	tmp := path + ".tmp"
 	hash := newCRC32()
@@ -67,19 +72,23 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
 
 	mw := io.MultiWriter(f, hash)
 
-	for k, v := range tr {
-		for _, itv := range v {
+	if err := tr.Iter(func(ref uint64, ivs Intervals) error {
+		for _, iv := range ivs {
 			buf.reset()
-			buf.putUvarint64(k)
-			buf.putVarint64(itv.Mint)
-			buf.putVarint64(itv.Maxt)
+
+			buf.putUvarint64(ref)
+			buf.putVarint64(iv.Mint)
+			buf.putVarint64(iv.Maxt)
 
 			_, err = mw.Write(buf.get())
 			if err != nil {
 				return err
 			}
 		}
-	}
+		return nil
+	}); err != nil {
+		return err
+	}
 
 	_, err = f.Write(hash.Sum(nil))
 	if err != nil {
@@ -100,7 +107,7 @@ type Stone struct {
 	intervals Intervals
 }
 
-func readTombstones(dir string) (tombstoneReader, error) {
+func readTombstones(dir string) (memTombstones, error) {
 	b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
 	if err != nil {
 		return nil, err
@@ -131,7 +138,8 @@ func readTombstones(dir string) (tombstoneReader, error) {
 		return nil, errors.New("checksum did not match")
 	}
 
-	stonesMap := newEmptyTombstoneReader()
+	stonesMap := memTombstones{}
+
 	for d.len() > 0 {
 		k := d.uvarint64()
 		mint := d.varint64()
@@ -143,28 +151,36 @@ func readTombstones(dir string) (tombstoneReader, error) {
 		stonesMap.add(k, Interval{mint, maxt})
 	}
 
-	return newTombstoneReader(stonesMap), nil
+	return stonesMap, nil
 }
 
-type tombstoneReader map[uint64]Intervals
+type memTombstones map[uint64]Intervals
 
-func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader {
-	return tombstoneReader(ts)
+var emptyTombstoneReader = memTombstones{}
+
+// EmptyTombstoneReader returns a TombstoneReader that is always empty.
+func EmptyTombstoneReader() TombstoneReader {
+	return emptyTombstoneReader
 }
 
-func newEmptyTombstoneReader() tombstoneReader {
-	return tombstoneReader(make(map[uint64]Intervals))
+func (t memTombstones) Get(ref uint64) (Intervals, error) {
+	return t[ref], nil
 }
 
-func (t tombstoneReader) Get(ref uint64) Intervals {
-	return t[ref]
+func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
+	for ref, ivs := range t {
+		if err := f(ref, ivs); err != nil {
+			return err
+		}
+	}
+	return nil
 }
 
-func (t tombstoneReader) add(ref uint64, itv Interval) {
+func (t memTombstones) add(ref uint64, itv Interval) {
 	t[ref] = t[ref].add(itv)
 }
 
-func (tombstoneReader) Close() error {
+func (memTombstones) Close() error {
 	return nil
 }
diff --git a/tombstones_test.go b/tombstones_test.go
index 9265b76b38..eb124fc941 100644
--- a/tombstones_test.go
+++ b/tombstones_test.go
@@ -29,7 +29,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
 
 	ref := uint64(0)
 
-	stones := make(map[uint64]Intervals)
+	stones := memTombstones{}
 	// Generate the tombstones.
 	for i := 0; i < 100; i++ {
 		ref += uint64(rand.Int31n(10)) + 1
@@ -43,13 +43,13 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
 		stones[ref] = dranges
 	}
 
-	require.NoError(t, writeTombstoneFile(tmpdir, newTombstoneReader(stones)))
+	require.NoError(t, writeTombstoneFile(tmpdir, stones))
 
 	restr, err := readTombstones(tmpdir)
 	require.NoError(t, err)
-	exptr := newTombstoneReader(stones)
+
 	// Compare the two readers.
-	require.Equal(t, exptr, restr)
+	require.Equal(t, stones, restr)
 }
 
 func TestAddingNewIntervals(t *testing.T) {
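
Reviewer note: since Querier.Select now returns (SeriesSet, error), every call site gains an explicit error path. Below is a minimal, hypothetical sketch of how a caller reads after this change; the function name, matcher values, and import paths are illustrative only and not part of this patch.

package main

import (
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

// expandSeries fully drains one Select call under the new signature.
// Errors from postings or tombstone lookups now surface here instead of
// being deferred into the returned SeriesSet.
func expandSeries(q tsdb.Querier) (map[string][]float64, error) {
	ss, err := q.Select(labels.NewEqualMatcher("job", "api"))
	if err != nil {
		return nil, err
	}
	out := map[string][]float64{}
	for ss.Next() {
		s := ss.At()
		it := s.Iterator()
		var vals []float64
		for it.Next() {
			_, v := it.At()
			vals = append(vals, v)
		}
		if err := it.Err(); err != nil {
			return nil, err
		}
		out[s.Labels().String()] = vals
	}
	return out, ss.Err()
}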
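Reviewer note: the TombstoneReader interface now exposes an error-returning Get and a callback-based Iter. A self-contained sketch of the iteration pattern follows, restating the in-memory type from this patch so it runs standalone; the main function and its counting logic (which mirrors what CleanTombstones now does) are illustrative only.

package main

import "fmt"

// Minimal re-statement of the types from this patch, for illustration.
type Interval struct{ Mint, Maxt int64 }
type Intervals []Interval

type TombstoneReader interface {
	Get(ref uint64) (Intervals, error)
	Iter(func(uint64, Intervals) error) error
	Close() error
}

type memTombstones map[uint64]Intervals

func (t memTombstones) Get(ref uint64) (Intervals, error) { return t[ref], nil }

// Iter calls f once per series reference with all of its intervals,
// stopping at the first error.
func (t memTombstones) Iter(f func(uint64, Intervals) error) error {
	for ref, ivs := range t {
		if err := f(ref, ivs); err != nil {
			return err
		}
	}
	return nil
}

func (memTombstones) Close() error { return nil }

// Compile-time check that memTombstones satisfies the interface.
var _ TombstoneReader = memTombstones{}

func main() {
	tr := memTombstones{
		1: Intervals{{1, 3}},
		2: Intervals{{1, 3}, {6, 10}},
	}
	// Count intervals the same way CleanTombstones does after this change.
	numStones := 0
	_ = tr.Iter(func(_ uint64, ivs Intervals) error {
		numStones += len(ivs)
		return nil
	})
	fmt.Println("tombstoned intervals:", numStones) // prints 3
}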