Fix Promtail watching deadlock (#5283)

* Reuse the channel from the fileEventWatcher map if one already exists

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Remove sync() call in FileTarget constructor

The constructor `NewFileTarget` is called from within a locked code
block by the FileTargetManager. However, the constructor also calls
`sync()`, which starts and stops watching new directories. The methods
`startWatching` and `stopWatching` both send to the `targetEventHandler`
channel, causing the `FileTargetManager` to receive the event in a
goroutine (`watch()` function) running in the background.
Handling the event calls `sendFileCreateEvent` on the syncers, which
subsequently tries to write to the `fileEventWatcher` channel which is
never read because of the locked mutex.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/5297/head
Christian Haudum 4 years ago committed by GitHub
parent c88734b72c
commit 3fb572abda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      clients/pkg/promtail/targets/file/filetarget.go
  2. 150
      clients/pkg/promtail/targets/file/filetarget_test.go
  3. 42
      clients/pkg/promtail/targets/file/filetargetmanager.go
  4. 145
      clients/pkg/promtail/targets/file/filetargetmanager_test.go

@ -105,11 +105,6 @@ func NewFileTarget(
targetEventHandler: targetEventHandler,
}
err := t.sync()
if err != nil {
return nil, errors.Wrap(err, "filetarget.sync")
}
go t.run()
return t, nil
}
@ -159,7 +154,13 @@ func (t *FileTarget) run() {
close(t.done)
}()
err := t.sync()
if err != nil {
level.Error(t.logger).Log("msg", "error running sync function", "error", err)
}
ticker := time.NewTicker(t.targetConfig.SyncPeriod)
defer ticker.Stop()
for {
select {
@ -248,7 +249,7 @@ func (t *FileTarget) startWatching(dirs map[string]struct{}) {
if _, ok := t.watches[dir]; ok {
continue
}
level.Debug(t.logger).Log("msg", "watching new directory", "directory", dir)
level.Info(t.logger).Log("msg", "watching new directory", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStart,
@ -261,7 +262,7 @@ func (t *FileTarget) stopWatching(dirs map[string]struct{}) {
if _, ok := t.watches[dir]; !ok {
continue
}
level.Debug(t.logger).Log("msg", "removing directory from watcher", "directory", dir)
level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStop,
@ -280,7 +281,7 @@ func (t *FileTarget) startTailing(ps []string) {
continue
}
if fi.IsDir() {
level.Error(t.logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", p)
level.Info(t.logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", p)
continue
}
level.Debug(t.logger).Log("msg", "tailing new file", "filename", p)

@ -4,10 +4,12 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
@ -17,25 +19,17 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/positions"
"github.com/grafana/loki/clients/pkg/promtail/targets/testutils"
)
func TestFileTargetSync(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
testutils.InitRandom()
dirName := "/tmp/" + testutils.RandName()
positionsFileName := dirName + "/positions.yml"
logDir1 := dirName + "/log1"
logDir1File1 := logDir1 + "/test1.log"
logDir1File2 := logDir1 + "/test2.log"
err := os.MkdirAll(dirName, 0750)
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.RemoveAll(dirName) }()
dirName := newTestLogDirectories(t)
positionsFileName := filepath.Join(dirName, "positions.yml")
logDir1 := filepath.Join(dirName, "log1")
logDir1File1 := filepath.Join(logDir1, "test1.log")
logDir1File2 := filepath.Join(logDir1, "test2.log")
// Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know
// everything saved was done through channel notifications when target.stop() was called.
@ -74,11 +68,9 @@ func TestFileTargetSync(t *testing.T) {
}()
path := logDir1 + "/*.log"
target, err := NewFileTarget(metrics, logger, client, ps, path, nil, nil, &Config{
SyncPeriod: 10 * time.Second,
SyncPeriod: 1 * time.Minute, // assure the sync is not called by the ticker
}, nil, fakeHandler)
if err != nil {
t.Fatal(err)
}
assert.NoError(t, err)
// Start with nothing watched.
if len(target.watches) != 0 {
@ -89,12 +81,12 @@ func TestFileTargetSync(t *testing.T) {
}
// Create the base dir, still nothing watched.
if err = os.MkdirAll(logDir1, 0750); err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
err = os.MkdirAll(logDir1, 0750)
assert.NoError(t, err)
err = target.sync()
assert.NoError(t, err)
if len(target.watches) != 0 {
t.Fatal("Expected watches to be 0 at this point in the test...")
}
@ -104,64 +96,64 @@ func TestFileTargetSync(t *testing.T) {
// Add a file, which should create a watcher and a tailer.
_, err = os.Create(logDir1File1)
if err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
if len(target.watches) != 1 {
t.Fatal("Expected watches to be 1 at this point in the test...")
}
if len(target.tails) != 1 {
t.Fatal("Expected tails to be 1 at this point in the test...")
}
assert.NoError(t, err)
// Delay sync() call to make sure the filesystem watch event does not fire during sync()
time.Sleep(10 * time.Millisecond)
err = target.sync()
assert.NoError(t, err)
assert.Equal(t, 1, len(target.watches),
"Expected watches to be 1 at this point in the test...",
)
assert.Equal(t, 1, len(target.tails),
"Expected tails to be 1 at this point in the test...",
)
require.Eventually(t, func() bool {
return receivedStartWatch.Load() == 1
}, time.Second*10, time.Millisecond*1, "Expected received starting watch event to be 1 at this point in the test...")
// Add another file, should get another tailer.
_, err = os.Create(logDir1File2)
if err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
if len(target.watches) != 1 {
t.Fatal("Expected watches to be 1 at this point in the test...")
}
if len(target.tails) != 2 {
t.Fatal("Expected tails to be 2 at this point in the test...")
}
assert.NoError(t, err)
err = target.sync()
assert.NoError(t, err)
assert.Equal(t, 1, len(target.watches),
"Expected watches to be 1 at this point in the test...",
)
assert.Equal(t, 2, len(target.tails),
"Expected tails to be 2 at this point in the test...",
)
// Remove one of the files, tailer should stop.
if err = os.Remove(logDir1File1); err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
if len(target.watches) != 1 {
t.Fatal("Expected watches to be 1 at this point in the test...")
}
if len(target.tails) != 1 {
t.Fatal("Expected tails to be 1 at this point in the test...")
}
err = os.Remove(logDir1File1)
assert.NoError(t, err)
err = target.sync()
assert.NoError(t, err)
assert.Equal(t, 1, len(target.watches),
"Expected watches to be 1 at this point in the test...",
)
assert.Equal(t, 1, len(target.tails),
"Expected tails to be 1 at this point in the test...",
)
// Remove the entire directory, other tailer should stop and watcher should go away.
if err = os.RemoveAll(logDir1); err != nil {
t.Fatal(err)
}
if err = target.sync(); err != nil {
t.Fatal(err)
}
if len(target.watches) != 0 {
t.Fatal("Expected watches to be 0 at this point in the test...")
}
if len(target.tails) != 0 {
t.Fatal("Expected tails to be 0 at this point in the test...")
}
err = os.RemoveAll(logDir1)
assert.NoError(t, err)
err = target.sync()
assert.NoError(t, err)
assert.Equal(t, 0, len(target.watches),
"Expected watches to be 0 at this point in the test...",
)
assert.Equal(t, 0, len(target.tails),
"Expected tails to be 0 at this point in the test...",
)
require.Eventually(t, func() bool {
return receivedStartWatch.Load() == 1
}, time.Second*10, time.Millisecond*1, "Expected received starting watch event to be 1 at this point in the test...")
@ -177,18 +169,12 @@ func TestHandleFileCreationEvent(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
testutils.InitRandom()
dirName := "/tmp/" + testutils.RandName()
positionsFileName := dirName + "/positions.yml"
logDir := dirName + "/log"
logFile := logDir + "/test1.log"
dirName := newTestLogDirectories(t)
positionsFileName := filepath.Join(dirName, "positions.yml")
logDir := filepath.Join(dirName, "log")
logFile := filepath.Join(logDir, "test1.log")
err := os.MkdirAll(dirName, 0750)
if err != nil {
t.Fatal(err)
}
defer func() { _ = os.RemoveAll(dirName) }()
if err = os.MkdirAll(logDir, 0750); err != nil {
if err := os.MkdirAll(logDir, 0750); err != nil {
t.Fatal(err)
}

@ -40,12 +40,13 @@ const (
type FileTargetManager struct {
log log.Logger
quit context.CancelFunc
done chan struct{}
syncers map[string]*targetSyncer
manager *discovery.Manager
watcher *fsnotify.Watcher
targetEventHandler chan fileTargetEvent
wg sync.WaitGroup
}
// NewFileTargetManager creates a new TargetManager.
@ -70,7 +71,6 @@ func NewFileTargetManager(
tm := &FileTargetManager{
log: logger,
quit: quit,
done: make(chan struct{}),
watcher: watcher,
targetEventHandler: make(chan fileTargetEvent),
syncers: map[string]*targetSyncer{},
@ -134,14 +134,19 @@ func NewFileTargetManager(
configs[cfg.JobName] = cfg.ServiceDiscoveryConfig.Configs()
}
tm.wg.Add(3)
go tm.run(ctx)
go tm.watch(ctx)
go tm.watchTargetEvents(ctx)
go tm.watchFsEvents(ctx)
go util.LogError("running target manager", tm.manager.Run)
return tm, tm.manager.ApplyConfig(configs)
}
func (tm *FileTargetManager) watch(ctx context.Context) {
func (tm *FileTargetManager) watchTargetEvents(ctx context.Context) {
defer tm.wg.Done()
for {
select {
case event := <-tm.targetEventHandler:
@ -155,9 +160,21 @@ func (tm *FileTargetManager) watch(ctx context.Context) {
level.Error(tm.log).Log("msg", " failed to remove directory from watcher", "error", err)
}
}
case <-ctx.Done():
return
}
}
}
func (tm *FileTargetManager) watchFsEvents(ctx context.Context) {
defer tm.wg.Done()
for {
select {
case event := <-tm.watcher.Events:
// we only care about Create events
if event.Op == fsnotify.Create {
level.Info(tm.log).Log("msg", "received file watcher event", "name", event.Name, "op", event.Op.String())
for _, s := range tm.syncers {
s.sendFileCreateEvent(event)
}
@ -171,7 +188,8 @@ func (tm *FileTargetManager) watch(ctx context.Context) {
}
func (tm *FileTargetManager) run(ctx context.Context) {
defer close(tm.done)
defer tm.wg.Done()
for {
select {
case targetGroups := <-tm.manager.SyncCh():
@ -197,7 +215,8 @@ func (tm *FileTargetManager) Ready() bool {
// Stop the TargetManager.
func (tm *FileTargetManager) Stop() {
tm.quit()
<-tm.done
tm.wg.Wait()
for _, s := range tm.syncers {
s.stop()
}
@ -307,9 +326,14 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group, targetEventHandler chan
}
level.Info(s.log).Log("msg", "Adding target", "key", key)
watcher := make(chan fsnotify.Event)
s.fileEventWatchers[string(path)] = watcher
t, err := s.newTarget(string(path), labels, discoveredLabels, watcher, targetEventHandler)
wkey := string(path)
watcher, ok := s.fileEventWatchers[wkey]
if !ok {
watcher = make(chan fsnotify.Event)
s.fileEventWatchers[wkey] = watcher
}
t, err := s.newTarget(wkey, labels, discoveredLabels, watcher, targetEventHandler)
if err != nil {
dropped = append(dropped, target.NewDroppedTarget(fmt.Sprintf("Failed to create target: %s", err.Error()), discoveredLabels))
level.Error(s.log).Log("msg", "Failed to create target", "key", key, "error", err)

@ -1,7 +1,9 @@
package file
import (
"fmt"
"os"
"path/filepath"
"testing"
"time"
@ -18,33 +20,21 @@ import (
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/positions"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/testutils"
)
func newTestLogDirectories() (string, func(), error) {
testutils.InitRandom()
dirName := "/tmp/" + testutils.RandName()
logFileDir := dirName + "/logdir"
err := os.MkdirAll(dirName, 0750)
if err != nil {
return "", nil, err
}
err = os.MkdirAll(logFileDir, 0750)
if err != nil {
return "", nil, err
}
return logFileDir, func() {
_ = os.RemoveAll(dirName)
}, nil
func newTestLogDirectories(t *testing.T) string {
tmpDir := t.TempDir()
logFileDir := filepath.Join(tmpDir, "logs")
err := os.MkdirAll(logFileDir, 0750)
assert.NoError(t, err)
return logFileDir
}
func newTestPositions(logger log.Logger, filePath string) (positions.Positions, error) {
// Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know
// everything saved was done through channel notifications when target.stop() was called.
pos, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Second,
SyncPeriod: 60 * time.Second,
PositionsFile: filePath,
})
if err != nil {
@ -76,7 +66,7 @@ func newTestFileTargetManager(logger log.Logger, client api.EntryHandler, positi
},
}
tc := &Config{
SyncPeriod: 10 * time.Second,
SyncPeriod: 1 * time.Second,
}
metrics := NewMetrics(nil)
@ -90,14 +80,10 @@ func newTestFileTargetManager(logger log.Logger, client api.EntryHandler, positi
func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
logDirName, cleanup, err := newTestLogDirectories()
if err != nil {
t.Fatal(err)
}
defer cleanup()
logDirName := newTestLogDirectories(t)
logFile := logDirName + "test.log"
positionsFileName := logDirName + "/positions.yml"
logFile := filepath.Join(logDirName, "test.log")
positionsFileName := filepath.Join(logDirName, "positions.yml")
ps, err := newTestPositions(logger, positionsFileName)
if err != nil {
t.Fatal(err)
@ -106,7 +92,7 @@ func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) {
client := fake.New(func() {})
defer client.Stop()
ftm, err := newTestFileTargetManager(logger, client, ps, logDirName+"*")
ftm, err := newTestFileTargetManager(logger, client, ps, logDirName+"/*")
if err != nil {
t.Fatal(err)
}
@ -154,14 +140,10 @@ func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) {
func TestWatchEntireDirectory(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
logDirName, cleanup, err := newTestLogDirectories()
if err != nil {
t.Fatal(err)
}
defer cleanup()
logDirName := newTestLogDirectories(t)
logFile := logDirName + "test.log"
positionsFileName := logDirName + "/positions.yml"
logFile := filepath.Join(logDirName, "test.log")
positionsFileName := filepath.Join(logDirName, "positions.yml")
ps, err := newTestPositions(logger, positionsFileName)
if err != nil {
t.Fatal(err)
@ -170,7 +152,7 @@ func TestWatchEntireDirectory(t *testing.T) {
client := fake.New(func() {})
defer client.Stop()
ftm, err := newTestFileTargetManager(logger, client, ps, logDirName+"*")
ftm, err := newTestFileTargetManager(logger, client, ps, logDirName+"/*")
if err != nil {
t.Fatal(err)
}
@ -218,14 +200,10 @@ func TestWatchEntireDirectory(t *testing.T) {
func TestFileRolls(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
logDirName, cleanup, err := newTestLogDirectories()
if err != nil {
t.Fatal(err)
}
defer cleanup()
logDirName := newTestLogDirectories(t)
logFile := logDirName + "/test.log"
positionsFileName := logDirName + "/positions.yml"
logFile := filepath.Join(logDirName, "test.log")
positionsFileName := filepath.Join(logDirName, "positions.yml")
ps, err := newTestPositions(logger, positionsFileName)
if err != nil {
t.Fatal(err)
@ -255,7 +233,7 @@ func TestFileRolls(t *testing.T) {
}, time.Second*10, time.Millisecond*1)
// Rename the log file to something not in the pattern, then create a new file with the same name.
err = os.Rename(logFile, logDirName+"/test.log.1")
err = os.Rename(logFile, filepath.Join(logDirName, "test.log.1"))
if err != nil {
t.Fatal("Failed to rename log file for test", err)
}
@ -295,14 +273,10 @@ func TestFileRolls(t *testing.T) {
func TestResumesWhereLeftOff(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
logDirName, cleanup, err := newTestLogDirectories()
if err != nil {
t.Fatal(err)
}
defer cleanup()
logDirName := newTestLogDirectories(t)
logFile := logDirName + "/test.log"
positionsFileName := logDirName + "/positions.yml"
logFile := filepath.Join(logDirName, "test.log")
positionsFileName := filepath.Join(logDirName, "positions.yml")
ps, err := newTestPositions(logger, positionsFileName)
if err != nil {
t.Fatal(err)
@ -378,15 +352,11 @@ func TestResumesWhereLeftOff(t *testing.T) {
func TestGlobWithMultipleFiles(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
logDirName, cleanup, err := newTestLogDirectories()
if err != nil {
t.Fatal(err)
}
defer cleanup()
logDirName := newTestLogDirectories(t)
logFile1 := logDirName + "/test.log"
logFile2 := logDirName + "/dirt.log"
positionsFileName := logDirName + "/positions.yml"
logFile1 := filepath.Join(logDirName, "test.log")
logFile2 := filepath.Join(logDirName, "dirt.log")
positionsFileName := filepath.Join(logDirName, "positions.yml")
ps, err := newTestPositions(logger, positionsFileName)
if err != nil {
t.Fatal(err)
@ -500,3 +470,60 @@ func TestDeadlockTargetManager(t *testing.T) {
require.Equal(t, len(syncer.targets), 0)
require.Equal(t, len(syncer.fileEventWatchers), 0)
}
// TestDeadlockStartWatchingDuringSync is a regression test for a deadlock
// between targetSyncer.sync and the manager's event handling.
//
// A background goroutine creates 10 directories with 100 small log files each
// while the test goroutine repeatedly calls sync() on the syncer with a target
// group whose __path__ glob matches those files. There is no explicit assertion
// on the outcome: the test passes by running to completion, so a deadlock shows
// up as a hang until the `go test` timeout fires.
func TestDeadlockStartWatchingDuringSync(t *testing.T) {
	w := log.NewSyncWriter(os.Stderr)
	logger := log.NewLogfmtLogger(w)

	oldLogDir := newTestLogDirectories(t)
	newLogDir := newTestLogDirectories(t)

	positionsFileName := filepath.Join(oldLogDir, "positions.yml")
	ps, err := newTestPositions(logger, positionsFileName)
	// Setup failures must abort the test: continuing would use a nil
	// positions store or a nil manager further down.
	require.NoError(t, err)

	client := fake.New(func() {})
	defer client.Stop()

	ftm, err := newTestFileTargetManager(logger, client, ps, oldLogDir+"/*")
	require.NoError(t, err)

	done := make(chan struct{})
	go func() {
		// Only assert (never require) in here: FailNow must not be called
		// from a goroutine other than the one running the test function.
		for i := 0; i < 10; i++ {
			dir := filepath.Join(newLogDir, fmt.Sprintf("%d", i))
			err := os.MkdirAll(dir, 0750)
			assert.NoError(t, err)
			time.Sleep(1 * time.Millisecond)
			for j := 0; j < 100; j++ {
				logFile := filepath.Join(dir, fmt.Sprintf("test%d.log", j))
				f, err := os.Create(logFile)
				assert.NoError(t, err)
				_, err = f.WriteString("just a log line\n")
				assert.NoError(t, err)
				err = f.Close()
				assert.NoError(t, err)
				time.Sleep(1 * time.Millisecond)
			}
		}
		close(done)
	}()

	tg := targetgroup.Group{
		Targets: []model.LabelSet{{
			"localhost": "",
		}},
		Labels: model.LabelSet{
			"job":      "varlogs",
			"match":    "true",
			"__path__": model.LabelValue(newLogDir + "/**/*.log"),
		},
		Source: "",
	}

	// Sync repeatedly while the goroutine above is still creating
	// directories and files.
	for i := 0; i < 10; i++ {
		ftm.syncers[""].sync([]*targetgroup.Group{&tg}, ftm.targetEventHandler)
	}

	<-done

	ftm.Stop()
	ps.Stop()
}

Loading…
Cancel
Save