diff --git a/storage/remote/storage.go b/storage/remote/storage.go
index 3b71cd11ca..3c9d834fd1 100644
--- a/storage/remote/storage.go
+++ b/storage/remote/storage.go
@@ -60,7 +60,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal
 	}
 	shardUpdateDuration := 10 * time.Second
 	s := &Storage{
-		logger:                 logging.RateLimit(logging.Dedupe(l, 1*time.Minute), 1),
+		logger:                 logging.Dedupe(l, 1*time.Minute),
 		localStartTimeCallback: stCallback,
 		flushDeadline:          flushDeadline,
 		walDir:                 walDir,
diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go
index b8830b82ae..01bd6e4514 100644
--- a/storage/remote/wal_watcher.go
+++ b/storage/remote/wal_watcher.go
@@ -19,6 +19,7 @@ import (
 	"math"
 	"os"
 	"path"
+	"sort"
 	"strconv"
 	"strings"
 	"time"
@@ -182,7 +183,7 @@ func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string

 func (w *WALWatcher) Start() {
 	level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name)
-	go w.runWatcher()
+	go w.loop()
 }

 func (w *WALWatcher) Stop() {
@@ -190,71 +191,95 @@
 	close(w.quit)
 }

-func (w *WALWatcher) runWatcher() {
-	// The WAL dir may not exist when Prometheus first starts up.
+func (w *WALWatcher) loop() {
+	// We may encounter failures processing the WAL; we should wait and retry.
+
 	for {
-		if _, err := os.Stat(w.walDir); os.IsNotExist(err) {
-			time.Sleep(time.Second)
-		} else {
-			break
+		if err := w.run(); err != nil {
+			level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
 		}
-	}

-	nw, err := wal.New(nil, nil, w.walDir)
-	if err != nil {
-		level.Error(w.logger).Log("err", err)
-		return
+		select {
+		case <-w.quit:
+			return
+		case <-time.After(5 * time.Second):
+		}
 	}
+}

-	first, last, err := nw.Segments()
+func (w *WALWatcher) run() error {
+	nw, err := wal.New(nil, nil, w.walDir)
 	if err != nil {
-		level.Error(w.logger).Log("err", err)
-		return
-	}
-
-	if last == -1 {
-		level.Error(w.logger).Log("err", err)
-		return
+		return errors.Wrap(err, "wal.New")
 	}

 	// Backfill from the checkpoint first if it exists.
-	dir, _, err := tsdb.LastCheckpoint(w.walDir)
+	var nextIndex int
+	w.lastCheckpoint, nextIndex, err = tsdb.LastCheckpoint(w.walDir)
 	if err != nil && err != tsdb.ErrNotFound {
-		level.Error(w.logger).Log("msg", "error looking for existing checkpoint, some samples may be dropped", "err", errors.Wrap(err, "find last checkpoint"))
+		return err
 	}

-	level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir)
+	level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", w.lastCheckpoint, "startFrom", nextIndex)
 	if err == nil {
-		w.lastCheckpoint = dir
-		err = w.readCheckpoint(dir)
-		if err != nil {
-			level.Error(w.logger).Log("msg", "error reading existing checkpoint, some samples may be dropped", "err", err)
+		if err = w.readCheckpoint(w.lastCheckpoint); err != nil {
+			return err
 		}
 	}

-	w.currentSegment = first
-	tail := false
+	w.currentSegment, err = w.findSegmentForIndex(nextIndex)
+	if err != nil {
+		return err
+	}

-	for {
-		if w.currentSegment == last {
-			tail = true
-		}
+	level.Debug(w.logger).Log("msg", "starting from", "currentSegment", w.currentSegment)

+	for {
 		w.currentSegmentMetric.Set(float64(w.currentSegment))
-		level.Info(w.logger).Log("msg", "process segment", "segment", w.currentSegment, "tail", tail)
+		level.Info(w.logger).Log("msg", "process segment", "segment", w.currentSegment)

 		// On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment.
 		// On subsequent calls to this function, currentSegment will have been incremented and we should open that segment.
-		if err := w.watch(nw, w.currentSegment, tail); err != nil {
+		if err := w.watch(nw, w.currentSegment, true); err != nil {
 			level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err)
-			return
+			return err
 		}

 		w.currentSegment++
 	}
 }

-// Use tail true to indicate that the reader is currently on a segment that is
+func (w *WALWatcher) findSegmentForIndex(index int) (int, error) {
+	files, err := fileutil.ReadDir(w.walDir)
+	if err != nil {
+		return -1, err
+	}
+
+	var refs []int
+	var last int
+	for _, fn := range files {
+		k, err := strconv.Atoi(fn)
+		if err != nil {
+			continue
+		}
+		if len(refs) > 0 && k > last+1 {
+			return -1, errors.New("segments are not sequential")
+		}
+		refs = append(refs, k)
+		last = k
+	}
+	sort.Sort(sort.IntSlice(refs))
+
+	for _, r := range refs {
+		if r >= index {
+			return r, nil
+		}
+	}
+
+	return -1, errors.New("failed to find segment for index")
+}
+
+// Use tail true to indicate that the reader is currently on a segment that is
 // actively being written to. If false, assume it's a full segment and we're
 // replaying it on start to cache the series records.
 func (w *WALWatcher) watch(wl *wal.WAL, segmentNum int, tail bool) error {
@@ -444,11 +469,15 @@ func (w *WALWatcher) readCheckpoint(checkpointDir string) error {
 	// w.readSeriesRecords(wal.NewLiveReader(sr), i, size)
 	r := wal.NewLiveReader(sr)
-	w.readSegment(r)
+	if err := w.readSegment(r); err != nil {
+		return errors.Wrap(err, "readSegment")
+	}
+
 	if r.TotalRead() != size {
 		level.Warn(w.logger).Log("msg", "may not have read all data from checkpoint")
 	}

 	level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir)
+
 	return nil
 }
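Note for reviewers: below is a minimal standalone sketch of the segment-selection rule that findSegmentForIndex introduces above (keep the numeric file names in the WAL directory, require them to form a contiguous run, and resume from the first segment at or after the checkpoint's next index). The helper name pickSegment and the sample file names are hypothetical, not part of this patch, and the sketch sorts before the sequential check where the patch relies on fileutil.ReadDir returning sorted names:

package main

import (
	"fmt"
	"sort"
	"strconv"
)

// pickSegment mirrors findSegmentForIndex: filter the WAL directory listing
// down to numeric segment names, verify they are sequential, and return the
// first segment number at or after index.
func pickSegment(files []string, index int) (int, error) {
	var refs []int
	for _, fn := range files {
		// Non-numeric entries such as "checkpoint.000003" are skipped.
		k, err := strconv.Atoi(fn)
		if err != nil {
			continue
		}
		refs = append(refs, k)
	}
	sort.Ints(refs)

	for i := 1; i < len(refs); i++ {
		if refs[i] != refs[i-1]+1 {
			return -1, fmt.Errorf("segments are not sequential")
		}
	}

	for _, r := range refs {
		if r >= index {
			return r, nil
		}
	}
	return -1, fmt.Errorf("failed to find segment for index")
}

func main() {
	// After a checkpoint covering up to segment 3, LastCheckpoint reports a
	// next index of 4, so tailing should resume at segment 00000004.
	files := []string{"checkpoint.000003", "00000004", "00000005", "00000006"}
	seg, err := pickSegment(files, 4)
	fmt.Println(seg, err) // prints: 4 <nil>
}

One design consequence worth noting: because findSegmentForIndex returns an error on any gap in the segment numbers, a missing segment aborts run() and falls back to the 5-second retry in loop() rather than silently skipping data.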