mirror of https://github.com/grafana/loki
WAL backpressure (#3218)
* adds WAL replay backpressure * backpressure fn doesnt lock during passed fn for concurrency * replayController * replayController supports a RemoveFlushedChunks fn * custom wal flusher * encoding test determinism * standardizes recoveredBytesTotal to replayControllers ownership * linting * default wal memory threshold to 4gb, adds docs * fix test after v3 chunk schema change * more lenient expectCheckpoint function * replayController protects against flush race condition * adds replay_memory_ceiling to wal jsonnet * replay ceiling help msgpull/3251/head
parent
2bbf4b21c3
commit
224e68a50b
@ -0,0 +1,104 @@ |
||||
package ingester |
||||
|
||||
import ( |
||||
"sync" |
||||
|
||||
"go.uber.org/atomic" |
||||
) |
||||
|
||||
// replayFlusher flushes chunks recovered during WAL replay without
// unregistering their streams from the ingester, so streams can keep
// receiving later writes from the WAL.
type replayFlusher struct {
	i *Ingester
}
||||
|
||||
func (f *replayFlusher) Flush() { |
||||
f.i.InitFlushQueues() |
||||
f.i.flush(false) // flush data but don't remove streams from the ingesters
|
||||
|
||||
// Similar to sweepUsers with the exception that it will not remove streams
|
||||
// afterwards to prevent unlinking a stream which may receive later writes from the WAL.
|
||||
// We have to do this here after the flushQueues have been drained.
|
||||
instances := f.i.getInstances() |
||||
|
||||
for _, instance := range instances { |
||||
instance.streamsMtx.Lock() |
||||
|
||||
for _, stream := range instance.streams { |
||||
f.i.removeFlushedChunks(instance, stream, false) |
||||
} |
||||
|
||||
instance.streamsMtx.Unlock() |
||||
} |
||||
} |
||||
|
||||
// Flusher is the minimal flushing behavior the replayController depends on,
// allowing the real ingester flusher to be swapped out in tests.
type Flusher interface {
	Flush()
}
||||
|
||||
// replayController handles coordinating backpressure between WAL replays and chunk flushing.
type replayController struct {
	cfg     WALConfig
	metrics *ingesterMetrics
	// currentBytes tracks how many replayed bytes are currently held in memory.
	currentBytes atomic.Int64
	// cond gates replaying goroutines while a flush relieves memory pressure.
	cond *sync.Cond
	// isFlushing guarantees at most one flush is in flight at a time.
	isFlushing atomic.Bool
	flusher    Flusher
}
||||
|
||||
// flusher is expected to reduce pressure via calling Sub
|
||||
func newReplayController(metrics *ingesterMetrics, cfg WALConfig, flusher Flusher) *replayController { |
||||
return &replayController{ |
||||
cfg: cfg, |
||||
metrics: metrics, |
||||
cond: sync.NewCond(&sync.Mutex{}), |
||||
flusher: flusher, |
||||
} |
||||
} |
||||
|
||||
func (c *replayController) Add(x int64) { |
||||
c.metrics.recoveredBytesTotal.Add(float64(x)) |
||||
c.metrics.setRecoveryBytesInUse(c.currentBytes.Add(x)) |
||||
} |
||||
|
||||
func (c *replayController) Sub(x int64) { |
||||
c.metrics.setRecoveryBytesInUse(c.currentBytes.Sub(x)) |
||||
|
||||
} |
||||
|
||||
func (c *replayController) Cur() int { |
||||
return int(c.currentBytes.Load()) |
||||
} |
||||
|
||||
func (c *replayController) Flush() { |
||||
if c.isFlushing.CAS(false, true) { |
||||
c.flusher.Flush() |
||||
c.isFlushing.Store(false) |
||||
|
||||
// Broadcast after lock is acquired to prevent race conditions with cpu scheduling
|
||||
// where the flush code could finish before the goroutine which initiated it gets to call
|
||||
// c.cond.Wait()
|
||||
c.cond.L.Lock() |
||||
c.cond.Broadcast() |
||||
c.cond.L.Unlock() |
||||
} |
||||
} |
||||
|
||||
// WithBackPressure is expected to call replayController.Add in the passed function to increase the managed byte count.
|
||||
// It will call the function as long as there is expected room before the memory cap and will then flush data intermittently
|
||||
// when needed.
|
||||
func (c *replayController) WithBackPressure(fn func() error) error { |
||||
// Account for backpressure and wait until there's enough memory to continue replaying the WAL
|
||||
c.cond.L.Lock() |
||||
|
||||
// use 90% as a threshold since we'll be adding to it.
|
||||
for c.Cur() > int(c.cfg.ReplayMemoryCeiling)*9/10 { |
||||
// too much backpressure, flush
|
||||
go c.Flush() |
||||
c.cond.Wait() |
||||
} |
||||
|
||||
// Don't hold the lock while executing the provided function.
|
||||
// This ensures we can run functions concurrently.
|
||||
c.cond.L.Unlock() |
||||
|
||||
return fn() |
||||
} |
||||
@ -0,0 +1,77 @@ |
||||
package ingester |
||||
|
||||
import ( |
||||
"sync" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
// dumbFlusher is a test Flusher whose Flush simply invokes an optional callback.
type dumbFlusher struct {
	onFlush func()
}
||||
|
||||
func newDumbFlusher(onFlush func()) *dumbFlusher { |
||||
return &dumbFlusher{ |
||||
onFlush: onFlush, |
||||
} |
||||
} |
||||
|
||||
func (f *dumbFlusher) Flush() { |
||||
if f.onFlush != nil { |
||||
f.onFlush() |
||||
} |
||||
} |
||||
|
||||
func nilMetrics() *ingesterMetrics { return newIngesterMetrics(nil) } |
||||
|
||||
// TestReplayController verifies that WithBackPressure admits work while the
// byte count is under the ceiling and triggers a Flush (which frees 100
// bytes) each time the 100-byte ceiling is reached.
// NOTE(review): ordering relies on the 1ms sleep between goroutine launches —
// timing-based, so presumably could be flaky under heavy load; verify with -race.
func TestReplayController(t *testing.T) {
	// Ordered log of operations; guarded by opLock because both the flusher
	// callback and the replay goroutines append to it concurrently.
	var ops []string
	var opLock sync.Mutex

	var rc *replayController
	flusher := newDumbFlusher(
		func() {
			rc.Sub(100) // simulate flushing 100 bytes
			opLock.Lock()
			defer opLock.Unlock()
			ops = append(ops, "Flush")
		},
	)
	rc = newReplayController(nilMetrics(), WALConfig{ReplayMemoryCeiling: 100}, flusher)

	var wg sync.WaitGroup
	n := 5
	wg.Add(n)

	for i := 0; i < n; i++ {
		// In order to prevent all the goroutines from running before they've added bytes
		// to the internal count, introduce a brief sleep.
		time.Sleep(time.Millisecond)

		// nolint:errcheck,unparam
		go rc.WithBackPressure(func() error {
			rc.Add(50)
			opLock.Lock()
			defer opLock.Unlock()
			ops = append(ops, "WithBackPressure")
			wg.Done()
			return nil
		})
	}

	wg.Wait()

	// Each WithBackPressure adds 50 bytes; every second one hits the
	// 100-byte ceiling, forcing a Flush before the next can proceed.
	expected := []string{
		"WithBackPressure", // add 50, total 50
		"WithBackPressure", // add 50, total 100
		"Flush",            // subtract 100, total 0
		"WithBackPressure", // add 50, total 50
		"WithBackPressure", // add 50, total 100
		"Flush",            // subtract 100, total 0
		"WithBackPressure", // add 50, total 50
	}
	require.Equal(t, expected, ops)
}
||||
Loading…
Reference in new issue