[new feature] promtail: Add config reload endpoint / signal to promtail (#7247)

李国忠 (k119) authored 3 years ago · committed by GitHub
parent 381735d3d4
commit fb26baa5b1
Changed files (5):
  1. clients/cmd/promtail/main.go (42 changes)
  2. clients/pkg/promtail/promtail.go (153 changes)
  3. clients/pkg/promtail/promtail_test.go (173 changes)
  4. clients/pkg/promtail/server/server.go (41 changes)
  5. clients/pkg/promtail/targets/manager.go (37 changes)
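In summary, the change wires a configuration reload path into promtail: the server gains an optional /reload HTTP endpoint (gated by enable_runtime_reload: true in the server block, registered as a server.reload flag whose exact name depends on the prefix it is registered with), promtail re-reads its configuration on SIGHUP or on a reload request, and two counters, promtail_config_reload_success_total and promtail_config_reload_fail_total, track the outcomes. The sketch below is a hedged usage example and not part of the commit: it triggers a reload over HTTP the same way the reload() test helper further down does. The listen address is an assumption (promtail's default HTTP port is typically 9080); adjust it to your http_listen_port, or send SIGHUP to the promtail process instead.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Assumed address; replace with your promtail http_listen_address:port.
	resp, err := http.Get("http://localhost:9080/reload")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// A 200 with an empty body means the new config was applied;
	// a 500 carries the reload error (for example "config has not changed").
	fmt.Println(resp.StatusCode, string(body))
}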

@@ -5,6 +5,7 @@ import (
	"fmt"
	"os"
	"reflect"
+	"sync"

	// embed time zone data
	_ "time/tzdata"
@@ -20,11 +21,12 @@ import (
	"github.com/grafana/loki/clients/pkg/logentry/stages"
	"github.com/grafana/loki/clients/pkg/promtail"
	"github.com/grafana/loki/clients/pkg/promtail/client"
-	"github.com/grafana/loki/clients/pkg/promtail/config"
+	promtail_config "github.com/grafana/loki/clients/pkg/promtail/config"
	"github.com/grafana/loki/pkg/util"
+	_ "github.com/grafana/loki/pkg/util/build"
	"github.com/grafana/loki/pkg/util/cfg"
-	_ "github.com/grafana/loki/pkg/util/build"
	util_log "github.com/grafana/loki/pkg/util/log"
)
@@ -32,16 +34,18 @@ func init() {
	prometheus.MustRegister(version.NewCollector("promtail"))
}

+var mtx sync.Mutex
+
type Config struct {
-	config.Config `yaml:",inline"`
+	promtail_config.Config `yaml:",inline"`
	printVersion    bool
	printConfig     bool
	logConfig       bool
	dryRun          bool
	checkSyntax     bool
	configFile      string
	configExpandEnv bool
	inspect         bool
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
@@ -68,11 +72,11 @@ func (c *Config) Clone() flagext.Registerer {
func main() {
	// Load config, merging config file and CLI flags
	var config Config
-	if err := cfg.DefaultUnmarshal(&config, os.Args[1:], flag.CommandLine); err != nil {
+	args := os.Args[1:]
+	if err := cfg.DefaultUnmarshal(&config, args, flag.CommandLine); err != nil {
		fmt.Println("Unable to parse config:", err)
		os.Exit(1)
	}
	if config.checkSyntax {
		if config.configFile == "" {
			fmt.Println("Invalid config file")
@@ -123,7 +127,17 @@ func main() {
	}
	clientMetrics := client.NewMetrics(prometheus.DefaultRegisterer, config.Config.Options.StreamLagLabels)
-	p, err := promtail.New(config.Config, clientMetrics, config.dryRun)
+	newConfigFunc := func() (*promtail_config.Config, error) {
+		mtx.Lock()
+		defer mtx.Unlock()
+		var config Config
+		if err := cfg.DefaultUnmarshal(&config, args, flag.NewFlagSet(os.Args[0], flag.ExitOnError)); err != nil {
+			fmt.Println("Unable to parse config:", err)
+			return nil, fmt.Errorf("unable to parse config: %w", err)
+		}
+		return &config.Config, nil
+	}
+	p, err := promtail.New(config.Config, newConfigFunc, clientMetrics, config.dryRun)
	if err != nil {
		level.Error(util_log.Logger).Log("msg", "error creating promtail", "error", err)
		os.Exit(1)
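One detail worth calling out in the hunk above: newConfigFunc parses into flag.NewFlagSet(os.Args[0], flag.ExitOnError) instead of reusing flag.CommandLine. A plausible reason (my reading, not stated in the commit) is that registering the same flag names twice on one FlagSet panics with "flag redefined", so every re-parse during a reload needs a fresh FlagSet. A minimal standalone sketch of that pattern, with a hypothetical config.file flag:

package main

import (
	"flag"
	"fmt"
)

// registerFlags stands in for promtail's flag registration; the flag name is hypothetical.
func registerFlags(fs *flag.FlagSet) {
	fs.String("config.file", "", "path to the config file")
}

func main() {
	// A fresh FlagSet per parse can be repeated safely on every reload;
	// re-registering "config.file" on a single FlagSet would panic with "flag redefined".
	for i := 0; i < 2; i++ {
		fs := flag.NewFlagSet("promtail", flag.ContinueOnError)
		registerFlags(fs)
		_ = fs.Parse([]string{"-config.file=/etc/promtail/config.yaml"})
	}
	fmt.Println("re-parsed twice with fresh FlagSets, no redefinition panic")
}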

@@ -1,9 +1,15 @@
package promtail

import (
+	"errors"
+	"fmt"
+	"os"
+	"os/signal"
	"sync"
+	"syscall"

	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/loki/clients/pkg/logentry/stages"
@@ -16,6 +22,20 @@ import (
	util_log "github.com/grafana/loki/pkg/util/log"
)

+var reloadSuccessTotal = prometheus.NewCounter(prometheus.CounterOpts{
+	Namespace: "promtail",
+	Name:      "config_reload_success_total",
+	Help:      "Number of reload success times.",
+})
+
+var reloadFailTotal = prometheus.NewCounter(prometheus.CounterOpts{
+	Namespace: "promtail",
+	Name:      "config_reload_fail_total",
+	Help:      "Number of reload fail times.",
+})
+
+var errConfigNotChange = errors.New("config has not changed")
+
// Option is a function that can be passed to the New method of Promtail and
// customize the Promtail that is created.
type Option func(p *Promtail)
@@ -42,17 +62,32 @@ type Promtail struct {
	logger         log.Logger
	reg            prometheus.Registerer

	stopped      bool
	mtx          sync.Mutex
+	configLoaded string
+	newConfig    func() (*config.Config, error)
+	metrics      *client.Metrics
+	dryRun       bool
}

// New makes a new Promtail.
-func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) {
+func New(cfg config.Config, newConfig func() (*config.Config, error), metrics *client.Metrics, dryRun bool, opts ...Option) (*Promtail, error) {
	// Initialize promtail with some defaults and allow the options to override
	// them.
	promtail := &Promtail{
		logger: util_log.Logger,
		reg:    prometheus.DefaultRegisterer,
+		metrics: metrics,
+		dryRun:  dryRun,
+	}
+
+	err := promtail.reg.Register(reloadSuccessTotal)
+	if err != nil {
+		return nil, fmt.Errorf("error register prometheus collector reloadSuccessTotal :%w", err)
+	}
+	err = promtail.reg.Register(reloadFailTotal)
+	if err != nil {
+		return nil, fmt.Errorf("error register prometheus collector reloadFailTotal :%w", err)
	}

	for _, o := range opts {
		// todo (callum) I don't understand why I needed to add this check
@@ -61,37 +96,71 @@ func New(cfg config.Config, metrics *client.Metrics, dryRun bool, opts ...Option
		}
		o(promtail)
	}

-	cfg.Setup(promtail.logger)
+	err = promtail.reloadConfig(&cfg)
+	if err != nil {
+		return nil, err
+	}
+
+	server, err := server.New(cfg.ServerConfig, promtail.logger, promtail.targetManagers, cfg.String())
+	if err != nil {
+		return nil, fmt.Errorf("error creating loki server: %w", err)
+	}
+	promtail.server = server
+	promtail.newConfig = newConfig
+
+	return promtail, nil
+}
+
+func (p *Promtail) reloadConfig(cfg *config.Config) error {
+	level.Debug(p.logger).Log("msg", "Reloading configuration file")
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+	newConfigFile := cfg.String()
+	if newConfigFile == p.configLoaded {
+		return errConfigNotChange
+	}
+	newConf := cfg.String()
+	level.Info(p.logger).Log("msg", "Reloading configuration file", "newConf", newConf)
+	if p.targetManagers != nil {
+		p.targetManagers.Stop()
+	}
+	if p.client != nil {
+		p.client.Stop()
+	}
+	cfg.Setup(p.logger)
	if cfg.LimitsConfig.ReadlineRateEnabled {
		stages.SetReadLineRateLimiter(cfg.LimitsConfig.ReadlineRate, cfg.LimitsConfig.ReadlineBurst, cfg.LimitsConfig.ReadlineRateDrop)
	}
	var err error
-	if dryRun {
-		promtail.client, err = client.NewLogger(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.ClientConfigs...)
+	if p.dryRun {
+		p.client, err = client.NewLogger(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.ClientConfigs...)
		if err != nil {
-			return nil, err
+			return err
		}
		cfg.PositionsConfig.ReadOnly = true
	} else {
-		promtail.client, err = client.NewMulti(metrics, cfg.Options.StreamLagLabels, promtail.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...)
+		p.client, err = client.NewMulti(p.metrics, cfg.Options.StreamLagLabels, p.logger, cfg.LimitsConfig.MaxStreams, cfg.ClientConfigs...)
		if err != nil {
-			return nil, err
+			return err
		}
	}

-	tms, err := targets.NewTargetManagers(promtail, promtail.reg, promtail.logger, cfg.PositionsConfig, promtail.client, cfg.ScrapeConfig, &cfg.TargetConfig)
+	tms, err := targets.NewTargetManagers(p, p.reg, p.logger, cfg.PositionsConfig, p.client, cfg.ScrapeConfig, &cfg.TargetConfig)
	if err != nil {
-		return nil, err
+		return err
	}
-	promtail.targetManagers = tms
+	p.targetManagers = tms

-	server, err := server.New(cfg.ServerConfig, promtail.logger, tms, cfg.String())
-	if err != nil {
-		return nil, err
-	}
-	promtail.server = server
-
-	return promtail, nil
+	promServer := p.server
+	if promServer != nil {
+		promtailServer, ok := promServer.(*server.PromtailServer)
+		if !ok {
+			return errors.New("promtailServer cast fail")
+		}
+		promtailServer.ReloadServer(p.targetManagers, cfg.String())
+	}
+
+	p.configLoaded = newConf
+	return nil
}

// Run the promtail; will block until a signal is received.
@@ -103,6 +172,7 @@ func (p *Promtail) Run() error {
		return nil
	}
	p.mtx.Unlock() // unlock before blocking
+	go p.watchConfig()
	return p.server.Run()
}
@@ -133,3 +203,48 @@ func (p *Promtail) Shutdown() {
func (p *Promtail) ActiveTargets() map[string][]target.Target {
	return p.targetManagers.ActiveTargets()
}
+
+func (p *Promtail) watchConfig() {
+	// Reload handler.
+	// Make sure that sighup handler is registered with a redirect to the channel before the potentially
+	if p.newConfig == nil {
+		level.Warn(p.logger).Log("msg", "disable watchConfig", "reason", "Promtail newConfig func is Empty")
+		return
+	}
+	promtailServer, ok := p.server.(*server.PromtailServer)
+	if !ok {
+		level.Warn(p.logger).Log("msg", "disable watchConfig", "reason", "promtailServer cast fail")
+		return
+	}
+	level.Warn(p.logger).Log("msg", "enable watchConfig")
+	hup := make(chan os.Signal, 1)
+	signal.Notify(hup, syscall.SIGHUP)
+	for {
+		select {
+		case <-hup:
+			_ = p.reload()
+		case rc := <-promtailServer.Reload():
+			if err := p.reload(); err != nil {
+				rc <- err
+			} else {
+				rc <- nil
+			}
+		}
+	}
+}
+
+func (p *Promtail) reload() error {
+	cfg, err := p.newConfig()
+	if err != nil {
+		reloadFailTotal.Inc()
+		return fmt.Errorf("Error new Config: %w", err)
+	}
+	err = p.reloadConfig(cfg)
+	if err != nil {
+		reloadFailTotal.Inc()
+		level.Error(p.logger).Log("msg", "Error reloading config", "err", err)
+		return err
+	}
+	reloadSuccessTotal.Inc()
+	return nil
+}
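The watchConfig loop above is the single consumer of reload requests: it funnels both SIGHUP and the server's Reload() channel into p.reload(). The handshake is a chan chan error: each caller sends its own reply channel and blocks until the loop reports the outcome. A minimal standalone sketch of that pattern (simplified, not the promtail code itself):

package main

import (
	"errors"
	"fmt"
)

func main() {
	reloadCh := make(chan chan error)

	// Reload loop: the single goroutine where reloads actually run.
	go func() {
		for rc := range reloadCh {
			// Pretend the config did not change and report that back to the caller.
			rc <- errors.New("config has not changed")
		}
	}()

	// Caller side (what the /reload handler does): request a reload and wait for the result.
	rc := make(chan error)
	reloadCh <- rc
	if err := <-rc; err != nil {
		fmt.Println("reload failed:", err)
	}
}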

@@ -20,6 +20,7 @@ import (
	"github.com/grafana/dskit/flagext"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
@@ -106,7 +107,7 @@ func TestPromtail(t *testing.T) {
		_ = server.Shutdown(context.Background())
	}()

-	p, err := New(buildTestConfig(t, positionsFileName, testDir), clientMetrics, false, nil)
+	p, err := New(buildTestConfig(t, positionsFileName, testDir), nil, clientMetrics, false, nil)
	if err != nil {
		t.Error("error creating promtail", err)
		return
@@ -659,7 +660,7 @@ func Test_DryRun(t *testing.T) {
	require.NoError(t, err)
	defer os.Remove(f.Name())

-	_, err = New(config.Config{}, clientMetrics, true, nil)
+	_, err = New(config.Config{}, nil, clientMetrics, true, nil)
	require.Error(t, err)

	// Set the minimum config needed to start a server. We need to do this since we
@@ -673,7 +674,8 @@ func Test_DryRun(t *testing.T) {
		},
	}

-	prometheus.DefaultRegisterer = prometheus.NewRegistry() // reset registry, otherwise you can't create 2 weavework server.
+	prometheus.DefaultRegisterer = prometheus.NewRegistry()
	_, err = New(config.Config{
		ServerConfig: serverCfg,
		ClientConfig: client.Config{URL: flagext.URLValue{URL: &url.URL{Host: "string"}}},
@@ -681,7 +683,7 @@ func Test_DryRun(t *testing.T) {
			PositionsFile: f.Name(),
			SyncPeriod:    time.Second,
		},
-	}, clientMetrics, true, nil)
+	}, nil, clientMetrics, true, nil)
	require.NoError(t, err)

	prometheus.DefaultRegisterer = prometheus.NewRegistry()
@@ -693,7 +695,168 @@ func Test_DryRun(t *testing.T) {
			PositionsFile: f.Name(),
			SyncPeriod:    time.Second,
		},
-	}, clientMetrics, false, nil)
+	}, nil, clientMetrics, false, nil)
	require.NoError(t, err)
	require.IsType(t, &client.MultiClient{}, p.client)
}
+
+func Test_Reload(t *testing.T) {
+	f, err := os.CreateTemp("/tmp", "Test_Reload")
+	require.NoError(t, err)
+	defer os.Remove(f.Name())
+
+	cfg := config.Config{
+		ServerConfig: server.Config{
+			Reload: true,
+		},
+		ClientConfig: client.Config{URL: flagext.URLValue{URL: &url.URL{Host: "string"}}},
+		PositionsConfig: positions.Config{
+			PositionsFile: f.Name(),
+			SyncPeriod:    time.Second,
+		},
+	}
+	expectCfgStr := cfg.String()
+	expectedConfig := &config.Config{
+		ServerConfig: server.Config{
+			Reload: true,
+		},
+		ClientConfig: client.Config{URL: flagext.URLValue{URL: &url.URL{Host: "reloadtesturl"}}},
+		PositionsConfig: positions.Config{
+			PositionsFile: f.Name(),
+			SyncPeriod:    time.Second,
+		},
+	}
+	expectedConfigReloaded := expectedConfig.String()
+
+	prometheus.DefaultRegisterer = prometheus.NewRegistry() // reset registry, otherwise you can't create 2 weavework server.
+	promtailServer, err := New(cfg, func() (*config.Config, error) {
+		return expectedConfig, nil
+	}, clientMetrics, true, nil)
+	require.NoError(t, err)
+	require.Equal(t, len(expectCfgStr), len(promtailServer.configLoaded))
+	require.Equal(t, expectCfgStr, promtailServer.configLoaded)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err = promtailServer.Run()
+		if err != nil {
+			err = errors.Wrap(err, "Failed to start promtail")
+		}
+	}()
+	defer promtailServer.Shutdown() // In case the test fails before the call to Shutdown below.
+
+	svr := promtailServer.server.(*pserver.PromtailServer)
+	require.NotEqual(t, len(expectedConfig.String()), len(svr.PromtailConfig()))
+	require.NotEqual(t, expectedConfig.String(), svr.PromtailConfig())
+
+	result, err := reload(t, svr.Server.HTTPListenAddr())
+	require.NoError(t, err)
+	expectedReloadResult := ""
+	require.Equal(t, expectedReloadResult, result)
+	require.Equal(t, len(expectedConfig.String()), len(svr.PromtailConfig()))
+	require.Equal(t, expectedConfig.String(), svr.PromtailConfig())
+	require.Equal(t, len(expectedConfigReloaded), len(promtailServer.configLoaded))
+	require.Equal(t, expectedConfigReloaded, promtailServer.configLoaded)
+
+	pb := &dto.Metric{}
+	err = reloadSuccessTotal.Write(pb)
+	require.NoError(t, err)
+	require.Equal(t, 1.0, pb.Counter.GetValue())
+}
+
+func Test_ReloadFail_NotPanic(t *testing.T) {
+	f, err := os.CreateTemp("/tmp", "Test_Reload")
+	require.NoError(t, err)
+	defer os.Remove(f.Name())
+
+	cfg := config.Config{
+		ServerConfig: server.Config{
+			Reload: true,
+		},
+		ClientConfig: client.Config{URL: flagext.URLValue{URL: &url.URL{Host: "string"}}},
+		PositionsConfig: positions.Config{
+			PositionsFile: f.Name(),
+			SyncPeriod:    time.Second,
+		},
+	}
+	expectedConfig := &config.Config{
+		ServerConfig: server.Config{
+			Reload: true,
+		},
+		ClientConfig: client.Config{URL: flagext.URLValue{URL: &url.URL{Host: "reloadtesturl"}}},
+		PositionsConfig: positions.Config{
+			PositionsFile: f.Name(),
+			SyncPeriod:    time.Second,
+		},
+	}
+	newConfigErr := errors.New("load config fail")
+
+	prometheus.DefaultRegisterer = prometheus.NewRegistry() // reset registry, otherwise you can't create 2 weavework server.
+	promtailServer, err := New(cfg, func() (*config.Config, error) {
+		return nil, newConfigErr
+	}, clientMetrics, true, nil)
+	require.NoError(t, err)
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		err = promtailServer.Run()
+		if err != nil {
+			err = errors.Wrap(err, "Failed to start promtail")
+		}
+	}()
+	defer promtailServer.Shutdown() // In case the test fails before the call to Shutdown below.
+
+	svr := promtailServer.server.(*pserver.PromtailServer)
+	httpListenAddr := svr.Server.HTTPListenAddr()
+	require.NotEqual(t, len(expectedConfig.String()), len(svr.PromtailConfig()))
+	require.NotEqual(t, expectedConfig.String(), svr.PromtailConfig())
+
+	result, err := reload(t, httpListenAddr)
+	require.Error(t, err)
+	expectedReloadResult := fmt.Sprintf("failed to reload config: Error new Config: %s\n", newConfigErr)
+	require.Equal(t, expectedReloadResult, result)
+
+	pb := &dto.Metric{}
+	err = reloadFailTotal.Write(pb)
+	require.NoError(t, err)
+	require.Equal(t, 1.0, pb.Counter.GetValue())
+
+	promtailServer.newConfig = func() (*config.Config, error) {
+		return &cfg, nil
+	}
+	result, err = reload(t, httpListenAddr)
+	require.Error(t, err)
+	require.Equal(t, fmt.Sprintf("failed to reload config: %s\n", errConfigNotChange), result)
+}
+
+func reload(t *testing.T, httpListenAddr net.Addr) (string, error) {
+	resp, err := http.Get(fmt.Sprintf("http://%s/reload", httpListenAddr))
+	if err != nil {
+		t.Fatal("Could not query reload endpoint", err)
+	}
+	if resp.StatusCode == http.StatusInternalServerError {
+		b, err := io.ReadAll(resp.Body)
+		if err != nil {
+			t.Fatal("Error reading response body from /reload endpoint", err)
+		}
+		return string(b), errors.New("Received a 500 status code from /reload endpoint")
+	}
+	if resp.StatusCode != http.StatusOK {
+		return "", errors.New("Received a non 200 status code from /reload endpoint")
+	}
+	b, err := io.ReadAll(resp.Body)
+	if err != nil {
+		t.Fatal("Error reading response body from /reload endpoint", err)
+	}
+	return string(b), nil
+}

@@ -10,6 +10,7 @@ import (
	"path"
	"sort"
	"strings"
+	"sync"
	"syscall"
	"text/template"
@@ -39,8 +40,10 @@ type Server interface {
type PromtailServer struct {
	*serverww.Server
	log               log.Logger
+	mtx               sync.Mutex
	tms               *targets.TargetManagers
	externalURL       *url.URL
+	reloadCh          chan chan error
	healthCheckTarget bool
	promtailCfg       string
}
@@ -51,6 +54,7 @@ type Config struct {
	ExternalURL       string `yaml:"external_url"`
	HealthCheckTarget *bool  `yaml:"health_check_target"`
	Disable           bool   `yaml:"disable"`
+	Reload            bool   `yaml:"enable_runtime_reload"`
}

// RegisterFlags with prefix registers flags where every name is prefixed by
@@ -60,6 +64,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	cfg.Config.RegisterFlags(f)
	f.BoolVar(&cfg.Disable, prefix+"server.disable", false, "Disable the http and grpc server.")
+	f.BoolVar(&cfg.Reload, prefix+"server.reload", false, "Enable reload via HTTP request.")
}

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -91,6 +96,7 @@ func New(cfg Config, log log.Logger, tms *targets.TargetManagers, promtailCfg st
	serv := &PromtailServer{
		Server:            wws,
		log:               log,
+		reloadCh:          make(chan chan error),
		tms:               tms,
		externalURL:       externalURL,
		healthCheckTarget: healthCheckTargetFlag,
@@ -103,12 +109,17 @@ func New(cfg Config, log log.Logger, tms *targets.TargetManagers, promtailCfg st
	serv.HTTP.Path("/service-discovery").Handler(http.HandlerFunc(serv.serviceDiscovery))
	serv.HTTP.Path("/targets").Handler(http.HandlerFunc(serv.targets))
	serv.HTTP.Path("/config").Handler(http.HandlerFunc(serv.config))
+	if cfg.Reload {
+		serv.HTTP.Path("/reload").Handler(http.HandlerFunc(serv.reload))
+	}
	serv.HTTP.Path("/debug/fgprof").Handler(fgprof.Handler())

	return serv, nil
}

// serviceDiscovery serves the service discovery page.
func (s *PromtailServer) serviceDiscovery(rw http.ResponseWriter, req *http.Request) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
	var index []string
	allTarget := s.tms.AllTargets()
	for job := range allTarget {
@@ -187,6 +198,8 @@ func (s *PromtailServer) config(rw http.ResponseWriter, req *http.Request) {

// targets serves the targets page.
func (s *PromtailServer) targets(rw http.ResponseWriter, req *http.Request) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
	executeTemplate(req.Context(), rw, templateOptions{
		Data: struct {
			TargetPools map[string][]target.Target
@@ -218,8 +231,36 @@ func (s *PromtailServer) targets(rw http.ResponseWriter, req *http.Request) {
	})
}

+func (s *PromtailServer) reload(rw http.ResponseWriter, req *http.Request) {
+	rc := make(chan error)
+	s.reloadCh <- rc
+	if err := <-rc; err != nil {
+		http.Error(rw, fmt.Sprintf("failed to reload config: %s", err), http.StatusInternalServerError)
+	}
+}
+
+// Reload returns the receive-only channel that signals configuration reload requests.
+func (s *PromtailServer) Reload() <-chan chan error {
+	return s.reloadCh
+}
+
+// Reload returns the receive-only channel that signals configuration reload requests.
+func (s *PromtailServer) PromtailConfig() string {
+	return s.promtailCfg
+}
+
+func (s *PromtailServer) ReloadServer(tms *targets.TargetManagers, promtailCfg string) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
+	s.tms = tms
+	s.promtailCfg = promtailCfg
+}
+
// ready serves the ready endpoint
func (s *PromtailServer) ready(rw http.ResponseWriter, _ *http.Request) {
+	s.mtx.Lock()
+	defer s.mtx.Unlock()
	if s.healthCheckTarget && !s.tms.Ready() {
		http.Error(rw, readinessProbeFailure, http.StatusInternalServerError)
		return

@@ -41,6 +41,17 @@ const (
	HerokuDrainConfigs = "herokuDrainConfigs"
)

+var (
+	fileMetrics        *file.Metrics
+	syslogMetrics      *syslog.Metrics
+	gcplogMetrics      *gcplog.Metrics
+	gelfMetrics        *gelf.Metrics
+	cloudflareMetrics  *cloudflare.Metrics
+	dockerMetrics      *docker.Metrics
+	journalMetrics     *journal.Metrics
+	herokuDrainMetrics *heroku.Metrics
+)
+
type targetManager interface {
	Ready() bool
	Stop()
@@ -119,38 +130,28 @@ func NewTargetManagers(
		return positionFile, nil
	}

-	var (
-		fileMetrics        *file.Metrics
-		syslogMetrics      *syslog.Metrics
-		gcplogMetrics      *gcplog.Metrics
-		gelfMetrics        *gelf.Metrics
-		cloudflareMetrics  *cloudflare.Metrics
-		dockerMetrics      *docker.Metrics
-		journalMetrics     *journal.Metrics
-		herokuDrainMetrics *heroku.Metrics
-	)
-	if len(targetScrapeConfigs[FileScrapeConfigs]) > 0 {
+	if len(targetScrapeConfigs[FileScrapeConfigs]) > 0 && fileMetrics == nil {
		fileMetrics = file.NewMetrics(reg)
	}
-	if len(targetScrapeConfigs[SyslogScrapeConfigs]) > 0 {
+	if len(targetScrapeConfigs[SyslogScrapeConfigs]) > 0 && syslogMetrics == nil {
		syslogMetrics = syslog.NewMetrics(reg)
	}
-	if len(targetScrapeConfigs[GcplogScrapeConfigs]) > 0 {
+	if len(targetScrapeConfigs[GcplogScrapeConfigs]) > 0 && gcplogMetrics == nil {
		gcplogMetrics = gcplog.NewMetrics(reg)
	}
-	if len(targetScrapeConfigs[GelfConfigs]) > 0 {
+	if len(targetScrapeConfigs[GelfConfigs]) > 0 && gelfMetrics == nil {
		gelfMetrics = gelf.NewMetrics(reg)
	}
-	if len(targetScrapeConfigs[CloudflareConfigs]) > 0 {
+	if len(targetScrapeConfigs[CloudflareConfigs]) > 0 && cloudflareMetrics == nil {
		cloudflareMetrics = cloudflare.NewMetrics(reg)
	}
-	if len(targetScrapeConfigs[DockerConfigs]) > 0 || len(targetScrapeConfigs[DockerSDConfigs]) > 0 {
+	if (len(targetScrapeConfigs[DockerConfigs]) > 0 || len(targetScrapeConfigs[DockerSDConfigs]) > 0) && dockerMetrics == nil {
		dockerMetrics = docker.NewMetrics(reg)
	}
-	if len(targetScrapeConfigs[JournalScrapeConfigs]) > 0 {
+	if len(targetScrapeConfigs[JournalScrapeConfigs]) > 0 && journalMetrics == nil {
		journalMetrics = journal.NewMetrics(reg)
	}
-	if len(targetScrapeConfigs[HerokuDrainConfigs]) > 0 {
+	if len(targetScrapeConfigs[HerokuDrainConfigs]) > 0 && herokuDrainMetrics == nil {
		herokuDrainMetrics = heroku.NewMetrics(reg)
	}
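The hunk above hoists the per-target metrics from locals inside NewTargetManagers to package-level variables and only creates them when they are still nil. My reading (not spelled out in the commit) is that this keeps a reload from constructing and registering the same Prometheus collectors a second time, which the registry rejects. A small sketch of that failure mode, using a hypothetical counter name:

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// newCounter stands in for a target's NewMetrics call; the metric name is hypothetical.
func newCounter() prometheus.Counter {
	return prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "promtail",
		Name:      "example_total",
		Help:      "Example counter registered twice.",
	})
}

func main() {
	reg := prometheus.NewRegistry()
	fmt.Println("first registration:", reg.Register(newCounter())) // prints <nil>

	// Registering an identical collector again fails with AlreadyRegisteredError,
	// which is why the reload path reuses the metrics created on the first call.
	err := reg.Register(newCounter())
	var are prometheus.AlreadyRegisteredError
	fmt.Println("duplicate registration:", errors.As(err, &are)) // prints true
}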
