diff --git a/pkg/storage/stores/tsdb/compactor.go b/pkg/storage/stores/tsdb/compactor.go index 472074a76c..11f66ed320 100644 --- a/pkg/storage/stores/tsdb/compactor.go +++ b/pkg/storage/stores/tsdb/compactor.go @@ -48,7 +48,7 @@ func (i indexProcessor) OpenCompactedIndexFile(ctx context.Context, path, tableN }() builder := NewBuilder() - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { builder.AddSeries(lbls.Copy(), fp, chks) }, labels.MustNewMatcher(labels.MatchEqual, "", "")) if err != nil { @@ -197,7 +197,7 @@ func setupBuilder(ctx context.Context, userID string, sourceIndexSet compactor.I // add users index from multi-tenant indexes to the builder for _, idx := range multiTenantIndexes { - err := idx.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err := idx.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { builder.AddSeries(withoutTenantLabel(lbls.Copy()), fp, chks) }, withTenantLabelMatcher(userID, []*labels.Matcher{})...) if err != nil { @@ -229,7 +229,7 @@ func setupBuilder(ctx context.Context, userID string, sourceIndexSet compactor.I } }() - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(ctx, nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { builder.AddSeries(lbls.Copy(), fp, chks) }, labels.MustNewMatcher(labels.MatchEqual, "", "")) if err != nil { diff --git a/pkg/storage/stores/tsdb/compactor_test.go b/pkg/storage/stores/tsdb/compactor_test.go index 2e1f99789b..e8d4acdd7e 100644 --- a/pkg/storage/stores/tsdb/compactor_test.go +++ b/pkg/storage/stores/tsdb/compactor_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "math" "path" "path/filepath" "strings" @@ -192,12 +193,13 @@ func buildStream(lbls labels.Labels, chunks index.ChunkMetas, userLabel string) } } +// buildChunkMetas builds 1ms wide chunk metas from -> to. 
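+// Chunks are made 1ms wide (MaxTime = MinTime + 1) so that a chunk sitting exactly on a query boundary still overlaps it under the strict overlap checks.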
func buildChunkMetas(from, to int64) index.ChunkMetas { var chunkMetas index.ChunkMetas for i := from; i <= to; i++ { chunkMetas = append(chunkMetas, index.ChunkMeta{ MinTime: i, - MaxTime: i, + MaxTime: i + 1, Checksum: uint32(i), Entries: 1, }) @@ -594,7 +596,7 @@ func TestCompactor_Compact(t *testing.T) { require.NoError(t, err) actualChunks = map[string]index.ChunkMetas{} - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { actualChunks[lbls.String()] = chks }, labels.MustNewMatcher(labels.MatchEqual, "", "")) require.NoError(t, err) @@ -808,7 +810,7 @@ func TestCompactedIndex(t *testing.T) { require.NoError(t, err) foundChunks := map[string]index.ChunkMetas{} - err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + err = indexFile.(*TSDBFile).Index.(*TSDBIndex).forSeries(context.Background(), nil, 0, math.MaxInt64, func(lbls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { foundChunks[lbls.String()] = append(index.ChunkMetas{}, chks...) }, labels.MustNewMatcher(labels.MatchEqual, "", "")) require.NoError(t, err) diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go index e0102bbca0..1a1726601e 100644 --- a/pkg/storage/stores/tsdb/head_manager.go +++ b/pkg/storage/stores/tsdb/head_manager.go @@ -3,6 +3,7 @@ package tsdb import ( "context" "fmt" + "math" "os" "path/filepath" "sort" @@ -747,7 +748,7 @@ func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, fp uint64, c chks []index.ChunkMeta ) - fp, err := idx.Series(ps.At(), &ls, &chks) + fp, err := idx.Series(ps.At(), 0, math.MaxInt64, &ls, &chks) if err != nil { return errors.Wrapf(err, "iterating postings for tenant: %s", user) diff --git a/pkg/storage/stores/tsdb/head_read.go b/pkg/storage/stores/tsdb/head_read.go index 5ff524115c..0bfb342d76 100644 --- a/pkg/storage/stores/tsdb/head_read.go +++ b/pkg/storage/stores/tsdb/head_read.go @@ -17,6 +17,7 @@ import ( "math" "sort" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -121,7 +122,7 @@ func (h *headIndexReader) Postings(name string, shard *index.ShardAnnotation, va } // Series returns the series for the given reference. -func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) { +func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) { s := h.head.series.getByID(uint64(ref)) if s == nil { @@ -130,8 +131,16 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chk } *lbls = append((*lbls)[:0], s.ls...) + queryBounds := newBounds(model.Time(from), model.Time(through)) + + *chks = (*chks)[:0] s.Lock() - *chks = append((*chks)[:0], s.chks...) 
+ for _, chk := range s.chks { + if !Overlap(chk, queryBounds) { + continue + } + *chks = append(*chks, chk) + } s.Unlock() return s.fp, nil diff --git a/pkg/storage/stores/tsdb/index/index.go b/pkg/storage/stores/tsdb/index/index.go index 4218e9d6be..52b0491ae1 100644 --- a/pkg/storage/stores/tsdb/index/index.go +++ b/pkg/storage/stores/tsdb/index/index.go @@ -26,6 +26,8 @@ import ( "os" "path/filepath" "sort" + "sync" + "time" "unsafe" "github.com/pkg/errors" @@ -54,6 +56,8 @@ const ( // store every 1024 series' fingerprints in the fingerprint offsets table fingerprintInterval = 1 << 10 + + millisecondsInHour = int64(time.Hour / time.Millisecond) ) type indexWriterStage uint8 @@ -1328,7 +1332,10 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { return nil, errors.Wrap(err, "loading fingerprint offsets") } - r.dec = &Decoder{LookupSymbol: r.lookupSymbol} + r.dec = &Decoder{ + LookupSymbol: r.lookupSymbol, + chunksSample: map[storage.SeriesRef]*chunkSamples{}, + } return r, nil } @@ -1722,7 +1729,7 @@ func (r *Reader) LabelValueFor(id storage.SeriesRef, label string) (string, erro } // Series reads the series with the given ID and writes its labels and chunks into lbls and chks. -func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) { +func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) { offset := id // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. @@ -1734,7 +1741,7 @@ func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]Chunk return 0, d.Err() } - fprint, err := r.dec.Series(d.Get(), lbls, chks) + fprint, err := r.dec.Series(d.Get(), id, from, through, lbls, chks) if err != nil { return 0, errors.Wrap(err, "read series") } @@ -1892,12 +1899,56 @@ func (s *stringListIter) Next() bool { func (s stringListIter) At() string { return s.cur } func (s stringListIter) Err() error { return nil } +type chunkSample struct { + largestMaxt int64 // holds largest chunk end time we have seen so far. In other words all the earlier chunks have maxt <= largestMaxt + idx int // index of the chunk in the list which helps with determining position of sampled chunk + offset int // offset is relative to beginning chunk info block i.e after series labels info and chunk count etc + prevChunkMaxt int64 // chunk times are stored as deltas. This is used for calculating mint of sampled chunk +} + +type chunkSamples struct { + sync.RWMutex + chunks []chunkSample +} + +func newChunkSamples() *chunkSamples { + return &chunkSamples{ + chunks: make([]chunkSample, 0, 30), + } +} + +// getChunkSampleForQueryStarting returns back chunk sample which has largest "largestMaxt" that is less than given query start time. +// In other words, return back chunk sample which skips all the chunks that end before query start time. +// If query start is before all "largestMaxt", we would return first chunk sample. +// If query start is after all "largestMaxt", we would return nil. 
+func (c *chunkSamples) getChunkSampleForQueryStarting(ts int64) *chunkSample { + c.RLock() + defer c.RUnlock() + + // first find position of chunk sample which has smallest "largestMaxt" after ts + i := sort.Search(len(c.chunks), func(i int) bool { + return c.chunks[i].largestMaxt >= ts + }) + + if i >= len(c.chunks) { + return nil + } + + // there could be more chunks of interest between this and previous sample, so we should process chunks from previous sample + if i > 0 { + i-- + } + return &c.chunks[i] +} + // Decoder provides decoding methods for the v1 and v2 index file format. // // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. type Decoder struct { - LookupSymbol func(uint32) (string, error) + LookupSymbol func(uint32) (string, error) + chunksSample map[storage.SeriesRef]*chunkSamples + chunksSampleMtx sync.RWMutex } // Postings returns a postings list for b and its number of elements. @@ -1966,8 +2017,79 @@ func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) { return "", d.Err() } +func (dec *Decoder) getOrCreateChunksSample(d encoding.Decbuf, seriesRef storage.SeriesRef, numChunks int) (*chunkSamples, error) { + dec.chunksSampleMtx.Lock() + sample, ok := dec.chunksSample[seriesRef] + if ok { + dec.chunksSampleMtx.Unlock() + return sample, nil + } + + sample = newChunkSamples() + dec.chunksSample[seriesRef] = sample + sample.Lock() + defer sample.Unlock() + + dec.chunksSampleMtx.Unlock() + + if err := buildChunkSamples(d, numChunks, sample); err != nil { + return nil, err + } + + return sample, nil +} + +// buildChunkSamples samples chunks considering maxt of the indexed chunks. +// It would always sample first and last chunk for returning earlier when query falls out of range on either ends. +// First chunk onwards it would only sample chunks that have maxt greater by at least 1h than previous sampled chunk's maxt. +func buildChunkSamples(d encoding.Decbuf, numChunks int, info *chunkSamples) error { + bufLen := d.Len() + + chunkPos := bufLen - d.Len() + chunkMeta := &ChunkMeta{} + if err := readChunkMeta(&d, 0, chunkMeta); err != nil { + return errors.Wrapf(d.Err(), "read meta for chunk %d", 0) + } + + info.chunks = append(info.chunks, chunkSample{ + largestMaxt: chunkMeta.MaxTime, + idx: 0, + offset: chunkPos, + }) + + t0 := chunkMeta.MaxTime + largestMaxt := chunkMeta.MaxTime + prevLargestMaxt := largestMaxt + + for i := 1; i < numChunks; i++ { + chunkPos = bufLen - d.Len() + if err := readChunkMeta(&d, t0, chunkMeta); err != nil { + return errors.Wrapf(d.Err(), "read meta for chunk %d", i) + } + if chunkMeta.MaxTime > largestMaxt { + largestMaxt = chunkMeta.MaxTime + } + + if d.Err() != nil { + return errors.Wrapf(d.Err(), "read meta for chunk %d", i) + } + + if i == numChunks-1 || largestMaxt-prevLargestMaxt >= millisecondsInHour { + prevLargestMaxt = largestMaxt + info.chunks = append(info.chunks, chunkSample{ + idx: i, + prevChunkMaxt: t0, + largestMaxt: largestMaxt, + offset: chunkPos, + }) + } + t0 = chunkMeta.MaxTime + } + return d.Err() +} + // Series decodes a series entry from the given byte slice into lset and chks. 
-func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) { +func (dec *Decoder) Series(b []byte, seriesRef storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) { *lbls = (*lbls)[:0] *chks = (*chks)[:0] @@ -2003,46 +2125,64 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) (ui return 0, d.Err() } - t0 := d.Varint64() - maxt := int64(d.Uvarint64()) + t0 - kb := uint32(d.Uvarint()) - entries := uint32(d.Uvarint64()) - checksum := d.Be32() + chunksSample, err := dec.getOrCreateChunksSample(encoding.DecWrap(tsdb_enc.Decbuf{B: d.Get()}), seriesRef, k) + if err != nil { + return 0, err + } - *chks = append(*chks, ChunkMeta{ - Checksum: checksum, - MinTime: t0, - MaxTime: maxt, - KB: kb, - Entries: entries, - }) - t0 = maxt - - for i := 1; i < k; i++ { - // Decode the diff against previous chunk as varint - // instead of uvarint because chunks may overlap - mint := d.Varint64() + t0 - maxt := int64(d.Uvarint64()) + mint - kb := uint32(d.Uvarint()) - entries := uint32(d.Uvarint64()) - checksum := d.Be32() - t0 = maxt + cs := chunksSample.getChunkSampleForQueryStarting(from) + if cs == nil { + return fprint, nil + } + d.Skip(cs.offset) - if d.Err() != nil { - return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", i) + chunkMeta := &ChunkMeta{} + if err := readChunkMeta(&d, cs.prevChunkMaxt, chunkMeta); err != nil { + return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx) + } + + if overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) { + *chks = append(*chks, *chunkMeta) + } + t0 := chunkMeta.MaxTime + + for i := cs.idx + 1; i < k; i++ { + if err := readChunkMeta(&d, t0, chunkMeta); err != nil { + return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx) } + t0 = chunkMeta.MaxTime - *chks = append(*chks, ChunkMeta{ - Checksum: checksum, - MinTime: mint, - MaxTime: maxt, - KB: kb, - Entries: entries, - }) + if !overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) { + continue + } + if chunkMeta.MinTime >= through { + break + } + *chks = append(*chks, *chunkMeta) } return fprint, d.Err() } +func readChunkMeta(d *encoding.Decbuf, prevChunkMaxt int64, chunkMeta *ChunkMeta) error { + // Decode the diff against previous chunk as varint + // instead of uvarint because chunks may overlap + chunkMeta.MinTime = d.Varint64() + prevChunkMaxt + chunkMeta.MaxTime = int64(d.Uvarint64()) + chunkMeta.MinTime + chunkMeta.KB = uint32(d.Uvarint()) + chunkMeta.Entries = uint32(d.Uvarint64()) + chunkMeta.Checksum = d.Be32() + + if d.Err() != nil { + return d.Err() + } + + return nil +} + func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } + +func overlap(aFrom, aThrough, bFrom, bThrough int64) bool { + return aFrom < bThrough && aThrough > bFrom +} diff --git a/pkg/storage/stores/tsdb/index/index_test.go b/pkg/storage/stores/tsdb/index/index_test.go index 547f2350d1..8aa1ccfb92 100644 --- a/pkg/storage/stores/tsdb/index/index_test.go +++ b/pkg/storage/stores/tsdb/index/index_test.go @@ -17,20 +17,23 @@ import ( "context" "fmt" "hash/crc32" + "math" "math/rand" "os" "path/filepath" "sort" "testing" + "time" "github.com/pkg/errors" "github.com/stretchr/testify/require" "go.uber.org/goleak" + "github.com/grafana/loki/pkg/util/encoding" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/encoding" + tsdb_enc 
"github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/util/testutil" ) @@ -185,7 +188,7 @@ func TestIndexRW_Postings(t *testing.T) { var c []ChunkMeta for i := 0; p.Next(); i++ { - _, err := ir.Series(p.At(), &l, &c) + _, err := ir.Series(p.At(), 0, math.MaxInt64, &l, &c) require.NoError(t, err) require.Equal(t, 0, len(c)) @@ -200,7 +203,7 @@ func TestIndexRW_Postings(t *testing.T) { return errors.Errorf("unexpected key length for label indices table %d", len(key)) } - d := encoding.NewDecbufAt(ir.b, int(off), castagnoliTable) + d := tsdb_enc.NewDecbufAt(ir.b, int(off), castagnoliTable) vals := []string{} nc := d.Be32int() if nc != 1 { @@ -304,7 +307,7 @@ func TestPostingsMany(t *testing.T) { var lbls labels.Labels var metas []ChunkMeta for it.Next() { - _, err := ir.Series(it.At(), &lbls, &metas) + _, err := ir.Series(it.At(), 0, math.MaxInt64, &lbls, &metas) require.NoError(t, err) got = append(got, lbls.Get("i")) } @@ -420,7 +423,7 @@ func TestPersistence_index_e2e(t *testing.T) { ref := gotp.At() - _, err := ir.Series(ref, &lset, &chks) + _, err := ir.Series(ref, 0, math.MaxInt64, &lset, &chks) require.NoError(t, err) err = mi.Series(expp.At(), &explset, &expchks) @@ -467,7 +470,7 @@ func TestPersistence_index_e2e(t *testing.T) { func TestDecbufUvarintWithInvalidBuffer(t *testing.T) { b := RealByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81}) - db := encoding.NewDecbufUvarintAt(b, 0, castagnoliTable) + db := tsdb_enc.NewDecbufUvarintAt(b, 0, castagnoliTable) require.Error(t, db.Err()) } @@ -544,3 +547,395 @@ func TestDecoder_Postings_WrongInput(t *testing.T) { _, _, err := (&Decoder{}).Postings([]byte("the cake is a lie")) require.Error(t, err) } + +func TestDecoder_ChunkSamples(t *testing.T) { + dir := t.TempDir() + + lbls := []labels.Labels{ + {{Name: "fizz", Value: "buzz"}}, + {{Name: "ping", Value: "pong"}}, + } + + symbols := map[string]struct{}{} + for _, lset := range lbls { + for _, l := range lset { + symbols[l.Name] = struct{}{} + symbols[l.Value] = struct{}{} + } + } + + now := model.Now() + + for name, tc := range map[string]struct { + chunkMetas []ChunkMeta + expectedChunkSamples []chunkSample + }{ + "no overlapping chunks": { + chunkMetas: []ChunkMeta{ + { + MinTime: int64(now), + MaxTime: int64(now.Add(30 * time.Minute)), + }, + { + MinTime: int64(now.Add(40 * time.Minute)), + MaxTime: int64(now.Add(80 * time.Minute)), + }, + { + MinTime: int64(now.Add(90 * time.Minute)), + MaxTime: int64(now.Add(120 * time.Minute)), + }, + { + MinTime: int64(now.Add(130 * time.Minute)), + MaxTime: int64(now.Add(150 * time.Minute)), + }, + }, + expectedChunkSamples: []chunkSample{ + { + largestMaxt: int64(now.Add(30 * time.Minute)), + idx: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: int64(now.Add(120 * time.Minute)), + idx: 2, + prevChunkMaxt: int64(now.Add(80 * time.Minute)), + }, + { + largestMaxt: int64(now.Add(150 * time.Minute)), + idx: 3, + prevChunkMaxt: int64(now.Add(120 * time.Minute)), + }, + }, + }, + "overlapping chunks": { + chunkMetas: []ChunkMeta{ + { + MinTime: int64(now), + MaxTime: int64(now.Add(30 * time.Minute)), + }, + { + MinTime: int64(now.Add(20 * time.Minute)), + MaxTime: int64(now.Add(80 * time.Minute)), + }, + { + MinTime: int64(now.Add(70 * time.Minute)), + MaxTime: int64(now.Add(120 * time.Minute)), + }, + { + MinTime: int64(now.Add(100 * time.Minute)), + MaxTime: int64(now.Add(110 * time.Minute)), + }, + }, + expectedChunkSamples: []chunkSample{ + { + largestMaxt: int64(now.Add(30 * time.Minute)), + idx: 0, + 
prevChunkMaxt: 0, + }, + { + largestMaxt: int64(now.Add(120 * time.Minute)), + idx: 2, + prevChunkMaxt: int64(now.Add(80 * time.Minute)), + }, + { + largestMaxt: int64(now.Add(120 * time.Minute)), + idx: 3, + prevChunkMaxt: int64(now.Add(120 * time.Minute)), + }, + }, + }, + "first chunk overlapping all chunks": { + chunkMetas: []ChunkMeta{ + { + MinTime: int64(now), + MaxTime: int64(now.Add(180 * time.Minute)), + }, + { + MinTime: int64(now.Add(20 * time.Minute)), + MaxTime: int64(now.Add(80 * time.Minute)), + }, + { + MinTime: int64(now.Add(70 * time.Minute)), + MaxTime: int64(now.Add(120 * time.Minute)), + }, + { + MinTime: int64(now.Add(110 * time.Minute)), + MaxTime: int64(now.Add(150 * time.Minute)), + }, + }, + expectedChunkSamples: []chunkSample{ + { + largestMaxt: int64(now.Add(180 * time.Minute)), + idx: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: int64(now.Add(180 * time.Minute)), + idx: 3, + prevChunkMaxt: int64(now.Add(120 * time.Minute)), + }, + }, + }, + "large gaps between chunks": { + chunkMetas: []ChunkMeta{ + { + MinTime: int64(now), + MaxTime: int64(now.Add(30 * time.Minute)), + }, + { + MinTime: int64(now.Add(200 * time.Minute)), + MaxTime: int64(now.Add(280 * time.Minute)), + }, + { + MinTime: int64(now.Add(500 * time.Minute)), + MaxTime: int64(now.Add(520 * time.Minute)), + }, + { + MinTime: int64(now.Add(800 * time.Minute)), + MaxTime: int64(now.Add(835 * time.Minute)), + }, + }, + expectedChunkSamples: []chunkSample{ + { + largestMaxt: int64(now.Add(30 * time.Minute)), + idx: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: int64(now.Add(280 * time.Minute)), + idx: 1, + prevChunkMaxt: int64(now.Add(30 * time.Minute)), + }, + { + largestMaxt: int64(now.Add(520 * time.Minute)), + idx: 2, + prevChunkMaxt: int64(now.Add(280 * time.Minute)), + }, + { + largestMaxt: int64(now.Add(835 * time.Minute)), + idx: 3, + prevChunkMaxt: int64(now.Add(520 * time.Minute)), + }, + }, + }, + } { + t.Run(name, func(t *testing.T) { + iw, err := NewWriter(context.Background(), filepath.Join(dir, name)) + require.NoError(t, err) + + syms := []string{} + for s := range symbols { + syms = append(syms, s) + } + sort.Strings(syms) + for _, s := range syms { + require.NoError(t, iw.AddSymbol(s)) + } + + for i, l := range lbls { + err = iw.AddSeries(storage.SeriesRef(i), l, model.Fingerprint(l.Hash()), tc.chunkMetas...) 
+ require.NoError(t, err) + } + + err = iw.Close() + require.NoError(t, err) + + ir, err := NewFileReader(filepath.Join(dir, name)) + require.NoError(t, err) + + postings, err := ir.Postings("fizz", nil, "buzz") + require.NoError(t, err) + + require.True(t, postings.Next()) + var lset labels.Labels + var chks []ChunkMeta + + // there should be no chunk samples + require.Nil(t, ir.dec.chunksSample[postings.At()]) + + // read series so that chunk samples get built + _, err = ir.Series(postings.At(), 0, math.MaxInt64, &lset, &chks) + require.NoError(t, err) + + require.Equal(t, tc.chunkMetas, chks) + require.Equal(t, lset, lbls[0]) + + // there should be chunk samples for only the series we read + require.Len(t, ir.dec.chunksSample, 1) + require.NotNil(t, ir.dec.chunksSample[postings.At()]) + require.Len(t, ir.dec.chunksSample[postings.At()].chunks, len(tc.expectedChunkSamples)) + + // build decoder for the series we read to verify the samples + offset := postings.At() * 16 + d := encoding.DecWrap(tsdb_enc.NewDecbufUvarintAt(ir.b, int(offset), castagnoliTable)) + require.NoError(t, d.Err()) + + // read chunk metadata to positing the decoder at the beginning of first chunk + d.Be64() + k := d.Uvarint() + + for i := 0; i < k; i++ { + d.Uvarint() + d.Uvarint() + } + require.Equal(t, len(tc.chunkMetas), d.Uvarint()) + for i, cs := range ir.dec.chunksSample[postings.At()].chunks { + require.Equal(t, tc.expectedChunkSamples[i].idx, cs.idx) + require.Equal(t, tc.expectedChunkSamples[i].largestMaxt, cs.largestMaxt) + require.Equal(t, tc.expectedChunkSamples[i].prevChunkMaxt, cs.prevChunkMaxt) + + dw := encoding.DecWrap(tsdb_enc.Decbuf{B: d.Get()}) + dw.Skip(cs.offset) + chunkMeta := ChunkMeta{} + require.NoError(t, readChunkMeta(&dw, cs.prevChunkMaxt, &chunkMeta)) + require.Equal(t, tc.chunkMetas[tc.expectedChunkSamples[i].idx], chunkMeta) + } + + require.NoError(t, ir.Close()) + }) + } +} + +func TestChunkSamples_getChunkSampleForQueryStarting(t *testing.T) { + for name, tc := range map[string]struct { + chunkSamples *chunkSamples + queryMint int64 + expectedChunkSampleIdx int + }{ + "mint greater than largestMaxt": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 200, + idx: 5, + offset: 5, + prevChunkMaxt: 50, + }, + }, + }, + queryMint: 250, + expectedChunkSampleIdx: -1, + }, + "mint smaller than first largestMaxt": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 200, + idx: 5, + offset: 5, + prevChunkMaxt: 50, + }, + }, + }, + queryMint: 50, + expectedChunkSampleIdx: 0, + }, + "intermediate chunk sample": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 200, + idx: 5, + offset: 5, + prevChunkMaxt: 50, + }, + { + largestMaxt: 350, + idx: 7, + offset: 7, + prevChunkMaxt: 150, + }, + { + largestMaxt: 500, + idx: 9, + offset: 9, + prevChunkMaxt: 250, + }, + }, + }, + queryMint: 250, + expectedChunkSampleIdx: 1, + }, + "mint matching samples largestMaxt": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 200, + idx: 5, + offset: 5, + prevChunkMaxt: 50, + }, + { + largestMaxt: 350, + idx: 7, + offset: 7, + prevChunkMaxt: 150, + }, + { + largestMaxt: 500, + idx: 9, + offset: 9, + prevChunkMaxt: 250, + }, + }, 
+ }, + queryMint: 350, + expectedChunkSampleIdx: 1, + }, + "same chunk sampled": { + chunkSamples: &chunkSamples{ + chunks: []chunkSample{ + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + { + largestMaxt: 100, + idx: 0, + offset: 0, + prevChunkMaxt: 0, + }, + }, + }, + queryMint: 50, + expectedChunkSampleIdx: 0, + }, + } { + t.Run(name, func(t *testing.T) { + chunkSample := tc.chunkSamples.getChunkSampleForQueryStarting(tc.queryMint) + if tc.expectedChunkSampleIdx == -1 { + require.Nil(t, chunkSample) + return + } + + require.NotNil(t, chunkSample) + require.Equal(t, tc.chunkSamples.chunks[tc.expectedChunkSampleIdx], *chunkSample) + }) + } +} diff --git a/pkg/storage/stores/tsdb/index_client.go b/pkg/storage/stores/tsdb/index_client.go index 880164dcef..46d94f848e 100644 --- a/pkg/storage/stores/tsdb/index_client.go +++ b/pkg/storage/stores/tsdb/index_client.go @@ -204,15 +204,13 @@ func (c *IndexClient) Stats(ctx context.Context, userID string, from, through mo acc = &stats.Stats{} } - queryBounds := newBounds(from, through) - for idx, interval := range intervals { if err := c.idx.Stats(ctx, userID, interval.Start, interval.End, acc, shard, func(chk index.ChunkMeta) bool { // for the first split, purely do overlap check to also include chunks having // start time earlier than start time of the table interval we are querying. // for all other splits, consider only chunks that have from >= interval.Start // so that we start after the start time of the index table we are querying. - if Overlap(queryBounds, chk) && (idx == 0 || chk.From() >= interval.Start) { + if idx == 0 || chk.From() >= interval.Start { return true } return false diff --git a/pkg/storage/stores/tsdb/index_client_test.go b/pkg/storage/stores/tsdb/index_client_test.go index b3e20c1bb6..57737d3b93 100644 --- a/pkg/storage/stores/tsdb/index_client_test.go +++ b/pkg/storage/stores/tsdb/index_client_test.go @@ -163,8 +163,8 @@ func TestIndexClient_Stats(t *testing.T) { Start: indexStartYesterday, End: indexStartToday + 1000, }, - expectedNumChunks: 298, // 2 chunks not included at indexStartYesterday since start time is not inclusive - expectedNumEntries: 298, + expectedNumChunks: 300, + expectedNumEntries: 300, expectedNumStreams: 2, }, { @@ -173,8 +173,8 @@ func TestIndexClient_Stats(t *testing.T) { Start: indexStartToday, End: indexStartToday + 1000, }, - expectedNumChunks: 99, // 1 chunk not included at indexStartToday since start time is not inclusive - expectedNumEntries: 99, + expectedNumChunks: 100, + expectedNumEntries: 100, expectedNumStreams: 1, }, { @@ -183,8 +183,8 @@ func TestIndexClient_Stats(t *testing.T) { Start: indexStartToday + 50, End: indexStartToday + 60, }, - expectedNumChunks: 9, // start and end are not inclusive - expectedNumEntries: 9, + expectedNumChunks: 10, // end time not inclusive + expectedNumEntries: 10, expectedNumStreams: 1, }, { diff --git a/pkg/storage/stores/tsdb/querier.go b/pkg/storage/stores/tsdb/querier.go index f7cb23bd7a..3518330582 100644 --- a/pkg/storage/stores/tsdb/querier.go +++ b/pkg/storage/stores/tsdb/querier.go @@ -66,7 +66,7 @@ type IndexReader interface { // Series populates the given labels and chunk metas for the series identified // by the reference. // Returns storage.ErrNotFound if the ref does not resolve to a known series. 
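+	// Only chunk metas overlapping the given from/through bounds (in milliseconds) are populated into chks.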
- Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) + Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error) // LabelNames returns all the unique label names present in the index in sorted order. LabelNames(matchers ...*labels.Matcher) ([]string, error) diff --git a/pkg/storage/stores/tsdb/querier_test.go b/pkg/storage/stores/tsdb/querier_test.go index 0ac949604f..1d29327b8f 100644 --- a/pkg/storage/stores/tsdb/querier_test.go +++ b/pkg/storage/stores/tsdb/querier_test.go @@ -2,6 +2,7 @@ package tsdb import ( "context" + "math" "testing" "time" @@ -113,12 +114,12 @@ func TestQueryIndex(t *testing.T) { ) require.True(t, p.Next()) - _, err = reader.Series(p.At(), &ls, &chks) + _, err = reader.Series(p.At(), 0, math.MaxInt64, &ls, &chks) require.Nil(t, err) require.Equal(t, cases[0].labels.String(), ls.String()) require.Equal(t, cases[0].chunks, chks) require.True(t, p.Next()) - _, err = reader.Series(p.At(), &ls, &chks) + _, err = reader.Series(p.At(), 0, math.MaxInt64, &ls, &chks) require.Nil(t, err) require.Equal(t, cases[1].labels.String(), ls.String()) require.Equal(t, cases[1].chunks, chks) diff --git a/pkg/storage/stores/tsdb/single_file_index.go b/pkg/storage/stores/tsdb/single_file_index.go index 095ee07edb..7f1d60cbc9 100644 --- a/pkg/storage/stores/tsdb/single_file_index.go +++ b/pkg/storage/stores/tsdb/single_file_index.go @@ -102,12 +102,7 @@ func (i *TSDBIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) { // fn must NOT capture it's arguments. They're reused across series iterations and returned to // a pool after completion. -func (i *TSDBIndex) forSeries( - ctx context.Context, - shard *index.ShardAnnotation, - fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta), - matchers ...*labels.Matcher, -) error { +func (i *TSDBIndex) forSeries(ctx context.Context, shard *index.ShardAnnotation, from model.Time, through model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta), matchers ...*labels.Matcher) error { p, err := PostingsForMatchers(i.reader, shard, matchers...) if err != nil { return err @@ -123,7 +118,7 @@ func (i *TSDBIndex) forSeries( } for p.Next() { - hash, err := i.reader.Series(p.At(), &ls, &chks) + hash, err := i.reader.Series(p.At(), int64(from), int64(through), &ls, &chks) if err != nil { return err } @@ -143,32 +138,23 @@ func (i *TSDBIndex) forSeries( } func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { - queryBounds := newBounds(from, through) if res == nil { res = ChunkRefsPool.Get() } res = res[:0] - if err := i.forSeries(ctx, shard, - func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { - // TODO(owen-d): use logarithmic approach - for _, chk := range chks { + if err := i.forSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + for _, chk := range chks { - // current chunk is outside the range of this request - if !Overlap(queryBounds, chk) { - continue - } - - res = append(res, ChunkRef{ - User: userID, // assumed to be the same, will be enforced by caller. - Fingerprint: fp, - Start: chk.From(), - End: chk.Through(), - Checksum: chk.Checksum, - }) - } - }, - matchers...); err != nil { + res = append(res, ChunkRef{ + User: userID, // assumed to be the same, will be enforced by caller. 
+ Fingerprint: fp, + Start: chk.From(), + End: chk.Through(), + Checksum: chk.Checksum, + }) + } + }, matchers...); err != nil { return nil, err } @@ -176,27 +162,20 @@ func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, throu } func (i *TSDBIndex) Series(ctx context.Context, _ string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { - queryBounds := newBounds(from, through) if res == nil { res = SeriesPool.Get() } res = res[:0] - if err := i.forSeries(ctx, shard, - func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { - // TODO(owen-d): use logarithmic approach - for _, chk := range chks { - if Overlap(queryBounds, chk) { - // this series has at least one chunk in the desired range - res = append(res, Series{ - Labels: ls.Copy(), - Fingerprint: fp, - }) - break - } - } - }, - matchers...); err != nil { + if err := i.forSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + if len(chks) == 0 { + return + } + res = append(res, Series{ + Labels: ls.Copy(), + Fingerprint: fp, + }) + }, matchers...); err != nil { return nil, err } @@ -233,21 +212,18 @@ func (i *TSDBIndex) Identifier(string) SingleTenantTSDBIdentifier { } func (i *TSDBIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error { - if err := i.forSeries(ctx, shard, - func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { - // TODO(owen-d): use logarithmic approach - var addedStream bool - for _, chk := range chks { - if shouldIncludeChunk(chk) { - if !addedStream { - acc.AddStream(fp) - addedStream = true - } - acc.AddChunk(fp, chk) + if err := i.forSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) { + var addedStream bool + for _, chk := range chks { + if shouldIncludeChunk(chk) { + if !addedStream { + acc.AddStream(fp) + addedStream = true } + acc.AddChunk(fp, chk) } - }, - matchers...); err != nil { + } + }, matchers...); err != nil { return err } diff --git a/pkg/storage/stores/tsdb/single_file_index_test.go b/pkg/storage/stores/tsdb/single_file_index_test.go index c1d0a5a464..dd4b9395cf 100644 --- a/pkg/storage/stores/tsdb/single_file_index_test.go +++ b/pkg/storage/stores/tsdb/single_file_index_test.go @@ -2,8 +2,10 @@ package tsdb import ( "context" + "math/rand" "sort" "testing" + "time" "github.com/go-kit/log" "github.com/prometheus/common/model" @@ -201,3 +203,54 @@ func TestSingleIdx(t *testing.T) { } } + +func BenchmarkTSDBIndex_GetChunkRefs(b *testing.B) { + now := model.Now() + queryFrom, queryThrough := now.Add(3*time.Hour).Add(time.Millisecond), now.Add(5*time.Hour).Add(-time.Millisecond) + queryBounds := newBounds(queryFrom, queryThrough) + numChunksToMatch := 0 + + var chunkMetas []index.ChunkMeta + // build a chunk for every second with randomized chunk length + for from, through := now, now.Add(24*time.Hour); from <= through; from = from.Add(time.Second) { + // randomize chunk length between 1-120 mins + chunkLenMin := rand.Intn(120) + if chunkLenMin == 0 { + chunkLenMin = 1 + } + chunkMeta := index.ChunkMeta{ + MinTime: int64(from), + MaxTime: int64(from.Add(time.Duration(chunkLenMin) * time.Minute)), + Checksum: uint32(from), + Entries: 1, + } + chunkMetas = append(chunkMetas, chunkMeta) + if Overlap(chunkMeta, queryBounds) { + 
numChunksToMatch++ + } + } + + tempDir := b.TempDir() + tsdbIndex := BuildIndex(b, tempDir, []LoadableSeries{ + { + Labels: mustParseLabels(`{foo="bar", fizz="buzz"}`), + Chunks: chunkMetas, + }, + { + Labels: mustParseLabels(`{foo="bar", ping="pong"}`), + Chunks: chunkMetas, + }, + { + Labels: mustParseLabels(`{foo1="bar1", ping="pong"}`), + Chunks: chunkMetas, + }, + }) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + chkRefs, err := tsdbIndex.GetChunkRefs(context.Background(), "fake", queryFrom, queryThrough, nil, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.NoError(b, err) + require.Len(b, chkRefs, numChunksToMatch*2) + } +}
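
Reviewer note (outside the patch): the two behavioural pivots here are (1) chunk metas are now filtered against the query's from/through bounds with a strict overlap check, and (2) each series gets a small sample of chunk offsets so Decoder.Series can binary-search the samples for a safe starting offset instead of decoding every chunk meta. The self-contained sketch below mirrors both; `overlap` copies the helper added in index.go, while `sampleIndexForQueryStart` is a hypothetical stand-in for `chunkSamples.getChunkSampleForQueryStarting`, not part of the package API.

```go
// Standalone illustration only, not Loki code.
package main

import (
	"fmt"
	"sort"
)

// overlap treats both the query range and the chunk range as half-open
// [start, end): equality on either boundary counts as "no overlap".
func overlap(aFrom, aThrough, bFrom, bThrough int64) bool {
	return aFrom < bThrough && aThrough > bFrom
}

// sampleIndexForQueryStart returns the index of the chunk sample to start
// decoding from for a query beginning at queryFrom, or -1 if every chunk
// ends before the query starts. largestMaxts must be sorted ascending,
// which holds because it is a running maximum of chunk end times.
func sampleIndexForQueryStart(largestMaxts []int64, queryFrom int64) int {
	i := sort.Search(len(largestMaxts), func(i int) bool { return largestMaxts[i] >= queryFrom })
	if i >= len(largestMaxts) {
		return -1
	}
	// Step back one sample: chunks between the previous sample and this one
	// may still end after queryFrom.
	if i > 0 {
		i--
	}
	return i
}

func main() {
	// A zero-width chunk sitting exactly on the query start is dropped,
	// which is why buildChunkMetas in the tests now emits 1ms-wide chunks
	// and the expected counts in TestIndexClient_Stats went up.
	fmt.Println(overlap(5, 10, 5, 5)) // false
	fmt.Println(overlap(5, 10, 5, 6)) // true

	// Sampled running-max chunk end times, roughly one sample per hour.
	samples := []int64{100, 200, 350, 500}
	fmt.Println(sampleIndexForQueryStart(samples, 50))  // 0: start at the first chunk
	fmt.Println(sampleIndexForQueryStart(samples, 250)) // 1: chunks before this sample end by t=200 and cannot match
	fmt.Println(sampleIndexForQueryStart(samples, 600)) // -1: nothing can overlap
}
```

Sampling only when largestMaxt has advanced by at least an hour keeps the per-series cache small (the slice is pre-allocated with capacity 30) while still bounding how many chunk metas must be decoded per query.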