|
|
|
|
@ -324,7 +324,7 @@ func (db *DBReadOnly) FlushWAL(dir string) error { |
|
|
|
|
} |
|
|
|
|
mint := head.MinTime() |
|
|
|
|
maxt := head.MaxTime() |
|
|
|
|
rh := &rangeHead{ |
|
|
|
|
rh := &RangeHead{ |
|
|
|
|
head: head, |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
@ -685,41 +685,61 @@ func (db *DB) Compact() (err error) { |
|
|
|
|
maxt := rangeForTimestamp(mint, db.head.chunkRange) |
|
|
|
|
|
|
|
|
|
// Wrap head into a range that bounds all reads to it.
|
|
|
|
|
head := &rangeHead{ |
|
|
|
|
head: db.head, |
|
|
|
|
mint: mint, |
|
|
|
|
// We remove 1 millisecond from maxt because block
|
|
|
|
|
// intervals are half-open: [b.MinTime, b.MaxTime). But
|
|
|
|
|
// chunk intervals are closed: [c.MinTime, c.MaxTime];
|
|
|
|
|
// so in order to make sure that overlaps are evaluated
|
|
|
|
|
// consistently, we explicitly remove the last value
|
|
|
|
|
// from the block interval here.
|
|
|
|
|
maxt: maxt - 1, |
|
|
|
|
} |
|
|
|
|
uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "persist head block") |
|
|
|
|
// We remove 1 millisecond from maxt because block
|
|
|
|
|
// intervals are half-open: [b.MinTime, b.MaxTime). But
|
|
|
|
|
// chunk intervals are closed: [c.MinTime, c.MaxTime];
|
|
|
|
|
// so in order to make sure that overlaps are evaluated
|
|
|
|
|
// consistently, we explicitly remove the last value
|
|
|
|
|
// from the block interval here.
|
|
|
|
|
head := NewRangeHead(db.head, mint, maxt-1) |
|
|
|
|
if err := db.compactHead(head, mint, maxt); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
runtime.GC() |
|
|
|
|
return db.compactBlocks() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := db.reload(); err != nil { |
|
|
|
|
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { |
|
|
|
|
return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid) |
|
|
|
|
} |
|
|
|
|
return errors.Wrap(err, "reload blocks") |
|
|
|
|
// CompactHead compacts the given the RangeHead.
|
|
|
|
|
func (db *DB) CompactHead(head *RangeHead, mint, maxt int64) (err error) { |
|
|
|
|
db.cmtx.Lock() |
|
|
|
|
defer db.cmtx.Unlock() |
|
|
|
|
|
|
|
|
|
return db.compactHead(head, mint, maxt) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// compactHead compacts the given the RangeHead.
|
|
|
|
|
// The compaction mutex should be held before calling this method.
|
|
|
|
|
func (db *DB) compactHead(head *RangeHead, mint, maxt int64) (err error) { |
|
|
|
|
uid, err := db.compactor.Write(db.dir, head, mint, maxt, nil) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrap(err, "persist head block") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
runtime.GC() |
|
|
|
|
|
|
|
|
|
if err := db.reload(); err != nil { |
|
|
|
|
if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { |
|
|
|
|
return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid) |
|
|
|
|
} |
|
|
|
|
if (uid == ulid.ULID{}) { |
|
|
|
|
// Compaction resulted in an empty block.
|
|
|
|
|
// Head truncating during db.reload() depends on the persisted blocks and
|
|
|
|
|
// in this case no new block will be persisted so manually truncate the head.
|
|
|
|
|
if err = db.head.Truncate(maxt); err != nil { |
|
|
|
|
return errors.Wrap(err, "head truncate failed (in compact)") |
|
|
|
|
} |
|
|
|
|
return errors.Wrap(err, "reload blocks") |
|
|
|
|
} |
|
|
|
|
if (uid == ulid.ULID{}) { |
|
|
|
|
// Compaction resulted in an empty block.
|
|
|
|
|
// Head truncating during db.reload() depends on the persisted blocks and
|
|
|
|
|
// in this case no new block will be persisted so manually truncate the head.
|
|
|
|
|
if err = db.head.Truncate(maxt); err != nil { |
|
|
|
|
return errors.Wrap(err, "head truncate failed (in compact)") |
|
|
|
|
} |
|
|
|
|
runtime.GC() |
|
|
|
|
} |
|
|
|
|
runtime.GC() |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// compactBlocks compacts all the eligible on-disk blocks.
|
|
|
|
|
// The compaction mutex should be held before calling this method.
|
|
|
|
|
func (db *DB) compactBlocks() (err error) { |
|
|
|
|
// Check for compactions of multiple blocks.
|
|
|
|
|
for { |
|
|
|
|
plan, err := db.compactor.Plan(db.dir) |
|
|
|
|
@ -1192,7 +1212,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error { |
|
|
|
|
|
|
|
|
|
mint := db.head.MinTime() |
|
|
|
|
maxt := db.head.MaxTime() |
|
|
|
|
head := &rangeHead{ |
|
|
|
|
head := &RangeHead{ |
|
|
|
|
head: db.head, |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
@ -1221,7 +1241,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if maxt >= db.head.MinTime() { |
|
|
|
|
blocks = append(blocks, &rangeHead{ |
|
|
|
|
blocks = append(blocks, &RangeHead{ |
|
|
|
|
head: db.head, |
|
|
|
|
mint: mint, |
|
|
|
|
maxt: maxt, |
|
|
|
|
|