From b52d836867d3e26d9dcd864ba652d23ce82a97b6 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 13 Oct 2023 09:05:21 -0700 Subject: [PATCH] Blooms/chunk check (#10886) [WIP] Adds ability to return the list of chunks needed to be downloaded by a query based on bloom membership results --- pkg/storage/bloom/v1/TODO.md | 7 +-- pkg/storage/bloom/v1/block.go | 60 ++++++++++++++++++++++++ pkg/storage/bloom/v1/index.go | 67 +++++++++++++++++++++++++++ pkg/storage/bloom/v1/index_test.go | 73 ++++++++++++++++++++++++++++++ 4 files changed, 201 insertions(+), 6 deletions(-) diff --git a/pkg/storage/bloom/v1/TODO.md b/pkg/storage/bloom/v1/TODO.md index d7d2e49e9c..85d8bfcbfe 100644 --- a/pkg/storage/bloom/v1/TODO.md +++ b/pkg/storage/bloom/v1/TODO.md @@ -1,10 +1,5 @@ * Should be able to read bloom as a []byte without copying it during decoding * It's immutable + partition offsets are calculable, etc - * can encode version, parameters as the last n bytes, each partition's byte range can be determined from that. No need to unpack -* implement streaming encoding.Decbuf over io.ReadSeeker -* Build & load from directories * Less copying! I've taken some shortcuts we'll need to refactor to avoid copying []byte around in a few places * more sophisticated querying methods -* queue access to blooms -* io.reader based decoder -* tar support \ No newline at end of file +* queue access to blooms \ No newline at end of file diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index 8079474190..34edc53cb4 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -1,6 +1,8 @@ package v1 import ( + "fmt" + "github.com/pkg/errors" "github.com/prometheus/common/model" ) @@ -111,3 +113,61 @@ func (bq *BlockQuerier) Err() error { return bq.blooms.Err() } + +// CheckChunksForSeries checks if the given chunks pass a set of searches in the given bloom block. 
+// It returns the list of chunks which will need to be downloaded for a query based on the initial list +// passed as the `chks` argument. Chunks will be removed from the result set if they are indexed in the bloom +// and fail to pass all the searches. +func (bq *BlockQuerier) CheckChunksForSeries(fp model.Fingerprint, chks ChunkRefs, searches [][]byte) (ChunkRefs, error) { + if err := bq.Seek(fp); err != nil { + return chks, errors.Wrapf(err, "seeking to series for fp: %v", fp) + } + + if !bq.series.Next() { + return chks, nil + } + + series := bq.series.At() + if series.Fingerprint != fp { + return chks, nil + } + + bq.blooms.Seek(series.Offset) + if !bq.blooms.Next() { + return chks, fmt.Errorf("seeking to bloom for fp: %v", fp) + } + + bloom := bq.blooms.At() + + // First, see if the search passes the series level bloom before checking for chunks individually + for _, search := range searches { + if !bloom.sbf.Test(search) { + // the entire series bloom didn't pass one of the searches, + // so we can skip checking chunks individually. 
+ // We still return all chunks that are not included in the bloom + // as they may still have the data + return chks.Unless(series.Chunks), nil + } + } + + // TODO(owen-d): pool, memoize chunk search prefix creation + + // Check chunks individually now + mustCheck, inBlooms := chks.Compare(series.Chunks, true) + +outer: + for _, chk := range inBlooms { + for _, search := range searches { + // TODO(owen-d): meld chunk + search into a single byte slice from the block schema + var combined = search + + if !bloom.sbf.Test(combined) { + continue outer + } + } + // chunk passed all searches, add to the list of chunks to download + mustCheck = append(mustCheck, chk) + + } + return mustCheck, nil +} diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index dec7c22661..fc18eb33ac 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -387,6 +387,18 @@ type ChunkRef struct { Checksum uint32 } +func (r *ChunkRef) Less(other ChunkRef) bool { + if r.Start != other.Start { + return r.Start < other.Start + } + + if r.End != other.End { + return r.End < other.End + } + + return r.Checksum < other.Checksum +} + func (r *ChunkRef) Encode(enc *encoding.Encbuf, previousEnd model.Time) model.Time { // delta encode start time enc.PutVarint64(int64(r.Start - previousEnd)) @@ -417,3 +429,58 @@ func (o *BloomOffset) Decode(dec *encoding.Decbuf, previousOffset BloomOffset) e o.ByteOffset = previousOffset.ByteOffset + dec.Uvarint() return dec.Err() } + +type ChunkRefs []ChunkRef + +func (refs ChunkRefs) Len() int { + return len(refs) +} + +func (refs ChunkRefs) Less(i, j int) bool { + return refs[i].Less(refs[j]) +} + +func (refs ChunkRefs) Swap(i, j int) { + refs[i], refs[j] = refs[j], refs[i] +} + +// Unless returns the chunk refs in this set that are not in the other set. +// Both must be sorted. 
+func (refs ChunkRefs) Unless(others []ChunkRef) ChunkRefs { + res, _ := refs.Compare(others, false) + return res +} + +// Compare returns two sets of chunk refs, both must be sorted: +// 1) the chunk refs which are in the original set but not in the other set +// 2) the chunk refs which are in both sets +// the `populateInclusive` argument allows avoiding populating the inclusive set +// if it is not needed +// TODO(owen-d): can be improved to use binary search when one list +// is significantly larger than the other +func (refs ChunkRefs) Compare(others ChunkRefs, populateInclusve bool) (exclusive ChunkRefs, inclusive ChunkRefs) { + var i, j int + for i < len(refs) && j < len(others) { + switch { + + case refs[i] == others[j]: + if populateInclusve { + inclusive = append(inclusive, refs[i]) + } + i++ + j++ + case refs[i].Less(others[j]): + exclusive = append(exclusive, refs[i]) + i++ + default: + j++ + } + } + + // append any remaining refs + if i < len(refs) { + exclusive = append(exclusive, refs[i:]...) 
+ } + + return +} diff --git a/pkg/storage/bloom/v1/index_test.go b/pkg/storage/bloom/v1/index_test.go index 97d6eb9585..08d2a9a216 100644 --- a/pkg/storage/bloom/v1/index_test.go +++ b/pkg/storage/bloom/v1/index_test.go @@ -72,3 +72,76 @@ func TestSeriesEncoding(t *testing.T) { require.Equal(t, src.Offset, offset) require.Equal(t, src, dst) } + +func TestChunkRefCompare(t *testing.T) { + for _, tc := range []struct { + desc string + left, right, exclusive, inclusive ChunkRefs + }{ + { + desc: "empty", + left: nil, + right: nil, + exclusive: nil, + inclusive: nil, + }, + { + desc: "left empty", + left: nil, + right: ChunkRefs{{Start: 1, End: 2}}, + exclusive: nil, + inclusive: nil, + }, + { + desc: "right empty", + left: ChunkRefs{{Start: 1, End: 2}}, + right: nil, + exclusive: ChunkRefs{{Start: 1, End: 2}}, + inclusive: nil, + }, + { + desc: "left before right", + left: ChunkRefs{{Start: 1, End: 2}}, + right: ChunkRefs{{Start: 3, End: 4}}, + exclusive: ChunkRefs{{Start: 1, End: 2}}, + inclusive: nil, + }, + { + desc: "left after right", + left: ChunkRefs{{Start: 3, End: 4}}, + right: ChunkRefs{{Start: 1, End: 2}}, + exclusive: ChunkRefs{{Start: 3, End: 4}}, + inclusive: nil, + }, + { + desc: "left overlaps right", + left: ChunkRefs{ + {Start: 1, End: 3}, + {Start: 2, End: 4}, + {Start: 3, End: 5}, + {Start: 4, End: 6}, + {Start: 5, End: 7}, + }, + right: ChunkRefs{ + {Start: 2, End: 4}, + {Start: 4, End: 6}, + {Start: 5, End: 6}, // not in left + }, + exclusive: ChunkRefs{ + {Start: 1, End: 3}, + {Start: 3, End: 5}, + {Start: 5, End: 7}, + }, + inclusive: ChunkRefs{ + {Start: 2, End: 4}, + {Start: 4, End: 6}, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + exc, inc := tc.left.Compare(tc.right, true) + require.Equal(t, tc.exclusive, exc, "exclusive cmp") + require.Equal(t, tc.inclusive, inc, "inclusive cmp") + }) + } +}