From 4e893a0a88b2c8c3ea9d1d8b1037f54d8fb7f1af Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Thu, 23 Mar 2023 20:58:54 +0530 Subject: [PATCH] tsdb: sample chunk info from tsdb index to limit the amount of chunkrefs we read from index (#8742) **What this PR does / why we need it**: Previously we used to read the info of all the chunks from the index and then filter it out in a layer above within the tsdb code. This wastes a lot of resources when there are too many chunks in the index, but we just need a few of them based on the query range. Before jumping into how and why I went with chunk sampling, here are some points to consider: * Chunks in the index are sorted by the start time of the chunk. Since this does not tell us much about the end time of the chunks, we can only skip chunks that start after the end time of the query, which still would make us process lots of chunks when the query touches chunks that are near the end of the table boundary. * Data is written to tsdb with variable length encoding. This means we can't skip/jump chunks since each chunk info might vary in the number of bytes we write. Here is how I have implemented the sampling approach: * Chunks are sampled considering their end times from the index and stored in memory. * Here is how `chunkSample` is defined: ``` type chunkSample struct { largestMaxt int64 // holds largest chunk end time we have seen so far. In other words all the earlier chunks have maxt <= largestMaxt idx int // index of the chunk in the list which helps with determining position of sampled chunk offset int // offset is relative to beginning chunk info block i.e after series labels info and chunk count etc prevChunkMaxt int64 // chunk times are stored as deltas. This is used for calculating mint of sampled chunk } ``` * When a query comes in, we will find `chunkSample`, which has the largest "largestMaxt" that is less than the given query start time. 
In other words, find a chunk sample which skips all/most of the chunks that end before the query start time. * Once we have found a chunk sample which skips all/most of the chunks that end before the query start, we will sequentially go through chunks and consider only the ones that overlap with the query range. We will stop processing chunks as soon as we see a chunk that starts after the end time of the query since the chunks are sorted by start time. * Sampling of chunks is done lazily for only the series that are queried, so we do not waste any resources on sampling series that are not queried. * To avoid sampling too many chunks, I am sampling chunks at `1h` steps i.e., given a sampled chunk with chunk end time `t`, the next chunk would be sampled with end time >= `t + 1h`. This means typically, we should have ~28 chunks sampled for each series queried from each index file, considering 2h default chunk length and chunks overlapping multiple tables. Here are the benchmark results showing the difference it makes: ``` benchmark old ns/op new ns/op delta BenchmarkTSDBIndex_GetChunkRefs-10 12420741 4764309 -61.64% BenchmarkTSDBIndex_GetChunkRefs-10 12412014 4794156 -61.37% BenchmarkTSDBIndex_GetChunkRefs-10 12382716 4748571 -61.65% BenchmarkTSDBIndex_GetChunkRefs-10 12391397 4691054 -62.14% BenchmarkTSDBIndex_GetChunkRefs-10 12272200 5023567 -59.07% benchmark old allocs new allocs delta BenchmarkTSDBIndex_GetChunkRefs-10 345653 40 -99.99% BenchmarkTSDBIndex_GetChunkRefs-10 345653 40 -99.99% BenchmarkTSDBIndex_GetChunkRefs-10 345653 40 -99.99% BenchmarkTSDBIndex_GetChunkRefs-10 345653 40 -99.99% BenchmarkTSDBIndex_GetChunkRefs-10 345653 40 -99.99% benchmark old bytes new bytes delta BenchmarkTSDBIndex_GetChunkRefs-10 27286536 6398855 -76.55% BenchmarkTSDBIndex_GetChunkRefs-10 27286571 6399276 -76.55% BenchmarkTSDBIndex_GetChunkRefs-10 27286566 6400699 -76.54% BenchmarkTSDBIndex_GetChunkRefs-10 27286561 6399158 -76.55% BenchmarkTSDBIndex_GetChunkRefs-10 27286580 
6399643 -76.55% ``` **Checklist** - [x] Tests updated --- pkg/storage/stores/tsdb/compactor.go | 6 +- pkg/storage/stores/tsdb/compactor_test.go | 8 +- pkg/storage/stores/tsdb/head_manager.go | 3 +- pkg/storage/stores/tsdb/head_read.go | 13 +- pkg/storage/stores/tsdb/index/index.go | 214 +++++++-- pkg/storage/stores/tsdb/index/index_test.go | 407 +++++++++++++++++- pkg/storage/stores/tsdb/index_client.go | 4 +- pkg/storage/stores/tsdb/index_client_test.go | 12 +- pkg/storage/stores/tsdb/querier.go | 2 +- pkg/storage/stores/tsdb/querier_test.go | 5 +- pkg/storage/stores/tsdb/single_file_index.go | 88 ++-- .../stores/tsdb/single_file_index_test.go | 53 +++ 12 files changed, 695 insertions(+), 120 deletions(-) diff --git a/pkg/storage/stores/tsdb/compactor.go b/pkg/storage/stores/tsdb/compactor.go index 472074a76c..11f66ed320 100644 --- a/pkg/storage/stores/tsdb/compactor.go +++ b/pkg/storage/stores/tsdb/compactor.go @@ -48,7 +48,7 @@ func (i indexProcessor) OpenCompactedIndexFile(ctx context.Context, path, tableN }() builder := NewBuilder() - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { builder.AddSeries(lbls.Copy(), fp, chks) }, labels.MustNewMatcher(labels.MatchEqual, "", "")) if err != nil { @@ -197,7 +197,7 @@ func setupBuilder(ctx context.Context, userID string, sourceIndexSet compactor.I // add users index from multi-tenant indexes to the builder for _, idx := range multiTenantIndexes { - err := idx.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err := idx.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { 
builder.AddSeries(withoutTenantLabel(lbls.Copy()), fp, chks) }, withTenantLabelMatcher(userID, []*labels.Matcher{})...) if err != nil { @@ -229,7 +229,7 @@ func setupBuilder(ctx context.Context, userID string, sourceIndexSet compactor.I } }() - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { builder.AddSeries(lbls.Copy(), fp, chks) }, labels.MustNewMatcher(labels.MatchEqual, "", "")) if err != nil { diff --git a/pkg/storage/stores/tsdb/compactor_test.go b/pkg/storage/stores/tsdb/compactor_test.go index 2e1f99789b..e8d4acdd7e 100644 --- a/pkg/storage/stores/tsdb/compactor_test.go +++ b/pkg/storage/stores/tsdb/compactor_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "path" "path/filepath" "strings" @@ -192,12 +193,13 @@ func buildStream(lbls labels.Labels, chunks index.ChunkMetas, userLabel string) } } +// buildChunkMetas builds 1ms wide chunk metas from -> to. 
func buildChunkMetas(from, to int64) index.ChunkMetas { var chunkMetas index.ChunkMetas for i := from; i <= to; i++ { chunkMetas = append(chunkMetas, index.ChunkMeta{ MinTime: i, - MaxTime: i, + MaxTime: i + 1, Checksum: uint32(i), Entries: 1, }) @@ -594,7 +596,7 @@ func TestCompactor_Compact(t *testing.T) { require.NoError(t, err) actualChunks = map[string]index.ChunkMetas{} - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { actualChunks[lbls.String()] = chks }, labels.MustNewMatcher(labels.MatchEqual, "", "")) require.NoError(t, err) @@ -808,7 +810,7 @@ func TestCompactedIndex(t *testing.T) { require.NoError(t, err) foundChunks := map[string]index.ChunkMetas{} - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { foundChunks[lbls.String()] = append(index.ChunkMetas{}, chks...) 
}, labels.MustNewMatcher(labels.MatchEqual, "", "")) require.NoError(t, err) diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go index e0102bbca0..1a1726601e 100644 --- a/pkg/storage/stores/tsdb/head_manager.go +++ b/pkg/storage/stores/tsdb/head_manager.go @@ -3,6 +3,7 @@ package tsdb import ( "context" "fmt" + "math" "os" "path/filepath" "sort" @@ -747,7 +748,7 @@ func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, fp uint64, c chks []index.ChunkMeta ) - fp, err := idx.Series(ps.At(), &ls, &chks) + fp, err := idx.Series(ps.At(), 0, math.MaxInt64, &ls, &chks) if err != nil { return errors.Wrapf(err, "iterating postings for tenant: %s", user) diff --git a/pkg/storage/stores/tsdb/head_read.go b/pkg/storage/stores/tsdb/head_read.go index 5ff524115c..0bfb342d76 100644 --- a/pkg/storage/stores/tsdb/head_read.go +++ b/pkg/storage/stores/tsdb/head_read.go @@ -17,6 +17,7 @@ import ( "math" "sort" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -121,7 +122,7 @@ func (h *headIndexReader) Postings(name string, shard *index.ShardAnnotation, va } // Series returns the series for the given reference. -func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) { +func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) { s := h.head.series.getByID(uint64(ref)) if s == nil { @@ -130,8 +131,16 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chk } *lbls = append((*lbls)[:0], s.ls...) + queryBounds := newBounds(model.Time(from), model.Time(through)) + + *chks = (*chks)[:0] s.Lock() - *chks = append((*chks)[:0], s.chks...) 
+ for _, chk := range s.chks { + if !Overlap(chk, queryBounds) { + continue + } + *chks = append(*chks, chk) + } s.Unlock() return s.fp, nil diff --git a/pkg/storage/stores/tsdb/index/index.go b/pkg/storage/stores/tsdb/index/index.go index 4218e9d6be..52b0491ae1 100644 --- a/pkg/storage/stores/tsdb/index/index.go +++ b/pkg/storage/stores/tsdb/index/index.go @@ -26,6 +26,8 @@ import ( "os" "path/filepath" "sort" + "sync" + "time" "unsafe" "github.com/pkg/errors" @@ -54,6 +56,8 @@ const ( // store every 1024 series' fingerprints in the fingerprint offsets table fingerprintInterval = 1 << 10 + + millisecondsInHour = int64(time.Hour / time.Millisecond) ) type indexWriterStage uint8 @@ -1328,7 +1332,10 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { return nil, errors.Wrap(err, "loading fingerprint offsets") } - r.dec = &Decoder{LookupSymbol: r.lookupSymbol} + r.dec = &Decoder{ + LookupSymbol: r.lookupSymbol, + chunksSample: map[storage.SeriesRef]*chunkSamples{}, + } return r, nil } @@ -1722,7 +1729,7 @@ func (r *Reader) LabelValueFor(id storage.SeriesRef, label string) (string, erro } // Series reads the series with the given ID and writes its labels and chunks into lbls and chks. -func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) { +func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) { offset := id // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. 
@@ -1734,7 +1741,7 @@ func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]Chunk return 0, d.Err() } - fprint, err := r.dec.Series(d.Get(), lbls, chks) + fprint, err := r.dec.Series(d.Get(), id, from, through, lbls, chks) if err != nil { return 0, errors.Wrap(err, "read series") } @@ -1892,12 +1899,56 @@ func (s *stringListIter) Next() bool { func (s stringListIter) At() string { return s.cur } func (s stringListIter) Err() error { return nil } +type chunkSample struct { + largestMaxt int64 // holds largest chunk end time we have seen so far. In other words all the earlier chunks have maxt <= largestMaxt + idx int // index of the chunk in the list which helps with determining position of sampled chunk + offset int // offset is relative to beginning chunk info block i.e after series labels info and chunk count etc + prevChunkMaxt int64 // chunk times are stored as deltas. This is used for calculating mint of sampled chunk +} + +type chunkSamples struct { + sync.RWMutex + chunks []chunkSample +} + +func newChunkSamples() *chunkSamples { + return &chunkSamples{ + chunks: make([]chunkSample, 0, 30), + } +} + +// getChunkSampleForQueryStarting returns back chunk sample which has largest "largestMaxt" that is less than given query start time. +// In other words, return back chunk sample which skips all the chunks that end before query start time. +// If query start is before all "largestMaxt", we would return first chunk sample. +// If query start is after all "largestMaxt", we would return nil. 
+func (c *chunkSamples) getChunkSampleForQueryStarting(ts int64) *chunkSample { + c.RLock() + defer c.RUnlock() + + // first find position of chunk sample which has smallest "largestMaxt" after ts + i := sort.Search(len(c.chunks), func(i int) bool { + return c.chunks[i].largestMaxt >= ts + }) + + if i >= len(c.chunks) { + return nil + } + + // there could be more chunks of interest between this and previous sample, so we should process chunks from previous sample + if i > 0 { + i-- + } + return &c.chunks[i] +} + // Decoder provides decoding methods for the v1 and v2 index file format. // // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. type Decoder struct { - LookupSymbol func(uint32) (string, error) + LookupSymbol func(uint32) (string, error) + chunksSample map[storage.SeriesRef]*chunkSamples + chunksSampleMtx sync.RWMutex } // Postings returns a postings list for b and its number of elements. @@ -1966,8 +2017,79 @@ func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) { return "", d.Err() } +func (dec *Decoder) getOrCreateChunksSample(d encoding.Decbuf, seriesRef storage.SeriesRef, numChunks int) (*chunkSamples, error) { + dec.chunksSampleMtx.Lock() + sample, ok := dec.chunksSample[seriesRef] + if ok { + dec.chunksSampleMtx.Unlock() + return sample, nil + } + + sample = newChunkSamples() + dec.chunksSample[seriesRef] = sample + sample.Lock() + defer sample.Unlock() + + dec.chunksSampleMtx.Unlock() + + if err := buildChunkSamples(d, numChunks, sample); err != nil { + return nil, err + } + + return sample, nil +} + +// buildChunkSamples samples chunks considering maxt of the indexed chunks. +// It would always sample first and last chunk for returning earlier when query falls out of range on either ends. +// First chunk onwards it would only sample chunks that have maxt greater by at least 1h than previous sampled chunk's maxt. 
+func buildChunkSamples(d encoding.Decbuf, numChunks int, info *chunkSamples) error { + bufLen := d.Len() + + chunkPos := bufLen - d.Len() + chunkMeta := &ChunkMeta{} + if err := readChunkMeta(&d, 0, chunkMeta); err != nil { + return errors.Wrapf(d.Err(), "read meta for chunk %d", 0) + } + + info.chunks = append(info.chunks, chunkSample{ + largestMaxt: chunkMeta.MaxTime, + idx: 0, + offset: chunkPos, + }) + + t0 := chunkMeta.MaxTime + largestMaxt := chunkMeta.MaxTime + prevLargestMaxt := largestMaxt + + for i := 1; i < numChunks; i++ { + chunkPos = bufLen - d.Len() + if err := readChunkMeta(&d, t0, chunkMeta); err != nil { + return errors.Wrapf(d.Err(), "read meta for chunk %d", i) + } + if chunkMeta.MaxTime > largestMaxt { + largestMaxt = chunkMeta.MaxTime + } + + if d.Err() != nil { + return errors.Wrapf(d.Err(), "read meta for chunk %d", i) + } + + if i == numChunks-1 || largestMaxt-prevLargestMaxt >= millisecondsInHour { + prevLargestMaxt = largestMaxt + info.chunks = append(info.chunks, chunkSample{ + idx: i, + prevChunkMaxt: t0, + largestMaxt: largestMaxt, + offset: chunkPos, + }) + } + t0 = chunkMeta.MaxTime + } + return d.Err() +} + // Series decodes a series entry from the given byte slice into lset and chks. 
-func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) { +func (dec *Decoder) Series(b []byte, seriesRef storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) { *lbls = (*lbls)[:0] *chks = (*chks)[:0] @@ -2003,46 +2125,64 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) (ui return 0, d.Err() } - t0 := d.Varint64() - maxt := int64(d.Uvarint64()) + t0 - kb := uint32(d.Uvarint()) - entries := uint32(d.Uvarint64()) - checksum := d.Be32() + chunksSample, err := dec.getOrCreateChunksSample(encoding.DecWrap(tsdb_enc.Decbuf{B: d.Get()}), seriesRef, k) + if err != nil { + return 0, err + } - *chks = append(*chks, ChunkMeta{ - Checksum: checksum, - MinTime: t0, - MaxTime: maxt, - KB: kb, - Entries: entries, - }) - t0 = maxt - - for i := 1; i < k; i++ { - // Decode the diff against previous chunk as varint - // instead of uvarint because chunks may overlap - mint := d.Varint64() + t0 - maxt := int64(d.Uvarint64()) + mint - kb := uint32(d.Uvarint()) - entries := uint32(d.Uvarint64()) - checksum := d.Be32() - t0 = maxt + cs := chunksSample.getChunkSampleForQueryStarting(from) + if cs == nil { + return fprint, nil + } + d.Skip(cs.offset) - if d.Err() != nil { - return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", i) + chunkMeta := &ChunkMeta{} + if err := readChunkMeta(&d, cs.prevChunkMaxt, chunkMeta); err != nil { + return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx) + } + + if overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) { + *chks = append(*chks, *chunkMeta) + } + t0 := chunkMeta.MaxTime + + for i := cs.idx + 1; i < k; i++ { + if err := readChunkMeta(&d, t0, chunkMeta); err != nil { + return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx) } + t0 = chunkMeta.MaxTime - *chks = append(*chks, ChunkMeta{ - Checksum: checksum, - MinTime: mint, - MaxTime: maxt, - KB: kb, - Entries: entries, - }) + if !overlap(from, 
through, chunkMeta.MinTime, chunkMeta.MaxTime) { + continue + } + if chunkMeta.MinTime >= through { + break + } + *chks = append(*chks, *chunkMeta) } return fprint, d.Err() } +func readChunkMeta(d *encoding.Decbuf, prevChunkMaxt int64, chunkMeta *ChunkMeta) error { + // Decode the diff against previous chunk as varint + // instead of uvarint because chunks may overlap + chunkMeta.MinTime = d.Varint64() + prevChunkMaxt + chunkMeta.MaxTime = int64(d.Uvarint64()) + chunkMeta.MinTime + chunkMeta.KB = uint32(d.Uvarint()) + chunkMeta.Entries = uint32(d.Uvarint64()) + chunkMeta.Checksum = d.Be32() + + if d.Err() != nil { + return d.Err() + } + + return nil +} + func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } + +func overlap(aFrom, aThrough, bFrom, bThrough int64) bool { + return aFrom < bThrough && aThrough > bFrom +} diff --git a/pkg/storage/stores/tsdb/index/index_test.go b/pkg/storage/stores/tsdb/index/index_test.go index 547f2350d1..8aa1ccfb92 100644 --- a/pkg/storage/stores/tsdb/index/index_test.go +++ b/pkg/storage/stores/tsdb/index/index_test.go @@ -17,20 +17,23 @@ import ( "context" "fmt" "hash/crc32" + "math" "math/rand" "os" "path/filepath" "sort" "testing" + "time" "github.com/pkg/errors" "github.com/stretchr/testify/require" "go.uber.org/goleak" + "github.com/grafana/loki/pkg/util/encoding" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/encoding" + tsdb_enc "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/util/testutil" ) @@ -185,7 +188,7 @@ func TestIndexRW_Postings(t *testing.T) { var c []ChunkMeta for i := 0; p.Next(); i++ { - _, err := ir.Series(p.At(), &l, &c) + _, err := ir.Series(p.At(), 0, math.MaxInt64, &l, &c) require.NoError(t, err) require.Equal(t, 0, len(c)) @@ -200,7 +203,7 @@ func TestIndexRW_Postings(t *testing.T) { return errors.Errorf("unexpected key length for 
label indices table %d", len(key)) } - d := encoding.NewDecbufAt(ir.b, int(off), castagnoliTable) + d := tsdb_enc.NewDecbufAt(ir.b, int(off), castagnoliTable) vals := []string{} nc := d.Be32int() if nc != 1 { @@ -304,7 +307,7 @@ func TestPostingsMany(t *testing.T) { var lbls labels.Labels var metas []ChunkMeta for it.Next() { - _, err := ir.Series(it.At(), &lbls, &metas) + _, err := ir.Series(it.At(), 0, math.MaxInt64, &lbls, &metas) require.NoError(t, err) got = append(got, lbls.Get("i")) } @@ -420,7 +423,7 @@ func TestPersistence_index_e2e(t *testing.T) { ref := gotp.At() - _, err := ir.Series(ref, &lset, &chks) + _, err := ir.Series(ref, 0, math.MaxInt64, &lset, &chks) require.NoError(t, err) err = mi.Series(expp.At(), &explset, &expchks) @@ -467,7 +470,7 @@ func TestPersistence_index_e2e(t *testing.T) { func TestDecbufUvarintWithInvalidBuffer(t *testing.T) { b := RealByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) - db := encoding.NewDecbufUvarintAt(b, 0, castagnoliTable) + db := tsdb_enc.NewDecbufUvarintAt(b, 0, castagnoliTable) require.Error(t, db.Err()) } @@ -544,3 +547,395 @@ func TestDecoder_Postings_WrongInput(t *testing.T) { _, _, err := (&Decoder{}).Postings([]byte("the cake is a lie")) require.Error(t, err) } + +func TestDecoder_ChunkSamples(t *testing.T) { + dir := t.TempDir() + + lbls := []labels.Labels{ + {{Name: "fizz", Value: "buzz"}}, + {{Name: "ping", Value: "pong"}}, + } + + symbols := map[string]struct{}{} + for _, lset := range lbls { + for _, l := range lset { + symbols[l.Name] = struct{}{} + symbols[l.Value] = struct{}{} + } + } + + now := model.Now() + + for name, tc := range map[string]struct { + chunkMetas []ChunkMeta + expectedChunkSamples []chunkSample + }{ + "no overlapping chunks": { + chunkMetas: []ChunkMeta{ + { + MinTime: int64(now), + MaxTime: int64(now.Add(30 * time.Minute)), + }, + { + MinTime: int64(now.Add(40 * time.Minute)), + MaxTime: int64(now.Add(80 * time.Minute)), + }, + { + MinTime: int64(now.Add(90 * 
time.Minute)), + MaxTime: int64(now.Add(120 * time.Minute)), + }, + { + MinTime: int64(now.Add(130 * time.Minute)), + MaxTime: int64(now.Add(150 * time.Minute)), + }, + }, + expectedChunkSamples: []chunkSample{ + { + largestMaxt: int64(now.Add(30 * time.Minute)), + idx: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: int64(now.Add(120 * time.Minute)), + idx: 2, + prevChunkMaxt: int64(now.Add(80 * time.Minute)), + }, + { + largestMaxt: int64(now.Add(150 * time.Minute)), + idx: 3, + prevChunkMaxt: int64(now.Add(120 * time.Minute)), + }, + }, + }, + "overlapping chunks": { + chunkMetas: []ChunkMeta{ + { + MinTime: int64(now), + MaxTime: int64(now.Add(30 * time.Minute)), + }, + { + MinTime: int64(now.Add(20 * time.Minute)), + MaxTime: int64(now.Add(80 * time.Minute)), + }, + { + MinTime: int64(now.Add(70 * time.Minute)), + MaxTime: int64(now.Add(120 * time.Minute)), + }, + { + MinTime: int64(now.Add(100 * time.Minute)), + MaxTime: int64(now.Add(110 * time.Minute)), + }, + }, + expectedChunkSamples: []chunkSample{ + { + largestMaxt: int64(now.Add(30 * time.Minute)), + idx: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: int64(now.Add(120 * time.Minute)), + idx: 2, + prevChunkMaxt: int64(now.Add(80 * time.Minute)), + }, + { + largestMaxt: int64(now.Add(120 * time.Minute)), + idx: 3, + prevChunkMaxt: int64(now.Add(120 * time.Minute)), + }, + }, + }, + "first chunk overlapping all chunks": { + chunkMetas: []ChunkMeta{ + { + MinTime: int64(now), + MaxTime: int64(now.Add(180 * time.Minute)), + }, + { + MinTime: int64(now.Add(20 * time.Minute)), + MaxTime: int64(now.Add(80 * time.Minute)), + }, + { + MinTime: int64(now.Add(70 * time.Minute)), + MaxTime: int64(now.Add(120 * time.Minute)), + }, + { + MinTime: int64(now.Add(110 * time.Minute)), + MaxTime: int64(now.Add(150 * time.Minute)), + }, + }, + expectedChunkSamples: []chunkSample{ + { + largestMaxt: int64(now.Add(180 * time.Minute)), + idx: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: int64(now.Add(180 * time.Minute)), + 
idx: 3, + prevChunkMaxt: int64(now.Add(120 * time.Minute)), + }, + }, + }, + "large gaps between chunks": { + chunkMetas: []ChunkMeta{ + { + MinTime: int64(now), + MaxTime: int64(now.Add(30 * time.Minute)), + }, + { + MinTime: int64(now.Add(200 * time.Minute)), + MaxTime: int64(now.Add(280 * time.Minute)), + }, + { + MinTime: int64(now.Add(500 * time.Minute)), + MaxTime: int64(now.Add(520 * time.Minute)), + }, + { + MinTime: int64(now.Add(800 * time.Minute)), + MaxTime: int64(now.Add(835 * time.Minute)), + }, + }, + expectedChunkSamples: []chunkSample{ + { + largestMaxt: int64(now.Add(30 * time.Minute)), + idx: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: int64(now.Add(280 * time.Minute)), + idx: 1, + prevChunkMaxt: int64(now.Add(30 * time.Minute)), + }, + { + largestMaxt: int64(now.Add(520 * time.Minute)), + idx: 2, + prevChunkMaxt: int64(now.Add(280 * time.Minute)), + }, + { + largestMaxt: int64(now.Add(835 * time.Minute)), + idx: 3, + prevChunkMaxt: int64(now.Add(520 * time.Minute)), + }, + }, + }, + } { + t.Run(name, func(t *testing.T) { + iw, err := NewWriter(context.Background(), filepath.Join(dir, name)) + require.NoError(t, err) + + syms := []string{} + for s := range symbols { + syms = append(syms, s) + } + sort.Strings(syms) + for _, s := range syms { + require.NoError(t, iw.AddSymbol(s)) + } + + for i, l := range lbls { + err = iw.AddSeries(storage.SeriesRef(i), l, model.Fingerprint(l.Hash()), tc.chunkMetas...) 
+ require.NoError(t, err) + } + + err = iw.Close() + require.NoError(t, err) + + ir, err := NewFileReader(filepath.Join(dir, name)) + require.NoError(t, err) + + postings, err := ir.Postings("fizz", nil, "buzz") + require.NoError(t, err) + + require.True(t, postings.Next()) + var lset labels.Labels + var chks []ChunkMeta + + // there should be no chunk samples + require.Nil(t, ir.dec.chunksSample[postings.At()]) + + // read series so that chunk samples get built + _, err = ir.Series(postings.At(), 0, math.MaxInt64, &lset, &chks) + require.NoError(t, err) + + require.Equal(t, tc.chunkMetas, chks) + require.Equal(t, lset, lbls[0]) + + // there should be chunk samples for only the series we read + require.Len(t, ir.dec.chunksSample, 1) + require.NotNil(t, ir.dec.chunksSample[postings.At()]) + require.Len(t, ir.dec.chunksSample[postings.At()].chunks, len(tc.expectedChunkSamples)) + + // build decoder for the series we read to verify the samples + offset := postings.At() * 16 + d := encoding.DecWrap(tsdb_enc.NewDecbufUvarintAt(ir.b, int(offset), castagnoliTable)) + require.NoError(t, d.Err()) + + // read chunk metadata to positing the decoder at the beginning of first chunk + d.Be64() + k := d.Uvarint() + + for i := 0; i < k; i++ { + d.Uvarint() + d.Uvarint() + } + require.Equal(t, len(tc.chunkMetas), d.Uvarint()) + for i, cs := range ir.dec.chunksSample[postings.At()].chunks { + require.Equal(t, tc.expectedChunkSamples[i].idx, cs.idx) + require.Equal(t, tc.expectedChunkSamples[i].largestMaxt, cs.largestMaxt) + require.Equal(t, tc.expectedChunkSamples[i].prevChunkMaxt, cs.prevChunkMaxt) + + dw := encoding.DecWrap(tsdb_enc.Decbuf{B: d.Get()}) + dw.Skip(cs.offset) + chunkMeta := ChunkMeta{} + require.NoError(t, readChunkMeta(&dw, cs.prevChunkMaxt, &chunkMeta)) + require.Equal(t, tc.chunkMetas[tc.expectedChunkSamples[i].idx], chunkMeta) + } + + require.NoError(t, ir.Close()) + }) + } +} + +func TestChunkSamples_getChunkSampleForQueryStarting(t *testing.T) { + for name, tc 
:= range map[string]struct { + chunkSamples *chunkSamples + queryMint int64 + expectedChunkSampleIdx int + }{ + "mint greater than largestMaxt": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 200, + idx: 5, + offset: 5, + prevChunkMaxt: 50, + }, + }, + }, + queryMint: 250, + expectedChunkSampleIdx: -1, + }, + "mint smaller than first largestMaxt": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 200, + idx: 5, + offset: 5, + prevChunkMaxt: 50, + }, + }, + }, + queryMint: 50, + expectedChunkSampleIdx: 0, + }, + "intermediate chunk sample": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 200, + idx: 5, + offset: 5, + prevChunkMaxt: 50, + }, + { + largestMaxt: 350, + idx: 7, + offset: 7, + prevChunkMaxt: 150, + }, + { + largestMaxt: 500, + idx: 9, + offset: 9, + prevChunkMaxt: 250, + }, + }, + }, + queryMint: 250, + expectedChunkSampleIdx: 1, + }, + "mint matching samples largestMaxt": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 200, + idx: 5, + offset: 5, + prevChunkMaxt: 50, + }, + { + largestMaxt: 350, + idx: 7, + offset: 7, + prevChunkMaxt: 150, + }, + { + largestMaxt: 500, + idx: 9, + offset: 9, + prevChunkMaxt: 250, + }, + }, + }, + queryMint: 350, + expectedChunkSampleIdx: 1, + }, + "same chunk sampled": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + }, + }, + queryMint: 50, + expectedChunkSampleIdx: 0, + }, + } { + t.Run(name, func(t *testing.T) { + chunkSample := 
tc.chunkSamples.getChunkSampleForQueryStarting(tc.queryMint) + if tc.expectedChunkSampleIdx == -1 { + require.Nil(t, chunkSample) + return + } + + require.NotNil(t, chunkSample) + require.Equal(t, tc.chunkSamples.chunks[tc.expectedChunkSampleIdx], *chunkSample) + }) + } +} diff --git a/pkg/storage/stores/tsdb/index_client.go b/pkg/storage/stores/tsdb/index_client.go index 880164dcef..46d94f848e 100644 --- a/pkg/storage/stores/tsdb/index_client.go +++ b/pkg/storage/stores/tsdb/index_client.go @@ -204,15 +204,13 @@ func (c *IndexClient) Stats(ctx context.Context, userID string, from, through mo acc = &stats.Stats{} } - queryBounds := newBounds(from, through) - for idx, interval := range intervals { if err := c.idx.Stats(ctx, userID, interval.Start, interval.End, acc, shard, func(chk index.ChunkMeta) bool { // for the first split, purely do overlap check to also include chunks having // start time earlier than start time of the table interval we are querying. // for all other splits, consider only chunks that have from >= interval.Start // so that we start after the start time of the index table we are querying. 
- if Overlap(queryBounds, chk) && (idx == 0 || chk.From() >= interval.Start) { + if idx == 0 || chk.From() >= interval.Start { return true } return false diff --git a/pkg/storage/stores/tsdb/index_client_test.go b/pkg/storage/stores/tsdb/index_client_test.go index b3e20c1bb6..57737d3b93 100644 --- a/pkg/storage/stores/tsdb/index_client_test.go +++ b/pkg/storage/stores/tsdb/index_client_test.go @@ -163,8 +163,8 @@ func TestIndexClient_Stats(t *testing.T) { Start: indexStartYesterday, End: indexStartToday + 1000, }, - expectedNumChunks: 298, // 2 chunks not included at indexStartYesterday since start time is not inclusive - expectedNumEntries: 298, + expectedNumChunks: 300, + expectedNumEntries: 300, expectedNumStreams: 2, }, { @@ -173,8 +173,8 @@ func TestIndexClient_Stats(t *testing.T) { Start: indexStartToday, End: indexStartToday + 1000, }, - expectedNumChunks: 99, // 1 chunk not included at indexStartToday since start time is not inclusive - expectedNumEntries: 99, + expectedNumChunks: 100, + expectedNumEntries: 100, expectedNumStreams: 1, }, { @@ -183,8 +183,8 @@ func TestIndexClient_Stats(t *testing.T) { Start: indexStartToday + 50, End: indexStartToday + 60, }, - expectedNumChunks: 9, // start and end are not inclusive - expectedNumEntries: 9, + expectedNumChunks: 10, // end time not inclusive + expectedNumEntries: 10, expectedNumStreams: 1, }, { diff --git a/pkg/storage/stores/tsdb/querier.go b/pkg/storage/stores/tsdb/querier.go index f7cb23bd7a..3518330582 100644 --- a/pkg/storage/stores/tsdb/querier.go +++ b/pkg/storage/stores/tsdb/querier.go @@ -66,7 +66,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. // Returns storage.ErrNotFound if the ref does not resolve to a known series. 
- Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) + Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) // LabelNames returns all the unique label names present in the index in sorted order. LabelNames(matchers ...*labels.Matcher) ([]string, error) diff --git a/pkg/storage/stores/tsdb/querier_test.go b/pkg/storage/stores/tsdb/querier_test.go index 0ac949604f..1d29327b8f 100644 --- a/pkg/storage/stores/tsdb/querier_test.go +++ b/pkg/storage/stores/tsdb/querier_test.go @@ -2,6 +2,7 @@ package tsdb import ( "context" + "math" "testing" "time" @@ -113,12 +114,12 @@ func TestQueryIndex(t *testing.T) { ) require.True(t, p.Next()) - _, err = reader.Series(p.At(), &ls, &chks) + _, err = reader.Series(p.At(), 0, math.MaxInt64, &ls, &chks) require.Nil(t, err) require.Equal(t, cases[0].labels.String(), ls.String()) require.Equal(t, cases[0].chunks, chks) require.True(t, p.Next()) - _, err = reader.Series(p.At(), &ls, &chks) + _, err = reader.Series(p.At(), 0, math.MaxInt64, &ls, &chks) require.Nil(t, err) require.Equal(t, cases[1].labels.String(), ls.String()) require.Equal(t, cases[1].chunks, chks) diff --git a/pkg/storage/stores/tsdb/single_file_index.go b/pkg/storage/stores/tsdb/single_file_index.go index 095ee07edb..7f1d60cbc9 100644 --- a/pkg/storage/stores/tsdb/single_file_index.go +++ b/pkg/storage/stores/tsdb/single_file_index.go @@ -102,12 +102,7 @@ func (i *TSDBIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { // fn must NOT capture it's arguments. They're reused across series iterations and returned to // a pool after completion. 
-func (i *TSDBIndex) forSeries( - ctx context.Context, - shard *index.ShardAnnotation, - fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta), - matchers ...*labels.Matcher, -) error { +func (i *TSDBIndex) forSeries(ctx context.Context, shard *index.ShardAnnotation, from model.Time, through model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta), matchers ...*labels.Matcher) error { p, err := PostingsForMatchers(i.reader, shard, matchers...) if err != nil { return err @@ -123,7 +118,7 @@ func (i *TSDBIndex) forSeries( } for p.Next() { - hash, err := i.reader.Series(p.At(), &ls, &chks) + hash, err := i.reader.Series(p.At(), int64(from), int64(through), &ls, &chks) if err != nil { return err } @@ -143,32 +138,23 @@ func (i *TSDBIndex) forSeries( } func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { - queryBounds := newBounds(from, through) if res == nil { res = ChunkRefsPool.Get() } res = res[:0] - if err := i.forSeries(ctx, shard, - func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { - // TODO(owen-d): use logarithmic approach - for _, chk := range chks { + if err := i.forSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + for _, chk := range chks { - // current chunk is outside the range of this request - if !Overlap(queryBounds, chk) { - continue - } - - res = append(res, ChunkRef{ - User: userID, // assumed to be the same, will be enforced by caller. - Fingerprint: fp, - Start: chk.From(), - End: chk.Through(), - Checksum: chk.Checksum, - }) - } - }, - matchers...); err != nil { + res = append(res, ChunkRef{ + User: userID, // assumed to be the same, will be enforced by caller. 
+ Fingerprint: fp, + Start: chk.From(), + End: chk.Through(), + Checksum: chk.Checksum, + }) + } + }, matchers...); err != nil { return nil, err } @@ -176,27 +162,20 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu } func (i *TSDBIndex) Series(ctx context.Context, _ string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { - queryBounds := newBounds(from, through) if res == nil { res = SeriesPool.Get() } res = res[:0] - if err := i.forSeries(ctx, shard, - func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { - // TODO(owen-d): use logarithmic approach - for _, chk := range chks { - if Overlap(queryBounds, chk) { - // this series has at least one chunk in the desired range - res = append(res, Series{ - Labels: ls.Copy(), - Fingerprint: fp, - }) - break - } - } - }, - matchers...); err != nil { + if err := i.forSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + if len(chks) == 0 { + return + } + res = append(res, Series{ + Labels: ls.Copy(), + Fingerprint: fp, + }) + }, matchers...); err != nil { return nil, err } @@ -233,21 +212,18 @@ func (i *TSDBIndex) Identifier(string) SingleTenantTSDBIdentifier { } func (i *TSDBIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { - if err := i.forSeries(ctx, shard, - func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { - // TODO(owen-d): use logarithmic approach - var addedStream bool - for _, chk := range chks { - if shouldIncludeChunk(chk) { - if !addedStream { - acc.AddStream(fp) - addedStream = true - } - acc.AddChunk(fp, chk) + if err := i.forSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + var addedStream bool + for _, 
chk := range chks { + if shouldIncludeChunk(chk) { + if !addedStream { + acc.AddStream(fp) + addedStream = true } + acc.AddChunk(fp, chk) } - }, - matchers...); err != nil { + } + }, matchers...); err != nil { return err } diff --git a/pkg/storage/stores/tsdb/single_file_index_test.go b/pkg/storage/stores/tsdb/single_file_index_test.go index c1d0a5a464..dd4b9395cf 100644 --- a/pkg/storage/stores/tsdb/single_file_index_test.go +++ b/pkg/storage/stores/tsdb/single_file_index_test.go @@ -2,8 +2,10 @@ package tsdb import ( "context" + "math/rand" "sort" "testing" + "time" "github.com/go-kit/log" "github.com/prometheus/common/model" @@ -201,3 +203,54 @@ func TestSingleIdx(t *testing.T) { } } + +func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) { + now := model.Now() + queryFrom, queryThrough := now.Add(3*time.Hour).Add(time.Millisecond), now.Add(5*time.Hour).Add(-time.Millisecond) + queryBounds := newBounds(queryFrom, queryThrough) + numChunksToMatch := 0 + + var chunkMetas []index.ChunkMeta + // build a chunk for every second with randomized chunk length + for from, through := now, now.Add(24*time.Hour); from <= through; from = from.Add(time.Second) { + // randomize chunk length between 1-120 mins + chunkLenMin := rand.Intn(120) + if chunkLenMin == 0 { + chunkLenMin = 1 + } + chunkMeta := index.ChunkMeta{ + MinTime: int64(from), + MaxTime: int64(from.Add(time.Duration(chunkLenMin) * time.Minute)), + Checksum: uint32(from), + Entries: 1, + } + chunkMetas = append(chunkMetas, chunkMeta) + if Overlap(chunkMeta, queryBounds) { + numChunksToMatch++ + } + } + + tempDir := b.TempDir() + tsdbIndex := BuildIndex(b, tempDir, []LoadableSeries{ + { + Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), + Chunks: chunkMetas, + }, + { + Labels: mustParseLabels(`{foo="bar", ping="pong"}`), + Chunks: chunkMetas, + }, + { + Labels: mustParseLabels(`{foo1="bar1", ping="pong"}`), + Chunks: chunkMetas, + }, + }) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + 
chkRefs, err := tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.NoError(b, err) + require.Len(b, chkRefs, numChunksToMatch*2) + } +}