tsdb: sample chunk info from tsdb index to limit the amount of chunkrefs we read from index (#8742)

**What this PR does / why we need it**:
Previously, we read the metadata of all the chunks of a series from the index and
then filtered them in a layer above within the TSDB code. This wastes a
lot of resources when a series has many chunks in the index but the query
range only needs a few of them.

Before jumping into how and why I went with chunk sampling, here are
some points to consider:
* Chunks in the index are sorted by their start time. Since that tells us
nothing about their end times, we can only skip chunks that start after the
query's end time, which still makes us process lots of chunks when the query
touches chunks near the end of the table boundary.
* Chunk metadata is written to the TSDB index with variable-length encoding,
so we cannot skip/jump over chunks: each chunk entry may occupy a different
number of bytes (see the sketch below).
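
To make the variable-length point concrete, here is a minimal sketch of how a single chunk entry is decoded. It mirrors the `readChunkMeta` helper added later in this diff, and it assumes the index package's `ChunkMeta` type and Loki's `pkg/util/encoding.Decbuf`; because the min time is stored as a signed varint delta against the previous chunk's max time, every entry can occupy a different number of bytes, so the list can only be walked sequentially:

```go
// Sketch of decoding one chunk entry from a TSDB index series record,
// mirroring the readChunkMeta helper introduced in this PR.
// d must be positioned at the start of the entry.
func readChunkMetaSketch(d *encoding.Decbuf, prevChunkMaxt int64, m *ChunkMeta) error {
	// Min time is a signed varint delta against the previous chunk's max time
	// (signed because chunks may overlap). For the first chunk the delta is
	// taken against 0, i.e. it is the absolute min time.
	m.MinTime = d.Varint64() + prevChunkMaxt
	// Max time is an unsigned varint delta against this chunk's own min time.
	m.MaxTime = int64(d.Uvarint64()) + m.MinTime
	m.KB = uint32(d.Uvarint())
	m.Entries = uint32(d.Uvarint64())
	m.Checksum = d.Be32()
	return d.Err()
}
```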

Here is how I have implemented the sampling approach:
* Chunks are sampled from the index based on their end times, and the samples
are kept in memory.
* Here is how `chunkSample` is defined:
```go
type chunkSample struct {
	largestMaxt   int64 // holds largest chunk end time we have seen so far. In other words all the earlier chunks have maxt <= largestMaxt
	idx           int   // index of the chunk in the list which helps with determining position of sampled chunk
	offset        int   // offset is relative to beginning chunk info block i.e after series labels info and chunk count etc
	prevChunkMaxt int64 // chunk times are stored as deltas. This is used for calculating mint of sampled chunk
}
```
* When a query comes in, we find the `chunkSample` with the largest
`largestMaxt` that is smaller than the query start time. In other words, we
find a chunk sample that skips all/most of the chunks ending before the query
start.
* Starting from that chunk sample, we go through the chunks sequentially and
keep only the ones that overlap the query range. We stop processing chunks as
soon as we see a chunk that starts after the query's end time, since the
chunks are sorted by start time (see the sketch after this list).
* Sampling is done lazily and only for the series that are queried, so we do
not waste any resources on sampling series that are never queried.
* To avoid sampling too many chunks, I sample chunks at `1h` steps, i.e. given
a sampled chunk with end time `t`, the next sampled chunk must have an end
time >= `t + 1h`. With 24h index tables this typically gives ~28 sampled
chunks per queried series per index file: roughly one sample per hour, plus a
few extra to account for the 2h default chunk length and chunks overlapping
multiple tables.
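
Here is an illustrative sketch of how a query then uses the in-memory samples: binary-search for the sample whose `largestMaxt` sits just below the query start, skip to its byte offset, and decode chunk entries sequentially, keeping only the ones that overlap the query range. `chunksForQuery` is a hypothetical helper, not part of the PR (the real lookup lives in `getChunkSampleForQueryStarting` and the scan in `Decoder.Series`); it assumes the `chunkSample`, `ChunkMeta`, `readChunkMeta`, and `overlap` definitions added elsewhere in this diff, plus the standard `sort` package:

```go
// Hypothetical helper showing the query path over sampled chunks.
// d must be positioned at the start of the series' chunk entries, since the
// sampled offsets are relative to that point.
func chunksForQuery(samples []chunkSample, d *encoding.Decbuf, numChunks int, from, through int64) ([]ChunkMeta, error) {
	// Find the first sample whose largestMaxt >= from ...
	i := sort.Search(len(samples), func(i int) bool { return samples[i].largestMaxt >= from })
	if i == len(samples) {
		return nil, nil // every chunk ends before the query starts
	}
	// ... then step back one sample: chunks between the previous sample and
	// this one may still overlap the query range.
	if i > 0 {
		i--
	}
	cs := samples[i]

	// Jump straight to the sampled chunk's entry instead of decoding from chunk 0.
	d.Skip(cs.offset)

	var (
		out      []ChunkMeta
		prevMaxt = cs.prevChunkMaxt
		m        ChunkMeta
	)
	for idx := cs.idx; idx < numChunks; idx++ {
		if err := readChunkMeta(d, prevMaxt, &m); err != nil {
			return nil, err
		}
		prevMaxt = m.MaxTime
		// Chunks are sorted by start time, so nothing later can overlap.
		if m.MinTime >= through {
			break
		}
		if overlap(from, through, m.MinTime, m.MaxTime) {
			out = append(out, m)
		}
	}
	return out, d.Err()
}
```

Since consecutive samples are at most roughly an hour apart in chunk end time, a query only decodes on the order of an hour's worth of chunk entries before reaching the first overlapping chunk, instead of every chunk of the series.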

Here are the benchmark results showing the difference it makes:
```
benchmark                              old ns/op     new ns/op     delta
BenchmarkTSDBIndex_GetChunkRefs-10     12420741      4764309       -61.64%
BenchmarkTSDBIndex_GetChunkRefs-10     12412014      4794156       -61.37%
BenchmarkTSDBIndex_GetChunkRefs-10     12382716      4748571       -61.65%
BenchmarkTSDBIndex_GetChunkRefs-10     12391397      4691054       -62.14%
BenchmarkTSDBIndex_GetChunkRefs-10     12272200      5023567       -59.07%

benchmark                              old allocs     new allocs     delta
BenchmarkTSDBIndex_GetChunkRefs-10     345653         40             -99.99%
BenchmarkTSDBIndex_GetChunkRefs-10     345653         40             -99.99%
BenchmarkTSDBIndex_GetChunkRefs-10     345653         40             -99.99%
BenchmarkTSDBIndex_GetChunkRefs-10     345653         40             -99.99%
BenchmarkTSDBIndex_GetChunkRefs-10     345653         40             -99.99%

benchmark                              old bytes     new bytes     delta
BenchmarkTSDBIndex_GetChunkRefs-10     27286536      6398855       -76.55%
BenchmarkTSDBIndex_GetChunkRefs-10     27286571      6399276       -76.55%
BenchmarkTSDBIndex_GetChunkRefs-10     27286566      6400699       -76.54%
BenchmarkTSDBIndex_GetChunkRefs-10     27286561      6399158       -76.55%
BenchmarkTSDBIndex_GetChunkRefs-10     27286580      6399643       -76.55%
```

**Checklist**
- [x] Tests updated
Sandeep Sukhani (committed via GitHub), commit 4e893a0a88, parent 1549fec2fa
Changed files (lines changed):
- pkg/storage/stores/tsdb/compactor.go (6)
- pkg/storage/stores/tsdb/compactor_test.go (8)
- pkg/storage/stores/tsdb/head_manager.go (3)
- pkg/storage/stores/tsdb/head_read.go (13)
- pkg/storage/stores/tsdb/index/index.go (214)
- pkg/storage/stores/tsdb/index/index_test.go (407)
- pkg/storage/stores/tsdb/index_client.go (4)
- pkg/storage/stores/tsdb/index_client_test.go (12)
- pkg/storage/stores/tsdb/querier.go (2)
- pkg/storage/stores/tsdb/querier_test.go (5)
- pkg/storage/stores/tsdb/single_file_index.go (88)
- pkg/storage/stores/tsdb/single_file_index_test.go (53)

@ -48,7 +48,7 @@ func (i indexProcessor) OpenCompactedIndexFile(ctx context.Context, path, tableN
}()
builder := NewBuilder()
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
builder.AddSeries(lbls.Copy(), fp, chks)
}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
if err != nil {
@ -197,7 +197,7 @@ func setupBuilder(ctx context.Context, userID string, sourceIndexSet compactor.I
// add users index from multi-tenant indexes to the builder
for _, idx := range multiTenantIndexes {
err := idx.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
err := idx.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
builder.AddSeries(withoutTenantLabel(lbls.Copy()), fp, chks)
}, withTenantLabelMatcher(userID, []*labels.Matcher{})...)
if err != nil {
@ -229,7 +229,7 @@ func setupBuilder(ctx context.Context, userID string, sourceIndexSet compactor.I
}
}()
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
builder.AddSeries(lbls.Copy(), fp, chks)
}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
if err != nil {

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"math"
"path"
"path/filepath"
"strings"
@ -192,12 +193,13 @@ func buildStream(lbls labels.Labels, chunks index.ChunkMetas, userLabel string)
}
}
// buildChunkMetas builds 1ms wide chunk metas from -> to.
func buildChunkMetas(from, to int64) index.ChunkMetas {
var chunkMetas index.ChunkMetas
for i := from; i <= to; i++ {
chunkMetas = append(chunkMetas, index.ChunkMeta{
MinTime: i,
MaxTime: i,
MaxTime: i + 1,
Checksum: uint32(i),
Entries: 1,
})
@ -594,7 +596,7 @@ func TestCompactor_Compact(t *testing.T) {
require.NoError(t, err)
actualChunks = map[string]index.ChunkMetas{}
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
actualChunks[lbls.String()] = chks
}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
require.NoError(t, err)
@ -808,7 +810,7 @@ func TestCompactedIndex(t *testing.T) {
require.NoError(t, err)
foundChunks := map[string]index.ChunkMetas{}
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
foundChunks[lbls.String()] = append(index.ChunkMetas{}, chks...)
}, labels.MustNewMatcher(labels.MatchEqual, "", ""))
require.NoError(t, err)

@ -3,6 +3,7 @@ package tsdb
import (
"context"
"fmt"
"math"
"os"
"path/filepath"
"sort"
@ -747,7 +748,7 @@ func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, fp uint64, c
chks []index.ChunkMeta
)
fp, err := idx.Series(ps.At(), &ls, &chks)
fp, err := idx.Series(ps.At(), 0, math.MaxInt64, &ls, &chks)
if err != nil {
return errors.Wrapf(err, "iterating postings for tenant: %s", user)

@ -17,6 +17,7 @@ import (
"math"
"sort"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
@ -121,7 +122,7 @@ func (h *headIndexReader) Postings(name string, shard *index.ShardAnnotation, va
}
// Series returns the series for the given reference.
func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) {
func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) {
s := h.head.series.getByID(uint64(ref))
if s == nil {
@ -130,8 +131,16 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chk
}
*lbls = append((*lbls)[:0], s.ls...)
queryBounds := newBounds(model.Time(from), model.Time(through))
*chks = (*chks)[:0]
s.Lock()
*chks = append((*chks)[:0], s.chks...)
for _, chk := range s.chks {
if !Overlap(chk, queryBounds) {
continue
}
*chks = append(*chks, chk)
}
s.Unlock()
return s.fp, nil

@ -26,6 +26,8 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"time"
"unsafe"
"github.com/pkg/errors"
@ -54,6 +56,8 @@ const (
// store every 1024 series' fingerprints in the fingerprint offsets table
fingerprintInterval = 1 << 10
millisecondsInHour = int64(time.Hour / time.Millisecond)
)
type indexWriterStage uint8
@ -1328,7 +1332,10 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
return nil, errors.Wrap(err, "loading fingerprint offsets")
}
r.dec = &Decoder{LookupSymbol: r.lookupSymbol}
r.dec = &Decoder{
LookupSymbol: r.lookupSymbol,
chunksSample: map[storage.SeriesRef]*chunkSamples{},
}
return r, nil
}
@ -1722,7 +1729,7 @@ func (r *Reader) LabelValueFor(id storage.SeriesRef, label string) (string, erro
}
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {
func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
@ -1734,7 +1741,7 @@ func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]Chunk
return 0, d.Err()
}
fprint, err := r.dec.Series(d.Get(), lbls, chks)
fprint, err := r.dec.Series(d.Get(), id, from, through, lbls, chks)
if err != nil {
return 0, errors.Wrap(err, "read series")
}
@ -1892,12 +1899,56 @@ func (s *stringListIter) Next() bool {
func (s stringListIter) At() string { return s.cur }
func (s stringListIter) Err() error { return nil }
type chunkSample struct {
largestMaxt int64 // holds largest chunk end time we have seen so far. In other words all the earlier chunks have maxt <= largestMaxt
idx int // index of the chunk in the list which helps with determining position of sampled chunk
offset int // offset is relative to beginning chunk info block i.e after series labels info and chunk count etc
prevChunkMaxt int64 // chunk times are stored as deltas. This is used for calculating mint of sampled chunk
}
type chunkSamples struct {
sync.RWMutex
chunks []chunkSample
}
func newChunkSamples() *chunkSamples {
return &chunkSamples{
chunks: make([]chunkSample, 0, 30),
}
}
// getChunkSampleForQueryStarting returns back chunk sample which has largest "largestMaxt" that is less than given query start time.
// In other words, return back chunk sample which skips all the chunks that end before query start time.
// If query start is before all "largestMaxt", we would return first chunk sample.
// If query start is after all "largestMaxt", we would return nil.
func (c *chunkSamples) getChunkSampleForQueryStarting(ts int64) *chunkSample {
c.RLock()
defer c.RUnlock()
// first find position of chunk sample which has smallest "largestMaxt" after ts
i := sort.Search(len(c.chunks), func(i int) bool {
return c.chunks[i].largestMaxt >= ts
})
if i >= len(c.chunks) {
return nil
}
// there could be more chunks of interest between this and previous sample, so we should process chunks from previous sample
if i > 0 {
i--
}
return &c.chunks[i]
}
// Decoder provides decoding methods for the v1 and v2 index file format.
//
// It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand.
type Decoder struct {
LookupSymbol func(uint32) (string, error)
LookupSymbol func(uint32) (string, error)
chunksSample map[storage.SeriesRef]*chunkSamples
chunksSampleMtx sync.RWMutex
}
// Postings returns a postings list for b and its number of elements.
@ -1966,8 +2017,79 @@ func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) {
return "", d.Err()
}
func (dec *Decoder) getOrCreateChunksSample(d encoding.Decbuf, seriesRef storage.SeriesRef, numChunks int) (*chunkSamples, error) {
dec.chunksSampleMtx.Lock()
sample, ok := dec.chunksSample[seriesRef]
if ok {
dec.chunksSampleMtx.Unlock()
return sample, nil
}
sample = newChunkSamples()
dec.chunksSample[seriesRef] = sample
sample.Lock()
defer sample.Unlock()
dec.chunksSampleMtx.Unlock()
if err := buildChunkSamples(d, numChunks, sample); err != nil {
return nil, err
}
return sample, nil
}
// buildChunkSamples samples chunks considering maxt of the indexed chunks.
// It would always sample first and last chunk for returning earlier when query falls out of range on either ends.
// First chunk onwards it would only sample chunks that have maxt greater by at least 1h than previous sampled chunk's maxt.
func buildChunkSamples(d encoding.Decbuf, numChunks int, info *chunkSamples) error {
bufLen := d.Len()
chunkPos := bufLen - d.Len()
chunkMeta := &ChunkMeta{}
if err := readChunkMeta(&d, 0, chunkMeta); err != nil {
return errors.Wrapf(d.Err(), "read meta for chunk %d", 0)
}
info.chunks = append(info.chunks, chunkSample{
largestMaxt: chunkMeta.MaxTime,
idx: 0,
offset: chunkPos,
})
t0 := chunkMeta.MaxTime
largestMaxt := chunkMeta.MaxTime
prevLargestMaxt := largestMaxt
for i := 1; i < numChunks; i++ {
chunkPos = bufLen - d.Len()
if err := readChunkMeta(&d, t0, chunkMeta); err != nil {
return errors.Wrapf(d.Err(), "read meta for chunk %d", i)
}
if chunkMeta.MaxTime > largestMaxt {
largestMaxt = chunkMeta.MaxTime
}
if d.Err() != nil {
return errors.Wrapf(d.Err(), "read meta for chunk %d", i)
}
if i == numChunks-1 || largestMaxt-prevLargestMaxt >= millisecondsInHour {
prevLargestMaxt = largestMaxt
info.chunks = append(info.chunks, chunkSample{
idx: i,
prevChunkMaxt: t0,
largestMaxt: largestMaxt,
offset: chunkPos,
})
}
t0 = chunkMeta.MaxTime
}
return d.Err()
}
// Series decodes a series entry from the given byte slice into lset and chks.
func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {
func (dec *Decoder) Series(b []byte, seriesRef storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]
@ -2003,46 +2125,64 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) (ui
return 0, d.Err()
}
t0 := d.Varint64()
maxt := int64(d.Uvarint64()) + t0
kb := uint32(d.Uvarint())
entries := uint32(d.Uvarint64())
checksum := d.Be32()
chunksSample, err := dec.getOrCreateChunksSample(encoding.DecWrap(tsdb_enc.Decbuf{B: d.Get()}), seriesRef, k)
if err != nil {
return 0, err
}
*chks = append(*chks, ChunkMeta{
Checksum: checksum,
MinTime: t0,
MaxTime: maxt,
KB: kb,
Entries: entries,
})
t0 = maxt
for i := 1; i < k; i++ {
// Decode the diff against previous chunk as varint
// instead of uvarint because chunks may overlap
mint := d.Varint64() + t0
maxt := int64(d.Uvarint64()) + mint
kb := uint32(d.Uvarint())
entries := uint32(d.Uvarint64())
checksum := d.Be32()
t0 = maxt
cs := chunksSample.getChunkSampleForQueryStarting(from)
if cs == nil {
return fprint, nil
}
d.Skip(cs.offset)
if d.Err() != nil {
return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", i)
chunkMeta := &ChunkMeta{}
if err := readChunkMeta(&d, cs.prevChunkMaxt, chunkMeta); err != nil {
return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx)
}
if overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) {
*chks = append(*chks, *chunkMeta)
}
t0 := chunkMeta.MaxTime
for i := cs.idx + 1; i < k; i++ {
if err := readChunkMeta(&d, t0, chunkMeta); err != nil {
return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx)
}
t0 = chunkMeta.MaxTime
*chks = append(*chks, ChunkMeta{
Checksum: checksum,
MinTime: mint,
MaxTime: maxt,
KB: kb,
Entries: entries,
})
if !overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) {
continue
}
if chunkMeta.MinTime >= through {
break
}
*chks = append(*chks, *chunkMeta)
}
return fprint, d.Err()
}
func readChunkMeta(d *encoding.Decbuf, prevChunkMaxt int64, chunkMeta *ChunkMeta) error {
// Decode the diff against previous chunk as varint
// instead of uvarint because chunks may overlap
chunkMeta.MinTime = d.Varint64() + prevChunkMaxt
chunkMeta.MaxTime = int64(d.Uvarint64()) + chunkMeta.MinTime
chunkMeta.KB = uint32(d.Uvarint())
chunkMeta.Entries = uint32(d.Uvarint64())
chunkMeta.Checksum = d.Be32()
if d.Err() != nil {
return d.Err()
}
return nil
}
func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
func overlap(aFrom, aThrough, bFrom, bThrough int64) bool {
return aFrom < bThrough && aThrough > bFrom
}

@ -17,20 +17,23 @@ import (
"context"
"fmt"
"hash/crc32"
"math"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"github.com/grafana/loki/pkg/util/encoding"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
tsdb_enc "github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/util/testutil"
)
@ -185,7 +188,7 @@ func TestIndexRW_Postings(t *testing.T) {
var c []ChunkMeta
for i := 0; p.Next(); i++ {
_, err := ir.Series(p.At(), &l, &c)
_, err := ir.Series(p.At(), 0, math.MaxInt64, &l, &c)
require.NoError(t, err)
require.Equal(t, 0, len(c))
@ -200,7 +203,7 @@ func TestIndexRW_Postings(t *testing.T) {
return errors.Errorf("unexpected key length for label indices table %d", len(key))
}
d := encoding.NewDecbufAt(ir.b, int(off), castagnoliTable)
d := tsdb_enc.NewDecbufAt(ir.b, int(off), castagnoliTable)
vals := []string{}
nc := d.Be32int()
if nc != 1 {
@ -304,7 +307,7 @@ func TestPostingsMany(t *testing.T) {
var lbls labels.Labels
var metas []ChunkMeta
for it.Next() {
_, err := ir.Series(it.At(), &lbls, &metas)
_, err := ir.Series(it.At(), 0, math.MaxInt64, &lbls, &metas)
require.NoError(t, err)
got = append(got, lbls.Get("i"))
}
@ -420,7 +423,7 @@ func TestPersistence_index_e2e(t *testing.T) {
ref := gotp.At()
_, err := ir.Series(ref, &lset, &chks)
_, err := ir.Series(ref, 0, math.MaxInt64, &lset, &chks)
require.NoError(t, err)
err = mi.Series(expp.At(), &explset, &expchks)
@ -467,7 +470,7 @@ func TestPersistence_index_e2e(t *testing.T) {
func TestDecbufUvarintWithInvalidBuffer(t *testing.T) {
b := RealByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
db := encoding.NewDecbufUvarintAt(b, 0, castagnoliTable)
db := tsdb_enc.NewDecbufUvarintAt(b, 0, castagnoliTable)
require.Error(t, db.Err())
}
@ -544,3 +547,395 @@ func TestDecoder_Postings_WrongInput(t *testing.T) {
_, _, err := (&Decoder{}).Postings([]byte("the cake is a lie"))
require.Error(t, err)
}
func TestDecoder_ChunkSamples(t *testing.T) {
dir := t.TempDir()
lbls := []labels.Labels{
{{Name: "fizz", Value: "buzz"}},
{{Name: "ping", Value: "pong"}},
}
symbols := map[string]struct{}{}
for _, lset := range lbls {
for _, l := range lset {
symbols[l.Name] = struct{}{}
symbols[l.Value] = struct{}{}
}
}
now := model.Now()
for name, tc := range map[string]struct {
chunkMetas []ChunkMeta
expectedChunkSamples []chunkSample
}{
"no overlapping chunks": {
chunkMetas: []ChunkMeta{
{
MinTime: int64(now),
MaxTime: int64(now.Add(30 * time.Minute)),
},
{
MinTime: int64(now.Add(40 * time.Minute)),
MaxTime: int64(now.Add(80 * time.Minute)),
},
{
MinTime: int64(now.Add(90 * time.Minute)),
MaxTime: int64(now.Add(120 * time.Minute)),
},
{
MinTime: int64(now.Add(130 * time.Minute)),
MaxTime: int64(now.Add(150 * time.Minute)),
},
},
expectedChunkSamples: []chunkSample{
{
largestMaxt: int64(now.Add(30 * time.Minute)),
idx: 0,
prevChunkMaxt: 0,
},
{
largestMaxt: int64(now.Add(120 * time.Minute)),
idx: 2,
prevChunkMaxt: int64(now.Add(80 * time.Minute)),
},
{
largestMaxt: int64(now.Add(150 * time.Minute)),
idx: 3,
prevChunkMaxt: int64(now.Add(120 * time.Minute)),
},
},
},
"overlapping chunks": {
chunkMetas: []ChunkMeta{
{
MinTime: int64(now),
MaxTime: int64(now.Add(30 * time.Minute)),
},
{
MinTime: int64(now.Add(20 * time.Minute)),
MaxTime: int64(now.Add(80 * time.Minute)),
},
{
MinTime: int64(now.Add(70 * time.Minute)),
MaxTime: int64(now.Add(120 * time.Minute)),
},
{
MinTime: int64(now.Add(100 * time.Minute)),
MaxTime: int64(now.Add(110 * time.Minute)),
},
},
expectedChunkSamples: []chunkSample{
{
largestMaxt: int64(now.Add(30 * time.Minute)),
idx: 0,
prevChunkMaxt: 0,
},
{
largestMaxt: int64(now.Add(120 * time.Minute)),
idx: 2,
prevChunkMaxt: int64(now.Add(80 * time.Minute)),
},
{
largestMaxt: int64(now.Add(120 * time.Minute)),
idx: 3,
prevChunkMaxt: int64(now.Add(120 * time.Minute)),
},
},
},
"first chunk overlapping all chunks": {
chunkMetas: []ChunkMeta{
{
MinTime: int64(now),
MaxTime: int64(now.Add(180 * time.Minute)),
},
{
MinTime: int64(now.Add(20 * time.Minute)),
MaxTime: int64(now.Add(80 * time.Minute)),
},
{
MinTime: int64(now.Add(70 * time.Minute)),
MaxTime: int64(now.Add(120 * time.Minute)),
},
{
MinTime: int64(now.Add(110 * time.Minute)),
MaxTime: int64(now.Add(150 * time.Minute)),
},
},
expectedChunkSamples: []chunkSample{
{
largestMaxt: int64(now.Add(180 * time.Minute)),
idx: 0,
prevChunkMaxt: 0,
},
{
largestMaxt: int64(now.Add(180 * time.Minute)),
idx: 3,
prevChunkMaxt: int64(now.Add(120 * time.Minute)),
},
},
},
"large gaps between chunks": {
chunkMetas: []ChunkMeta{
{
MinTime: int64(now),
MaxTime: int64(now.Add(30 * time.Minute)),
},
{
MinTime: int64(now.Add(200 * time.Minute)),
MaxTime: int64(now.Add(280 * time.Minute)),
},
{
MinTime: int64(now.Add(500 * time.Minute)),
MaxTime: int64(now.Add(520 * time.Minute)),
},
{
MinTime: int64(now.Add(800 * time.Minute)),
MaxTime: int64(now.Add(835 * time.Minute)),
},
},
expectedChunkSamples: []chunkSample{
{
largestMaxt: int64(now.Add(30 * time.Minute)),
idx: 0,
prevChunkMaxt: 0,
},
{
largestMaxt: int64(now.Add(280 * time.Minute)),
idx: 1,
prevChunkMaxt: int64(now.Add(30 * time.Minute)),
},
{
largestMaxt: int64(now.Add(520 * time.Minute)),
idx: 2,
prevChunkMaxt: int64(now.Add(280 * time.Minute)),
},
{
largestMaxt: int64(now.Add(835 * time.Minute)),
idx: 3,
prevChunkMaxt: int64(now.Add(520 * time.Minute)),
},
},
},
} {
t.Run(name, func(t *testing.T) {
iw, err := NewWriter(context.Background(), filepath.Join(dir, name))
require.NoError(t, err)
syms := []string{}
for s := range symbols {
syms = append(syms, s)
}
sort.Strings(syms)
for _, s := range syms {
require.NoError(t, iw.AddSymbol(s))
}
for i, l := range lbls {
err = iw.AddSeries(storage.SeriesRef(i), l, model.Fingerprint(l.Hash()), tc.chunkMetas...)
require.NoError(t, err)
}
err = iw.Close()
require.NoError(t, err)
ir, err := NewFileReader(filepath.Join(dir, name))
require.NoError(t, err)
postings, err := ir.Postings("fizz", nil, "buzz")
require.NoError(t, err)
require.True(t, postings.Next())
var lset labels.Labels
var chks []ChunkMeta
// there should be no chunk samples
require.Nil(t, ir.dec.chunksSample[postings.At()])
// read series so that chunk samples get built
_, err = ir.Series(postings.At(), 0, math.MaxInt64, &lset, &chks)
require.NoError(t, err)
require.Equal(t, tc.chunkMetas, chks)
require.Equal(t, lset, lbls[0])
// there should be chunk samples for only the series we read
require.Len(t, ir.dec.chunksSample, 1)
require.NotNil(t, ir.dec.chunksSample[postings.At()])
require.Len(t, ir.dec.chunksSample[postings.At()].chunks, len(tc.expectedChunkSamples))
// build decoder for the series we read to verify the samples
offset := postings.At() * 16
d := encoding.DecWrap(tsdb_enc.NewDecbufUvarintAt(ir.b, int(offset), castagnoliTable))
require.NoError(t, d.Err())
// read the series metadata to position the decoder at the beginning of the first chunk
d.Be64()
k := d.Uvarint()
for i := 0; i < k; i++ {
d.Uvarint()
d.Uvarint()
}
require.Equal(t, len(tc.chunkMetas), d.Uvarint())
for i, cs := range ir.dec.chunksSample[postings.At()].chunks {
require.Equal(t, tc.expectedChunkSamples[i].idx, cs.idx)
require.Equal(t, tc.expectedChunkSamples[i].largestMaxt, cs.largestMaxt)
require.Equal(t, tc.expectedChunkSamples[i].prevChunkMaxt, cs.prevChunkMaxt)
dw := encoding.DecWrap(tsdb_enc.Decbuf{B: d.Get()})
dw.Skip(cs.offset)
chunkMeta := ChunkMeta{}
require.NoError(t, readChunkMeta(&dw, cs.prevChunkMaxt, &chunkMeta))
require.Equal(t, tc.chunkMetas[tc.expectedChunkSamples[i].idx], chunkMeta)
}
require.NoError(t, ir.Close())
})
}
}
func TestChunkSamples_getChunkSampleForQueryStarting(t *testing.T) {
for name, tc := range map[string]struct {
chunkSamples *chunkSamples
queryMint int64
expectedChunkSampleIdx int
}{
"mint greater than largestMaxt": {
chunkSamples: &chunkSamples{
chunks: []chunkSample{
{
largestMaxt: 100,
idx: 0,
offset: 0,
prevChunkMaxt: 0,
},
{
largestMaxt: 200,
idx: 5,
offset: 5,
prevChunkMaxt: 50,
},
},
},
queryMint: 250,
expectedChunkSampleIdx: -1,
},
"mint smaller than first largestMaxt": {
chunkSamples: &chunkSamples{
chunks: []chunkSample{
{
largestMaxt: 100,
idx: 0,
offset: 0,
prevChunkMaxt: 0,
},
{
largestMaxt: 200,
idx: 5,
offset: 5,
prevChunkMaxt: 50,
},
},
},
queryMint: 50,
expectedChunkSampleIdx: 0,
},
"intermediate chunk sample": {
chunkSamples: &chunkSamples{
chunks: []chunkSample{
{
largestMaxt: 100,
idx: 0,
offset: 0,
prevChunkMaxt: 0,
},
{
largestMaxt: 200,
idx: 5,
offset: 5,
prevChunkMaxt: 50,
},
{
largestMaxt: 350,
idx: 7,
offset: 7,
prevChunkMaxt: 150,
},
{
largestMaxt: 500,
idx: 9,
offset: 9,
prevChunkMaxt: 250,
},
},
},
queryMint: 250,
expectedChunkSampleIdx: 1,
},
"mint matching samples largestMaxt": {
chunkSamples: &chunkSamples{
chunks: []chunkSample{
{
largestMaxt: 100,
idx: 0,
offset: 0,
prevChunkMaxt: 0,
},
{
largestMaxt: 200,
idx: 5,
offset: 5,
prevChunkMaxt: 50,
},
{
largestMaxt: 350,
idx: 7,
offset: 7,
prevChunkMaxt: 150,
},
{
largestMaxt: 500,
idx: 9,
offset: 9,
prevChunkMaxt: 250,
},
},
},
queryMint: 350,
expectedChunkSampleIdx: 1,
},
"same chunk sampled": {
chunkSamples: &chunkSamples{
chunks: []chunkSample{
{
largestMaxt: 100,
idx: 0,
offset: 0,
prevChunkMaxt: 0,
},
{
largestMaxt: 100,
idx: 0,
offset: 0,
prevChunkMaxt: 0,
},
},
},
queryMint: 50,
expectedChunkSampleIdx: 0,
},
} {
t.Run(name, func(t *testing.T) {
chunkSample := tc.chunkSamples.getChunkSampleForQueryStarting(tc.queryMint)
if tc.expectedChunkSampleIdx == -1 {
require.Nil(t, chunkSample)
return
}
require.NotNil(t, chunkSample)
require.Equal(t, tc.chunkSamples.chunks[tc.expectedChunkSampleIdx], *chunkSample)
})
}
}

@ -204,15 +204,13 @@ func (c *IndexClient) Stats(ctx context.Context, userID string, from, through mo
acc = &stats.Stats{}
}
queryBounds := newBounds(from, through)
for idx, interval := range intervals {
if err := c.idx.Stats(ctx, userID, interval.Start, interval.End, acc, shard, func(chk index.ChunkMeta) bool {
// for the first split, purely do overlap check to also include chunks having
// start time earlier than start time of the table interval we are querying.
// for all other splits, consider only chunks that have from >= interval.Start
// so that we start after the start time of the index table we are querying.
if Overlap(queryBounds, chk) && (idx == 0 || chk.From() >= interval.Start) {
if idx == 0 || chk.From() >= interval.Start {
return true
}
return false

@ -163,8 +163,8 @@ func TestIndexClient_Stats(t *testing.T) {
Start: indexStartYesterday,
End: indexStartToday + 1000,
},
expectedNumChunks: 298, // 2 chunks not included at indexStartYesterday since start time is not inclusive
expectedNumEntries: 298,
expectedNumChunks: 300,
expectedNumEntries: 300,
expectedNumStreams: 2,
},
{
@ -173,8 +173,8 @@ func TestIndexClient_Stats(t *testing.T) {
Start: indexStartToday,
End: indexStartToday + 1000,
},
expectedNumChunks: 99, // 1 chunk not included at indexStartToday since start time is not inclusive
expectedNumEntries: 99,
expectedNumChunks: 100,
expectedNumEntries: 100,
expectedNumStreams: 1,
},
{
@ -183,8 +183,8 @@ func TestIndexClient_Stats(t *testing.T) {
Start: indexStartToday + 50,
End: indexStartToday + 60,
},
expectedNumChunks: 9, // start and end are not inclusive
expectedNumEntries: 9,
expectedNumChunks: 10, // end time not inclusive
expectedNumEntries: 10,
expectedNumStreams: 1,
},
{

@ -66,7 +66,7 @@ type IndexReader interface {
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)
Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)
// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames(matchers ...*labels.Matcher) ([]string, error)

@ -2,6 +2,7 @@ package tsdb
import (
"context"
"math"
"testing"
"time"
@ -113,12 +114,12 @@ func TestQueryIndex(t *testing.T) {
)
require.True(t, p.Next())
_, err = reader.Series(p.At(), &ls, &chks)
_, err = reader.Series(p.At(), 0, math.MaxInt64, &ls, &chks)
require.Nil(t, err)
require.Equal(t, cases[0].labels.String(), ls.String())
require.Equal(t, cases[0].chunks, chks)
require.True(t, p.Next())
_, err = reader.Series(p.At(), &ls, &chks)
_, err = reader.Series(p.At(), 0, math.MaxInt64, &ls, &chks)
require.Nil(t, err)
require.Equal(t, cases[1].labels.String(), ls.String())
require.Equal(t, cases[1].chunks, chks)

@ -102,12 +102,7 @@ func (i *TSDBIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
// fn must NOT capture it's arguments. They're reused across series iterations and returned to
// a pool after completion.
func (i *TSDBIndex) forSeries(
ctx context.Context,
shard *index.ShardAnnotation,
fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta),
matchers ...*labels.Matcher,
) error {
func (i *TSDBIndex) forSeries(ctx context.Context, shard *index.ShardAnnotation, from model.Time, through model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta), matchers ...*labels.Matcher) error {
p, err := PostingsForMatchers(i.reader, shard, matchers...)
if err != nil {
return err
@ -123,7 +118,7 @@ func (i *TSDBIndex) forSeries(
}
for p.Next() {
hash, err := i.reader.Series(p.At(), &ls, &chks)
hash, err := i.reader.Series(p.At(), int64(from), int64(through), &ls, &chks)
if err != nil {
return err
}
@ -143,32 +138,23 @@ func (i *TSDBIndex) forSeries(
}
func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
queryBounds := newBounds(from, through)
if res == nil {
res = ChunkRefsPool.Get()
}
res = res[:0]
if err := i.forSeries(ctx, shard,
func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
// TODO(owen-d): use logarithmic approach
for _, chk := range chks {
if err := i.forSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
for _, chk := range chks {
// current chunk is outside the range of this request
if !Overlap(queryBounds, chk) {
continue
}
res = append(res, ChunkRef{
User: userID, // assumed to be the same, will be enforced by caller.
Fingerprint: fp,
Start: chk.From(),
End: chk.Through(),
Checksum: chk.Checksum,
})
}
},
matchers...); err != nil {
res = append(res, ChunkRef{
User: userID, // assumed to be the same, will be enforced by caller.
Fingerprint: fp,
Start: chk.From(),
End: chk.Through(),
Checksum: chk.Checksum,
})
}
}, matchers...); err != nil {
return nil, err
}
@ -176,27 +162,20 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu
}
func (i *TSDBIndex) Series(ctx context.Context, _ string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
queryBounds := newBounds(from, through)
if res == nil {
res = SeriesPool.Get()
}
res = res[:0]
if err := i.forSeries(ctx, shard,
func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
// TODO(owen-d): use logarithmic approach
for _, chk := range chks {
if Overlap(queryBounds, chk) {
// this series has at least one chunk in the desired range
res = append(res, Series{
Labels: ls.Copy(),
Fingerprint: fp,
})
break
}
}
},
matchers...); err != nil {
if err := i.forSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
if len(chks) == 0 {
return
}
res = append(res, Series{
Labels: ls.Copy(),
Fingerprint: fp,
})
}, matchers...); err != nil {
return nil, err
}
@ -233,21 +212,18 @@ func (i *TSDBIndex) Identifier(string) SingleTenantTSDBIdentifier {
}
func (i *TSDBIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error {
if err := i.forSeries(ctx, shard,
func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
// TODO(owen-d): use logarithmic approach
var addedStream bool
for _, chk := range chks {
if shouldIncludeChunk(chk) {
if !addedStream {
acc.AddStream(fp)
addedStream = true
}
acc.AddChunk(fp, chk)
if err := i.forSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
var addedStream bool
for _, chk := range chks {
if shouldIncludeChunk(chk) {
if !addedStream {
acc.AddStream(fp)
addedStream = true
}
acc.AddChunk(fp, chk)
}
},
matchers...); err != nil {
}
}, matchers...); err != nil {
return err
}

@ -2,8 +2,10 @@ package tsdb
import (
"context"
"math/rand"
"sort"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/common/model"
@ -201,3 +203,54 @@ func TestSingleIdx(t *testing.T) {
}
}
func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) {
now := model.Now()
queryFrom, queryThrough := now.Add(3*time.Hour).Add(time.Millisecond), now.Add(5*time.Hour).Add(-time.Millisecond)
queryBounds := newBounds(queryFrom, queryThrough)
numChunksToMatch := 0
var chunkMetas []index.ChunkMeta
// build a chunk for every second with randomized chunk length
for from, through := now, now.Add(24*time.Hour); from <= through; from = from.Add(time.Second) {
// randomize chunk length between 1-120 mins
chunkLenMin := rand.Intn(120)
if chunkLenMin == 0 {
chunkLenMin = 1
}
chunkMeta := index.ChunkMeta{
MinTime: int64(from),
MaxTime: int64(from.Add(time.Duration(chunkLenMin) * time.Minute)),
Checksum: uint32(from),
Entries: 1,
}
chunkMetas = append(chunkMetas, chunkMeta)
if Overlap(chunkMeta, queryBounds) {
numChunksToMatch++
}
}
tempDir := b.TempDir()
tsdbIndex := BuildIndex(b, tempDir, []LoadableSeries{
{
Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`),
Chunks: chunkMetas,
},
{
Labels: mustParseLabels(`{foo="bar", ping="pong"}`),
Chunks: chunkMetas,
},
{
Labels: mustParseLabels(`{foo1="bar1", ping="pong"}`),
Chunks: chunkMetas,
},
})
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
chkRefs, err := tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.NoError(b, err)
require.Len(b, chkRefs, numChunksToMatch*2)
}
}
