Unit tests for Promtail (#244)

* Adding a test for target.go
Fixing some shutdown issues where tailers were stopped but didn't properly wait for the current position to be saved to file
Fixed an issue where an empty directory would lead to nothing being tailed

* adding more tests for target.go

* renaming quitComplete -> done per feedback
making PositionsFile private again, no reason for it to be public
removed unnecessary save of empty file on creation of positions file

* fixing lint errors

* cleaning up some comments
removing TODO around directories, added a test to verify behavior
some more cleanup per review

* fixing imports
pull/251/head
Ed 7 years ago committed by Tom Wilkie
parent a196c27354
commit 07f20b0db3
  1. 9
      pkg/promtail/position.go
  2. 55
      pkg/promtail/target.go
  3. 519
      pkg/promtail/target_test.go

@ -34,6 +34,7 @@ type Positions struct {
mtx sync.Mutex
positions map[string]int64
quit chan struct{}
done chan struct{}
}
type positionsFile struct {
@ -52,6 +53,7 @@ func NewPositions(logger log.Logger, cfg PositionsConfig) (*Positions, error) {
cfg: cfg,
positions: positions,
quit: make(chan struct{}),
done: make(chan struct{}),
}
go p.run()
@ -61,6 +63,7 @@ func NewPositions(logger log.Logger, cfg PositionsConfig) (*Positions, error) {
// Stop the Position tracker.
func (p *Positions) Stop() {
close(p.quit)
<-p.done
}
// Put records (asynchronously) how far we've read through a file.
@ -85,7 +88,11 @@ func (p *Positions) Remove(path string) {
}
func (p *Positions) run() {
defer p.save()
defer func() {
p.save()
level.Debug(p.logger).Log("msg", "positions saved")
close(p.done)
}()
ticker := time.NewTicker(p.cfg.SyncPeriod)
for {

@ -11,7 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
fsnotify "gopkg.in/fsnotify.v1"
"gopkg.in/fsnotify.v1"
"github.com/grafana/loki/pkg/helpers"
)
@ -49,6 +49,7 @@ type Target struct {
watcher *fsnotify.Watcher
path string
quit chan struct{}
done chan struct{}
tails map[string]*tailer
}
@ -75,8 +76,15 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa
for _, p := range matches {
dirs[filepath.Dir(p)] = struct{}{}
}
// If no files exist yet watch the directory specified in the path.
if matches == nil {
dirs[filepath.Dir(path)] = struct{}{}
}
// watch each dir for any new files.
for dir := range dirs {
level.Debug(logger).Log("msg", "watching new directory", "directory", dir)
if err := watcher.Add(dir); err != nil {
helpers.LogError("closing watcher", watcher.Close)
return nil, errors.Wrap(err, "watcher.Add")
@ -90,6 +98,7 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa
handler: addLabelsMiddleware(labels).Wrap(handler),
positions: positions,
quit: make(chan struct{}),
done: make(chan struct{}),
tails: map[string]*tailer{},
}
@ -120,6 +129,7 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa
// Stop the target.
func (t *Target) Stop() {
close(t.quit)
<-t.done
}
func (t *Target) run() {
@ -128,6 +138,10 @@ func (t *Target) run() {
for _, v := range t.tails {
helpers.LogError("stopping tailer", v.stop)
}
//Save positions
t.positions.Stop()
level.Debug(t.logger).Log("msg", "watcher closed, tailer stopped, positions saved")
close(t.done)
}()
for {
@ -155,6 +169,7 @@ func (t *Target) run() {
continue
}
level.Debug(t.logger).Log("msg", "tailing new file", "filename", event.Name)
t.tails[event.Name] = tailer
case fsnotify.Remove:
@ -191,6 +206,9 @@ type tailer struct {
path string
tail *tail.Tail
quit chan struct{}
done chan struct{}
}
func newTailer(logger log.Logger, handler EntryHandler, positions *Positions, path string) (*tailer, error) {
@ -212,31 +230,36 @@ func newTailer(logger log.Logger, handler EntryHandler, positions *Positions, pa
path: path,
tail: tail,
quit: make(chan struct{}),
done: make(chan struct{}),
}
go tailer.run()
return tailer, nil
}
func (t *tailer) run() {
defer func() {
level.Info(t.logger).Log("msg", "stopping tailing file", "filename", t.path)
}()
level.Info(t.logger).Log("msg", "start tailing file", "filename", t.path)
positionSyncPeriod := t.positions.cfg.SyncPeriod
positionWait := time.NewTicker(positionSyncPeriod)
defer positionWait.Stop()
defer func() {
level.Info(t.logger).Log("msg", "stopping tailing file", "filename", t.path)
positionWait.Stop()
err := t.markPosition()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position", "error", err)
}
close(t.done)
}()
for {
select {
case <-positionWait.C:
pos, err := t.tail.Tell()
err := t.markPosition()
if err != nil {
level.Error(t.logger).Log("msg", "error getting tail position", "error", err)
continue
}
t.positions.Put(t.path, pos)
case line, ok := <-t.tail.Lines:
if !ok {
@ -252,11 +275,25 @@ func (t *tailer) run() {
if err := t.handler.Handle(model.LabelSet{}, line.Time, line.Text); err != nil {
level.Error(t.logger).Log("msg", "error handling line", "error", err)
}
case <-t.quit:
return
}
}
}
func (t *tailer) markPosition() error {
pos, err := t.tail.Tell()
if err != nil {
return err
}
level.Debug(t.logger).Log("path", t.path, "current_position", pos)
t.positions.Put(t.path, pos)
return nil
}
func (t *tailer) stop() error {
close(t.quit)
<-t.done
return t.tail.Stop()
}

@ -0,0 +1,519 @@
package promtail
import (
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"
"time"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/kit/log"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
)
func TestLongSyncDelayStillSavesCorrectPosition(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
initRandom()
dirName := "/tmp/" + randName()
positionsFileName := dirName + "/positions.yml"
logFile := dirName + "/test.log"
err := os.MkdirAll(dirName, 0750)
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dirName)
// Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know
// everything saved was done through channel notifications when target.stop() was called.
positions, err := NewPositions(logger, PositionsConfig{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFileName,
})
if err != nil {
t.Error(err)
return
}
client := &TestClient{
log: logger,
messages: make([]string, 0),
}
target, err := NewTarget(logger, client, positions, logFile, nil)
if err != nil {
t.Error(err)
return
}
f, err := os.Create(logFile)
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
_, err = f.WriteString("test\n")
if err != nil {
t.Error(err)
return
}
time.Sleep(1 * time.Millisecond)
}
target.Stop()
buf, err := ioutil.ReadFile(filepath.Clean(positionsFileName))
if err != nil {
t.Error("Expected to find a positions file but did not", err)
return
}
var p positionsFile
if err := yaml.UnmarshalStrict(buf, &p); err != nil {
t.Error("Failed to parse positions file:", err)
return
}
// Assert the position value is in the correct spot.
if val, ok := p.Positions[logFile]; ok {
if val != 50 {
t.Error("Incorrect position found, expected 50, found", val)
}
} else {
t.Error("Positions file did not contain any data for our test log file")
}
// Assert the number of messages the handler received is correct.
if len(client.messages) != 10 {
t.Error("Handler did not receive the correct number of messages, expected 10 received", len(client.messages))
}
// Spot check one of the messages.
if client.messages[0] != "test" {
t.Error("Expected first log message to be 'test' but was", client.messages[0])
}
}
func TestWatchEntireDirectory(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
initRandom()
dirName := "/tmp/" + randName()
positionsFileName := dirName + "/positions.yml"
logFileDir := dirName + "/logdir/"
err := os.MkdirAll(dirName, 0750)
if err != nil {
t.Error(err)
return
}
err = os.MkdirAll(logFileDir, 0750)
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dirName)
// Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know
// everything saved was done through channel notifications when target.stop() was called.
positions, err := NewPositions(logger, PositionsConfig{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFileName,
})
if err != nil {
t.Error(err)
return
}
client := &TestClient{
log: logger,
messages: make([]string, 0),
}
target, err := NewTarget(logger, client, positions, logFileDir+"*", nil)
if err != nil {
t.Error(err)
return
}
f, err := os.Create(logFileDir + "test.log")
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
_, err = f.WriteString("test\n")
if err != nil {
t.Error(err)
return
}
time.Sleep(1 * time.Millisecond)
}
target.Stop()
buf, err := ioutil.ReadFile(filepath.Clean(positionsFileName))
if err != nil {
t.Error("Expected to find a positions file but did not", err)
return
}
var p positionsFile
if err := yaml.UnmarshalStrict(buf, &p); err != nil {
t.Error("Failed to parse positions file:", err)
return
}
// Assert the position value is in the correct spot.
if val, ok := p.Positions[logFileDir+"test.log"]; ok {
if val != 50 {
t.Error("Incorrect position found, expected 50, found", val)
}
} else {
t.Error("Positions file did not contain any data for our test log file")
}
// Assert the number of messages the handler received is correct.
if len(client.messages) != 10 {
t.Error("Handler did not receive the correct number of messages, expected 10 received", len(client.messages))
}
// Spot check one of the messages.
if client.messages[0] != "test" {
t.Error("Expected first log message to be 'test' but was", client.messages[0])
}
}
func TestFileRolls(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
initRandom()
dirName := "/tmp/" + randName()
positionsFile := dirName + "/positions.yml"
logFile := dirName + "/test.log"
err := os.MkdirAll(dirName, 0750)
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dirName)
// Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know
// everything saved was done through channel notifications when target.stop() was called.
positions, err := NewPositions(logger, PositionsConfig{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFile,
})
if err != nil {
t.Error(err)
return
}
client := &TestClient{
log: logger,
messages: make([]string, 0),
}
target, err := NewTarget(logger, client, positions, dirName+"/*.log", nil)
if err != nil {
t.Error(err)
return
}
f, err := os.Create(logFile)
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
_, err = f.WriteString("test1\n")
if err != nil {
t.Error(err)
return
}
time.Sleep(1 * time.Millisecond)
}
// Rename the log file to something not in the pattern, then create a new file with the same name.
err = os.Rename(logFile, dirName+"/test.log.1")
if err != nil {
t.Error("Failed to rename log file for test", err)
return
}
f, err = os.Create(logFile)
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
_, err = f.WriteString("test2\n")
if err != nil {
t.Error(err)
return
}
time.Sleep(1 * time.Millisecond)
}
target.Stop()
if len(client.messages) != 20 {
t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.messages))
}
// Spot check one of the messages.
if client.messages[0] != "test1" {
t.Error("Expected first log message to be 'test1' but was", client.messages[0])
}
// Spot check the first message from the second file.
if client.messages[10] != "test2" {
t.Error("Expected first log message to be 'test2' but was", client.messages[10])
}
}
func TestResumesWhereLeftOff(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
initRandom()
dirName := "/tmp/" + randName()
positionsFileName := dirName + "/positions.yml"
logFile := dirName + "/test.log"
err := os.MkdirAll(dirName, 0750)
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dirName)
// Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know
// everything saved was done through channel notifications when target.stop() was called.
positions, err := NewPositions(logger, PositionsConfig{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFileName,
})
if err != nil {
t.Error(err)
return
}
client := &TestClient{
log: logger,
messages: make([]string, 0),
}
target, err := NewTarget(logger, client, positions, dirName+"/*.log", nil)
if err != nil {
t.Error(err)
return
}
f, err := os.Create(logFile)
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
_, err = f.WriteString("test1\n")
if err != nil {
t.Error(err)
return
}
time.Sleep(1 * time.Millisecond)
}
target.Stop()
// Create another positions (so that it loads from the previously saved positions file).
positions2, err := NewPositions(logger, PositionsConfig{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFileName,
})
if err != nil {
t.Error(err)
return
}
// Create a new target, keep the same client so we can track what was sent through the handler.
target2, err := NewTarget(logger, client, positions2, dirName+"/*.log", nil)
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
_, err = f.WriteString("test2\n")
if err != nil {
t.Error(err)
return
}
time.Sleep(1 * time.Millisecond)
}
target2.Stop()
if len(client.messages) != 20 {
t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.messages))
}
// Spot check one of the messages.
if client.messages[0] != "test1" {
t.Error("Expected first log message to be 'test1' but was", client.messages[0])
}
// Spot check the first message from the second file.
if client.messages[10] != "test2" {
t.Error("Expected first log message to be 'test2' but was", client.messages[10])
}
}
func TestGlobWithMultipleFiles(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
initRandom()
dirName := "/tmp/" + randName()
positionsFileName := dirName + "/positions.yml"
logFile1 := dirName + "/test.log"
logFile2 := dirName + "/dirt.log"
err := os.MkdirAll(dirName, 0750)
if err != nil {
t.Error(err)
return
}
defer os.RemoveAll(dirName)
// Set the sync period to a really long value, to guarantee the sync timer never runs, this way we know
// everything saved was done through channel notifications when target.stop() was called.
positions, err := NewPositions(logger, PositionsConfig{
SyncPeriod: 10 * time.Second,
PositionsFile: positionsFileName,
})
if err != nil {
t.Error(err)
return
}
client := &TestClient{
log: logger,
messages: make([]string, 0),
}
target, err := NewTarget(logger, client, positions, dirName+"/*.log", nil)
if err != nil {
t.Error(err)
return
}
f1, err := os.Create(logFile1)
if err != nil {
t.Error(err)
return
}
f2, err := os.Create(logFile2)
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
_, err = f1.WriteString("test1\n")
if err != nil {
t.Error(err)
return
}
time.Sleep(1 * time.Millisecond)
_, err = f2.WriteString("dirt1\n")
if err != nil {
t.Error(err)
return
}
time.Sleep(1 * time.Millisecond)
}
target.Stop()
buf, err := ioutil.ReadFile(filepath.Clean(positionsFileName))
if err != nil {
t.Error("Expected to find a positions file but did not", err)
return
}
var p positionsFile
if err := yaml.UnmarshalStrict(buf, &p); err != nil {
t.Error("Failed to parse positions file:", err)
return
}
// Assert the position value is in the correct spot.
if val, ok := p.Positions[logFile1]; ok {
if val != 60 {
t.Error("Incorrect position found for file 1, expected 60, found", val)
}
} else {
t.Error("Positions file did not contain any data for our test log file")
}
if val, ok := p.Positions[logFile2]; ok {
if val != 60 {
t.Error("Incorrect position found for file 2, expected 60, found", val)
}
} else {
t.Error("Positions file did not contain any data for our test log file")
}
// Assert the number of messages the handler received is correct.
if len(client.messages) != 20 {
t.Error("Handler did not receive the correct number of messages, expected 20 received", len(client.messages))
}
// Spot check one of the messages, the first message should be from the first file because we wrote that first.
if client.messages[0] != "test1" {
t.Error("Expected first log message to be 'test1' but was", client.messages[0])
}
// Spot check the second message, it should be from the second file.
if client.messages[1] != "dirt1" {
t.Error("Expected first log message to be 'test2' but was", client.messages[1])
}
}
type TestClient struct {
log log.Logger
messages []string
}
func (c *TestClient) Handle(ls model.LabelSet, t time.Time, s string) error {
c.messages = append(c.messages, s)
level.Debug(c.log).Log("msg", "received log", "log", s)
return nil
}
func initRandom() {
rand.Seed(time.Now().UnixNano())
}
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randName() string {
b := make([]rune, 10)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}
Loading…
Cancel
Save