diff --git a/clients/pkg/promtail/targets/file/filetarget.go b/clients/pkg/promtail/targets/file/filetarget.go index 5a564cb967..10a344bfb2 100644 --- a/clients/pkg/promtail/targets/file/filetarget.go +++ b/clients/pkg/promtail/targets/file/filetarget.go @@ -105,11 +105,6 @@ func NewFileTarget( targetEventHandler: targetEventHandler, } - err := t.sync() - if err != nil { - return nil, errors.Wrap(err, "filetarget.sync") - } - go t.run() return t, nil } @@ -159,7 +154,13 @@ func (t *FileTarget) run() { close(t.done) }() + err := t.sync() + if err != nil { + level.Error(t.logger).Log("msg", "error running sync function", "error", err) + } + ticker := time.NewTicker(t.targetConfig.SyncPeriod) + defer ticker.Stop() for { select { @@ -248,7 +249,7 @@ func (t *FileTarget) startWatching(dirs map[string]struct{}) { if _, ok := t.watches[dir]; ok { continue } - level.Debug(t.logger).Log("msg", "watching new directory", "directory", dir) + level.Info(t.logger).Log("msg", "watching new directory", "directory", dir) t.targetEventHandler <- fileTargetEvent{ path: dir, eventType: fileTargetEventWatchStart, @@ -261,7 +262,7 @@ func (t *FileTarget) stopWatching(dirs map[string]struct{}) { if _, ok := t.watches[dir]; !ok { continue } - level.Debug(t.logger).Log("msg", "removing directory from watcher", "directory", dir) + level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir) t.targetEventHandler <- fileTargetEvent{ path: dir, eventType: fileTargetEventWatchStop, @@ -280,7 +281,7 @@ func (t *FileTarget) startTailing(ps []string) { continue } if fi.IsDir() { - level.Error(t.logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", p) + level.Info(t.logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", p) continue } level.Debug(t.logger).Log("msg", "tailing new file", "filename", p) diff --git a/clients/pkg/promtail/targets/file/filetarget_test.go b/clients/pkg/promtail/targets/file/filetarget_test.go index 0523d6d97a..b4647a3128 100644 --- a/clients/pkg/promtail/targets/file/filetarget_test.go +++ b/clients/pkg/promtail/targets/file/filetarget_test.go @@ -4,10 +4,12 @@ import ( "context" "fmt" "os" + "path/filepath" "sort" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -17,25 +19,17 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/client/fake" "github.com/grafana/loki/clients/pkg/promtail/positions" - "github.com/grafana/loki/clients/pkg/promtail/targets/testutils" ) func TestFileTargetSync(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - testutils.InitRandom() - dirName := "/tmp/" + testutils.RandName() - positionsFileName := dirName + "/positions.yml" - logDir1 := dirName + "/log1" - logDir1File1 := logDir1 + "/test1.log" - logDir1File2 := logDir1 + "/test2.log" - - err := os.MkdirAll(dirName, 0750) - if err != nil { - t.Fatal(err) - } - defer func() { _ = os.RemoveAll(dirName) }() + dirName := newTestLogDirectories(t) + positionsFileName := filepath.Join(dirName, "positions.yml") + logDir1 := filepath.Join(dirName, "log1") + logDir1File1 := filepath.Join(logDir1, "test1.log") + logDir1File2 := filepath.Join(logDir1, "test2.log") // Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know // everything saved was done through channel notifications when target.stop() was called. @@ -74,11 +68,9 @@ func TestFileTargetSync(t *testing.T) { }() path := logDir1 + "/*.log" target, err := NewFileTarget(metrics, logger, client, ps, path, nil, nil, &Config{ - SyncPeriod: 10 * time.Second, + SyncPeriod: 1 * time.Minute, // assure the sync is not called by the ticker }, nil, fakeHandler) - if err != nil { - t.Fatal(err) - } + assert.NoError(t, err) // Start with nothing watched. if len(target.watches) != 0 { @@ -89,12 +81,12 @@ func TestFileTargetSync(t *testing.T) { } // Create the base dir, still nothing watched. - if err = os.MkdirAll(logDir1, 0750); err != nil { - t.Fatal(err) - } - if err = target.sync(); err != nil { - t.Fatal(err) - } + err = os.MkdirAll(logDir1, 0750) + assert.NoError(t, err) + + err = target.sync() + assert.NoError(t, err) + if len(target.watches) != 0 { t.Fatal("Expected watches to be 0 at this point in the test...") } @@ -104,64 +96,64 @@ func TestFileTargetSync(t *testing.T) { // Add a file, which should create a watcher and a tailer. _, err = os.Create(logDir1File1) - if err != nil { - t.Fatal(err) - } - if err = target.sync(); err != nil { - t.Fatal(err) - } - if len(target.watches) != 1 { - t.Fatal("Expected watches to be 1 at this point in the test...") - } - if len(target.tails) != 1 { - t.Fatal("Expected tails to be 1 at this point in the test...") - } + assert.NoError(t, err) + + // Delay sync() call to make sure the filesystem watch event does not fire during sync() + time.Sleep(10 * time.Millisecond) + err = target.sync() + assert.NoError(t, err) + + assert.Equal(t, 1, len(target.watches), + "Expected watches to be 1 at this point in the test...", + ) + assert.Equal(t, 1, len(target.tails), + "Expected tails to be 1 at this point in the test...", + ) require.Eventually(t, func() bool { return receivedStartWatch.Load() == 1 }, time.Second*10, time.Millisecond*1, "Expected received starting watch event to be 1 at this point in the test...") // Add another file, should get another tailer. _, err = os.Create(logDir1File2) - if err != nil { - t.Fatal(err) - } - if err = target.sync(); err != nil { - t.Fatal(err) - } - if len(target.watches) != 1 { - t.Fatal("Expected watches to be 1 at this point in the test...") - } - if len(target.tails) != 2 { - t.Fatal("Expected tails to be 2 at this point in the test...") - } + assert.NoError(t, err) + + err = target.sync() + assert.NoError(t, err) + + assert.Equal(t, 1, len(target.watches), + "Expected watches to be 1 at this point in the test...", + ) + assert.Equal(t, 2, len(target.tails), + "Expected tails to be 2 at this point in the test...", + ) // Remove one of the files, tailer should stop. - if err = os.Remove(logDir1File1); err != nil { - t.Fatal(err) - } - if err = target.sync(); err != nil { - t.Fatal(err) - } - if len(target.watches) != 1 { - t.Fatal("Expected watches to be 1 at this point in the test...") - } - if len(target.tails) != 1 { - t.Fatal("Expected tails to be 1 at this point in the test...") - } + err = os.Remove(logDir1File1) + assert.NoError(t, err) + + err = target.sync() + assert.NoError(t, err) + + assert.Equal(t, 1, len(target.watches), + "Expected watches to be 1 at this point in the test...", + ) + assert.Equal(t, 1, len(target.tails), + "Expected tails to be 1 at this point in the test...", + ) // Remove the entire directory, other tailer should stop and watcher should go away. - if err = os.RemoveAll(logDir1); err != nil { - t.Fatal(err) - } - if err = target.sync(); err != nil { - t.Fatal(err) - } - if len(target.watches) != 0 { - t.Fatal("Expected watches to be 0 at this point in the test...") - } - if len(target.tails) != 0 { - t.Fatal("Expected tails to be 0 at this point in the test...") - } + err = os.RemoveAll(logDir1) + assert.NoError(t, err) + + err = target.sync() + assert.NoError(t, err) + + assert.Equal(t, 0, len(target.watches), + "Expected watches to be 0 at this point in the test...", + ) + assert.Equal(t, 0, len(target.tails), + "Expected tails to be 0 at this point in the test...", + ) require.Eventually(t, func() bool { return receivedStartWatch.Load() == 1 }, time.Second*10, time.Millisecond*1, "Expected received starting watch event to be 1 at this point in the test...") @@ -177,18 +169,12 @@ func TestHandleFileCreationEvent(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - testutils.InitRandom() - dirName := "/tmp/" + testutils.RandName() - positionsFileName := dirName + "/positions.yml" - logDir := dirName + "/log" - logFile := logDir + "/test1.log" + dirName := newTestLogDirectories(t) + positionsFileName := filepath.Join(dirName, "positions.yml") + logDir := filepath.Join(dirName, "log") + logFile := filepath.Join(logDir, "test1.log") - err := os.MkdirAll(dirName, 0750) - if err != nil { - t.Fatal(err) - } - defer func() { _ = os.RemoveAll(dirName) }() - if err = os.MkdirAll(logDir, 0750); err != nil { + if err := os.MkdirAll(logDir, 0750); err != nil { t.Fatal(err) } diff --git a/clients/pkg/promtail/targets/file/filetargetmanager.go b/clients/pkg/promtail/targets/file/filetargetmanager.go index 3bf4c336f0..90a404f4cb 100644 --- a/clients/pkg/promtail/targets/file/filetargetmanager.go +++ b/clients/pkg/promtail/targets/file/filetargetmanager.go @@ -40,12 +40,13 @@ const ( type FileTargetManager struct { log log.Logger quit context.CancelFunc - done chan struct{} syncers map[string]*targetSyncer manager *discovery.Manager watcher *fsnotify.Watcher targetEventHandler chan fileTargetEvent + + wg sync.WaitGroup } // NewFileTargetManager creates a new TargetManager. @@ -70,7 +71,6 @@ func NewFileTargetManager( tm := &FileTargetManager{ log: logger, quit: quit, - done: make(chan struct{}), watcher: watcher, targetEventHandler: make(chan fileTargetEvent), syncers: map[string]*targetSyncer{}, @@ -134,14 +134,19 @@ func NewFileTargetManager( configs[cfg.JobName] = cfg.ServiceDiscoveryConfig.Configs() } + tm.wg.Add(3) go tm.run(ctx) - go tm.watch(ctx) + go tm.watchTargetEvents(ctx) + go tm.watchFsEvents(ctx) + go util.LogError("running target manager", tm.manager.Run) return tm, tm.manager.ApplyConfig(configs) } -func (tm *FileTargetManager) watch(ctx context.Context) { +func (tm *FileTargetManager) watchTargetEvents(ctx context.Context) { + defer tm.wg.Done() + for { select { case event := <-tm.targetEventHandler: @@ -155,9 +160,21 @@ func (tm *FileTargetManager) watch(ctx context.Context) { level.Error(tm.log).Log("msg", " failed to remove directory from watcher", "error", err) } } + case <-ctx.Done(): + return + } + } +} + +func (tm *FileTargetManager) watchFsEvents(ctx context.Context) { + defer tm.wg.Done() + + for { + select { case event := <-tm.watcher.Events: // we only care about Create events if event.Op == fsnotify.Create { + level.Info(tm.log).Log("msg", "received file watcher event", "name", event.Name, "op", event.Op.String()) for _, s := range tm.syncers { s.sendFileCreateEvent(event) } @@ -171,7 +188,8 @@ func (tm *FileTargetManager) watch(ctx context.Context) { } func (tm *FileTargetManager) run(ctx context.Context) { - defer close(tm.done) + defer tm.wg.Done() + for { select { case targetGroups := <-tm.manager.SyncCh(): @@ -197,7 +215,8 @@ func (tm *FileTargetManager) Ready() bool { // Stop the TargetManager. func (tm *FileTargetManager) Stop() { tm.quit() - <-tm.done + tm.wg.Wait() + for _, s := range tm.syncers { s.stop() } @@ -307,9 +326,14 @@ func (s *targetSyncer) sync(groups []*targetgroup.Group, targetEventHandler chan } level.Info(s.log).Log("msg", "Adding target", "key", key) - watcher := make(chan fsnotify.Event) - s.fileEventWatchers[string(path)] = watcher - t, err := s.newTarget(string(path), labels, discoveredLabels, watcher, targetEventHandler) + + wkey := string(path) + watcher, ok := s.fileEventWatchers[wkey] + if !ok { + watcher = make(chan fsnotify.Event) + s.fileEventWatchers[wkey] = watcher + } + t, err := s.newTarget(wkey, labels, discoveredLabels, watcher, targetEventHandler) if err != nil { dropped = append(dropped, target.NewDroppedTarget(fmt.Sprintf("Failed to create target: %s", err.Error()), discoveredLabels)) level.Error(s.log).Log("msg", "Failed to create target", "key", key, "error", err) diff --git a/clients/pkg/promtail/targets/file/filetargetmanager_test.go b/clients/pkg/promtail/targets/file/filetargetmanager_test.go index 18db511ef3..b23a549317 100644 --- a/clients/pkg/promtail/targets/file/filetargetmanager_test.go +++ b/clients/pkg/promtail/targets/file/filetargetmanager_test.go @@ -1,7 +1,9 @@ package file import ( + "fmt" "os" + "path/filepath" "testing" "time" @@ -18,33 +20,21 @@ import ( "github.com/grafana/loki/clients/pkg/promtail/client/fake" "github.com/grafana/loki/clients/pkg/promtail/positions" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" - "github.com/grafana/loki/clients/pkg/promtail/targets/testutils" ) -func newTestLogDirectories() (string, func(), error) { - testutils.InitRandom() - dirName := "/tmp/" + testutils.RandName() - logFileDir := dirName + "/logdir" - - err := os.MkdirAll(dirName, 0750) - if err != nil { - return "", nil, err - } - err = os.MkdirAll(logFileDir, 0750) - if err != nil { - return "", nil, err - } - - return logFileDir, func() { - _ = os.RemoveAll(dirName) - }, nil +func newTestLogDirectories(t *testing.T) string { + tmpDir := t.TempDir() + logFileDir := filepath.Join(tmpDir, "logs") + err := os.MkdirAll(logFileDir, 0750) + assert.NoError(t, err) + return logFileDir } func newTestPositions(logger log.Logger, filePath string) (positions.Positions, error) { // Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know // everything saved was done through channel notifications when target.stop() was called. pos, err := positions.New(logger, positions.Config{ - SyncPeriod: 10 * time.Second, + SyncPeriod: 60 * time.Second, PositionsFile: filePath, }) if err != nil { @@ -76,7 +66,7 @@ func newTestFileTargetManager(logger log.Logger, client api.EntryHandler, positi }, } tc := &Config{ - SyncPeriod: 10 * time.Second, + SyncPeriod: 1 * time.Second, } metrics := NewMetrics(nil) @@ -90,14 +80,10 @@ func newTestFileTargetManager(logger log.Logger, client api.EntryHandler, positi func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - logDirName, cleanup, err := newTestLogDirectories() - if err != nil { - t.Fatal(err) - } - defer cleanup() + logDirName := newTestLogDirectories(t) - logFile := logDirName + "test.log" - positionsFileName := logDirName + "/positions.yml" + logFile := filepath.Join(logDirName, "test.log") + positionsFileName := filepath.Join(logDirName, "positions.yml") ps, err := newTestPositions(logger, positionsFileName) if err != nil { t.Fatal(err) @@ -106,7 +92,7 @@ func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) { client := fake.New(func() {}) defer client.Stop() - ftm, err := newTestFileTargetManager(logger, client, ps, logDirName+"*") + ftm, err := newTestFileTargetManager(logger, client, ps, logDirName+"/*") if err != nil { t.Fatal(err) } @@ -154,14 +140,10 @@ func TestLongPositionsSyncDelayStillSavesCorrectPosition(t *testing.T) { func TestWatchEntireDirectory(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - logDirName, cleanup, err := newTestLogDirectories() - if err != nil { - t.Fatal(err) - } - defer cleanup() + logDirName := newTestLogDirectories(t) - logFile := logDirName + "test.log" - positionsFileName := logDirName + "/positions.yml" + logFile := filepath.Join(logDirName, "test.log") + positionsFileName := filepath.Join(logDirName, "positions.yml") ps, err := newTestPositions(logger, positionsFileName) if err != nil { t.Fatal(err) @@ -170,7 +152,7 @@ func TestWatchEntireDirectory(t *testing.T) { client := fake.New(func() {}) defer client.Stop() - ftm, err := newTestFileTargetManager(logger, client, ps, logDirName+"*") + ftm, err := newTestFileTargetManager(logger, client, ps, logDirName+"/*") if err != nil { t.Fatal(err) } @@ -218,14 +200,10 @@ func TestWatchEntireDirectory(t *testing.T) { func TestFileRolls(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - logDirName, cleanup, err := newTestLogDirectories() - if err != nil { - t.Fatal(err) - } - defer cleanup() + logDirName := newTestLogDirectories(t) - logFile := logDirName + "/test.log" - positionsFileName := logDirName + "/positions.yml" + logFile := filepath.Join(logDirName, "test.log") + positionsFileName := filepath.Join(logDirName, "positions.yml") ps, err := newTestPositions(logger, positionsFileName) if err != nil { t.Fatal(err) @@ -255,7 +233,7 @@ func TestFileRolls(t *testing.T) { }, time.Second*10, time.Millisecond*1) // Rename the log file to something not in the pattern, then create a new file with the same name. - err = os.Rename(logFile, logDirName+"/test.log.1") + err = os.Rename(logFile, filepath.Join(logDirName, "test.log.1")) if err != nil { t.Fatal("Failed to rename log file for test", err) } @@ -295,14 +273,10 @@ func TestFileRolls(t *testing.T) { func TestResumesWhereLeftOff(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - logDirName, cleanup, err := newTestLogDirectories() - if err != nil { - t.Fatal(err) - } - defer cleanup() + logDirName := newTestLogDirectories(t) - logFile := logDirName + "/test.log" - positionsFileName := logDirName + "/positions.yml" + logFile := filepath.Join(logDirName, "test.log") + positionsFileName := filepath.Join(logDirName, "positions.yml") ps, err := newTestPositions(logger, positionsFileName) if err != nil { t.Fatal(err) @@ -378,15 +352,11 @@ func TestResumesWhereLeftOff(t *testing.T) { func TestGlobWithMultipleFiles(t *testing.T) { w := log.NewSyncWriter(os.Stderr) logger := log.NewLogfmtLogger(w) - logDirName, cleanup, err := newTestLogDirectories() - if err != nil { - t.Fatal(err) - } - defer cleanup() + logDirName := newTestLogDirectories(t) - logFile1 := logDirName + "/test.log" - logFile2 := logDirName + "/dirt.log" - positionsFileName := logDirName + "/positions.yml" + logFile1 := filepath.Join(logDirName, "test.log") + logFile2 := filepath.Join(logDirName, "dirt.log") + positionsFileName := filepath.Join(logDirName, "positions.yml") ps, err := newTestPositions(logger, positionsFileName) if err != nil { t.Fatal(err) @@ -500,3 +470,60 @@ func TestDeadlockTargetManager(t *testing.T) { require.Equal(t, len(syncer.targets), 0) require.Equal(t, len(syncer.fileEventWatchers), 0) } + +func TestDeadlockStartWatchingDuringSync(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + oldLogDir := newTestLogDirectories(t) + newLogDir := newTestLogDirectories(t) + + positionsFileName := filepath.Join(oldLogDir, "positions.yml") + ps, err := newTestPositions(logger, positionsFileName) + assert.NoError(t, err) + + client := fake.New(func() {}) + defer client.Stop() + + ftm, err := newTestFileTargetManager(logger, client, ps, oldLogDir+"/*") + assert.NoError(t, err) + + done := make(chan struct{}) + go func() { + for i := 0; i < 10; i++ { + dir := filepath.Join(newLogDir, fmt.Sprintf("%d", i)) + err := os.MkdirAll(dir, 0750) + assert.NoError(t, err) + time.Sleep(1 * time.Millisecond) + for j := 0; j < 100; j++ { + logFile := filepath.Join(dir, fmt.Sprintf("test%d.log", j)) + f, err := os.Create(logFile) + assert.NoError(t, err) + _, err = f.WriteString("just a log line\n") + assert.NoError(t, err) + err = f.Close() + assert.NoError(t, err) + time.Sleep(1 * time.Millisecond) + } + } + close(done) + }() + + tg := targetgroup.Group{ + Targets: []model.LabelSet{{ + "localhost": "", + }}, + Labels: model.LabelSet{ + "job": "varlogs", + "match": "true", + "__path__": model.LabelValue(newLogDir + "/**/*.log"), + }, + Source: "", + } + for i := 0; i < 10; i++ { + ftm.syncers[""].sync([]*targetgroup.Group{&tg}, ftm.targetEventHandler) + } + <-done + + ftm.Stop() + ps.Stop() +}