|
|
|
|
@ -122,13 +122,42 @@ type DB struct { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type dbMetrics struct { |
|
|
|
|
activeAppenders prometheus.Gauge |
|
|
|
|
loadedBlocks prometheus.GaugeFunc |
|
|
|
|
reloads prometheus.Counter |
|
|
|
|
reloadsFailed prometheus.Counter |
|
|
|
|
reloadDuration prometheus.Summary |
|
|
|
|
samplesAppended prometheus.Counter |
|
|
|
|
compactionsTriggered prometheus.Counter |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newDBMetrics(r prometheus.Registerer) *dbMetrics { |
|
|
|
|
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { |
|
|
|
|
m := &dbMetrics{} |
|
|
|
|
|
|
|
|
|
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{ |
|
|
|
|
Name: "tsdb_active_appenders", |
|
|
|
|
Help: "Number of currently active appender transactions", |
|
|
|
|
}) |
|
|
|
|
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ |
|
|
|
|
Name: "tsdb_blocks_loaded", |
|
|
|
|
Help: "Number of currently loaded data blocks", |
|
|
|
|
}, func() float64 { |
|
|
|
|
db.mtx.RLock() |
|
|
|
|
defer db.mtx.RUnlock() |
|
|
|
|
return float64(len(db.blocks)) |
|
|
|
|
}) |
|
|
|
|
m.reloads = prometheus.NewCounter(prometheus.CounterOpts{ |
|
|
|
|
Name: "tsdb_reloads_total", |
|
|
|
|
Help: "Number of times the database reloaded block data from disk.", |
|
|
|
|
}) |
|
|
|
|
m.reloadsFailed = prometheus.NewCounter(prometheus.CounterOpts{ |
|
|
|
|
Name: "tsdb_reloads_failures_total", |
|
|
|
|
Help: "Number of times the database failed to reload black data from disk.", |
|
|
|
|
}) |
|
|
|
|
m.reloadDuration = prometheus.NewSummary(prometheus.SummaryOpts{ |
|
|
|
|
Name: "tsdb_reload_duration_seconds", |
|
|
|
|
Help: "Duration of block reloads.", |
|
|
|
|
}) |
|
|
|
|
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ |
|
|
|
|
Name: "tsdb_samples_appended_total", |
|
|
|
|
Help: "Total number of appended sampledb.", |
|
|
|
|
@ -140,6 +169,11 @@ func newDBMetrics(r prometheus.Registerer) *dbMetrics { |
|
|
|
|
|
|
|
|
|
if r != nil { |
|
|
|
|
r.MustRegister( |
|
|
|
|
m.activeAppenders, |
|
|
|
|
m.loadedBlocks, |
|
|
|
|
m.reloads, |
|
|
|
|
m.reloadsFailed, |
|
|
|
|
m.reloadDuration, |
|
|
|
|
m.samplesAppended, |
|
|
|
|
m.compactionsTriggered, |
|
|
|
|
) |
|
|
|
|
@ -165,12 +199,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db |
|
|
|
|
db = &DB{ |
|
|
|
|
dir: dir, |
|
|
|
|
logger: l, |
|
|
|
|
metrics: newDBMetrics(r), |
|
|
|
|
opts: opts, |
|
|
|
|
compactc: make(chan struct{}, 1), |
|
|
|
|
donec: make(chan struct{}), |
|
|
|
|
stopc: make(chan struct{}), |
|
|
|
|
} |
|
|
|
|
db.metrics = newDBMetrics(db, r) |
|
|
|
|
|
|
|
|
|
if !opts.NoLockfile { |
|
|
|
|
absdir, err := filepath.Abs(dir) |
|
|
|
|
if err != nil { |
|
|
|
|
@ -374,7 +409,15 @@ func (db *DB) getBlock(id ulid.ULID) (Block, bool) { |
|
|
|
|
return nil, false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (db *DB) reloadBlocks() error { |
|
|
|
|
func (db *DB) reloadBlocks() (err error) { |
|
|
|
|
defer func(t time.Time) { |
|
|
|
|
if err != nil { |
|
|
|
|
db.metrics.reloadsFailed.Inc() |
|
|
|
|
} |
|
|
|
|
db.metrics.reloads.Inc() |
|
|
|
|
db.metrics.reloadDuration.Observe(time.Since(t).Seconds()) |
|
|
|
|
}(time.Now()) |
|
|
|
|
|
|
|
|
|
var cs []io.Closer |
|
|
|
|
defer func() { closeAll(cs...) }() |
|
|
|
|
|
|
|
|
|
@ -447,7 +490,7 @@ func validateBlockSequence(bs []Block) error { |
|
|
|
|
prev := bs[0] |
|
|
|
|
for _, b := range bs[1:] { |
|
|
|
|
if b.Meta().MinTime < prev.Meta().MaxTime { |
|
|
|
|
return errors.Errorf("block time ranges overlap", b.Meta().MinTime, prev.Meta().MaxTime) |
|
|
|
|
return errors.Errorf("block time ranges overlap (%d, %d)", b.Meta().MinTime, prev.Meta().MaxTime) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
@ -480,6 +523,8 @@ func (db *DB) Close() error { |
|
|
|
|
|
|
|
|
|
// Appender returns a new Appender on the database.
|
|
|
|
|
func (db *DB) Appender() Appender { |
|
|
|
|
db.metrics.activeAppenders.Inc() |
|
|
|
|
|
|
|
|
|
db.mtx.RLock() |
|
|
|
|
return &dbAppender{db: db} |
|
|
|
|
} |
|
|
|
|
@ -619,6 +664,7 @@ func (db *DB) ensureHead(t int64) error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (a *dbAppender) Commit() error { |
|
|
|
|
defer a.db.metrics.activeAppenders.Dec() |
|
|
|
|
defer a.db.mtx.RUnlock() |
|
|
|
|
|
|
|
|
|
// Commits to partial appenders must be concurrent as concurrent appenders
|
|
|
|
|
@ -649,6 +695,7 @@ func (a *dbAppender) Commit() error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (a *dbAppender) Rollback() error { |
|
|
|
|
defer a.db.metrics.activeAppenders.Dec() |
|
|
|
|
defer a.db.mtx.RUnlock() |
|
|
|
|
|
|
|
|
|
var g errgroup.Group |
|
|
|
|
|