@ -64,15 +64,18 @@ type memorySeriesStorage struct {
fpLocker * fingerprintLocker
fpToSeries * seriesMap
loopStopping , loopStopped chan struct { }
maxMemoryChunks int
purgeAfter time . Duration
checkpointInterval time . Duration
loopStopping , loopStopped chan struct { }
maxMemoryChunks int
purgeAfter time . Duration
checkpointInterval time . Duration
checkpointDirtySeriesLimit int
persistQueue chan persistRequest
persistStopped chan struct { }
persistence * persistence
countPersistedHeadChunks chan struct { }
evictList * list . List
evictRequests chan evictRequest
evictStopping , evictStopped chan struct { }
@ -95,6 +98,7 @@ type MemorySeriesStorageOptions struct {
PersistenceStoragePath string // Location of persistence files.
PersistenceRetentionPeriod time . Duration // Chunks at least that old are purged.
CheckpointInterval time . Duration // How often to checkpoint the series map and head chunks.
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
Dirty bool // Force the storage to consider itself dirty on startup.
}
@ -123,16 +127,19 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
fpLocker : newFingerprintLocker ( 256 ) ,
fpToSeries : fpToSeries ,
loopStopping : make ( chan struct { } ) ,
loopStopped : make ( chan struct { } ) ,
maxMemoryChunks : o . MemoryChunks ,
purgeAfter : o . PersistenceRetentionPeriod ,
checkpointInterval : o . CheckpointInterval ,
loopStopping : make ( chan struct { } ) ,
loopStopped : make ( chan struct { } ) ,
maxMemoryChunks : o . MemoryChunks ,
purgeAfter : o . PersistenceRetentionPeriod ,
checkpointInterval : o . CheckpointInterval ,
checkpointDirtySeriesLimit : o . CheckpointDirtySeriesLimit ,
persistQueue : make ( chan persistRequest , persistQueueCap ) ,
persistStopped : make ( chan struct { } ) ,
persistence : p ,
countPersistedHeadChunks : make ( chan struct { } , 1024 ) ,
evictList : list . New ( ) ,
evictRequests : make ( chan evictRequest , evictRequestsCap ) ,
evictStopping : make ( chan struct { } ) ,
@ -363,11 +370,20 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
Timestamp : sample . Timestamp ,
} )
s . fpLocker . Unlock ( fp )
if len ( chunkDescsToPersist ) == 0 {
return
}
// Queue only outside of the locked area, processing the persistQueue
// requires the same lock!
for _ , cd := range chunkDescsToPersist {
s . persistQueue <- persistRequest { fp , cd }
}
// Count that a head chunk was persisted, but only best effort, i.e. we
// don't want to block here.
select {
case s . countPersistedHeadChunks <- struct { } { } : // Counted.
default : // Meh...
}
}
func ( s * memorySeriesStorage ) getOrCreateSeries ( fp clientmodel . Fingerprint , m clientmodel . Metric ) * memorySeries {
@ -642,10 +658,19 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
}
func ( s * memorySeriesStorage ) loop ( ) {
checkpointTicker := time . NewTicker ( s . checkpointInterval )
checkpointTimer := time . NewTimer ( s . checkpointInterval )
// We take the number of head chunks persisted since the last checkpoint
// as an approximation for the number of series that are "dirty",
// i.e. whose head chunk is different from the one in the most recent
// checkpoint or for which the fact that the head chunk has been
// persisted is not reflected in the most recent checkpoint. This count
// could overestimate the number of dirty series, but it's good enough
// as a heuristics.
headChunksPersistedSinceLastCheckpoint := 0
defer func ( ) {
checkpointTicker . Stop ( )
checkpointTim er . Stop ( )
glog . Info ( "Maintenance loop stopped." )
close ( s . loopStopped )
} ( )
@ -658,15 +683,21 @@ loop:
select {
case <- s . loopStopping :
break loop
case <- checkpointTick er . C :
case <- checkpointTim er . C :
s . persistence . checkpointSeriesMapAndHeads ( s . fpToSeries , s . fpLocker )
headChunksPersistedSinceLastCheckpoint = 0
checkpointTimer . Reset ( s . checkpointInterval )
case fp := <- memoryFingerprints :
s . purgeSeries ( fp , clientmodel . TimestampFromTime ( time . Now ( ) ) . Add ( - 1 * s . purgeAfter ) )
s . maintainSeries ( fp )
s . seriesOps . WithLabelValues ( memoryMaintenance ) . Inc ( )
case fp := <- archivedFingerprints :
s . purgeSeries ( fp , clientmodel . TimestampFromTime ( time . Now ( ) ) . Add ( - 1 * s . purgeAfter ) )
s . seriesOps . WithLabelValues ( archiveMaintenance ) . Inc ( )
case <- s . countPersistedHeadChunks :
headChunksPersistedSinceLastCheckpoint ++
if headChunksPersistedSinceLastCheckpoint >= s . checkpointDirtySeriesLimit {
checkpointTimer . Reset ( 0 )
}
}
}
// Wait until both channels are closed.
@ -677,7 +708,8 @@ loop:
}
// maintainSeries closes the head chunk if not touched in a while. It archives a
// series if all chunks are evicted. It evicts chunkDescs if there are too many.
// series if all chunks are evicted. It evicts chunkDescs if there are too
// many.
func ( s * memorySeriesStorage ) maintainSeries ( fp clientmodel . Fingerprint ) {
var headChunkToPersist * chunkDesc
s . fpLocker . Lock ( fp )
@ -687,6 +719,12 @@ func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
if headChunkToPersist != nil {
s . persistQueue <- persistRequest { fp , headChunkToPersist }
}
// Count that a head chunk was persisted, but only best effort, i.e. we
// don't want to block here.
select {
case s . countPersistedHeadChunks <- struct { } { } : // Counted.
default : // Meh...
}
} ( )
series , ok := s . fpToSeries . get ( fp )