diff --git a/scrape/manager.go b/scrape/manager.go index 65ca1dc2b8..1af9141077 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -14,7 +14,11 @@ package scrape import ( + "encoding" "fmt" + "hash/fnv" + "net" + "os" "reflect" "sync" "time" @@ -22,6 +26,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/storage" @@ -54,6 +59,7 @@ type Manager struct { append Appendable graceShut chan struct{} + jitterSeed uint64 // Global jitterSeed seed is used to spread scrape workload across HA setup. mtxScrape sync.Mutex // Guards the fields below. scrapeConfigs map[string]*config.ScrapeConfig scrapePools map[string]*scrapePool @@ -111,7 +117,7 @@ func (m *Manager) reload() { level.Error(m.logger).Log("msg", "error reloading target set", "err", "invalid config id:"+setName) continue } - sp, err := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", setName)) + sp, err := newScrapePool(scrapeConfig, m.append, m.jitterSeed, log.With(m.logger, "scrape_pool", setName)) if err != nil { level.Error(m.logger).Log("msg", "error creating new scrape pool", "err", err, "scrape_pool", setName) continue @@ -131,6 +137,20 @@ func (m *Manager) reload() { wg.Wait() } +// setJitterSeed calculates a global jitterSeed per server relying on extra label set. +func (m *Manager) setJitterSeed(labels model.LabelSet) error { + h := fnv.New64a() + hostname, err := getFqdn() + if err != nil { + return err + } + if _, err := fmt.Fprintf(h, "%s%s", hostname, labels.String()); err != nil { + return err + } + m.jitterSeed = h.Sum64() + return nil +} + // Stop cancels all running scrape pools and blocks until all have exited. func (m *Manager) Stop() { m.mtxScrape.Lock() @@ -159,6 +179,10 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { } m.scrapeConfigs = c + if err := m.setJitterSeed(cfg.GlobalConfig.ExternalLabels); err != nil { + return err + } + // Cleanup and reload pool if the configuration has changed. var failed bool for name, sp := range m.scrapePools { @@ -216,3 +240,45 @@ func (m *Manager) TargetsDropped() map[string][]*Target { } return targets } + +// getFqdn returns a fqdn if it's possible, otherwise falls back a hostname. +func getFqdn() (string, error) { + hostname, err := os.Hostname() + if err != nil { + return "", err + } + + ips, err := net.LookupIP(hostname) + if err != nil { + return hostname, err + } + + lookup := func(ipStr encoding.TextMarshaler) (string, error) { + ip, err := ipStr.MarshalText() + if err != nil { + return "", err + } + hosts, err := net.LookupAddr(string(ip)) + if err != nil || len(hosts) == 0 { + return "", err + } + return hosts[0], nil + } + + for _, addr := range ips { + if ip := addr.To4(); ip != nil { + if fqdn, err := lookup(ip); err == nil { + return fqdn, nil + } + + } + + if ip := addr.To16(); ip != nil { + if fqdn, err := lookup(ip); err == nil { + return fqdn, nil + } + + } + } + return hostname, nil +} diff --git a/scrape/manager_test.go b/scrape/manager_test.go index 70c681b936..c77214cb14 100644 --- a/scrape/manager_test.go +++ b/scrape/manager_test.go @@ -19,14 +19,12 @@ import ( "testing" "time" - "github.com/prometheus/prometheus/pkg/relabel" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/util/testutil" - yaml "gopkg.in/yaml.v2" ) @@ -368,3 +366,44 @@ func TestManagerTargetsUpdates(t *testing.T) { t.Error("No scrape loops reload was triggered after targets update.") } } + +func TestSetJitter(t *testing.T) { + getConfig := func(prometheus string) *config.Config { + cfgText := ` +global: + external_labels: + prometheus: '` + prometheus + `' +` + + cfg := &config.Config{} + if err := yaml.UnmarshalStrict([]byte(cfgText), cfg); err != nil { + t.Fatalf("Unable to load YAML config cfgYaml: %s", err) + } + + return cfg + } + + scrapeManager := NewManager(nil, nil) + + // Load the first config. + cfg1 := getConfig("ha1") + if err := scrapeManager.setJitterSeed(cfg1.GlobalConfig.ExternalLabels); err != nil { + t.Error(err) + } + jitter1 := scrapeManager.jitterSeed + + if jitter1 == 0 { + t.Error("Jitter has to be a hash of uint64") + } + + // Load the first config. + cfg2 := getConfig("ha2") + if err := scrapeManager.setJitterSeed(cfg2.GlobalConfig.ExternalLabels); err != nil { + t.Error(err) + } + jitter2 := scrapeManager.jitterSeed + + if jitter1 == jitter2 { + t.Error("Jitter should not be the same on different set of external labels") + } +} diff --git a/scrape/scrape.go b/scrape/scrape.go index 668c2afc50..dabc9836f5 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -173,7 +173,7 @@ const maxAheadTime = 10 * time.Minute type labelsMutator func(labels.Labels) labels.Labels -func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) (*scrapePool, error) { +func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64, logger log.Logger) (*scrapePool, error) { targetScrapePools.Inc() if logger == nil { logger = log.NewNopLogger() @@ -219,6 +219,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) return appender(app, opts.limit) }, cache, + jitterSeed, ) } @@ -489,7 +490,7 @@ func appender(app storage.Appender, limit int) storage.Appender { type scraper interface { scrape(ctx context.Context, w io.Writer) (string, error) report(start time.Time, dur time.Duration, err error) - offset(interval time.Duration) time.Duration + offset(interval time.Duration, jitterSeed uint64) time.Duration } // targetScraper implements the scraper interface for a target. @@ -580,6 +581,7 @@ type scrapeLoop struct { cache *scrapeCache lastScrapeSize int buffers *pool.Pool + jitterSeed uint64 appender func() storage.Appender sampleMutator labelsMutator @@ -798,6 +800,7 @@ func newScrapeLoop(ctx context.Context, reportSampleMutator labelsMutator, appender func() storage.Appender, cache *scrapeCache, + jitterSeed uint64, ) *scrapeLoop { if l == nil { l = log.NewNopLogger() @@ -816,6 +819,7 @@ func newScrapeLoop(ctx context.Context, sampleMutator: sampleMutator, reportSampleMutator: reportSampleMutator, stopped: make(chan struct{}), + jitterSeed: jitterSeed, l: l, ctx: ctx, } @@ -826,7 +830,7 @@ func newScrapeLoop(ctx context.Context, func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) { select { - case <-time.After(sl.scraper.offset(interval)): + case <-time.After(sl.scraper.offset(interval, sl.jitterSeed)): // Continue after a scraping offset. case <-sl.scrapeCtx.Done(): close(sl.stopped) diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index 23ec8deb75..9c8025f4b9 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -48,7 +48,7 @@ func TestNewScrapePool(t *testing.T) { var ( app = &nopAppendable{} cfg = &config.ScrapeConfig{} - sp, _ = newScrapePool(cfg, app, nil) + sp, _ = newScrapePool(cfg, app, 0, nil) ) if a, ok := sp.appendable.(*nopAppendable); !ok || a != app { @@ -83,7 +83,7 @@ func TestDroppedTargetsList(t *testing.T) { }, }, } - sp, _ = newScrapePool(cfg, app, nil) + sp, _ = newScrapePool(cfg, app, 0, nil) expectedLabelSetString = "{__address__=\"127.0.0.1:9090\", __metrics_path__=\"\", __scheme__=\"\", job=\"dropMe\"}" expectedLength = 1 ) @@ -305,7 +305,7 @@ func TestScrapePoolReload(t *testing.T) { func TestScrapePoolAppender(t *testing.T) { cfg := &config.ScrapeConfig{} app := &nopAppendable{} - sp, _ := newScrapePool(cfg, app, nil) + sp, _ := newScrapePool(cfg, app, 0, nil) loop := sp.newLoop(scrapeLoopOptions{ target: &Target{}, @@ -353,7 +353,7 @@ func TestScrapePoolRaces(t *testing.T) { newConfig := func() *config.ScrapeConfig { return &config.ScrapeConfig{ScrapeInterval: interval, ScrapeTimeout: timeout} } - sp, _ := newScrapePool(newConfig(), &nopAppendable{}, nil) + sp, _ := newScrapePool(newConfig(), &nopAppendable{}, 0, nil) tgts := []*targetgroup.Group{ { Targets: []model.LabelSet{ @@ -395,7 +395,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) { nil, nil, nopMutator, nopMutator, - nil, nil, + nil, nil, 0, ) // The scrape pool synchronizes on stopping scrape loops. However, new scrape @@ -459,6 +459,7 @@ func TestScrapeLoopStop(t *testing.T) { nopMutator, app, nil, + 0, ) // Terminate loop after 2 scrapes. @@ -524,6 +525,7 @@ func TestScrapeLoopRun(t *testing.T) { nopMutator, app, nil, + 0, ) // The loop must terminate during the initial offset if the context @@ -569,6 +571,7 @@ func TestScrapeLoopRun(t *testing.T) { nopMutator, app, nil, + 0, ) go func() { @@ -617,6 +620,7 @@ func TestScrapeLoopMetadata(t *testing.T) { nopMutator, func() storage.Appender { return nopAppender{} }, cache, + 0, ) defer cancel() @@ -666,6 +670,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) { nopMutator, app, nil, + 0, ) // Succeed once, several failures, then stop. numScrapes := 0 @@ -724,6 +729,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) { nopMutator, app, nil, + 0, ) // Succeed once, several failures, then stop. @@ -828,6 +834,7 @@ func TestScrapeLoopAppend(t *testing.T) { }, func() storage.Appender { return app }, nil, + 0, ) now := time.Now() @@ -867,6 +874,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) { nopMutator, func() storage.Appender { return app }, nil, + 0, ) // Get the value of the Counter before performing the append. @@ -927,6 +935,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) { nopMutator, func() storage.Appender { return capp }, nil, + 0, ) now := time.Now() @@ -966,6 +975,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) { nopMutator, func() storage.Appender { return app }, nil, + 0, ) now := time.Now() @@ -1011,6 +1021,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) { nopMutator, func() storage.Appender { return app }, nil, + 0, ) now := time.Now() @@ -1050,6 +1061,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) { nopMutator, app, nil, + 0, ) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -1079,6 +1091,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) { nopMutator, app, nil, + 0, ) scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { @@ -1125,6 +1138,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T nopMutator, func() storage.Appender { return app }, nil, + 0, ) now := time.Unix(1, 0) @@ -1158,6 +1172,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) { } }, nil, + 0, ) now := time.Now().Add(20 * time.Minute) @@ -1322,7 +1337,7 @@ type testScraper struct { scrapeFunc func(context.Context, io.Writer) error } -func (ts *testScraper) offset(interval time.Duration) time.Duration { +func (ts *testScraper) offset(interval time.Duration, jitterSeed uint64) time.Duration { return ts.offsetDur } diff --git a/scrape/target.go b/scrape/target.go index d136e6fff5..bafd3533e0 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -125,12 +125,14 @@ func (t *Target) hash() uint64 { } // offset returns the time until the next scrape cycle for the target. -func (t *Target) offset(interval time.Duration) time.Duration { +// It includes the global server jitterSeed for scrapes from multiple Prometheus to try to be at different times. +func (t *Target) offset(interval time.Duration, jitterSeed uint64) time.Duration { now := time.Now().UnixNano() + // Base is a pinned to absolute time, no matter how often offset is called. var ( base = int64(interval) - now%int64(interval) - offset = t.hash() % uint64(interval) + offset = (t.hash() ^ jitterSeed) % uint64(interval) next = base + int64(offset) ) diff --git a/scrape/target_test.go b/scrape/target_test.go index 7894709a66..d70508fdf9 100644 --- a/scrape/target_test.go +++ b/scrape/target_test.go @@ -47,6 +47,7 @@ func TestTargetLabels(t *testing.T) { func TestTargetOffset(t *testing.T) { interval := 10 * time.Second + jitter := uint64(0) offsets := make([]time.Duration, 10000) @@ -55,7 +56,7 @@ func TestTargetOffset(t *testing.T) { target := newTestTarget("example.com:80", 0, labels.FromStrings( "label", fmt.Sprintf("%d", i), )) - offsets[i] = target.offset(interval) + offsets[i] = target.offset(interval, jitter) } // Put the offsets into buckets and validate that they are all