diff --git a/block.go b/block.go
index c3e2921c2e..d24198a686 100644
--- a/block.go
+++ b/block.go
@@ -23,6 +23,10 @@ type block interface {
 	seriesData() seriesDataIterator
 }
 
+type persistedBlock struct {
+
+}
+
 type seriesDataIterator interface {
 	next() bool
 	values() (skiplist, []chunks.Chunk)
@@ -33,12 +37,10 @@ func compactBlocks(a, b block) error {
 	return nil
 }
 
-const maxMmapSize = 1 << 20
-
 type persistedSeries struct {
 	size    int
 	dataref []byte
-	data    *[maxMmapSize]byte
+	data    *[maxMapSize]byte
 }
 
 const (
@@ -72,6 +74,32 @@ func (s *persistedSeries) stats() *seriesStats {
 	return (*seriesStats)(unsafe.Pointer(&s.data[seriesMetaSize]))
 }
 
+// seriesAt returns the series stored at offset as a skiplist and the chunks
+// it points to as a byte slice.
+func (s *persistedSeries) seriesAt(offset int) (skiplist, []byte, error) {
+	offset += seriesMetaSize
+	offset += seriesStatsSize
+
+	switch b := s.data[offset]; b {
+	case flagStd:
+	default:
+		return nil, nil, fmt.Errorf("invalid flag: %x", b)
+	}
+	offset++
+
+	var (
+		slLen  = *(*uint16)(unsafe.Pointer(&s.data[offset]))
+		slSize = int(slLen) / int(unsafe.Sizeof(skiplistPair{}))
+		sl     = ((*[maxAllocSize]skiplistPair)(unsafe.Pointer(&s.data[offset+2])))[:slSize]
+	)
+	offset += 3
+
+	chunksLen := *(*uint32)(unsafe.Pointer(&s.data[offset]))
+	chunks := ((*[maxAllocSize]byte)(unsafe.Pointer(&s.data[offset])))[:chunksLen]
+
+	return simpleSkiplist(sl), chunks, nil
+}
+
 // A skiplist maps offsets to values. The values found in the data at an
 // offset are strictly greater than the indexed value.
 type skiplist interface {
@@ -115,32 +143,6 @@ func (sl simpleSkiplist) WriteTo(w io.Writer) (n int64, err error) {
 	return n, err
 }
 
-// seriesAt returns the series stored at offset as a skiplist and the chunks
-// it points to as a byte slice.
-func (s *persistedSeries) seriesAt(offset int) (skiplist, []byte, error) {
-	offset += seriesMetaSize
-	offset += seriesStatsSize
-
-	switch b := s.data[offset]; b {
-	case flagStd:
-	default:
-		return nil, nil, fmt.Errorf("invalid flag: %x", b)
-	}
-	offset++
-
-	var (
-		slLen  = *(*uint16)(unsafe.Pointer(&s.data[offset]))
-		slSize = int(slLen) / int(unsafe.Sizeof(skiplistPair{}))
-		sl     = ((*[maxAllocSize]skiplistPair)(unsafe.Pointer(&s.data[offset+2])))[:slSize]
-	)
-	offset += 3
-
-	chunksLen := *(*uint32)(unsafe.Pointer(&s.data[offset]))
-	chunks := ((*[maxAllocSize]byte)(unsafe.Pointer(&s.data[offset])))[:chunksLen]
-
-	return simpleSkiplist(sl), chunks, nil
-}
-
 type blockWriter struct {
 	block block
 }
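`seriesAt` recovers structure from raw mapped bytes purely through `unsafe` pointer casts, so the on-disk layout (a flag byte, a `uint16`-prefixed skiplist section, a `uint32`-prefixed chunk section) is implicit in the casts. Below is a minimal sketch of decoding that kind of length-prefixed layout with `encoding/binary` instead; the constant sizes, little-endian byte order, and exact offset arithmetic are illustrative assumptions, not the format this patch defines.

```go
package layout

import (
	"encoding/binary"
	"fmt"
)

// Placeholder sizes standing in for seriesMetaSize and seriesStatsSize.
const (
	seriesMetaSize  = 8
	seriesStatsSize = 16
	flagStd         = byte(1)
)

// decodeSeries walks a layout like the one seriesAt reinterprets with
// unsafe casts: a one-byte flag, a uint16-length-prefixed skiplist
// section, then a uint32-length-prefixed chunk section.
func decodeSeries(data []byte, offset int) (sl, chunks []byte, err error) {
	offset += seriesMetaSize + seriesStatsSize

	if data[offset] != flagStd {
		return nil, nil, fmt.Errorf("invalid flag: %x", data[offset])
	}
	offset++

	slLen := int(binary.LittleEndian.Uint16(data[offset:]))
	offset += 2
	sl = data[offset : offset+slLen]
	offset += slLen

	chunksLen := int(binary.LittleEndian.Uint32(data[offset:]))
	offset += 4
	chunks = data[offset : offset+chunksLen]
	return sl, chunks, nil
}
```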
diff --git a/cmd/tsdb/Makefile b/cmd/tsdb/Makefile
index 770de625c4..221ca893ce 100644
--- a/cmd/tsdb/Makefile
+++ b/cmd/tsdb/Makefile
@@ -3,14 +3,14 @@ build:
 
 bench_default: build
 	@echo ">> running benchmark"
-	@./tsdb bench write --out=benchout/default --engine=default --metrics=$(NUM_METRICS) testdata.100k
+	@./tsdb bench write --out=benchout/default --engine=default --metrics=$(NUM_METRICS) testdata.1m
 	@go tool pprof -svg ./tsdb benchout/default/cpu.prof > benchout/default/cpuprof.svg
 	@go tool pprof -svg ./tsdb benchout/default/mem.prof > benchout/default/memprof.svg
 	@go tool pprof -svg ./tsdb benchout/default/block.prof > benchout/default/blockprof.svg
 
 bench_tsdb: build
 	@echo ">> running benchmark"
-	@./tsdb bench write --out=benchout/tsdb --engine=tsdb --metrics=$(NUM_METRICS) testdata.100k
+	@./tsdb bench write --out=benchout/tsdb --engine=tsdb --metrics=$(NUM_METRICS) testdata.1m
 	@go tool pprof -svg ./tsdb benchout/tsdb/cpu.prof > benchout/tsdb/cpuprof.svg
 	@go tool pprof -svg ./tsdb benchout/tsdb/mem.prof > benchout/tsdb/memprof.svg
 	@go tool pprof -svg ./tsdb benchout/tsdb/block.prof > benchout/tsdb/blockprof.svg
diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index a804df376f..ecbe662b3c 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"math/rand"
 	"os"
 	"path/filepath"
 	"runtime"
@@ -194,11 +193,11 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount
 	}
 
 	for i := 0; i < scrapeCount; i++ {
-		ts = ts + int64(i*10000)
+		ts += int64(10000)
 		sc.Reset()
 
 		for _, s := range scrape {
-			s.value += rand.Int63n(1000)
+			s.value += 1000
 			sc.Add(s.labels, float64(s.value))
 		}
 		if err := b.storage.ingestScrape(ts, &sc); err != nil {
diff --git a/db.go b/db.go
index 858b448fcc..6cd2b4a3c3 100644
--- a/db.go
+++ b/db.go
@@ -14,14 +14,15 @@ import (
 	"github.com/prometheus/common/log"
 )
 
-// DefaultOptions used for the DB.
+// DefaultOptions used for the DB. They are sane for setups using
+// millisecond precision timestamps.
 var DefaultOptions = &Options{
-	StalenessDelta: 5 * time.Minute,
+	Retention: 15 * 24 * 3600 * 1000, // 15 days
 }
 
 // Options of the DB storage.
 type Options struct {
-	StalenessDelta time.Duration
+	Retention int64
 }
 
 // DB is a time series storage.
@@ -59,7 +60,8 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
 	// TODO(fabxc): validate shard number to be power of 2, which is required
 	// for the bitshift-modulo when finding the right shard.
 	for i := 0; i < numSeriesShards; i++ {
-		c.shards = append(c.shards, NewSeriesShard())
+		path := filepath.Join(path, fmt.Sprintf("%d", i))
+		c.shards = append(c.shards, NewSeriesShard(path))
 	}
 
 	// TODO(fabxc): run background compaction + GC.
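`Retention` is now a plain `int64` of milliseconds rather than a `time.Duration`, and `Open` places each shard in its own numbered subdirectory under the DB path. A quick sketch of what that means for callers; the DB path and shard count here are made up for illustration.

```go
package main

import (
	"fmt"
	"path/filepath"
	"time"
)

func main() {
	// Callers working in time.Duration must convert to milliseconds.
	retention := int64(15 * 24 * time.Hour / time.Millisecond)
	fmt.Println(retention) // 1296000000, matching DefaultOptions above.

	// Shard directories as derived in Open: one numbered dir per shard.
	for i := 0; i < 4; i++ {
		fmt.Println(filepath.Join("/tmp/tsdb", fmt.Sprintf("%d", i)))
	}
}
```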
@@ -69,26 +71,40 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
 
 // Close the database.
 func (db *DB) Close() error {
+	var wg sync.WaitGroup
+
+	start := time.Now()
+
 	for i, shard := range db.shards {
 		fmt.Println("shard", i)
 		fmt.Println(" num chunks", len(shard.head.forward))
 		fmt.Println(" num samples", shard.head.samples)
 
-		f, err := os.Create(filepath.Join(db.path, fmt.Sprintf("shard-%d-series", i)))
-		if err != nil {
-			return err
-		}
-		bw := &blockWriter{block: shard.head}
-		n, err := bw.writeSeries(f)
-		if err != nil {
-			return err
-		}
-		fmt.Println(" wrote bytes", n)
+		wg.Add(1)
+		go func(i int, shard *SeriesShard) {
+			f, err := os.Create(filepath.Join(db.path, fmt.Sprintf("shard-%d-series", i)))
+			if err != nil {
+				panic(err)
+			}
+			bw := &blockWriter{block: shard.head}
+			n, err := bw.writeSeries(f)
+			if err != nil {
+				panic(err)
+			}
+			fmt.Println(" wrote bytes", n)
+			if err := f.Sync(); err != nil {
+				panic(err)
+			}
 
-		if err := f.Close(); err != nil {
-			return err
-		}
+			if err := f.Close(); err != nil {
+				panic(err)
+			}
+			wg.Done()
+		}(i, shard)
 	}
+	wg.Wait()
+
+	fmt.Println("final serialization took", time.Since(start))
 
 	return nil
 }
@@ -152,6 +168,62 @@ type SeriesIterator interface {
 	Err() error
 }
 
+const sep = '\xff'
+
+// SeriesShard handles reads and writes of time series falling into
+// a hashed shard of a series.
+type SeriesShard struct {
+	path string
+
+	mtx    sync.RWMutex
+	blocks *Block
+	head   *HeadBlock
+}
+
+// NewSeriesShard returns a new SeriesShard.
+func NewSeriesShard(path string) *SeriesShard {
+	return &SeriesShard{
+		path: path,
+		// TODO(fabxc): restore from checkpoint.
+		head: &HeadBlock{
+			ivIndex: newMemIndex(),
+			descs:   map[uint64][]*chunkDesc{},
+			values:  map[string]stringset{},
+			forward: map[uint32]*chunkDesc{},
+		},
+		// TODO(fabxc): provide access to persisted blocks.
+	}
+}
+
+// chunkDesc wraps a plain data chunk and provides cached meta data about it.
+type chunkDesc struct {
+	lset  Labels
+	chunk chunks.Chunk
+
+	// Caching fields.
+	lastTimestamp int64
+	lastValue     float64
+
+	app chunks.Appender // Current appender for the chunks.
+}
+
+func (cd *chunkDesc) append(ts int64, v float64) (err error) {
+	if cd.app == nil {
+		cd.app, err = cd.chunk.Appender()
+		if err != nil {
+			return err
+		}
+	}
+	if err := cd.app.Append(ts, v); err != nil {
+		return err
+	}
+
+	cd.lastTimestamp = ts
+	cd.lastValue = v
+
+	return nil
+}
+
 // LabelRefs contains a reference to a label set that can be resolved
 // against a Querier.
 type LabelRefs struct {
@@ -300,52 +372,3 @@ func (db *DB) AppendVector(ts int64, v *Vector) error {
 
 	return nil
 }
-
-const sep = '\xff'
-
-// SeriesShard handles reads and writes of time series falling into
-// a hashed shard of a series.
-type SeriesShard struct {
-	mtx    sync.RWMutex
-	blocks *Block
-	head   *HeadBlock
-}
-
-// NewSeriesShard returns a new SeriesShard.
-func NewSeriesShard() *SeriesShard {
-	return &SeriesShard{
-		// TODO(fabxc): restore from checkpoint.
-		head: &HeadBlock{
-			index:   newMemIndex(),
-			descs:   map[uint64][]*chunkDesc{},
-			values:  map[string][]string{},
-			forward: map[uint32]*chunkDesc{},
-		},
-		// TODO(fabxc): provide access to persisted blocks.
-	}
-}
-
-// chunkDesc wraps a plain data chunk and provides cached meta data about it.
-type chunkDesc struct {
-	lset  Labels
-	chunk chunks.Chunk
-
-	// Caching fields.
-	lastTimestamp int64
-	lastValue     float64
-
-	app chunks.Appender // Current appender for the chunks.
-}
-
-func (cd *chunkDesc) append(ts int64, v float64) (err error) {
-	if cd.app == nil {
-		cd.app, err = cd.chunk.Appender()
-		if err != nil {
-			return err
-		}
-	}
-	cd.lastTimestamp = ts
-	cd.lastValue = v
-
-	return cd.app.Append(ts, v)
-}
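`Close` now serializes all shards concurrently, but the goroutines panic on failure, which sidesteps the method's `error` return. A sketch of one way to keep the fan-out while propagating the first error back to the caller; `flushShard` is a hypothetical stand-in for the create/write/sync/close sequence above.

```go
package main

import (
	"fmt"
	"sync"
)

// flushShard stands in for writing one shard's series file.
func flushShard(i int) error {
	// ... os.Create, writeSeries, Sync, Close ...
	return nil
}

// closeAll fans out one goroutine per shard and records the first error
// instead of panicking inside the goroutines.
func closeAll(n int) error {
	var (
		wg       sync.WaitGroup
		mtx      sync.Mutex
		firstErr error
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := flushShard(i); err != nil {
				mtx.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mtx.Unlock()
			}
		}(i)
	}
	wg.Wait()
	return firstErr
}

func main() {
	fmt.Println(closeAll(16)) // <nil> when every shard flushes cleanly
}
```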
diff --git a/db_unix.go b/db_unix.go
new file mode 100644
index 0000000000..216da9af51
--- /dev/null
+++ b/db_unix.go
@@ -0,0 +1,53 @@
+// +build !windows,!plan9,!solaris
+
+package tsdb
+
+// import (
+// 	"fmt"
+// 	"syscall"
+// 	"unsafe"
+// )
+
+// // mmap memory maps a DB's data file.
+// func mmap(db *DB, sz int) error {
+// 	// Map the data file to memory.
+// 	b, err := syscall.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|db.MmapFlags)
+// 	if err != nil {
+// 		return err
+// 	}
+
+// 	// Advise the kernel that the mmap is accessed randomly.
+// 	if err := madvise(b, syscall.MADV_RANDOM); err != nil {
+// 		return fmt.Errorf("madvise: %s", err)
+// 	}
+
+// 	// Save the original byte slice and convert to a byte array pointer.
+// 	db.dataref = b
+// 	db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
+// 	db.datasz = sz
+// 	return nil
+// }
+
+// // munmap unmaps a DB's data file from memory.
+// func munmap(db *DB) error {
+// 	// Ignore the unmap if we have no mapped data.
+// 	if db.dataref == nil {
+// 		return nil
+// 	}
+
+// 	// Unmap using the original byte slice.
+// 	err := syscall.Munmap(db.dataref)
+// 	db.dataref = nil
+// 	db.data = nil
+// 	db.datasz = 0
+// 	return err
+// }
+
+// // NOTE: This function is copied from stdlib because it is not available on darwin.
+// func madvise(b []byte, advice int) (err error) {
+// 	_, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice))
+// 	if e1 != 0 {
+// 		err = e1
+// 	}
+// 	return
+// }
diff --git a/head.go b/head.go
index 857ca47ba6..5b283abb31 100644
--- a/head.go
+++ b/head.go
@@ -2,6 +2,8 @@ package tsdb
 
 import (
 	"math"
+	"sort"
+	"strings"
 	"sync"
 
 	"github.com/fabxc/tsdb/chunks"
@@ -12,19 +14,19 @@ type HeadBlock struct {
 	mtx     sync.RWMutex
 	descs   map[uint64][]*chunkDesc // labels hash to possible chunks descs
 	forward map[uint32]*chunkDesc   // chunk ID to chunk desc
-	values  map[string][]string     // label names to possible values
-	index   *memIndex               // inverted index for label pairs
+	values  map[string]stringset    // label names to possible values
+	ivIndex *memIndex               // inverted index for label pairs
 
-	samples uint64
+	samples uint64 // total samples in the block.
 }
 
 // get retrieves the chunk with the hash and label set and creates
 // a new one if it doesn't exist yet.
-func (h *HeadBlock) get(hash uint64, lset Labels) (*chunkDesc, bool) {
+func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc {
 	cds := h.descs[hash]
 	for _, cd := range cds {
 		if cd.lset.Equals(lset) {
-			return cd, false
+			return cd
 		}
 	}
 	// None of the given chunks was for the series, create a new one.
@@ -32,45 +34,53 @@ func (h *HeadBlock) get(hash uint64, lset Labels) (*chunkDesc, bool) {
 		lset:  lset,
 		chunk: chunks.NewXORChunk(int(math.MaxInt64)),
 	}
+	h.index(cd)
 	h.descs[hash] = append(cds, cd)
-	return cd, true
+	return cd
 }
 
-// append adds the sample to the headblock. If the series is seen
-// for the first time it creates a chunk and index entries for it.
-//
-// TODO(fabxc): switch to single writer and append queue with optimistic concurrency?
-func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error {
-	chkd, created := h.get(hash, lset)
-	if created {
-		// Add each label pair as a term to the inverted index.
-		terms := make([]string, 0, len(lset))
-		b := make([]byte, 0, 64)
-
-		for _, l := range lset {
-			b = append(b, l.Name...)
-			b = append(b, sep)
-			b = append(b, l.Value...)
-
-			terms = append(terms, string(b))
-			b = b[:0]
-		}
-		id := h.index.add(terms...)
+func (h *HeadBlock) index(chkd *chunkDesc) {
+	// Add each label pair as a term to the inverted index.
+	terms := make([]string, 0, len(chkd.lset))
+	b := make([]byte, 0, 64)
+
+	for _, l := range chkd.lset {
+		b = append(b, l.Name...)
+		b = append(b, sep)
+		b = append(b, l.Value...)
 
-		// Store forward index for the returned ID.
-		h.forward[id] = chkd
+		terms = append(terms, string(b))
+		b = b[:0]
+
+		// Add to label name to values index.
+		valset, ok := h.values[l.Name]
+		if !ok {
+			valset = stringset{}
+			h.values[l.Name] = valset
+		}
+		valset.set(l.Value)
 	}
-	if err := chkd.append(ts, v); err != nil {
+	id := h.ivIndex.add(terms...)
+
+	// Store forward index for the returned ID.
+	h.forward[id] = chkd
+}
+
+// append adds the sample to the headblock.
+func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error {
+	if err := h.get(hash, lset).append(ts, v); err != nil {
 		return err
 	}
-
 	h.samples++
 	return nil
 }
 
 func (h *HeadBlock) stats() *seriesStats {
-	return &seriesStats{}
+	return &seriesStats{
+		series:  uint32(len(h.forward)),
+		samples: h.samples,
+	}
 }
 
 func (h *HeadBlock) seriesData() seriesDataIterator {
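`HeadBlock.index` builds one inverted-index term per label pair by joining name and value with `sep` (`0xff`, a byte that cannot appear in valid UTF-8 strings, so terms are unambiguous). A self-contained sketch of that term construction, using a plain map in place of the package's `Labels` type:

```go
package main

import "fmt"

const sep = '\xff'

// terms mirrors the term construction in HeadBlock.index: each label
// name/value pair becomes a single "name<0xff>value" string, with the
// scratch buffer b reused across pairs.
func terms(lset map[string]string) []string {
	b := make([]byte, 0, 64)
	out := make([]string, 0, len(lset))
	for name, value := range lset {
		b = append(b[:0], name...)
		b = append(b, sep)
		b = append(b, value...)
		out = append(out, string(b))
	}
	return out
}

func main() {
	fmt.Printf("%q\n", terms(map[string]string{"job": "node", "instance": "a:9090"}))
}
```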
@@ -105,3 +115,27 @@ func (it *chunkDescsIterator) values() (skiplist, []chunks.Chunk) {
 func (it *chunkDescsIterator) err() error {
 	return nil
 }
+
+type stringset map[string]struct{}
+
+func (ss stringset) set(s string) {
+	ss[s] = struct{}{}
+}
+
+func (ss stringset) has(s string) bool {
+	_, ok := ss[s]
+	return ok
+}
+
+func (ss stringset) String() string {
+	return strings.Join(ss.slice(), ",")
+}
+
+func (ss stringset) slice() []string {
+	slice := make([]string, 0, len(ss))
+	for k := range ss {
+		slice = append(slice, k)
+	}
+	sort.Strings(slice)
+	return slice
+}
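The `stringset` type backs the new per-label-name value index in `HeadBlock.values`. A short usage sketch of the lookup-or-create pattern `index` applies to it; the label data is invented for the example.

```go
package main

import "fmt"

type stringset map[string]struct{}

func (ss stringset) set(s string) { ss[s] = struct{}{} }

func main() {
	values := map[string]stringset{}

	// Record every value seen for the "job" label name; the empty-struct
	// values make the set membership-only, with no per-entry payload.
	for _, v := range []string{"node", "api", "node"} {
		valset, ok := values["job"]
		if !ok {
			valset = stringset{}
			values["job"] = valset
		}
		valset.set(v)
	}
	fmt.Println(len(values["job"])) // 2 -- duplicates collapse
}
```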