|
|
|
|
@ -300,7 +300,7 @@ func (db *DB) run() { |
|
|
|
|
case <-db.compactc: |
|
|
|
|
db.metrics.compactionsTriggered.Inc() |
|
|
|
|
|
|
|
|
|
_, err := db.compact() |
|
|
|
|
err := db.compact() |
|
|
|
|
if err != nil { |
|
|
|
|
level.Error(db.logger).Log("msg", "compaction failed", "err", err) |
|
|
|
|
backoff = exponential(backoff, 1*time.Second, 1*time.Minute) |
|
|
|
|
@ -366,12 +366,12 @@ func (a dbAppender) Commit() error { |
|
|
|
|
// this is sufficient to reliably delete old data.
|
|
|
|
|
// Old blocks are only deleted on reload based on the new block's parent information.
|
|
|
|
|
// See DB.reload documentation for further information.
|
|
|
|
|
func (db *DB) compact() (changes bool, err error) { |
|
|
|
|
func (db *DB) compact() (err error) { |
|
|
|
|
db.cmtx.Lock() |
|
|
|
|
defer db.cmtx.Unlock() |
|
|
|
|
|
|
|
|
|
if !db.compactionsEnabled { |
|
|
|
|
return false, nil |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Check whether we have pending head blocks that are ready to be persisted.
|
|
|
|
|
@ -379,7 +379,7 @@ func (db *DB) compact() (changes bool, err error) { |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case <-db.stopc: |
|
|
|
|
return changes, nil |
|
|
|
|
return nil |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
// The head has a compactable range if 1.5 level 0 ranges are between the oldest
|
|
|
|
|
@ -402,14 +402,13 @@ func (db *DB) compact() (changes bool, err error) { |
|
|
|
|
maxt: maxt - 1, |
|
|
|
|
} |
|
|
|
|
if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { |
|
|
|
|
return changes, errors.Wrap(err, "persist head block") |
|
|
|
|
return errors.Wrap(err, "persist head block") |
|
|
|
|
} |
|
|
|
|
changes = true |
|
|
|
|
|
|
|
|
|
runtime.GC() |
|
|
|
|
|
|
|
|
|
if err := db.reload(); err != nil { |
|
|
|
|
return changes, errors.Wrap(err, "reload blocks") |
|
|
|
|
return errors.Wrap(err, "reload blocks") |
|
|
|
|
} |
|
|
|
|
runtime.GC() |
|
|
|
|
} |
|
|
|
|
@ -418,7 +417,7 @@ func (db *DB) compact() (changes bool, err error) { |
|
|
|
|
for { |
|
|
|
|
plan, err := db.compactor.Plan(db.dir) |
|
|
|
|
if err != nil { |
|
|
|
|
return changes, errors.Wrap(err, "plan compaction") |
|
|
|
|
return errors.Wrap(err, "plan compaction") |
|
|
|
|
} |
|
|
|
|
if len(plan) == 0 { |
|
|
|
|
break |
|
|
|
|
@ -426,23 +425,22 @@ func (db *DB) compact() (changes bool, err error) { |
|
|
|
|
|
|
|
|
|
select { |
|
|
|
|
case <-db.stopc: |
|
|
|
|
return changes, nil |
|
|
|
|
return nil |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if _, err := db.compactor.Compact(db.dir, plan...); err != nil { |
|
|
|
|
return changes, errors.Wrapf(err, "compact %s", plan) |
|
|
|
|
return errors.Wrapf(err, "compact %s", plan) |
|
|
|
|
} |
|
|
|
|
changes = true |
|
|
|
|
runtime.GC() |
|
|
|
|
|
|
|
|
|
if err := db.reload(); err != nil { |
|
|
|
|
return changes, errors.Wrap(err, "reload blocks") |
|
|
|
|
return errors.Wrap(err, "reload blocks") |
|
|
|
|
} |
|
|
|
|
runtime.GC() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return changes, nil |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { |
|
|
|
|
|