diff --git a/chunks/chunk.go b/chunks/chunk.go index 75f2a5c627..6bff827357 100644 --- a/chunks/chunk.go +++ b/chunks/chunk.go @@ -51,7 +51,7 @@ type Appender interface { // Iterator is a simple iterator that can only get the next value. type Iterator interface { - Values() (int64, float64) + At() (int64, float64) Err() error Next() bool } diff --git a/chunks/chunk_test.go b/chunks/chunk_test.go index d7a1698ded..e45343c410 100644 --- a/chunks/chunk_test.go +++ b/chunks/chunk_test.go @@ -67,7 +67,7 @@ func testChunk(c Chunk) error { it := c.Iterator() var res []pair for it.Next() { - ts, v := it.Values() + ts, v := it.At() res = append(res, pair{t: ts, v: v}) } if it.Err() != nil { @@ -125,7 +125,7 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) { it := c.Iterator() for it.Next() { - _, v := it.Values() + _, v := it.At() res = append(res, v) } if it.Err() != io.EOF { diff --git a/chunks/xor.go b/chunks/xor.go index c0b8e83cc2..8acdda6ee0 100644 --- a/chunks/xor.go +++ b/chunks/xor.go @@ -190,7 +190,7 @@ type xorIterator struct { err error } -func (it *xorIterator) Values() (int64, float64) { +func (it *xorIterator) At() (int64, float64) { return it.t, it.val } diff --git a/compact.go b/compact.go index c6539179fd..7149edbf42 100644 --- a/compact.go +++ b/compact.go @@ -196,7 +196,7 @@ func (c *compactionSeriesSet) Next() bool { return false } - c.l, c.c, c.err = c.index.Series(c.p.Value()) + c.l, c.c, c.err = c.index.Series(c.p.At()) if c.err != nil { return false } diff --git a/db.go b/db.go index 07da446155..2e92838044 100644 --- a/db.go +++ b/db.go @@ -95,8 +95,6 @@ func (db *DB) Close() error { var g errgroup.Group for _, shard := range db.shards { - // Fix closure argument to goroutine. - shard := shard g.Go(shard.Close) } diff --git a/postings.go b/postings.go index 9dde3e678a..b3afe831af 100644 --- a/postings.go +++ b/postings.go @@ -39,8 +39,8 @@ type Postings interface { // true if a value was found. Seek(v uint32) bool - // Value returns the value at the current iterator position. - Value() uint32 + // At returns the value at the current iterator position. + At() uint32 // Err returns the last error of the iterator. Err() error @@ -53,12 +53,12 @@ type errPostings struct { func (e errPostings) Next() bool { return false } func (e errPostings) Seek(uint32) bool { return false } -func (e errPostings) Value() uint32 { return 0 } +func (e errPostings) At() uint32 { return 0 } func (e errPostings) Err() error { return e.err } func expandPostings(p Postings) (res []uint32, err error) { for p.Next() { - res = append(res, p.Value()) + res = append(res, p.At()) } return res, p.Err() } @@ -93,7 +93,7 @@ func newIntersectPostings(a, b Postings) *intersectPostings { return it } -func (it *intersectPostings) Value() uint32 { +func (it *intersectPostings) At() uint32 { return it.cur } @@ -102,7 +102,7 @@ func (it *intersectPostings) Next() bool { if !it.aok || !it.bok { return false } - av, bv := it.a.Value(), it.b.Value() + av, bv := it.a.At(), it.b.At() if av < bv { it.aok = it.a.Seek(bv) @@ -157,7 +157,7 @@ func newMergePostings(a, b Postings) *mergePostings { return it } -func (it *mergePostings) Value() uint32 { +func (it *mergePostings) At() uint32 { return it.cur } @@ -167,17 +167,17 @@ func (it *mergePostings) Next() bool { } if !it.aok { - it.cur = it.b.Value() + it.cur = it.b.At() it.bok = it.b.Next() return true } if !it.bok { - it.cur = it.a.Value() + it.cur = it.a.At() it.aok = it.a.Next() return true } - acur, bcur := it.a.Value(), it.b.Value() + acur, bcur := it.a.At(), it.b.At() if acur < bcur { it.cur = acur @@ -219,7 +219,7 @@ func newListPostings(list []uint32) *listPostings { return &listPostings{list: list, idx: -1} } -func (it *listPostings) Value() uint32 { +func (it *listPostings) At() uint32 { return it.list[it.idx] } diff --git a/querier.go b/querier.go index 15bdc78e43..4cacc76319 100644 --- a/querier.go +++ b/querier.go @@ -33,8 +33,6 @@ type Series interface { // Iterator returns a new iterator of the data of the series. Iterator() SeriesIterator - - // Ref() uint32 } // querier merges query results from a set of shard querieres. @@ -313,15 +311,15 @@ func (q *blockQuerier) Close() error { // SeriesSet contains a set of series. type SeriesSet interface { Next() bool - Series() Series + At() Series Err() error } type nopSeriesSet struct{} -func (nopSeriesSet) Next() bool { return false } -func (nopSeriesSet) Series() Series { return nil } -func (nopSeriesSet) Err() error { return nil } +func (nopSeriesSet) Next() bool { return false } +func (nopSeriesSet) At() Series { return nil } +func (nopSeriesSet) Err() error { return nil } type mergedSeriesSet struct { sets []SeriesSet @@ -330,8 +328,8 @@ type mergedSeriesSet struct { err error } -func (s *mergedSeriesSet) Series() Series { return s.sets[s.cur].Series() } -func (s *mergedSeriesSet) Err() error { return s.sets[s.cur].Err() } +func (s *mergedSeriesSet) At() Series { return s.sets[s.cur].At() } +func (s *mergedSeriesSet) Err() error { return s.sets[s.cur].Err() } func (s *mergedSeriesSet) Next() bool { // TODO(fabxc): We just emit the sets one after one. They are each @@ -365,7 +363,7 @@ func newShardSeriesSet(a, b SeriesSet) *shardSeriesSet { return s } -func (s *shardSeriesSet) Series() Series { +func (s *shardSeriesSet) At() Series { return s.cur } @@ -383,7 +381,7 @@ func (s *shardSeriesSet) compare() int { if s.bdone { return -1 } - return labels.Compare(s.a.Series().Labels(), s.a.Series().Labels()) + return labels.Compare(s.a.At().Labels(), s.a.At().Labels()) } func (s *shardSeriesSet) Next() bool { @@ -394,15 +392,15 @@ func (s *shardSeriesSet) Next() bool { d := s.compare() // Both sets contain the current series. Chain them into a single one. if d > 0 { - s.cur = s.b.Series() + s.cur = s.b.At() s.bdone = !s.b.Next() } else if d < 0 { - s.cur = s.a.Series() + s.cur = s.a.At() s.adone = !s.a.Next() } else { - s.cur = &chainedSeries{series: []Series{s.a.Series(), s.b.Series()}} + s.cur = &chainedSeries{series: []Series{s.a.At(), s.b.At()}} s.adone = !s.a.Next() s.bdone = !s.b.Next() } @@ -425,7 +423,7 @@ func (s *blockSeriesSet) Next() bool { // Step through the postings iterator to find potential series. outer: for s.it.Next() { - lset, chunks, err := s.index.Series(s.it.Value()) + lset, chunks, err := s.index.Series(s.it.At()) if err != nil { s.err = err return false @@ -467,8 +465,8 @@ outer: return false } -func (s *blockSeriesSet) Series() Series { return s.cur } -func (s *blockSeriesSet) Err() error { return s.err } +func (s *blockSeriesSet) At() Series { return s.cur } +func (s *blockSeriesSet) Err() error { return s.err } // chunkSeries is a series that is backed by a sequence of chunks holding // time series data. @@ -510,7 +508,7 @@ type SeriesIterator interface { // before tt. Seek(t int64) bool // Values returns the current timestamp/value pair. - Values() (t int64, v float64) + At() (t int64, v float64) // Next advances the iterator by one. Next() bool // Err returns the current error. @@ -575,8 +573,8 @@ func (it *chainedSeriesIterator) Next() bool { return it.Next() } -func (it *chainedSeriesIterator) Values() (t int64, v float64) { - return it.cur.Values() +func (it *chainedSeriesIterator) At() (t int64, v float64) { + return it.cur.At() } func (it *chainedSeriesIterator) Err() error { @@ -625,7 +623,7 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { it.cur = it.chunks[x].Iterator() for it.cur.Next() { - t0, _ := it.cur.Values() + t0, _ := it.cur.At() if t0 >= t { return true } @@ -633,8 +631,8 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { return false } -func (it *chunkSeriesIterator) Values() (t int64, v float64) { - return it.cur.Values() +func (it *chunkSeriesIterator) At() (t int64, v float64) { + return it.cur.At() } func (it *chunkSeriesIterator) Next() bool { @@ -700,7 +698,7 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool { if !ok { return false } - b.lastTime, _ = b.Values() + b.lastTime, _ = b.At() } if b.lastTime >= t { @@ -718,18 +716,18 @@ func (b *BufferedSeriesIterator) Seek(t int64) bool { // Next advances the iterator to the next element. func (b *BufferedSeriesIterator) Next() bool { // Add current element to buffer before advancing. - b.buf.add(b.it.Values()) + b.buf.add(b.it.At()) ok := b.it.Next() if ok { - b.lastTime, _ = b.Values() + b.lastTime, _ = b.At() } return ok } // Values returns the current element of the iterator. -func (b *BufferedSeriesIterator) Values() (int64, float64) { - return b.it.Values() +func (b *BufferedSeriesIterator) At() (int64, float64) { + return b.it.At() } // Err returns the last encountered error. @@ -786,7 +784,7 @@ func (it *sampleRingIterator) Err() error { return nil } -func (it *sampleRingIterator) Values() (int64, float64) { +func (it *sampleRingIterator) At() (int64, float64) { return it.r.at(it.i) } diff --git a/querier_test.go b/querier_test.go index 831fc24360..1ad093bb7a 100644 --- a/querier_test.go +++ b/querier_test.go @@ -10,16 +10,16 @@ import ( ) type mockSeriesIterator struct { - seek func(int64) bool - values func() (int64, float64) - next func() bool - err func() error + seek func(int64) bool + at func() (int64, float64) + next func() bool + err func() error } -func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) } -func (m *mockSeriesIterator) Values() (int64, float64) { return m.values() } -func (m *mockSeriesIterator) Next() bool { return m.next() } -func (m *mockSeriesIterator) Err() error { return m.err() } +func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) } +func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } +func (m *mockSeriesIterator) Next() bool { return m.next() } +func (m *mockSeriesIterator) Err() error { return m.err() } type mockSeries struct { labels func() labels.Labels @@ -38,7 +38,7 @@ func newListSeriesIterator(list []sample) *listSeriesIterator { return &listSeriesIterator{list: list, idx: -1} } -func (it *listSeriesIterator) Values() (int64, float64) { +func (it *listSeriesIterator) At() (int64, float64) { s := it.list[it.idx] return s.t, s.v } @@ -71,9 +71,9 @@ type mockSeriesSet struct { err func() error } -func (m *mockSeriesSet) Next() bool { return m.next() } -func (m *mockSeriesSet) Series() Series { return m.series() } -func (m *mockSeriesSet) Err() error { return m.err() } +func (m *mockSeriesSet) Next() bool { return m.next() } +func (m *mockSeriesSet) At() Series { return m.series() } +func (m *mockSeriesSet) Err() error { return m.err() } func newListSeriesSet(list []Series) *mockSeriesSet { i := -1 @@ -152,8 +152,8 @@ Outer: if !eok { continue Outer } - sexp := c.exp.Series() - sres := res.Series() + sexp := c.exp.At() + sres := res.At() require.Equal(t, sexp.Labels(), sres.Labels(), "labels") @@ -168,7 +168,7 @@ Outer: func expandSeriesIterator(it SeriesIterator) (r []sample, err error) { for it.Next() { - t, v := it.Values() + t, v := it.At() r = append(r, sample{t: t, v: v}) } @@ -243,13 +243,13 @@ func TestBufferedSeriesIterator(t *testing.T) { var b []sample bit := it.Buffer() for bit.Next() { - t, v := bit.Values() + t, v := bit.At() b = append(b, sample{t: t, v: v}) } require.Equal(t, exp, b, "buffer mismatch") } sampleEq := func(ets int64, ev float64) { - ts, v := it.Values() + ts, v := it.At() require.Equal(t, ets, ts, "timestamp mismatch") require.Equal(t, ev, v, "value mismatch") } diff --git a/wal.go b/wal.go index 18f66edb95..20fb70b034 100644 --- a/wal.go +++ b/wal.go @@ -136,10 +136,11 @@ func newWALEncoder(f *os.File) (*walEncoder, error) { if err != nil { return nil, err } - return &walEncoder{ + enc := &walEncoder{ w: ioutil.NewPageWriter(f, walPageBytes, int(offset)), buf: make([]byte, 0, 1024*1024), - }, nil + } + return enc, nil } func (e *walEncoder) flush() error { diff --git a/writer.go b/writer.go index a05d43586a..5b74b4c82f 100644 --- a/writer.go +++ b/writer.go @@ -424,9 +424,9 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { var refs []uint32 for it.Next() { - s, ok := w.series[it.Value()] + s, ok := w.series[it.At()] if !ok { - return errors.Errorf("series for reference %d not found", it.Value()) + return errors.Errorf("series for reference %d not found", it.At()) } refs = append(refs, s.offset) }