@@ -14,6 +14,7 @@
 package tsdb

 import (
+	"fmt"
 	"io"
 	"math/rand"
 	"os"
@@ -33,7 +34,7 @@ import (
 	"github.com/prometheus/tsdb/labels"
 )

-// ExponentialBlockRanges returns the time ranges based on the stepSize
+// ExponentialBlockRanges returns the time ranges based on the stepSize.
 func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
 	ranges := make([]int64, 0, steps)
 	curRange := minSize
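Judging by the visible lines, each successive range is the previous one multiplied by `stepSize`. A minimal, self-contained sketch of that behavior — the loop body below is an assumption, since only the first three lines of the function appear in the hunk above:

```go
package main

import "fmt"

// Assumed completion of ExponentialBlockRanges based on the visible lines:
// each range is the previous one scaled by stepSize.
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
	ranges := make([]int64, 0, steps)
	curRange := minSize
	for i := 0; i < steps; i++ {
		ranges = append(ranges, curRange)
		curRange = curRange * int64(stepSize)
	}
	return ranges
}

func main() {
	// 2h base range in milliseconds, 3 steps, growth factor 5:
	// prints [7200000 36000000 180000000], i.e. 2h, 10h, 50h.
	fmt.Println(ExponentialBlockRanges(2*60*60*1000, 3, 5))
}
```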
@@ -215,7 +216,7 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {

 	Outer:
 		for _, p := range parts {
-			// Donot select the range if it has a block whose compaction failed.
+			// Do not select the range if it has a block whose compaction failed.
 			for _, dm := range p {
 				if dm.meta.Compaction.Failed {
 					continue Outer
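The `continue Outer` above is Go's labeled continue: it abandons the whole candidate range rather than just the one failed block. A toy illustration of the same control flow (all names hypothetical):

```go
package main

import "fmt"

func main() {
	groups := [][]int{{1, 2}, {3, -1}, {4, 5}}

Outer:
	for _, g := range groups {
		for _, v := range g {
			if v < 0 {
				// Skip the whole group, not just this element,
				// mirroring how selectDirs rejects any range that
				// contains a failed block.
				continue Outer
			}
		}
		fmt.Println("accepted group:", g)
	}
}
```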
@@ -312,9 +313,12 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
 // Compact creates a new block in the compactor's directory from the blocks in the
 // provided directories.
 func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, err error) {
-	var blocks []BlockReader
-	var bs []*Block
-	var metas []*BlockMeta
+	var (
+		blocks []BlockReader
+		bs     []*Block
+		metas  []*BlockMeta
+		uids   []string
+	)

 	for _, d := range dirs {
 		b, err := OpenBlock(d, c.chunkPool)
@@ -331,13 +335,23 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
 		metas = append(metas, meta)
 		blocks = append(blocks, b)
 		bs = append(bs, b)
+		uids = append(uids, meta.ULID.String())
 	}

 	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
 	uid = ulid.MustNew(ulid.Now(), entropy)

-	err = c.write(dest, compactBlockMetas(uid, metas...), blocks...)
+	meta := compactBlockMetas(uid, metas...)
+	err = c.write(dest, meta, blocks...)
 	if err == nil {
+		level.Info(c.logger).Log(
+			"msg", "compact blocks",
+			"count", len(blocks),
+			"mint", meta.MinTime,
+			"maxt", meta.MaxTime,
+			"ulid", meta.ULID,
+			"sources", fmt.Sprintf("%v", uids),
+		)
 		return uid, nil
 	}

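The `uid` assembled above comes from github.com/oklog/ulid; a ULID encodes its creation time and sorts lexicographically, which is why it doubles as the block directory name. A standalone sketch of the same construction used in the hunk:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"

	"github.com/oklog/ulid"
)

func main() {
	// Same pattern as the diff: a millisecond timestamp plus caller-supplied entropy.
	entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
	uid := ulid.MustNew(ulid.Now(), entropy)

	// ULIDs sort by creation time, e.g. "01ARZ3NDEKTSV4RRFFQ69G5FAV".
	fmt.Println(uid.String())
}
```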
@@ -365,7 +379,13 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (
 	meta.Compaction.Level = 1
 	meta.Compaction.Sources = []ulid.ULID{uid}

-	return uid, c.write(dest, meta, b)
+	err := c.write(dest, meta, b)
+	if err != nil {
+		return uid, err
+	}
+
+	level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID)
+	return uid, nil
 }

 // instrumentedChunkWriter is used for level 1 compactions to record statistics
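The new log lines in both hunks go through go-kit's structured logger: `Log` takes alternating key/value pairs and, with a logfmt backend, renders them as `key=value` text. A hedged sketch of what the "write block" line might emit (exact output depends on how c.logger is configured; the values here are made up):

```go
package main

import (
	"os"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// Alternating key/value pairs, as in the diff's "write block" line.
	// Emits roughly:
	//   level=info msg="write block" mint=0 maxt=7200000 ulid=01ARZ3NDEKTSV4RRFFQ69G5FAV
	level.Info(logger).Log("msg", "write block",
		"mint", int64(0), "maxt", int64(7200000),
		"ulid", "01ARZ3NDEKTSV4RRFFQ69G5FAV")
}
```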
@@ -390,8 +410,6 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
 // write creates a new block that is the union of the provided blocks into dir.
 // It cleans up all files of the old blocks after completing successfully.
 func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
-	level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
-
 	dir := filepath.Join(dest, meta.ULID.String())
 	tmp := dir + ".tmp"

@@ -472,7 +490,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 		return errors.Wrap(err, "sync temporary dir file")
 	}

-	// close temp dir before rename block dir(for windows platform)
+	// Close temp dir before rename block dir (for windows platform).
 	if err = df.Close(); err != nil {
 		return errors.Wrap(err, "close temporary dir")
 	}
@@ -482,6 +500,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
 	if err := renameFile(tmp, dir); err != nil {
 		return errors.Wrap(err, "rename block dir")
 	}
+
 	return nil
 }

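The last two hunks are the tail of write's crash-safety sequence: build the block in a `.tmp` directory, fsync the directory, close it (closing before rename matters on Windows), then rename into place. A generic sketch of that pattern outside tsdb — paths, file mode, and payload are all hypothetical, and syncing a directory handle is a POSIX-oriented assumption:

```go
package main

import (
	"io/ioutil"
	"os"
	"path/filepath"
)

// writeAtomic stages data in dir+".tmp", syncs, closes, then renames,
// mirroring the temp-dir/fsync/close/rename sequence in the hunks above.
func writeAtomic(dir string, name string, data []byte) error {
	tmp := dir + ".tmp"
	if err := os.MkdirAll(tmp, 0777); err != nil {
		return err
	}
	if err := ioutil.WriteFile(filepath.Join(tmp, name), data, 0666); err != nil {
		return err
	}
	df, err := os.Open(tmp)
	if err != nil {
		return err
	}
	if err := df.Sync(); err != nil {
		df.Close()
		return err
	}
	// Close before rename, as the diff's comment notes for Windows.
	if err := df.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, dir)
}

func main() {
	if err := writeAtomic("block-01", "meta.json", []byte("{}\n")); err != nil {
		panic(err)
	}
}
```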
@@ -718,11 +737,6 @@ type compactionMerger struct {
 	intervals Intervals
 }

-type compactionSeries struct {
-	labels labels.Labels
-	chunks []*chunks.Meta
-}
-
 func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
 	c := &compactionMerger{
 		a: a,
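Finally, the removed compactionSeries type appears to be unused; newCompactionMerger itself zips two ChunkSeriesSets ordered by labels. For intuition, a toy two-way merge over sorted strings has the same shape (illustrative only, not the tsdb API):

```go
package main

import "fmt"

// mergeSorted walks two ascending slices in lockstep, the same shape of
// loop compactionMerger uses over two series sets ordered by labels.
func mergeSorted(a, b []string) []string {
	out := make([]string, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			out = append(out, a[i])
			i++
		case a[i] > b[j]:
			out = append(out, b[j])
			j++
		default:
			// Equal keys: emit once; the real merger would combine
			// the chunks of the two matching series here.
			out = append(out, a[i])
			i, j = i+1, j+1
		}
	}
	out = append(out, a[i:]...)
	return append(out, b[j:]...)
}

func main() {
	fmt.Println(mergeSorted([]string{"a", "c"}, []string{"b", "c", "d"}))
	// Output: [a b c d]
}
```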