From 306831f15100421e2c26841f380e62d7bb6bc96d Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 27 Feb 2017 10:46:15 +0100 Subject: [PATCH] Add per-block state ULID --- block.go | 31 ++++++++++++------------------- compact.go | 7 ++++++- db.go | 10 ++++++++++ head.go | 21 ++++++++++++++++++--- 4 files changed, 46 insertions(+), 23 deletions(-) diff --git a/block.go b/block.go index 3f92b0863d..ad369a4a1f 100644 --- a/block.go +++ b/block.go @@ -7,6 +7,7 @@ import ( "path/filepath" "sort" + "github.com/oklog/ulid" "github.com/pkg/errors" ) @@ -24,16 +25,15 @@ type Block interface { // Series returns a SeriesReader over the block's data. Chunks() ChunkReader - // Persisted returns whether the block is already persisted, - // and no longer being appended to. - Persisted() bool - // Close releases all underlying resources of the block. Close() error } // BlockMeta provides meta information about a block. type BlockMeta struct { + // Unique identifier for the block and its contents. Changes on compaction. + ULID ulid.ULID `json:"ulid"` + // Sequence number of the block. Sequence int `json:"sequence"` @@ -118,7 +118,7 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { return nil, err } - cr, err := newChunkReader(filepath.Join(dir, "chunks")) + cr, err := newChunkReader(chunkDir(dir)) if err != nil { return nil, err } @@ -137,28 +137,21 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { } func (pb *persistedBlock) Close() error { - err0 := pb.chunkr.Close() - err1 := pb.indexr.Close() + var merr MultiError - if err0 != nil { - return err0 - } - return err1 + merr.Add(pb.chunkr.Close()) + merr.Add(pb.indexr.Close()) + + return merr.Err() } func (pb *persistedBlock) Dir() string { return pb.dir } -func (pb *persistedBlock) Persisted() bool { return true } func (pb *persistedBlock) Index() IndexReader { return pb.indexr } func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr } func (pb *persistedBlock) Meta() BlockMeta { return pb.meta } -func chunksFileName(path string) string { - return filepath.Join(path, "chunks-000") -} - -func indexFileName(path string) string { - return filepath.Join(path, "index-000") -} +func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } +func walDir(dir string) string { return filepath.Join(dir, "wal") } type mmapFile struct { f *os.File diff --git a/compact.go b/compact.go index 4b883b8308..016f34881f 100644 --- a/compact.go +++ b/compact.go @@ -1,12 +1,14 @@ package tsdb import ( + "math/rand" "os" "path/filepath" "time" "github.com/coreos/etcd/pkg/fileutil" "github.com/fabxc/tsdb/labels" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -127,12 +129,15 @@ func (c *compactor) match(bs []compactionInfo) bool { return uint64(bs[len(bs)-1].maxt-bs[0].mint) <= c.opts.maxBlockRange } +var entropy = rand.New(rand.NewSource(time.Now().UnixNano())) + func mergeBlockMetas(blocks ...Block) (res BlockMeta) { m0 := blocks[0].Meta() res.Sequence = m0.Sequence res.MinTime = m0.MinTime res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime + res.ULID = ulid.MustNew(ulid.Now(), entropy) g := m0.Compaction.Generation if g == 0 && len(blocks) > 1 { @@ -163,7 +168,7 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { return err } - chunkw, err := newChunkWriter(filepath.Join(dir, "chunks")) + chunkw, err := newChunkWriter(chunkDir(dir)) if err != nil { return errors.Wrap(err, "open chunk writer") } diff --git a/db.go b/db.go index 9fc8dde3c9..4ab2bb3982 100644 --- a/db.go +++ b/db.go @@ -4,6 +4,7 @@ package tsdb import ( "bytes" "fmt" + "io" "io/ioutil" "math" "os" @@ -889,3 +890,12 @@ func yoloString(b []byte) string { } return *((*string)(unsafe.Pointer(&h))) } + +func closeAll(cs ...io.Closer) error { + var merr MultiError + + for _, c := range cs { + merr.Add(c.Close()) + } + return merr.Err() +} diff --git a/head.go b/head.go index cd0167ffb0..260fb962e7 100644 --- a/head.go +++ b/head.go @@ -14,6 +14,7 @@ import ( "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" "github.com/go-kit/kit/log" + "github.com/oklog/ulid" "github.com/pkg/errors" ) @@ -62,11 +63,16 @@ type headBlock struct { } func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { - if err := os.MkdirAll(dir, 0755); err != nil { + if err := os.MkdirAll(dir, 0777); err != nil { + return nil, err + } + ulid, err := ulid.New(ulid.Now(), entropy) + if err != nil { return nil, err } if err := writeMetaFile(dir, &BlockMeta{ + ULID: ulid, Sequence: seq, MinTime: mint, MaxTime: maxt, @@ -133,10 +139,19 @@ func (h *headBlock) inBounds(t int64) bool { // Close syncs all data and closes underlying resources of the head block. func (h *headBlock) Close() error { - if err := writeMetaFile(h.dir, &h.meta); err != nil { + if err := h.wal.Close(); err != nil { + return err + } + // Check whether the head block still exists in the underlying dir + // or has already been replaced with a compacted version + meta, err := readMetaFile(h.dir) + if err != nil { return err } - return h.wal.Close() + if meta.ULID == h.meta.ULID { + return writeMetaFile(h.dir, &h.meta) + } + return nil } func (h *headBlock) Meta() BlockMeta {