fix(ingester): Add WAL cleanup code (#19607)

pull/19572/head^2
Paul Rogers 7 months ago committed by GitHub
parent 2bbdfa54e7
commit b362e1e8d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 114
      pkg/ingester/checkpoint.go
  2. 354
      pkg/ingester/checkpoint_test.go
  3. 29
      pkg/ingester/recovery.go

@ -337,13 +337,8 @@ func (w *WALCheckpointWriter) Advance() (bool, error) {
level.Info(util_log.Logger).Log("msg", "attempting checkpoint for", "dir", checkpointDir)
checkpointDirTemp := checkpointDir + ".tmp"
// cleanup any old partial checkpoints
if _, err := os.Stat(checkpointDirTemp); err == nil {
if err := os.RemoveAll(checkpointDirTemp); err != nil {
level.Error(util_log.Logger).Log("msg", "unable to cleanup old tmp checkpoint", "dir", checkpointDirTemp)
return false, err
}
}
// cleanup any old partial checkpoints (not just the current one)
cleanupStaleTmpCheckpoints(w.segmentWAL.Dir(), util_log.Logger)
if err := os.MkdirAll(checkpointDirTemp, 0750); err != nil {
return false, fmt.Errorf("create checkpoint dir: %w", err)
@ -479,6 +474,111 @@ func (w *WALCheckpointWriter) deleteCheckpoints(maxIndex int) (err error) {
return errs.Err()
}
// cleanupOldCheckpoints removes old checkpoints that have been superseded by a newer checkpoint.
// This is primarily used during startup to clean up old checkpoints when repeated checkpoint failures
// have prevented normal cleanup (which happens via deleteCheckpoints() after successful completion).
//
// A checkpoint at index N contains a complete snapshot of state through segment N, so segments <= N
// can be deleted once that checkpoint is successfully written. This function cleans up old checkpoints
// that are no longer needed because their corresponding segments have been truncated.
//
// The protectedCheckpointIdx parameter specifies a checkpoint that should never be deleted, typically
// the most recent successfully completed checkpoint. This ensures we always keep at least one valid
// recovery point.
//
// Note: It's normal for checkpoint.N to exist while firstSegment is N+1, since the checkpoint replaces
// the need for segment N. This function relies on the protectedCheckpointIdx to determine which
// checkpoints are safe to delete, not just the segment numbers.
//
// This differs from deleteCheckpoints() which runs after successful checkpoint completion and handles
// normal cleanup including .tmp files.
func cleanupOldCheckpoints(dir string, protectedCheckpointIdx int, logger log.Logger) {
level.Info(util_log.Logger).Log("msg", "old checkpoint cleanup starting")
start := time.Now()
allSuccess := true
defer func() {
elapsed := time.Since(start)
level.Info(util_log.Logger).Log("msg", "old checkpoint cleanup done", "duration", elapsed.String(), "success", allSuccess)
}()
firstSegment, _, err := wlog.Segments(dir)
if err != nil {
level.Error(logger).Log("msg", "unable to list WAL segments for old checkpoint cleanup, checkpoint cleanup cannot proceed, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "err", err)
allSuccess = false
return
}
if firstSegment <= 0 {
// No cleanup needed if we're starting from segment 0
return
}
files, err := os.ReadDir(dir)
if err != nil {
level.Error(logger).Log("msg", "unable to read WAL directory for old checkpoint cleanup, checkpoint cleanup cannot proceed, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "err", err)
allSuccess = false
return
}
for _, fi := range files {
// Check if this is a completed checkpoint (not .tmp)
idx, err := checkpointIndex(fi.Name(), false)
if err != nil || !fi.IsDir() {
continue
}
// Delete checkpoints that are both:
// 1. From a time when segments <= firstSegment existed (idx < firstSegment is typical for old checkpoints)
// 2. Older than the protected checkpoint (idx < protectedCheckpointIdx means superseded)
// The second condition is the key safety check - we never delete the protected checkpoint.
if idx < firstSegment && idx < protectedCheckpointIdx {
orphanedPath := filepath.Join(dir, fi.Name())
if err := os.RemoveAll(orphanedPath); err != nil {
level.Error(logger).Log("msg", "unable to cleanup old checkpoint, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "dir", orphanedPath, "err", err)
allSuccess = false
} else {
level.Info(logger).Log("msg", "cleaned up old superseded checkpoint", "dir", fi.Name(), "idx", idx, "firstSegment", firstSegment, "protectedCheckpoint", protectedCheckpointIdx)
}
}
}
}
// cleanupStaleTmpCheckpoints removes all .tmp checkpoint directories which represent
// incomplete/failed checkpoint operations. These are safe to delete because recovery
// only uses completed checkpoints (those without the .tmp suffix).
func cleanupStaleTmpCheckpoints(dir string, logger log.Logger) {
level.Info(util_log.Logger).Log("msg", "tmp checkpoint cleanup starting")
start := time.Now()
allSuccess := true
defer func() {
elapsed := time.Since(start)
level.Info(util_log.Logger).Log("msg", "tmp checkpoint cleanup done", "duration", elapsed.String(), "success", allSuccess)
}()
files, err := os.ReadDir(dir)
if err != nil {
level.Error(logger).Log("msg", "unable to read WAL directory for tmp checkpoint cleanup, checkpoint cleanup cannot proceed, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "err", err)
allSuccess = false
return
}
for _, fi := range files {
// Check if this is a .tmp checkpoint directory
if _, tmpErr := checkpointIndex(fi.Name(), true); tmpErr == nil && fi.IsDir() {
// Only delete if it actually has the .tmp suffix
if filepath.Ext(fi.Name()) == ".tmp" {
tmpPath := filepath.Join(dir, fi.Name())
if err := os.RemoveAll(tmpPath); err != nil {
level.Error(logger).Log("msg", "unable to cleanup stale tmp checkpoint, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "dir", tmpPath, "err", err)
// Continue cleaning up other .tmp directories even if one fails
allSuccess = false
} else {
level.Info(logger).Log("msg", "cleaned up stale tmp checkpoint at startup", "dir", fi.Name())
}
}
}
}
}
func (w *WALCheckpointWriter) Close(abort bool) error {
if len(w.recs) > 0 {
if err := w.flush(); err != nil {

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"testing"
"time"
@ -697,3 +698,356 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
})
}
}
func TestCheckpointCleanupStaleTmpDirectories(t *testing.T) {
walDir := t.TempDir()
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}
readRingMock := mockReadRingWithOneActiveIngester()
// Create some fake stale .tmp checkpoint directories
staleTmpDirs := []string{
"checkpoint.000100.tmp",
"checkpoint.000200.tmp",
"checkpoint.000300.tmp",
}
for _, dir := range staleTmpDirs {
tmpPath := filepath.Join(walDir, dir)
require.NoError(t, os.MkdirAll(tmpPath, 0750))
}
// Verify the stale .tmp directories exist
files, err := os.ReadDir(walDir)
require.NoError(t, err)
tmpCount := 0
for _, f := range files {
if f.IsDir() && filepath.Ext(f.Name()) == ".tmp" {
tmpCount++
}
}
require.Equal(t, 3, tmpCount, "expected 3 .tmp directories before starting ingester")
// Start the ingester - this should trigger checkpoint cleanup
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
// Push some data to trigger checkpoint
req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{
Timestamp: time.Now(),
Line: "test line",
},
},
},
},
}
ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)
// Wait for a checkpoint to be created
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10)
// Stop the ingester to ensure no new checkpoints are being created
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// Verify all stale .tmp directories have been cleaned up
files, err = os.ReadDir(walDir)
require.NoError(t, err)
tmpCount = 0
for _, f := range files {
if f.IsDir() && filepath.Ext(f.Name()) == ".tmp" {
tmpCount++
}
}
require.Equal(t, 0, tmpCount, "expected all .tmp directories to be cleaned up after checkpoint")
}
func TestCheckpointCleanupOldCheckpoints(t *testing.T) {
walDir := t.TempDir()
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}
readRingMock := mockReadRingWithOneActiveIngester()
// Phase 1: Start ingester and create some WAL segments and a checkpoint
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
// Push some data to create WAL segments
req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{
Timestamp: time.Now(),
Line: "test line",
},
},
},
},
}
ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)
// Wait for a checkpoint to be created
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10)
// Stop the ingester
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// Phase 2: Manually create a very old checkpoint that should have been deleted
// This simulates the scenario where repeated checkpoint failures prevented normal cleanup.
// In a healthy system, when checkpoint.000002 (or higher) completes, checkpoint.000001 would
// be deleted by deleteCheckpoints(). But if checkpoints kept failing, old checkpoints accumulate.
// The cleanup function should remove these superseded old checkpoints.
oldCheckpointDir := filepath.Join(walDir, "checkpoint.000001")
require.NoError(t, os.MkdirAll(oldCheckpointDir, 0750))
// Verify the old checkpoint exists
_, err = os.Stat(oldCheckpointDir)
require.NoError(t, err, "old checkpoint should exist before cleanup")
// Phase 3: Restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
// Push more data to trigger another checkpoint which will clean up old checkpoints
_, err = i.Push(ctx, &req)
require.NoError(t, err)
// Wait for another checkpoint to be created (this triggers cleanup via deleteCheckpoints())
time.Sleep(ingesterConfig.WAL.CheckpointDuration * 2)
// Stop the ingester
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// Phase 4: Verify the old checkpoint has been cleaned up
_, err = os.Stat(oldCheckpointDir)
require.True(t, os.IsNotExist(err), "old checkpoint should be deleted after new checkpoint completes")
// Double-check by listing directory
files, err := os.ReadDir(walDir)
require.NoError(t, err)
for _, f := range files {
require.NotEqual(t, "checkpoint.000001", f.Name(), "old checkpoint should not exist in directory listing")
}
}
func TestCheckpointCleanupOldCheckpointsAtStartup(t *testing.T) {
walDir := t.TempDir()
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}
readRingMock := mockReadRingWithOneActiveIngester()
// Phase 1: Start ingester and create some WAL segments and a checkpoint
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
// Push some data to create WAL segments
req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{
Timestamp: time.Now(),
Line: "test line",
},
},
},
},
}
ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)
// Wait for a checkpoint to be created
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10)
// Push more data and wait for another checkpoint to ensure we have multiple checkpoints
// and segments have been truncated
_, err = i.Push(ctx, &req)
require.NoError(t, err)
time.Sleep(ingesterConfig.WAL.CheckpointDuration * 2)
// Stop the ingester
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// Phase 2: Manually create a very old checkpoint that should have been deleted
// This simulates the scenario where repeated checkpoint failures prevented normal cleanup.
// In a healthy system, when newer checkpoints complete, checkpoint.000000 would be deleted
// by deleteCheckpoints(). But if checkpoints kept failing, old checkpoints accumulate.
// This test verifies that startup cleanup removes these superseded old checkpoints.
oldCheckpointDir := filepath.Join(walDir, "checkpoint.000000")
require.NoError(t, os.MkdirAll(oldCheckpointDir, 0750))
// Verify the old checkpoint exists
_, err = os.Stat(oldCheckpointDir)
require.NoError(t, err, "old checkpoint should exist before startup")
// Count checkpoints before restart
files, err := os.ReadDir(walDir)
require.NoError(t, err)
checkpointsBefore := 0
for _, f := range files {
if _, cpErr := checkpointIndex(f.Name(), false); cpErr == nil && f.IsDir() {
checkpointsBefore++
}
}
require.GreaterOrEqual(t, checkpointsBefore, 2, "should have at least 2 checkpoints before restart")
// Phase 3: Restart the ingester WITHOUT pushing new data
// This tests that startup cleanup works independently of new checkpoints
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
// Phase 4: Verify the old checkpoint was cleaned up AT STARTUP
// (without needing to create a new checkpoint)
_, err = os.Stat(oldCheckpointDir)
require.True(t, os.IsNotExist(err), "old checkpoint should be deleted at startup")
// Verify it's actually gone from directory listing
files, err = os.ReadDir(walDir)
require.NoError(t, err)
for _, f := range files {
require.NotEqual(t, "checkpoint.000000", f.Name(), "old checkpoint should not exist after startup")
}
// Verify we still have at least one checkpoint (the valid one wasn't deleted)
checkpointsAfter := 0
for _, f := range files {
if _, cpErr := checkpointIndex(f.Name(), false); cpErr == nil && f.IsDir() {
checkpointsAfter++
}
}
require.GreaterOrEqual(t, checkpointsAfter, 1, "should still have at least one valid checkpoint")
require.Less(t, checkpointsAfter, checkpointsBefore, "should have fewer checkpoints after startup cleanup")
// Stop the ingester
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
}
func TestCheckpointCleanupStaleTmpDirectoriesAtStartup(t *testing.T) {
walDir := t.TempDir()
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}
readRingMock := mockReadRingWithOneActiveIngester()
// Phase 1: Start ingester and create some WAL data, then stop
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
// Push some data to create WAL segments
req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar"}`,
Entries: []logproto.Entry{
{
Timestamp: time.Now(),
Line: "test line",
},
},
},
},
}
ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)
// Wait for a checkpoint to be created
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10)
// Stop the ingester
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// Phase 2: Manually create stale .tmp checkpoint directories to simulate crashed checkpoints
staleTmpDirs := []string{
"checkpoint.000100.tmp",
"checkpoint.000200.tmp",
"checkpoint.000300.tmp",
}
for _, dir := range staleTmpDirs {
tmpPath := filepath.Join(walDir, dir)
require.NoError(t, os.MkdirAll(tmpPath, 0750))
}
// Verify the stale .tmp directories exist before restart
files, err := os.ReadDir(walDir)
require.NoError(t, err)
tmpCountBefore := 0
for _, f := range files {
if f.IsDir() && filepath.Ext(f.Name()) == ".tmp" {
tmpCountBefore++
}
}
require.Equal(t, 3, tmpCountBefore, "expected 3 .tmp directories before restarting ingester")
// Phase 3: Restart the ingester - this should trigger IMMEDIATE cleanup at startup
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
// Phase 4: Verify all stale .tmp directories have been cleaned up IMMEDIATELY
// (without waiting for a new checkpoint to be created)
files, err = os.ReadDir(walDir)
require.NoError(t, err)
tmpCountAfter := 0
for _, f := range files {
if f.IsDir() && filepath.Ext(f.Name()) == ".tmp" {
tmpCountAfter++
}
}
require.Equal(t, 0, tmpCountAfter, "expected all .tmp directories to be cleaned up immediately at startup")
// Stop the ingester
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
}

@ -33,6 +33,9 @@ func (NoopWALReader) Record() []byte { return nil }
func (NoopWALReader) Close() error { return nil }
func newCheckpointReader(dir string, logger log.Logger) (WALReader, io.Closer, error) {
// Proactively clean up stale checkpoints at startup before recovery
cleanupCheckpointsAtStartup(dir, logger)
lastCheckpointDir, idx, err := lastCheckpoint(dir)
if err != nil {
return nil, nil, err
@ -50,6 +53,32 @@ func newCheckpointReader(dir string, logger log.Logger) (WALReader, io.Closer, e
return wlog.NewReader(r), r, nil
}
// cleanupCheckpointsAtStartup performs cleanup of stale checkpoint data at startup.
// This includes removing incomplete .tmp checkpoint directories from failed attempts, and
// removing old completed checkpoints that have been superseded. The most recent valid
// checkpoint is always protected to ensure a recovery point exists.
func cleanupCheckpointsAtStartup(dir string, logger log.Logger) {
// First, clean up any stale .tmp checkpoint directories from failed checkpoint attempts.
// These are always safe to delete at startup since they represent incomplete operations.
cleanupStaleTmpCheckpoints(dir, logger)
// Find the most recent valid checkpoint to protect it from deletion
_, latestCheckpointIdx, err := lastCheckpoint(dir)
if err != nil {
level.Error(logger).Log("msg", "unable to find latest checkpoint for startup checkpoint cleanu, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "err", err)
return
}
if latestCheckpointIdx < 0 {
// No checkpoints exist, nothing to clean up
return
}
// Delegate to the shared cleanup function, protecting the latest checkpoint.
// This will remove any old checkpoints that are superseded by the latest one.
cleanupOldCheckpoints(dir, latestCheckpointIdx, logger)
}
type Recoverer interface {
NumWorkers() int
Series(series *Series) error

Loading…
Cancel
Save