From d9ca4b47f52b1331a45f3ace14d12d7b207534dc Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 20 Dec 2016 13:10:37 +0100 Subject: [PATCH] Fix offset errors, fix persisted postings order --- block.go | 3 --- db.go | 2 +- querier.go | 44 ++++++++++++++++++++++++-------------------- reader.go | 9 ++++++--- writer.go | 54 +++++++++++++++++++++++++++++++++++++++++++++--------- 5 files changed, 76 insertions(+), 36 deletions(-) diff --git a/block.go b/block.go index ee3eafcb1f..615f7cdf6c 100644 --- a/block.go +++ b/block.go @@ -38,10 +38,8 @@ type persistedBlock struct { } func newPersistedBlock(path string) (*persistedBlock, error) { - // The directory must be named after the base timestamp for the block. // TODO(fabxc): validate match of name and stats time, validate magic. - fmt.Println("new persisted block", path) // mmap files belonging to the block. chunksf, err := openMmapFile(chunksFileName(path)) if err != nil { @@ -65,7 +63,6 @@ func newPersistedBlock(path string) (*persistedBlock, error) { if err != nil { return nil, err } - fmt.Println("initialized new persisted block with", stats) pb := &persistedBlock{ chunksf: chunksf, diff --git a/db.go b/db.go index 70f4400a63..7613bb3a0d 100644 --- a/db.go +++ b/db.go @@ -42,7 +42,7 @@ type DB struct { // TODO(fabxc): make configurable const ( - shardShift = 2 + shardShift = 0 numShards = 1 << shardShift maxChunkSize = 1024 ) diff --git a/querier.go b/querier.go index 93b31223d6..4581480da3 100644 --- a/querier.go +++ b/querier.go @@ -183,6 +183,22 @@ func (q *shardQuerier) LabelValuesFor(string, Label) ([]string, error) { return nil, fmt.Errorf("not implemented") } +func (q *shardQuerier) Select(ms ...Matcher) SeriesSet { + // Sets from different blocks have no time overlap. The reference numbers + // they emit point to series sorted in lexicographic order. + // We can fully connect partial series by simply comparing with the previous + // label set. + if len(q.blocks) == 0 { + return nopSeriesSet{} + } + r := q.blocks[0].Select(ms...) + + for _, s := range q.blocks[1:] { + r = newShardSeriesSet(r, s.Select(ms...)) + } + return r +} + func (q *shardQuerier) Close() error { return nil } @@ -210,8 +226,6 @@ func (q *blockQuerier) Select(ms ...Matcher) SeriesSet { its = append(its, q.selectSingle(m)) } - // TODO(fabxc): pass down time range so the series iterator - // can be instantiated with it? return &blockSeriesSet{ index: q.index, it: Intersect(its...), @@ -256,6 +270,13 @@ func (q *blockQuerier) selectSingle(m Matcher) Postings { return Intersect(rit...) } +func expandPostings(p Postings) (res []uint32, err error) { + for p.Next() { + res = append(res, p.Value()) + } + return res, p.Err() +} + func (q *blockQuerier) LabelValues(name string) ([]string, error) { tpls, err := q.index.LabelValues(name) if err != nil { @@ -319,23 +340,6 @@ func (s *mergedSeriesSet) Next() bool { return s.Next() } -func (q *shardQuerier) Select(ms ...Matcher) SeriesSet { - // Sets from different blocks have no time overlap. The reference numbers - // they emit point to series sorted in lexicographic order. - // We can fully connect partial series by simply comparing with the previous - // label set. - if len(q.blocks) == 0 { - return nopSeriesSet{} - } - r := q.blocks[0].Select(ms...) - - for _, s := range q.blocks[1:] { - r = &shardSeriesSet{a: r, b: s.Select(ms...)} - } - - return r -} - type shardSeriesSet struct { a, b SeriesSet @@ -408,7 +412,7 @@ func (s *shardSeriesSet) advanceB() { } func (s *shardSeriesSet) Next() bool { - if s.as == nil && s.bs == nil { + if s.as == nil && s.bs == nil || s.Err() != nil { return false } diff --git a/reader.go b/reader.go index e940c851c2..a9d933b6f9 100644 --- a/reader.go +++ b/reader.go @@ -31,6 +31,9 @@ func newSeriesReader(b []byte) (*seriesReader, error) { } func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) { + if int(offset) > len(s.b) { + return nil, errors.Errorf("offset %d beyond data size %d", offset, len(s.b)) + } b := s.b[offset:] l, n := binary.Uvarint(b) @@ -427,14 +430,14 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { if len(t.b) < (i+t.l)*4 { return nil, errInvalidSize } - res := make([]string, t.l) + res := make([]string, 0, t.l) for k := 0; k < t.l; k++ { - offset := binary.BigEndian.Uint32(t.b[i*4:]) + offset := binary.BigEndian.Uint32(t.b[(i+k)*4:]) b, err := t.lookup(offset) if err != nil { - return nil, fmt.Errorf("lookup: %s", err) + return nil, errors.Wrap(err, "symbol lookup") } res = append(res, string(b)) } diff --git a/writer.go b/writer.go index fdb5068851..374dbdd839 100644 --- a/writer.go +++ b/writer.go @@ -8,6 +8,9 @@ import ( "os" "sort" "strings" + + "github.com/bradfitz/slice" + "github.com/pkg/errors" ) const ( @@ -216,13 +219,16 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er binary.BigEndian.PutUint32(b[1:], l) if err := w.write(wr, b[:]); err != nil { - return err + return errors.Wrap(err, "writing header") } if err := f(wr); err != nil { - return err + return errors.Wrap(err, "contents write func") + } + if err := w.write(w.w, h.Sum(nil)); err != nil { + return errors.Wrap(err, "writing checksum") } - return w.write(w.w, h.Sum(nil)) + return nil } func (w *indexWriter) writeMeta() error { @@ -288,11 +294,15 @@ func (w *indexWriter) writeSymbols() error { } sort.Strings(symbols) + // The start of the section plus a 5 byte section header are our base. + // TODO(fabxc): switch to relative offsets and hold sections in a TOC. + base := uint32(w.n) + 5 + buf := [binary.MaxVarintLen32]byte{} b := append(make([]byte, 0, 4096), flagStd) for _, s := range symbols { - w.symbols[s] = uint32(w.n) + uint32(len(b)) + w.symbols[s] = base + uint32(len(b)) n := binary.PutUvarint(buf[:], uint64(len(s))) b = append(b, buf[:n]...) @@ -307,12 +317,26 @@ func (w *indexWriter) writeSymbols() error { } func (w *indexWriter) writeSeries() error { - b := make([]byte, 0, 4096) - buf := make([]byte, binary.MaxVarintLen64) + // Series must be stored sorted along their labels. + series := make([]*indexWriterSeries, 0, len(w.series)) for _, s := range w.series { + series = append(series, s) + } + slice.Sort(series, func(i, j int) bool { + return compareLabels(series[i].labels, series[j].labels) < 0 + }) + + // Current end of file plus 5 bytes for section header. + // TODO(fabxc): switch to relative offsets. + base := uint32(w.n) + 5 + + b := make([]byte, 0, 1<<20) // 1MiB + buf := make([]byte, binary.MaxVarintLen64) + + for _, s := range series { // Write label set symbol references. - s.offset = uint32(w.n) + uint32(len(b)) + s.offset = base + uint32(len(b)) n := binary.PutUvarint(buf, uint64(len(s.labels))) b = append(b, buf[:n]...) @@ -391,10 +415,22 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { b := make([]byte, 0, 4096) buf := [4]byte{} + // Order of the references in the postings list does not imply order + // of the series references within the persisted block they are mapped to. + // We have to sort the new references again. + var refs []uint32 + for it.Next() { - v := w.series[it.Value()].offset - binary.BigEndian.PutUint32(buf[:], v) + refs = append(refs, w.series[it.Value()].offset) + } + if err := it.Err(); err != nil { + return err + } + + slice.Sort(refs, func(i, j int) bool { return refs[i] < refs[j] }) + for _, r := range refs { + binary.BigEndian.PutUint32(buf[:], r) b = append(b, buf[:]...) }