Format + Library for building bloom filter "blocks" from chunks (#10780)

This PR introduces both a binary format for bloom filter "blocks" and a
library for building/querying them.

Each "block" consists of two files:
1) `Index`. This is a list of individually compressed pages of
stream+chunk information, sorted in ascending fingerprint order. Each
series entry records both the chunks indexed into its bloom filter and
the offset of that series' bloom within the blooms file.
2) `Blooms`. This is a list of individually compressed pages of bloom
filters, one per series, again sorted in ascending fingerprint order.


There is still more work to do, but I'm opening this PR as an initial
pass.
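
To make the API concrete, here is a minimal build-then-query sketch adapted from `builder_test.go` below. It runs inside the `v1` package (the `BlockOptions.schema` field is unexported), and `data` (a fingerprint-sorted `[]SeriesWithBloom`) and `fp` (a `model.Fingerprint`) are assumed inputs:

```go
indexBuf, bloomsBuf := bytes.NewBuffer(nil), bytes.NewBuffer(nil)
builder, err := NewBlockBuilder(
	BlockOptions{
		schema:         Schema{version: DefaultSchemaVersion, encoding: chunkenc.EncSnappy},
		SeriesPageSize: 100,      // target decompressed bytes per series page
		BloomPageSize:  10 << 10, // target decompressed bytes per bloom page
	},
	NewMemoryBlockWriter(indexBuf, bloomsBuf),
)
if err != nil {
	return err
}
// writes both files: blooms first, then index entries pointing into them
if err := builder.BuildFrom(NewSliceIter[SeriesWithBloom](data)); err != nil {
	return err
}

// query the block back; Seek positions the querier at the first series >= fp
querier := NewBlockQuerier(NewBlock(NewByteReader(indexBuf, bloomsBuf)))
if err := querier.Seek(fp); err != nil {
	return err
}
for querier.Next() {
	swb := querier.At() // *SeriesWithBloom: series metadata plus its bloom
	_ = swb
}
return querier.Err()
```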
Commit c405db4317 (parent 0c58c78828) by Owen Diehl, committed via GitHub.
18 changed files (lines changed in parentheses):

go.mod (2)
pkg/chunkenc/memchunk.go (10)
pkg/chunkenc/pool.go (6)
pkg/chunkenc/pool_test.go (4)
pkg/chunkenc/symbols_test.go (4)
pkg/storage/bloom/spec.go (29)
pkg/storage/bloom/v1/TODO.md (7)
pkg/storage/bloom/v1/block.go (113)
pkg/storage/bloom/v1/block_writer.go (186)
pkg/storage/bloom/v1/bloom.go (224)
pkg/storage/bloom/v1/bloom_querier.go (125)
pkg/storage/bloom/v1/builder.go (429)
pkg/storage/bloom/v1/builder_test.go (112)
pkg/storage/bloom/v1/index.go (419)
pkg/storage/bloom/v1/index_querier.go (152)
pkg/storage/bloom/v1/index_test.go (74)
pkg/storage/bloom/v1/util.go (134)
pkg/util/encoding/encoding.go (27)

go.mod
@@ -118,6 +118,7 @@ require (
 	github.com/IBM/go-sdk-core/v5 v5.13.1
 	github.com/IBM/ibm-cos-sdk-go v1.10.0
 	github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc
+	github.com/efficientgo/core v1.0.0-rc.2
 	github.com/fsnotify/fsnotify v1.6.0
 	github.com/grafana/loki/pkg/push v0.0.0-20230127102416-571f88bc5765
 	github.com/heroku/x v0.0.61
@@ -196,7 +197,6 @@ require (
 	github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
 	github.com/eapache/queue v1.1.0 // indirect
 	github.com/edsrzf/mmap-go v1.1.0 // indirect
-	github.com/efficientgo/core v1.0.0-rc.2 // indirect
 	github.com/emicklei/go-restful/v3 v3.10.2 // indirect
 	github.com/envoyproxy/go-control-plane v0.11.1 // indirect
 	github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect

pkg/chunkenc/memchunk.go
@@ -508,7 +508,7 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
 	if fromCheckpoint {
 		bc.symbolizer = symbolizerFromCheckpoint(lb)
 	} else {
-		symbolizer, err := symbolizerFromEnc(lb, getReaderPool(bc.encoding))
+		symbolizer, err := symbolizerFromEnc(lb, GetReaderPool(bc.encoding))
 		if err != nil {
 			return nil, err
 		}
@@ -626,7 +626,7 @@ func (c *MemChunk) writeTo(w io.Writer, forCheckpoint bool) (int64, error) {
 		}
 	} else {
 		var err error
-		n, crcHash, err = c.symbolizer.SerializeTo(w, getWriterPool(c.encoding))
+		n, crcHash, err = c.symbolizer.SerializeTo(w, GetWriterPool(c.encoding))
 		if err != nil {
 			return offset, errors.Wrap(err, "write structured metadata")
 		}
@@ -912,7 +912,7 @@ func (c *MemChunk) cut() error {
 		return nil
 	}
 
-	b, err := c.head.Serialise(getWriterPool(c.encoding))
+	b, err := c.head.Serialise(GetWriterPool(c.encoding))
 	if err != nil {
 		return err
 	}
@@ -1153,14 +1153,14 @@ func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline, opt
 	if len(b.b) == 0 {
 		return iter.NoopIterator
 	}
-	return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer, options...)
+	return newEntryIterator(ctx, GetReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer, options...)
 }
 
 func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator {
 	if len(b.b) == 0 {
 		return iter.NoopIterator
 	}
-	return newSampleIterator(ctx, getReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer)
+	return newSampleIterator(ctx, GetReaderPool(b.enc), b.b, b.format, extractor, b.symbolizer)
 }
 
 func (b block) Offset() int {
pkg/chunkenc/pool.go
@@ -82,11 +82,11 @@ var (
 	}
 )
 
-func getWriterPool(enc Encoding) WriterPool {
-	return getReaderPool(enc).(WriterPool)
+func GetWriterPool(enc Encoding) WriterPool {
+	return GetReaderPool(enc).(WriterPool)
 }
 
-func getReaderPool(enc Encoding) ReaderPool {
+func GetReaderPool(enc Encoding) ReaderPool {
 	switch enc {
 	case EncGZIP:
 		return &Gzip
pkg/chunkenc/pool_test.go
@@ -25,8 +25,8 @@ func TestPool(t *testing.T) {
 		var (
 			buf   = bytes.NewBuffer(nil)
 			res   = make([]byte, 1024)
-			wpool = getWriterPool(enc)
-			rpool = getReaderPool(enc)
+			wpool = GetWriterPool(enc)
+			rpool = GetReaderPool(enc)
 		)
 
 		w := wpool.GetWriter(buf)
pkg/chunkenc/symbols_test.go
@@ -161,10 +161,10 @@ func TestSymbolizer(t *testing.T) {
 			}
 
 			buf.Reset()
-			_, _, err = s.SerializeTo(buf, getWriterPool(encoding))
+			_, _, err = s.SerializeTo(buf, GetWriterPool(encoding))
 			require.NoError(t, err)
 
-			loaded, err = symbolizerFromEnc(buf.Bytes(), getReaderPool(encoding))
+			loaded, err = symbolizerFromEnc(buf.Bytes(), GetReaderPool(encoding))
 			require.NoError(t, err)
 			for i, symbols := range tc.expectedSymbols {
 				require.Equal(t, tc.labelsToAdd[i], loaded.Lookup(symbols))
pkg/storage/bloom/spec.go
@@ -0,0 +1,29 @@
package bloom
import "github.com/prometheus/common/model"
type Metadata interface {
Version() uint32
NumSeries() uint64
NumChunks() uint64
Size() uint64 // bytes
// timestamps
From() int64
Through() int64
// series
FromFingerprint() model.Fingerprint
ThroughFingerprint() model.Fingerprint
}
type Iterator[K any, V any] interface {
Next() bool
Err() error
At() V
Seek(K) Iterator[K, V]
}
type Block interface {
SeriesIterator() Iterator[model.Fingerprint, []byte]
}

pkg/storage/bloom/v1/TODO.md
@@ -0,0 +1,7 @@
* Should be able to read bloom as a []byte without copying it during decoding
* It's immutable + partition offsets are calculable, etc
* can encode version, parameters as the last n bytes, each partition's byte range can be determined from that. No need to unpack
* implement streaming encoding.Decbuf over io.ReadSeeker
* Build & load from directories
* Less copying! I've taken some shortcuts we'll need to refactor to avoid copying []byte around in a few places
* more sophisticated querying methods

pkg/storage/bloom/v1/block.go
@@ -0,0 +1,113 @@
package v1
import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
type Block struct {
// covers series pages
index BlockIndex
// covers bloom pages
blooms BloomBlock
// TODO(owen-d): implement
// synthetic header for the entire block
// built from all the pages in the index
header SeriesHeader
reader BlockReader // should this be decoupled from the struct (accepted as method arg instead)?
initialized bool
}
func NewBlock(reader BlockReader) *Block {
return &Block{
reader: reader,
}
}
func (b *Block) LoadHeaders() error {
// TODO(owen-d): better control over when to decode
if !b.initialized {
idx, err := b.reader.Index()
if err != nil {
return errors.Wrap(err, "getting index reader")
}
if err := b.index.DecodeHeaders(idx); err != nil {
return errors.Wrap(err, "decoding index")
}
blooms, err := b.reader.Blooms()
if err != nil {
return errors.Wrap(err, "getting blooms reader")
}
if err := b.blooms.DecodeHeaders(blooms); err != nil {
return errors.Wrap(err, "decoding blooms")
}
b.initialized = true
}
return nil
}
func (b *Block) Series() *LazySeriesIter {
return NewLazySeriesIter(b)
}
func (b *Block) Blooms() *LazyBloomIter {
return NewLazyBloomIter(b)
}
type BlockQuerier struct {
series *LazySeriesIter
blooms *LazyBloomIter
cur *SeriesWithBloom
}
func NewBlockQuerier(b *Block) *BlockQuerier {
return &BlockQuerier{
series: NewLazySeriesIter(b),
blooms: NewLazyBloomIter(b),
}
}
func (bq *BlockQuerier) Seek(fp model.Fingerprint) error {
return bq.series.Seek(fp)
}
func (bq *BlockQuerier) Next() bool {
if !bq.series.Next() {
return false
}
series := bq.series.At()
bq.blooms.Seek(series.Offset)
if !bq.blooms.Next() {
return false
}
bloom := bq.blooms.At()
bq.cur = &SeriesWithBloom{
Series: &series.Series,
Bloom: bloom,
}
return true
}
func (bq *BlockQuerier) At() *SeriesWithBloom {
return bq.cur
}
func (bq *BlockQuerier) Err() error {
if err := bq.series.Err(); err != nil {
return err
}
return bq.blooms.Err()
}

pkg/storage/bloom/v1/block_writer.go
@@ -0,0 +1,186 @@
package v1
import (
"bytes"
"io"
"os"
"path/filepath"
"github.com/pkg/errors"
"github.com/grafana/loki/pkg/storage/chunk/client/util"
)
const (
bloomFileName = "bloom"
seriesFileName = "series"
)
type BlockWriter interface {
Index() (io.WriteCloser, error)
Blooms() (io.WriteCloser, error)
Size() (int, error) // byte size of accumulated index & blooms
}
type BlockReader interface {
Index() (io.ReadSeeker, error)
Blooms() (io.ReadSeeker, error)
}
// in memory impl
type MemoryBlockWriter struct {
index, blooms *bytes.Buffer
}
func NewMemoryBlockWriter(index, blooms *bytes.Buffer) MemoryBlockWriter {
return MemoryBlockWriter{
index: index,
blooms: blooms,
}
}
func (b MemoryBlockWriter) Index() (io.WriteCloser, error) {
return NewNoopCloser(b.index), nil
}
func (b MemoryBlockWriter) Blooms() (io.WriteCloser, error) {
return NewNoopCloser(b.blooms), nil
}
func (b MemoryBlockWriter) Size() (int, error) {
return b.index.Len() + b.blooms.Len(), nil
}
// Directory based impl
type DirectoryBlockWriter struct {
dir string
blooms, index *os.File
initialized bool
}
func NewDirectoryBlockWriter(dir string) *DirectoryBlockWriter {
return &DirectoryBlockWriter{
dir: dir,
}
}
func (b *DirectoryBlockWriter) Init() error {
if !b.initialized {
err := util.EnsureDirectory(b.dir)
if err != nil {
return errors.Wrap(err, "creating bloom block dir")
}
b.index, err = os.Create(filepath.Join(b.dir, seriesFileName))
if err != nil {
return errors.Wrap(err, "creating series file")
}
b.blooms, err = os.Create(filepath.Join(b.dir, bloomFileName))
if err != nil {
return errors.Wrap(err, "creating bloom file")
}
b.initialized = true
}
return nil
}
func (b *DirectoryBlockWriter) Index() (io.WriteCloser, error) {
if !b.initialized {
if err := b.Init(); err != nil {
return nil, err
}
}
return b.index, nil
}
func (b *DirectoryBlockWriter) Blooms() (io.WriteCloser, error) {
if !b.initialized {
if err := b.Init(); err != nil {
return nil, err
}
}
return b.blooms, nil
}
func (b *DirectoryBlockWriter) Size() (int, error) {
var size int
for _, f := range []*os.File{b.blooms, b.index} {
info, err := f.Stat()
if err != nil {
return 0, errors.Wrapf(err, "error stat'ing file %s", f.Name())
}
size += int(info.Size())
}
return size, nil
}
// In memory reader
type ByteReader struct {
index, blooms *bytes.Buffer
}
func NewByteReader(index, blooms *bytes.Buffer) *ByteReader {
return &ByteReader{index: index, blooms: blooms}
}
func (r *ByteReader) Index() (io.ReadSeeker, error) {
return bytes.NewReader(r.index.Bytes()), nil
}
func (r *ByteReader) Blooms() (io.ReadSeeker, error) {
return bytes.NewReader(r.blooms.Bytes()), nil
}
// File reader
type DirectoryBlockReader struct {
dir string
blooms, index *os.File
initialized bool
}
func NewDirectoryBlockReader(dir string) *DirectoryBlockReader {
return &DirectoryBlockReader{
dir: dir,
initialized: false,
}
}
func (r *DirectoryBlockReader) Init() error {
if !r.initialized {
var err error
r.index, err = os.Open(filepath.Join(r.dir, seriesFileName))
if err != nil {
return errors.Wrap(err, "opening series file")
}
r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName))
if err != nil {
return errors.Wrap(err, "opening bloom file")
}
r.initialized = true
}
return nil
}
func (r *DirectoryBlockReader) Index() (io.ReadSeeker, error) {
if !r.initialized {
if err := r.Init(); err != nil {
return nil, err
}
}
return r.index, nil
}
func (r *DirectoryBlockReader) Blooms() (io.ReadSeeker, error) {
if !r.initialized {
if err := r.Init(); err != nil {
return nil, err
}
}
return r.blooms, nil
}
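A quick wiring note: the directory-backed implementations pair the same way as the in-memory ones. A minimal sketch (the path is illustrative; error handling trimmed to the essentials):

```go
dir := "/data/bloom-block" // illustrative path
writer := NewDirectoryBlockWriter(dir)
// hand writer to a BlockBuilder, which writes the `series` and
// `bloom` files into dir via writer.Index() / writer.Blooms()

reader := NewDirectoryBlockReader(dir)
// Init is lazy: the files are only opened on the first Index()/Blooms()
// call, so constructing the reader before the files exist is fine
idx, err := reader.Index()
if err != nil {
	return err
}
_ = idx // io.ReadSeeker over dir/series
```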

pkg/storage/bloom/v1/bloom.go
@@ -0,0 +1,224 @@
package v1
import (
"bytes"
"fmt"
"io"
"github.com/owen-d/BoomFilters/boom"
"github.com/pkg/errors"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/util/encoding"
)
type Bloom struct {
sbf boom.ScalableBloomFilter
}
func (b *Bloom) Encode(enc *encoding.Encbuf) error {
// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
buf := bytes.NewBuffer(BlockPool.Get(int(b.sbf.Capacity() / 8)))
_, err := b.sbf.WriteTo(buf)
if err != nil {
return errors.Wrap(err, "encoding bloom filter")
}
data := buf.Bytes()
enc.PutUvarint(len(data)) // length of bloom filter
enc.PutBytes(data)
BlockPool.Put(data[:0]) // release to pool
return nil
}
func (b *Bloom) Decode(dec *encoding.Decbuf) error {
ln := dec.Uvarint()
data := dec.Bytes(ln)
_, err := b.sbf.ReadFrom(bytes.NewReader(data))
if err != nil {
return errors.Wrap(err, "decoding bloom filter")
}
return nil
}
func LazyDecodeBloomPage(dec *encoding.Decbuf, pool chunkenc.ReaderPool, decompressedSize int) (*BloomPageDecoder, error) {
if err := dec.CheckCrc(castagnoliTable); err != nil {
return nil, errors.Wrap(err, "checksumming bloom page")
}
decompressor, err := pool.GetReader(bytes.NewReader(dec.Get()))
if err != nil {
return nil, errors.Wrap(err, "getting decompressor")
}
// the returned decoder retains this buffer, so it is deliberately
// not returned to BlockPool here
b := BlockPool.Get(decompressedSize)[:decompressedSize]
if _, err = io.ReadFull(decompressor, b); err != nil {
return nil, errors.Wrap(err, "decompressing bloom page")
}
decoder := NewBloomPageDecoder(b)
return decoder, nil
}
func NewBloomPageDecoder(data []byte) *BloomPageDecoder {
// last 8 bytes are the number of blooms in this page
dec := encoding.DecWith(data[len(data)-8:])
n := int(dec.Be64())
// reset data to the bloom portion of the page
data = data[:len(data)-8]
dec.B = data
decoder := &BloomPageDecoder{
dec: &dec,
data: data,
n: n,
}
return decoder
}
// BloomPageDecoder is a seekable, reset-able iterator over the blooms in a single page
type BloomPageDecoder struct {
data []byte
dec *encoding.Decbuf
n int // number of blooms in page
cur *Bloom
err error
}
func (d *BloomPageDecoder) Reset() {
d.err = nil
d.cur = nil
d.dec.B = d.data
}
func (d *BloomPageDecoder) Seek(offset int) {
d.dec.B = d.data[offset:]
}
func (d *BloomPageDecoder) Next() bool {
// end of iteration, no error
if d.dec.Len() == 0 {
return false
}
var b Bloom
d.err = b.Decode(d.dec)
// end of iteration, error
if d.err != nil {
return false
}
d.cur = &b
return true
}
func (d *BloomPageDecoder) At() *Bloom {
return d.cur
}
func (d *BloomPageDecoder) Err() error {
return d.err
}
type BloomPageHeader struct {
N, Offset, Len, DecompressedLen int
}
func (h *BloomPageHeader) Encode(enc *encoding.Encbuf) {
enc.PutUvarint(h.N)
enc.PutUvarint(h.Offset)
enc.PutUvarint(h.Len)
enc.PutUvarint(h.DecompressedLen)
}
func (h *BloomPageHeader) Decode(dec *encoding.Decbuf) error {
h.N = dec.Uvarint()
h.Offset = dec.Uvarint()
h.Len = dec.Uvarint()
h.DecompressedLen = dec.Uvarint()
return dec.Err()
}
type BloomBlock struct {
schema Schema
pageHeaders []BloomPageHeader
}
func NewBloomBlock(encoding chunkenc.Encoding) BloomBlock {
return BloomBlock{
schema: Schema{version: DefaultSchemaVersion, encoding: encoding},
}
}
func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error {
if err := b.schema.DecodeFrom(r); err != nil {
return errors.Wrap(err, "decoding schema")
}
var (
err error
dec encoding.Decbuf
)
// last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32)
if _, err := r.Seek(-12, io.SeekEnd); err != nil {
return errors.Wrap(err, "seeking to bloom headers metadata")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading bloom headers metadata")
}
headerOffset := dec.Be64()
if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil {
return errors.Wrap(err, "seeking to bloom headers")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading bloom page headers")
}
if err := dec.CheckCrc(castagnoliTable); err != nil {
return errors.Wrap(err, "checksumming page headers")
}
b.pageHeaders = make([]BloomPageHeader, dec.Uvarint())
for i := 0; i < len(b.pageHeaders); i++ {
header := &b.pageHeaders[i]
if err := header.Decode(&dec); err != nil {
return errors.Wrapf(err, "decoding %dth bloom page header", i)
}
}
return nil
}
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) {
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
return nil, fmt.Errorf("invalid page (%d) for bloom page decoding", pageIdx)
}
page := b.pageHeaders[pageIdx]
if _, err := r.Seek(int64(page.Offset), io.SeekStart); err != nil {
return nil, errors.Wrap(err, "seeking to bloom page")
}
data := BlockPool.Get(page.Len)[:page.Len]
_, err := io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading bloom page")
}
dec := encoding.DecWith(data)
return LazyDecodeBloomPage(&dec, b.schema.DecompressorPool(), page.DecompressedLen)
}
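For review convenience, the resulting on-disk shape of a single bloom page, as I read `PageWriter.writePage` in builder.go together with `LazyDecodeBloomPage`/`NewBloomPageDecoder` above (a reading of the code, not a normative spec):

```
page    = compress(bloom_1 || ... || bloom_n || count) || crc32
bloom_i = uvarint(len) || scalable bloom filter bytes
count   = n as big-endian uint64; fixed width so it can be sliced off
          the end of the decompressed payload
crc32   = big-endian Castagnoli checksum of the compressed payload
```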

pkg/storage/bloom/v1/bloom_querier.go
@@ -0,0 +1,125 @@
package v1
import "github.com/pkg/errors"
type BloomQuerier interface {
Seek(BloomOffset) (*Bloom, error)
}
type LazyBloomIter struct {
b *Block
// state
initialized bool
err error
curPageIndex int
curPage *BloomPageDecoder
}
func NewLazyBloomIter(b *Block) *LazyBloomIter {
return &LazyBloomIter{
b: b,
}
}
func (it *LazyBloomIter) ensureInit() {
// TODO(owen-d): better control over when to decode
if !it.initialized {
if err := it.b.LoadHeaders(); err != nil {
it.err = err
}
it.initialized = true
}
}
func (it *LazyBloomIter) Seek(offset BloomOffset) {
it.ensureInit()
// if we need a different page or the current page hasn't been loaded,
// load the desired page
if it.curPageIndex != offset.Page || it.curPage == nil {
r, err := it.b.reader.Blooms()
if err != nil {
it.err = errors.Wrap(err, "getting blooms reader")
return
}
decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page)
if err != nil {
it.err = errors.Wrap(err, "loading bloom page")
return
}
it.curPageIndex = offset.Page
it.curPage = decoder
}
it.curPage.Seek(offset.ByteOffset)
}
func (it *LazyBloomIter) Next() bool {
it.ensureInit()
if it.err != nil {
return false
}
return it.next()
}
func (it *LazyBloomIter) next() bool {
if it.err != nil {
return false
}
for it.curPageIndex < len(it.b.blooms.pageHeaders) {
// first access of next page
if it.curPage == nil {
r, err := it.b.reader.Blooms()
if err != nil {
it.err = errors.Wrap(err, "getting blooms reader")
return false
}
it.curPage, err = it.b.blooms.BloomPageDecoder(
r,
it.curPageIndex,
)
if err != nil {
it.err = err
return false
}
continue
}
if !it.curPage.Next() {
// there was an error
if it.curPage.Err() != nil {
return false
}
// we've exhausted the current page, progress to next
it.curPageIndex++
it.curPage = nil
continue
}
return true
}
// finished last page
return false
}
func (it *LazyBloomIter) At() *Bloom {
return it.curPage.At()
}
func (it *LazyBloomIter) Err() error {
if it.err != nil {
return it.err
}
if it.curPage != nil {
return it.curPage.Err()
}
return nil
}

pkg/storage/bloom/v1/builder.go
@@ -0,0 +1,429 @@
package v1
import (
"bytes"
"fmt"
"hash"
"io"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/util/encoding"
)
// SeriesResolver iterates two sequences, one from blocks which have already been indexed
// into blooms and another from TSDBs. The series+chunks which exist in the TSDBs but not
// the blocks then need to be indexed into the blocks.
// type SeriesResolver struct {
// fromBlocks Iterator[Series]
// fromTSDBs Iterator[Series]
// }
type BlockOptions struct {
schema Schema
// target size in bytes (decompressed)
// of each page type
SeriesPageSize, BloomPageSize, BlockSize int
}
type BlockBuilder struct {
opts BlockOptions
index *IndexBuilder
blooms *BloomBlockBuilder
}
func NewBlockBuilder(opts BlockOptions, writer BlockWriter) (*BlockBuilder, error) {
index, err := writer.Index()
if err != nil {
return nil, errors.Wrap(err, "initializing index writer")
}
blooms, err := writer.Blooms()
if err != nil {
return nil, errors.Wrap(err, "initializing blooms writer")
}
return &BlockBuilder{
opts: opts,
index: NewIndexBuilder(opts, index),
blooms: NewBloomBlockBuilder(opts, blooms),
}, nil
}
type SeriesWithBloom struct {
Series *Series
Bloom *Bloom
}
func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) error {
for itr.Next() {
series := itr.At()
offset, err := b.blooms.Append(series)
if err != nil {
return errors.Wrapf(err, "writing bloom for series %v", series.Series.Fingerprint)
}
if err := b.index.Append(SeriesWithOffset{
Offset: offset,
Series: *series.Series,
}); err != nil {
return errors.Wrapf(err, "writing index for series %v", series.Series.Fingerprint)
}
}
if err := itr.Err(); err != nil {
return errors.Wrap(err, "iterating series with blooms")
}
if err := b.blooms.Close(); err != nil {
return errors.Wrap(err, "closing bloom file")
}
if err := b.index.Close(); err != nil {
return errors.Wrap(err, "closing series file")
}
return nil
}
type BloomBlockBuilder struct {
opts BlockOptions
writer io.WriteCloser
offset int // track the offset of the file
writtenSchema bool
pages []BloomPageHeader
page PageWriter
scratch *encoding.Encbuf
}
func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockBuilder {
return &BloomBlockBuilder{
opts: opts,
writer: writer,
page: NewPageWriter(opts.BloomPageSize),
scratch: &encoding.Encbuf{},
}
}
func (b *BloomBlockBuilder) WriteSchema() error {
b.scratch.Reset()
b.opts.schema.Encode(b.scratch)
if _, err := b.writer.Write(b.scratch.Get()); err != nil {
return errors.Wrap(err, "writing schema")
}
b.writtenSchema = true
b.offset += b.scratch.Len()
return nil
}
func (b *BloomBlockBuilder) Append(series SeriesWithBloom) (BloomOffset, error) {
if !b.writtenSchema {
if err := b.WriteSchema(); err != nil {
return BloomOffset{}, errors.Wrap(err, "writing schema")
}
}
b.scratch.Reset()
if err := series.Bloom.Encode(b.scratch); err != nil {
return BloomOffset{}, errors.Wrapf(err, "encoding bloom for %v", series.Series.Fingerprint)
}
if !b.page.SpaceFor(b.scratch.Len()) {
if err := b.flushPage(); err != nil {
return BloomOffset{}, errors.Wrap(err, "flushing bloom page")
}
}
return BloomOffset{
Page: len(b.pages),
ByteOffset: b.page.Add(b.scratch.Get()),
}, nil
}
func (b *BloomBlockBuilder) Close() error {
if b.page.Count() > 0 {
if err := b.flushPage(); err != nil {
return errors.Wrap(err, "flushing final bloom page")
}
}
b.scratch.Reset()
b.scratch.PutUvarint(len(b.pages))
for _, h := range b.pages {
h.Encode(b.scratch)
}
// put the offset to the beginning of the header section;
// it cannot be varint encoded because decoding locates it as
// the fixed 8 bytes prior to the checksum
b.scratch.PutBE64(uint64(b.offset))
crc32Hash := Crc32HashPool.Get()
defer Crc32HashPool.Put(crc32Hash)
// wrap with final checksum
b.scratch.PutHash(crc32Hash)
_, err := b.writer.Write(b.scratch.Get())
if err != nil {
return errors.Wrap(err, "writing bloom page headers")
}
return errors.Wrap(b.writer.Close(), "closing bloom writer")
}
func (b *BloomBlockBuilder) flushPage() error {
crc32Hash := Crc32HashPool.Get()
defer Crc32HashPool.Put(crc32Hash)
decompressedLen, compressedLen, err := b.page.writePage(
b.writer,
b.opts.schema.CompressorPool(),
crc32Hash,
)
if err != nil {
return errors.Wrap(err, "writing bloom page")
}
header := BloomPageHeader{
N: b.page.Count(),
Offset: b.offset,
Len: compressedLen,
DecompressedLen: decompressedLen,
}
b.pages = append(b.pages, header)
b.offset += compressedLen
b.page.Reset()
return nil
}
type PageWriter struct {
enc *encoding.Encbuf
targetSize int
n int // number of encoded items (blooms or series)
}
func NewPageWriter(targetSize int) PageWriter {
return PageWriter{
enc: &encoding.Encbuf{},
targetSize: targetSize,
}
}
func (w *PageWriter) Count() int {
return w.n
}
func (w *PageWriter) Reset() {
w.enc.Reset()
w.n = 0
}
func (w *PageWriter) SpaceFor(numBytes int) bool {
// if a single bloom exceeds the target size, still accept it
// otherwise only accept it if adding it would not exceed the target size
return w.n == 0 || w.enc.Len()+numBytes <= w.targetSize
}
func (w *PageWriter) Add(item []byte) (offset int) {
offset = w.enc.Len()
w.enc.PutBytes(item)
w.n++
return offset
}
func (w *PageWriter) writePage(writer io.Writer, pool chunkenc.WriterPool, crc32Hash hash.Hash32) (int, int, error) {
// write the number of items in this page; must be fixed-width (not varint)
// so we can calculate its position+len during decoding
w.enc.PutBE64(uint64(w.n))
decompressedLen := w.enc.Len()
buf := &bytes.Buffer{}
compressor := pool.GetWriter(buf)
defer pool.PutWriter(compressor)
if _, err := compressor.Write(w.enc.Get()); err != nil {
return 0, 0, errors.Wrap(err, "compressing page")
}
if err := compressor.Close(); err != nil {
return 0, 0, errors.Wrap(err, "closing compressor")
}
// replace the encoded series page with the compressed one
w.enc.B = buf.Bytes()
w.enc.PutHash(crc32Hash)
// write the page
if _, err := writer.Write(w.enc.Get()); err != nil {
return 0, 0, errors.Wrap(err, "writing page")
}
return decompressedLen, w.enc.Len(), nil
}
type IndexBuilder struct {
opts BlockOptions
writer io.WriteCloser
offset int // track the offset of the file
writtenSchema bool
pages []SeriesPageHeaderWithOffset
page PageWriter
scratch *encoding.Encbuf
previousFp model.Fingerprint
previousOffset BloomOffset
fromFp model.Fingerprint
fromTs, throughTs model.Time
}
func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder {
return &IndexBuilder{
opts: opts,
writer: writer,
page: NewPageWriter(opts.SeriesPageSize),
scratch: &encoding.Encbuf{},
}
}
func (b *IndexBuilder) WriteSchema() error {
b.scratch.Reset()
b.opts.schema.Encode(b.scratch)
if _, err := b.writer.Write(b.scratch.Get()); err != nil {
return errors.Wrap(err, "writing schema")
}
b.writtenSchema = true
b.offset += b.scratch.Len()
return nil
}
func (b *IndexBuilder) Append(series SeriesWithOffset) error {
if !b.writtenSchema {
if err := b.WriteSchema(); err != nil {
return errors.Wrap(err, "writing schema")
}
}
b.scratch.Reset()
// we don't want to update the previous pointers yet: flushing the page
// first uses them for its header's through-fp, and re-encoding after a
// flush must delta-encode against the reset values
previousFp, previousOffset := series.Encode(b.scratch, b.previousFp, b.previousOffset)
if !b.page.SpaceFor(b.scratch.Len()) {
if err := b.flushPage(); err != nil {
return errors.Wrap(err, "flushing series page")
}
// re-encode now that a new page has been cut and we use delta-encoding
b.scratch.Reset()
previousFp, previousOffset = series.Encode(b.scratch, b.previousFp, b.previousOffset)
}
switch {
case b.page.Count() == 0:
// Special case: this is the first series in a page
if len(series.Chunks) < 1 {
return fmt.Errorf("series with zero chunks for fingerprint %v", series.Fingerprint)
}
b.fromFp = series.Fingerprint
b.fromTs, b.throughTs = chkBounds(series.Chunks)
case b.previousFp > series.Fingerprint:
return fmt.Errorf("out of order series fingerprint for series %v", series.Fingerprint)
default:
from, through := chkBounds(series.Chunks)
if b.fromTs.After(from) {
b.fromTs = from
}
if b.throughTs.Before(through) {
b.throughTs = through
}
}
_ = b.page.Add(b.scratch.Get())
// only now update the previous pointers, so the out-of-order check above
// compared against the prior series rather than this one
b.previousFp = previousFp
b.previousOffset = previousOffset
return nil
}
// chks must be non-empty; callers are responsible for checking this
func chkBounds(chks []ChunkRef) (from, through model.Time) {
from, through = chks[0].Start, chks[0].End
for _, chk := range chks[1:] {
if chk.Start.Before(from) {
from = chk.Start
}
if chk.End.After(through) {
through = chk.End
}
}
return
}
func (b *IndexBuilder) flushPage() error {
crc32Hash := Crc32HashPool.Get()
defer Crc32HashPool.Put(crc32Hash)
decompressedLen, compressedLen, err := b.page.writePage(
b.writer,
b.opts.schema.CompressorPool(),
crc32Hash,
)
if err != nil {
return errors.Wrap(err, "writing series page")
}
header := SeriesPageHeaderWithOffset{
Offset: b.offset,
Len: compressedLen,
DecompressedLen: decompressedLen,
SeriesHeader: SeriesHeader{
NumSeries: b.page.Count(),
FromFp: b.fromFp,
ThroughFp: b.previousFp,
FromTs: b.fromTs,
ThroughTs: b.throughTs,
},
}
b.pages = append(b.pages, header)
b.offset += compressedLen
b.fromFp = 0
b.fromTs = 0
b.throughTs = 0
b.previousFp = 0
b.previousOffset = BloomOffset{}
b.page.Reset()
return nil
}
func (b *IndexBuilder) Close() error {
if b.page.Count() > 0 {
if err := b.flushPage(); err != nil {
return errors.Wrap(err, "flushing final series page")
}
}
b.scratch.Reset()
b.scratch.PutUvarint(len(b.pages))
for _, h := range b.pages {
h.Encode(b.scratch)
}
// put the offset to the beginning of the header section;
// it cannot be varint encoded because decoding locates it as
// the fixed 8 bytes prior to the checksum
b.scratch.PutBE64(uint64(b.offset))
crc32Hash := Crc32HashPool.Get()
defer Crc32HashPool.Put(crc32Hash)
// wrap with final checksum
b.scratch.PutHash(crc32Hash)
_, err := b.writer.Write(b.scratch.Get())
if err != nil {
return errors.Wrap(err, "writing series page headers")
}
return errors.Wrap(b.writer.Close(), "closing series writer")
}
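Both builders emit the same file envelope; as written by `WriteSchema`, `flushPage`, and `Close` above (again, my reading of the code rather than a spec):

```
file    = schema || page_1 || ... || page_m || trailer
schema  = BE32(magic 0xCA7CAFE5) || version byte || encoding byte   (6 bytes)
trailer = uvarint(m) || page header_1 || ... || page header_m
          || BE64(offset of trailer start) || BE32(crc32 of the trailer)
```

Readers locate the trailer by seeking to the fixed 12-byte suffix (offset + checksum), which is why the trailer offset itself is not varint encoded.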

pkg/storage/bloom/v1/builder_test.go
@@ -0,0 +1,112 @@
package v1
import (
"bytes"
"fmt"
"testing"
"github.com/owen-d/BoomFilters/boom"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/chunkenc"
)
func mkBasicSeriesWithBlooms(n int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom) {
for i := 0; i < n; i++ {
var series Series
step := (throughFp - fromFp) / (model.Fingerprint(n))
series.Fingerprint = fromFp + model.Fingerprint(i)*step
timeDelta := fromTs + (throughTs-fromTs)/model.Time(n)*model.Time(i)
series.Chunks = []ChunkRef{
{
Start: fromTs + timeDelta*model.Time(i),
End: fromTs + timeDelta*model.Time(i),
Checksum: uint32(i),
},
}
var bloom Bloom
bloom.sbf = *boom.NewScalableBloomFilter(1024, 0.01, 0.8)
bloom.sbf.Add([]byte(fmt.Sprint(i)))
seriesList = append(seriesList, SeriesWithBloom{
Series: &series,
Bloom: &bloom,
})
}
return
}
func TestBlockBuilderRoundTrip(t *testing.T) {
numSeries := 100
data := mkBasicSeriesWithBlooms(numSeries, 0, 0xffff, 0, 10000)
// references for linking in memory reader+writer
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
// directory for directory reader+writer
tmpDir := t.TempDir()
for _, tc := range []struct {
desc string
writer BlockWriter
reader BlockReader
}{
{
desc: "in-memory",
writer: NewMemoryBlockWriter(indexBuf, bloomsBuf),
reader: NewByteReader(indexBuf, bloomsBuf),
},
{
desc: "directory",
writer: NewDirectoryBlockWriter(tmpDir),
reader: NewDirectoryBlockReader(tmpDir),
},
} {
t.Run(tc.desc, func(t *testing.T) {
builder, err := NewBlockBuilder(
BlockOptions{
schema: Schema{
version: DefaultSchemaVersion,
encoding: chunkenc.EncSnappy,
},
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
},
tc.writer,
)
require.Nil(t, err)
itr := NewSliceIter[SeriesWithBloom](data)
require.Nil(t, builder.BuildFrom(itr))
block := NewBlock(tc.reader)
querier := NewBlockQuerier(block)
for i := 0; i < len(data); i++ {
require.Equal(t, true, querier.Next(), "on iteration %d with error %v", i, querier.Err())
got := querier.At()
require.Equal(t, data[i].Series, got.Series)
require.Equal(t, data[i].Bloom, got.Bloom)
}
// ensure no error
require.Nil(t, querier.Err())
// ensure it's exhausted
require.Equal(t, false, querier.Next())
// test seek
i := numSeries / 2
half := data[i:]
require.Nil(t, querier.Seek(half[0].Series.Fingerprint))
for j := 0; j < len(half); j++ {
require.Equal(t, true, querier.Next(), "on iteration %d", j)
got := querier.At()
require.Equal(t, half[j].Series, got.Series)
require.Equal(t, half[j].Bloom, got.Bloom)
require.Nil(t, querier.Err())
}
require.Equal(t, false, querier.Next())
})
}
}

pkg/storage/bloom/v1/index.go
@@ -0,0 +1,419 @@
package v1
import (
"bytes"
"io"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/util/encoding"
)
type Schema struct {
version byte
encoding chunkenc.Encoding
}
// byte length
func (s Schema) Len() int {
// magic number + version + encoding
return 4 + 1 + 1
}
func (s *Schema) DecompressorPool() chunkenc.ReaderPool {
return chunkenc.GetReaderPool(s.encoding)
}
func (s *Schema) CompressorPool() chunkenc.WriterPool {
return chunkenc.GetWriterPool(s.encoding)
}
func (s *Schema) Encode(enc *encoding.Encbuf) {
enc.Reset()
enc.PutBE32(magicNumber)
enc.PutByte(s.version)
enc.PutByte(byte(s.encoding))
}
func (s *Schema) DecodeFrom(r io.ReadSeeker) error {
// TODO(owen-d): improve allocations
schemaBytes := make([]byte, s.Len())
_, err := io.ReadFull(r, schemaBytes)
if err != nil {
return errors.Wrap(err, "reading schema")
}
dec := encoding.DecWith(schemaBytes)
return s.Decode(&dec)
}
func (s *Schema) Decode(dec *encoding.Decbuf) error {
number := dec.Be32()
if number != magicNumber {
return errors.Errorf("invalid magic number. expected %x, got %x", magicNumber, number)
}
s.version = dec.Byte()
if s.version != 1 {
return errors.Errorf("invalid version. expected %d, got %d", 1, s.version)
}
s.encoding = chunkenc.Encoding(dec.Byte())
if _, err := chunkenc.ParseEncoding(s.encoding.String()); err != nil {
return errors.Wrap(err, "parsing encoding")
}
return dec.Err()
}
// Block index is a set of series pages along with
// the headers for each page
type BlockIndex struct {
schema Schema
pageHeaders []SeriesPageHeaderWithOffset // headers for each series page
}
func NewBlockIndex(encoding chunkenc.Encoding) BlockIndex {
return BlockIndex{
schema: Schema{version: DefaultSchemaVersion, encoding: encoding},
}
}
func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {
if err := b.schema.DecodeFrom(r); err != nil {
return errors.Wrap(err, "decoding schema")
}
var (
err error
dec encoding.Decbuf
)
// last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32)
if _, err := r.Seek(-12, io.SeekEnd); err != nil {
return errors.Wrap(err, "seeking to bloom headers metadata")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading bloom headers metadata")
}
headerOffset := dec.Be64()
if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil {
return errors.Wrap(err, "seeking to index headers")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading index page headers")
}
if err := dec.CheckCrc(castagnoliTable); err != nil {
return errors.Wrap(err, "checksumming page headers")
}
b.pageHeaders = make(
[]SeriesPageHeaderWithOffset,
dec.Uvarint(),
)
for i := 0; i < len(b.pageHeaders); i++ {
var s SeriesPageHeaderWithOffset
if err := s.Decode(&dec); err != nil {
return errors.Wrapf(err, "decoding %dth series header", i)
}
b.pageHeaders[i] = s
}
return nil
}
// decompress page and return an iterator over the bytes
func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHeaderWithOffset) (*SeriesPageDecoder, error) {
if _, err := r.Seek(int64(header.Offset), io.SeekStart); err != nil {
return nil, errors.Wrap(err, "seeking to series page")
}
data := BlockPool.Get(header.Len)[:header.Len]
defer BlockPool.Put(data)
_, err := io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading series page")
}
dec := encoding.DecWith(data)
if err := dec.CheckCrc(castagnoliTable); err != nil {
return nil, errors.Wrap(err, "checksumming series page")
}
decompressor, err := b.schema.DecompressorPool().GetReader(bytes.NewReader(dec.Get()))
if err != nil {
return nil, errors.Wrap(err, "getting decompressor")
}
decompressed := BlockPool.Get(header.DecompressedLen)[:header.DecompressedLen]
if _, err = io.ReadFull(decompressor, decompressed); err != nil {
return nil, errors.Wrap(err, "decompressing series page")
}
// replace decoder's input with the now-decompressed data
dec.B = decompressed
res := &SeriesPageDecoder{
data: decompressed,
header: header.SeriesHeader,
i: -1,
}
res.Reset()
return res, nil
}
// Header for a series page
type SeriesPageHeaderWithOffset struct {
Offset, Len, DecompressedLen int
SeriesHeader
}
func (h *SeriesPageHeaderWithOffset) Encode(enc *encoding.Encbuf) {
enc.PutUvarint(h.Offset)
enc.PutUvarint(h.Len)
enc.PutUvarint(h.DecompressedLen)
h.SeriesHeader.Encode(enc)
}
func (h *SeriesPageHeaderWithOffset) Decode(dec *encoding.Decbuf) error {
h.Offset = dec.Uvarint()
h.Len = dec.Uvarint()
h.DecompressedLen = dec.Uvarint()
return h.SeriesHeader.Decode(dec)
}
type SeriesHeader struct {
NumSeries int
FromFp, ThroughFp model.Fingerprint
FromTs, ThroughTs model.Time
}
// build one aggregated header for the entire block
func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
if len(xs) == 0 {
return SeriesHeader{}
}
// seed timestamps from the first header so the min/max tracking below
// has a real starting point instead of the zero value
res := SeriesHeader{
FromFp: xs[0].FromFp,
ThroughFp: xs[len(xs)-1].ThroughFp,
FromTs: xs[0].FromTs,
ThroughTs: xs[0].ThroughTs,
}
for _, x := range xs {
if x.FromTs < res.FromTs {
res.FromTs = x.FromTs
}
if x.ThroughTs > res.ThroughTs {
res.ThroughTs = x.ThroughTs
}
}
return res
}
func (h *SeriesHeader) Encode(enc *encoding.Encbuf) {
enc.PutUvarint(h.NumSeries)
enc.PutUvarint64(uint64(h.FromFp))
enc.PutUvarint64(uint64(h.ThroughFp))
enc.PutVarint64(int64(h.FromTs))
enc.PutVarint64(int64(h.ThroughTs))
}
func (h *SeriesHeader) Decode(dec *encoding.Decbuf) error {
h.NumSeries = dec.Uvarint()
h.FromFp = model.Fingerprint(dec.Uvarint64())
h.ThroughFp = model.Fingerprint(dec.Uvarint64())
h.FromTs = model.Time(dec.Varint64())
h.ThroughTs = model.Time(dec.Varint64())
return dec.Err()
}
// can decode a series page one item at a time, useful when we don't
// need to iterate an entire page
type SeriesPageDecoder struct {
data []byte
dec encoding.Decbuf
header SeriesHeader
// state
i int // current index
cur *SeriesWithOffset
err error
previousFp model.Fingerprint // previous series' fingerprint for delta-decoding
previousOffset BloomOffset // previous series' bloom offset for delta-decoding
}
func (d *SeriesPageDecoder) Reset() {
d.i = -1
d.cur = nil
d.err = nil
d.previousFp = 0
d.previousOffset = BloomOffset{}
d.dec.B = d.data
}
func (d *SeriesPageDecoder) Next() bool {
if d.err != nil {
return false
}
d.i++
if d.i >= d.header.NumSeries {
return false
}
var res SeriesWithOffset
d.previousFp, d.previousOffset, d.err = res.Decode(&d.dec, d.previousFp, d.previousOffset)
if d.err != nil {
return false
}
d.cur = &res
return true
}
func (d *SeriesPageDecoder) Seek(fp model.Fingerprint) {
if fp > d.header.ThroughFp || fp < d.header.FromFp {
// shortcut: we know the fingerprint is not in this page,
// so mark the iterator as already exhausted
d.i = d.header.NumSeries
}
// if we've seen an error or we've potentially skipped the desired fp, reset the page state
if d.Err() != nil || (d.cur != nil && d.cur.Fingerprint >= fp) {
d.Reset()
}
for {
// previous byte offset in decoder, used for resetting
// position after finding the desired fp
offset := len(d.data) - d.dec.Len()
// previous bloom offset in decoder, used for
// resetting position after finding the desired fp
// since offsets are delta-encoded
previousBloomOffset := d.previousOffset
previousFp := d.previousFp
// iteration finished
if ok := d.Next(); !ok {
return
}
// we've seeked to the correct location. reverse one step and return
cur := d.At()
if cur.Fingerprint >= fp {
d.i--
d.previousOffset = previousBloomOffset
d.previousFp = previousFp
d.dec.B = d.data[offset:]
return
}
}
}
func (d *SeriesPageDecoder) At() (res *SeriesWithOffset) {
return d.cur
}
func (d *SeriesPageDecoder) Err() error {
if d.err != nil {
return d.err
}
return d.dec.Err()
}
type Series struct {
Fingerprint model.Fingerprint
Chunks []ChunkRef
}
type SeriesWithOffset struct {
Offset BloomOffset
Series
}
func (s *SeriesWithOffset) Encode(
enc *encoding.Encbuf,
previousFp model.Fingerprint,
previousOffset BloomOffset,
) (model.Fingerprint, BloomOffset) {
// delta encode fingerprint
enc.PutBE64(uint64(s.Fingerprint - previousFp))
// delta encode offsets
s.Offset.Encode(enc, previousOffset)
// encode chunks using delta encoded timestamps
var lastEnd model.Time
enc.PutUvarint(len(s.Chunks))
for _, chunk := range s.Chunks {
lastEnd = chunk.Encode(enc, lastEnd)
}
return s.Fingerprint, s.Offset
}
func (s *SeriesWithOffset) Decode(dec *encoding.Decbuf, previousFp model.Fingerprint, previousOffset BloomOffset) (model.Fingerprint, BloomOffset, error) {
s.Fingerprint = previousFp + model.Fingerprint(dec.Be64())
if err := s.Offset.Decode(dec, previousOffset); err != nil {
return 0, BloomOffset{}, errors.Wrap(err, "decoding bloom offset")
}
// TODO(owen-d): use pool
s.Chunks = make([]ChunkRef, dec.Uvarint())
var (
err error
lastEnd model.Time
)
for i := range s.Chunks {
lastEnd, err = s.Chunks[i].Decode(dec, lastEnd)
if err != nil {
return 0, BloomOffset{}, errors.Wrapf(err, "decoding %dth chunk", i)
}
}
return s.Fingerprint, s.Offset, dec.Err()
}
type ChunkRef struct {
Start, End model.Time
Checksum uint32
}
func (r *ChunkRef) Encode(enc *encoding.Encbuf, previousEnd model.Time) model.Time {
// delta encode start time
enc.PutVarint64(int64(r.Start - previousEnd))
enc.PutVarint64(int64(r.End - r.Start))
enc.PutBE32(r.Checksum)
return r.End
}
func (r *ChunkRef) Decode(dec *encoding.Decbuf, previousEnd model.Time) (model.Time, error) {
r.Start = previousEnd + model.Time(dec.Varint64())
r.End = r.Start + model.Time(dec.Varint64())
r.Checksum = dec.Be32()
return r.End, dec.Err()
}
type BloomOffset struct {
Page int // page number in bloom block
ByteOffset int // offset to beginning of bloom within page
}
func (o *BloomOffset) Encode(enc *encoding.Encbuf, previousOffset BloomOffset) {
enc.PutUvarint(o.Page - previousOffset.Page)
enc.PutUvarint(o.ByteOffset - previousOffset.ByteOffset)
}
func (o *BloomOffset) Decode(dec *encoding.Decbuf, previousOffset BloomOffset) error {
o.Page = previousOffset.Page + dec.Uvarint()
o.ByteOffset = previousOffset.ByteOffset + dec.Uvarint()
return dec.Err()
}
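Because fingerprints, bloom offsets, and chunk bounds are all delta-encoded, a worked example may help. For a series with hypothetical values fp=100, offset={page:1, byte:250}, chunks [{10,20,c1},{15,30,c2}], encoded after a previous entry with fp=90, offset={page:1, byte:100}:

```
BE64(100-90)      fingerprint delta (fixed width)
uvarint(1-1)      bloom page delta
uvarint(250-100)  bloom byte-offset delta
uvarint(2)        chunk count
varint(10-0)      chunk 1 start, relative to lastEnd (initially 0)
varint(20-10)     chunk 1 duration (end - start)
BE32(c1)          chunk 1 checksum
varint(15-20)     chunk 2 start, relative to chunk 1's end; this can be
                  negative for overlapping chunks, hence signed varints
varint(30-15)     chunk 2 duration
BE32(c2)          chunk 2 checksum
```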

pkg/storage/bloom/v1/index_querier.go
@@ -0,0 +1,152 @@
package v1
import (
"context"
"sort"
"github.com/efficientgo/core/errors"
"github.com/prometheus/common/model"
)
type IndexQuerier interface {
Series(context.Context) Iterator[*SeriesWithOffset]
}
type SeriesIterator interface {
Iterator[*SeriesWithOffset]
Reset()
}
type LazySeriesIter struct {
b *Block
// state
initialized bool
err error
curPageIndex int
curPage *SeriesPageDecoder
}
// NewLazySeriesIter creates an iterator that decodes series pages one at a time as it advances
func NewLazySeriesIter(b *Block) *LazySeriesIter {
return &LazySeriesIter{
b: b,
}
}
func (it *LazySeriesIter) ensureInit() {
// TODO(owen-d): better control over when to decode
if !it.initialized {
if err := it.b.LoadHeaders(); err != nil {
it.err = err
}
it.initialized = true
}
}
// Seek positions the iterator at the first series with fingerprint >= fp
func (it *LazySeriesIter) Seek(fp model.Fingerprint) error {
it.ensureInit()
// first potentially relevant page
desiredPage := sort.Search(len(it.b.index.pageHeaders), func(i int) bool {
header := it.b.index.pageHeaders[i]
return header.ThroughFp >= fp
})
if desiredPage == len(it.b.index.pageHeaders) {
// no page was found with a throughFP >= fp,
// so the fingerprint is past the end of the block
it.curPageIndex = desiredPage
it.curPage = nil
return nil
}
page := it.b.index.pageHeaders[desiredPage]
switch {
case page.FromFp > fp:
// the first page found has a fromFP > fp, meaning successive pages
// would also have a fromFP > fp since they're sorted in ascending
// fp order: no overlap exists
it.curPageIndex = len(it.b.index.pageHeaders)
it.curPage = nil
return nil
case desiredPage == it.curPageIndex && it.curPage != nil:
// on the right page, no action needed
default:
// need to load a new page
r, err := it.b.reader.Index()
if err != nil {
return errors.Wrap(err, "getting index reader")
}
it.curPage, err = it.b.index.NewSeriesPageDecoder(
r,
page,
)
if err != nil {
return err
}
it.curPageIndex = desiredPage
}
it.curPage.Seek(fp)
return nil
}
func (it *LazySeriesIter) Next() bool {
it.ensureInit()
if it.err != nil {
return false
}
return it.next()
}
func (it *LazySeriesIter) next() bool {
for it.curPageIndex < len(it.b.index.pageHeaders) {
// first access of next page
if it.curPage == nil {
curHeader := it.b.index.pageHeaders[it.curPageIndex]
r, err := it.b.reader.Index()
if err != nil {
it.err = errors.Wrap(err, "getting index reader")
return false
}
it.curPage, err = it.b.index.NewSeriesPageDecoder(
r,
curHeader,
)
if err != nil {
it.err = err
return false
}
continue
}
if !it.curPage.Next() {
// there was an error
if it.curPage.Err() != nil {
return false
}
// we've exhausted the current page, progress to next
it.curPageIndex++
it.curPage = nil
continue
}
return true
}
// finished last page
return false
}
func (it *LazySeriesIter) At() *SeriesWithOffset {
return it.curPage.At()
}
func (it *LazySeriesIter) Err() error {
if it.err != nil {
return it.err
}
if it.curPage != nil {
return it.curPage.Err()
}
return nil
}

pkg/storage/bloom/v1/index_test.go
@@ -0,0 +1,74 @@
package v1
import (
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/util/encoding"
)
// does not include a real bloom offset
func mkBasicSeries(n int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) []SeriesWithOffset {
var seriesList []SeriesWithOffset
for i := 0; i < n; i++ {
var series SeriesWithOffset
step := (throughFp - fromFp) / (model.Fingerprint(n))
series.Fingerprint = fromFp + model.Fingerprint(i)*step
timeDelta := fromTs + (throughTs-fromTs)/model.Time(n)*model.Time(i)
series.Chunks = []ChunkRef{
{
Start: fromTs + timeDelta*model.Time(i),
End: fromTs + timeDelta*model.Time(i),
Checksum: uint32(i),
},
}
seriesList = append(seriesList, series)
}
return seriesList
}
func TestBloomOffsetEncoding(t *testing.T) {
src := BloomOffset{Page: 1, ByteOffset: 2}
enc := &encoding.Encbuf{}
src.Encode(enc, BloomOffset{})
var dst BloomOffset
dec := encoding.DecWith(enc.Get())
require.Nil(t, dst.Decode(&dec, BloomOffset{}))
require.Equal(t, src, dst)
}
func TestSeriesEncoding(t *testing.T) {
src := SeriesWithOffset{
Series: Series{
Fingerprint: model.Fingerprint(1),
Chunks: []ChunkRef{
{
Start: 1,
End: 2,
Checksum: 3,
},
{
Start: 4,
End: 5,
Checksum: 6,
},
},
},
Offset: BloomOffset{Page: 2, ByteOffset: 3},
}
enc := &encoding.Encbuf{}
src.Encode(enc, 0, BloomOffset{})
dec := encoding.DecWith(enc.Get())
var dst SeriesWithOffset
fp, offset, err := dst.Decode(&dec, 0, BloomOffset{})
require.Nil(t, err)
require.Equal(t, src.Fingerprint, fp)
require.Equal(t, src.Offset, offset)
require.Equal(t, src, dst)
}

pkg/storage/bloom/v1/util.go
@@ -0,0 +1,134 @@
package v1
import (
"hash"
"hash/crc32"
"io"
"sync"
"github.com/prometheus/prometheus/util/pool"
)
const (
magicNumber = uint32(0xCA7CAFE5)
// Add new versions below. Note: because magicNumber occupies the first
// spec in this const block, iota is already 1 here, so V1 == 1.
V1 byte = iota
)
const (
DefaultSchemaVersion = V1
)
var (
castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
// Pool of crc32 hash
Crc32HashPool = ChecksumPool{
Pool: sync.Pool{
New: func() interface{} {
return crc32.New(castagnoliTable)
},
},
}
// bucket sizes from 1KB to 16MB (1<<24)
BlockPool = BytePool{
pool: pool.New(
1<<10, 1<<24, 4,
func(size int) interface{} {
return make([]byte, size)
}),
}
)
type BytePool struct {
pool *pool.Pool
}
func (p *BytePool) Get(size int) []byte {
return p.pool.Get(size).([]byte)[:0]
}
func (p *BytePool) Put(b []byte) {
p.pool.Put(b)
}
func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable)
}
type ChecksumPool struct {
sync.Pool
}
func (p *ChecksumPool) Get() hash.Hash32 {
h := p.Pool.Get().(hash.Hash32)
h.Reset()
return h
}
func (p *ChecksumPool) Put(h hash.Hash32) {
p.Pool.Put(h)
}
type Iterator[T any] interface {
Next() bool
Err() error
At() T
}
type SliceIter[T any] struct {
cur int
xs []T
}
func NewSliceIter[T any](xs []T) *SliceIter[T] {
return &SliceIter[T]{xs: xs, cur: -1}
}
func (it *SliceIter[T]) Next() bool {
it.cur++
return it.cur < len(it.xs)
}
func (it *SliceIter[T]) Err() error {
return nil
}
func (it *SliceIter[T]) At() T {
return it.xs[it.cur]
}
type EmptyIter[T any] struct {
zero T
}
func (it *EmptyIter[T]) Next() bool {
return false
}
func (it *EmptyIter[T]) Err() error {
return nil
}
func (it *EmptyIter[T]) At() T {
return it.zero
}
// noop
func (it *EmptyIter[T]) Reset() {}
func NewEmptyIter[T any](zero T) *EmptyIter[T] {
return &EmptyIter[T]{zero: zero}
}
type NoopCloser struct {
io.Writer
}
func (n NoopCloser) Close() error {
return nil
}
func NewNoopCloser(w io.Writer) NoopCloser {
return NoopCloser{w}
}
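One subtlety worth a note: `BytePool.Get` truncates the pooled slice to zero length, so every call site reslices before reading into it. A minimal sketch of the pattern used in bloom.go/index.go (the `readPage` helper is hypothetical):

```go
func readPage(r io.Reader, pageLen int) ([]byte, error) {
	// Get returns a zero-length slice with at least pageLen capacity;
	// reslice it to the full length before filling it
	buf := BlockPool.Get(pageLen)[:pageLen]
	if _, err := io.ReadFull(r, buf); err != nil {
		BlockPool.Put(buf)
		return nil, err
	}
	// the caller must BlockPool.Put(buf) once done with it
	return buf, nil
}
```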

pkg/util/encoding/encoding.go
@@ -1,6 +1,11 @@
 package encoding
 
-import "github.com/prometheus/prometheus/tsdb/encoding"
+import (
+	"encoding/binary"
+	"hash/crc32"
+
+	"github.com/prometheus/prometheus/tsdb/encoding"
+)
 
 func EncWith(b []byte) (res Encbuf) {
 	res.B = b
@@ -44,3 +49,23 @@ func (d *Decbuf) Bytes(n int) []byte {
 	d.B = d.B[n:]
 	return x
 }
+
+func (d *Decbuf) CheckCrc(castagnoliTable *crc32.Table) error {
+	if d.E != nil {
+		return d.E
+	}
+	if len(d.B) < 4 {
+		d.E = encoding.ErrInvalidSize
+		return d.E
+	}
+	offset := len(d.B) - 4
+	expCRC := binary.BigEndian.Uint32(d.B[offset:])
+	d.B = d.B[:offset]
+	if d.Crc32(castagnoliTable) != expCRC {
+		d.E = encoding.ErrInvalidChecksum
+		return d.E
+	}
+	return nil
+}
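A round-trip sketch of the new helper paired with `Encbuf.PutHash` (which this PR uses when writing pages); it runs inside this package, and the standalone `table` var exists only for the example:

```go
var table = crc32.MakeTable(crc32.Castagnoli)

func checksumRoundTrip() error {
	enc := EncWith(nil)
	enc.PutUvarint(42)
	enc.PutHash(crc32.New(table)) // appends big-endian crc32 of the contents

	dec := DecWith(enc.Get())
	if err := dec.CheckCrc(table); err != nil { // verifies, then strips the crc
		return err
	}
	_ = dec.Uvarint() // 42
	return dec.Err()
}
```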
