|
|
|
@ -217,6 +217,7 @@ func (mb *MergeBuilder) processNextSeries( |
|
|
|
|
) ( |
|
|
|
|
*SeriesWithBlooms, // nextInBlocks pointer update
|
|
|
|
|
int, // bytes added
|
|
|
|
|
int, // chunks added
|
|
|
|
|
bool, // blocksFinished update
|
|
|
|
|
bool, // done building block
|
|
|
|
|
error, // error
|
|
|
|
@ -230,7 +231,7 @@ func (mb *MergeBuilder) processNextSeries( |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
if !mb.store.Next() { |
|
|
|
|
return nil, 0, false, true, nil |
|
|
|
|
return nil, 0, 0, false, true, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
nextInStore := mb.store.At() |
|
|
|
@ -249,7 +250,7 @@ func (mb *MergeBuilder) processNextSeries( |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := mb.blocks.Err(); err != nil { |
|
|
|
|
return nil, 0, false, false, errors.Wrap(err, "iterating blocks") |
|
|
|
|
return nil, 0, 0, false, false, errors.Wrap(err, "iterating blocks") |
|
|
|
|
} |
|
|
|
|
blockSeriesIterated++ |
|
|
|
|
nextInBlocks = mb.blocks.At() |
|
|
|
@ -276,11 +277,11 @@ func (mb *MergeBuilder) processNextSeries( |
|
|
|
|
|
|
|
|
|
for bloom := range ch { |
|
|
|
|
if bloom.Err != nil { |
|
|
|
|
return nil, bytesAdded, false, false, errors.Wrap(bloom.Err, "populating bloom") |
|
|
|
|
return nil, bytesAdded, 0, false, false, errors.Wrap(bloom.Err, "populating bloom") |
|
|
|
|
} |
|
|
|
|
offset, err := builder.AddBloom(bloom.Bloom) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, bytesAdded, false, false, errors.Wrapf( |
|
|
|
|
return nil, bytesAdded, 0, false, false, errors.Wrapf( |
|
|
|
|
err, "adding bloom to block for fp (%s)", nextInStore.Fingerprint, |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
@ -290,25 +291,29 @@ func (mb *MergeBuilder) processNextSeries( |
|
|
|
|
|
|
|
|
|
done, err := builder.AddSeries(*nextInStore, offsets) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, bytesAdded, false, false, errors.Wrap(err, "committing series") |
|
|
|
|
return nil, bytesAdded, 0, false, false, errors.Wrap(err, "committing series") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nextInBlocks, bytesAdded, blocksFinished, done, nil |
|
|
|
|
return nextInBlocks, bytesAdded, chunksIndexed + chunksCopied, blocksFinished, done, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (mb *MergeBuilder) Build(builder *BlockBuilder) (checksum uint32, totalBytes int, err error) { |
|
|
|
|
var ( |
|
|
|
|
nextInBlocks *SeriesWithBlooms |
|
|
|
|
blocksFinished bool // whether any previous blocks have been exhausted while building new block
|
|
|
|
|
done bool |
|
|
|
|
nextInBlocks *SeriesWithBlooms |
|
|
|
|
blocksFinished bool // whether any previous blocks have been exhausted while building new block
|
|
|
|
|
done bool |
|
|
|
|
totalSeriesAdded = 0 |
|
|
|
|
totalChunksAdded int |
|
|
|
|
) |
|
|
|
|
for { |
|
|
|
|
var bytesAdded int |
|
|
|
|
nextInBlocks, bytesAdded, blocksFinished, done, err = mb.processNextSeries(builder, nextInBlocks, blocksFinished) |
|
|
|
|
var bytesAdded, chunksAdded int |
|
|
|
|
nextInBlocks, bytesAdded, chunksAdded, blocksFinished, done, err = mb.processNextSeries(builder, nextInBlocks, blocksFinished) |
|
|
|
|
totalBytes += bytesAdded |
|
|
|
|
totalChunksAdded += chunksAdded |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, totalBytes, errors.Wrap(err, "processing next series") |
|
|
|
|
} |
|
|
|
|
totalSeriesAdded++ |
|
|
|
|
if done { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
@ -324,6 +329,8 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (checksum uint32, totalByte |
|
|
|
|
flushedFor = blockFlushReasonFull |
|
|
|
|
} |
|
|
|
|
mb.metrics.blockSize.Observe(float64(sz)) |
|
|
|
|
mb.metrics.seriesPerBlock.Observe(float64(totalSeriesAdded)) |
|
|
|
|
mb.metrics.chunksPerBlock.Observe(float64(totalChunksAdded)) |
|
|
|
|
mb.metrics.blockFlushReason.WithLabelValues(flushedFor).Inc() |
|
|
|
|
|
|
|
|
|
checksum, err = builder.Close() |
|
|
|
|