parent
9f0dcc1d91
commit
3ac5222f8b
@ -0,0 +1,21 @@ |
||||
// Package collector defines the Collector interface, its Config, and the Factories used by the prometheus exporter to collect and export system metrics.
|
||||
package collector |
||||
|
||||
import ( |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
) |
||||
|
||||
// Factories lists the constructor functions for collectors. Each one takes a
// Config and a prometheus.Registry and returns a Collector or an error.
var Factories []func(Config, prometheus.Registry) (Collector, error) |
||||
|
||||
// Collector is the interface a collector has to implement.
|
||||
type Collector interface { |
||||
// Update gets new metrics and exposes them via the prometheus registry.
// It returns a count n and an error; the caller in main.go records n in
// the "number of metrics updated" gauge.
|
||||
Update() (n int, err error) |
||||
|
||||
// Name returns the name of the collector.
|
||||
Name() string |
||||
} |
||||
|
||||
// Config is the configuration handed to every collector factory. It is
// decoded from JSON.
type Config struct { |
||||
// Attributes is the string-to-string map stored under the "attributes"
// JSON key.
Attributes map[string]string `json:"attributes"` |
||||
} |
||||
@ -1,12 +1,15 @@ |
||||
package exporter |
||||
package collector |
||||
|
||||
import ( |
||||
"flag" |
||||
"fmt" |
||||
"log" |
||||
"strconv" |
||||
"strings" |
||||
) |
||||
|
||||
// verbose is the -verbose command-line flag (default false); debug only
// produces output when it is set.
var verbose = flag.Bool("verbose", false, "Verbose output.") |
||||
|
||||
func debug(name string, format string, a ...interface{}) { |
||||
if *verbose { |
||||
f := fmt.Sprintf("%s: %s", name, format) |
||||
@ -1,167 +0,0 @@ |
||||
// Package exporter implements a prometheus exporter using multiple collectorFactories to collect and export system metrics.
|
||||
package exporter |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"flag" |
||||
"fmt" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/exp" |
||||
"io/ioutil" |
||||
"log" |
||||
"net/http" |
||||
"os" |
||||
"os/signal" |
||||
"runtime/pprof" |
||||
"sync" |
||||
"syscall" |
||||
"time" |
||||
) |
||||
|
||||
// verbose is the -verbose command-line flag (default false).
var verbose = flag.Bool("verbose", false, "Verbose output.") |
||||
// collectorFactories lists the constructor functions New calls to build the
// exporter's collectors. Each one takes the parsed config and the registry
// and returns a Collector or an error.
var collectorFactories []func(config, prometheus.Registry) (Collector, error) |
||||
|
||||
// Collector is the interface a collector has to implement.
|
||||
type Collector interface { |
||||
// Update gets new metrics and exposes them via the prometheus registry.
// It returns a count n and an error; Execute records n in the
// metricsUpdated gauge.
|
||||
Update() (n int, err error) |
||||
|
||||
// Name returns the name of the collector.
|
||||
Name() string |
||||
} |
||||
|
||||
// config mirrors the JSON config file read by loadConfig.
type config struct { |
||||
// Attributes is the string-to-string map stored under the "attributes"
// JSON key; the whole config is passed to every collector factory.
Attributes map[string]string `json:"attributes"` |
||||
// ListeningAddress, when non-empty, replaces the default listening
// address in New.
ListeningAddress string `json:"listeningAddress"` |
||||
// ScrapeInterval, when non-zero, replaces the default scrape interval
// in New; it is interpreted as a number of seconds.
ScrapeInterval int `json:"scrapeInterval"` |
||||
} |
||||
|
||||
func (e *exporter) loadConfig() (err error) { |
||||
log.Printf("Reading config %s", e.configFile) |
||||
bytes, err := ioutil.ReadFile(e.configFile) |
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
return json.Unmarshal(bytes, &e.config) // Make sure this is safe
|
||||
} |
||||
|
||||
// exporter bundles the configuration, the prometheus registry and the
// collectors that are run on every scrape interval.
type exporter struct { |
||||
// configFile is the path of the JSON config file read by loadConfig.
configFile string |
||||
// listeningAddress is the address serveStatus listens on; New defaults
// it to ":8080" and overrides it from the config when set.
listeningAddress string |
||||
// scrapeInterval is the period between scrapes in Loop; New defaults it
// to 60 seconds and overrides it from the config when set.
scrapeInterval time.Duration |
||||
// scrapeDurations is registered as node_exporter_scrape_duration_seconds.
scrapeDurations prometheus.Histogram |
||||
// metricsUpdated is registered as node_exporter_metrics_updated.
metricsUpdated prometheus.Gauge |
||||
// config holds the decoded content of configFile.
config config |
||||
// registry is passed to every collector factory and its handler is
// served by serveStatus.
registry prometheus.Registry |
||||
// Collectors are the collectors built by New from collectorFactories.
Collectors []Collector |
||||
// MemProfile, when non-empty, is the file Loop writes a heap profile to
// on SIGUSR1.
MemProfile string |
||||
} |
||||
|
||||
// New takes the path to a config file and returns an exporter instance
|
||||
func New(configFile string) (e exporter, err error) { |
||||
registry := prometheus.NewRegistry() |
||||
e = exporter{ |
||||
configFile: configFile, |
||||
scrapeDurations: prometheus.NewDefaultHistogram(), |
||||
metricsUpdated: prometheus.NewGauge(), |
||||
listeningAddress: ":8080", |
||||
scrapeInterval: 60 * time.Second, |
||||
registry: registry, |
||||
} |
||||
|
||||
err = e.loadConfig() |
||||
if err != nil { |
||||
return e, fmt.Errorf("Couldn't read config: %s", err) |
||||
} |
||||
for _, fn := range collectorFactories { |
||||
c, err := fn(e.config, e.registry) |
||||
if err != nil { |
||||
return e, err |
||||
} |
||||
e.Collectors = append(e.Collectors, c) |
||||
} |
||||
|
||||
if e.config.ListeningAddress != "" { |
||||
e.listeningAddress = e.config.ListeningAddress |
||||
} |
||||
if e.config.ScrapeInterval != 0 { |
||||
e.scrapeInterval = time.Duration(e.config.ScrapeInterval) * time.Second |
||||
} |
||||
|
||||
registry.Register("node_exporter_scrape_duration_seconds", "node_exporter: Duration of a scrape job.", prometheus.NilLabels, e.scrapeDurations) |
||||
registry.Register("node_exporter_metrics_updated", "node_exporter: Number of metrics updated.", prometheus.NilLabels, e.metricsUpdated) |
||||
|
||||
return e, nil |
||||
} |
||||
|
||||
func (e *exporter) serveStatus() { |
||||
exp.Handle(prometheus.ExpositionResource, e.registry.Handler()) |
||||
http.ListenAndServe(e.listeningAddress, exp.DefaultCoarseMux) |
||||
} |
||||
|
||||
func (e *exporter) Execute(c Collector) { |
||||
begin := time.Now() |
||||
updates, err := c.Update() |
||||
duration := time.Since(begin) |
||||
|
||||
label := map[string]string{ |
||||
"collector": c.Name(), |
||||
} |
||||
if err != nil { |
||||
log.Printf("ERROR: %s failed after %fs: %s", c.Name(), duration.Seconds(), err) |
||||
label["result"] = "error" |
||||
} else { |
||||
log.Printf("OK: %s success after %fs.", c.Name(), duration.Seconds()) |
||||
label["result"] = "success" |
||||
} |
||||
e.scrapeDurations.Add(label, duration.Seconds()) |
||||
e.metricsUpdated.Set(label, float64(updates)) |
||||
} |
||||
|
||||
func (e *exporter) Loop() { |
||||
sigHup := make(chan os.Signal) |
||||
sigUsr1 := make(chan os.Signal) |
||||
signal.Notify(sigHup, syscall.SIGHUP) |
||||
signal.Notify(sigUsr1, syscall.SIGUSR1) |
||||
|
||||
go e.serveStatus() |
||||
|
||||
tick := time.Tick(e.scrapeInterval) |
||||
for { |
||||
select { |
||||
case <-sigHup: |
||||
err := e.loadConfig() |
||||
if err != nil { |
||||
log.Printf("Couldn't reload config: %s", err) |
||||
continue |
||||
} |
||||
log.Printf("Got new config") |
||||
tick = time.Tick(e.scrapeInterval) |
||||
|
||||
case <-tick: |
||||
log.Printf("Starting new scrape interval") |
||||
wg := sync.WaitGroup{} |
||||
wg.Add(len(e.Collectors)) |
||||
for _, c := range e.Collectors { |
||||
go func(c Collector) { |
||||
e.Execute(c) |
||||
wg.Done() |
||||
}(c) |
||||
} |
||||
wg.Wait() |
||||
|
||||
case <-sigUsr1: |
||||
log.Printf("got signal") |
||||
if e.MemProfile != "" { |
||||
log.Printf("Writing memory profile to %s", e.MemProfile) |
||||
f, err := os.Create(e.MemProfile) |
||||
if err != nil { |
||||
log.Fatal(err) |
||||
} |
||||
pprof.WriteHeapProfile(f) |
||||
f.Close() |
||||
} |
||||
} |
||||
} |
||||
} |
||||
@ -1,27 +1,140 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"flag" |
||||
"io/ioutil" |
||||
"log" |
||||
"net/http" |
||||
"os" |
||||
"os/signal" |
||||
"runtime/pprof" |
||||
"sync" |
||||
"syscall" |
||||
"time" |
||||
|
||||
"github.com/prometheus/node_exporter/exporter" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/exp" |
||||
"github.com/prometheus/node_exporter/collector" |
||||
) |
||||
|
||||
var ( |
||||
configFile = flag.String("config", "node_exporter.conf", "config file.") |
||||
memprofile = flag.String("memprofile", "", "write memory profile to this file") |
||||
configFile = flag.String("config", "node_exporter.conf", "config file.") |
||||
memProfile = flag.String("memprofile", "", "write memory profile to this file") |
||||
listeningAddress = flag.String("listen", ":8080", "address to listen on") |
||||
interval = flag.Duration("interval", 60*time.Second, "refresh interval") |
||||
scrapeDurations = prometheus.NewDefaultHistogram() |
||||
metricsUpdated = prometheus.NewGauge() |
||||
) |
||||
|
||||
func main() { |
||||
flag.Parse() |
||||
|
||||
exporter, err := exporter.New(*configFile) |
||||
registry := prometheus.NewRegistry() |
||||
collectors, err := loadCollectors(*configFile, registry) |
||||
if err != nil { |
||||
log.Fatalf("Couldn't instantiate exporter: %s", err) |
||||
log.Fatalf("Couldn't load config and collectors: %s", err) |
||||
} |
||||
|
||||
registry.Register("node_exporter_scrape_duration_seconds", "node_exporter: Duration of a scrape job.", prometheus.NilLabels, scrapeDurations) |
||||
registry.Register("node_exporter_metrics_updated", "node_exporter: Number of metrics updated.", prometheus.NilLabels, metricsUpdated) |
||||
|
||||
log.Printf("Registered collectors:") |
||||
for _, c := range exporter.Collectors { |
||||
for _, c := range collectors { |
||||
log.Print(" - ", c.Name()) |
||||
} |
||||
exporter.Loop() |
||||
|
||||
sigHup := make(chan os.Signal) |
||||
sigUsr1 := make(chan os.Signal) |
||||
signal.Notify(sigHup, syscall.SIGHUP) |
||||
signal.Notify(sigUsr1, syscall.SIGUSR1) |
||||
|
||||
go serveStatus(registry) |
||||
|
||||
tick := time.Tick(*interval) |
||||
for { |
||||
select { |
||||
case <-sigHup: |
||||
collectors, err = loadCollectors(*configFile, registry) |
||||
if err != nil { |
||||
log.Fatalf("Couldn't load config and collectors: %s", err) |
||||
} |
||||
log.Printf("Reload collectors and config") |
||||
tick = time.Tick(*interval) |
||||
|
||||
case <-tick: |
||||
log.Printf("Starting new interval") |
||||
wg := sync.WaitGroup{} |
||||
wg.Add(len(collectors)) |
||||
for _, c := range collectors { |
||||
go func(c collector.Collector) { |
||||
Execute(c) |
||||
wg.Done() |
||||
}(c) |
||||
} |
||||
wg.Wait() |
||||
|
||||
case <-sigUsr1: |
||||
log.Printf("got signal") |
||||
if *memProfile != "" { |
||||
log.Printf("Writing memory profile to %s", *memProfile) |
||||
f, err := os.Create(*memProfile) |
||||
if err != nil { |
||||
log.Fatal(err) |
||||
} |
||||
pprof.WriteHeapProfile(f) |
||||
f.Close() |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
||||
|
||||
func loadCollectors(file string, registry prometheus.Registry) ([]collector.Collector, error) { |
||||
collectors := []collector.Collector{} |
||||
config, err := getConfig(file) |
||||
if err != nil { |
||||
log.Fatalf("Couldn't read config %s: %s", file, err) |
||||
} |
||||
for _, fn := range collector.Factories { |
||||
c, err := fn(*config, registry) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
collectors = append(collectors, c) |
||||
} |
||||
return collectors, nil |
||||
} |
||||
|
||||
func getConfig(file string) (*collector.Config, error) { |
||||
config := &collector.Config{} |
||||
log.Printf("Reading config %s", *configFile) |
||||
bytes, err := ioutil.ReadFile(*configFile) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return config, json.Unmarshal(bytes, &config) |
||||
} |
||||
|
||||
func serveStatus(registry prometheus.Registry) { |
||||
exp.Handle(prometheus.ExpositionResource, registry.Handler()) |
||||
http.ListenAndServe(*listeningAddress, exp.DefaultCoarseMux) |
||||
} |
||||
|
||||
func Execute(c collector.Collector) { |
||||
begin := time.Now() |
||||
updates, err := c.Update() |
||||
duration := time.Since(begin) |
||||
|
||||
label := map[string]string{ |
||||
"collector": c.Name(), |
||||
} |
||||
if err != nil { |
||||
log.Printf("ERROR: %s failed after %fs: %s", c.Name(), duration.Seconds(), err) |
||||
label["result"] = "error" |
||||
} else { |
||||
log.Printf("OK: %s success after %fs.", c.Name(), duration.Seconds()) |
||||
label["result"] = "success" |
||||
} |
||||
scrapeDurations.Add(label, duration.Seconds()) |
||||
metricsUpdated.Set(label, float64(updates)) |
||||
} |
||||
|
||||
Loading…
Reference in new issue