|
|
|
|
@ -392,10 +392,10 @@ func CompactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { |
|
|
|
|
// Compact creates a new block in the compactor's directory from the blocks in the
|
|
|
|
|
// provided directories.
|
|
|
|
|
func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (uid ulid.ULID, err error) { |
|
|
|
|
return c.CompactWithPopulateBlockFunc(dest, dirs, open, DefaultPopulateBlockFunc{}) |
|
|
|
|
return c.CompactWithBlockPopulator(dest, dirs, open, DefaultBlockPopulator{}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []string, open []*Block, populateBlockFunc PopulateBlockFunc) (uid ulid.ULID, err error) { |
|
|
|
|
func (c *LeveledCompactor) CompactWithBlockPopulator(dest string, dirs []string, open []*Block, blockPopulator BlockPopulator) (uid ulid.ULID, err error) { |
|
|
|
|
var ( |
|
|
|
|
blocks []BlockReader |
|
|
|
|
bs []*Block |
|
|
|
|
@ -439,7 +439,7 @@ func (c *LeveledCompactor) CompactWithPopulateBlockFunc(dest string, dirs []stri |
|
|
|
|
uid = ulid.MustNew(ulid.Now(), rand.Reader) |
|
|
|
|
|
|
|
|
|
meta := CompactBlockMetas(uid, metas...) |
|
|
|
|
err = c.write(dest, meta, populateBlockFunc, blocks...) |
|
|
|
|
err = c.write(dest, meta, blockPopulator, blocks...) |
|
|
|
|
if err == nil { |
|
|
|
|
if meta.Stats.NumSamples == 0 { |
|
|
|
|
for _, b := range bs { |
|
|
|
|
@ -505,7 +505,7 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
err := c.write(dest, meta, DefaultPopulateBlockFunc{}, b) |
|
|
|
|
err := c.write(dest, meta, DefaultBlockPopulator{}, b) |
|
|
|
|
if err != nil { |
|
|
|
|
return uid, err |
|
|
|
|
} |
|
|
|
|
@ -550,7 +550,7 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// write creates a new block that is the union of the provided blocks into dir.
|
|
|
|
|
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc PopulateBlockFunc, blocks ...BlockReader) (err error) { |
|
|
|
|
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blockPopulator BlockPopulator, blocks ...BlockReader) (err error) { |
|
|
|
|
dir := filepath.Join(dest, meta.ULID.String()) |
|
|
|
|
tmp := dir + tmpForCreationBlockDirSuffix |
|
|
|
|
var closers []io.Closer |
|
|
|
|
@ -598,7 +598,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc |
|
|
|
|
} |
|
|
|
|
closers = append(closers, indexw) |
|
|
|
|
|
|
|
|
|
if err := populateBlockFunc.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil { |
|
|
|
|
if err := blockPopulator.PopulateBlock(c.ctx, c.metrics, c.logger, c.chunkPool, c.mergeFunc, blocks, meta, indexw, chunkw); err != nil { |
|
|
|
|
return errors.Wrap(err, "populate block") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -663,16 +663,16 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, populateBlockFunc |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type PopulateBlockFunc interface { |
|
|
|
|
type BlockPopulator interface { |
|
|
|
|
PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type DefaultPopulateBlockFunc struct{} |
|
|
|
|
type DefaultBlockPopulator struct{} |
|
|
|
|
|
|
|
|
|
// PopulateBlock fills the index and chunk writers with new data gathered as the union
|
|
|
|
|
// of the provided blocks. It returns meta information for the new block.
|
|
|
|
|
// It expects sorted blocks input by mint.
|
|
|
|
|
func (c DefaultPopulateBlockFunc) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { |
|
|
|
|
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { |
|
|
|
|
if len(blocks) == 0 { |
|
|
|
|
return errors.New("cannot populate block from no readers") |
|
|
|
|
} |
|
|
|
|
|