diff --git a/pkg/storage/stores/tsdb/head.go b/pkg/storage/stores/tsdb/head.go index f2101028d7..27813fa5a4 100644 --- a/pkg/storage/stores/tsdb/head.go +++ b/pkg/storage/stores/tsdb/head.go @@ -62,11 +62,12 @@ guaranteeing we maintain querying consistency for the entire data lifecycle. // TODO(owen-d) type Metrics struct { - seriesNotFound prometheus.Counter - headRotations *prometheus.CounterVec - walTruncations *prometheus.CounterVec - tsdbBuilds *prometheus.CounterVec - tsdbBuildLastSuccess prometheus.Gauge + seriesNotFound prometheus.Counter + headRotations *prometheus.CounterVec + walTruncations *prometheus.CounterVec + tsdbBuilds *prometheus.CounterVec + tsdbBuildLastSuccess prometheus.Gauge + walCorruptionsRepairs *prometheus.CounterVec } func NewMetrics(r prometheus.Registerer) *Metrics { @@ -96,6 +97,11 @@ func NewMetrics(r prometheus.Registerer) *Metrics { Name: "build_index_last_successful_timestamp_seconds", Help: "Unix timestamp of the last successful tsdb index build", }), + walCorruptionsRepairs: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki_tsdb", + Name: "wal_corruptions_repairs_total", + Help: "Total number of WAL corruptions repairs partitioned by status", + }, []string{statusLabel}), } } diff --git a/pkg/storage/stores/tsdb/head_manager.go b/pkg/storage/stores/tsdb/head_manager.go index c1932c27fd..63a97d7d6b 100644 --- a/pkg/storage/stores/tsdb/head_manager.go +++ b/pkg/storage/stores/tsdb/head_manager.go @@ -15,10 +15,12 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/wlog" "go.uber.org/atomic" "github.com/grafana/loki/pkg/storage/chunk" @@ -533,15 +535,15 @@ func legacyWalPath(parent string, t time.Time) 
string { // recoverHead recovers from all WALs belonging to some period // and inserts it into the active *tenantHeads -func recoverHead(name, dir string, heads *tenantHeads, wals []WALIdentifier, legacy bool) error { +func recoverHead(name, dir string, heads *tenantHeads, wals []WALIdentifier, legacy bool, logger log.Logger, repairsCounter *prometheus.CounterVec) error { for _, id := range wals { - // use anonymous function for ease of cleanup - if err := func(id WALIdentifier) error { - walPath := walPath(name, dir, id.ts) - if legacy { - walPath = legacyWalPath(dir, id.ts) - } + walPath := walPath(name, dir, id.ts) + if legacy { + walPath = legacyWalPath(dir, id.ts) + } + // use anonymous function for ease of cleanup + if werr := func(walPath string) error { reader, closer, err := wal.NewWalReader(walPath, -1) if err != nil { return err @@ -590,16 +592,34 @@ func recoverHead(name, dir string, heads *tenantHeads, wals []WALIdentifier, leg } return reader.Err() - }(id); err != nil { - return errors.Wrap( - err, - "error recovering from TSDB WAL", - ) + }(walPath); werr != nil { + // Try to repair the WAL if it's a corruption error. 
+ var cerr *wlog.CorruptionErr + if !errors.As(werr, &cerr) { + return fmt.Errorf("error recovering head from TSDB WAL: %w", werr) + } + + level.Error(logger).Log("msg", "error recovering from TSDB WAL, will try repairing", "error", werr) + if err := repairWAL(werr, walPath, logger); err != nil { + repairsCounter.WithLabelValues(statusFailure).Inc() + return fmt.Errorf("repairing WAL failed: %w", err) + } + repairsCounter.WithLabelValues(statusSuccess).Inc() } } return nil } +func repairWAL(walErr error, walPath string, logger log.Logger) error { + wl, err := wlog.New(logger, nil, walPath, false) + if err != nil { + return fmt.Errorf("creating wlog for repair: %w", err) + } + defer wl.Close() + + return wl.Repair(walErr) +} + type WALIdentifier struct { ts time.Time } diff --git a/pkg/storage/stores/tsdb/head_manager_test.go b/pkg/storage/stores/tsdb/head_manager_test.go index 2837c8d602..cd8a230481 100644 --- a/pkg/storage/stores/tsdb/head_manager_test.go +++ b/pkg/storage/stores/tsdb/head_manager_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "os" "path/filepath" "sync" "testing" @@ -46,7 +47,7 @@ func (m noopTSDBManager) BuildFromHead(_ *tenantHeads) error { } func (m noopTSDBManager) BuildFromWALs(_ time.Time, wals []WALIdentifier, _ bool) error { - return recoverHead(m.name, m.dir, m.tenantHeads, wals, false) + return recoverHead(m.name, m.dir, m.tenantHeads, wals, false, log.NewNopLogger(), NewMetrics(nil).walCorruptionsRepairs) } func (m noopTSDBManager) Start() error { return nil } @@ -249,7 +250,7 @@ func Test_HeadManager_RecoverHead(t *testing.T) { require.Nil(t, err) require.True(t, ok) require.Equal(t, 1, len(grp.wals)) - require.Nil(t, recoverHead(mgr.name, mgr.dir, mgr.activeHeads, grp.wals, false)) + require.Nil(t, recoverHead(mgr.name, mgr.dir, mgr.activeHeads, grp.wals, false, log.NewNopLogger(), NewMetrics(nil).walCorruptionsRepairs)) for _, c := range cases { refs, err := mgr.GetChunkRefs( @@ -265,6 +266,119 @@ func 
Test_HeadManager_RecoverHead(t *testing.T) { } +// test head recover from corrupted wal +func Test_HeadManager_RecoverHead_CorruptedWAL(t *testing.T) { + for _, tc := range []struct { + name string + setupFunc func(t *testing.T, walPath string, w *headWAL) + expectErr bool + }{ + { + name: "last record torn", + setupFunc: func(t *testing.T, walPath string, w *headWAL) { + // write enough records to fill a WAL page. + for i := 0; i < 1000; i++ { + require.Nil(t, w.Log(&WALRecord{ + UserID: "tenant1", + Fingerprint: mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash(), + Series: record.RefSeries{ + Ref: chunks.HeadSeriesRef(i), + Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`), + }, + Chks: ChunkMetasRecord{ + Chks: []index.ChunkMeta{ + { + MinTime: 1, + MaxTime: 10, + Checksum: 3, + }, + }, + Ref: uint64(i), + }, + })) + } + + require.Nil(t, w.Stop()) + + // truncate the WAL segment to 32KB (one page) so the last record is torn. + segmentFile := filepath.Join(walPath, "00000001") + require.Nil(t, os.Truncate(segmentFile, 32*1024)) // 32kb + }, + }, + { + name: "invalid checksum", + setupFunc: func(t *testing.T, walPath string, w *headWAL) { + require.Nil(t, w.Log(&WALRecord{ + UserID: "tenant1", + Fingerprint: mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash(), + Series: record.RefSeries{ + Ref: chunks.HeadSeriesRef(1), + Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`), + }, + Chks: ChunkMetasRecord{ + Chks: []index.ChunkMeta{ + { + MinTime: 1, + MaxTime: 10, + Checksum: 3, + }, + }, + Ref: uint64(1), + }, + })) + require.Nil(t, w.Stop()) + // This will truncate the DATA part of the WAL record, causing the checksum to be invalid. + require.Nil(t, os.Truncate(filepath.Join(walPath, "00000001"), 10)) // 7 bytes(header) + a little bit of data. 
+ }, + }, + { + name: "invalid Loki WAL record", + setupFunc: func(t *testing.T, _ string, w *headWAL) { + require.Nil(t, w.wal.Log([]byte("not a valid Loki WAL record"))) + require.Nil(t, w.Stop()) + }, + // This is expected to fail because the WAL record is valid but the data part is not. + // We cannot repair the wal in this case. This would only happen if we update the WALRecord format + // and we try to replay a WAL that was written with the old format. + expectErr: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + now := time.Now() + + storeName := "store_2010-10-10" + mgr := NewHeadManager(storeName, log.NewNopLogger(), dir, NewMetrics(nil), newNoopTSDBManager(storeName, dir)) + // This bit is normally handled by the Start() fn, but we're testing a smaller surface area + // so ensure our dirs exist + for _, d := range managerRequiredDirs(storeName, dir) { + require.Nil(t, util.EnsureDirectory(d)) + } + + // Call Rotate() to ensure the new head tenant heads exist, etc + require.Nil(t, mgr.Rotate(now)) + + // now build a WAL independently to test recovery + w, err := newHeadWAL(log.NewNopLogger(), walPath(mgr.name, mgr.dir, now), now) + require.Nil(t, err) + + tc.setupFunc(t, walPath(mgr.name, mgr.dir, now), w) + + grp, ok, err := walsForPeriod(managerWalDir(mgr.name, mgr.dir), mgr.period, mgr.period.PeriodFor(now)) + require.Nil(t, err) + require.True(t, ok) + require.Equal(t, 1, len(grp.wals)) + + err = recoverHead(mgr.name, mgr.dir, mgr.activeHeads, grp.wals, false, log.NewNopLogger(), NewMetrics(nil).walCorruptionsRepairs) + if tc.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }) + } +} + // test mgr recover from multiple wals across multiple periods func Test_HeadManager_Lifecycle(t *testing.T) { dir := t.TempDir() diff --git a/pkg/storage/stores/tsdb/manager.go b/pkg/storage/stores/tsdb/manager.go index f4ea7978d9..416a7e74aa 100644 --- a/pkg/storage/stores/tsdb/manager.go +++ 
b/pkg/storage/stores/tsdb/manager.go @@ -286,7 +286,7 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier, legacy boo level.Debug(m.log).Log("msg", "recovering tenant heads") for _, id := range ids { tmp := newTenantHeads(id.ts, defaultHeadManagerStripeSize, m.metrics, m.log) - if err = recoverHead(m.name, m.dir, tmp, []WALIdentifier{id}, legacy); err != nil { + if err = recoverHead(m.name, m.dir, tmp, []WALIdentifier{id}, legacy, m.log, m.metrics.walCorruptionsRepairs); err != nil { return errors.Wrap(err, "building TSDB from WALs") } diff --git a/pkg/util/wal/wal_format.md b/pkg/util/wal/wal_format.md new file mode 100644 index 0000000000..57ad381585 --- /dev/null +++ b/pkg/util/wal/wal_format.md @@ -0,0 +1,118 @@ +# WAL Segment Format Documentation + +## Overview + +A WAL (Write-Ahead Log) segment is a file containing a sequence of records. Each segment is divided into 32KB pages, and records can span multiple pages but never cross segment boundaries. This document describes the binary format of WAL segment files as used in Prometheus TSDB. + +## Segment Structure + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ WAL SEGMENT FILE │ +├─────────────────────────────────────────────────────────────────┤ +│ PAGE 0 (32KB) │ +├─────────────────────────────────────────────────────────────────┤ +│ RECORD 1 │ RECORD 2 │ RECORD 3 │ ... │ PADDING │ +├─────────────────────────────────────────────────────────────────┤ +│ PAGE 1 (32KB) │ +├─────────────────────────────────────────────────────────────────┤ +│ RECORD N │ RECORD N+1 │ ... │ │ PADDING │ +├─────────────────────────────────────────────────────────────────┤ +│ ... 
│ +└─────────────────────────────────────────────────────────────────┘ +``` + +## Record Structure + +Every record in a WAL segment follows this structure: + +``` +┌─────────────┬─────────────────────────────────────────────────────┐ +│ HEADER │ DATA │ +│ (7 bytes) │ (variable length) │ +└─────────────┴─────────────────────────────────────────────────────┘ +``` + +### Header Format (7 bytes total) + +``` + Byte 0 Bytes 1-2 Bytes 3-6 +┌─────────┬─────────────────┬─────────────────────────────────────┐ +│ TYPE │ LENGTH │ CRC32 │ +│(1 byte) │ (2 bytes) │ (4 bytes) │ +└─────────┴─────────────────┴─────────────────────────────────────┘ +``` + +#### Byte 0 - Record Type and Compression Flags + +``` +Bit: 7 6 5 4 3 2 1 0 + ┌───┬───┬───┬───┬───┬───┬───┬───┐ + │ - │ - │ - │ Z │ S │ T │ T │ T │ + └───┴───┴───┴───┴───┴───┴───┴───┘ + │ │ │ │ │ └───┴───┴───┘ + │ │ │ │ │ └─ Record Type (3 bits) + │ │ │ │ └─ Snappy Compression Flag (1 bit) + │ │ │ └─ Zstd Compression Flag (1 bit) + └───┴───┴─ Unallocated (3 bits) +``` + +**Record Types:** +- `0` (recPageTerm): Rest of page is empty +- `1` (recFull): Complete record fits in current page +- `2` (recFirst): First fragment of a record spanning multiple pages +- `3` (recMiddle): Middle fragment of a record spanning multiple pages +- `4` (recLast): Final fragment of a record spanning multiple pages + +**Compression Flags:** +- Bit 3 (snappyMask = 0x08): Set if data is Snappy compressed +- Bit 4 (zstdMask = 0x10): Set if data is Zstd compressed + +#### Bytes 1-2 - Data Length +Big-endian 16-bit unsigned integer representing the length of the data portion in bytes. + +#### Bytes 3-6 - CRC32 Checksum +Big-endian 32-bit CRC32 checksum (Castagnoli polynomial) of the data portion only. 
+ +## Record Fragmentation + +When a record is larger than the remaining space in a page, it gets fragmented: + +``` +Page N Page N+1 +┌─────────────────────────────┐ ┌─────────────────────────────────┐ +│ [HEADER] [DATA PART 1] │ │ [HEADER] [DATA PART 2] [HEADER] │ +│ Type: recFirst │ │ Type: recLast Type: recFull│ +│ Length: 1024 │ │ Length: 512 Length: 256 │ +│ CRC: 0x12345678 │ │ CRC: 0x87654321 CRC: 0xABCD │ +└─────────────────────────────┘ └─────────────────────────────────┘ +``` + +## Page Boundaries + +- Each page is exactly 32KB (32,768 bytes) +- Records never span across segment boundaries +- Unused space at the end of pages is zero-padded +- A `recPageTerm` record type indicates the rest of the page is empty + +## Data Format + +The data portion contains the actual record payload. The format depends on the application using the WAL +- **Series Records**: Encoded series labels and references +- **Sample Records**: Encoded time series samples +- **Tombstone Records**: Encoded deletion markers +- **Custom Records**: Application-specific data. + +## Compression + +When compression is enabled: +1. The data is compressed before writing +2. The appropriate compression flag is set in the header +3. The CRC is calculated on the compressed data +4. The length field reflects the compressed data size + + +## References + +- [Prometheus WAL Disk Format](https://github.com/prometheus/prometheus/blob/main/tsdb/docs/format/wal.md) +- [Prometheus TSDB WAL and Checkpoint](https://ganeshvernekar.com/blog/prometheus-tsdb-wal-and-checkpoint/) \ No newline at end of file