Don't fail writes due to full WAL disk (#3136)

* don't fail writes on full WAL disk

* WAL full failure will cause flush on shutdown

* logs the first full WAL failure
Owen Diehl 5 years ago committed by GitHub
parent fd94b8d30d
commit ed649eec37
  1. 24 docs/sources/operations/storage/wal.md
  2. 29 pkg/ingester/checkpoint_test.go
  3. 39 pkg/ingester/flush_test.go
  4. 29 pkg/ingester/ingester.go
  5. 57 pkg/ingester/instance.go
  6. 12 pkg/ingester/instance_test.go
  7. 5 pkg/ingester/metrics.go
  8. 2 pkg/ingester/transfer_test.go

@@ -4,11 +4,29 @@ title: Write Ahead Log
# Write Ahead Log (WAL)
Ingesters temporarily store data in memory. In the event of a crash, there could be data loss. The WAL helps fill this gap in reliability.
Ingesters store all their data in memory. If there is a crash, there can be data loss. The WAL helps fill this gap in reliability.
This section will use Kubernetes as a reference.
The WAL in Loki records incoming data and stores it on the local file system in order to guarantee persistence of acknowledged data in the event of a process crash. Upon restart, Loki will "replay" all of the data in the log before registering itself as ready for subsequent writes. This allows Loki to maintain the performance & cost benefits of buffering data in memory _and_ durability benefits (it won't lose data once a write has been acknowledged).
To use the WAL, there are some changes that need to be made.
This section will use Kubernetes as a reference deployment paradigm in the examples.
## Disclaimer & WAL nuances
The Write Ahead Log in Loki makes a few particular tradeoffs compared to other WALs you may be familiar with. The WAL aims to add additional durability guarantees, but _not at the expense of availability_. In particular, there are two scenarios where the WAL sacrifices these guarantees.
1) Corruption/Deletion of the WAL prior to replaying it
In the event the WAL is corrupted or partially deleted, Loki will not be able to recover all of its data. In this case, Loki will attempt to recover any data it can, but this will not prevent it from starting.
Note: the Prometheus metric `loki_ingester_wal_corruptions_total` can be used to track and alert when this happens.
1) No space left on disk
In the event the underlying WAL disk is full, Loki will not fail incoming writes, but neither will it log them to the WAL. In this case, the persistence guarantees across process restarts will not hold.
Note: the Prometheus metric `loki_ingester_wal_disk_full_failures_total` can be used to track and alert when this happens.
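As a rough illustration (not part of the documented configuration), the ingester treats a WAL write error as a disk-full failure when it is an `*os.PathError` wrapping `syscall.ENOSPC`. The sketch below, using a purely hypothetical path, mirrors that check:

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// isDiskFull mirrors the check this change adds in the ingester's Push path:
// a WAL write that fails because the disk is full surfaces as an
// *os.PathError wrapping syscall.ENOSPC.
func isDiskFull(err error) bool {
	if pathErr, ok := err.(*os.PathError); ok {
		return pathErr.Err == syscall.ENOSPC
	}
	return false
}

func main() {
	// Simulated failure, much like the fullWAL test double in this commit.
	// The path here is illustrative only.
	err := &os.PathError{Op: "write", Path: "/loki/wal/segment", Err: syscall.ENOSPC}
	fmt.Println(isDiskFull(err)) // true
}
```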
### Metrics
## Changes to deployment

@@ -38,12 +38,7 @@ func ensureIngesterData(ctx context.Context, t *testing.T, start, end time.Time,
require.Len(t, result.resps[0].Streams[1].Entries, ln)
}
func TestIngesterWAL(t *testing.T) {
walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)
func defaultIngesterTestConfigWithWAL(t *testing.T, walDir string) Config {
ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
@@ -52,6 +47,18 @@ func TestIngesterWAL(t *testing.T) {
Recover: true,
CheckpointDuration: time.Second,
}
return ingesterConfig
}
func TestIngesterWAL(t *testing.T) {
walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
@@ -134,14 +141,8 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.Nil(t, err)
defer os.RemoveAll(walDir)
ingesterConfig := defaultIngesterTestConfig(t)
ingesterConfig.MaxTransferRetries = 0
ingesterConfig.WAL = WALConfig{
Enabled: true,
Dir: walDir,
Recover: true,
CheckpointDuration: time.Second,
}
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

@@ -2,8 +2,11 @@ package ingester
import (
"fmt"
"io/ioutil"
"os"
"sort"
"sync"
"syscall"
"testing"
"time"
@@ -44,7 +47,7 @@ func TestChunkFlushingIdle(t *testing.T) {
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond
store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
testData := pushTestSamples(t, ing)
@@ -54,7 +57,25 @@ func TestChunkFlushingIdle(t *testing.T) {
}
func TestChunkFlushingShutdown(t *testing.T) {
store, ing := newTestStore(t, defaultIngesterTestConfig(t))
store, ing := newTestStore(t, defaultIngesterTestConfig(t), nil)
testData := pushTestSamples(t, ing)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
store.checkData(t, testData)
}
type fullWAL struct{}
func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Stop() error { return nil }
func TestWALFullFlush(t *testing.T) {
// technically replaced with a fake wal, but the ingester New() function creates a regular wal first,
// so we enable creation/cleanup even though it remains unused.
walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)
store, ing := newTestStore(t, defaultIngesterTestConfigWithWAL(t, walDir), fullWAL{})
testData := pushTestSamples(t, ing)
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
store.checkData(t, testData)
@@ -66,7 +87,7 @@ func TestFlushingCollidingLabels(t *testing.T) {
cfg.MaxChunkIdle = 100 * time.Millisecond
cfg.RetainPeriod = 500 * time.Millisecond
store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer store.Stop()
const userID = "testUser"
@@ -112,7 +133,7 @@ func TestFlushMaxAge(t *testing.T) {
cfg.MaxChunkAge = time.Minute
cfg.MaxChunkIdle = time.Hour
store, ing := newTestStore(t, cfg)
store, ing := newTestStore(t, cfg, nil)
defer store.Stop()
now := time.Unix(0, 0)
@@ -166,7 +187,10 @@ type testStore struct {
chunks map[string][]chunk.Chunk
}
func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
// Note: the ingester New() function creates its own WAL first, which we then override if specified.
// Because of this, ensure any WAL directories exist/are cleaned up even when overriding the wal.
// This is an ugly hook for testing :(
func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, *Ingester) {
store := &testStore{
chunks: map[string][]chunk.Chunk{},
}
@@ -178,6 +202,11 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
if walOverride != nil {
_ = ing.wal.Stop()
ing.wal = walOverride
}
return store, ing
}

@@ -146,6 +146,10 @@ type Ingester struct {
limiter *Limiter
// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch
metrics *ingesterMetrics
wal WAL
@@ -169,15 +173,16 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
metrics := newIngesterMetrics(registerer)
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
metrics: metrics,
flushOnShutdownSwitch: &OnceSwitch{},
}
if cfg.WAL.Enabled {
@@ -319,6 +324,10 @@ func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
var errs errUtil.MultiError
errs.Add(i.wal.Stop())
if i.flushOnShutdownSwitch.Get() {
i.lifecycler.SetFlushOnShutdown(true)
}
errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))
// Normally, flushers are stopped via lifecycler (in transferOut), but if lifecycler fails,
@@ -384,7 +393,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics)
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics, i.flushOnShutdownSwitch)
i.instances[instanceID] = inst
}
return inst

@@ -3,8 +3,11 @@ package ingester
import (
"context"
"net/http"
"os"
"sync"
"syscall"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -15,6 +18,7 @@ import (
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ingester/index"
"github.com/cortexproject/cortex/pkg/util"
cutil "github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/helpers"
@@ -77,6 +81,10 @@ type instance struct {
wal WAL
// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch
metrics *ingesterMetrics
}
@@ -86,6 +94,7 @@ func newInstance(
limiter *Limiter,
wal WAL,
metrics *ingesterMetrics,
flushOnShutdownSwitch *OnceSwitch,
) *instance {
i := &instance{
cfg: cfg,
@@ -101,8 +110,9 @@ func newInstance(
tailers: map[uint32]*tailer{},
limiter: limiter,
wal: wal,
metrics: metrics,
wal: wal,
metrics: metrics,
flushOnShutdownSwitch: flushOnShutdownSwitch,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
@@ -161,8 +171,19 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
if !record.IsEmpty() {
if err := i.wal.Log(record); err != nil {
return err
if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOSPC {
i.metrics.walDiskFullFailures.Inc()
i.flushOnShutdownSwitch.TriggerAnd(func() {
level.Error(util.Logger).Log(
"msg",
"Error writing to WAL, disk full, no further messages will be logged for this error",
)
})
} else {
return err
}
}
}
return appendErr
@@ -578,3 +599,33 @@ func shouldConsiderStream(stream *stream, req *logproto.SeriesRequest) bool {
}
return false
}
// OnceSwitch is a write-optimized switch that can only ever be switched "on".
// It uses an RWMutex under the hood to quickly check, in a concurrent environment,
// whether the switch has already been triggered, only acquiring the mutex for writing if it has not.
type OnceSwitch struct {
sync.RWMutex
toggle bool
}
func (o *OnceSwitch) Get() bool {
o.RLock()
defer o.RUnlock()
return o.toggle
}
// TriggerAnd will ensure the switch is on and run the provided function if
// the switch was not already toggled on.
func (o *OnceSwitch) TriggerAnd(fn func()) {
o.RLock()
if o.toggle {
o.RUnlock()
return
}
o.RUnlock()
o.Lock()
o.toggle = true
o.Unlock()
fn()
}
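A minimal usage sketch, assuming the `OnceSwitch` type above is in scope; the helper name is hypothetical, but the flow mirrors how the commit wires the switch between `instance.Push` and the ingester's `stopping()`:

```go
package main

import "fmt"

// reportWALDiskFull is a hypothetical stand-in for the write path: the first
// caller to observe the switch off runs the callback (a log line in the real
// code), and every caller leaves the switch on for stopping() to read.
func reportWALDiskFull(s *OnceSwitch) {
	s.TriggerAnd(func() {
		fmt.Println("WAL disk full; ingester will flush chunks on shutdown")
	})
}

func main() {
	s := &OnceSwitch{}
	reportWALDiskFull(s)
	reportWALDiskFull(s)                       // callback does not run again
	fmt.Println("flush on shutdown:", s.Get()) // true
}
```

Note that `TriggerAnd` does not re-check the toggle after taking the write lock, so two goroutines racing past the read check could each run the callback once; for deduplicating a log message that is an acceptable tradeoff, and `Get` stays on the cheap read-lock path.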

@@ -35,7 +35,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil)
i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil, &OnceSwitch{})
// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -62,7 +62,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
const (
concurrent = 10
@@ -120,7 +120,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
lbls := makeRandomLabels()
tt := time.Now()
@@ -160,7 +160,7 @@ func Test_SeriesQuery(t *testing.T) {
cfg.SyncPeriod = 1 * time.Minute
cfg.SyncMinUtilization = 0.20
instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics)
instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
currentTime := time.Now()
@@ -271,7 +271,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
ctx := context.Background()
for n := 0; n < b.N; n++ {
@@ -313,7 +313,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
ctx := context.Background()
inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil)
require.NoError(b, err)
for i := 0; i < 10000; i++ {

@@ -13,6 +13,7 @@ type ingesterMetrics struct {
checkpointDuration prometheus.Summary
checkpointLoggedBytesTotal prometheus.Counter
walDiskFullFailures prometheus.Counter
walReplayDuration prometheus.Gauge
walCorruptionsTotal *prometheus.CounterVec
walLoggedBytesTotal prometheus.Counter
@@ -30,6 +31,10 @@ const (
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
walDiskFullFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_wal_disk_full_failures_total",
Help: "Total number of wal write failures due to full disk.",
}),
walReplayDuration: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_wal_replay_duration_seconds",
Help: "Time taken to replay the checkpoint and the WAL.",

@@ -165,7 +165,7 @@ func (f *testIngesterFactory) getIngester(joinAfter time.Duration, t *testing.T)
}, nil
}
_, ing := newTestStore(f.t, cfg)
_, ing := newTestStore(f.t, cfg, nil)
f.ingesters[fmt.Sprintf("%s:0", cfg.LifecyclerConfig.ID)] = ing
// NB there's some kind of race condition with the in-memory KV client when
