|
|
|
@ -318,21 +318,37 @@ func (i *Ingester) setupAutoForget() { |
|
|
|
|
|
|
|
|
|
func (i *Ingester) starting(ctx context.Context) error { |
|
|
|
|
if i.cfg.WAL.Enabled { |
|
|
|
|
start := time.Now() |
|
|
|
|
|
|
|
|
|
// Ignore retain period during wal replay.
|
|
|
|
|
old := i.cfg.RetainPeriod |
|
|
|
|
oldRetain := i.cfg.RetainPeriod |
|
|
|
|
i.cfg.RetainPeriod = 0 |
|
|
|
|
defer func() { |
|
|
|
|
i.cfg.RetainPeriod = old |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
// Disable the in process stream limit checks while replaying the WAL.
|
|
|
|
|
// It is re-enabled in the recover's Close() method.
|
|
|
|
|
i.limiter.DisableForWALReplay() |
|
|
|
|
|
|
|
|
|
recoverer := newIngesterRecoverer(i) |
|
|
|
|
defer recoverer.Close() |
|
|
|
|
|
|
|
|
|
start := time.Now() |
|
|
|
|
i.metrics.walReplayActive.Set(1) |
|
|
|
|
|
|
|
|
|
endReplay := func() func() { |
|
|
|
|
var once sync.Once |
|
|
|
|
return func() { |
|
|
|
|
once.Do(func() { |
|
|
|
|
level.Info(util_log.Logger).Log("msg", "closing recoverer") |
|
|
|
|
recoverer.Close() |
|
|
|
|
|
|
|
|
|
elapsed := time.Since(start) |
|
|
|
|
|
|
|
|
|
i.metrics.walReplayActive.Set(0) |
|
|
|
|
i.metrics.walReplayDuration.Set(elapsed.Seconds()) |
|
|
|
|
i.cfg.RetainPeriod = oldRetain |
|
|
|
|
level.Info(util_log.Logger).Log("msg", "WAL recovery finished", "time", elapsed.String()) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
defer endReplay() |
|
|
|
|
|
|
|
|
|
level.Info(util_log.Logger).Log("msg", "recovering from checkpoint") |
|
|
|
|
checkpointReader, checkpointCloser, err := newCheckpointReader(i.cfg.WAL.Dir) |
|
|
|
@ -378,11 +394,7 @@ func (i *Ingester) starting(ctx context.Context) error { |
|
|
|
|
"errors", segmentRecoveryErr != nil, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
level.Info(util_log.Logger).Log("msg", "closing recoverer") |
|
|
|
|
recoverer.Close() |
|
|
|
|
elapsed := time.Since(start) |
|
|
|
|
i.metrics.walReplayDuration.Set(elapsed.Seconds()) |
|
|
|
|
level.Info(util_log.Logger).Log("msg", "recovery finished", "time", elapsed.String()) |
|
|
|
|
endReplay() |
|
|
|
|
|
|
|
|
|
i.wal.Start() |
|
|
|
|
} |
|
|
|
|