Run checkpointing of in-memory metrics and head chunks periodically.

Checkpointing interval is now a command line flag.

Along the way, several things were refactored.
- Restructure the way the storage is started and stopped..
- Number of series in checkpoint is now a uint64, not a varint.
  (Breaks old checkpoints, needs wipe!)
- More consistent naming and order of methods.

Change-Id: I883d9170c9a608ee716bb0ab3d0ded8ca03760d9
pull/413/head
Bjoern Rabenstein 12 years ago
parent 74c9b34a5e
commit f1de5b0c4e
  1. 11
      main.go
  2. 17
      storage/local/codable/codable.go
  3. 11
      storage/local/interface.go
  4. 171
      storage/local/persistence.go
  5. 497
      storage/local/storage.go
  6. 7
      storage/local/test_helpers.go

@ -58,6 +58,8 @@ var (
storagePurgeInterval = flag.Duration("storage.purgeInterval", time.Hour, "The period at which old data is deleted completely from storage.")
storageRetentionPeriod = flag.Duration("storage.retentionPeriod", 15*24*time.Hour, "The period of time to retain in storage.")
checkpointInterval = flag.Duration("storage.checkpointInterval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
notificationQueueCapacity = flag.Int("alertmanager.notificationQueueCapacity", 100, "The size of the queue for pending alert manager notifications.")
printVersion = flag.Bool("version", false, "print version information")
@ -104,6 +106,7 @@ func NewPrometheus() *prometheus {
PersistenceStoragePath: *metricsStoragePath,
PersistencePurgeInterval: *storagePurgeInterval,
PersistenceRetentionPeriod: *storageRetentionPeriod,
CheckpointInterval: *checkpointInterval,
}
memStorage, err := local.NewMemorySeriesStorage(o)
if err != nil {
@ -191,9 +194,7 @@ func (p *prometheus) Serve() {
go p.notificationHandler.Run()
go p.interruptHandler()
storageStarted := make(chan struct{})
go p.storage.Serve(storageStarted)
<-storageStarted
p.storage.Start()
go func() {
err := p.webService.ServeForever()
@ -213,8 +214,8 @@ func (p *prometheus) Serve() {
// The following shut-down operations have to happen after
// unwrittenSamples is drained. So do not move them into close().
if err := p.storage.Close(); err != nil {
glog.Error("Error closing local storage: ", err)
if err := p.storage.Stop(); err != nil {
glog.Error("Error stopping local storage: ", err)
}
glog.Info("Local Storage: Done")

@ -72,15 +72,16 @@ func putBuf(buf []byte) {
}
// EncodeVarint encodes an int64 as a varint and writes it to an io.Writer.
// It returns the number of bytes written.
// This is a GC-friendly implementation that takes the required staging buffer
// from a buffer pool.
func EncodeVarint(w io.Writer, i int64) error {
func EncodeVarint(w io.Writer, i int64) (int, error) {
buf := getBuf(binary.MaxVarintLen64)
defer putBuf(buf)
bytesWritten := binary.PutVarint(buf, i)
_, err := w.Write(buf[:bytesWritten])
return err
return bytesWritten, err
}
// EncodeUint64 writes an uint64 to an io.Writer in big-endian byte-order.
@ -111,7 +112,7 @@ func DecodeUint64(r io.Reader) (uint64, error) {
// encodeString writes the varint encoded length followed by the bytes of s to
// b.
func encodeString(b *bytes.Buffer, s string) error {
if err := EncodeVarint(b, int64(len(s))); err != nil {
if _, err := EncodeVarint(b, int64(len(s))); err != nil {
return err
}
if _, err := b.WriteString(s); err != nil {
@ -143,7 +144,7 @@ type Metric clientmodel.Metric
// MarshalBinary implements encoding.BinaryMarshaler.
func (m Metric) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(len(m))); err != nil {
if _, err := EncodeVarint(buf, int64(len(m))); err != nil {
return nil, err
}
for l, v := range m {
@ -332,7 +333,7 @@ type LabelValueSet map[clientmodel.LabelValue]struct{}
// MarshalBinary implements encoding.BinaryMarshaler.
func (vs LabelValueSet) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(len(vs))); err != nil {
if _, err := EncodeVarint(buf, int64(len(vs))); err != nil {
return nil, err
}
for v := range vs {
@ -370,7 +371,7 @@ type LabelValues clientmodel.LabelValues
// MarshalBinary implements encoding.BinaryMarshaler.
func (vs LabelValues) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(len(vs))); err != nil {
if _, err := EncodeVarint(buf, int64(len(vs))); err != nil {
return nil, err
}
for _, v := range vs {
@ -409,10 +410,10 @@ type TimeRange struct {
// MarshalBinary implements encoding.BinaryMarshaler.
func (tr TimeRange) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
if err := EncodeVarint(buf, int64(tr.First)); err != nil {
if _, err := EncodeVarint(buf, int64(tr.First)); err != nil {
return nil, err
}
if err := EncodeVarint(buf, int64(tr.Last)); err != nil {
if _, err := EncodeVarint(buf, int64(tr.Last)); err != nil {
return nil, err
}
return buf.Bytes(), nil

@ -41,10 +41,13 @@ type Storage interface {
GetMetricForFingerprint(clientmodel.Fingerprint) clientmodel.Metric
// Construct an iterator for a given fingerprint.
NewIterator(clientmodel.Fingerprint) SeriesIterator
// Run the request-serving and maintenance loop.
Serve(started chan struct{})
// Close the MetricsStorage and releases all resources.
Close() error
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Close is called.
Start()
// Stop shuts down the Storage gracefully, flushes all pending
// operations, stops all maintenance loops,and frees all resources.
Stop() error
// WaitForIndexing returns once all samples in the storage are
// indexed. Indexing is needed for GetFingerprintsForLabelMatchers and
// GetLabelValuesForLabelName and may lag behind.

@ -39,6 +39,7 @@ const (
seriesTempFileSuffix = ".db.tmp"
headsFileName = "heads.db"
headsTempFileName = "heads.db.tmp"
headsFormatVersion = 1
headsMagicString = "PrometheusHeads"
@ -101,6 +102,7 @@ type persistence struct {
indexingQueueCapacity prometheus.Metric
indexingBatchSizes prometheus.Summary
indexingBatchLatency prometheus.Summary
checkpointDuration prometheus.Gauge
}
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
@ -170,6 +172,12 @@ func newPersistence(basePath string, chunkLen int) (*persistence, error) {
Help: "Quantiles for batch indexing latencies in milliseconds.",
},
),
checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "checkpoint_duration_milliseconds",
Help: "The duration (in milliseconds) it took to checkpoint in-memory metrics and head chunks.",
}),
}
go p.processIndexingQueue()
return p, nil
@ -181,6 +189,7 @@ func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
ch <- p.indexingQueueCapacity.Desc()
p.indexingBatchSizes.Describe(ch)
p.indexingBatchLatency.Describe(ch)
ch <- p.checkpointDuration.Desc()
}
// Collect implements prometheus.Collector.
@ -191,6 +200,7 @@ func (p *persistence) Collect(ch chan<- prometheus.Metric) {
ch <- p.indexingQueueCapacity
p.indexingBatchSizes.Collect(ch)
p.indexingBatchLatency.Collect(ch)
ch <- p.checkpointDuration
}
// getFingerprintsForLabelPair returns the fingerprints for the given label
@ -340,32 +350,47 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
return cds, nil
}
// persistSeriesMapAndHeads persists the fingerprint to memory-series mapping
// checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping
// and all open (non-full) head chunks. Do not call concurrently with
// LoadSeriesMapAndHeads.
//
// TODO: Currently, this method assumes to be called while nothing else is going
// on in the storage concurrently. To make this method callable during normal
// operations, certain things have to be done:
// - Make sure the length of the seriesMap doesn't change during the runtime.
// - Lock the fingerprints while persisting unpersisted head chunks.
// - Write to temporary file and only rename after successfully finishing.
func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries *seriesMap) error {
f, err := os.OpenFile(p.headsPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
glog.Info("Checkpointing in-memory metrics and head chunks...")
begin := time.Now()
f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
return err
return
}
defer f.Close()
defer func() {
closeErr := f.Close()
if err != nil {
return
}
err = closeErr
if err != nil {
return
}
err = os.Rename(p.headsTempFileName(), p.headsFileName())
duration := time.Since(begin)
p.checkpointDuration.Set(float64(duration) / float64(time.Millisecond))
glog.Infof("Done checkpointing in-memory metrics and head chunks in %v.", duration)
}()
w := bufio.NewWriterSize(f, fileBufSize)
if _, err := w.WriteString(headsMagicString); err != nil {
return err
if _, err = w.WriteString(headsMagicString); err != nil {
return
}
if err := codable.EncodeVarint(w, headsFormatVersion); err != nil {
return err
var numberOfSeriesOffset int
if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil {
return
}
if err := codable.EncodeVarint(w, int64(fingerprintToSeries.length())); err != nil {
return err
numberOfSeriesOffset += len(headsMagicString)
numberOfSeriesInHeader := uint64(fingerprintToSeries.length())
// We have to write the number of series as uint64 because we might need
// to overwrite it later, and a varint might change byte width then.
if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil {
return
}
iter := fingerprintToSeries.iter()
@ -375,48 +400,76 @@ func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries *seriesMap) e
}
}()
var realNumberOfSeries uint64
for m := range iter {
var seriesFlags byte
if m.series.chunkDescsLoaded {
seriesFlags |= flagChunkDescsLoaded
}
if m.series.headChunkPersisted {
seriesFlags |= flagHeadChunkPersisted
}
if err := w.WriteByte(seriesFlags); err != nil {
return err
}
if err := codable.EncodeUint64(w, uint64(m.fp)); err != nil {
return err
}
buf, err := codable.Metric(m.series.metric).MarshalBinary()
func() { // Wrapped in function to use defer for unlocking the fp.
fpLocker.Lock(m.fp)
defer fpLocker.Unlock(m.fp)
if len(m.series.chunkDescs) == 0 {
// This series was completely purged or archived in the meantime. Ignore.
return
}
realNumberOfSeries++
var seriesFlags byte
if m.series.chunkDescsLoaded {
seriesFlags |= flagChunkDescsLoaded
}
if m.series.headChunkPersisted {
seriesFlags |= flagHeadChunkPersisted
}
if err = w.WriteByte(seriesFlags); err != nil {
return
}
if err = codable.EncodeUint64(w, uint64(m.fp)); err != nil {
return
}
var buf []byte
buf, err = codable.Metric(m.series.metric).MarshalBinary()
if err != nil {
return
}
w.Write(buf)
if _, err = codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil {
return
}
for i, chunkDesc := range m.series.chunkDescs {
if m.series.headChunkPersisted || i < len(m.series.chunkDescs)-1 {
if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
return
}
if _, err = codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil {
return
}
} else {
// This is the non-persisted head chunk. Fully marshal it.
if err = w.WriteByte(chunkType(chunkDesc.chunk)); err != nil {
return
}
if err = chunkDesc.chunk.marshal(w); err != nil {
return
}
}
}
}()
if err != nil {
return err
return
}
w.Write(buf)
if err := codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil {
return err
}
if err = w.Flush(); err != nil {
return
}
if realNumberOfSeries != numberOfSeriesInHeader {
// The number of series has changed in the meantime.
// Rewrite it in the header.
if _, err = f.Seek(int64(numberOfSeriesOffset), os.SEEK_SET); err != nil {
return
}
for i, chunkDesc := range m.series.chunkDescs {
if m.series.headChunkPersisted || i < len(m.series.chunkDescs)-1 {
if err := codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
return err
}
if err := codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil {
return err
}
} else {
// This is the non-persisted head chunk. Fully marshal it.
if err := w.WriteByte(chunkType(chunkDesc.chunk)); err != nil {
return err
}
if err := chunkDesc.chunk.marshal(w); err != nil {
return err
}
}
if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil {
return
}
}
return w.Flush()
return
}
// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all
@ -426,7 +479,7 @@ func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries *seriesMap) e
func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
var chunksTotal, chunkDescsTotal int64
f, err := os.Open(p.headsPath())
f, err := os.Open(p.headsFileName())
if os.IsNotExist(err) {
return newSeriesMap(), nil
}
@ -450,7 +503,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil {
return nil, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion)
}
numSeries, err := binary.ReadVarint(r)
numSeries, err := codable.DecodeUint64(r)
if err != nil {
return nil, err
}
@ -783,10 +836,14 @@ func (p *persistence) offsetForChunkIndex(i int) int64 {
return int64(i * (chunkHeaderLen + p.chunkLen))
}
func (p *persistence) headsPath() string {
func (p *persistence) headsFileName() string {
return path.Join(p.basePath, headsFileName)
}
func (p *persistence) headsTempFileName() string {
return path.Join(p.basePath, headsTempFileName)
}
func (p *persistence) processIndexingQueue() {
batchSize := 0
nameToValues := index.LabelNameLabelValuesMapping{}

@ -37,30 +37,31 @@ const (
storageStopping
)
type memorySeriesStorage struct {
fpLocker *fingerprintLocker
persistDone chan bool
stopServing chan chan<- bool
type persistRequest struct {
fingerprint clientmodel.Fingerprint
chunkDesc *chunkDesc
}
type memorySeriesStorage struct {
fpLocker *fingerprintLocker
fingerprintToSeries *seriesMap
memoryEvictionInterval time.Duration
memoryRetentionPeriod time.Duration
persistencePurgeInterval time.Duration
persistenceRetentionPeriod time.Duration
persistQueue chan *persistRequest
persistence *persistence
persistLatency prometheus.Summary
persistErrors *prometheus.CounterVec
persistQueueLength prometheus.Gauge
numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter
purgeDuration, evictionDuration prometheus.Gauge
loopStopping, loopStopped chan struct{}
evictInterval, evictAfter time.Duration
purgeInterval, purgeAfter time.Duration
checkpointInterval time.Duration
persistQueue chan *persistRequest
persistStopped chan struct{}
persistence *persistence
persistLatency prometheus.Summary
persistErrors *prometheus.CounterVec
persistQueueLength prometheus.Gauge
numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec
ingestedSamplesCount prometheus.Counter
purgeDuration, evictDuration prometheus.Gauge
}
// MemorySeriesStorageOptions contains options needed by
@ -72,6 +73,7 @@ type MemorySeriesStorageOptions struct {
PersistenceStoragePath string // Location of persistence files.
PersistencePurgeInterval time.Duration // How often to check for purging.
PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
}
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
@ -96,20 +98,20 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
numSeries.Set(float64(fingerprintToSeries.length()))
return &memorySeriesStorage{
fpLocker: newFingerprintLocker(100), // TODO: Tweak value.
fpLocker: newFingerprintLocker(100), // TODO: Tweak value.
fingerprintToSeries: fingerprintToSeries,
persistDone: make(chan bool),
stopServing: make(chan chan<- bool),
memoryEvictionInterval: o.MemoryEvictionInterval,
memoryRetentionPeriod: o.MemoryRetentionPeriod,
loopStopping: make(chan struct{}),
loopStopped: make(chan struct{}),
evictInterval: o.MemoryEvictionInterval,
evictAfter: o.MemoryRetentionPeriod,
purgeInterval: o.PersistencePurgeInterval,
purgeAfter: o.PersistenceRetentionPeriod,
checkpointInterval: o.CheckpointInterval,
persistencePurgeInterval: o.PersistencePurgeInterval,
persistenceRetentionPeriod: o.PersistenceRetentionPeriod,
persistQueue: make(chan *persistRequest, persistQueueCap),
persistence: p,
persistQueue: make(chan *persistRequest, persistQueueCap),
persistStopped: make(chan struct{}),
persistence: p,
persistLatency: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
@ -154,18 +156,166 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
Name: "purge_duration_milliseconds",
Help: "The duration of the last storage purge iteration in milliseconds.",
}),
evictionDuration: prometheus.NewGauge(prometheus.GaugeOpts{
evictDuration: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "eviction_duration_milliseconds",
Name: "evict_duration_milliseconds",
Help: "The duration of the last memory eviction iteration in milliseconds.",
}),
}, nil
}
type persistRequest struct {
fingerprint clientmodel.Fingerprint
chunkDesc *chunkDesc
// Start implements Storage.
func (s *memorySeriesStorage) Start() {
go s.handlePersistQueue()
go s.loop()
}
// Stop implements Storage.
func (s *memorySeriesStorage) Stop() error {
glog.Info("Stopping maintenance loop...")
close(s.loopStopping)
<-s.loopStopped
glog.Info("Stopping persist loop...")
close(s.persistQueue)
<-s.persistStopped
// One final checkpoint of the series map and the head chunks.
if err := s.persistence.checkpointSeriesMapAndHeads(s.fingerprintToSeries, s.fpLocker); err != nil {
return err
}
if err := s.persistence.close(); err != nil {
return err
}
return nil
}
// WaitForIndexing implements Storage.
func (s *memorySeriesStorage) WaitForIndexing() {
s.persistence.waitForIndexing()
}
// NewIterator implements storage.
func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fingerprintToSeries.get(fp)
if !ok {
// Oops, no series for fp found. That happens if, after
// preloading is done, the whole series is identified as old
// enough for purging and hence purged for good. As there is no
// data left to iterate over, return an iterator that will never
// return any values.
return nopSeriesIterator{}
}
return series.newIterator(
func() { s.fpLocker.Lock(fp) },
func() { s.fpLocker.Unlock(fp) },
)
}
// NewPreloader implements Storage.
func (s *memorySeriesStorage) NewPreloader() Preloader {
return &memorySeriesPreloader{
storage: s,
}
}
// GetFingerprintsForLabelMatchers implements Storage.
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
var result map[clientmodel.Fingerprint]struct{}
for _, matcher := range labelMatchers {
intersection := map[clientmodel.Fingerprint]struct{}{}
switch matcher.Type {
case metric.Equal:
fps, err := s.persistence.getFingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: matcher.Value,
},
)
if err != nil {
glog.Error("Error getting fingerprints for label pair: ", err)
}
if len(fps) == 0 {
return nil
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
default:
values, err := s.persistence.getLabelValuesForLabelName(matcher.Name)
if err != nil {
glog.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
}
matches := matcher.Filter(values)
if len(matches) == 0 {
return nil
}
for _, v := range matches {
fps, err := s.persistence.getFingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: v,
},
)
if err != nil {
glog.Error("Error getting fingerprints for label pair: ", err)
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
}
}
if len(intersection) == 0 {
return nil
}
result = intersection
}
fps := make(clientmodel.Fingerprints, 0, len(result))
for fp := range result {
fps = append(fps, fp)
}
return fps
}
// GetLabelValuesForLabelName implements Storage.
func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
lvs, err := s.persistence.getLabelValuesForLabelName(labelName)
if err != nil {
glog.Errorf("Error getting label values for label name %q: %v", labelName, err)
}
return lvs
}
// GetMetricForFingerprint implements Storage.
func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.Metric {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fingerprintToSeries.get(fp)
if ok {
// Copy required here because caller might mutate the returned
// metric.
m := make(clientmodel.Metric, len(series.metric))
for ln, lv := range series.metric {
m[ln] = lv
}
return m
}
metric, err := s.persistence.getArchivedMetric(fp)
if err != nil {
glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
}
return metric
}
// AppendSamples implements Storage.
@ -250,51 +400,6 @@ func (s *memorySeriesStorage) preloadChunksForRange(
return series.preloadChunksForRange(from, through, fp, s.persistence)
}
// NewIterator implements storage.
func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fingerprintToSeries.get(fp)
if !ok {
// Oops, no series for fp found. That happens if, after
// preloading is done, the whole series is identified as old
// enough for purging and hence purged for good. As there is no
// data left to iterate over, return an iterator that will never
// return any values.
return nopSeriesIterator{}
}
return series.newIterator(
func() { s.fpLocker.Lock(fp) },
func() { s.fpLocker.Unlock(fp) },
)
}
func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
defer func(begin time.Time) {
s.evictionDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond))
}(time.Now())
for m := range s.fingerprintToSeries.iter() {
s.fpLocker.Lock(m.fp)
if m.series.evictOlderThan(
clientmodel.TimestampFromTime(time.Now()).Add(-1*ttl),
m.fp, s.persistQueue,
) {
s.fingerprintToSeries.del(m.fp)
s.numSeries.Dec()
if err := s.persistence.archiveMetric(
m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
); err != nil {
glog.Errorf("Error archiving metric %v: %v", m.series.metric, err)
} else {
s.seriesOps.WithLabelValues(archive).Inc()
}
}
s.fpLocker.Unlock(m.fp)
}
}
func (s *memorySeriesStorage) handlePersistQueue() {
for req := range s.persistQueue {
s.persistQueueLength.Set(float64(len(s.persistQueue)))
@ -310,58 +415,71 @@ func (s *memorySeriesStorage) handlePersistQueue() {
req.chunkDesc.unpin()
chunkOps.WithLabelValues(persistAndUnpin).Inc()
}
s.persistDone <- true
}
// WaitForIndexing implements Storage.
func (s *memorySeriesStorage) WaitForIndexing() {
s.persistence.waitForIndexing()
}
// Close stops serving, flushes all pending operations, and frees all
// resources. It implements Storage.
func (s *memorySeriesStorage) Close() error {
stopped := make(chan bool)
glog.Info("Waiting for storage to stop serving...")
s.stopServing <- stopped
<-stopped
glog.Info("Serving stopped.")
glog.Info("Stopping persist loop...")
close(s.persistQueue)
<-s.persistDone
glog.Info("Persist loop stopped.")
glog.Info("Persisting head chunks...")
if err := s.persistence.persistSeriesMapAndHeads(s.fingerprintToSeries); err != nil {
return err
}
glog.Info("Done persisting head chunks.")
if err := s.persistence.close(); err != nil {
return err
}
return nil
close(s.persistStopped)
}
func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
purgeTicker := time.NewTicker(s.persistencePurgeInterval)
defer purgeTicker.Stop()
func (s *memorySeriesStorage) loop() {
evictTicker := time.NewTicker(s.evictInterval)
purgeTicker := time.NewTicker(s.purgeInterval)
checkpointTicker := time.NewTicker(s.checkpointInterval)
defer func() {
evictTicker.Stop()
purgeTicker.Stop()
checkpointTicker.Stop()
glog.Info("Maintenance loop stopped.")
close(s.loopStopped)
}()
for {
select {
case <-stop:
glog.Info("Purging loop stopped.")
case <-s.loopStopping:
return
case <-checkpointTicker.C:
s.persistence.checkpointSeriesMapAndHeads(s.fingerprintToSeries, s.fpLocker)
case <-evictTicker.C:
// TODO: Change this to be based on number of chunks in memory.
glog.Info("Evicting chunks...")
begin := time.Now()
for m := range s.fingerprintToSeries.iter() {
select {
case <-s.loopStopping:
glog.Info("Interrupted evicting chunks.")
return
default:
// Keep going.
}
s.fpLocker.Lock(m.fp)
if m.series.evictOlderThan(
clientmodel.TimestampFromTime(time.Now()).Add(-1*s.evictAfter),
m.fp, s.persistQueue,
) {
s.fingerprintToSeries.del(m.fp)
s.numSeries.Dec()
if err := s.persistence.archiveMetric(
m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
); err != nil {
glog.Errorf("Error archiving metric %v: %v", m.series.metric, err)
} else {
s.seriesOps.WithLabelValues(archive).Inc()
}
}
s.fpLocker.Unlock(m.fp)
}
duration := time.Since(begin)
s.evictDuration.Set(float64(duration) / float64(time.Millisecond))
glog.Infof("Done evicting chunks in %v.", duration)
case <-purgeTicker.C:
glog.Info("Purging old series data...")
ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod)
ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter)
begin := time.Now()
for fp := range s.fingerprintToSeries.fpIter() {
select {
case <-stop:
glog.Info("Interrupted running series purge.")
case <-s.loopStopping:
glog.Info("Interrupted purging series.")
return
default:
s.purgeSeries(fp, ts)
@ -375,15 +493,16 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
}
for _, fp := range persistedFPs {
select {
case <-stop:
glog.Info("Interrupted running series purge.")
case <-s.loopStopping:
glog.Info("Interrupted purnging series.")
return
default:
s.purgeSeries(fp, ts)
}
}
s.purgeDuration.Set(float64(time.Since(begin)) / float64(time.Millisecond))
glog.Info("Done purging old series data.")
duration := time.Since(begin)
s.purgeDuration.Set(float64(duration) / float64(time.Millisecond))
glog.Infof("Done purging old series data in %v.", duration)
}
}
}
@ -427,130 +546,6 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
}
}
// Serve implements Storage.
func (s *memorySeriesStorage) Serve(started chan struct{}) {
evictMemoryTicker := time.NewTicker(s.memoryEvictionInterval)
defer evictMemoryTicker.Stop()
go s.handlePersistQueue()
stopPurge := make(chan bool)
go s.purgePeriodically(stopPurge)
close(started)
for {
select {
case <-evictMemoryTicker.C:
s.evictMemoryChunks(s.memoryRetentionPeriod)
case stopped := <-s.stopServing:
stopPurge <- true
stopped <- true
return
}
}
}
// NewPreloader implements Storage.
func (s *memorySeriesStorage) NewPreloader() Preloader {
return &memorySeriesPreloader{
storage: s,
}
}
// GetFingerprintsForLabelMatchers implements Storage.
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
var result map[clientmodel.Fingerprint]struct{}
for _, matcher := range labelMatchers {
intersection := map[clientmodel.Fingerprint]struct{}{}
switch matcher.Type {
case metric.Equal:
fps, err := s.persistence.getFingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: matcher.Value,
},
)
if err != nil {
glog.Error("Error getting fingerprints for label pair: ", err)
}
if len(fps) == 0 {
return nil
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
default:
values, err := s.persistence.getLabelValuesForLabelName(matcher.Name)
if err != nil {
glog.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
}
matches := matcher.Filter(values)
if len(matches) == 0 {
return nil
}
for _, v := range matches {
fps, err := s.persistence.getFingerprintsForLabelPair(
metric.LabelPair{
Name: matcher.Name,
Value: v,
},
)
if err != nil {
glog.Error("Error getting fingerprints for label pair: ", err)
}
for _, fp := range fps {
if _, ok := result[fp]; ok || result == nil {
intersection[fp] = struct{}{}
}
}
}
}
if len(intersection) == 0 {
return nil
}
result = intersection
}
fps := make(clientmodel.Fingerprints, 0, len(result))
for fp := range result {
fps = append(fps, fp)
}
return fps
}
// GetLabelValuesForLabelName implements Storage.
func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
lvs, err := s.persistence.getLabelValuesForLabelName(labelName)
if err != nil {
glog.Errorf("Error getting label values for label name %q: %v", labelName, err)
}
return lvs
}
// GetMetricForFingerprint implements Storage.
func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.Metric {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fingerprintToSeries.get(fp)
if ok {
// Copy required here because caller might mutate the returned
// metric.
m := make(clientmodel.Metric, len(series.metric))
for ln, lv := range series.metric {
m[ln] = lv
}
return m
}
metric, err := s.persistence.getArchivedMetric(fp)
if err != nil {
glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
}
return metric
}
// To expose persistQueueCap as metric:
var (
persistQueueCapDesc = prometheus.NewDesc(
@ -574,7 +569,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc()
ch <- s.purgeDuration.Desc()
ch <- s.evictionDuration.Desc()
ch <- s.evictDuration.Desc()
ch <- persistQueueCapDesc
@ -593,7 +588,7 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount
ch <- s.purgeDuration
ch <- s.evictionDuration
ch <- s.evictDuration
ch <- persistQueueCapGauge

@ -26,7 +26,7 @@ type testStorageCloser struct {
}
func (t *testStorageCloser) Close() {
t.storage.Close()
t.storage.Stop()
t.directory.Close()
}
@ -41,6 +41,7 @@ func NewTestStorage(t testing.TB) (Storage, test.Closer) {
PersistencePurgeInterval: time.Hour,
PersistenceRetentionPeriod: 24 * 7 * time.Hour,
PersistenceStoragePath: directory.Path(),
CheckpointInterval: time.Hour,
}
storage, err := NewMemorySeriesStorage(o)
if err != nil {
@ -48,9 +49,7 @@ func NewTestStorage(t testing.TB) (Storage, test.Closer) {
t.Fatalf("Error creating storage: %s", err)
}
storageStarted := make(chan struct{})
go storage.Serve(storageStarted)
<-storageStarted
storage.Start()
closer := &testStorageCloser{
storage: storage,

Loading…
Cancel
Save