diff --git a/go.mod b/go.mod index 0994ae21b1..81be6b0584 100644 --- a/go.mod +++ b/go.mod @@ -118,6 +118,7 @@ require ( github.com/IBM/go-sdk-core/v5 v5.13.1 github.com/IBM/ibm-cos-sdk-go v1.10.0 github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc + github.com/efficientgo/core v1.0.0-rc.2 github.com/fsnotify/fsnotify v1.6.0 github.com/grafana/loki/pkg/push v0.0.0-20230127102416-571f88bc5765 github.com/heroku/x v0.0.61 @@ -196,7 +197,6 @@ require ( github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/edsrzf/mmap-go v1.1.0 // indirect - github.com/efficientgo/core v1.0.0-rc.2 // indirect github.com/emicklei/go-restful/v3 v3.10.2 // indirect github.com/envoyproxy/go-control-plane v0.11.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index f5c9dfa9b1..0fc52db7f3 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -508,7 +508,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me if fromCheckpoint { bc.symbolizer = symbolizerFromCheckpoint(lb) } else { - symbolizer, err := symbolizerFromEnc(lb, getReaderPool(bc.encoding)) + symbolizer, err := symbolizerFromEnc(lb, GetReaderPool(bc.encoding)) if err != nil { return nil, err } @@ -626,7 +626,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) { } } else { var err error - n, crcHash, err = c.symbolizer.SerializeTo(w, getWriterPool(c.encoding)) + n, crcHash, err = c.symbolizer.SerializeTo(w, GetWriterPool(c.encoding)) if err != nil { return offset, errors.Wrap(err, "write structured metadata") } @@ -912,7 +912,7 @@ func (c *MemChunk) cut() error { return nil } - b, err := c.head.Serialise(getWriterPool(c.encoding)) + b, err := c.head.Serialise(GetWriterPool(c.encoding)) if err != nil { return err } @@ -1153,14 +1153,14 @@ func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline, opt if len(b.b) == 0 { return iter.NoopIterator } - return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer, options...) + return newEntryIterator(ctx, GetReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer, options...) 
} func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator { if len(b.b) == 0 { return iter.NoopIterator } - return newSampleIterator(ctx, getReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer) + return newSampleIterator(ctx, GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer) } func (b block) Offset() int { diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index a19ef28849..ebe1924e8b 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -82,11 +82,11 @@ var ( } ) -func getWriterPool(enc Encoding) WriterPool { - return getReaderPool(enc).(WriterPool) +func GetWriterPool(enc Encoding) WriterPool { + return GetReaderPool(enc).(WriterPool) } -func getReaderPool(enc Encoding) ReaderPool { +func GetReaderPool(enc Encoding) ReaderPool { switch enc { case EncGZIP: return &Gzip diff --git a/pkg/chunkenc/pool_test.go b/pkg/chunkenc/pool_test.go index 9789450067..04ecaadf92 100644 --- a/pkg/chunkenc/pool_test.go +++ b/pkg/chunkenc/pool_test.go @@ -25,8 +25,8 @@ func TestPool(t *testing.T) { var ( buf = bytes.NewBuffer(nil) res = make([]byte, 1024) - wpool = getWriterPool(enc) - rpool = getReaderPool(enc) + wpool = GetWriterPool(enc) + rpool = GetReaderPool(enc) ) w := wpool.GetWriter(buf) diff --git a/pkg/chunkenc/symbols_test.go b/pkg/chunkenc/symbols_test.go index 244acff9ce..86f3440662 100644 --- a/pkg/chunkenc/symbols_test.go +++ b/pkg/chunkenc/symbols_test.go @@ -161,10 +161,10 @@ func TestSymbolizer(t *testing.T) { } buf.Reset() - _, _, err = s.SerializeTo(buf, getWriterPool(encoding)) + _, _, err = s.SerializeTo(buf, GetWriterPool(encoding)) require.NoError(t, err) - loaded, err = symbolizerFromEnc(buf.Bytes(), getReaderPool(encoding)) + loaded, err = symbolizerFromEnc(buf.Bytes(), GetReaderPool(encoding)) require.NoError(t, err) for i, symbols := range tc.expectedSymbols { require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols)) diff --git a/pkg/storage/bloom/spec.go b/pkg/storage/bloom/spec.go new file mode 100644 index 0000000000..19f5940ddf --- /dev/null +++ b/pkg/storage/bloom/spec.go @@ -0,0 +1,29 @@ +package bloom + +import "github.com/prometheus/common/model" + +type Metadata interface { + Version() uint32 + NumSeries() uint64 + NumChunks() uint64 + Size() uint64 // bytes + + // timestamps + From() int64 + Through() int64 + + // series + FromFingerprint() model.Fingerprint + ThroughFingerprint() model.Fingerprint +} + +type Iterator[K any, V any] interface { + Next() bool + Err() error + At() V + Seek(K) Iterator[K, V] +} + +type Block interface { + SeriesIterator() Iterator[model.Fingerprint, []byte] +} diff --git a/pkg/storage/bloom/v1/TODO.md b/pkg/storage/bloom/v1/TODO.md new file mode 100644 index 0000000000..984cb02c43 --- /dev/null +++ b/pkg/storage/bloom/v1/TODO.md @@ -0,0 +1,7 @@ +* Should be able to read bloom as a []byte without copying it during decoding + * It's immutable + partition offsets are calculable, etc + * can encode version, parameters as the last n bytes, each partition's byte range can be determined from that. No need to unpack +* implement streaming encoding.Decbuf over io.ReadSeeker +* Build & load from directories +* Less copying! 
I've taken some shortcuts we'll need to refactor to avoid copying []byte around in a few places +* more sophisticated querying methods \ No newline at end of file diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go new file mode 100644 index 0000000000..8079474190 --- /dev/null +++ b/pkg/storage/bloom/v1/block.go @@ -0,0 +1,113 @@ +package v1 + +import ( + "github.com/pkg/errors" + "github.com/prometheus/common/model" +) + +type Block struct { + // covers series pages + index BlockIndex + // covers bloom pages + blooms BloomBlock + + // TODO(owen-d): implement + // synthetic header for the entire block + // built from all the pages in the index + header SeriesHeader + + reader BlockReader // should this be decoupled from the struct (accepted as method arg instead)? + + initialized bool +} + +func NewBlock(reader BlockReader) *Block { + return &Block{ + reader: reader, + } +} + +func (b *Block) LoadHeaders() error { + // TODO(owen-d): better control over when to decode + if !b.initialized { + idx, err := b.reader.Index() + if err != nil { + return errors.Wrap(err, "getting index reader") + } + + if err := b.index.DecodeHeaders(idx); err != nil { + return errors.Wrap(err, "decoding index") + } + + blooms, err := b.reader.Blooms() + if err != nil { + return errors.Wrap(err, "getting blooms reader") + } + if err := b.blooms.DecodeHeaders(blooms); err != nil { + return errors.Wrap(err, "decoding blooms") + } + b.initialized = true + } + return nil +} + +func (b *Block) Series() *LazySeriesIter { + return NewLazySeriesIter(b) +} + +func (b *Block) Blooms() *LazyBloomIter { + return NewLazyBloomIter(b) +} + +type BlockQuerier struct { + series *LazySeriesIter + blooms *LazyBloomIter + + cur *SeriesWithBloom +} + +func NewBlockQuerier(b *Block) *BlockQuerier { + return &BlockQuerier{ + series: NewLazySeriesIter(b), + blooms: NewLazyBloomIter(b), + } +} + +func (bq *BlockQuerier) Seek(fp model.Fingerprint) error { + return bq.series.Seek(fp) +} + +func (bq *BlockQuerier) Next() bool { + if !bq.series.Next() { + return false + } + + series := bq.series.At() + + bq.blooms.Seek(series.Offset) + if !bq.blooms.Next() { + return false + } + + bloom := bq.blooms.At() + + bq.cur = &SeriesWithBloom{ + Series: &series.Series, + Bloom: bloom, + } + return true +} + +func (bq *BlockQuerier) At() *SeriesWithBloom { + return bq.cur +} + +func (bq *BlockQuerier) Err() error { + if err := bq.series.Err(); err != nil { + return err + } + + return bq.blooms.Err() +} diff --git a/pkg/storage/bloom/v1/block_writer.go b/pkg/storage/bloom/v1/block_writer.go new file mode 100644 index 0000000000..30885d9b87 --- /dev/null +++ b/pkg/storage/bloom/v1/block_writer.go @@ -0,0 +1,186 @@ +package v1 + +import ( + "bytes" + "io" + "os" + "path/filepath" + + "github.com/pkg/errors" + + "github.com/grafana/loki/pkg/storage/chunk/client/util" +) + +const ( + bloomFileName = "bloom" + seriesFileName = "series" +) + +type BlockWriter interface { + Index() (io.WriteCloser, error) + Blooms() (io.WriteCloser, error) + Size() (int, error) // byte size of accumulated index & blooms +} + +type BlockReader interface { + Index() (io.ReadSeeker, error) + Blooms() (io.ReadSeeker, error) +} + +// in memory impl +type MemoryBlockWriter struct { + index, blooms *bytes.Buffer +} + +func NewMemoryBlockWriter(index, blooms *bytes.Buffer) MemoryBlockWriter { + return MemoryBlockWriter{ + index: index, + blooms: blooms, + } +} + +func (b MemoryBlockWriter) Index() (io.WriteCloser, error) { + return NewNoopCloser(b.index),
nil +} +func (b MemoryBlockWriter) Blooms() (io.WriteCloser, error) { + return NewNoopCloser(b.blooms), nil +} + +func (b MemoryBlockWriter) Size() (int, error) { + return b.index.Len() + b.blooms.Len(), nil +} + +// Directory based impl +type DirectoryBlockWriter struct { + dir string + blooms, index *os.File + + initialized bool +} + +func NewDirectoryBlockWriter(dir string) *DirectoryBlockWriter { + return &DirectoryBlockWriter{ + dir: dir, + } +} + +func (b *DirectoryBlockWriter) Init() error { + if !b.initialized { + err := util.EnsureDirectory(b.dir) + if err != nil { + return errors.Wrap(err, "creating bloom block dir") + } + + b.index, err = os.Create(filepath.Join(b.dir, seriesFileName)) + if err != nil { + return errors.Wrap(err, "creating series file") + } + + b.blooms, err = os.Create(filepath.Join(b.dir, bloomFileName)) + if err != nil { + return errors.Wrap(err, "creating bloom file") + } + + b.initialized = true + } + return nil +} + +func (b *DirectoryBlockWriter) Index() (io.WriteCloser, error) { + if !b.initialized { + if err := b.Init(); err != nil { + return nil, err + } + } + return b.index, nil +} + +func (b *DirectoryBlockWriter) Blooms() (io.WriteCloser, error) { + if !b.initialized { + if err := b.Init(); err != nil { + return nil, err + } + } + return b.blooms, nil +} + +func (b *DirectoryBlockWriter) Size() (int, error) { + var size int + for _, f := range []*os.File{b.blooms, b.index} { + info, err := f.Stat() + if err != nil { + return 0, errors.Wrapf(err, "error stat'ing file %s", f.Name()) + } + + size += int(info.Size()) + } + return size, nil +} + +// In memory reader +type ByteReader struct { + index, blooms *bytes.Buffer +} + +func NewByteReader(index, blooms *bytes.Buffer) *ByteReader { + return &ByteReader{index: index, blooms: blooms} +} + +func (r *ByteReader) Index() (io.ReadSeeker, error) { + return bytes.NewReader(r.index.Bytes()), nil +} + +func (r *ByteReader) Blooms() (io.ReadSeeker, error) { + return bytes.NewReader(r.blooms.Bytes()), nil +} + +// File reader +type DirectoryBlockReader struct { + dir string + blooms, index *os.File + + initialized bool +} + +func NewDirectoryBlockReader(dir string) *DirectoryBlockReader { + return &DirectoryBlockReader{ + dir: dir, + initialized: false, + } +} + +func (r *DirectoryBlockReader) Init() error { + if !r.initialized { + var err error + r.index, err = os.Open(filepath.Join(r.dir, seriesFileName)) + if err != nil { + return errors.Wrap(err, "opening series file") + } + + r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName)) + if err != nil { + return errors.Wrap(err, "opening bloom file") + } + + r.initialized = true + } + return nil +} + +func (r *DirectoryBlockReader) Index() (io.ReadSeeker, error) { + if !r.initialized { + if err := r.Init(); err != nil { + return nil, err + } + } + return r.index, nil +} + +func (r *DirectoryBlockReader) Blooms() (io.ReadSeeker, error) { + if !r.initialized { + if err := r.Init(); err != nil { + return nil, err + } + } + return r.blooms, nil +} diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go new file mode 100644 index 0000000000..8fb2bf5f86 --- /dev/null +++ b/pkg/storage/bloom/v1/bloom.go @@ -0,0 +1,224 @@ +package v1 + +import ( + "bytes" + "fmt" + "io" + + "github.com/owen-d/BoomFilters/boom" + "github.com/pkg/errors" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/util/encoding" +) + +type Bloom struct { + sbf boom.ScalableBloomFilter +} + +func (b *Bloom) Encode(enc *encoding.Encbuf) error { + // 
divide by 8 b/c bloom capacity is measured in bits, but we want bytes + buf := bytes.NewBuffer(BlockPool.Get(int(b.sbf.Capacity() / 8))) + + _, err := b.sbf.WriteTo(buf) + if err != nil { + return errors.Wrap(err, "encoding bloom filter") + } + + data := buf.Bytes() + enc.PutUvarint(len(data)) // length of bloom filter + enc.PutBytes(data) + BlockPool.Put(data[:0]) // release to pool + return nil +} + +func (b *Bloom) Decode(dec *encoding.Decbuf) error { + ln := dec.Uvarint() + data := dec.Bytes(ln) + + _, err := b.sbf.ReadFrom(bytes.NewReader(data)) + if err != nil { + return errors.Wrap(err, "decoding bloom filter") + } + + return nil +} + +func LazyDecodeBloomPage(dec *encoding.Decbuf, pool chunkenc.ReaderPool, decompressedSize int) (*BloomPageDecoder, error) { + if err := dec.CheckCrc(castagnoliTable); err != nil { + return nil, errors.Wrap(err, "checksumming bloom page") + } + + decompressor, err := pool.GetReader(bytes.NewReader(dec.Get())) + if err != nil { + return nil, errors.Wrap(err, "getting decompressor") + } + + b := BlockPool.Get(decompressedSize)[:decompressedSize] + // NOTE: b is retained by the returned decoder, so it must not be + // returned to BlockPool here + + if _, err = io.ReadFull(decompressor, b); err != nil { + return nil, errors.Wrap(err, "decompressing bloom page") + } + + decoder := NewBloomPageDecoder(b) + + return decoder, nil +} + +func NewBloomPageDecoder(data []byte) *BloomPageDecoder { + // last 8 bytes are the number of blooms in this page + dec := encoding.DecWith(data[len(data)-8:]) + n := int(dec.Be64()) + // reset data to the bloom portion of the page + data = data[:len(data)-8] + dec.B = data + + decoder := &BloomPageDecoder{ + dec: &dec, + data: data, + n: n, + } + + return decoder +} + +// Decoder is a seekable, reset-able iterator +type BloomPageDecoder struct { + data []byte + dec *encoding.Decbuf + + n int // number of blooms in page + cur *Bloom + err error +} + +func (d *BloomPageDecoder) Reset() { + d.err = nil + d.cur = nil + d.dec.B = d.data +} + +func (d *BloomPageDecoder) Seek(offset int) { + d.dec.B = d.data[offset:] +} + +func (d *BloomPageDecoder) Next() bool { + // end of iteration, no error + if d.dec.Len() == 0 { + return false + } + + var b Bloom + d.err = b.Decode(d.dec) + // end of iteration, error + if d.err != nil { + return false + } + d.cur = &b + return true +} + +func (d *BloomPageDecoder) At() *Bloom { + return d.cur +} + +func (d *BloomPageDecoder) Err() error { + return d.err +} + +type BloomPageHeader struct { + N, Offset, Len, DecompressedLen int +} + +func (h *BloomPageHeader) Encode(enc *encoding.Encbuf) { + enc.PutUvarint(h.N) + enc.PutUvarint(h.Offset) + enc.PutUvarint(h.Len) + enc.PutUvarint(h.DecompressedLen) +} + +func (h *BloomPageHeader) Decode(dec *encoding.Decbuf) error { + h.N = dec.Uvarint() + h.Offset = dec.Uvarint() + h.Len = dec.Uvarint() + h.DecompressedLen = dec.Uvarint() + return dec.Err() +} + +type BloomBlock struct { + schema Schema + pageHeaders []BloomPageHeader +} + +func NewBloomBlock(encoding chunkenc.Encoding) BloomBlock { + return BloomBlock{ + schema: Schema{version: DefaultSchemaVersion, encoding: encoding}, + } +} + +func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error { + if err := b.schema.DecodeFrom(r); err != nil { + return errors.Wrap(err, "decoding schema") + } + + var ( + err error + dec encoding.Decbuf + ) + // last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32) + if _, err := r.Seek(-12, io.SeekEnd); err != nil { + return errors.Wrap(err, "seeking to bloom headers metadata") + } dec.B, err = io.ReadAll(r) + if err != nil { + return errors.Wrap(err, "reading bloom headers metadata") + } + + headerOffset := dec.Be64() + + if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil { + return errors.Wrap(err, "seeking to bloom headers") + } + dec.B, err = io.ReadAll(r) + if err != nil { + return errors.Wrap(err, "reading bloom page headers") + } + + if err := dec.CheckCrc(castagnoliTable); err != nil { + return errors.Wrap(err, "checksumming page headers") + } + + b.pageHeaders = make([]BloomPageHeader, dec.Uvarint()) + for i := 0; i < len(b.pageHeaders); i++ { + header := &b.pageHeaders[i] + if err := header.Decode(&dec); err != nil { + return errors.Wrapf(err, "decoding %dth bloom page header", i) + } + } + return nil +} + +func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) { + if pageIdx < 0 || pageIdx >= len(b.pageHeaders) { + return nil, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx) + } + + page := b.pageHeaders[pageIdx] + + if _, err := r.Seek(int64(page.Offset), io.SeekStart); err != nil { + return nil, errors.Wrap(err, "seeking to bloom page") + } + + data := BlockPool.Get(page.Len)[:page.Len] + _, err := io.ReadFull(r, data) + if err != nil { + return nil, errors.Wrap(err, "reading bloom page") + } + + dec := encoding.DecWith(data) + + return LazyDecodeBloomPage(&dec, b.schema.DecompressorPool(), page.DecompressedLen) +} diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go new file mode 100644 index 0000000000..372aff8d70 --- /dev/null +++ b/pkg/storage/bloom/v1/bloom_querier.go @@ -0,0 +1,125 @@ +package v1 + +import "github.com/pkg/errors" + +type BloomQuerier interface { + Seek(BloomOffset) (*Bloom, error) +} + +type LazyBloomIter struct { + b *Block + + // state + initialized bool + err error + curPageIndex int + curPage *BloomPageDecoder +} + +func NewLazyBloomIter(b *Block) *LazyBloomIter { + return &LazyBloomIter{ + b: b, + } +} + +func (it *LazyBloomIter) ensureInit() { + // TODO(owen-d): better control over when to decode + if !it.initialized { + if err := it.b.LoadHeaders(); err != nil { + it.err = err + } + it.initialized = true + } +} + +func (it *LazyBloomIter) Seek(offset BloomOffset) { + it.ensureInit() + + // if we need a different page or the current page hasn't been loaded, + // load the desired page + if it.curPageIndex != offset.Page || it.curPage == nil { + r, err := it.b.reader.Blooms() + if err != nil { + it.err = errors.Wrap(err, "getting blooms reader") + return + } + decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page) + if err != nil { + it.err = errors.Wrap(err, "loading bloom page") + return + } + + it.curPageIndex = offset.Page + it.curPage = decoder + + } + + it.curPage.Seek(offset.ByteOffset) +} + +func (it *LazyBloomIter) Next() bool { + it.ensureInit() + if it.err != nil { + return false + } + return it.next() +} + +func (it *LazyBloomIter) next() bool { + if it.err != nil { + return false + } + + for it.curPageIndex < len(it.b.blooms.pageHeaders) { + // first access of next page + if it.curPage == nil { + r, err := it.b.reader.Blooms() + if err != nil { + it.err = errors.Wrap(err, "getting blooms reader") + return false + } + + it.curPage, err = it.b.blooms.BloomPageDecoder( + r, + it.curPageIndex, + ) + if err != nil { + it.err = err + return false + } + continue + } + + if !it.curPage.Next() { + // there was an error + if it.curPage.Err() != nil { + return false + } + // we've exhausted the current page,
progress to next + it.curPageIndex++ + it.curPage = nil + continue + } + + return true + } + + // finished last page + return false +} + +func (it *LazyBloomIter) At() *Bloom { + return it.curPage.At() +} + +func (it *LazyBloomIter) Err() error { + if it.err != nil { + return it.err + } + if it.curPage != nil { + return it.curPage.Err() + } + return nil +} diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go new file mode 100644 index 0000000000..9000eb4799 --- /dev/null +++ b/pkg/storage/bloom/v1/builder.go @@ -0,0 +1,429 @@ +package v1 + +import ( + "bytes" + "fmt" + "hash" + "io" + + "github.com/pkg/errors" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/util/encoding" +) + +// SeriesResolver iterates two sequences, one from blocks which have already been indexed +// into blooms and another from TSDBs. The series+chunks which exist in the TSDBs but not the +// blocks need to then be indexed into the blocks. +// type SeriesResolver struct { +// fromBlocks Iterator[Series] +// fromTSDBs Iterator[Series] +// } + +type BlockOptions struct { + schema Schema + + // target size in bytes (decompressed) + // of each page type + SeriesPageSize, BloomPageSize, BlockSize int +} + +type BlockBuilder struct { + opts BlockOptions + + index *IndexBuilder + blooms *BloomBlockBuilder +} + +func NewBlockBuilder(opts BlockOptions, writer BlockWriter) (*BlockBuilder, error) { + index, err := writer.Index() + if err != nil { + return nil, errors.Wrap(err, "initializing index writer") + } + blooms, err := writer.Blooms() + if err != nil { + return nil, errors.Wrap(err, "initializing blooms writer") + } + + return &BlockBuilder{ + opts: opts, + index: NewIndexBuilder(opts, index), + blooms: NewBloomBlockBuilder(opts, blooms), + }, nil +} + +type SeriesWithBloom struct { + Series *Series + Bloom *Bloom +} + +func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) error { + for itr.Next() { + series := itr.At() + + offset, err := b.blooms.Append(series) + if err != nil { + return errors.Wrapf(err, "writing bloom for series %v", series.Series.Fingerprint) + } + + if err := b.index.Append(SeriesWithOffset{ + Offset: offset, + Series: *series.Series, + }); err != nil { + return errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint) + } + } + + if err := itr.Err(); err != nil { + return errors.Wrap(err, "iterating series with blooms") + } + + if err := b.blooms.Close(); err != nil { + return errors.Wrap(err, "closing bloom file") + } + if err := b.index.Close(); err != nil { + return errors.Wrap(err, "closing series file") + } + return nil +} + +type BloomBlockBuilder struct { + opts BlockOptions + writer io.WriteCloser + + offset int // track the offset of the file + writtenSchema bool + pages []BloomPageHeader + page PageWriter + scratch *encoding.Encbuf +} + +func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockBuilder { + return &BloomBlockBuilder{ + opts: opts, + writer: writer, + page: NewPageWriter(opts.BloomPageSize), + scratch: &encoding.Encbuf{}, + } +} + +func (b *BloomBlockBuilder) WriteSchema() error { + b.scratch.Reset() + b.opts.schema.Encode(b.scratch) + if _, err := b.writer.Write(b.scratch.Get()); err != nil { + return errors.Wrap(err, "writing schema") + } + b.writtenSchema = true + b.offset += b.scratch.Len() + return nil +} + +func (b *BloomBlockBuilder) Append(series SeriesWithBloom) (BloomOffset, error) { + if !b.writtenSchema { + if err := b.WriteSchema(); err != nil { + return BloomOffset{}, errors.Wrap(err, "writing schema") + } + } + + b.scratch.Reset() + if err := series.Bloom.Encode(b.scratch); err != nil { + return BloomOffset{}, errors.Wrapf(err, "encoding bloom for %v", series.Series.Fingerprint) + } + + if !b.page.SpaceFor(b.scratch.Len()) { + if err := b.flushPage(); err != nil { + return BloomOffset{}, errors.Wrap(err, "flushing bloom page") + } + } + + return BloomOffset{ + Page: len(b.pages), + ByteOffset: b.page.Add(b.scratch.Get()), + }, nil +} + +func (b *BloomBlockBuilder) Close() error { + if b.page.Count() > 0 { + if err := b.flushPage(); err != nil { + return errors.Wrap(err, "flushing final bloom page") + } + } + + b.scratch.Reset() + b.scratch.PutUvarint(len(b.pages)) + for _, h := range b.pages { + h.Encode(b.scratch) + } + // put offset to beginning of header section + // cannot be varint encoded because its offset will be calculated as + // the 8 bytes prior to the checksum + b.scratch.PutBE64(uint64(b.offset)) + + crc32Hash := Crc32HashPool.Get() + defer Crc32HashPool.Put(crc32Hash) + // wrap with final checksum + b.scratch.PutHash(crc32Hash) + _, err := b.writer.Write(b.scratch.Get()) + if err != nil { + return errors.Wrap(err, "writing bloom page headers") + } + return errors.Wrap(b.writer.Close(), "closing bloom writer") +} + +func (b *BloomBlockBuilder) flushPage() error { + crc32Hash := Crc32HashPool.Get() + defer Crc32HashPool.Put(crc32Hash) + + decompressedLen, compressedLen, err := b.page.writePage( + b.writer, + b.opts.schema.CompressorPool(), + crc32Hash, + ) + if err != nil { + return errors.Wrap(err, "writing bloom page") + } + header := BloomPageHeader{ + N: b.page.Count(), + Offset: b.offset, + Len: compressedLen, + DecompressedLen: decompressedLen, + } + b.pages = append(b.pages, header) + b.offset += compressedLen + b.page.Reset() + return nil +} + +type PageWriter struct { + enc *encoding.Encbuf + targetSize int + n int // number of encoded blooms +} + +func NewPageWriter(targetSize int) PageWriter { + return PageWriter{ + enc: &encoding.Encbuf{}, + targetSize: targetSize, + } +} + +func (w *PageWriter) Count() int { + return w.n +} + +func (w *PageWriter) Reset() { + w.enc.Reset() + w.n = 0 +} + +func (w *PageWriter) SpaceFor(numBytes int) bool { + // if a single bloom exceeds the target size, still accept it + // otherwise only accept it if adding it would not exceed the target size + return w.n == 0 || w.enc.Len()+numBytes <= w.targetSize +} + +func (w *PageWriter) Add(item []byte) (offset int) { + offset = w.enc.Len() + w.enc.PutBytes(item) + w.n++ + return offset +} + +func (w *PageWriter) writePage(writer io.Writer, pool chunkenc.WriterPool, crc32Hash hash.Hash32) (int, int, error) { + // write the number of blooms in this page, must not be varint + // so we can calculate its position+len during decoding + w.enc.PutBE64(uint64(w.n)) + decompressedLen := w.enc.Len() + + buf := &bytes.Buffer{} + compressor := pool.GetWriter(buf) + defer pool.PutWriter(compressor) + + if _, err := compressor.Write(w.enc.Get()); err != nil { + return 0, 0, errors.Wrap(err, "compressing page") + } + + if err := compressor.Close(); err != nil { + return 0, 0, errors.Wrap(err, "closing compressor") + } + + // replace the encoded page with the compressed one + w.enc.B = buf.Bytes() + w.enc.PutHash(crc32Hash) + + // write the page + if _, err := writer.Write(w.enc.Get()); err != nil { + return 0, 0, errors.Wrap(err, "writing page") + } + return decompressedLen, w.enc.Len(), nil +} + +type
IndexBuilder struct { + opts BlockOptions + writer io.WriteCloser + + offset int // track the offset of the file + writtenSchema bool + pages []SeriesPageHeaderWithOffset + page PageWriter + scratch *encoding.Encbuf + + previousFp model.Fingerprint + previousOffset BloomOffset + fromFp model.Fingerprint + fromTs, throughTs model.Time +} + +func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder { + return &IndexBuilder{ + opts: opts, + writer: writer, + page: NewPageWriter(opts.SeriesPageSize), + scratch: &encoding.Encbuf{}, + } +} + +func (b *IndexBuilder) WriteSchema() error { + b.scratch.Reset() + b.opts.schema.Encode(b.scratch) + if _, err := b.writer.Write(b.scratch.Get()); err != nil { + return errors.Wrap(err, "writing schema") + } + b.writtenSchema = true + b.offset += b.scratch.Len() + return nil +} + +func (b *IndexBuilder) Append(series SeriesWithOffset) error { + if !b.writtenSchema { + if err := b.WriteSchema(); err != nil { + return errors.Wrap(err, "writing schema") + } + } + + // record the prior series' fingerprint before it's overwritten so the + // out-of-order check below compares against the previous series + lastFp := b.previousFp + + b.scratch.Reset() + // we don't want to update the previous pointers yet in case + // we need to flush the page first which would + // be passed the incorrect final fp/offset + previousFp, previousOffset := series.Encode(b.scratch, b.previousFp, b.previousOffset) + + if !b.page.SpaceFor(b.scratch.Len()) { + if err := b.flushPage(); err != nil { + return errors.Wrap(err, "flushing series page") + } + + // re-encode now that a new page has been cut and we use delta-encoding + b.scratch.Reset() + previousFp, previousOffset = series.Encode(b.scratch, b.previousFp, b.previousOffset) + } + + switch { + case b.page.Count() == 0: + // Special case: this is the first series in a page + if len(series.Chunks) < 1 { + return fmt.Errorf("series with zero chunks for fingerprint %v", series.Fingerprint) + } + b.fromFp = series.Fingerprint + b.fromTs, b.throughTs = chkBounds(series.Chunks) + case lastFp > series.Fingerprint: + return fmt.Errorf("out of order series fingerprint for series %v", series.Fingerprint) + default: + from, through := chkBounds(series.Chunks) + if b.fromTs.After(from) { + b.fromTs = from + } + if b.throughTs.Before(through) { + b.throughTs = through + } + } + + _ = b.page.Add(b.scratch.Get()) + b.previousFp = previousFp + b.previousOffset = previousOffset + return nil +} + +// chks must be non-empty +func chkBounds(chks []ChunkRef) (from, through model.Time) { + from, through = chks[0].Start, chks[0].End + for _, chk := range chks[1:] { + if chk.Start.Before(from) { + from = chk.Start + } + + if chk.End.After(through) { + through = chk.End + } + } + return +} + +func (b *IndexBuilder) flushPage() error { + crc32Hash := Crc32HashPool.Get() + defer Crc32HashPool.Put(crc32Hash) + + decompressedLen, compressedLen, err := b.page.writePage( + b.writer, + b.opts.schema.CompressorPool(), + crc32Hash, + ) + if err != nil { + return errors.Wrap(err, "writing series page") + } + + header := SeriesPageHeaderWithOffset{ + Offset: b.offset, + Len: compressedLen, + DecompressedLen: decompressedLen, + SeriesHeader: SeriesHeader{ + NumSeries: b.page.Count(), + FromFp: b.fromFp, + ThroughFp: b.previousFp, + FromTs: b.fromTs, + ThroughTs: b.throughTs, + }, + } + + b.pages = append(b.pages, header) + b.offset += compressedLen + + b.fromFp = 0 + b.fromTs = 0 + b.throughTs = 0 + b.previousFp = 0 + b.previousOffset = BloomOffset{} + b.page.Reset() + + return nil +} + +func (b *IndexBuilder) Close() error { + if b.page.Count() > 0 { + if err := b.flushPage(); err != nil { + return errors.Wrap(err, "flushing final series page") + } + } + + b.scratch.Reset() + b.scratch.PutUvarint(len(b.pages)) + for _, h := range b.pages { + h.Encode(b.scratch) + } + + // put offset to beginning of header section + // cannot be varint encoded because its offset will be calculated as + // the 8 bytes prior to the checksum + b.scratch.PutBE64(uint64(b.offset)) + crc32Hash := Crc32HashPool.Get() + defer Crc32HashPool.Put(crc32Hash) + // wrap with final checksum + b.scratch.PutHash(crc32Hash) + _, err := b.writer.Write(b.scratch.Get()) + if err != nil { + return errors.Wrap(err, "writing series page headers") + } + return errors.Wrap(b.writer.Close(), "closing series writer") +} diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go new file mode 100644 index 0000000000..5629ad2032 --- /dev/null +++ b/pkg/storage/bloom/v1/builder_test.go @@ -0,0 +1,112 @@ +package v1 + +import ( + "bytes" + "fmt" + "testing" + + "github.com/owen-d/BoomFilters/boom" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/chunkenc" +) + +func mkBasicSeriesWithBlooms(n int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom) { + for i := 0; i < n; i++ { + var series Series + step := (throughFp - fromFp) / (model.Fingerprint(n)) + series.Fingerprint = fromFp + model.Fingerprint(i)*step + timeDelta := fromTs + (throughTs-fromTs)/model.Time(n)*model.Time(i) + series.Chunks = []ChunkRef{ + { + Start: fromTs + timeDelta*model.Time(i), + End: fromTs + timeDelta*model.Time(i), + Checksum: uint32(i), + }, + } + + var bloom Bloom + bloom.sbf = *boom.NewScalableBloomFilter(1024, 0.01, 0.8) + bloom.sbf.Add([]byte(fmt.Sprint(i))) + + seriesList = append(seriesList, SeriesWithBloom{ + Series: &series, + Bloom: &bloom, + }) + } + return +} + +func TestBlockBuilderRoundTrip(t *testing.T) { + numSeries := 100 + data := mkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000) + + // references for linking in memory reader+writer + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + // directory for directory reader+writer + tmpDir := t.TempDir() + + for _, tc := range []struct { + desc string + writer BlockWriter + reader BlockReader + }{ + { + desc: "in-memory", + writer: NewMemoryBlockWriter(indexBuf, bloomsBuf), + reader: NewByteReader(indexBuf, bloomsBuf), + }, + { + desc: "directory", + writer: NewDirectoryBlockWriter(tmpDir), + reader: NewDirectoryBlockReader(tmpDir), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + builder, err := NewBlockBuilder( + BlockOptions{ + schema: Schema{ + version: DefaultSchemaVersion, + encoding: chunkenc.EncSnappy, + }, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + }, + tc.writer, + ) + + require.Nil(t, err) + itr := NewSliceIter[SeriesWithBloom](data) + require.Nil(t, builder.BuildFrom(itr)) + block := NewBlock(tc.reader) + querier := NewBlockQuerier(block) + + for i := 0; i < len(data); i++ { + require.Equal(t, true, querier.Next(), "on iteration %d with error %v", i, querier.Err()) + got := querier.At() + require.Equal(t, data[i].Series, got.Series) + require.Equal(t, data[i].Bloom, got.Bloom) + } + // ensure no error + require.Nil(t, querier.Err()) + // ensure it's exhausted + require.Equal(t, false, querier.Next()) + + // test seek + i := numSeries / 2 + half := data[i:] + require.Nil(t, querier.Seek(half[0].Series.Fingerprint)) + for j := 0; j < len(half); j++ { +
require.Equal(t, true, querier.Next(), "on iteration %d", j) + got := querier.At() + require.Equal(t, half[j].Series, got.Series) + require.Equal(t, half[j].Bloom, got.Bloom) + require.Nil(t, querier.Err()) + } + require.Equal(t, false, querier.Next()) + + }) + } +} diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go new file mode 100644 index 0000000000..dec7c22661 --- /dev/null +++ b/pkg/storage/bloom/v1/index.go @@ -0,0 +1,419 @@ +package v1 + +import ( + "bytes" + "io" + + "github.com/pkg/errors" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/util/encoding" +) + +type Schema struct { + version byte + encoding chunkenc.Encoding +} + +// byte length +func (s Schema) Len() int { + // magic number + version + encoding + return 4 + 1 + 1 +} + +func (s *Schema) DecompressorPool() chunkenc.ReaderPool { + return chunkenc.GetReaderPool(s.encoding) +} + +func (s *Schema) CompressorPool() chunkenc.WriterPool { + return chunkenc.GetWriterPool(s.encoding) +} + +func (s *Schema) Encode(enc *encoding.Encbuf) { + enc.Reset() + enc.PutBE32(magicNumber) + enc.PutByte(s.version) + enc.PutByte(byte(s.encoding)) +} + +func (s *Schema) DecodeFrom(r io.ReadSeeker) error { + // TODO(owen-d): improve allocations + schemaBytes := make([]byte, s.Len()) + _, err := io.ReadFull(r, schemaBytes) + if err != nil { + return errors.Wrap(err, "reading schema") + } + + dec := encoding.DecWith(schemaBytes) + return s.Decode(&dec) +} + +func (s *Schema) Decode(dec *encoding.Decbuf) error { + number := dec.Be32() + if number != magicNumber { + return errors.Errorf("invalid magic number. expected %x, got %x", magicNumber, number) + } + s.version = dec.Byte() + if s.version != 1 { + return errors.Errorf("invalid version. 
expected %d, got %d", 1, s.version) + } + + s.encoding = chunkenc.Encoding(dec.Byte()) + if _, err := chunkenc.ParseEncoding(s.encoding.String()); err != nil { + return errors.Wrap(err, "parsing encoding") + } + + return dec.Err() +} + +// Block index is a set of series pages along with +// the headers for each page +type BlockIndex struct { + schema Schema + pageHeaders []SeriesPageHeaderWithOffset // headers for each series page +} + +func NewBlockIndex(encoding chunkenc.Encoding) BlockIndex { + return BlockIndex{ + schema: Schema{version: DefaultSchemaVersion, encoding: encoding}, + } +} + +func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error { + if err := b.schema.DecodeFrom(r); err != nil { + return errors.Wrap(err, "decoding schema") + } + + var ( + err error + dec encoding.Decbuf + ) + + // last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32) + if _, err := r.Seek(-12, io.SeekEnd); err != nil { + return errors.Wrap(err, "seeking to index headers metadata") + } + dec.B, err = io.ReadAll(r) + if err != nil { + return errors.Wrap(err, "reading index headers metadata") + } + + headerOffset := dec.Be64() + if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil { + return errors.Wrap(err, "seeking to index headers") + } + dec.B, err = io.ReadAll(r) + if err != nil { + return errors.Wrap(err, "reading index page headers") + } + + if err := dec.CheckCrc(castagnoliTable); err != nil { + return errors.Wrap(err, "checksumming page headers") + } + + b.pageHeaders = make( + []SeriesPageHeaderWithOffset, + dec.Uvarint(), + ) + + for i := 0; i < len(b.pageHeaders); i++ { + var s SeriesPageHeaderWithOffset + if err := s.Decode(&dec); err != nil { + return errors.Wrapf(err, "decoding %dth series header", i) + } + b.pageHeaders[i] = s + } + + return nil +} + +// decompress page and return an iterator over the bytes +func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHeaderWithOffset) (*SeriesPageDecoder, error) { + + if _, err := r.Seek(int64(header.Offset), io.SeekStart); err != nil { + return nil, errors.Wrap(err, "seeking to series page") + } + + data := BlockPool.Get(header.Len)[:header.Len] + defer BlockPool.Put(data) + _, err := io.ReadFull(r, data) + if err != nil { + return nil, errors.Wrap(err, "reading series page") + } + + dec := encoding.DecWith(data) + + if err := dec.CheckCrc(castagnoliTable); err != nil { + return nil, errors.Wrap(err, "checksumming series page") + } + + decompressor, err := b.schema.DecompressorPool().GetReader(bytes.NewReader(dec.Get())) + if err != nil { + return nil, errors.Wrap(err, "getting decompressor") + } + + decompressed := BlockPool.Get(header.DecompressedLen)[:header.DecompressedLen] + if _, err = io.ReadFull(decompressor, decompressed); err != nil { + return nil, errors.Wrap(err, "decompressing series page") + } + + // replace decoder's input with the now-decompressed data + dec.B = decompressed + + res := &SeriesPageDecoder{ + data: decompressed, + header: header.SeriesHeader, + + i: -1, + } + + res.Reset() + return res, nil +} + +// Header for a series page +type SeriesPageHeaderWithOffset struct { + Offset, Len, DecompressedLen int + SeriesHeader +} + +func (h *SeriesPageHeaderWithOffset) Encode(enc *encoding.Encbuf) { + enc.PutUvarint(h.Offset) + enc.PutUvarint(h.Len) + enc.PutUvarint(h.DecompressedLen) + h.SeriesHeader.Encode(enc) +} + +func (h *SeriesPageHeaderWithOffset) Decode(dec *encoding.Decbuf) error { + h.Offset = dec.Uvarint() + h.Len = dec.Uvarint() + h.DecompressedLen = dec.Uvarint() + return h.SeriesHeader.Decode(dec) +} + +type SeriesHeader struct { + NumSeries int + FromFp, ThroughFp model.Fingerprint + FromTs, ThroughTs model.Time +} + +// build one aggregated header for the entire block +func aggregateHeaders(xs []SeriesHeader) SeriesHeader { + if len(xs) == 0 { + return SeriesHeader{} + } + + // seed the time bounds from the first header so the min/max + // comparisons below aren't anchored to the zero value + res := SeriesHeader{ + FromFp: xs[0].FromFp, + ThroughFp: xs[len(xs)-1].ThroughFp, + FromTs: xs[0].FromTs, + ThroughTs: xs[0].ThroughTs, + } + + for _, x := range xs { + if x.FromTs < res.FromTs { + res.FromTs = x.FromTs + } + if x.ThroughTs > res.ThroughTs { + res.ThroughTs = x.ThroughTs + } + } + return res +} + +func (h *SeriesHeader) Encode(enc *encoding.Encbuf) { + enc.PutUvarint(h.NumSeries) + enc.PutUvarint64(uint64(h.FromFp)) + enc.PutUvarint64(uint64(h.ThroughFp)) + enc.PutVarint64(int64(h.FromTs)) + enc.PutVarint64(int64(h.ThroughTs)) +} + +func (h *SeriesHeader) Decode(dec *encoding.Decbuf) error { + h.NumSeries = dec.Uvarint() + h.FromFp = model.Fingerprint(dec.Uvarint64()) + h.ThroughFp = model.Fingerprint(dec.Uvarint64()) + h.FromTs = model.Time(dec.Varint64()) + h.ThroughTs = model.Time(dec.Varint64()) + return dec.Err() +} + +// can decode a series page one item at a time, useful when we don't +// need to iterate an entire page +type SeriesPageDecoder struct { + data []byte + dec encoding.Decbuf + header SeriesHeader + + // state + i int // current index + cur *SeriesWithOffset + err error + previousFp model.Fingerprint // previous series' fingerprint for delta-decoding + previousOffset BloomOffset // previous series' bloom offset for delta-decoding +} + +func (d *SeriesPageDecoder) Reset() { + d.i = -1 + d.cur = nil + d.err = nil + d.previousFp = 0 + d.previousOffset = BloomOffset{} + d.dec.B = d.data +} + +func (d *SeriesPageDecoder) Next() bool { + if d.err != nil { + return false + } + + d.i++ + if d.i >= d.header.NumSeries { + return false + } + + var res SeriesWithOffset + d.previousFp, d.previousOffset, d.err = res.Decode(&d.dec, d.previousFp, d.previousOffset) + if d.err != nil { + return false + } + + d.cur = &res + return true +} + +func (d *SeriesPageDecoder) Seek(fp model.Fingerprint) { + if fp > d.header.ThroughFp || fp < d.header.FromFp { + // shortcut: we know the fingerprint is not in this page + // so masquerade the index as if we've already iterated through + d.i = d.header.NumSeries + } + + // if we've seen an error or we've potentially skipped the desired fp, reset the page state + if d.Err() != nil || (d.cur != nil && d.cur.Fingerprint >= fp) { + d.Reset() + } + + for { + // previous byte offset in decoder, used for resetting + // position after finding the desired fp + offset := len(d.data) - d.dec.Len() + // previous bloom offset in decoder, used for + // resetting position after finding the desired fp + // since offsets are delta-encoded + previousBloomOffset := d.previousOffset + previousFp := d.previousFp + + // iteration finished + if ok := d.Next(); !ok { + return + } + + // we've seeked to the correct location.
reverse one step and return + cur := d.At() + if cur.Fingerprint >= fp { + d.i-- + d.previousOffset = previousBloomOffset + d.previousFp = previousFp + d.dec.B = d.data[offset:] + return + } + } +} + +func (d *SeriesPageDecoder) At() (res *SeriesWithOffset) { + return d.cur +} + +func (d *SeriesPageDecoder) Err() error { + if d.err != nil { + return d.err + } + return d.dec.Err() +} + +type Series struct { + Fingerprint model.Fingerprint + Chunks []ChunkRef +} + +type SeriesWithOffset struct { + Offset BloomOffset + Series +} + +func (s *SeriesWithOffset) Encode( + enc *encoding.Encbuf, + previousFp model.Fingerprint, + previousOffset BloomOffset, +) (model.Fingerprint, BloomOffset) { + // delta encode fingerprint + enc.PutBE64(uint64(s.Fingerprint - previousFp)) + // delta encode offsets + s.Offset.Encode(enc, previousOffset) + + // encode chunks using delta encoded timestamps + var lastEnd model.Time + enc.PutUvarint(len(s.Chunks)) + for _, chunk := range s.Chunks { + lastEnd = chunk.Encode(enc, lastEnd) + } + + return s.Fingerprint, s.Offset +} + +func (s *SeriesWithOffset) Decode(dec *encoding.Decbuf, previousFp model.Fingerprint, previousOffset BloomOffset) (model.Fingerprint, BloomOffset, error) { + s.Fingerprint = previousFp + model.Fingerprint(dec.Be64()) + if err := s.Offset.Decode(dec, previousOffset); err != nil { + return 0, BloomOffset{}, errors.Wrap(err, "decoding bloom offset") + } + + // TODO(owen-d): use pool + s.Chunks = make([]ChunkRef, dec.Uvarint()) + var ( + err error + lastEnd model.Time + ) + for i := range s.Chunks { + lastEnd, err = s.Chunks[i].Decode(dec, lastEnd) + if err != nil { + return 0, BloomOffset{}, errors.Wrapf(err, "decoding %dth chunk", i) + } + } + return s.Fingerprint, s.Offset, dec.Err() +} + +type ChunkRef struct { + Start, End model.Time + Checksum uint32 +} + +func (r *ChunkRef) Encode(enc *encoding.Encbuf, previousEnd model.Time) model.Time { + // delta encode start time + enc.PutVarint64(int64(r.Start - previousEnd)) + enc.PutVarint64(int64(r.End - r.Start)) + enc.PutBE32(r.Checksum) + return r.End +} + +func (r *ChunkRef) Decode(dec *encoding.Decbuf, previousEnd model.Time) (model.Time, error) { + r.Start = previousEnd + model.Time(dec.Varint64()) + r.End = r.Start + model.Time(dec.Varint64()) + r.Checksum = dec.Be32() + return r.End, dec.Err() +} + +type BloomOffset struct { + Page int // page number in bloom block + ByteOffset int // offset to beginning of bloom within page +} + +func (o *BloomOffset) Encode(enc *encoding.Encbuf, previousOffset BloomOffset) { + enc.PutUvarint(o.Page - previousOffset.Page) + enc.PutUvarint(o.ByteOffset - previousOffset.ByteOffset) +} + +func (o *BloomOffset) Decode(dec *encoding.Decbuf, previousOffset BloomOffset) error { + o.Page = previousOffset.Page + dec.Uvarint() + o.ByteOffset = previousOffset.ByteOffset + dec.Uvarint() + return dec.Err() +} diff --git a/pkg/storage/bloom/v1/index_querier.go b/pkg/storage/bloom/v1/index_querier.go new file mode 100644 index 0000000000..1144f89160 --- /dev/null +++ b/pkg/storage/bloom/v1/index_querier.go @@ -0,0 +1,152 @@ +package v1 + +import ( + "context" + "sort" + + "github.com/efficientgo/core/errors" + "github.com/prometheus/common/model" +) + +type IndexQuerier interface { + Series(context.Context) Iterator[*SeriesWithOffset] +} + +type SeriesIterator interface { + Iterator[*SeriesWithOffset] + Reset() +} + +type LazySeriesIter struct { + b *Block + + // state + initialized bool + err error + curPageIndex int + curPage *SeriesPageDecoder +} + +// Decodes series 
pages one at a time and iterates through them +func NewLazySeriesIter(b *Block) *LazySeriesIter { + return &LazySeriesIter{ + b: b, + } +} + +func (it *LazySeriesIter) ensureInit() { + // TODO(owen-d): better control over when to decode + if !it.initialized { + if err := it.b.LoadHeaders(); err != nil { + it.err = err + } + it.initialized = true + } +} + +// Seek returns an iterator over the pages where the first fingerprint is >= fp +func (it *LazySeriesIter) Seek(fp model.Fingerprint) error { + it.ensureInit() + + // first potentially relevant page + desiredPage := sort.Search(len(it.b.index.pageHeaders), func(i int) bool { + header := it.b.index.pageHeaders[i] + return header.ThroughFp >= fp + }) + + if desiredPage == len(it.b.index.pageHeaders) { + // no page was found with a throughFP >= fp; exhaust the iterator. + // guard before indexing pageHeaders, since sort.Search returns + // len(...) when no page matches + it.curPageIndex = len(it.b.index.pageHeaders) + it.curPage = nil + return nil + } + + page := it.b.index.pageHeaders[desiredPage] + + switch { + case page.FromFp > fp: + // no overlap exists: the first page that was found has a fromFP > fp, + // meaning successive pages would also have a fromFP > fp + // since they're sorted in ascending fp order + it.curPageIndex = len(it.b.index.pageHeaders) + it.curPage = nil + return nil + + case desiredPage == it.curPageIndex && it.curPage != nil: + // on the right page, no action needed + default: + // need to load a new page + r, err := it.b.reader.Index() + if err != nil { + return errors.Wrap(err, "getting index reader") + } + it.curPage, err = it.b.index.NewSeriesPageDecoder( + r, + page, + ) + if err != nil { + return err + } + it.curPageIndex = desiredPage + } + + it.curPage.Seek(fp) + return nil +} + +func (it *LazySeriesIter) Next() bool { + it.ensureInit() + if it.err != nil { + return false + } + + return it.next() +} + +func (it *LazySeriesIter) next() bool { + for it.curPageIndex < len(it.b.index.pageHeaders) { + // first access of next page + if it.curPage == nil { + curHeader := it.b.index.pageHeaders[it.curPageIndex] + r, err := it.b.reader.Index() + if err != nil { + it.err = errors.Wrap(err, "getting index reader") + return false + } + it.curPage, err = it.b.index.NewSeriesPageDecoder( + r, + curHeader, + ) + if err != nil { + it.err = err + return false + } + continue + } + + if !it.curPage.Next() { + // there was an error + if it.curPage.Err() != nil { + return false + } + + // we've exhausted the current page, progress to next + it.curPageIndex++ + it.curPage = nil + continue + } + + return true + } + + // finished last page + return false +} + +func (it *LazySeriesIter) At() *SeriesWithOffset { + return it.curPage.At() +} + +func (it *LazySeriesIter) Err() error { + if it.err != nil { + return it.err + } + if it.curPage != nil { + return it.curPage.Err() + } + return nil +} diff --git a/pkg/storage/bloom/v1/index_test.go b/pkg/storage/bloom/v1/index_test.go new file mode 100644 index 0000000000..97d6eb9585 --- /dev/null +++ b/pkg/storage/bloom/v1/index_test.go @@ -0,0 +1,74 @@ +package v1 + +import ( + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/util/encoding" +) + +// does not include a real bloom offset +func mkBasicSeries(n int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) []SeriesWithOffset { + var seriesList []SeriesWithOffset + for i := 0; i < n; i++ { + var series SeriesWithOffset + step := (throughFp - fromFp) / (model.Fingerprint(n)) + series.Fingerprint = fromFp + model.Fingerprint(i)*step + timeDelta := fromTs + (throughTs-fromTs)/model.Time(n)*model.Time(i) + series.Chunks = []ChunkRef{ +
{ + Start: fromTs + timeDelta*model.Time(i), + End: fromTs + timeDelta*model.Time(i), + Checksum: uint32(i), + }, + } + seriesList = append(seriesList, series) + } + return seriesList +} + +func TestBloomOffsetEncoding(t *testing.T) { + src := BloomOffset{Page: 1, ByteOffset: 2} + enc := &encoding.Encbuf{} + src.Encode(enc, BloomOffset{}) + + var dst BloomOffset + dec := encoding.DecWith(enc.Get()) + require.Nil(t, dst.Decode(&dec, BloomOffset{})) + + require.Equal(t, src, dst) +} + +func TestSeriesEncoding(t *testing.T) { + src := SeriesWithOffset{ + Series: Series{ + Fingerprint: model.Fingerprint(1), + Chunks: []ChunkRef{ + { + Start: 1, + End: 2, + Checksum: 3, + }, + { + Start: 4, + End: 5, + Checksum: 6, + }, + }, + }, + Offset: BloomOffset{Page: 2, ByteOffset: 3}, + } + + enc := &encoding.Encbuf{} + src.Encode(enc, 0, BloomOffset{}) + + dec := encoding.DecWith(enc.Get()) + var dst SeriesWithOffset + fp, offset, err := dst.Decode(&dec, 0, BloomOffset{}) + require.Nil(t, err) + require.Equal(t, src.Fingerprint, fp) + require.Equal(t, src.Offset, offset) + require.Equal(t, src, dst) +} diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go new file mode 100644 index 0000000000..c10ad4e880 --- /dev/null +++ b/pkg/storage/bloom/v1/util.go @@ -0,0 +1,134 @@ +package v1 + +import ( + "hash" + "hash/crc32" + "io" + "sync" + + "github.com/prometheus/prometheus/util/pool" +) + +const ( + magicNumber = uint32(0xCA7CAFE5) + // Add new versions below + V1 byte = iota +) + +const ( + DefaultSchemaVersion = V1 +) + +var ( + castagnoliTable = crc32.MakeTable(crc32.Castagnoli) + + // Pool of crc32 hash + Crc32HashPool = ChecksumPool{ + Pool: sync.Pool{ + New: func() interface{} { + return crc32.New(castagnoliTable) + }, + }, + } + + // 1KB -> 16MB (1<<24 bytes) + BlockPool = BytePool{ + pool: pool.New( + 1<<10, 1<<24, 4, + func(size int) interface{} { + return make([]byte, size) + }), + } +) + +type BytePool struct { + pool *pool.Pool +} + +func (p *BytePool) Get(size int) []byte { + return p.pool.Get(size).([]byte)[:0] +} +func (p *BytePool) Put(b []byte) { + p.pool.Put(b) +} + +func newCRC32() hash.Hash32 { + return crc32.New(castagnoliTable) +} + +type ChecksumPool struct { + sync.Pool +} + +func (p *ChecksumPool) Get() hash.Hash32 { + h := p.Pool.Get().(hash.Hash32) + h.Reset() + return h +} + +func (p *ChecksumPool) Put(h hash.Hash32) { + p.Pool.Put(h) +} + +type Iterator[T any] interface { + Next() bool + Err() error + At() T +} + +type SliceIter[T any] struct { + cur int + xs []T +} + +func NewSliceIter[T any](xs []T) *SliceIter[T] { + return &SliceIter[T]{xs: xs, cur: -1} +} + +func (it *SliceIter[T]) Next() bool { + it.cur++ + return it.cur < len(it.xs) +} + +func (it *SliceIter[T]) Err() error { + return nil +} + +func (it *SliceIter[T]) At() T { + return it.xs[it.cur] +} + +type EmptyIter[T any] struct { + zero T +} + +func (it *EmptyIter[T]) Next() bool { + return false +} + +func (it *EmptyIter[T]) Err() error { + return nil +} + +func (it *EmptyIter[T]) At() T { + return it.zero +} + +// noop +func (it *EmptyIter[T]) Reset() {} + +func NewEmptyIter[T any](zero T) *EmptyIter[T] { + return &EmptyIter[T]{zero: zero} +} + +type NoopCloser struct { + io.Writer +} + +func (n NoopCloser) Close() error { + return nil +} + +func NewNoopCloser(w io.Writer) NoopCloser { + return NoopCloser{w} +} diff --git a/pkg/util/encoding/encoding.go b/pkg/util/encoding/encoding.go index 4cd6123da5..27164059d8 100644 --- a/pkg/util/encoding/encoding.go +++ b/pkg/util/encoding/encoding.go @@ -1,6 +1,11 @@
package encoding -import "github.com/prometheus/prometheus/tsdb/encoding" +import ( + "encoding/binary" + "hash/crc32" + + "github.com/prometheus/prometheus/tsdb/encoding" +) func EncWith(b []byte) (res Encbuf) { res.B = b @@ -44,3 +49,23 @@ func (d *Decbuf) Bytes(n int) []byte { d.B = d.B[n:] return x } + +func (d *Decbuf) CheckCrc(castagnoliTable *crc32.Table) error { + if d.E != nil { + return d.E + } + if len(d.B) < 4 { + d.E = encoding.ErrInvalidSize + return d.E + } + + offset := len(d.B) - 4 + expCRC := binary.BigEndian.Uint32(d.B[offset:]) + d.B = d.B[:offset] + + if d.Crc32(castagnoliTable) != expCRC { + d.E = encoding.ErrInvalidChecksum + return d.E + } + return nil +}
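---

Some illustrative sketches of the formats and APIs this patch introduces, for reviewers. First, bloom serialization: `Bloom.Encode`/`Decode` shuttle a `boom.ScalableBloomFilter` through its `WriteTo`/`ReadFrom` methods inside a length-prefixed `Encbuf` frame. A minimal standalone round trip using the same constructor parameters as the tests (this assumes the `owen-d/BoomFilters` fork keeps the upstream `Test` method, which the patch itself doesn't call):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/owen-d/BoomFilters/boom"
)

func main() {
	// Same constructor parameters the tests use: initial capacity hint,
	// target false-positive rate, and tightening ratio.
	sbf := boom.NewScalableBloomFilter(1024, 0.01, 0.8)
	sbf.Add([]byte("trace_id=1234"))

	// Bloom.Encode/Decode wrap this same WriteTo/ReadFrom round trip
	// in a length-prefixed Encbuf frame.
	var buf bytes.Buffer
	if _, err := sbf.WriteTo(&buf); err != nil {
		panic(err)
	}

	restored := boom.NewScalableBloomFilter(1024, 0.01, 0.8)
	if _, err := restored.ReadFrom(&buf); err != nil {
		panic(err)
	}

	fmt.Println(restored.Test([]byte("trace_id=1234"))) // true
	fmt.Println(restored.Test([]byte("other")))         // false, with high probability
}
```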
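The series index delta-encodes everything monotonic: fingerprints against the previous series, bloom offsets against the previous offset, and chunk timestamps against the previous chunk's end. A self-contained sketch of the chunk-ref scheme using plain `encoding/binary` varints rather than Loki's `Encbuf`/`Decbuf` (the `chunkRef` type here is a local stand-in for the patch's `ChunkRef`):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// chunkRef is a local stand-in for the patch's ChunkRef: start/end
// timestamps (millis) and a checksum.
type chunkRef struct {
	Start, End int64
	Checksum   uint32
}

// encodeChunks mirrors ChunkRef.Encode: Start is stored relative to the
// previous chunk's End and End relative to its own Start, so sorted,
// adjacent chunks shrink to one- or two-byte varints.
func encodeChunks(chks []chunkRef) []byte {
	var out []byte
	var tmp [binary.MaxVarintLen64]byte
	var lastEnd int64
	for _, c := range chks {
		out = append(out, tmp[:binary.PutVarint(tmp[:], c.Start-lastEnd)]...)
		out = append(out, tmp[:binary.PutVarint(tmp[:], c.End-c.Start)]...)
		out = binary.BigEndian.AppendUint32(out, c.Checksum)
		lastEnd = c.End
	}
	return out
}

// decodeChunks reverses the deltas; n plays the role of the uvarint chunk
// count that SeriesWithOffset.Encode writes before the chunk list.
func decodeChunks(b []byte, n int) []chunkRef {
	out := make([]chunkRef, 0, n)
	var lastEnd int64
	for i := 0; i < n; i++ {
		dStart, k := binary.Varint(b)
		b = b[k:]
		dLen, k := binary.Varint(b)
		b = b[k:]
		start, end := lastEnd+dStart, lastEnd+dStart+dLen
		out = append(out, chunkRef{Start: start, End: end, Checksum: binary.BigEndian.Uint32(b)})
		b = b[4:]
		lastEnd = end
	}
	return out
}

func main() {
	chks := []chunkRef{{100, 200, 1}, {150, 300, 2}}
	fmt.Println(decodeChunks(encodeChunks(chks), len(chks))) // [{100 200 1} {150 300 2}]
}
```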
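Both the series and bloom files share the same framing: schema up front, compressed pages in the middle, page headers at the back, and a fixed 12-byte trailer of (8-byte big-endian offset to the header section, 4-byte CRC covering that section). The fixed width is why the offset cannot be varint-encoded. A sketch of locating the header section the way the two `DecodeHeaders` implementations do (toy payload, CRC verification omitted here since it is shown in the next sketch):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readTrailer seeks to the fixed 12-byte trailer and reads the 8-byte
// big-endian offset of the page-header section; the trailing 4 bytes
// are the CRC over that section, skipped in this sketch.
func readTrailer(r io.ReadSeeker) (uint64, error) {
	if _, err := r.Seek(-12, io.SeekEnd); err != nil {
		return 0, err
	}
	var trailer [12]byte
	if _, err := io.ReadFull(r, trailer[:]); err != nil {
		return 0, err
	}
	return binary.BigEndian.Uint64(trailer[:8]), nil
}

func main() {
	// Toy file: fake page bytes, then a trailer claiming the header
	// section starts at byte 5, then a placeholder CRC.
	file := []byte("pages")
	file = binary.BigEndian.AppendUint64(file, 5)
	file = binary.BigEndian.AppendUint32(file, 0) // placeholder, not a real CRC
	off, err := readTrailer(bytes.NewReader(file))
	fmt.Println(off, err) // 5 <nil>
}
```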
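The new `Decbuf.CheckCrc` relies on the convention `Encbuf.PutHash` establishes: the final 4 bytes of a frame are a big-endian Castagnoli CRC of everything before them. A standalone equivalent with only the standard library:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// appendCrc frames a payload the way Encbuf.PutHash does: append a
// big-endian Castagnoli CRC of everything written so far.
func appendCrc(payload []byte) []byte {
	return binary.BigEndian.AppendUint32(payload, crc32.Checksum(payload, castagnoli))
}

// checkCrc mirrors the new Decbuf.CheckCrc: peel off the trailing 4 bytes,
// recompute the checksum over the remainder, and compare.
func checkCrc(b []byte) ([]byte, error) {
	if len(b) < 4 {
		return nil, errors.New("invalid size")
	}
	payload, tail := b[:len(b)-4], b[len(b)-4:]
	if crc32.Checksum(payload, castagnoli) != binary.BigEndian.Uint32(tail) {
		return nil, errors.New("invalid checksum")
	}
	return payload, nil
}

func main() {
	framed := appendCrc([]byte("page header bytes"))
	payload, err := checkCrc(framed)
	fmt.Printf("%q %v\n", payload, err) // "page header bytes" <nil>
}
```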
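Putting it together, the intended write/read cycle mirrors `TestBlockBuilderRoundTrip`. Note this sketch has to live in a `_test.go` file inside the `v1` package: `BlockOptions.schema` is unexported, and it reuses the `mkBasicSeriesWithBlooms` helper from `builder_test.go`:

```go
package v1

import (
	"bytes"
	"fmt"

	"github.com/grafana/loki/pkg/chunkenc"
)

// Example_roundTrip builds a block into in-memory buffers, then streams
// every series back out with BlockQuerier.
func Example_roundTrip() {
	indexBuf, bloomsBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)

	builder, err := NewBlockBuilder(BlockOptions{
		schema: Schema{
			version:  DefaultSchemaVersion,
			encoding: chunkenc.EncSnappy,
		},
		SeriesPageSize: 100,
		BloomPageSize:  10 << 10,
	}, NewMemoryBlockWriter(indexBuf, bloomsBuf))
	if err != nil {
		panic(err)
	}

	// 10 synthetic series with blooms, as in the tests.
	data := mkBasicSeriesWithBlooms(10, 0, 0xffff, 0, 10000)
	if err := builder.BuildFrom(NewSliceIter[SeriesWithBloom](data)); err != nil {
		panic(err)
	}

	querier := NewBlockQuerier(NewBlock(NewByteReader(indexBuf, bloomsBuf)))
	var n int
	for querier.Next() {
		n++ // each At() yields a *SeriesWithBloom in fingerprint order
	}
	fmt.Println(n, querier.Err())
	// Output: 10 <nil>
}
```

The same cycle works against a directory by swapping in `NewDirectoryBlockWriter(dir)` and `NewDirectoryBlockReader(dir)`, which is exactly the second case the round-trip test exercises.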