|
|
|
|
@ -166,17 +166,35 @@ func (c *compactor) match(dirs []dirMeta) bool { |
|
|
|
|
return uint64(dirs[len(dirs)-1].meta.MaxTime-dirs[0].meta.MinTime) <= c.opts.maxBlockRange |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func mergeBlockMetas(blocks ...Block) (res BlockMeta) { |
|
|
|
|
m0 := blocks[0].Meta() |
|
|
|
|
func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) { |
|
|
|
|
res.MinTime = blocks[0].MinTime |
|
|
|
|
res.MaxTime = blocks[len(blocks)-1].MaxTime |
|
|
|
|
|
|
|
|
|
res.MinTime = m0.MinTime |
|
|
|
|
res.MaxTime = blocks[len(blocks)-1].Meta().MaxTime |
|
|
|
|
|
|
|
|
|
res.Compaction.Generation = m0.Compaction.Generation + 1 |
|
|
|
|
sources := map[ulid.ULID]struct{}{} |
|
|
|
|
|
|
|
|
|
for _, b := range blocks { |
|
|
|
|
res.Stats.NumSamples += b.Meta().Stats.NumSamples |
|
|
|
|
res.Stats.NumSamples += b.Stats.NumSamples |
|
|
|
|
|
|
|
|
|
if b.Compaction.Generation > res.Compaction.Generation { |
|
|
|
|
res.Compaction.Generation = b.Compaction.Generation |
|
|
|
|
} |
|
|
|
|
for _, s := range b.Compaction.Sources { |
|
|
|
|
sources[s] = struct{}{} |
|
|
|
|
} |
|
|
|
|
// If it's an in memory block, its ULID goes into the sources.
|
|
|
|
|
if b.Compaction.Generation == 0 { |
|
|
|
|
sources[b.ULID] = struct{}{} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
res.Compaction.Generation++ |
|
|
|
|
|
|
|
|
|
for s := range sources { |
|
|
|
|
res.Compaction.Sources = append(res.Compaction.Sources, s) |
|
|
|
|
} |
|
|
|
|
sort.Slice(res.Compaction.Sources, func(i, j int) bool { |
|
|
|
|
return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0 |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
return res |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -293,6 +311,7 @@ func (c *compactor) write(uid ulid.ULID, blocks ...Block) (err error) { |
|
|
|
|
// of the provided blocks. It returns meta information for the new block.
|
|
|
|
|
func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) { |
|
|
|
|
var set compactionSet |
|
|
|
|
var metas []BlockMeta |
|
|
|
|
|
|
|
|
|
for i, b := range blocks { |
|
|
|
|
all, err := b.Index().Postings("", "") |
|
|
|
|
@ -309,6 +328,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
metas = append(metas, b.Meta()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// We fully rebuild the postings list index from merged series.
|
|
|
|
|
@ -316,7 +336,7 @@ func (c *compactor) populate(blocks []Block, indexw IndexWriter, chunkw ChunkWri |
|
|
|
|
postings = &memPostings{m: make(map[term][]uint32, 512)} |
|
|
|
|
values = map[string]stringset{} |
|
|
|
|
i = uint32(0) |
|
|
|
|
meta = mergeBlockMetas(blocks...) |
|
|
|
|
meta = compactBlockMetas(metas...) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for set.Next() { |
|
|
|
|
|