diff --git a/pkg/promtail/position.go b/pkg/promtail/position.go index 98bbeb21fb..8a93d989dc 100644 --- a/pkg/promtail/position.go +++ b/pkg/promtail/position.go @@ -34,6 +34,7 @@ type Positions struct { mtx sync.Mutex positions map[string]int64 quit chan struct{} + done chan struct{} } type positionsFile struct { @@ -52,6 +53,7 @@ func NewPositions(logger log.Logger, cfg PositionsConfig) (*Positions, error) { cfg: cfg, positions: positions, quit: make(chan struct{}), + done: make(chan struct{}), } go p.run() @@ -61,6 +63,7 @@ func NewPositions(logger log.Logger, cfg PositionsConfig) (*Positions, error) { // Stop the Position tracker. func (p *Positions) Stop() { close(p.quit) + <-p.done } // Put records (asynchronously) how far we've read through a file. @@ -85,7 +88,11 @@ func (p *Positions) Remove(path string) { } func (p *Positions) run() { - defer p.save() + defer func() { + p.save() + level.Debug(p.logger).Log("msg", "positions saved") + close(p.done) + }() ticker := time.NewTicker(p.cfg.SyncPeriod) for { diff --git a/pkg/promtail/target.go b/pkg/promtail/target.go index 5f31093b15..b19efa2a5e 100644 --- a/pkg/promtail/target.go +++ b/pkg/promtail/target.go @@ -11,7 +11,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - fsnotify "gopkg.in/fsnotify.v1" + "gopkg.in/fsnotify.v1" "github.com/grafana/loki/pkg/helpers" ) @@ -49,6 +49,7 @@ type Target struct { watcher *fsnotify.Watcher path string quit chan struct{} + done chan struct{} tails map[string]*tailer } @@ -75,8 +76,15 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa for _, p := range matches { dirs[filepath.Dir(p)] = struct{}{} } + + // If no files exist yet watch the directory specified in the path. + if matches == nil { + dirs[filepath.Dir(path)] = struct{}{} + } + // watch each dir for any new files. for dir := range dirs { + level.Debug(logger).Log("msg", "watching new directory", "directory", dir) if err := watcher.Add(dir); err != nil { helpers.LogError("closing watcher", watcher.Close) return nil, errors.Wrap(err, "watcher.Add") @@ -90,6 +98,7 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa handler: addLabelsMiddleware(labels).Wrap(handler), positions: positions, quit: make(chan struct{}), + done: make(chan struct{}), tails: map[string]*tailer{}, } @@ -120,6 +129,7 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa // Stop the target. func (t *Target) Stop() { close(t.quit) + <-t.done } func (t *Target) run() { @@ -128,6 +138,10 @@ func (t *Target) run() { for _, v := range t.tails { helpers.LogError("stopping tailer", v.stop) } + //Save positions + t.positions.Stop() + level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved") + close(t.done) }() for { @@ -155,6 +169,7 @@ func (t *Target) run() { continue } + level.Debug(t.logger).Log("msg", "tailing new file", "filename", event.Name) t.tails[event.Name] = tailer case fsnotify.Remove: @@ -191,6 +206,9 @@ type tailer struct { path string tail *tail.Tail + + quit chan struct{} + done chan struct{} } func newTailer(logger log.Logger, handler EntryHandler, positions *Positions, path string) (*tailer, error) { @@ -212,31 +230,36 @@ func newTailer(logger log.Logger, handler EntryHandler, positions *Positions, pa path: path, tail: tail, + quit: make(chan struct{}), + done: make(chan struct{}), } go tailer.run() return tailer, nil } func (t *tailer) run() { - defer func() { - level.Info(t.logger).Log("msg", "stopping tailing file", "filename", t.path) - }() - level.Info(t.logger).Log("msg", "start tailing file", "filename", t.path) positionSyncPeriod := t.positions.cfg.SyncPeriod positionWait := time.NewTicker(positionSyncPeriod) - defer positionWait.Stop() + + defer func() { + level.Info(t.logger).Log("msg", "stopping tailing file", "filename", t.path) + positionWait.Stop() + err := t.markPosition() + if err != nil { + level.Error(t.logger).Log("msg", "error getting tail position", "error", err) + } + close(t.done) + }() for { select { - case <-positionWait.C: - pos, err := t.tail.Tell() + err := t.markPosition() if err != nil { level.Error(t.logger).Log("msg", "error getting tail position", "error", err) continue } - t.positions.Put(t.path, pos) case line, ok := <-t.tail.Lines: if !ok { @@ -252,11 +275,25 @@ func (t *tailer) run() { if err := t.handler.Handle(model.LabelSet{}, line.Time, line.Text); err != nil { level.Error(t.logger).Log("msg", "error handling line", "error", err) } + case <-t.quit: + return } } } +func (t *tailer) markPosition() error { + pos, err := t.tail.Tell() + if err != nil { + return err + } + level.Debug(t.logger).Log("path", t.path, "current_position", pos) + t.positions.Put(t.path, pos) + return nil +} + func (t *tailer) stop() error { + close(t.quit) + <-t.done return t.tail.Stop() } diff --git a/pkg/promtail/target_test.go b/pkg/promtail/target_test.go new file mode 100644 index 0000000000..22a88049e4 --- /dev/null +++ b/pkg/promtail/target_test.go @@ -0,0 +1,519 @@ +package promtail + +import ( + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "testing" + "time" + + "github.com/go-kit/kit/log/level" + + "github.com/go-kit/kit/log" + "github.com/prometheus/common/model" + "gopkg.in/yaml.v2" +) + +func TestLongSyncDelayStillSavesCorrectPosition(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + initRandom() + dirName := "/tmp/" + randName() + positionsFileName := dirName + "/positions.yml" + logFile := dirName + "/test.log" + + err := os.MkdirAll(dirName, 0750) + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dirName) + + // Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know + // everything saved was done through channel notifications when target.stop() was called. + positions, err := NewPositions(logger, PositionsConfig{ + SyncPeriod: 10 * time.Second, + PositionsFile: positionsFileName, + }) + if err != nil { + t.Error(err) + return + } + + client := &TestClient{ + log: logger, + messages: make([]string, 0), + } + + target, err := NewTarget(logger, client, positions, logFile, nil) + if err != nil { + t.Error(err) + return + } + + f, err := os.Create(logFile) + if err != nil { + t.Error(err) + return + } + + for i := 0; i < 10; i++ { + _, err = f.WriteString("test\n") + if err != nil { + t.Error(err) + return + } + time.Sleep(1 * time.Millisecond) + } + + target.Stop() + + buf, err := ioutil.ReadFile(filepath.Clean(positionsFileName)) + if err != nil { + t.Error("Expected to find a positions file but did not", err) + return + } + var p positionsFile + if err := yaml.UnmarshalStrict(buf, &p); err != nil { + t.Error("Failed to parse positions file:", err) + return + } + + // Assert the position value is in the correct spot. + if val, ok := p.Positions[logFile]; ok { + if val != 50 { + t.Error("Incorrect position found, expected 50, found", val) + } + } else { + t.Error("Positions file did not contain any data for our test log file") + } + + // Assert the number of messages the handler received is correct. + if len(client.messages) != 10 { + t.Error("Handler did not receive the correct number of messages, expected 10 received", len(client.messages)) + } + + // Spot check one of the messages. + if client.messages[0] != "test" { + t.Error("Expected first log message to be 'test' but was", client.messages[0]) + } + +} + +func TestWatchEntireDirectory(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + initRandom() + dirName := "/tmp/" + randName() + positionsFileName := dirName + "/positions.yml" + logFileDir := dirName + "/logdir/" + + err := os.MkdirAll(dirName, 0750) + if err != nil { + t.Error(err) + return + } + err = os.MkdirAll(logFileDir, 0750) + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dirName) + + // Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know + // everything saved was done through channel notifications when target.stop() was called. + positions, err := NewPositions(logger, PositionsConfig{ + SyncPeriod: 10 * time.Second, + PositionsFile: positionsFileName, + }) + if err != nil { + t.Error(err) + return + } + + client := &TestClient{ + log: logger, + messages: make([]string, 0), + } + + target, err := NewTarget(logger, client, positions, logFileDir+"*", nil) + if err != nil { + t.Error(err) + return + } + + f, err := os.Create(logFileDir + "test.log") + if err != nil { + t.Error(err) + return + } + + for i := 0; i < 10; i++ { + _, err = f.WriteString("test\n") + if err != nil { + t.Error(err) + return + } + time.Sleep(1 * time.Millisecond) + } + + target.Stop() + + buf, err := ioutil.ReadFile(filepath.Clean(positionsFileName)) + if err != nil { + t.Error("Expected to find a positions file but did not", err) + return + } + var p positionsFile + if err := yaml.UnmarshalStrict(buf, &p); err != nil { + t.Error("Failed to parse positions file:", err) + return + } + + // Assert the position value is in the correct spot. + if val, ok := p.Positions[logFileDir+"test.log"]; ok { + if val != 50 { + t.Error("Incorrect position found, expected 50, found", val) + } + } else { + t.Error("Positions file did not contain any data for our test log file") + } + + // Assert the number of messages the handler received is correct. + if len(client.messages) != 10 { + t.Error("Handler did not receive the correct number of messages, expected 10 received", len(client.messages)) + } + + // Spot check one of the messages. + if client.messages[0] != "test" { + t.Error("Expected first log message to be 'test' but was", client.messages[0]) + } + +} + +func TestFileRolls(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + initRandom() + dirName := "/tmp/" + randName() + positionsFile := dirName + "/positions.yml" + logFile := dirName + "/test.log" + + err := os.MkdirAll(dirName, 0750) + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dirName) + + // Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know + // everything saved was done through channel notifications when target.stop() was called. + positions, err := NewPositions(logger, PositionsConfig{ + SyncPeriod: 10 * time.Second, + PositionsFile: positionsFile, + }) + if err != nil { + t.Error(err) + return + } + + client := &TestClient{ + log: logger, + messages: make([]string, 0), + } + + target, err := NewTarget(logger, client, positions, dirName+"/*.log", nil) + if err != nil { + t.Error(err) + return + } + + f, err := os.Create(logFile) + if err != nil { + t.Error(err) + return + } + + for i := 0; i < 10; i++ { + _, err = f.WriteString("test1\n") + if err != nil { + t.Error(err) + return + } + time.Sleep(1 * time.Millisecond) + } + + // Rename the log file to something not in the pattern, then create a new file with the same name. + err = os.Rename(logFile, dirName+"/test.log.1") + if err != nil { + t.Error("Failed to rename log file for test", err) + return + } + f, err = os.Create(logFile) + if err != nil { + t.Error(err) + return + } + + for i := 0; i < 10; i++ { + _, err = f.WriteString("test2\n") + if err != nil { + t.Error(err) + return + } + time.Sleep(1 * time.Millisecond) + } + + target.Stop() + + if len(client.messages) != 20 { + t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.messages)) + } + + // Spot check one of the messages. + if client.messages[0] != "test1" { + t.Error("Expected first log message to be 'test1' but was", client.messages[0]) + } + + // Spot check the first message from the second file. + if client.messages[10] != "test2" { + t.Error("Expected first log message to be 'test2' but was", client.messages[10]) + } +} + +func TestResumesWhereLeftOff(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + initRandom() + dirName := "/tmp/" + randName() + positionsFileName := dirName + "/positions.yml" + logFile := dirName + "/test.log" + + err := os.MkdirAll(dirName, 0750) + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dirName) + + // Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know + // everything saved was done through channel notifications when target.stop() was called. + positions, err := NewPositions(logger, PositionsConfig{ + SyncPeriod: 10 * time.Second, + PositionsFile: positionsFileName, + }) + if err != nil { + t.Error(err) + return + } + + client := &TestClient{ + log: logger, + messages: make([]string, 0), + } + + target, err := NewTarget(logger, client, positions, dirName+"/*.log", nil) + if err != nil { + t.Error(err) + return + } + + f, err := os.Create(logFile) + if err != nil { + t.Error(err) + return + } + + for i := 0; i < 10; i++ { + _, err = f.WriteString("test1\n") + if err != nil { + t.Error(err) + return + } + time.Sleep(1 * time.Millisecond) + } + + target.Stop() + + // Create another positions (so that it loads from the previously saved positions file). + positions2, err := NewPositions(logger, PositionsConfig{ + SyncPeriod: 10 * time.Second, + PositionsFile: positionsFileName, + }) + if err != nil { + t.Error(err) + return + } + + // Create a new target, keep the same client so we can track what was sent through the handler. + target2, err := NewTarget(logger, client, positions2, dirName+"/*.log", nil) + if err != nil { + t.Error(err) + return + } + + for i := 0; i < 10; i++ { + _, err = f.WriteString("test2\n") + if err != nil { + t.Error(err) + return + } + time.Sleep(1 * time.Millisecond) + } + + target2.Stop() + + if len(client.messages) != 20 { + t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.messages)) + } + + // Spot check one of the messages. + if client.messages[0] != "test1" { + t.Error("Expected first log message to be 'test1' but was", client.messages[0]) + } + + // Spot check the first message from the second file. + if client.messages[10] != "test2" { + t.Error("Expected first log message to be 'test2' but was", client.messages[10]) + } +} + +func TestGlobWithMultipleFiles(t *testing.T) { + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + + initRandom() + dirName := "/tmp/" + randName() + positionsFileName := dirName + "/positions.yml" + logFile1 := dirName + "/test.log" + logFile2 := dirName + "/dirt.log" + + err := os.MkdirAll(dirName, 0750) + if err != nil { + t.Error(err) + return + } + defer os.RemoveAll(dirName) + + // Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know + // everything saved was done through channel notifications when target.stop() was called. + positions, err := NewPositions(logger, PositionsConfig{ + SyncPeriod: 10 * time.Second, + PositionsFile: positionsFileName, + }) + if err != nil { + t.Error(err) + return + } + + client := &TestClient{ + log: logger, + messages: make([]string, 0), + } + + target, err := NewTarget(logger, client, positions, dirName+"/*.log", nil) + if err != nil { + t.Error(err) + return + } + + f1, err := os.Create(logFile1) + if err != nil { + t.Error(err) + return + } + f2, err := os.Create(logFile2) + if err != nil { + t.Error(err) + return + } + + for i := 0; i < 10; i++ { + _, err = f1.WriteString("test1\n") + if err != nil { + t.Error(err) + return + } + time.Sleep(1 * time.Millisecond) + _, err = f2.WriteString("dirt1\n") + if err != nil { + t.Error(err) + return + } + time.Sleep(1 * time.Millisecond) + } + + target.Stop() + + buf, err := ioutil.ReadFile(filepath.Clean(positionsFileName)) + if err != nil { + t.Error("Expected to find a positions file but did not", err) + return + } + var p positionsFile + if err := yaml.UnmarshalStrict(buf, &p); err != nil { + t.Error("Failed to parse positions file:", err) + return + } + + // Assert the position value is in the correct spot. + if val, ok := p.Positions[logFile1]; ok { + if val != 60 { + t.Error("Incorrect position found for file 1, expected 60, found", val) + } + } else { + t.Error("Positions file did not contain any data for our test log file") + } + if val, ok := p.Positions[logFile2]; ok { + if val != 60 { + t.Error("Incorrect position found for file 2, expected 60, found", val) + } + } else { + t.Error("Positions file did not contain any data for our test log file") + } + + // Assert the number of messages the handler received is correct. + if len(client.messages) != 20 { + t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.messages)) + } + + // Spot check one of the messages, the first message should be from the first file because we wrote that first. + if client.messages[0] != "test1" { + t.Error("Expected first log message to be 'test1' but was", client.messages[0]) + } + + // Spot check the second message, it should be from the second file. + if client.messages[1] != "dirt1" { + t.Error("Expected first log message to be 'test2' but was", client.messages[1]) + } +} + +type TestClient struct { + log log.Logger + messages []string +} + +func (c *TestClient) Handle(ls model.LabelSet, t time.Time, s string) error { + c.messages = append(c.messages, s) + level.Debug(c.log).Log("msg", "received log", "log", s) + return nil +} + +func initRandom() { + rand.Seed(time.Now().UnixNano()) +} + +var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +func randName() string { + b := make([]rune, 10) + for i := range b { + b[i] = letters[rand.Intn(len(letters))] + } + return string(b) +}