Fix race condition when stopping tailing of files (#10631)

**What this PR does / why we need it**:

Users are getting `PromtailFileMissing` alerts, which are defined as:
```
promtail_file_bytes_total{namespace!=""} unless promtail_read_bytes_total{namespace!=""}
```

This appears to be due to a race condition where a file can be deleted
while the file target is being stopped.
pull/10469/head^2
Piotr 2 years ago committed by GitHub
parent 641c9ee48a
commit a8d5815510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      CHANGELOG.md
  2. 163
      clients/pkg/promtail/targets/file/filetarget_test.go
  3. 7
      clients/pkg/promtail/targets/file/tailer.go

@ -34,6 +34,8 @@
##### Fixes
* [10631](https://github.com/grafana/loki/pull/10631) **thampiotr**: Fix race condition in cleaning up metrics when stopping to tail files.
#### LogCLI
##### Fixes

@ -1,22 +1,25 @@
package file
import (
"bytes"
"context"
"fmt"
"math/rand"
"os"
"path/filepath"
"sort"
"sync"
"testing"
"time"
"github.com/fsnotify/fsnotify"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/go-kit/log"
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/positions"
)
@ -109,9 +112,10 @@ func TestFileTargetSync(t *testing.T) {
assert.Equal(t, 1, len(target.readers),
"Expected tails to be 1 at this point in the test...",
)
require.Eventually(t, func() bool {
requireEventually(t, func() bool {
return receivedStartWatch.Load() == 1
}, time.Second*10, time.Millisecond*1, "Expected received starting watch event to be 1 at this point in the test...")
}, "Expected received starting watch event to be 1 at this point in the test...")
// Add another file, should get another tailer.
_, err = os.Create(logDir1File2)
@ -154,12 +158,12 @@ func TestFileTargetSync(t *testing.T) {
assert.Equal(t, 0, len(target.readers),
"Expected tails to be 0 at this point in the test...",
)
require.Eventually(t, func() bool {
requireEventually(t, func() bool {
return receivedStartWatch.Load() == 1
}, time.Second*10, time.Millisecond*1, "Expected received starting watch event to be 1 at this point in the test...")
require.Eventually(t, func() bool {
}, "Expected received starting watch event to be 1 at this point in the test...")
requireEventually(t, func() bool {
return receivedStartWatch.Load() == 1
}, time.Second*10, time.Millisecond*1, "Expected received stopping watch event to be 1 at this point in the test...")
}, "Expected received stopping watch event to be 1 at this point in the test...")
target.Stop()
ps.Stop()
@ -184,7 +188,8 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) {
fakeHandler := make(chan fileTargetEvent, 10)
pathToWatch := filepath.Join(tempDir, "*.log")
target, err := NewFileTarget(NewMetrics(nil), logger, client, ps, pathToWatch, "", nil, nil, &Config{
registry := prometheus.NewRegistry()
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
SyncPeriod: 10 * time.Millisecond,
}, DefaultWatchConig, nil, fakeHandler, "", nil)
assert.NoError(t, err)
@ -192,18 +197,24 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) {
_, err = os.Create(logFile)
assert.NoError(t, err)
require.Eventually(t, func() bool {
requireEventually(t, func() bool {
return len(target.readers) == 1
}, time.Second*10, time.Millisecond*1, "expected 1 tailer to be created")
}, "expected 1 tailer to be created")
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP promtail_files_active_total Number of active files.
# TYPE promtail_files_active_total gauge
promtail_files_active_total 1
`), "promtail_files_active_total"))
// Inject an error to tailer
initailTailer := target.readers[logFile].(*tailer)
_ = initailTailer.tail.Tomb.Killf("test: network file systems can be unreliable")
// Tailer will be replaced by a new one
require.Eventually(t, func() bool {
requireEventually(t, func() bool {
return len(target.readers) == 1 && target.readers[logFile].(*tailer) != initailTailer
}, time.Second*10, time.Millisecond*1, "expected dead tailer to be replaced by a new one")
}, "expected dead tailer to be replaced by a new one")
// The old tailer should be stopped:
select {
@ -221,6 +232,101 @@ func TestFileTarget_StopsTailersCleanly(t *testing.T) {
target.Stop()
ps.Stop()
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP promtail_files_active_total Number of active files.
# TYPE promtail_files_active_total gauge
promtail_files_active_total 0
`), "promtail_files_active_total"))
}
// TestFileTarget_StopsTailersCleanly_Parallel reproduces the race between
// stopping a file target and concurrently deleting the tailed file, and
// verifies the per-path metrics are always cleaned up afterwards (leftover
// metrics were the source of spurious PromtailFileMissing alerts).
func TestFileTarget_StopsTailersCleanly_Parallel(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	tempDir := t.TempDir()
	positionsFileName := filepath.Join(tempDir, "positions.yml")
	ps, err := positions.New(logger, positions.Config{
		SyncPeriod:    10 * time.Millisecond,
		PositionsFile: positionsFileName,
	})
	require.NoError(t, err)

	client := fake.New(func() {})
	defer client.Stop()

	pathToWatch := filepath.Join(tempDir, "*.log")
	registry := prometheus.NewRegistry()
	metrics := NewMetrics(registry)

	// Increase this to several thousand to make the test more likely to fail
	// when debugging a race condition.
	iterations := 500
	fakeHandler := make(chan fileTargetEvent, 10*iterations)

	for i := 0; i < iterations; i++ {
		logFile := filepath.Join(tempDir, fmt.Sprintf("test_%d.log", i))

		target, err := NewFileTarget(metrics, logger, client, ps, pathToWatch, "", nil, nil, &Config{
			SyncPeriod: 10 * time.Millisecond,
		}, DefaultWatchConig, nil, fakeHandler, "", nil)
		// target is dereferenced below, so abort (require, not assert) on failure.
		require.NoError(t, err)

		file, err := os.Create(logFile)
		// file is written to below, so abort (require, not assert) on failure.
		require.NoError(t, err)

		// Write some data to the file.
		for j := 0; j < 5; j++ {
			_, _ = file.WriteString(fmt.Sprintf("test %d\n", j))
		}
		require.NoError(t, file.Close())

		// Wait until the tailer has picked the file up and reported all the
		// written data through the metrics.
		requireEventually(t, func() bool {
			return testutil.CollectAndCount(registry, "promtail_read_lines_total") == 1
		}, "expected 1 read_lines_total metric")
		requireEventually(t, func() bool {
			return testutil.CollectAndCount(registry, "promtail_read_bytes_total") == 1
		}, "expected 1 read_bytes_total metric")
		requireEventually(t, func() bool {
			return testutil.ToFloat64(metrics.readLines) == 5
		}, "expected 5 read_lines_total")
		requireEventually(t, func() bool {
			return testutil.ToFloat64(metrics.totalBytes) == 35
		}, "expected 35 total_bytes")
		requireEventually(t, func() bool {
			return testutil.ToFloat64(metrics.readBytes) == 35
		}, "expected 35 read_bytes")

		// Concurrently stop the target and remove the file.
		wg := sync.WaitGroup{}
		wg.Add(2)
		go func() {
			sleepRandomDuration(time.Millisecond * 10)
			target.Stop()
			wg.Done()
		}()
		go func() {
			sleepRandomDuration(time.Millisecond * 10)
			_ = os.Remove(logFile)
			wg.Done()
		}()
		wg.Wait()

		// Regardless of the order in which Stop and Remove raced, the per-path
		// metrics must be gone once the target has shut down.
		requireEventually(t, func() bool {
			return testutil.CollectAndCount(registry, "promtail_read_bytes_total") == 0
		}, "expected read_bytes_total metric to be cleaned up")
		requireEventually(t, func() bool {
			return testutil.CollectAndCount(registry, "promtail_file_bytes_total") == 0
		}, "expected file_bytes_total metric to be cleaned up")
	}

	ps.Stop()
}
func TestFileTargetPathExclusion(t *testing.T) {
@ -325,12 +431,12 @@ func TestFileTargetPathExclusion(t *testing.T) {
assert.Equal(t, 3, len(target.readers),
"Expected tails to be 3 at this point in the test...",
)
require.Eventually(t, func() bool {
requireEventually(t, func() bool {
return receivedStartWatch.Load() == 2
}, time.Second*10, time.Millisecond*1, "Expected received starting watch event to be 2 at this point in the test...")
require.Eventually(t, func() bool {
}, "Expected received starting watch event to be 2 at this point in the test...")
requireEventually(t, func() bool {
return receivedStopWatch.Load() == 0
}, time.Second*10, time.Millisecond*1, "Expected received stopping watch event to be 0 at this point in the test...")
}, "Expected received stopping watch event to be 0 at this point in the test...")
// Remove the first directory, other tailer should stop and its watchers should go away.
// Only the non-excluded `logDir2` should be watched.
@ -346,12 +452,12 @@ func TestFileTargetPathExclusion(t *testing.T) {
assert.Equal(t, 1, len(target.readers),
"Expected tails to be 1 at this point in the test...",
)
require.Eventually(t, func() bool {
requireEventually(t, func() bool {
return receivedStartWatch.Load() == 2
}, time.Second*10, time.Millisecond*1, "Expected received starting watch event to still be 2 at this point in the test...")
require.Eventually(t, func() bool {
}, "Expected received starting watch event to still be 2 at this point in the test...")
requireEventually(t, func() bool {
return receivedStopWatch.Load() == 1
}, time.Second*10, time.Millisecond*1, "Expected received stopping watch event to be 1 at this point in the test...")
}, "Expected received stopping watch event to be 1 at this point in the test...")
require.NoError(t, os.RemoveAll(logDir2))
require.NoError(t, os.RemoveAll(logDir3))
@ -421,9 +527,9 @@ func TestHandleFileCreationEvent(t *testing.T) {
Name: logFile,
Op: fsnotify.Create,
}
require.Eventually(t, func() bool {
requireEventually(t, func() bool {
return len(target.readers) == 1
}, time.Second*10, time.Millisecond*1, "Expected tails to be 1 at this point in the test...")
}, "Expected tails to be 1 at this point in the test...")
}
func TestToStopTailing(t *testing.T) {
@ -509,3 +615,12 @@ func TestMissing(t *testing.T) {
}
}
// requireEventually asserts that cond eventually becomes true, polling once a
// millisecond for up to ten seconds, and fails the test with failMsg otherwise.
// It centralizes the timeout/tick values used throughout this file.
func requireEventually(t *testing.T, cond func() bool, failMsg string) {
	t.Helper()
	require.Eventually(t, cond, 10*time.Second, time.Millisecond, failMsg)
}
func sleepRandomDuration(maxDuration time.Duration) {
time.Sleep(time.Duration(rand.Int63n(int64(maxDuration))))
}

@ -116,6 +116,8 @@ func (t *tailer) updatePosition() {
defer func() {
positionWait.Stop()
level.Info(t.logger).Log("msg", "position timer: exited", "path", t.path)
// NOTE: metrics must be cleaned up after the position timer exits, as MarkPositionAndSize() updates metrics.
t.cleanupMetrics()
close(t.posdone)
}()
@ -149,7 +151,6 @@ func (t *tailer) readLines() {
// This function runs in a goroutine, if it exits this tailer will never do any more tailing.
// Clean everything up.
defer func() {
t.cleanupMetrics()
t.running.Store(false)
level.Info(t.logger).Log("msg", "tail routine: exited", "path", t.path)
close(t.done)
@ -208,12 +209,14 @@ func (t *tailer) MarkPositionAndSize() error {
}
return err
}
t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))
pos, err := t.tail.Tell()
if err != nil {
return err
}
// Update metrics and positions file all together to avoid race conditions when `t.tail` is stopped.
t.metrics.totalBytes.WithLabelValues(t.path).Set(float64(size))
t.metrics.readBytes.WithLabelValues(t.path).Set(float64(pos))
t.positions.Put(t.path, pos)

Loading…
Cancel
Save