@ -151,10 +151,10 @@ func (t *Target) run() {
case event := <- t . watcher . Events :
switch event . Op {
case fsnotify . Create :
// protect against double Creates.
if _ , ok := t . tails [ event . Name ] ; ok {
level . Info ( t . logger ) . Log ( "msg" , "got 'create' for existing file" , "filename" , event . Name )
continue
if tailer , ok := t . tails [ event . Name ] ; ok {
level . Info ( t . logger ) . Log ( "msg" , "create for file being tailed. Will close and re-open" , "filename" , event . Name )
helpers . LogError ( "stopping tailer" , tailer . stop )
delete ( t . tails , event . Name )
}
matched , err := filepath . Match ( t . path , event . Name )
if err != nil {
@ -206,18 +206,39 @@ type tailer struct {
handler EntryHandler
positions * Positions
path string
tail * tail . Tail
path string
filename string
tail * tail . Tail
quit chan struct { }
done chan struct { }
}
func newTailer ( logger log . Logger , handler EntryHandler , positions * Positions , path string ) ( * tailer , error ) {
tail , err := tail . TailFile ( path , tail . Config {
filename := path
var reOpen bool
// Check if the path requested is a symbolic link
fi , err := os . Lstat ( path )
if err != nil {
return nil , err
}
if fi . Mode ( ) & os . ModeSymlink == os . ModeSymlink {
filename , err = os . Readlink ( path )
if err != nil {
return nil , err
}
// if we are tailing a symbolic link then we need to automatically re-open
// as we wont get a Create event when a file is rotated.
reOpen = true
}
tail , err := tail . TailFile ( filename , tail . Config {
Follow : true ,
ReOpen : reOpen ,
Location : & tail . SeekInfo {
Offset : positions . Get ( path ) ,
Offset : positions . Get ( filename ) ,
Whence : 0 ,
} ,
} )
@ -227,13 +248,14 @@ func newTailer(logger log.Logger, handler EntryHandler, positions *Positions, pa
tailer := & tailer {
logger : logger ,
handler : addLabelsMiddleware ( model . LabelSet { filename : model . LabelValue ( path ) } ) . Wrap ( handler ) ,
handler : addLabelsMiddleware ( model . LabelSet { "filename" : model . LabelValue ( path ) } ) . Wrap ( handler ) ,
positions : positions ,
path : path ,
tail : tail ,
quit : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
path : path ,
filename : filename ,
tail : tail ,
quit : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
}
go tailer . run ( )
filesActive . Add ( 1. )
@ -241,16 +263,16 @@ func newTailer(logger log.Logger, handler EntryHandler, positions *Positions, pa
}
func ( t * tailer ) run ( ) {
level . Info ( t . logger ) . Log ( "msg" , "start tailing file" , "filename " , t . path )
level . Info ( t . logger ) . Log ( "msg" , "start tailing file" , "path " , t . path )
positionSyncPeriod := t . positions . cfg . SyncPeriod
positionWait := time . NewTicker ( positionSyncPeriod )
defer func ( ) {
level . Info ( t . logger ) . Log ( "msg" , "stopping tailing file" , "filename " , t . path )
level . Info ( t . logger ) . Log ( "msg" , "stopped tailing file" , "path " , t . path )
positionWait . Stop ( )
err := t . markPosition ( )
if err != nil {
level . Error ( t . logger ) . Log ( "msg" , "error getting tail position" , "filename " , t . path , "error" , err )
level . Error ( t . logger ) . Log ( "msg" , "error getting tail position" , "path " , t . path , "error" , err )
}
close ( t . done )
} ( )
@ -260,7 +282,7 @@ func (t *tailer) run() {
case <- positionWait . C :
err := t . markPosition ( )
if err != nil {
level . Error ( t . logger ) . Log ( "msg" , "error getting tail position" , "filename " , t . path , "error" , err )
level . Error ( t . logger ) . Log ( "msg" , "error getting tail position" , "path " , t . path , "error" , err )
continue
}
@ -270,13 +292,13 @@ func (t *tailer) run() {
}
if line . Err != nil {
level . Error ( t . logger ) . Log ( "msg" , "error reading line" , "filename " , t . path , "error" , line . Err )
level . Error ( t . logger ) . Log ( "msg" , "error reading line" , "path " , t . path , "error" , line . Err )
}
readLines . WithLabelValues ( t . path ) . Inc ( )
readBytes . WithLabelValues ( t . path ) . Add ( float64 ( len ( line . Text ) ) )
if err := t . handler . Handle ( model . LabelSet { } , line . Time , line . Text ) ; err != nil {
level . Error ( t . logger ) . Log ( "msg" , "error handling line" , "filename " , t . path , "error" , err )
level . Error ( t . logger ) . Log ( "msg" , "error handling line" , "path " , t . path , "error" , err )
}
case <- t . quit :
return
@ -289,8 +311,8 @@ func (t *tailer) markPosition() error {
if err != nil {
return err
}
level . Debug ( t . logger ) . Log ( "path" , t . path , "current_position" , pos )
t . positions . Put ( t . path , pos )
level . Debug ( t . logger ) . Log ( "path" , t . path , "filename" , t . filename , " current_position" , pos )
t . positions . Put ( t . filename , pos )
return nil
}
@ -302,5 +324,5 @@ func (t *tailer) stop() error {
}
func ( t * tailer ) cleanup ( ) {
t . positions . Remove ( t . path )
t . positions . Remove ( t . filename )
}