|
|
|
@ -121,8 +121,6 @@ func NewBlockQuerier(b BlockReader, mint, maxt int64) (storage.Querier, error) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet { |
|
|
|
|
mint := q.mint |
|
|
|
|
maxt := q.maxt |
|
|
|
|
p, err := PostingsForMatchers(q.index, ms...) |
|
|
|
|
if err != nil { |
|
|
|
|
return storage.ErrSeriesSet(err) |
|
|
|
@ -131,16 +129,15 @@ func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms .. |
|
|
|
|
p = q.index.SortedPostings(p) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
mint := q.mint |
|
|
|
|
maxt := q.maxt |
|
|
|
|
discardSamples := false |
|
|
|
|
if hints != nil { |
|
|
|
|
mint = hints.Start |
|
|
|
|
maxt = hints.End |
|
|
|
|
if hints.Func == "series" { |
|
|
|
|
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
|
|
|
|
|
return newBlockSeriesSet(q.index, newNopChunkReader(), q.tombstones, p, mint, maxt) |
|
|
|
|
} |
|
|
|
|
discardSamples = hints.Func == storage.DiscardSamplesFunc |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt) |
|
|
|
|
return newBlockSeriesSet(q.index, q.chunks, q.tombstones, p, discardSamples, mint, maxt) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// blockChunkQuerier provides chunk querying access to a single block database.
|
|
|
|
@ -158,12 +155,6 @@ func NewBlockChunkQuerier(b BlockReader, mint, maxt int64) (storage.ChunkQuerier |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { |
|
|
|
|
mint := q.mint |
|
|
|
|
maxt := q.maxt |
|
|
|
|
if hints != nil { |
|
|
|
|
mint = hints.Start |
|
|
|
|
maxt = hints.End |
|
|
|
|
} |
|
|
|
|
p, err := PostingsForMatchers(q.index, ms...) |
|
|
|
|
if err != nil { |
|
|
|
|
return storage.ErrChunkSeriesSet(err) |
|
|
|
@ -171,7 +162,16 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, |
|
|
|
|
if sortSeries { |
|
|
|
|
p = q.index.SortedPostings(p) |
|
|
|
|
} |
|
|
|
|
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, mint, maxt) |
|
|
|
|
|
|
|
|
|
mint := q.mint |
|
|
|
|
maxt := q.maxt |
|
|
|
|
discardSamples := false |
|
|
|
|
if hints != nil { |
|
|
|
|
mint = hints.Start |
|
|
|
|
maxt = hints.End |
|
|
|
|
discardSamples = hints.Func == storage.DiscardSamplesFunc |
|
|
|
|
} |
|
|
|
|
return newBlockChunkSeriesSet(q.index, q.chunks, q.tombstones, p, discardSamples, mint, maxt) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func findSetMatches(pattern string) []string { |
|
|
|
@ -374,86 +374,70 @@ func inversePostingsForMatcher(ix IndexReader, m *labels.Matcher) (index.Posting |
|
|
|
|
// See newBlockSeriesSet and newBlockChunkSeriesSet to use it for either sample or chunk iterating.
|
|
|
|
|
type blockBaseSeriesSet struct { |
|
|
|
|
p index.Postings |
|
|
|
|
index IndexReader |
|
|
|
|
series index.SeriesSelector |
|
|
|
|
|
|
|
|
|
chunks ChunkReader |
|
|
|
|
tombstones tombstones.Reader |
|
|
|
|
mint, maxt int64 |
|
|
|
|
skipChunks bool |
|
|
|
|
|
|
|
|
|
currIterFn func() *populateWithDelGenericSeriesIterator |
|
|
|
|
currLabels labels.Labels |
|
|
|
|
|
|
|
|
|
bufChks []chunks.Meta |
|
|
|
|
bufLbls labels.Labels |
|
|
|
|
bufIntervals tombstones.Intervals |
|
|
|
|
err error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (b *blockBaseSeriesSet) Next() bool { |
|
|
|
|
for b.p.Next() { |
|
|
|
|
if err := b.index.Series(b.p.At(), &b.bufLbls, &b.bufChks); err != nil { |
|
|
|
|
// Postings may be stale. Skip if no underlying series exists.
|
|
|
|
|
if errors.Cause(err) == storage.ErrNotFound { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
b.err = errors.Wrapf(err, "get series %d", b.p.At()) |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(b.bufChks) == 0 { |
|
|
|
|
continue |
|
|
|
|
edgesIntervals := tombstones.Intervals{ |
|
|
|
|
tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1}, |
|
|
|
|
tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64}, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for b.p.Next() { |
|
|
|
|
intervals, err := b.tombstones.Get(b.p.At()) |
|
|
|
|
if err != nil { |
|
|
|
|
b.err = errors.Wrap(err, "get tombstones") |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
b.bufIntervals = b.bufIntervals[:0] |
|
|
|
|
if len(intervals) > 0 { |
|
|
|
|
b.bufIntervals = append(b.bufIntervals, intervals...) |
|
|
|
|
} |
|
|
|
|
b.bufIntervals = b.bufIntervals.Add(edgesIntervals[0]).Add(edgesIntervals[1]) |
|
|
|
|
|
|
|
|
|
// NOTE:
|
|
|
|
|
// * block time range is half-open: [meta.MinTime, meta.MaxTime).
|
|
|
|
|
// * chunks are both closed: [chk.MinTime, chk.MaxTime].
|
|
|
|
|
// * requested time ranges are closed: [req.Start, req.End].
|
|
|
|
|
|
|
|
|
|
var trimFront, trimBack bool |
|
|
|
|
|
|
|
|
|
// Copy chunks as iteratables are reusable.
|
|
|
|
|
chks := make([]chunks.Meta, 0, len(b.bufChks)) |
|
|
|
|
|
|
|
|
|
// Prefilter chunks and pick those which are not entirely deleted or totally outside of the requested range.
|
|
|
|
|
for _, chk := range b.bufChks { |
|
|
|
|
if chk.MaxTime < b.mint { |
|
|
|
|
lset, bufChks, err := b.series.Select(b.p.At(), b.skipChunks, b.bufIntervals...) |
|
|
|
|
if err != nil { |
|
|
|
|
// Postings may be stale. Skip if no underlying series exists.
|
|
|
|
|
if errors.Cause(err) == storage.ErrNotFound { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if chk.MinTime > b.maxt { |
|
|
|
|
// Postings may be stale. Skip if no underlying series exists.
|
|
|
|
|
if errors.Cause(err) == index.ErrNoChunkMatched { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(intervals)) { |
|
|
|
|
chks = append(chks, chk) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// If still not entirely deleted, check if trim is needed based on requested time range.
|
|
|
|
|
if chk.MinTime < b.mint { |
|
|
|
|
trimFront = true |
|
|
|
|
} |
|
|
|
|
if chk.MaxTime > b.maxt { |
|
|
|
|
trimBack = true |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(chks) == 0 { |
|
|
|
|
continue |
|
|
|
|
b.err = errors.Wrapf(err, "get series %d", b.p.At()) |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
// Copy labels as they can be used across iterations.
|
|
|
|
|
b.currLabels = make(labels.Labels, len(lset)) |
|
|
|
|
copy(b.currLabels, lset) |
|
|
|
|
|
|
|
|
|
if trimFront { |
|
|
|
|
intervals = intervals.Add(tombstones.Interval{Mint: math.MinInt64, Maxt: b.mint - 1}) |
|
|
|
|
if b.skipChunks { |
|
|
|
|
b.currIterFn = func() *populateWithDelGenericSeriesIterator { |
|
|
|
|
panic("discard samples hint was passed, no sample iteration was expected") |
|
|
|
|
} |
|
|
|
|
if trimBack { |
|
|
|
|
intervals = intervals.Add(tombstones.Interval{Mint: b.maxt + 1, Maxt: math.MaxInt64}) |
|
|
|
|
return true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
b.currLabels = make(labels.Labels, len(b.bufLbls)) |
|
|
|
|
copy(b.currLabels, b.bufLbls) |
|
|
|
|
// NOTE:
|
|
|
|
|
// * block time range is half-open: [meta.MinTime, meta.MaxTime).
|
|
|
|
|
// * chunks are both closed: [chk.MinTime, chk.MaxTime].
|
|
|
|
|
// * requested time ranges are closed: [req.Start, req.End].
|
|
|
|
|
|
|
|
|
|
// Copy chunks as iteratables are reusable.
|
|
|
|
|
chks := make([]chunks.Meta, len(bufChks)) |
|
|
|
|
copy(chks, bufChks) |
|
|
|
|
b.currIterFn = func() *populateWithDelGenericSeriesIterator { |
|
|
|
|
return newPopulateWithDelGenericSeriesIterator(b.chunks, chks, intervals) |
|
|
|
|
} |
|
|
|
@ -666,16 +650,16 @@ type blockSeriesSet struct { |
|
|
|
|
blockBaseSeriesSet |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.SeriesSet { |
|
|
|
|
func newBlockSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, skipChunks bool, mint, maxt int64) storage.SeriesSet { |
|
|
|
|
return &blockSeriesSet{ |
|
|
|
|
blockBaseSeriesSet{ |
|
|
|
|
index: i, |
|
|
|
|
series: i.Series(), |
|
|
|
|
chunks: c, |
|
|
|
|
tombstones: t, |
|
|
|
|
p: p, |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
bufLbls: make(labels.Labels, 0, 10), |
|
|
|
|
skipChunks: skipChunks, |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -698,16 +682,16 @@ type blockChunkSeriesSet struct { |
|
|
|
|
blockBaseSeriesSet |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, mint, maxt int64) storage.ChunkSeriesSet { |
|
|
|
|
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, skipChunks bool, mint, maxt int64) storage.ChunkSeriesSet { |
|
|
|
|
return &blockChunkSeriesSet{ |
|
|
|
|
blockBaseSeriesSet{ |
|
|
|
|
index: i, |
|
|
|
|
series: i.Series(), |
|
|
|
|
chunks: c, |
|
|
|
|
tombstones: t, |
|
|
|
|
p: p, |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
bufLbls: make(labels.Labels, 0, 10), |
|
|
|
|
skipChunks: skipChunks, |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -830,17 +814,3 @@ Outer: |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (it *DeletedIterator) Err() error { return it.Iter.Err() } |
|
|
|
|
|
|
|
|
|
type nopChunkReader struct { |
|
|
|
|
emptyChunk chunkenc.Chunk |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newNopChunkReader() ChunkReader { |
|
|
|
|
return nopChunkReader{ |
|
|
|
|
emptyChunk: chunkenc.NewXORChunk(), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (cr nopChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { return cr.emptyChunk, nil } |
|
|
|
|
|
|
|
|
|
func (cr nopChunkReader) Close() error { return nil } |
|
|
|
|