Refactor structure of promtail

This allows easier integration of other targets than files for promtail.

Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
pull/282/head
Christian Simon 7 years ago committed by Tom Wilkie
parent 4c985efade
commit 31789eafc4
  1. 3
      cmd/promtail/main.go
  2. 13
      pkg/promtail/api/config.go
  3. 2
      pkg/promtail/api/entry_parser.go
  4. 4
      pkg/promtail/api/types.go
  5. 14
      pkg/promtail/client/client.go
  6. 21
      pkg/promtail/positions/positions.go
  7. 19
      pkg/promtail/promtail.go
  8. 31
      pkg/promtail/targets/file/filetarget.go
  9. 27
      pkg/promtail/targets/file/filetargetmanager.go
  10. 58
      pkg/promtail/targets/manager.go

@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/promtail"
"github.com/grafana/loki/pkg/promtail/api"
)
func init() {
@ -22,7 +23,7 @@ func init() {
func main() {
var (
configFile = "docs/promtail-local-config.yaml"
config promtail.Config
config api.Config
)
flag.StringVar(&configFile, "config.file", "promtail.yml", "The config file.")
flagext.RegisterFlags(&config)

@ -1,4 +1,4 @@
package promtail
package api
import (
"flag"
@ -11,14 +11,17 @@ import (
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/weaveworks/common/server"
"github.com/grafana/loki/pkg/promtail/client"
"github.com/grafana/loki/pkg/promtail/positions"
)
// Config for promtail, describing what files to watch.
type Config struct {
ServerConfig server.Config `yaml:"server,omitempty"`
ClientConfig ClientConfig `yaml:"client,omitempty"`
PositionsConfig PositionsConfig `yaml:"positions,omitempty"`
ScrapeConfig []ScrapeConfig `yaml:"scrape_configs,omitempty"`
ServerConfig server.Config `yaml:"server,omitempty"`
ClientConfig client.Config `yaml:"client,omitempty"`
PositionsConfig positions.Config `yaml:"positions,omitempty"`
ScrapeConfig []ScrapeConfig `yaml:"scrape_configs,omitempty"`
}
// RegisterFlags registers flags.

@ -1,4 +1,4 @@
package promtail
package api
import (
"encoding/json"

@ -1,4 +1,4 @@
package promtail
package api
import (
"time"
@ -32,7 +32,7 @@ func (e EntryMiddlewareFunc) Wrap(next EntryHandler) EntryHandler {
return e(next)
}
func addLabelsMiddleware(additionalLabels model.LabelSet) EntryMiddleware {
func AddLabelsMiddleware(additionalLabels model.LabelSet) EntryMiddleware {
return EntryMiddlewareFunc(func(next EntryHandler) EntryHandler {
return EntryHandlerFunc(func(labels model.LabelSet, time time.Time, entry string) error {
labels = additionalLabels.Merge(labels) // Add the additionalLabels but preserves the original labels.

@ -1,4 +1,4 @@
package promtail
package client
import (
"bytes"
@ -41,8 +41,8 @@ func init() {
prometheus.MustRegister(requestDuration)
}
// ClientConfig describes configuration for a HTTP pusher client.
type ClientConfig struct {
// Config describes configuration for a HTTP pusher client.
type Config struct {
URL flagext.URLValue
BatchWait time.Duration
BatchSize int
@ -51,7 +51,7 @@ type ClientConfig struct {
}
// RegisterFlags registers flags.
func (c *ClientConfig) RegisterFlags(flags *flag.FlagSet) {
func (c *Config) RegisterFlags(flags *flag.FlagSet) {
flags.Var(&c.URL, "client.url", "URL of log server")
flags.DurationVar(&c.BatchWait, "client.batch-wait", 1*time.Second, "Maximum wait period before sending batch.")
flags.IntVar(&c.BatchSize, "client.batch-size-bytes", 100*1024, "Maximum batch size to accrue before sending. ")
@ -60,7 +60,7 @@ func (c *ClientConfig) RegisterFlags(flags *flag.FlagSet) {
// Client for pushing logs in snappy-compressed protos over HTTP.
type Client struct {
logger log.Logger
cfg ClientConfig
cfg Config
quit chan struct{}
entries chan entry
wg sync.WaitGroup
@ -73,8 +73,8 @@ type entry struct {
logproto.Entry
}
// NewClient makes a new Client.
func NewClient(cfg ClientConfig, logger log.Logger) (*Client, error) {
// New makes a new Client.
func New(cfg Config, logger log.Logger) (*Client, error) {
c := &Client{
logger: logger,
cfg: cfg,

@ -1,4 +1,4 @@
package promtail
package positions
import (
"flag"
@ -10,19 +10,19 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"gopkg.in/yaml.v2"
yaml "gopkg.in/yaml.v2"
)
const positionFileMode = 0700
// PositionsConfig describes where to get postition information from.
type PositionsConfig struct {
// Config describes where to get postition information from.
type Config struct {
SyncPeriod time.Duration `yaml:"sync_period"`
PositionsFile string `yaml:"filename"`
}
// RegisterFlags register flags.
func (cfg *PositionsConfig) RegisterFlags(flags *flag.FlagSet) {
func (cfg *Config) RegisterFlags(flags *flag.FlagSet) {
flags.DurationVar(&cfg.SyncPeriod, "positions.sync-period", 10*time.Second, "Period with this to sync the position file.")
flag.StringVar(&cfg.PositionsFile, "positions.file", "/var/log/positions.yaml", "Location to read/wrtie positions from.")
}
@ -30,7 +30,7 @@ func (cfg *PositionsConfig) RegisterFlags(flags *flag.FlagSet) {
// Positions tracks how far through each file we've read.
type Positions struct {
logger log.Logger
cfg PositionsConfig
cfg Config
mtx sync.Mutex
positions map[string]int64
quit chan struct{}
@ -41,8 +41,8 @@ type positionsFile struct {
Positions map[string]int64 `yaml:"positions"`
}
// NewPositions makes a new Positions.
func NewPositions(logger log.Logger, cfg PositionsConfig) (*Positions, error) {
// New makes a new Positions.
func New(logger log.Logger, cfg Config) (*Positions, error) {
positions, err := readPositionsFile(cfg.PositionsFile)
if err != nil {
return nil, err
@ -87,6 +87,11 @@ func (p *Positions) Remove(path string) {
p.mtx.Unlock()
}
// SyncPeriod returns how often the positions file gets resynced
func (p *Positions) SyncPeriod() time.Duration {
return p.cfg.SyncPeriod
}
func (p *Positions) run() {
defer func() {
p.save()

@ -3,29 +3,34 @@ package promtail
import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/weaveworks/common/server"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/client"
"github.com/grafana/loki/pkg/promtail/positions"
"github.com/grafana/loki/pkg/promtail/targets"
)
// Promtail is the root struct for Promtail...
type Promtail struct {
client *Client
positions *Positions
targetManager *TargetManager
client *client.Client
positions *positions.Positions
targetManager *targets.TargetManager
server *server.Server
}
// New makes a new Promtail.
func New(cfg Config) (*Promtail, error) {
positions, err := NewPositions(util.Logger, cfg.PositionsConfig)
func New(cfg api.Config) (*Promtail, error) {
positions, err := positions.New(util.Logger, cfg.PositionsConfig)
if err != nil {
return nil, err
}
client, err := NewClient(cfg.ClientConfig, util.Logger)
client, err := client.New(cfg.ClientConfig, util.Logger)
if err != nil {
return nil, err
}
tm, err := NewTargetManager(util.Logger, positions, client, cfg.ScrapeConfig)
tm, err := targets.NewTargetManager(util.Logger, positions, client, cfg.ScrapeConfig)
if err != nil {
return nil, err
}

@ -1,4 +1,4 @@
package promtail
package file
import (
"os"
@ -16,6 +16,8 @@ import (
fsnotify "gopkg.in/fsnotify.v1"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/positions"
)
var (
@ -42,12 +44,12 @@ const (
filename = "__filename__"
)
// Target describes a particular set of logs.
type Target struct {
// FileTarget describes a particular set of logs.
type FileTarget struct {
logger log.Logger
handler EntryHandler
positions *Positions
handler api.EntryHandler
positions *positions.Positions
watcher *fsnotify.Watcher
path string
@ -57,13 +59,14 @@ type Target struct {
tails map[string]*tailer
}
// NewTarget create a new Target.
func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, path string, labels model.LabelSet) (*Target, error) {
// NewFileTarget create a new FileTarget.
func NewFileTarget(logger log.Logger, handler api.EntryHandler, positions *positions.Positions, path string, labels model.LabelSet) (*FileTarget, error) {
var err error
path, err = filepath.Abs(path)
if err != nil {
return nil, errors.Wrap(err, "filepath.Abs")
}
matches, err := filepath.Glob(path)
if err != nil {
return nil, errors.Wrap(err, "filepath.Glob")
@ -94,11 +97,11 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa
}
}
t := &Target{
t := &FileTarget{
logger: logger,
watcher: watcher,
path: path,
handler: addLabelsMiddleware(labels).Wrap(handler),
handler: api.AddLabelsMiddleware(labels).Wrap(handler),
positions: positions,
quit: make(chan struct{}),
done: make(chan struct{}),
@ -130,12 +133,12 @@ func NewTarget(logger log.Logger, handler EntryHandler, positions *Positions, pa
}
// Stop the target.
func (t *Target) Stop() {
func (t *FileTarget) Stop() {
close(t.quit)
<-t.done
}
func (t *Target) run() {
func (t *FileTarget) run() {
defer func() {
helpers.LogError("closing watcher", t.watcher.Close)
for _, v := range t.tails {
@ -203,8 +206,8 @@ func (t *Target) run() {
type tailer struct {
logger log.Logger
handler EntryHandler
positions *Positions
handler api.EntryHandler
positions *positions.Positions
path string
filename string
@ -248,7 +251,7 @@ func newTailer(logger log.Logger, handler EntryHandler, positions *Positions, pa
tailer := &tailer{
logger: logger,
handler: addLabelsMiddleware(model.LabelSet{"filename": model.LabelValue(path)}).Wrap(handler),
handler: api.AddLabelsMiddleware(model.LabelSet{filename: model.LabelValue(path)}).Wrap(handler),
positions: positions,
path: path,

@ -1,4 +1,4 @@
package promtail
package file
import (
"context"
@ -16,6 +16,10 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
pkgrelabel "github.com/prometheus/prometheus/pkg/relabel"
"github.com/prometheus/prometheus/relabel"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/positions"
)
const (
@ -48,9 +52,9 @@ type TargetManager struct {
// NewTargetManager creates a new TargetManager.
func NewTargetManager(
logger log.Logger,
positions *Positions,
client EntryHandler,
scrapeConfig []ScrapeConfig,
positions *positions.Positions,
client api.EntryHandler,
scrapeConfigs []api.ScrapeConfig,
) (*TargetManager, error) {
ctx, quit := context.WithCancel(context.Background())
tm := &TargetManager{
@ -67,12 +71,12 @@ func NewTargetManager(
}
config := map[string]sd_config.ServiceDiscoveryConfig{}
for _, cfg := range scrapeConfig {
for _, cfg := range scrapeConfigs {
s := &syncer{
log: logger,
positions: positions,
relabelConfig: cfg.RelabelConfigs,
targets: map[string]*Target{},
targets: map[string]*FileTarget{},
hostname: hostname,
entryHandler: cfg.EntryParser.Wrap(client),
}
@ -101,15 +105,16 @@ func (tm *TargetManager) Stop() {
for _, s := range tm.syncers {
s.stop()
}
}
type syncer struct {
log log.Logger
positions *Positions
entryHandler EntryHandler
positions *positions.Positions
entryHandler api.EntryHandler
hostname string
targets map[string]*Target
targets map[string]*FileTarget
relabelConfig []*pkgrelabel.Config
}
@ -181,8 +186,8 @@ func (s *syncer) Sync(groups []*targetgroup.Group) {
}
}
func (s *syncer) newTarget(path string, labels model.LabelSet) (*Target, error) {
return NewTarget(s.log, s.entryHandler, s.positions, path, labels)
func (s *syncer) newTarget(path string, labels model.LabelSet) (*FileTarget, error) {
return NewFileTarget(s.log, s.entryHandler, s.positions, path, labels)
}
func (s *syncer) stop() {

@ -0,0 +1,58 @@
package targets
import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/positions"
"github.com/grafana/loki/pkg/promtail/targets/file"
)
type GenericTargetManager interface {
Stop()
}
type TargetManager struct {
targetManagers []GenericTargetManager
}
func NewTargetManager(
logger log.Logger,
positions *positions.Positions,
client api.EntryHandler,
scrapeConfigs []api.ScrapeConfig,
) (*TargetManager, error) {
var targetManagers []GenericTargetManager
var fileScrapeConfigs []api.ScrapeConfig
for _, cfg := range scrapeConfigs {
// for now every scrape config is a file target
fileScrapeConfigs = append(
fileScrapeConfigs,
cfg,
)
}
fileTargetManager, err := file.NewTargetManager(
logger,
positions,
client,
fileScrapeConfigs,
)
if err != nil {
return nil, errors.Wrap(err, "failed to make file target manager")
}
targetManagers = append(targetManagers, fileTargetManager)
return &TargetManager{targetManagers: targetManagers}, nil
}
func (tm *TargetManager) Stop() {
for _, t := range tm.targetManagers {
go func() {
t.Stop()
}()
}
}
Loading…
Cancel
Save