Blooms/block metadata (#11859)

A few updates to the bloom library:
* Uses `FingerprintBounds` in series headers
* Encodes `BlockOptions` in the series file so we can later read the
target page & block sizes the block was generated with in addition to
the schema.
* Introduces `BlockMetadata` struct and loads it correctly from blocks.
This struct will be used to convert to the `BlockRef`s from the
`bloomshipper` pkg and used in the bloom compactor + bloom gateway
* Integrates checksums better into block building and XORs the header-metadata
checksums from each file (blooms, series) together to generate a final
checksum for the block (a combination of both files).
pull/11817/head^2
Owen Diehl 1 year ago committed by GitHub
parent 5132f6d2a7
commit 0f34d9155e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 46
      pkg/storage/bloom/v1/block.go
  2. 19
      pkg/storage/bloom/v1/bloom.go
  3. 109
      pkg/storage/bloom/v1/builder.go
  4. 26
      pkg/storage/bloom/v1/builder_test.go
  5. 56
      pkg/storage/bloom/v1/index.go
  6. 2
      pkg/storage/bloom/v1/index_querier.go

@ -7,21 +7,23 @@ import (
"github.com/prometheus/common/model"
)
type BlockMetadata struct {
Options BlockOptions
Series SeriesHeader
Checksum uint32
}
type Block struct {
// covers series pages
index BlockIndex
// covers bloom pages
blooms BloomBlock
// TODO(owen-d): implement
// synthetic header for the entire block
// built from all the pages in the index
header SeriesHeader
metadata BlockMetadata
reader BlockReader // should this be decoupled from the struct (accepted as method arg instead)?
initialized bool
dataRange SeriesHeader
}
func NewBlock(reader BlockReader) *Block {
@ -38,30 +40,49 @@ func (b *Block) LoadHeaders() error {
return errors.Wrap(err, "getting index reader")
}
if err := b.index.DecodeHeaders(idx); err != nil {
indexChecksum, err := b.index.DecodeHeaders(idx)
if err != nil {
return errors.Wrap(err, "decoding index")
}
b.metadata.Options = b.index.opts
// TODO(owen-d): better pattern
xs := make([]SeriesHeader, 0, len(b.index.pageHeaders))
for _, h := range b.index.pageHeaders {
xs = append(xs, h.SeriesHeader)
}
b.dataRange = aggregateHeaders(xs)
b.metadata.Series = aggregateHeaders(xs)
blooms, err := b.reader.Blooms()
if err != nil {
return errors.Wrap(err, "getting blooms reader")
}
if err := b.blooms.DecodeHeaders(blooms); err != nil {
bloomChecksum, err := b.blooms.DecodeHeaders(blooms)
if err != nil {
return errors.Wrap(err, "decoding blooms")
}
b.initialized = true
if !b.metadata.Options.Schema.Compatible(b.blooms.schema) {
return fmt.Errorf(
"schema mismatch: index (%v) vs blooms (%v)",
b.metadata.Options.Schema, b.blooms.schema,
)
}
b.metadata.Checksum = combineChecksums(indexChecksum, bloomChecksum)
}
return nil
}
// combineChecksums derives the block-level checksum from the two
// per-file checksums (series index and blooms) by XORing them.
// Because XOR is its own inverse, either component can be recovered
// by XORing the combined value against the other component.
func combineChecksums(index, blooms uint32) uint32 {
	combined := index
	combined ^= blooms
	return combined
}
// convenience method
func (b *Block) Querier() *BlockQuerier {
return NewBlockQuerier(b)
@ -75,11 +96,18 @@ func (b *Block) Blooms() *LazyBloomIter {
return NewLazyBloomIter(b)
}
// Metadata loads the block's headers (if that fails, the error is
// returned) and hands back the resulting block metadata: options,
// aggregated series header, and combined checksum.
func (b *Block) Metadata() (BlockMetadata, error) {
	err := b.LoadHeaders()
	if err != nil {
		return BlockMetadata{}, err
	}
	return b.metadata, nil
}
func (b *Block) Schema() (Schema, error) {
if err := b.LoadHeaders(); err != nil {
return Schema{}, err
}
return b.index.schema, nil
return b.metadata.Options.Schema, nil
}
type BlockQuerier struct {

@ -171,9 +171,9 @@ func NewBloomBlock(encoding chunkenc.Encoding) BloomBlock {
}
}
func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error {
func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
if err := b.schema.DecodeFrom(r); err != nil {
return errors.Wrap(err, "decoding schema")
return 0, errors.Wrap(err, "decoding schema")
}
var (
@ -182,35 +182,36 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error {
)
// last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32)
if _, err := r.Seek(-12, io.SeekEnd); err != nil {
return errors.Wrap(err, "seeking to bloom headers metadata")
return 0, errors.Wrap(err, "seeking to bloom headers metadata")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading bloom headers metadata")
return 0, errors.Wrap(err, "reading bloom headers metadata")
}
headerOffset := dec.Be64()
checksum := dec.Be32()
if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil {
return errors.Wrap(err, "seeking to bloom headers")
return 0, errors.Wrap(err, "seeking to bloom headers")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading bloom page headers")
return 0, errors.Wrap(err, "reading bloom page headers")
}
if err := dec.CheckCrc(castagnoliTable); err != nil {
return errors.Wrap(err, "checksumming page headers")
return 0, errors.Wrap(err, "checksumming page headers")
}
b.pageHeaders = make([]BloomPageHeader, dec.Uvarint())
for i := 0; i < len(b.pageHeaders); i++ {
header := &b.pageHeaders[i]
if err := header.Decode(&dec); err != nil {
return errors.Wrapf(err, "decoding %dth series header", i)
return 0, errors.Wrapf(err, "decoding %dth series header", i)
}
}
return nil
return checksum, nil
}
func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) {

@ -5,7 +5,6 @@ import (
"fmt"
"hash"
"io"
"sort"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
@ -21,15 +20,46 @@ var (
type BlockOptions struct {
// Schema determines the Schema of the block and cannot be changed
// without recreating the block from underlying data
Schema Schema
// The following options can be changed on the fly.
// For instance, adding another page to a block with
// a different target page size is supported.
// a different target page size is supported, although
// the block will store the original sizes it was created with
// target size in bytes (decompressed)
// of each page type
SeriesPageSize, BloomPageSize, BlockSize int
SeriesPageSize, BloomPageSize, BlockSize uint64
}
// Len returns the encoded size of the options in bytes: the schema
// followed by three 8-byte big-endian size fields (series page size,
// bloom page size, block size).
func (b BlockOptions) Len() int {
	const sizeFieldsLen = 3 * 8 // three u64 size fields
	return b.Schema.Len() + sizeFieldsLen
}
// DecodeFrom reads the serialized options from r: the schema first,
// followed by three big-endian u64 sizes (series page, bloom page,
// block). The layout mirrors BlockOptions.Encode.
func (b *BlockOptions) DecodeFrom(r io.ReadSeeker) error {
	raw := make([]byte, b.Len())
	if _, err := io.ReadFull(r, raw); err != nil {
		return errors.Wrap(err, "reading block options")
	}

	dec := encoding.DecWith(raw)
	if err := b.Schema.Decode(&dec); err != nil {
		return errors.Wrap(err, "decoding schema")
	}
	b.SeriesPageSize = dec.Be64()
	b.BloomPageSize = dec.Be64()
	b.BlockSize = dec.Be64()
	return nil
}
// Encode serializes the options into buf. The write order — schema,
// then the three u64 sizes — must mirror BlockOptions.DecodeFrom.
func (b BlockOptions) Encode(buf *encoding.Encbuf) {
	b.Schema.Encode(buf)
	buf.PutBE64(b.SeriesPageSize)
	buf.PutBE64(b.BloomPageSize)
	buf.PutBE64(b.BlockSize)
}
type BlockBuilder struct {
@ -90,14 +120,19 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error)
return 0, errors.Wrap(err, "iterating series with blooms")
}
checksum, err := b.blooms.Close()
return b.Close()
}
func (b *BlockBuilder) Close() (uint32, error) {
bloomChecksum, err := b.blooms.Close()
if err != nil {
return 0, errors.Wrap(err, "closing bloom file")
}
if err := b.index.Close(); err != nil {
indexCheckSum, err := b.index.Close()
if err != nil {
return 0, errors.Wrap(err, "closing series file")
}
return checksum, nil
return combineChecksums(indexCheckSum, bloomChecksum), nil
}
func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error {
@ -131,7 +166,7 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB
return &BloomBlockBuilder{
opts: opts,
writer: writer,
page: NewPageWriter(opts.BloomPageSize),
page: NewPageWriter(int(opts.BloomPageSize)),
scratch: &encoding.Encbuf{},
}
}
@ -307,16 +342,16 @@ func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder {
return &IndexBuilder{
opts: opts,
writer: writer,
page: NewPageWriter(opts.SeriesPageSize),
page: NewPageWriter(int(opts.SeriesPageSize)),
scratch: &encoding.Encbuf{},
}
}
func (b *IndexBuilder) WriteSchema() error {
func (b *IndexBuilder) WriteOpts() error {
b.scratch.Reset()
b.opts.Schema.Encode(b.scratch)
b.opts.Encode(b.scratch)
if _, err := b.writer.Write(b.scratch.Get()); err != nil {
return errors.Wrap(err, "writing schema")
return errors.Wrap(err, "writing opts+schema")
}
b.writtenSchema = true
b.offset += b.scratch.Len()
@ -325,8 +360,8 @@ func (b *IndexBuilder) WriteSchema() error {
func (b *IndexBuilder) Append(series SeriesWithOffset) error {
if !b.writtenSchema {
if err := b.WriteSchema(); err != nil {
return errors.Wrap(err, "writing schema")
if err := b.WriteOpts(); err != nil {
return errors.Wrap(err, "appending series")
}
}
@ -408,8 +443,7 @@ func (b *IndexBuilder) flushPage() error {
DecompressedLen: decompressedLen,
SeriesHeader: SeriesHeader{
NumSeries: b.page.Count(),
FromFp: b.fromFp,
ThroughFp: b.previousFp,
Bounds: NewBounds(b.fromFp, b.previousFp),
FromTs: b.fromTs,
ThroughTs: b.throughTs,
},
@ -428,10 +462,10 @@ func (b *IndexBuilder) flushPage() error {
return nil
}
func (b *IndexBuilder) Close() error {
func (b *IndexBuilder) Close() (uint32, error) {
if b.page.Count() > 0 {
if err := b.flushPage(); err != nil {
return errors.Wrap(err, "flushing final series page")
return 0, errors.Wrap(err, "flushing final series page")
}
}
@ -451,39 +485,9 @@ func (b *IndexBuilder) Close() error {
b.scratch.PutHash(crc32Hash)
_, err := b.writer.Write(b.scratch.Get())
if err != nil {
return errors.Wrap(err, "writing series page headers")
return 0, errors.Wrap(err, "writing series page headers")
}
return errors.Wrap(b.writer.Close(), "closing series writer")
}
// SortBlocksIntoOverlappingGroups sorts a list of blocks into a sorted list of lists,
// where each list contains blocks that overlap with each other.
// TODO(owen-d): implement as an iterator so we don't have to load all blocks at once
// NB: unused now, but likely useful when we want to optimize compaction. I wrote this expecting to need it now
// but it feels unsavory to remove it
func SortBlocksIntoOverlappingGroups(xs []*Block) (groups [][]*Block) {
sort.Slice(xs, func(i, j int) bool {
a, b := xs[i].index, xs[j].index
return a.pageHeaders[0].FromFp <= b.pageHeaders[0].FromFp
})
var curGroup []*Block
for _, x := range xs {
switch {
case len(curGroup) == 0:
curGroup = append(curGroup, x)
case curGroup[len(curGroup)-1].dataRange.OverlapFingerprintRange(x.dataRange):
curGroup = append(curGroup, x)
default:
groups = append(groups, curGroup)
curGroup = []*Block{x}
}
}
if len(curGroup) > 0 {
groups = append(groups, curGroup)
}
return groups
return crc32Hash.Sum32(), errors.Wrap(b.writer.Close(), "closing series writer")
}
// Simplistic implementation of a merge builder that builds a single block
@ -586,12 +590,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
}
}
checksum, err := builder.blooms.Close()
checksum, err := builder.Close()
if err != nil {
return 0, errors.Wrap(err, "closing bloom file")
}
if err := builder.index.Close(); err != nil {
return 0, errors.Wrap(err, "closing series file")
return 0, errors.Wrap(err, "closing block")
}
return checksum, nil
}

@ -9,8 +9,32 @@ import (
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/util/encoding"
)
// TestBlockOptionsRoundTrip verifies that BlockOptions survives an
// Encode -> DecodeFrom round trip unchanged.
func TestBlockOptionsRoundTrip(t *testing.T) {
	want := BlockOptions{
		Schema: Schema{
			version:     V1,
			encoding:    chunkenc.EncSnappy,
			nGramLength: 10,
			nGramSkip:   2,
		},
		SeriesPageSize: 100,
		BloomPageSize:  10 << 10,
		BlockSize:      10 << 20,
	}

	var buf encoding.Encbuf
	want.Encode(&buf)

	var got BlockOptions
	require.Nil(t, got.DecodeFrom(bytes.NewReader(buf.Get())))
	require.Equal(t, want, got)
}
func TestBlockBuilderRoundTrip(t *testing.T) {
numSeries := 100
numKeysPerSeries := 10000
@ -334,7 +358,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
checksum, err := mb.Build(builder)
require.Nil(t, err)
require.Equal(t, uint32(0x2ec4fd6a), checksum)
require.Equal(t, uint32(0xe306ec6e), checksum)
// ensure the new block contains one copy of all the data
// by comparing it against an iterator over the source data

@ -2,6 +2,7 @@ package v1
import (
"bytes"
"fmt"
"io"
"github.com/pkg/errors"
@ -17,6 +18,10 @@ type Schema struct {
nGramLength, nGramSkip uint64
}
// String renders the schema in a compact human-readable form:
// version, encoding, n-gram length, and n-gram skip.
func (s Schema) String() string {
	return fmt.Sprintf("v%d,encoding=%s,ngram=%d,skip=%d", s.version, s.encoding, s.nGramLength, s.nGramSkip)
}
// Compatible reports whether two schemas are interchangeable. They must
// match exactly: Schema is a comparable struct, so == compares every field.
func (s Schema) Compatible(other Schema) bool {
	return other == s
}
@ -89,19 +94,14 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error {
// Block index is a set of series pages along with
// the headers for each page
type BlockIndex struct {
schema Schema
pageHeaders []SeriesPageHeaderWithOffset // headers for each series page
}
opts BlockOptions
func NewBlockIndex(encoding chunkenc.Encoding) BlockIndex {
return BlockIndex{
schema: Schema{version: DefaultSchemaVersion, encoding: encoding},
}
pageHeaders []SeriesPageHeaderWithOffset // headers for each series page
}
func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {
if err := b.schema.DecodeFrom(r); err != nil {
return errors.Wrap(err, "decoding schema")
func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
if err := b.opts.DecodeFrom(r); err != nil {
return 0, errors.Wrap(err, "decoding block options")
}
var (
@ -111,24 +111,25 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {
// last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32)
if _, err := r.Seek(-12, io.SeekEnd); err != nil {
return errors.Wrap(err, "seeking to bloom headers metadata")
return 0, errors.Wrap(err, "seeking to bloom headers metadata")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading bloom headers metadata")
return 0, errors.Wrap(err, "reading bloom headers metadata")
}
headerOffset := dec.Be64()
checksum := dec.Be32()
if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil {
return errors.Wrap(err, "seeking to index headers")
return 0, errors.Wrap(err, "seeking to index headers")
}
dec.B, err = io.ReadAll(r)
if err != nil {
return errors.Wrap(err, "reading index page headers")
return 0, errors.Wrap(err, "reading index page headers")
}
if err := dec.CheckCrc(castagnoliTable); err != nil {
return errors.Wrap(err, "checksumming page headers")
return 0, errors.Wrap(err, "checksumming page headers")
}
b.pageHeaders = make(
@ -139,12 +140,12 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {
for i := 0; i < len(b.pageHeaders); i++ {
var s SeriesPageHeaderWithOffset
if err := s.Decode(&dec); err != nil {
return errors.Wrapf(err, "decoding %dth series header", i)
return 0, errors.Wrapf(err, "decoding %dth series header", i)
}
b.pageHeaders[i] = s
}
return nil
return checksum, nil
}
// decompress page and return an iterator over the bytes
@ -167,7 +168,7 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
return nil, errors.Wrap(err, "checksumming series page")
}
decompressor, err := b.schema.DecompressorPool().GetReader(bytes.NewReader(dec.Get()))
decompressor, err := b.opts.Schema.DecompressorPool().GetReader(bytes.NewReader(dec.Get()))
if err != nil {
return nil, errors.Wrap(err, "getting decompressor")
}
@ -213,12 +214,12 @@ func (h *SeriesPageHeaderWithOffset) Decode(dec *encoding.Decbuf) error {
type SeriesHeader struct {
NumSeries int
FromFp, ThroughFp model.Fingerprint
Bounds FingerprintBounds
FromTs, ThroughTs model.Time
}
func (h SeriesHeader) OverlapFingerprintRange(other SeriesHeader) bool {
return h.ThroughFp >= other.FromFp && h.FromFp <= other.ThroughFp
return h.Bounds.Overlaps(other.Bounds)
}
// build one aggregated header for the entire block
@ -227,9 +228,10 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
return SeriesHeader{}
}
fromFp, _ := xs[0].Bounds.GetFromThrough()
_, throughFP := xs[len(xs)-1].Bounds.GetFromThrough()
res := SeriesHeader{
FromFp: xs[0].FromFp,
ThroughFp: xs[len(xs)-1].ThroughFp,
Bounds: NewBounds(fromFp, throughFP),
}
for _, x := range xs {
@ -245,16 +247,16 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
func (h *SeriesHeader) Encode(enc *encoding.Encbuf) {
enc.PutUvarint(h.NumSeries)
enc.PutUvarint64(uint64(h.FromFp))
enc.PutUvarint64(uint64(h.ThroughFp))
enc.PutUvarint64(uint64(h.Bounds.Min))
enc.PutUvarint64(uint64(h.Bounds.Max))
enc.PutVarint64(int64(h.FromTs))
enc.PutVarint64(int64(h.ThroughTs))
}
func (h *SeriesHeader) Decode(dec *encoding.Decbuf) error {
h.NumSeries = dec.Uvarint()
h.FromFp = model.Fingerprint(dec.Uvarint64())
h.ThroughFp = model.Fingerprint(dec.Uvarint64())
h.Bounds.Min = model.Fingerprint(dec.Uvarint64())
h.Bounds.Max = model.Fingerprint(dec.Uvarint64())
h.FromTs = model.Time(dec.Varint64())
h.ThroughTs = model.Time(dec.Varint64())
return dec.Err()
@ -305,7 +307,7 @@ func (d *SeriesPageDecoder) Next() bool {
}
func (d *SeriesPageDecoder) Seek(fp model.Fingerprint) {
if fp > d.header.ThroughFp {
if fp > d.header.Bounds.Max {
// shortcut: we know the fingerprint is too large so nothing in this page
// will match the seek call, which returns the first found fingerprint >= fp.
// so masquerade the index as if we've already iterated through

@ -49,7 +49,7 @@ func (it *LazySeriesIter) Seek(fp model.Fingerprint) error {
// first potentially relevant page
desiredPage := sort.Search(len(it.b.index.pageHeaders), func(i int) bool {
header := it.b.index.pageHeaders[i]
return header.ThroughFp >= fp
return header.Bounds.Max >= fp
})
switch {

Loading…
Cancel
Save