@@ -95,18 +95,19 @@ type labelLimits struct {
 }
 
 type scrapeLoopOptions struct {
-    target                  *Target
-    scraper                 scraper
-    sampleLimit             int
-    bucketLimit             int
-    labelLimits             *labelLimits
-    honorLabels             bool
-    honorTimestamps         bool
-    interval                time.Duration
-    timeout                 time.Duration
-    scrapeClassicHistograms bool
-    mrc                     []*relabel.Config
-    cache                   *scrapeCache
+    target                   *Target
+    scraper                  scraper
+    sampleLimit              int
+    bucketLimit              int
+    labelLimits              *labelLimits
+    honorLabels              bool
+    honorTimestamps          bool
+    trackTimestampsStaleness bool
+    interval                 time.Duration
+    timeout                  time.Duration
+    scrapeClassicHistograms  bool
+    mrc                      []*relabel.Config
+    cache                    *scrapeCache
 }
 
 const maxAheadTime = 10 * time.Minute
@@ -160,6 +161,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed
             cache,
             offsetSeed,
             opts.honorTimestamps,
+            opts.trackTimestampsStaleness,
             opts.sampleLimit,
             opts.bucketLimit,
             opts.labelLimits,
@@ -270,9 +272,10 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
             labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
             labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
         }
-        honorLabels     = sp.config.HonorLabels
-        honorTimestamps = sp.config.HonorTimestamps
-        mrc             = sp.config.MetricRelabelConfigs
+        honorLabels              = sp.config.HonorLabels
+        honorTimestamps          = sp.config.HonorTimestamps
+        trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
+        mrc                      = sp.config.MetricRelabelConfigs
     )
 
     sp.targetMtx.Lock()
@@ -298,17 +301,18 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
                 acceptHeader: acceptHeader(cfg.ScrapeProtocols),
             }
             newLoop = sp.newLoop(scrapeLoopOptions{
-                target:          t,
-                scraper:         s,
-                sampleLimit:     sampleLimit,
-                bucketLimit:     bucketLimit,
-                labelLimits:     labelLimits,
-                honorLabels:     honorLabels,
-                honorTimestamps: honorTimestamps,
-                mrc:             mrc,
-                cache:           cache,
-                interval:        interval,
-                timeout:         timeout,
+                target:                   t,
+                scraper:                  s,
+                sampleLimit:              sampleLimit,
+                bucketLimit:              bucketLimit,
+                labelLimits:              labelLimits,
+                honorLabels:              honorLabels,
+                honorTimestamps:          honorTimestamps,
+                trackTimestampsStaleness: trackTimestampsStaleness,
+                mrc:                      mrc,
+                cache:                    cache,
+                interval:                 interval,
+                timeout:                  timeout,
             })
         )
         if err != nil {
@@ -396,10 +400,11 @@ func (sp *scrapePool) sync(targets []*Target) {
             labelNameLengthLimit:  int(sp.config.LabelNameLengthLimit),
             labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
         }
-        honorLabels             = sp.config.HonorLabels
-        honorTimestamps         = sp.config.HonorTimestamps
-        mrc                     = sp.config.MetricRelabelConfigs
-        scrapeClassicHistograms = sp.config.ScrapeClassicHistograms
+        honorLabels              = sp.config.HonorLabels
+        honorTimestamps          = sp.config.HonorTimestamps
+        trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
+        mrc                      = sp.config.MetricRelabelConfigs
+        scrapeClassicHistograms  = sp.config.ScrapeClassicHistograms
     )
 
     sp.targetMtx.Lock()
@@ -421,17 +426,18 @@ func (sp *scrapePool) sync(targets []*Target) {
                 metrics:       sp.metrics,
             }
             l := sp.newLoop(scrapeLoopOptions{
-                target:                  t,
-                scraper:                 s,
-                sampleLimit:             sampleLimit,
-                bucketLimit:             bucketLimit,
-                labelLimits:             labelLimits,
-                honorLabels:             honorLabels,
-                honorTimestamps:         honorTimestamps,
-                mrc:                     mrc,
-                interval:                interval,
-                timeout:                 timeout,
-                scrapeClassicHistograms: scrapeClassicHistograms,
+                target:                   t,
+                scraper:                  s,
+                sampleLimit:              sampleLimit,
+                bucketLimit:              bucketLimit,
+                labelLimits:              labelLimits,
+                honorLabels:              honorLabels,
+                honorTimestamps:          honorTimestamps,
+                trackTimestampsStaleness: trackTimestampsStaleness,
+                mrc:                      mrc,
+                interval:                 interval,
+                timeout:                  timeout,
+                scrapeClassicHistograms:  scrapeClassicHistograms,
             })
             if err != nil {
                 l.setForcedError(err)
@@ -750,21 +756,22 @@ type cacheEntry struct {
 }
 
 type scrapeLoop struct {
-    scraper                 scraper
-    l                       log.Logger
-    cache                   *scrapeCache
-    lastScrapeSize          int
-    buffers                 *pool.Pool
-    offsetSeed              uint64
-    honorTimestamps         bool
-    forcedErr               error
-    forcedErrMtx            sync.Mutex
-    sampleLimit             int
-    bucketLimit             int
-    labelLimits             *labelLimits
-    interval                time.Duration
-    timeout                 time.Duration
-    scrapeClassicHistograms bool
+    scraper                  scraper
+    l                        log.Logger
+    cache                    *scrapeCache
+    lastScrapeSize           int
+    buffers                  *pool.Pool
+    offsetSeed               uint64
+    honorTimestamps          bool
+    trackTimestampsStaleness bool
+    forcedErr                error
+    forcedErrMtx             sync.Mutex
+    sampleLimit              int
+    bucketLimit              int
+    labelLimits              *labelLimits
+    interval                 time.Duration
+    timeout                  time.Duration
+    scrapeClassicHistograms  bool
 
     appender            func(ctx context.Context) storage.Appender
     sampleMutator       labelsMutator
@@ -1046,6 +1053,7 @@ func newScrapeLoop(ctx context.Context,
     cache *scrapeCache,
     offsetSeed uint64,
     honorTimestamps bool,
+    trackTimestampsStaleness bool,
     sampleLimit int,
     bucketLimit int,
     labelLimits *labelLimits,
@@ -1080,27 +1088,28 @@ func newScrapeLoop(ctx context.Context,
     }
 
     sl := &scrapeLoop{
-        scraper:                 sc,
-        buffers:                 buffers,
-        cache:                   cache,
-        appender:                appender,
-        sampleMutator:           sampleMutator,
-        reportSampleMutator:     reportSampleMutator,
-        stopped:                 make(chan struct{}),
-        offsetSeed:              offsetSeed,
-        l:                       l,
-        parentCtx:               ctx,
-        appenderCtx:             appenderCtx,
-        honorTimestamps:         honorTimestamps,
-        sampleLimit:             sampleLimit,
-        bucketLimit:             bucketLimit,
-        labelLimits:             labelLimits,
-        interval:                interval,
-        timeout:                 timeout,
-        scrapeClassicHistograms: scrapeClassicHistograms,
-        reportExtraMetrics:      reportExtraMetrics,
-        appendMetadataToWAL:     appendMetadataToWAL,
-        metrics:                 metrics,
+        scraper:                  sc,
+        buffers:                  buffers,
+        cache:                    cache,
+        appender:                 appender,
+        sampleMutator:            sampleMutator,
+        reportSampleMutator:      reportSampleMutator,
+        stopped:                  make(chan struct{}),
+        offsetSeed:               offsetSeed,
+        l:                        l,
+        parentCtx:                ctx,
+        appenderCtx:              appenderCtx,
+        honorTimestamps:          honorTimestamps,
+        trackTimestampsStaleness: trackTimestampsStaleness,
+        sampleLimit:              sampleLimit,
+        bucketLimit:              bucketLimit,
+        labelLimits:              labelLimits,
+        interval:                 interval,
+        timeout:                  timeout,
+        scrapeClassicHistograms:  scrapeClassicHistograms,
+        reportExtraMetrics:       reportExtraMetrics,
+        appendMetadataToWAL:      appendMetadataToWAL,
+        metrics:                  metrics,
     }
     sl.ctx, sl.cancel = context.WithCancel(ctx)
 
@@ -1547,7 +1556,7 @@ loop:
         }
 
         if !ok {
-            if parsedTimestamp == nil {
+            if parsedTimestamp == nil || sl.trackTimestampsStaleness {
                 // Bypass staleness logic if there is an explicit timestamp.
                 sl.cache.trackStaleness(hash, lset)
             }
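The two one-line condition changes (the hunk above and the checkAddError hunk that follows) are the behavioural core of the patch: staleness tracking is normally skipped for samples that carry an explicit timestamp, and the new option opts such series back in. A minimal sketch of that decision, using a hypothetical helper name that is not part of this diff:

package main

import "fmt"

// shouldTrackStaleness mirrors the condition used in the scrape loop above:
// a series is registered for staleness tracking when the scraped sample has
// no explicit timestamp, or when TrackTimestampsStaleness is enabled for the
// scrape config.
func shouldTrackStaleness(parsedTimestamp *int64, trackTimestampsStaleness bool) bool {
    return parsedTimestamp == nil || trackTimestampsStaleness
}

func main() {
    ts := int64(1700000000000)
    fmt.Println(shouldTrackStaleness(nil, false)) // true: no explicit timestamp
    fmt.Println(shouldTrackStaleness(&ts, false)) // false: explicit timestamp, option disabled
    fmt.Println(shouldTrackStaleness(&ts, true))  // true: explicit timestamp, option enabled
}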
@@ -1628,7 +1637,7 @@
 func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
     switch errors.Cause(err) {
     case nil:
-        if tp == nil && ce != nil {
+        if (tp == nil || sl.trackTimestampsStaleness) && ce != nil {
            sl.cache.trackStaleness(ce.hash, ce.lset)
         }
         return true, nil
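The checkAddError hunk applies the same relaxed condition on the cached-series path, so series already present in the scrape cache behave the same as newly seen ones. For context only (nothing below is part of this diff, and it assumes the existing model/value helpers): the staleness markers that trackStaleness ultimately arranges to append are the usual StaleNaN sentinel, which readers detect roughly like this:

package main

import (
    "fmt"
    "math"

    "github.com/prometheus/prometheus/model/value"
)

func main() {
    // The staleness marker is a specific NaN bit pattern, not an ordinary NaN.
    marker := math.Float64frombits(value.StaleNaN)

    fmt.Println(value.IsStaleNaN(marker))     // true: marks the series as stale
    fmt.Println(value.IsStaleNaN(42))         // false: ordinary sample value
    fmt.Println(value.IsStaleNaN(math.NaN())) // false: an ordinary NaN is not a marker
}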