fix: backport wal corruption fix to 2.9.x (#18229)

Branch: pull/18706/head
Assel Meher committed 6 months ago (committed by GitHub)
parent cc09480d86
commit 77fc888241
  1. pkg/storage/stores/tsdb/head.go (16 changed lines)
  2. pkg/storage/stores/tsdb/head_manager.go (44 changed lines)
  3. pkg/storage/stores/tsdb/head_manager_test.go (118 changed lines)
  4. pkg/storage/stores/tsdb/manager.go (2 changed lines)
  5. pkg/util/wal/wal_format.md (118 changed lines)

pkg/storage/stores/tsdb/head.go

@@ -62,11 +62,12 @@ guaranteeing we maintain querying consistency for the entire data lifecycle.

 // TODO(owen-d)
 type Metrics struct {
-    seriesNotFound       prometheus.Counter
-    headRotations        *prometheus.CounterVec
-    walTruncations       *prometheus.CounterVec
-    tsdbBuilds           *prometheus.CounterVec
-    tsdbBuildLastSuccess prometheus.Gauge
+    seriesNotFound        prometheus.Counter
+    headRotations         *prometheus.CounterVec
+    walTruncations        *prometheus.CounterVec
+    tsdbBuilds            *prometheus.CounterVec
+    tsdbBuildLastSuccess  prometheus.Gauge
+    walCorruptionsRepairs *prometheus.CounterVec
 }

 func NewMetrics(r prometheus.Registerer) *Metrics {

@@ -96,6 +97,11 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
             Name: "build_index_last_successful_timestamp_seconds",
             Help: "Unix timestamp of the last successful tsdb index build",
         }),
+        walCorruptionsRepairs: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+            Namespace: "loki_tsdb",
+            Name:      "wal_corruptions_repairs_total",
+            Help:      "Total number of WAL corruption repairs partitioned by status",
+        }, []string{statusLabel}),
     }
 }

pkg/storage/stores/tsdb/head_manager.go

@@ -15,10 +15,12 @@ import (
     "github.com/go-kit/log"
     "github.com/go-kit/log/level"
     "github.com/pkg/errors"
+    "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/tsdb/chunks"
     "github.com/prometheus/prometheus/tsdb/record"
+    "github.com/prometheus/prometheus/tsdb/wlog"
     "go.uber.org/atomic"

     "github.com/grafana/loki/pkg/storage/chunk"
@@ -533,15 +535,15 @@ func legacyWalPath(parent string, t time.Time) string {

 // recoverHead recovers from all WALs belonging to some period
 // and inserts it into the active *tenantHeads
-func recoverHead(name, dir string, heads *tenantHeads, wals []WALIdentifier, legacy bool) error {
+func recoverHead(name, dir string, heads *tenantHeads, wals []WALIdentifier, legacy bool, logger log.Logger, repairsCounter *prometheus.CounterVec) error {
     for _, id := range wals {
-        // use anonymous function for ease of cleanup
-        if err := func(id WALIdentifier) error {
-            walPath := walPath(name, dir, id.ts)
-            if legacy {
-                walPath = legacyWalPath(dir, id.ts)
-            }
+        walPath := walPath(name, dir, id.ts)
+        if legacy {
+            walPath = legacyWalPath(dir, id.ts)
+        }
+
+        // use anonymous function for ease of cleanup
+        if werr := func(walPath string) error {
             reader, closer, err := wal.NewWalReader(walPath, -1)
             if err != nil {
                 return err
@@ -590,16 +592,34 @@ func recoverHead(name, dir string, heads *tenantHeads, wals []WALIdentifier, legacy bool) error {
             }

             return reader.Err()
-        }(id); err != nil {
-            return errors.Wrap(
-                err,
-                "error recovering from TSDB WAL",
-            )
+        }(walPath); werr != nil {
+            // Try to repair the WAL if it's a corruption error.
+            var cerr *wlog.CorruptionErr
+            if !errors.As(werr, &cerr) {
+                return fmt.Errorf("error recovering head from TSDB WAL: %w", werr)
+            }
+
+            level.Error(logger).Log("msg", "error recovering from TSDB WAL, will try repairing", "error", werr)
+            if err := repairWAL(werr, walPath, logger); err != nil {
+                repairsCounter.WithLabelValues(statusFailure).Inc()
+                return fmt.Errorf("repairing WAL failed: %w", err)
+            }
+            repairsCounter.WithLabelValues(statusSuccess).Inc()
         }
     }

     return nil
 }

+func repairWAL(walErr error, walPath string, logger log.Logger) error {
+    wl, err := wlog.New(logger, nil, walPath, false)
+    if err != nil {
+        return fmt.Errorf("creating wlog for repair: %w", err)
+    }
+    defer wl.Close()
+
+    return wl.Repair(walErr)
+}
+
 type WALIdentifier struct {
     ts time.Time
 }

pkg/storage/stores/tsdb/head_manager_test.go

@@ -4,6 +4,7 @@ import (
     "context"
     "fmt"
     "math"
+    "os"
     "path/filepath"
     "sync"
     "testing"
@@ -46,7 +47,7 @@ func (m noopTSDBManager) BuildFromHead(_ *tenantHeads) error {
 }

 func (m noopTSDBManager) BuildFromWALs(_ time.Time, wals []WALIdentifier, _ bool) error {
-    return recoverHead(m.name, m.dir, m.tenantHeads, wals, false)
+    return recoverHead(m.name, m.dir, m.tenantHeads, wals, false, log.NewNopLogger(), NewMetrics(nil).walCorruptionsRepairs)
 }

 func (m noopTSDBManager) Start() error { return nil }
@@ -249,7 +250,7 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
     require.Nil(t, err)
     require.True(t, ok)
     require.Equal(t, 1, len(grp.wals))
-    require.Nil(t, recoverHead(mgr.name, mgr.dir, mgr.activeHeads, grp.wals, false))
+    require.Nil(t, recoverHead(mgr.name, mgr.dir, mgr.activeHeads, grp.wals, false, log.NewNopLogger(), NewMetrics(nil).walCorruptionsRepairs))

     for _, c := range cases {
         refs, err := mgr.GetChunkRefs(
@@ -265,6 +266,119 @@ func Test_HeadManager_RecoverHead(t *testing.T) {
 }

+// test head recover from corrupted wal
+func Test_HeadManager_RecoverHead_CorruptedWAL(t *testing.T) {
+    for _, tc := range []struct {
+        name      string
+        setupFunc func(t *testing.T, walPath string, w *headWAL)
+        expectErr bool
+    }{
+        {
+            name: "last record torn",
+            setupFunc: func(t *testing.T, walPath string, w *headWAL) {
+                // write enough records to fill more than one WAL page.
+                for i := 0; i < 1000; i++ {
+                    require.Nil(t, w.Log(&WALRecord{
+                        UserID:      "tenant1",
+                        Fingerprint: mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash(),
+                        Series: record.RefSeries{
+                            Ref:    chunks.HeadSeriesRef(i),
+                            Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`),
+                        },
+                        Chks: ChunkMetasRecord{
+                            Chks: []index.ChunkMeta{
+                                {
+                                    MinTime:  1,
+                                    MaxTime:  10,
+                                    Checksum: 3,
+                                },
+                            },
+                            Ref: uint64(i),
+                        },
+                    }))
+                }
+                require.Nil(t, w.Stop())
+
+                // truncate the WAL segment to exactly one page (32KB), tearing the record
+                // that crosses the page boundary.
+                segmentFile := filepath.Join(walPath, "00000001")
+                require.Nil(t, os.Truncate(segmentFile, 32*1024)) // 32KB
+            },
+        },
+        {
+            name: "invalid checksum",
+            setupFunc: func(t *testing.T, walPath string, w *headWAL) {
+                require.Nil(t, w.Log(&WALRecord{
+                    UserID:      "tenant1",
+                    Fingerprint: mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash(),
+                    Series: record.RefSeries{
+                        Ref:    chunks.HeadSeriesRef(1),
+                        Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`),
+                    },
+                    Chks: ChunkMetasRecord{
+                        Chks: []index.ChunkMeta{
+                            {
+                                MinTime:  1,
+                                MaxTime:  10,
+                                Checksum: 3,
+                            },
+                        },
+                        Ref: uint64(1),
+                    },
+                }))
+                require.Nil(t, w.Stop())
+
+                // Truncate the DATA part of the WAL record, making the stored checksum invalid.
+                require.Nil(t, os.Truncate(filepath.Join(walPath, "00000001"), 10)) // 7 bytes (header) + a little data
+            },
+        },
+        {
+            name: "invalid Loki WAL record",
+            setupFunc: func(t *testing.T, _ string, w *headWAL) {
+                require.Nil(t, w.wal.Log([]byte("not a valid Loki WAL record")))
+                require.Nil(t, w.Stop())
+            },
+            // This is expected to fail because the WAL record itself is valid but its data part is not.
+            // We cannot repair the WAL in this case. It would only happen if we updated the WALRecord
+            // format and then tried to replay a WAL written in the old format.
+            expectErr: true,
+        },
+    } {
+        t.Run(tc.name, func(t *testing.T) {
+            dir := t.TempDir()
+            now := time.Now()
+            storeName := "store_2010-10-10"
+            mgr := NewHeadManager(storeName, log.NewNopLogger(), dir, NewMetrics(nil), newNoopTSDBManager(storeName, dir))
+            // This bit is normally handled by the Start() fn, but we're testing a smaller surface area
+            // so ensure our dirs exist
+            for _, d := range managerRequiredDirs(storeName, dir) {
+                require.Nil(t, util.EnsureDirectory(d))
+            }
+
+            // Call Rotate() to ensure the new head & tenant heads exist, etc
+            require.Nil(t, mgr.Rotate(now))
+
+            // now build a WAL independently to test recovery
+            w, err := newHeadWAL(log.NewNopLogger(), walPath(mgr.name, mgr.dir, now), now)
+            require.Nil(t, err)
+            tc.setupFunc(t, walPath(mgr.name, mgr.dir, now), w)
+
+            grp, ok, err := walsForPeriod(managerWalDir(mgr.name, mgr.dir), mgr.period, mgr.period.PeriodFor(now))
+            require.Nil(t, err)
+            require.True(t, ok)
+            require.Equal(t, 1, len(grp.wals))
+
+            err = recoverHead(mgr.name, mgr.dir, mgr.activeHeads, grp.wals, false, log.NewNopLogger(), NewMetrics(nil).walCorruptionsRepairs)
+            if tc.expectErr {
+                require.Error(t, err)
+            } else {
+                require.NoError(t, err)
+            }
+        })
+    }
+}
+
 // test mgr recover from multiple wals across multiple periods
 func Test_HeadManager_Lifecycle(t *testing.T) {
     dir := t.TempDir()

pkg/storage/stores/tsdb/manager.go

@@ -286,7 +286,7 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier, legacy bool)
     level.Debug(m.log).Log("msg", "recovering tenant heads")
     for _, id := range ids {
         tmp := newTenantHeads(id.ts, defaultHeadManagerStripeSize, m.metrics, m.log)
-        if err = recoverHead(m.name, m.dir, tmp, []WALIdentifier{id}, legacy); err != nil {
+        if err = recoverHead(m.name, m.dir, tmp, []WALIdentifier{id}, legacy, m.log, m.metrics.walCorruptionsRepairs); err != nil {
             return errors.Wrap(err, "building TSDB from WALs")
         }

pkg/util/wal/wal_format.md (new file)

@@ -0,0 +1,118 @@
# WAL Segment Format Documentation
## Overview
A WAL (Write-Ahead Log) segment is a file containing a sequence of records. Each segment is divided into 32KB pages, and records can span multiple pages but never cross segment boundaries. This document describes the binary format of WAL segment files as used in Prometheus TSDB.
## Segment Structure
```
┌─────────────────────────────────────────────────┐
│                WAL SEGMENT FILE                 │
├─────────────────────────────────────────────────┤
│                  PAGE 0 (32KB)                  │
├─────────────────────────────────────────────────┤
│ RECORD 1 │ RECORD 2 │ RECORD 3 │ ... │ PADDING  │
├─────────────────────────────────────────────────┤
│                  PAGE 1 (32KB)                  │
├─────────────────────────────────────────────────┤
│ RECORD N │ RECORD N+1 │ ... │ PADDING           │
├─────────────────────────────────────────────────┤
│                       ...                       │
└─────────────────────────────────────────────────┘
```
## Record Structure
Every record in a WAL segment follows this structure:
```
┌─────────────┬───────────────────────────────┐
│   HEADER    │             DATA              │
│  (7 bytes)  │       (variable length)       │
└─────────────┴───────────────────────────────┘
```
### Header Format (7 bytes total)
```
  Byte 0     Bytes 1-2     Bytes 3-6
┌─────────┬─────────────┬─────────────┐
│  TYPE   │   LENGTH    │    CRC32    │
│(1 byte) │  (2 bytes)  │  (4 bytes)  │
└─────────┴─────────────┴─────────────┘
```
#### Byte 0 - Record Type and Compression Flags
```
Bit:   7   6   5   4   3   2   1   0
     ┌───┬───┬───┬───┬───┬───┬───┬───┐
     │ - │ - │ - │ Z │ S │ T │ T │ T │
     └───┴───┴───┴───┴───┴───┴───┴───┘
       │   │   │   │   │   └───┴───┴── Record Type (3 bits)
       │   │   │   │   └────────────── Snappy Compression Flag (1 bit)
       │   │   │   └────────────────── Zstd Compression Flag (1 bit)
       └───┴───┴────────────────────── Unallocated (3 bits)
```
**Record Types:**
- `0` (recPageTerm): Rest of page is empty
- `1` (recFull): Complete record fits in current page
- `2` (recFirst): First fragment of a record spanning multiple pages
- `3` (recMiddle): Middle fragment of a record spanning multiple pages
- `4` (recLast): Final fragment of a record spanning multiple pages
**Compression Flags:**
- Bit 3 (snappyMask = 0x08): Set if data is Snappy compressed
- Bit 4 (zstdMask = 0x10): Set if data is Zstd compressed
#### Bytes 1-2 - Data Length
Big-endian 16-bit unsigned integer representing the length of the data portion in bytes.
#### Bytes 3-6 - CRC32 Checksum
Big-endian 32-bit CRC32 checksum (Castagnoli polynomial) of the data portion only.
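To make the header layout concrete, here is a minimal decoding sketch in Go. It is illustrative rather than the Prometheus implementation: the mask values follow the bit layout above, but the `recordHeader` type and `decodeHeader` function are named here purely for exposition.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Masks for byte 0, following the bit layout above.
const (
	recTypeMask = 0b00000111 // bits 0-2: record type
	snappyMask  = 0b00001000 // bit 3: Snappy compression
	zstdMask    = 0b00010000 // bit 4: Zstd compression
)

type recordHeader struct {
	typ    byte   // 0=recPageTerm, 1=recFull, 2=recFirst, 3=recMiddle, 4=recLast
	snappy bool
	zstd   bool
	length uint16 // length of the data portion in bytes
	crc    uint32 // CRC32 (Castagnoli) of the data portion
}

// decodeHeader reads the 7-byte record header at the start of buf.
func decodeHeader(buf []byte) (recordHeader, error) {
	if len(buf) < 7 {
		return recordHeader{}, fmt.Errorf("need at least 7 bytes, got %d", len(buf))
	}
	return recordHeader{
		typ:    buf[0] & recTypeMask,
		snappy: buf[0]&snappyMask != 0,
		zstd:   buf[0]&zstdMask != 0,
		length: binary.BigEndian.Uint16(buf[1:3]),
		crc:    binary.BigEndian.Uint32(buf[3:7]),
	}, nil
}

func main() {
	// 0x09 = recFull (1) with the Snappy flag (0x08) set; length 3, illustrative CRC.
	hdr, _ := decodeHeader([]byte{0x09, 0x00, 0x03, 0x12, 0x34, 0x56, 0x78})
	fmt.Printf("%+v\n", hdr) // {typ:1 snappy:true zstd:false length:3 crc:305419896}
}
```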
## Record Fragmentation
When a record is larger than the remaining space in a page, it gets fragmented:
```
 Page N                          Page N+1
┌─────────────────────────────┐ ┌─────────────────────────────────┐
│ [HEADER] [DATA PART 1]      │ │ [HEADER] [DATA PART 2] [HEADER] │
│ Type: recFirst              │ │ Type: recLast     Type: recFull │
│ Length: 1024                │ │ Length: 512       Length: 256   │
│ CRC: 0x12345678             │ │ CRC: 0x87654321   CRC: 0xABCD   │
└─────────────────────────────┘ └─────────────────────────────────┘
```
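The splitting rules can be modeled with a small helper. The sketch below is an illustrative model, not the Prometheus writer: `fragmentTypes` is a hypothetical function that picks the record type for each fragment of an `n`-byte record, given how many bytes remain free in the current page; every fragment carries its own 7-byte header.

```go
package main

import "fmt"

const (
	pageSize   = 32 * 1024 // 32KB pages
	headerSize = 7         // every fragment carries its own header
)

// fragmentTypes models how a record of n data bytes is split across pages
// when `remaining` bytes are still free in the current page.
// Hypothetical helper for illustration only.
func fragmentTypes(n, remaining int) []string {
	var types []string
	for n > 0 {
		if remaining < headerSize {
			// Not even a header fits: zero-pad the rest and start a new page.
			remaining = pageSize
		}
		fits := remaining - headerSize
		first, last := len(types) == 0, n <= fits
		switch {
		case first && last:
			types = append(types, "recFull")
		case first:
			types = append(types, "recFirst")
		case last:
			types = append(types, "recLast")
		default:
			types = append(types, "recMiddle")
		}
		if n < fits {
			fits = n
		}
		n -= fits
		remaining = pageSize // subsequent fragments start on a fresh page
	}
	return types
}

func main() {
	// A 70,000-byte record written into an empty page spans three pages:
	fmt.Println(fragmentTypes(70000, pageSize)) // [recFirst recMiddle recLast]
}
```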
## Page Boundaries
- Each page is exactly 32KB (32,768 bytes)
- Records never span across segment boundaries
- Unused space at the end of pages is zero-padded
- A `recPageTerm` record type indicates the rest of the page is empty
## Data Format
The data portion contains the actual record payload. The format depends on the application using the WAL:
- **Series Records**: Encoded series labels and references
- **Sample Records**: Encoded time series samples
- **Tombstone Records**: Encoded deletion markers
- **Custom Records**: Application-specific data
## Compression
When compression is enabled:
1. The data is compressed before writing
2. The appropriate compression flag is set in the header
3. The CRC is calculated on the compressed data
4. The length field reflects the compressed data size
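Putting these steps together, here is a minimal write-side sketch assuming the `github.com/golang/snappy` package; `encodeFullRecord` is a hypothetical helper, not the Prometheus writer. It assembles a single `recFull` record, compressing first so that the flag, length, and CRC all describe the compressed bytes.

```go
package main

import (
	"encoding/binary"
	"hash/crc32"

	"github.com/golang/snappy"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// encodeFullRecord assembles one recFull record (type 1), optionally
// Snappy-compressing the payload first. The flag, length, and CRC are
// all computed over the (possibly compressed) data, per the steps above.
func encodeFullRecord(data []byte, compress bool) []byte {
	typ := byte(1) // recFull
	if compress {
		data = snappy.Encode(nil, data)
		typ |= 0x08 // snappyMask
	}
	rec := make([]byte, 7+len(data))
	rec[0] = typ
	binary.BigEndian.PutUint16(rec[1:3], uint16(len(data)))
	binary.BigEndian.PutUint32(rec[3:7], crc32.Checksum(data, castagnoli))
	copy(rec[7:], data)
	return rec
}

func main() {
	rec := encodeFullRecord([]byte("example payload"), true)
	_ = rec
}
```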
## References
- [Prometheus WAL Disk Format](https://github.com/prometheus/prometheus/blob/main/tsdb/docs/format/wal.md)
- [Prometheus TSDB WAL and Checkpoint](https://ganeshvernekar.com/blog/prometheus-tsdb-wal-and-checkpoint/)