|
|
|
|
@ -26,6 +26,15 @@ import ( |
|
|
|
|
"time" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// CurationState contains high-level curation state information for the
|
|
|
|
|
// heads-up-display.
|
|
|
|
|
type CurationState struct { |
|
|
|
|
Active bool |
|
|
|
|
Name string |
|
|
|
|
Limit time.Duration |
|
|
|
|
Fingerprint model.Fingerprint |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// watermarkFilter determines whether to include or exclude candidate
|
|
|
|
|
// values from the curation process by virtue of how old the high watermark is.
|
|
|
|
|
type watermarkFilter struct { |
|
|
|
|
@ -43,6 +52,8 @@ type watermarkFilter struct { |
|
|
|
|
stop chan bool |
|
|
|
|
// stopAt is used to determine the elegibility of series for compaction.
|
|
|
|
|
stopAt time.Time |
|
|
|
|
// status is the outbound channel for notifying the status page of its state.
|
|
|
|
|
status chan CurationState |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// curator is responsible for effectuating a given curation policy across the
|
|
|
|
|
@ -97,7 +108,7 @@ func newCurator() curator { |
|
|
|
|
// curated.
|
|
|
|
|
// curationState is the on-disk store where the curation remarks are made for
|
|
|
|
|
// how much progress has been made.
|
|
|
|
|
func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, processor processor, curationState, samples, watermarks *leveldb.LevelDBPersistence) (err error) { |
|
|
|
|
func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, processor processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) { |
|
|
|
|
defer func(t time.Time) { |
|
|
|
|
duration := float64(time.Since(t)) |
|
|
|
|
|
|
|
|
|
@ -113,6 +124,11 @@ func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, process |
|
|
|
|
curationDuration.IncrementBy(labels, duration) |
|
|
|
|
curationDurations.Add(labels, duration) |
|
|
|
|
}(time.Now()) |
|
|
|
|
defer func() { |
|
|
|
|
status <- CurationState{ |
|
|
|
|
Active: false, |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
iterator := samples.NewIterator(true) |
|
|
|
|
defer iterator.Close() |
|
|
|
|
@ -130,8 +146,9 @@ func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, process |
|
|
|
|
|
|
|
|
|
filter := watermarkFilter{ |
|
|
|
|
curationState: curationState, |
|
|
|
|
processor: processor, |
|
|
|
|
ignoreYoungerThan: ignoreYoungerThan, |
|
|
|
|
processor: processor, |
|
|
|
|
status: status, |
|
|
|
|
stop: c.stop, |
|
|
|
|
stopAt: instant.Add(-1 * ignoreYoungerThan), |
|
|
|
|
} |
|
|
|
|
@ -235,6 +252,8 @@ func getCurationRemark(states raw.Persistence, processor processor, ignoreYounge |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) { |
|
|
|
|
fingerprint := key.(model.Fingerprint) |
|
|
|
|
|
|
|
|
|
defer func() { |
|
|
|
|
labels := map[string]string{ |
|
|
|
|
cutOff: fmt.Sprint(w.ignoreYoungerThan), |
|
|
|
|
@ -245,11 +264,19 @@ func (w watermarkFilter) Filter(key, value interface{}) (r storage.FilterResult) |
|
|
|
|
curationFilterOperations.Increment(labels) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
defer func() { |
|
|
|
|
w.status <- CurationState{ |
|
|
|
|
Active: true, |
|
|
|
|
Name: w.processor.Name(), |
|
|
|
|
Limit: w.ignoreYoungerThan, |
|
|
|
|
Fingerprint: fingerprint, |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
if w.shouldStop() { |
|
|
|
|
return storage.STOP |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fingerprint := key.(model.Fingerprint) |
|
|
|
|
curationRemark, err := getCurationRemark(w.curationState, w.processor, w.ignoreYoungerThan, fingerprint) |
|
|
|
|
if err != nil { |
|
|
|
|
return |
|
|
|
|
|