From b362e1e8d8152f5ffb3f203bf0467def56a0c756 Mon Sep 17 00:00:00 2001 From: Paul Rogers <129207811+paul1r@users.noreply.github.com> Date: Wed, 29 Oct 2025 10:45:07 -0400 Subject: [PATCH] fix(ingester): Add WAL cleanup code (#19607) --- pkg/ingester/checkpoint.go | 114 +++++++++- pkg/ingester/checkpoint_test.go | 354 ++++++++++++++++++++++++++++++++ pkg/ingester/recovery.go | 29 +++ 3 files changed, 490 insertions(+), 7 deletions(-) diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index 73b40f0857..d98bc4ae8e 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -337,13 +337,8 @@ func (w *WALCheckpointWriter) Advance() (bool, error) { level.Info(util_log.Logger).Log("msg", "attempting checkpoint for", "dir", checkpointDir) checkpointDirTemp := checkpointDir + ".tmp" - // cleanup any old partial checkpoints - if _, err := os.Stat(checkpointDirTemp); err == nil { - if err := os.RemoveAll(checkpointDirTemp); err != nil { - level.Error(util_log.Logger).Log("msg", "unable to cleanup old tmp checkpoint", "dir", checkpointDirTemp) - return false, err - } - } + // cleanup any old partial checkpoints (not just the current one) + cleanupStaleTmpCheckpoints(w.segmentWAL.Dir(), util_log.Logger) if err := os.MkdirAll(checkpointDirTemp, 0750); err != nil { return false, fmt.Errorf("create checkpoint dir: %w", err) @@ -479,6 +474,111 @@ func (w *WALCheckpointWriter) deleteCheckpoints(maxIndex int) (err error) { return errs.Err() } +// cleanupOldCheckpoints removes old checkpoints that have been superseded by a newer checkpoint. +// This is primarily used during startup to clean up old checkpoints when repeated checkpoint failures +// have prevented normal cleanup (which happens via deleteCheckpoints() after successful completion). +// +// A checkpoint at index N contains a complete snapshot of state through segment N, so segments <= N +// can be deleted once that checkpoint is successfully written. This function cleans up old checkpoints +// that are no longer needed because their corresponding segments have been truncated. +// +// The protectedCheckpointIdx parameter specifies a checkpoint that should never be deleted, typically +// the most recent successfully completed checkpoint. This ensures we always keep at least one valid +// recovery point. +// +// Note: It's normal for checkpoint.N to exist while firstSegment is N+1, since the checkpoint replaces +// the need for segment N. This function relies on the protectedCheckpointIdx to determine which +// checkpoints are safe to delete, not just the segment numbers. +// +// This differs from deleteCheckpoints() which runs after successful checkpoint completion and handles +// normal cleanup including .tmp files. +func cleanupOldCheckpoints(dir string, protectedCheckpointIdx int, logger log.Logger) { + level.Info(util_log.Logger).Log("msg", "old checkpoint cleanup starting") + start := time.Now() + allSuccess := true + defer func() { + elapsed := time.Since(start) + level.Info(util_log.Logger).Log("msg", "old checkpoint cleanup done", "duration", elapsed.String(), "success", allSuccess) + }() + firstSegment, _, err := wlog.Segments(dir) + if err != nil { + level.Error(logger).Log("msg", "unable to list WAL segments for old checkpoint cleanup, checkpoint cleanup cannot proceed, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "err", err) + allSuccess = false + return + } + + if firstSegment <= 0 { + // No cleanup needed if we're starting from segment 0 + return + } + + files, err := os.ReadDir(dir) + if err != nil { + level.Error(logger).Log("msg", "unable to read WAL directory for old checkpoint cleanup, checkpoint cleanup cannot proceed, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "err", err) + allSuccess = false + return + } + + for _, fi := range files { + // Check if this is a completed checkpoint (not .tmp) + idx, err := checkpointIndex(fi.Name(), false) + if err != nil || !fi.IsDir() { + continue + } + + // Delete checkpoints that are both: + // 1. From a time when segments <= firstSegment existed (idx < firstSegment is typical for old checkpoints) + // 2. Older than the protected checkpoint (idx < protectedCheckpointIdx means superseded) + // The second condition is the key safety check - we never delete the protected checkpoint. + if idx < firstSegment && idx < protectedCheckpointIdx { + orphanedPath := filepath.Join(dir, fi.Name()) + if err := os.RemoveAll(orphanedPath); err != nil { + level.Error(logger).Log("msg", "unable to cleanup old checkpoint, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "dir", orphanedPath, "err", err) + allSuccess = false + } else { + level.Info(logger).Log("msg", "cleaned up old superseded checkpoint", "dir", fi.Name(), "idx", idx, "firstSegment", firstSegment, "protectedCheckpoint", protectedCheckpointIdx) + } + } + } +} + +// cleanupStaleTmpCheckpoints removes all .tmp checkpoint directories which represent +// incomplete/failed checkpoint operations. These are safe to delete because recovery +// only uses completed checkpoints (those without the .tmp suffix). +func cleanupStaleTmpCheckpoints(dir string, logger log.Logger) { + level.Info(util_log.Logger).Log("msg", "tmp checkpoint cleanup starting") + start := time.Now() + allSuccess := true + defer func() { + elapsed := time.Since(start) + level.Info(util_log.Logger).Log("msg", "tmp checkpoint cleanup done", "duration", elapsed.String(), "success", allSuccess) + }() + + files, err := os.ReadDir(dir) + if err != nil { + level.Error(logger).Log("msg", "unable to read WAL directory for tmp checkpoint cleanup, checkpoint cleanup cannot proceed, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "err", err) + allSuccess = false + return + } + + for _, fi := range files { + // Check if this is a .tmp checkpoint directory + if _, tmpErr := checkpointIndex(fi.Name(), true); tmpErr == nil && fi.IsDir() { + // Only delete if it actually has the .tmp suffix + if filepath.Ext(fi.Name()) == ".tmp" { + tmpPath := filepath.Join(dir, fi.Name()) + if err := os.RemoveAll(tmpPath); err != nil { + level.Error(logger).Log("msg", "unable to cleanup stale tmp checkpoint, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "dir", tmpPath, "err", err) + // Continue cleaning up other .tmp directories even if one fails + allSuccess = false + } else { + level.Info(logger).Log("msg", "cleaned up stale tmp checkpoint at startup", "dir", fi.Name()) + } + } + } + } +} + func (w *WALCheckpointWriter) Close(abort bool) error { if len(w.recs) > 0 { if err := w.flush(); err != nil { diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 887b4c223e..76fbfe250c 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "path/filepath" "sort" "testing" "time" @@ -697,3 +698,356 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) { }) } } + +func TestCheckpointCleanupStaleTmpDirectories(t *testing.T) { + walDir := t.TempDir() + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + readRingMock := mockReadRingWithOneActiveIngester() + + // Create some fake stale .tmp checkpoint directories + staleTmpDirs := []string{ + "checkpoint.000100.tmp", + "checkpoint.000200.tmp", + "checkpoint.000300.tmp", + } + for _, dir := range staleTmpDirs { + tmpPath := filepath.Join(walDir, dir) + require.NoError(t, os.MkdirAll(tmpPath, 0750)) + } + + // Verify the stale .tmp directories exist + files, err := os.ReadDir(walDir) + require.NoError(t, err) + tmpCount := 0 + for _, f := range files { + if f.IsDir() && filepath.Ext(f.Name()) == ".tmp" { + tmpCount++ + } + } + require.Equal(t, 3, tmpCount, "expected 3 .tmp directories before starting ingester") + + // Start the ingester - this should trigger checkpoint cleanup + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // Push some data to trigger checkpoint + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + { + Timestamp: time.Now(), + Line: "test line", + }, + }, + }, + }, + } + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + // Wait for a checkpoint to be created + expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10) + + // Stop the ingester to ensure no new checkpoints are being created + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // Verify all stale .tmp directories have been cleaned up + files, err = os.ReadDir(walDir) + require.NoError(t, err) + tmpCount = 0 + for _, f := range files { + if f.IsDir() && filepath.Ext(f.Name()) == ".tmp" { + tmpCount++ + } + } + require.Equal(t, 0, tmpCount, "expected all .tmp directories to be cleaned up after checkpoint") +} + +func TestCheckpointCleanupOldCheckpoints(t *testing.T) { + walDir := t.TempDir() + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + readRingMock := mockReadRingWithOneActiveIngester() + + // Phase 1: Start ingester and create some WAL segments and a checkpoint + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // Push some data to create WAL segments + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + { + Timestamp: time.Now(), + Line: "test line", + }, + }, + }, + }, + } + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + // Wait for a checkpoint to be created + expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10) + + // Stop the ingester + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // Phase 2: Manually create a very old checkpoint that should have been deleted + // This simulates the scenario where repeated checkpoint failures prevented normal cleanup. + // In a healthy system, when checkpoint.000002 (or higher) completes, checkpoint.000001 would + // be deleted by deleteCheckpoints(). But if checkpoints kept failing, old checkpoints accumulate. + // The cleanup function should remove these superseded old checkpoints. + oldCheckpointDir := filepath.Join(walDir, "checkpoint.000001") + require.NoError(t, os.MkdirAll(oldCheckpointDir, 0750)) + + // Verify the old checkpoint exists + _, err = os.Stat(oldCheckpointDir) + require.NoError(t, err, "old checkpoint should exist before cleanup") + + // Phase 3: Restart the ingester + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // Push more data to trigger another checkpoint which will clean up old checkpoints + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + // Wait for another checkpoint to be created (this triggers cleanup via deleteCheckpoints()) + time.Sleep(ingesterConfig.WAL.CheckpointDuration * 2) + + // Stop the ingester + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // Phase 4: Verify the old checkpoint has been cleaned up + _, err = os.Stat(oldCheckpointDir) + require.True(t, os.IsNotExist(err), "old checkpoint should be deleted after new checkpoint completes") + + // Double-check by listing directory + files, err := os.ReadDir(walDir) + require.NoError(t, err) + for _, f := range files { + require.NotEqual(t, "checkpoint.000001", f.Name(), "old checkpoint should not exist in directory listing") + } +} + +func TestCheckpointCleanupOldCheckpointsAtStartup(t *testing.T) { + walDir := t.TempDir() + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + readRingMock := mockReadRingWithOneActiveIngester() + + // Phase 1: Start ingester and create some WAL segments and a checkpoint + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // Push some data to create WAL segments + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + { + Timestamp: time.Now(), + Line: "test line", + }, + }, + }, + }, + } + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + // Wait for a checkpoint to be created + expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10) + + // Push more data and wait for another checkpoint to ensure we have multiple checkpoints + // and segments have been truncated + _, err = i.Push(ctx, &req) + require.NoError(t, err) + time.Sleep(ingesterConfig.WAL.CheckpointDuration * 2) + + // Stop the ingester + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // Phase 2: Manually create a very old checkpoint that should have been deleted + // This simulates the scenario where repeated checkpoint failures prevented normal cleanup. + // In a healthy system, when newer checkpoints complete, checkpoint.000000 would be deleted + // by deleteCheckpoints(). But if checkpoints kept failing, old checkpoints accumulate. + // This test verifies that startup cleanup removes these superseded old checkpoints. + oldCheckpointDir := filepath.Join(walDir, "checkpoint.000000") + require.NoError(t, os.MkdirAll(oldCheckpointDir, 0750)) + + // Verify the old checkpoint exists + _, err = os.Stat(oldCheckpointDir) + require.NoError(t, err, "old checkpoint should exist before startup") + + // Count checkpoints before restart + files, err := os.ReadDir(walDir) + require.NoError(t, err) + checkpointsBefore := 0 + for _, f := range files { + if _, cpErr := checkpointIndex(f.Name(), false); cpErr == nil && f.IsDir() { + checkpointsBefore++ + } + } + require.GreaterOrEqual(t, checkpointsBefore, 2, "should have at least 2 checkpoints before restart") + + // Phase 3: Restart the ingester WITHOUT pushing new data + // This tests that startup cleanup works independently of new checkpoints + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // Phase 4: Verify the old checkpoint was cleaned up AT STARTUP + // (without needing to create a new checkpoint) + _, err = os.Stat(oldCheckpointDir) + require.True(t, os.IsNotExist(err), "old checkpoint should be deleted at startup") + + // Verify it's actually gone from directory listing + files, err = os.ReadDir(walDir) + require.NoError(t, err) + for _, f := range files { + require.NotEqual(t, "checkpoint.000000", f.Name(), "old checkpoint should not exist after startup") + } + + // Verify we still have at least one checkpoint (the valid one wasn't deleted) + checkpointsAfter := 0 + for _, f := range files { + if _, cpErr := checkpointIndex(f.Name(), false); cpErr == nil && f.IsDir() { + checkpointsAfter++ + } + } + require.GreaterOrEqual(t, checkpointsAfter, 1, "should still have at least one valid checkpoint") + require.Less(t, checkpointsAfter, checkpointsBefore, "should have fewer checkpoints after startup cleanup") + + // Stop the ingester + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) +} + +func TestCheckpointCleanupStaleTmpDirectoriesAtStartup(t *testing.T) { + walDir := t.TempDir() + ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir) + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + newStore := func() *mockStore { + return &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + } + + readRingMock := mockReadRingWithOneActiveIngester() + + // Phase 1: Start ingester and create some WAL data, then stop + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // Push some data to create WAL segments + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + { + Timestamp: time.Now(), + Line: "test line", + }, + }, + }, + }, + } + ctx := user.InjectOrgID(context.Background(), "test") + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + // Wait for a checkpoint to be created + expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*10) + + // Stop the ingester + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) + + // Phase 2: Manually create stale .tmp checkpoint directories to simulate crashed checkpoints + staleTmpDirs := []string{ + "checkpoint.000100.tmp", + "checkpoint.000200.tmp", + "checkpoint.000300.tmp", + } + for _, dir := range staleTmpDirs { + tmpPath := filepath.Join(walDir, dir) + require.NoError(t, os.MkdirAll(tmpPath, 0750)) + } + + // Verify the stale .tmp directories exist before restart + files, err := os.ReadDir(walDir) + require.NoError(t, err) + tmpCountBefore := 0 + for _, f := range files { + if f.IsDir() && filepath.Ext(f.Name()) == ".tmp" { + tmpCountBefore++ + } + } + require.Equal(t, 3, tmpCountBefore, "expected 3 .tmp directories before restarting ingester") + + // Phase 3: Restart the ingester - this should trigger IMMEDIATE cleanup at startup + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil) + require.NoError(t, err) + require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) + + // Phase 4: Verify all stale .tmp directories have been cleaned up IMMEDIATELY + // (without waiting for a new checkpoint to be created) + files, err = os.ReadDir(walDir) + require.NoError(t, err) + tmpCountAfter := 0 + for _, f := range files { + if f.IsDir() && filepath.Ext(f.Name()) == ".tmp" { + tmpCountAfter++ + } + } + require.Equal(t, 0, tmpCountAfter, "expected all .tmp directories to be cleaned up immediately at startup") + + // Stop the ingester + require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) +} diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go index b170a94bf9..31b74c0980 100644 --- a/pkg/ingester/recovery.go +++ b/pkg/ingester/recovery.go @@ -33,6 +33,9 @@ func (NoopWALReader) Record() []byte { return nil } func (NoopWALReader) Close() error { return nil } func newCheckpointReader(dir string, logger log.Logger) (WALReader, io.Closer, error) { + // Proactively clean up stale checkpoints at startup before recovery + cleanupCheckpointsAtStartup(dir, logger) + lastCheckpointDir, idx, err := lastCheckpoint(dir) if err != nil { return nil, nil, err @@ -50,6 +53,32 @@ func newCheckpointReader(dir string, logger log.Logger) (WALReader, io.Closer, e return wlog.NewReader(r), r, nil } +// cleanupCheckpointsAtStartup performs cleanup of stale checkpoint data at startup. +// This includes removing incomplete .tmp checkpoint directories from failed attempts, and +// removing old completed checkpoints that have been superseded. The most recent valid +// checkpoint is always protected to ensure a recovery point exists. +func cleanupCheckpointsAtStartup(dir string, logger log.Logger) { + // First, clean up any stale .tmp checkpoint directories from failed checkpoint attempts. + // These are always safe to delete at startup since they represent incomplete operations. + cleanupStaleTmpCheckpoints(dir, logger) + + // Find the most recent valid checkpoint to protect it from deletion + _, latestCheckpointIdx, err := lastCheckpoint(dir) + if err != nil { + level.Error(logger).Log("msg", "unable to find latest checkpoint for startup checkpoint cleanu, this is not expected and could lead to disk space exhaustion and may indicate disk I/O problems or corruption and should be investigated manually", "err", err) + return + } + + if latestCheckpointIdx < 0 { + // No checkpoints exist, nothing to clean up + return + } + + // Delegate to the shared cleanup function, protecting the latest checkpoint. + // This will remove any old checkpoints that are superseded by the latest one. + cleanupOldCheckpoints(dir, latestCheckpointIdx, logger) +} + type Recoverer interface { NumWorkers() int Series(series *Series) error