diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index f5b8cd3c57..8acecd9133 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -434,7 +434,7 @@ func main() { level.Info(logger).Log("vm_limits", prom_runtime.VMLimits()) var ( - localStorage = &readyStorage{} + localStorage = &readyStorage{stats: tsdb.NewDBStats()} scraper = &readyScrapeManager{} remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) @@ -815,11 +815,13 @@ func main() { return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB") } } + db, err := openDBWithMetrics( cfg.localStoragePath, logger, prometheus.DefaultRegisterer, &opts, + localStorage.getStats(), ) if err != nil { return errors.Wrapf(err, "opening storage failed") @@ -901,12 +903,13 @@ func main() { level.Info(logger).Log("msg", "See you next time!") } -func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, opts *tsdb.Options) (*tsdb.DB, error) { +func openDBWithMetrics(dir string, logger log.Logger, reg prometheus.Registerer, opts *tsdb.Options, stats *tsdb.DBStats) (*tsdb.DB, error) { db, err := tsdb.Open( dir, log.With(logger, "component", "tsdb"), reg, opts, + stats, ) if err != nil { return nil, err @@ -1076,6 +1079,7 @@ type readyStorage struct { mtx sync.RWMutex db *tsdb.DB startTimeMargin int64 + stats *tsdb.DBStats } // Set the storage. @@ -1087,7 +1091,6 @@ func (s *readyStorage) Set(db *tsdb.DB, startTimeMargin int64) { s.startTimeMargin = startTimeMargin } -// get is internal, you should use readyStorage as the front implementation layer. 
func (s *readyStorage) get() *tsdb.DB { s.mtx.RLock() x := s.db @@ -1095,6 +1098,13 @@ func (s *readyStorage) get() *tsdb.DB { return x } +func (s *readyStorage) getStats() *tsdb.DBStats { + s.mtx.RLock() + x := s.stats + s.mtx.RUnlock() + return x +} + // StartTime implements the Storage interface. func (s *readyStorage) StartTime() (int64, error) { if x := s.get(); x != nil { @@ -1197,6 +1207,14 @@ func (s *readyStorage) Stats(statsByLabelName string) (*tsdb.Stats, error) { return nil, tsdb.ErrNotReady } +// WALReplayStatus implements the api_v1.TSDBStats interface. +func (s *readyStorage) WALReplayStatus() (tsdb.WALReplayStatus, error) { + if x := s.getStats(); x != nil { + return x.Head.WALReplayStatus.GetWALReplayStatus(), nil + } + return tsdb.WALReplayStatus{}, tsdb.ErrNotReady +} + // ErrNotReady is returned if the underlying scrape manager is not ready yet. var ErrNotReady = errors.New("Scrape manager not ready") diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index 8cb545d1e3..17e930f417 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -289,7 +289,7 @@ func TestTimeMetrics(t *testing.T) { }() reg := prometheus.NewRegistry() - db, err := openDBWithMetrics(tmpDir, log.NewNopLogger(), reg, nil) + db, err := openDBWithMetrics(tmpDir, log.NewNopLogger(), reg, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) diff --git a/cmd/promtool/backfill_test.go b/cmd/promtool/backfill_test.go index 05b701a0b6..ce1573637b 100644 --- a/cmd/promtool/backfill_test.go +++ b/cmd/promtool/backfill_test.go @@ -548,7 +548,7 @@ after_eof 1 2 } require.NoError(t, err) - db, err := tsdb.Open(outputDir, nil, nil, tsdb.DefaultOptions()) + db, err := tsdb.Open(outputDir, nil, nil, tsdb.DefaultOptions(), nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) diff --git a/cmd/promtool/rules_test.go b/cmd/promtool/rules_test.go index 3459e89426..c6bc667e8d 100644 --- 
a/cmd/promtool/rules_test.go +++ b/cmd/promtool/rules_test.go @@ -115,7 +115,7 @@ func TestBackfillRuleIntegration(t *testing.T) { opts := tsdb.DefaultOptions() opts.AllowOverlappingBlocks = true - db, err := tsdb.Open(tmpDir, nil, nil, opts) + db, err := tsdb.Open(tmpDir, nil, nil, opts, nil) require.NoError(t, err) blocks := db.Blocks() diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 9fb59cb9da..8714377953 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -87,7 +87,7 @@ func benchmarkWrite(outPath, samplesFile string, numMetrics, numScrapes int) err st, err := tsdb.Open(dir, l, nil, &tsdb.Options{ RetentionDuration: int64(15 * 24 * time.Hour / time.Millisecond), MinBlockDuration: int64(2 * time.Hour / time.Millisecond), - }) + }, tsdb.NewDBStats()) if err != nil { return err } diff --git a/docs/querying/api.md b/docs/querying/api.md index 9eab688cf1..ba13a32b85 100644 --- a/docs/querying/api.md +++ b/docs/querying/api.md @@ -1033,6 +1033,39 @@ $ curl http://localhost:9090/api/v1/status/tsdb *New in v2.15* +### WAL Replay Stats + +The following endpoint returns information about the WAL replay: + +``` +GET /api/v1/status/walreplay +``` + +**min**: The index of the first WAL segment that will be replayed. +**max**: The index of the last WAL segment that will be replayed. +**current**: The index of the segment currently being replayed. +**state**: The state of the replay. Possible states: +- **waiting**: Waiting for the replay to start. +- **in progress**: The replay is in progress. +- **done**: The replay has finished. + +```json +$ curl http://localhost:9090/api/v1/status/walreplay +{ + "status": "success", + "data": { + "min": 2, + "max": 5, + "current": 40, + "state": "in progress" + } +} +``` + +NOTE: This endpoint is available before the server has been marked ready and is updated in real time to facilitate monitoring the progress of the WAL replay. + +*New in v2.28* + ## TSDB Admin APIs These are APIs that expose database functionalities for the advanced user.
These APIs are not enabled unless the `--web.enable-admin-api` is set. diff --git a/tsdb/block_test.go b/tsdb/block_test.go index c292cdcf57..714d47a9f6 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -441,7 +441,7 @@ func createBlockFromHead(tb testing.TB, dir string, head *Head) string { func createHead(tb testing.TB, w *wal.WAL, series []storage.Series, chunkDir string) *Head { opts := DefaultHeadOptions() opts.ChunkDirRoot = chunkDir - head, err := NewHead(nil, nil, w, opts) + head, err := NewHead(nil, nil, w, opts, nil) require.NoError(tb, err) app := head.Appender(context.Background()) diff --git a/tsdb/blockwriter.go b/tsdb/blockwriter.go index 95d6f2854c..53b16bbecc 100644 --- a/tsdb/blockwriter.go +++ b/tsdb/blockwriter.go @@ -72,7 +72,7 @@ func (w *BlockWriter) initHead() error { opts := DefaultHeadOptions() opts.ChunkRange = w.blockSize opts.ChunkDirRoot = w.chunkDir - h, err := NewHead(nil, w.logger, nil, opts) + h, err := NewHead(nil, w.logger, nil, opts, NewHeadStats()) if err != nil { return errors.Wrap(err, "tsdb.NewHead") } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 255a4c14d6..c00ea1144d 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1096,7 +1096,7 @@ func BenchmarkCompactionFromHead(b *testing.B) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = chunkDir - h, err := NewHead(nil, nil, nil, opts) + h, err := NewHead(nil, nil, nil, opts, nil) require.NoError(b, err) for ln := 0; ln < labelNames; ln++ { app := h.Appender(context.Background()) @@ -1196,7 +1196,7 @@ func TestCancelCompactions(t *testing.T) { // Measure the compaction time without interrupting it. 
var timeCompactionUninterrupted time.Duration { - db, err := open(tmpdir, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}) + db, err := open(tmpdir, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil) require.NoError(t, err) require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch") require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") @@ -1216,7 +1216,7 @@ } // Measure the compaction time when closing the db in the middle of compaction. { - db, err := open(tmpdirCopy, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}) + db, err := open(tmpdirCopy, log.NewNopLogger(), nil, DefaultOptions(), []int64{1, 2000}, nil) require.NoError(t, err) require.Equal(t, 3, len(db.Blocks()), "initial block count mismatch") require.Equal(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial compaction counter mismatch") diff --git a/tsdb/db.go b/tsdb/db.go index 2bb56a85c8..62cd302bce 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -297,6 +297,20 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return m } +// DBStats contains statistics about the DB separated by component (e.g. head). +// They are available before the DB has finished initializing. +type DBStats struct { + Head *HeadStats +} + +// NewDBStats returns a new DBStats object initialized using the +// new function from each component. +func NewDBStats() *DBStats { + return &DBStats{ + Head: NewHeadStats(), + } +} + // ErrClosed is returned when the db is closed.
var ErrClosed = errors.New("db already closed") @@ -346,7 +360,7 @@ func (db *DBReadOnly) FlushWAL(dir string) (returnErr error) { } opts := DefaultHeadOptions() opts.ChunkDirRoot = db.dir - head, err := NewHead(nil, db.logger, w, opts) + head, err := NewHead(nil, db.logger, w, opts, NewHeadStats()) if err != nil { return err } @@ -402,7 +416,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue opts := DefaultHeadOptions() opts.ChunkDirRoot = db.dir - head, err := NewHead(nil, db.logger, nil, opts) + head, err := NewHead(nil, db.logger, nil, opts, NewHeadStats()) if err != nil { return nil, err } @@ -422,7 +436,7 @@ func (db *DBReadOnly) loadDataAsQueryable(maxt int64) (storage.SampleAndChunkQue } opts := DefaultHeadOptions() opts.ChunkDirRoot = db.dir - head, err = NewHead(nil, db.logger, w, opts) + head, err = NewHead(nil, db.logger, w, opts, NewHeadStats()) if err != nil { return nil, err } @@ -541,10 +555,11 @@ func (db *DBReadOnly) Close() error { } // Open returns a new DB in the given directory. If options are empty, DefaultOptions will be used. 
-func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) { +func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, stats *DBStats) (db *DB, err error) { var rngs []int64 opts, rngs = validateOpts(opts, nil) - return open(dir, l, r, opts, rngs) + + return open(dir, l, r, opts, rngs, stats) } func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { @@ -575,13 +590,16 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) { return opts, rngs } -func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64) (_ *DB, returnedErr error) { +func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64, stats *DBStats) (_ *DB, returnedErr error) { if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } if l == nil { l = log.NewNopLogger() } + if stats == nil { + stats = NewDBStats() + } for i, v := range rngs { if v > opts.MaxBlockDuration { @@ -678,7 +696,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs headOpts.StripeSize = opts.StripeSize headOpts.SeriesCallback = opts.SeriesLifecycleCallback headOpts.NumExemplars = opts.MaxExemplars - db.head, err = NewHead(r, l, wlog, headOpts) + db.head, err = NewHead(r, l, wlog, headOpts, stats.Head) if err != nil { return nil, err } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 8c0808e90b..89b83c6b20 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -62,10 +62,10 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) { require.NoError(t, err) if len(rngs) == 0 { - db, err = Open(tmpdir, nil, nil, opts) + db, err = Open(tmpdir, nil, nil, opts, nil) } else { opts, rngs = validateOpts(opts, rngs) - db, err = open(tmpdir, nil, nil, opts, rngs) + db, err = open(tmpdir, nil, nil, opts, rngs, nil) } require.NoError(t, err) @@ -241,7 +241,7 @@ func TestNoPanicAfterWALCorruption(t *testing.T) { // Query the data. 
{ - db, err := Open(db.Dir(), nil, nil, nil) + db, err := Open(db.Dir(), nil, nil, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) @@ -581,7 +581,7 @@ func TestDB_Snapshot(t *testing.T) { require.NoError(t, db.Close()) // reopen DB from snapshot - db, err = Open(snap, nil, nil, nil) + db, err = Open(snap, nil, nil, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() @@ -633,7 +633,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) { require.NoError(t, db.Close()) // Reopen DB from snapshot. - db, err = Open(snap, nil, nil, nil) + db, err = Open(snap, nil, nil, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() @@ -703,7 +703,7 @@ Outer: require.NoError(t, db.Close()) // reopen DB from snapshot - db, err = Open(snap, nil, nil, nil) + db, err = Open(snap, nil, nil, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() @@ -923,7 +923,7 @@ func TestWALFlushedOnDBClose(t *testing.T) { require.NoError(t, db.Close()) - db, err = Open(dirDb, nil, nil, nil) + db, err = Open(dirDb, nil, nil, nil, nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) }() @@ -1046,7 +1046,7 @@ func TestTombstoneClean(t *testing.T) { require.NoError(t, db.Close()) // Reopen DB from snapshot. - db, err = Open(snap, nil, nil, nil) + db, err = Open(snap, nil, nil, nil, nil) require.NoError(t, err) defer db.Close() @@ -1135,7 +1135,7 @@ func TestTombstoneCleanResultEmptyBlock(t *testing.T) { require.NoError(t, db.Close()) // Reopen DB from snapshot. 
- db, err = Open(snap, nil, nil, nil) + db, err = Open(snap, nil, nil, nil, nil) require.NoError(t, err) defer db.Close() @@ -1757,7 +1757,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() - db, err := Open(dir, nil, nil, nil) + db, err := Open(dir, nil, nil, nil, nil) require.NoError(t, err) defer db.Close() @@ -1799,7 +1799,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Close()) - db, err := Open(dir, nil, nil, nil) + db, err := Open(dir, nil, nil, nil, nil) require.NoError(t, err) defer db.Close() @@ -1815,7 +1815,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 1000, 2000)) - db, err := Open(dir, nil, nil, nil) + db, err := Open(dir, nil, nil, nil, nil) require.NoError(t, err) defer db.Close() @@ -1851,7 +1851,7 @@ func TestInitializeHeadTimestamp(t *testing.T) { r := prometheus.NewRegistry() - db, err := Open(dir, nil, r, nil) + db, err := Open(dir, nil, r, nil, nil) require.NoError(t, err) defer db.Close() @@ -2125,7 +2125,7 @@ func TestBlockRanges(t *testing.T) { // when a non standard block already exists. firstBlockMaxT := int64(3) createBlock(t, dir, genSeries(1, 1, 0, firstBlockMaxT)) - db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000}) + db, err := open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) rangeToTriggerCompaction := db.compactor.(*LeveledCompactor).ranges[0]/2*3 + 1 @@ -2177,7 +2177,7 @@ func TestBlockRanges(t *testing.T) { thirdBlockMaxt := secondBlockMaxt + 2 createBlock(t, dir, genSeries(1, 1, secondBlockMaxt+1, thirdBlockMaxt)) - db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000}) + db, err = open(dir, logger, nil, DefaultOptions(), []int64{10000}, nil) require.NoError(t, err) defer db.Close() @@ -2245,7 +2245,7 @@ func TestDBReadOnly(t *testing.T) { // Open a normal db to use for a comparison. 
{ - dbWritable, err := Open(dbDir, logger, nil, nil) + dbWritable, err := Open(dbDir, logger, nil, nil, nil) require.NoError(t, err) dbWritable.DisableCompactions() @@ -2347,7 +2347,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) { }() // Append data to the WAL. - db, err := Open(dbDir, logger, nil, nil) + db, err := Open(dbDir, logger, nil, nil, nil) require.NoError(t, err) db.DisableCompactions() app := db.Appender(ctx) @@ -2408,7 +2408,7 @@ func TestDBCannotSeePartialCommits(t *testing.T) { require.NoError(t, os.RemoveAll(tmpdir)) }() - db, err := Open(tmpdir, nil, nil, nil) + db, err := Open(tmpdir, nil, nil, nil, nil) require.NoError(t, err) defer db.Close() @@ -2478,7 +2478,7 @@ func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) { require.NoError(t, os.RemoveAll(tmpdir)) }() - db, err := Open(tmpdir, nil, nil, nil) + db, err := Open(tmpdir, nil, nil, nil, nil) require.NoError(t, err) defer db.Close() @@ -2805,7 +2805,7 @@ func TestCompactHead(t *testing.T) { WALCompression: true, } - db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg) + db, err := Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil) require.NoError(t, err) ctx := context.Background() app := db.Appender(ctx) @@ -2826,7 +2826,7 @@ func TestCompactHead(t *testing.T) { // Delete everything but the new block and // reopen the db to query it to ensure it includes the head data. 
require.NoError(t, deleteNonBlocks(db.Dir())) - db, err = Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg) + db, err = Open(dbDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil) require.NoError(t, err) require.Equal(t, 1, len(db.Blocks())) require.Equal(t, int64(maxt), db.Head().MinTime()) @@ -2955,7 +2955,7 @@ func TestOpen_VariousBlockStates(t *testing.T) { opts := DefaultOptions() opts.RetentionDuration = 0 - db, err := Open(tmpDir, log.NewLogfmtLogger(os.Stderr), nil, opts) + db, err := Open(tmpDir, log.NewLogfmtLogger(os.Stderr), nil, opts, nil) require.NoError(t, err) loadedBlocks := db.Blocks() @@ -3000,7 +3000,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { require.NoError(t, os.RemoveAll(tmpDir)) }) - db, err := Open(tmpDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg) + db, err := Open(tmpDir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, db.Close()) @@ -3061,7 +3061,7 @@ func TestOneCheckpointPerCompactCall(t *testing.T) { createBlock(t, db.dir, genSeries(1, 1, newBlockMint, newBlockMaxt)) - db, err = Open(db.dir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg) + db, err = Open(db.dir, log.NewNopLogger(), prometheus.NewRegistry(), tsdbCfg, nil) require.NoError(t, err) db.DisableCompactions() @@ -3119,7 +3119,7 @@ func TestNoPanicOnTSDBOpenError(t *testing.T) { lockf, _, err := fileutil.Flock(filepath.Join(absdir, "lock")) require.NoError(t, err) - _, err = Open(tmpdir, nil, nil, DefaultOptions()) + _, err = Open(tmpdir, nil, nil, DefaultOptions(), nil) require.Error(t, err) require.NoError(t, lockf.Release()) diff --git a/tsdb/head.go b/tsdb/head.go index f98e21ebe9..b90616550f 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -105,6 +105,8 @@ type Head struct { closedMtx sync.Mutex closed bool + + stats *HeadStats } // HeadOptions are parameters for the Head block. 
@@ -307,6 +309,38 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { return m } +// HeadStats are the statistics for the head component of the DB. +type HeadStats struct { + WALReplayStatus *WALReplayStatus +} + +// NewHeadStats returns a new HeadStats object. +func NewHeadStats() *HeadStats { + return &HeadStats{ + WALReplayStatus: &WALReplayStatus{}, + } +} + +// WALReplayStatus contains status information about the WAL replay. +type WALReplayStatus struct { + sync.RWMutex + Min int + Max int + Current int +} + +// GetWALReplayStatus returns the WAL replay status information. +func (s *WALReplayStatus) GetWALReplayStatus() WALReplayStatus { + s.RLock() + defer s.RUnlock() + + return WALReplayStatus{ + Min: s.Min, + Max: s.Max, + Current: s.Current, + } +} + const cardinalityCacheExpirationTime = time.Duration(30) * time.Second // PostingsCardinalityStats returns top 10 highest cardinality stats By label and value names. @@ -328,7 +362,7 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings } // NewHead opens the head block in dir. 
-func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions) (*Head, error) { +func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOptions, stats *HeadStats) (*Head, error) { if l == nil { l = log.NewNopLogger() } @@ -344,6 +378,10 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti return nil, err } + if stats == nil { + stats = NewHeadStats() + } + h := &Head{ wal: wal, logger: l, @@ -360,6 +398,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti return &memChunk{} }, }, + stats: stats, } h.chunkRange.Store(opts.ChunkRange) h.minTime.Store(math.MaxInt64) @@ -795,6 +834,8 @@ func (h *Head) Init(minValidTime int64) error { return errors.Wrap(err, "finding WAL segments") } + h.startWALReplayStatus(startFrom, last) + // Backfill segments from the most recent checkpoint onwards. for i := startFrom; i <= last; i++ { s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i)) @@ -811,6 +852,7 @@ func (h *Head) Init(minValidTime int64) error { return err } level.Info(h.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last) + h.updateWALReplayStatusRead(i) } walReplayDuration := time.Since(start) @@ -2701,3 +2743,19 @@ func (h *Head) Size() int64 { func (h *RangeHead) Size() int64 { return h.head.Size() } + +func (h *Head) startWALReplayStatus(startFrom, last int) { + h.stats.WALReplayStatus.Lock() + defer h.stats.WALReplayStatus.Unlock() + + h.stats.WALReplayStatus.Min = startFrom + h.stats.WALReplayStatus.Max = last + h.stats.WALReplayStatus.Current = startFrom +} + +func (h *Head) updateWALReplayStatusRead(current int) { + h.stats.WALReplayStatus.Lock() + defer h.stats.WALReplayStatus.Unlock() + + h.stats.WALReplayStatus.Current = current +} diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index 9671886b7e..2f9100d13d 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -36,7 +36,7 @@ func 
BenchmarkHeadStripeSeriesCreate(b *testing.B) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = chunkDir - h, err := NewHead(nil, nil, nil, opts) + h, err := NewHead(nil, nil, nil, opts, nil) require.NoError(b, err) defer h.Close() @@ -55,7 +55,7 @@ func BenchmarkHeadStripeSeriesCreateParallel(b *testing.B) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = chunkDir - h, err := NewHead(nil, nil, nil, opts) + h, err := NewHead(nil, nil, nil, opts, nil) require.NoError(b, err) defer h.Close() @@ -83,7 +83,7 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) { // Mock the PreCreation() callback to fail on each series. opts.SeriesCallback = failingSeriesLifecycleCallback{} - h, err := NewHead(nil, nil, nil, opts) + h, err := NewHead(nil, nil, nil, opts, nil) require.NoError(b, err) defer h.Close() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 6ae7c5104b..371e0ecd62 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -52,7 +52,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal. 
opts.ChunkRange = chunkRange opts.ChunkDirRoot = dir opts.NumExemplars = 10 - h, err := NewHead(nil, nil, wlog, opts) + h, err := NewHead(nil, nil, wlog, opts, nil) require.NoError(t, err) require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) @@ -230,7 +230,7 @@ func BenchmarkLoadWAL(b *testing.B) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = w.Dir() - h, err := NewHead(nil, nil, w, opts) + h, err := NewHead(nil, nil, w, opts, nil) require.NoError(b, err) h.Init(0) } @@ -354,7 +354,7 @@ func TestHead_WALMultiRef(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = w.Dir() - head, err = NewHead(nil, nil, w, opts) + head, err = NewHead(nil, nil, w, opts, nil) require.NoError(t, err) require.NoError(t, head.Init(0)) defer func() { @@ -638,7 +638,7 @@ func TestHeadDeleteSimple(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = reloadedW.Dir() - reloadedHead, err := NewHead(nil, nil, reloadedW, opts) + reloadedHead, err := NewHead(nil, nil, reloadedW, opts, nil) require.NoError(t, err) require.NoError(t, reloadedHead.Init(0)) @@ -1323,7 +1323,7 @@ func TestWalRepair_DecodingError(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkRange = 1 opts.ChunkDirRoot = w.Dir() - h, err := NewHead(nil, nil, w, opts) + h, err := NewHead(nil, nil, w, opts, nil) require.NoError(t, err) require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) initErr := h.Init(math.MinInt64) @@ -1336,7 +1336,7 @@ func TestWalRepair_DecodingError(t *testing.T) { // Open the db to trigger a repair. 
{ - db, err := Open(dir, nil, nil, DefaultOptions()) + db, err := Open(dir, nil, nil, DefaultOptions(), nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) @@ -1381,7 +1381,7 @@ func TestHeadReadWriterRepair(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkRange = chunkRange opts.ChunkDirRoot = dir - h, err := NewHead(nil, nil, w, opts) + h, err := NewHead(nil, nil, w, opts, nil) require.NoError(t, err) require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal)) require.NoError(t, h.Init(math.MinInt64)) @@ -1416,7 +1416,7 @@ func TestHeadReadWriterRepair(t *testing.T) { // Open the db to trigger a repair. { - db, err := Open(dir, nil, nil, DefaultOptions()) + db, err := Open(dir, nil, nil, DefaultOptions(), nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) @@ -1614,7 +1614,7 @@ func TestMemSeriesIsolation(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = wlog.Dir() - hb, err = NewHead(nil, nil, wlog, opts) + hb, err = NewHead(nil, nil, wlog, opts, nil) defer func() { require.NoError(t, hb.Close()) }() require.NoError(t, err) require.NoError(t, hb.Init(0)) @@ -1758,7 +1758,7 @@ func TestOutOfOrderSamplesMetric(t *testing.T) { require.NoError(t, os.RemoveAll(dir)) }() - db, err := Open(dir, nil, nil, DefaultOptions()) + db, err := Open(dir, nil, nil, DefaultOptions(), nil) require.NoError(t, err) defer func() { require.NoError(t, db.Close()) diff --git a/tsdb/querier_bench_test.go b/tsdb/querier_bench_test.go index a04b1d3e4a..ed2d6fb4b1 100644 --- a/tsdb/querier_bench_test.go +++ b/tsdb/querier_bench_test.go @@ -40,7 +40,7 @@ func BenchmarkPostingsForMatchers(b *testing.B) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = chunkDir - h, err := NewHead(nil, nil, nil, opts) + h, err := NewHead(nil, nil, nil, opts, nil) require.NoError(b, err) defer func() { require.NoError(b, h.Close()) @@ -152,7 +152,7 @@ func 
BenchmarkQuerierSelect(b *testing.B) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = chunkDir - h, err := NewHead(nil, nil, nil, opts) + h, err := NewHead(nil, nil, nil, opts, nil) require.NoError(b, err) defer h.Close() app := h.Appender(context.Background()) diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 7586ffde57..8f82d1d55e 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -409,7 +409,7 @@ func TestBlockQuerier_AgainstHeadWithOpenChunks(t *testing.T) { t.Run("", func(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkRange = 2 * time.Hour.Milliseconds() - h, err := NewHead(nil, nil, nil, opts) + h, err := NewHead(nil, nil, nil, opts, nil) require.NoError(t, err) defer h.Close() @@ -1579,7 +1579,7 @@ func TestPostingsForMatchers(t *testing.T) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = chunkDir - h, err := NewHead(nil, nil, nil, opts) + h, err := NewHead(nil, nil, nil, opts, nil) require.NoError(t, err) defer func() { require.NoError(t, h.Close()) @@ -1844,7 +1844,7 @@ func TestClose(t *testing.T) { createBlock(t, dir, genSeries(1, 1, 0, 10)) createBlock(t, dir, genSeries(1, 1, 10, 20)) - db, err := Open(dir, nil, nil, DefaultOptions()) + db, err := Open(dir, nil, nil, DefaultOptions(), nil) if err != nil { t.Fatalf("Opening test storage failed: %s", err) } diff --git a/tsdb/repair_test.go b/tsdb/repair_test.go index cf75a28f2e..7fb2720fd6 100644 --- a/tsdb/repair_test.go +++ b/tsdb/repair_test.go @@ -95,7 +95,7 @@ func TestRepairBadIndexVersion(t *testing.T) { require.NoError(t, r.Close()) // On DB opening all blocks in the base dir should be repaired. 
- db, err := Open(tmpDir, nil, nil, nil) + db, err := Open(tmpDir, nil, nil, nil, nil) require.NoError(t, err) db.Close() diff --git a/util/teststorage/storage.go b/util/teststorage/storage.go index 16b3c3c7c6..75c179c522 100644 --- a/util/teststorage/storage.go +++ b/util/teststorage/storage.go @@ -39,7 +39,7 @@ func New(t testutil.T) *TestStorage { opts.MinBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond) opts.MaxExemplars = 10 - db, err := tsdb.Open(dir, nil, nil, opts) + db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats()) if err != nil { t.Fatalf("Opening test storage failed: %s", err) } diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 3483c36456..1415092a65 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -154,6 +154,7 @@ type TSDBAdminStats interface { Snapshot(dir string, withHead bool) error Stats(statsByLabelName string) (*tsdb.Stats, error) + WALReplayStatus() (tsdb.WALReplayStatus, error) } // API can register a set of endpoints in a router and handle @@ -309,6 +310,7 @@ func (api *API) Register(r *route.Router) { r.Get("/status/buildinfo", wrap(api.serveBuildInfo)) r.Get("/status/flags", wrap(api.serveFlags)) r.Get("/status/tsdb", wrap(api.serveTSDBStatus)) + r.Get("/status/walreplay", api.serveWALReplayStatus) r.Post("/read", api.ready(http.HandlerFunc(api.remoteRead))) r.Post("/write", api.ready(http.HandlerFunc(api.remoteWrite))) @@ -1351,6 +1353,25 @@ func (api *API) serveTSDBStatus(*http.Request) apiFuncResult { }, nil, nil, nil} } +type walReplayStatus struct { + Min int `json:"min"` + Max int `json:"max"` + Current int `json:"current"` +} + +func (api *API) serveWALReplayStatus(w http.ResponseWriter, r *http.Request) { + httputil.SetCORS(w, api.CORSOrigin, r) + status, err := api.db.WALReplayStatus() + if err != nil { + api.respondError(w, &apiError{errorInternal, err}, nil) + } + api.respond(w, walReplayStatus{ + Min: status.Min, + Max: status.Max, 
+ Current: status.Current, + }, nil) +} + func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) { // This is only really for tests - this will never be nil IRL. if api.remoteReadHandler != nil { diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 1f4a0e0277..5fd740b061 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -2115,9 +2115,12 @@ func (f *fakeDB) Stats(statsByLabelName string) (_ *tsdb.Stats, retErr error) { }() opts := tsdb.DefaultHeadOptions() opts.ChunkRange = 1000 - h, _ := tsdb.NewHead(nil, nil, nil, opts) + h, _ := tsdb.NewHead(nil, nil, nil, opts, nil) return h.Stats(statsByLabelName), nil } +func (f *fakeDB) WALReplayStatus() (tsdb.WALReplayStatus, error) { + return tsdb.WALReplayStatus{}, nil +} func TestAdminEndpoints(t *testing.T) { tsdb, tsdbWithError, tsdbNotReady := &fakeDB{}, &fakeDB{err: errors.New("some error")}, &fakeDB{err: errors.Wrap(tsdb.ErrNotReady, "wrap")} diff --git a/web/ui/react-app/src/App.tsx b/web/ui/react-app/src/App.tsx index 6729c0db4f..143b3d3ce2 100755 --- a/web/ui/react-app/src/App.tsx +++ b/web/ui/react-app/src/App.tsx @@ -2,13 +2,15 @@ import React, { FC } from 'react'; import Navigation from './Navbar'; import { Container } from 'reactstrap'; -import { Router, Redirect } from '@reach/router'; +import { Router, Redirect, navigate } from '@reach/router'; import useMedia from 'use-media'; -import { Alerts, Config, Flags, Rules, ServiceDiscovery, Status, Targets, TSDBStatus, PanelList } from './pages'; +import { Alerts, Config, Flags, Rules, ServiceDiscovery, Status, Targets, TSDBStatus, PanelList, Starting } from './pages'; import { PathPrefixContext } from './contexts/PathPrefixContext'; import { ThemeContext, themeName, themeSetting } from './contexts/ThemeContext'; import { Theme, themeLocalStorageKey } from './Theme'; import { useLocalStorage } from './hooks/useLocalStorage'; +import { useFetchReady } from './hooks/useFetch'; +import { usePathPrefix } from 
'./contexts/PathPrefixContext'; interface AppProps { consolesLink: string | null; @@ -29,6 +31,7 @@ const App: FC = ({ consolesLink }) => { '/rules', '/targets', '/service-discovery', + '/starting', ]; if (basePath.endsWith('/')) { basePath = basePath.slice(0, -1); @@ -42,6 +45,14 @@ const App: FC = ({ consolesLink }) => { } } + const pathPrefix = usePathPrefix(); + const { ready, isLoading, isUnexpected } = useFetchReady(pathPrefix); + if (basePath !== '/starting') { + if (!ready && !isLoading && !isUnexpected) { + navigate('/starting'); + } + } + const [userTheme, setUserTheme] = useLocalStorage(themeLocalStorageKey, 'auto'); const browserHasThemes = useMedia('(prefers-color-scheme)'); const browserWantsDarkTheme = useMedia('(prefers-color-scheme: dark)'); @@ -76,6 +87,7 @@ const App: FC = ({ consolesLink }) => { + diff --git a/web/ui/react-app/src/hooks/useFetch.ts b/web/ui/react-app/src/hooks/useFetch.ts index 5f18b369a9..f88b01c0f0 100644 --- a/web/ui/react-app/src/hooks/useFetch.ts +++ b/web/ui/react-app/src/hooks/useFetch.ts @@ -1,4 +1,6 @@ import { useState, useEffect } from 'react'; +import { API_PATH } from '../constants/constants'; +import { WALReplayStatus } from '../types/types'; export type APIResponse = { status: string; data: T }; @@ -8,6 +10,18 @@ export interface FetchState { isLoading: boolean; } +export interface FetchStateReady { + ready: boolean; + isUnexpected: boolean; + isLoading: boolean; +} + +export interface FetchStateReadyInterval { + ready: boolean; + isUnexpected: boolean; + walReplayStatus: WALReplayStatus; +} + export const useFetch = (url: string, options?: RequestInit): FetchState => { const [response, setResponse] = useState>({ status: 'start fetching' } as any); const [error, setError] = useState(); @@ -32,3 +46,73 @@ export const useFetch = (url: string, options?: RequestInit): Fetc }, [url, options]); return { response, error, isLoading }; }; + +export const useFetchReady = (pathPrefix: string, options?: RequestInit): 
FetchStateReady => { + const [ready, setReady] = useState(false); + const [isUnexpected, setIsUnexpected] = useState(false); + const [isLoading, setIsLoading] = useState(true); + + useEffect(() => { + const fetchData = async () => { + setIsLoading(true); + try { + const res = await fetch(`${pathPrefix}/-/ready`, { cache: 'no-store', credentials: 'same-origin', ...options }); + if (res.status === 200) { + setReady(true); + } + // The server sends back a 503 if it isn't ready, + // if we get back anything else that means something has gone wrong. + if (res.status !== 503) { + setIsUnexpected(true); + } else { + setIsUnexpected(false); + } + + setIsLoading(false); + } catch (error) { + setIsUnexpected(true); + } + }; + fetchData(); + }, [pathPrefix, options]); + return { ready, isUnexpected, isLoading }; +}; + +// This is used on the starting page to periodically check if the server is ready yet, +// and check the status of the WAL replay. +export const useFetchReadyInterval = (pathPrefix: string, options?: RequestInit): FetchStateReadyInterval => { + const [ready, setReady] = useState(false); + const [isUnexpected, setIsUnexpected] = useState(false); + const [walReplayStatus, setWALReplayStatus] = useState({} as any); + + useEffect(() => { + const interval = setInterval(async () => { + try { + let res = await fetch(`${pathPrefix}/-/ready`, { cache: 'no-store', credentials: 'same-origin', ...options }); + if (res.status === 200) { + setReady(true); + clearInterval(interval); + return; + } + if (res.status !== 503) { + setIsUnexpected(true); + setWALReplayStatus({ data: { last: 0, first: 0 } } as any); + } else { + setIsUnexpected(false); + + res = await fetch(`${pathPrefix}/${API_PATH}/status/walreplay`, { cache: 'no-store', credentials: 'same-origin' }); + if (res.ok) { + const data = (await res.json()) as WALReplayStatus; + setWALReplayStatus(data); + } + } + } catch (error) { + setIsUnexpected(true); + setWALReplayStatus({ data: { last: 0, first: 0 } } as any); + } 
+ }, 1000); + + return () => clearInterval(interval); + }, [pathPrefix, options]); + return { ready, isUnexpected, walReplayStatus }; +}; diff --git a/web/ui/react-app/src/pages/index.ts b/web/ui/react-app/src/pages/index.ts index d6f07d798d..d8ad5e72be 100644 --- a/web/ui/react-app/src/pages/index.ts +++ b/web/ui/react-app/src/pages/index.ts @@ -7,5 +7,6 @@ import Status from './status/Status'; import Targets from './targets/Targets'; import PanelList from './graph/PanelList'; import TSDBStatus from './tsdbStatus/TSDBStatus'; +import Starting from './starting/Starting'; -export { Alerts, Config, Flags, Rules, ServiceDiscovery, Status, Targets, TSDBStatus, PanelList }; +export { Alerts, Config, Flags, Rules, ServiceDiscovery, Status, Targets, TSDBStatus, PanelList, Starting }; diff --git a/web/ui/react-app/src/pages/starting/Starting.tsx b/web/ui/react-app/src/pages/starting/Starting.tsx new file mode 100644 index 0000000000..24d45e1fbc --- /dev/null +++ b/web/ui/react-app/src/pages/starting/Starting.tsx @@ -0,0 +1,60 @@ +import React, { FC, useEffect } from 'react'; +import { RouteComponentProps, navigate } from '@reach/router'; +import { Progress, Alert } from 'reactstrap'; + +import { useFetchReadyInterval } from '../../hooks/useFetch'; +import { WALReplayData } from '../../types/types'; +import { usePathPrefix } from '../../contexts/PathPrefixContext'; + +interface StartingContentProps { + isUnexpected: boolean; + status?: WALReplayData; +} + +export const StartingContent: FC = ({ status, isUnexpected }) => { + if (isUnexpected) { + return ( + + Error: Server is not responding + + ); + } + + return ( +
+
+

Starting up...

+ {status?.current! > status?.min! ? ( +
+

+ Replaying WAL ({status?.current}/{status?.max}) +

+ +
+ ) : null} +
+
+ ); +}; + +const Starting: FC = () => { + const pathPrefix = usePathPrefix(); + const { ready, walReplayStatus, isUnexpected } = useFetchReadyInterval(pathPrefix); + + useEffect(() => { + if (ready) { + navigate('/'); + } + }, [ready]); + + return ; +}; + +export default Starting; diff --git a/web/ui/react-app/src/types/types.ts b/web/ui/react-app/src/types/types.ts index a30e64107a..1eee32d3db 100644 --- a/web/ui/react-app/src/types/types.ts +++ b/web/ui/react-app/src/types/types.ts @@ -24,3 +24,13 @@ export interface Rule { state: RuleState; type: string; } + +export interface WALReplayData { + min: number; + max: number; + current: number; +} + +export interface WALReplayStatus { + data?: WALReplayData; +} diff --git a/web/web.go b/web/web.go index 374627a9e3..0ba558a37e 100644 --- a/web/web.go +++ b/web/web.go @@ -79,6 +79,7 @@ var reactRouterPaths = []string{ "/status", "/targets", "/tsdb-status", + "/starting", } // withStackTrace logs the stack trace in case the request panics. The function diff --git a/web/web_test.go b/web/web_test.go index 793dccf632..399e552663 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -104,6 +104,10 @@ func (a *dbAdapter) Stats(statsByLabelName string) (*tsdb.Stats, error) { return a.Head().Stats(statsByLabelName), nil } +func (a *dbAdapter) WALReplayStatus() (tsdb.WALReplayStatus, error) { + return tsdb.WALReplayStatus{}, nil +} + func TestReadyAndHealthy(t *testing.T) { t.Parallel() @@ -111,7 +115,7 @@ func TestReadyAndHealthy(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dbDir)) }() - db, err := tsdb.Open(dbDir, nil, nil, nil) + db, err := tsdb.Open(dbDir, nil, nil, nil, nil) require.NoError(t, err) opts := &Options{ @@ -230,7 +234,7 @@ func TestRoutePrefix(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dbDir)) }() - db, err := tsdb.Open(dbDir, nil, nil, nil) + db, err := tsdb.Open(dbDir, nil, nil, nil, nil) require.NoError(t, err) opts := 
&Options{ @@ -395,7 +399,7 @@ func TestShutdownWithStaleConnection(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, os.RemoveAll(dbDir)) }() - db, err := tsdb.Open(dbDir, nil, nil, nil) + db, err := tsdb.Open(dbDir, nil, nil, nil, nil) require.NoError(t, err) timeout := 10 * time.Second