diff --git a/querier.go b/querier.go
index 9b614e1845..a397b9a632 100644
--- a/querier.go
+++ b/querier.go
@@ -76,6 +76,9 @@ func (s *DB) Querier(mint, maxt int64) Querier {
 }
 
 func (q *querier) LabelValues(n string) ([]string, error) {
+	if len(q.blocks) == 0 {
+		return nil, nil
+	}
 	res, err := q.blocks[0].LabelValues(n)
 	if err != nil {
 		return nil, err
@@ -163,12 +166,16 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
 	}
 
 	return &blockSeriesSet{
-		index:  q.index,
-		chunks: q.chunks,
-		it:     p,
-		absent: absent,
-		mint:   q.mint,
-		maxt:   q.maxt,
+		set: &populatedChunkSeries{
+			set: &baseChunkSeries{
+				p:      p,
+				index:  q.index,
+				absent: absent,
+			},
+			chunks: q.chunks,
+			mint:   q.mint,
+			maxt:   q.maxt,
+		},
 	}
 }
 
@@ -361,23 +368,31 @@ func (s *partitionSeriesSet) Next() bool {
 	return true
 }
 
-// blockSeriesSet is a set of series from an inverted index query.
-type blockSeriesSet struct {
-	index      IndexReader
-	chunks     ChunkReader
-	it         Postings // postings list referencing series
-	absent     []string // labels that must not be set for result series
-	mint, maxt int64    // considered time range
+type chunkSeriesSet interface {
+	Next() bool
+	At() (labels.Labels, []ChunkMeta)
+	Err() error
+}
 
-	err error
-	cur Series
+// baseChunkSeries loads the label set and chunk references for a postings
+// list from an index. It filters out series that have labels set that should be unset.
+type baseChunkSeries struct {
+	p      Postings
+	index  IndexReader
+	absent []string // labels that must be unset in results.
+
+	lset labels.Labels
+	chks []ChunkMeta
+	err  error
 }
 
-func (s *blockSeriesSet) Next() bool {
-	// Step through the postings iterator to find potential series.
-outer:
-	for s.it.Next() {
-		lset, chunks, err := s.index.Series(s.it.At())
+func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
+func (s *baseChunkSeries) Err() error                       { return s.err }
+
+func (s *baseChunkSeries) Next() bool {
+Outer:
+	for s.p.Next() {
+		lset, chunks, err := s.index.Series(s.p.At())
 		if err != nil {
 			s.err = err
 			return false
@@ -386,35 +401,87 @@ outer:
 		// If a series contains a label that must be absent, it is skipped as well.
 		for _, abs := range s.absent {
 			if lset.Get(abs) != "" {
-				continue outer
+				continue Outer
 			}
 		}
 
-		ser := &chunkSeries{
-			labels: lset,
-			chunks: make([]ChunkMeta, 0, len(chunks)),
-			chunk:  s.chunks.Chunk,
-		}
-		// Only use chunks that fit the time range.
-		for _, c := range chunks {
+		s.lset = lset
+		s.chks = chunks
+
+		return true
+	}
+	if err := s.p.Err(); err != nil {
+		s.err = err
+	}
+	return false
+}
+
+// populatedChunkSeries loads chunk data from a store for a set of series
+// with known chunk references. It filters out chunks that do not fit the
+// given time range.
+type populatedChunkSeries struct {
+	set        chunkSeriesSet
+	chunks     ChunkReader
+	mint, maxt int64
+
+	err  error
+	chks []ChunkMeta
+	lset labels.Labels
+}
+
+func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta) { return s.lset, s.chks }
+func (s *populatedChunkSeries) Err() error                       { return s.err }
+
+func (s *populatedChunkSeries) Next() bool {
+	for s.set.Next() {
+		lset, chks := s.set.At()
+
+		// Drop leading chunks that end before mint first; reslicing chks inside the indexed loop below would shift its indices.
+		for len(chks) > 0 && chks[0].MaxTime < s.mint {
+			chks = chks[1:]
+		}
+		for i := range chks {
+			c := &chks[i]
+
-			if c.MaxTime < s.mint {
-				continue
-			}
 			if c.MinTime > s.maxt {
+				chks = chks[:i]
 				break
 			}
-			ser.chunks = append(ser.chunks, c)
+			c.Chunk, s.err = s.chunks.Chunk(c.Ref)
+			if s.err != nil {
+				return false
+			}
 		}
-		// If no chunks of the series apply to the time range, skip it.
-		if len(ser.chunks) == 0 {
+		if len(chks) == 0 {
 			continue
 		}
 
-		s.cur = ser
+		s.lset = lset
+		s.chks = chks
+
+		return true
+	}
+	if err := s.set.Err(); err != nil {
+		s.err = err
+	}
+	return false
+}
+
+// blockSeriesSet is a set of series from an inverted index query.
+type blockSeriesSet struct {
+	set chunkSeriesSet
+	err error
+	cur Series
+}
+
+func (s *blockSeriesSet) Next() bool {
+	for s.set.Next() {
+		lset, chunks := s.set.At()
+		s.cur = &chunkSeries{labels: lset, chunks: chunks}
 		return true
 	}
-	if s.it.Err() != nil {
-		s.err = s.it.Err()
+	if s.set.Err() != nil {
+		s.err = s.set.Err()
 	}
 	return false
 }
@@ -427,10 +494,6 @@ func (s *blockSeriesSet) Err() error { return s.err }
 type chunkSeries struct {
 	labels labels.Labels
 	chunks []ChunkMeta // in-order chunk refs
-
-	// chunk is a function that retrieves chunks based on a reference
-	// number contained in the chunk meta information.
-	chunk func(ref uint64) (chunks.Chunk, error)
 }
 
 func (s *chunkSeries) Labels() labels.Labels {
@@ -438,21 +501,7 @@ func (s *chunkSeries) Labels() labels.Labels {
 }
 
 func (s *chunkSeries) Iterator() SeriesIterator {
-	var cs []chunks.Chunk
-	var mints []int64
-
-	for _, co := range s.chunks {
-		c, err := s.chunk(co.Ref)
-		if err != nil {
-			panic(err) // TODO(fabxc): add error series iterator.
-		}
-		cs = append(cs, c)
-		mints = append(mints, co.MinTime)
-	}
-
-	// TODO(fabxc): consider pushing chunk retrieval further down. In practice, we
-	// probably have to touch all chunks anyway and it doesn't matter.
-	return newChunkSeriesIterator(mints, cs)
+	return newChunkSeriesIterator(s.chunks)
 }
 
 // SeriesIterator iterates over the data of a time series.
@@ -538,43 +587,38 @@ func (it *chainedSeriesIterator) Err() error {
 // chunkSeriesIterator implements a series iterator on top
 // of a list of time-sorted, non-overlapping chunks.
 type chunkSeriesIterator struct {
-	mints  []int64 // minimum timestamps for each iterator
-	chunks []chunks.Chunk
+	chunks []ChunkMeta
 
 	i   int
 	cur chunks.Iterator
 }
 
-func newChunkSeriesIterator(mints []int64, cs []chunks.Chunk) *chunkSeriesIterator {
-	if len(mints) != len(cs) {
-		panic("chunk references and chunks length don't match")
-	}
+func newChunkSeriesIterator(cs []ChunkMeta) *chunkSeriesIterator {
 	return &chunkSeriesIterator{
-		mints:  mints,
 		chunks: cs,
 		i:      0,
-		cur:    cs[0].Iterator(),
+		cur:    cs[0].Chunk.Iterator(),
 	}
 }
 
 func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
 	// Only do binary search forward to stay in line with other iterators
 	// that can only move forward.
-	x := sort.Search(len(it.mints[it.i:]), func(i int) bool { return it.mints[i] >= t })
+	x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[it.i+i].MinTime >= t })
 	x += it.i
 
 	// If the timestamp was not found, it might be in the last chunk.
-	if x == len(it.mints) {
+	if x == len(it.chunks) {
 		x--
 	}
 	// Go to previous chunk if the chunk doesn't exactly start with t.
 	// If we are already at the first chunk, we use it as it's the best we have.
-	if x > 0 && it.mints[x] > t {
+	if x > 0 && it.chunks[x].MinTime > t {
 		x--
 	}
 
 	it.i = x
-	it.cur = it.chunks[x].Iterator()
+	it.cur = it.chunks[x].Chunk.Iterator()
 
 	for it.cur.Next() {
 		t0, _ := it.cur.At()
@@ -601,7 +645,7 @@ func (it *chunkSeriesIterator) Next() bool {
 	}
 
 	it.i++
-	it.cur = it.chunks[it.i].Iterator()
+	it.cur = it.chunks[it.i].Chunk.Iterator()
 
 	return it.Next()
 }
diff --git a/rw_test.go b/rw_test.go
index 9ab03306f4..4f6dad1697 100644
--- a/rw_test.go
+++ b/rw_test.go
@@ -4,13 +4,55 @@ import (
 	"io/ioutil"
 	"math/rand"
 	"os"
-	"testing"
-
 	"sort"
+	"testing"
 
+	"github.com/fabxc/tsdb/chunks"
+	"github.com/fabxc/tsdb/labels"
 	"github.com/stretchr/testify/require"
 )
 
+type mockIndexReader struct {
+	labelValues  func(...string) (StringTuples, error)
+	postings     func(string, string) (Postings, error)
+	series       func(uint32) (labels.Labels, []ChunkMeta, error)
+	labelIndices func() ([][]string, error)
+	close        func() error
+}
+
+func (ir *mockIndexReader) LabelValues(names ...string) (StringTuples, error) {
+	return ir.labelValues(names...)
+}
+
+func (ir *mockIndexReader) Postings(name, value string) (Postings, error) {
+	return ir.postings(name, value)
+}
+
+func (ir *mockIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
+	return ir.series(ref)
+}
+
+func (ir *mockIndexReader) LabelIndices() ([][]string, error) {
+	return ir.labelIndices()
+}
+
+func (ir *mockIndexReader) Close() error {
+	return ir.close()
+}
+
+type mockChunkReader struct {
+	chunk func(ref uint64) (chunks.Chunk, error)
+	close func() error
+}
+
+func (cr *mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
+	return cr.chunk(ref)
+}
+
+func (cr *mockChunkReader) Close() error {
+	return cr.close()
+}
+
 func TestPersistence_index_e2e(t *testing.T) {
 	dir, err := ioutil.TempDir("", "test_persistence_e2e")
 	require.NoError(t, err)