@@ -15,6 +15,7 @@
package local
import (
	"container/list"
	"sync/atomic"
	"time"
@@ -27,13 +28,17 @@ import (
)
const (
	persistQueueCap = 1024
	chunkLen = 1024
	persistQueueCap = 1024
	evictRequestsCap = 1024
	chunkLen = 1024
	// See waitForNextFP.
	fpMaxWaitDuration = 10 * time.Second
	fpMinWaitDuration = 5 * time.Millisecond // ~ hard disk seek time.
	fpMaxSweepTime = 6 * time.Hour
	maxEvictInterval = time.Minute
	headChunkTimeout = time.Hour // Close head chunk if not touched for that long.
)
type storageState uint
@@ -49,12 +54,17 @@ type persistRequest struct {
	chunkDesc *chunkDesc
}
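// evictRequest is a request to either add a chunkDesc to the evict list
// (evict == true) or to remove it from the evict list again (evict == false).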
type evictRequest struct {
	cd *chunkDesc
	evict bool
}
type memorySeriesStorage struct {
	fpLocker *fingerprintLocker
	fpToSeries *seriesMap
	loopStopping, loopStopped chan struct{}
	evictInterval, evictAfter time.Duration
	maxMemoryChunks int
	purgeAfter time.Duration
	checkpointInterval time.Duration
@@ -62,22 +72,25 @@ type memorySeriesStorage struct {
	persistStopped chan struct{}
	persistence *persistence
	persistLatency prometheus.Summary
	persistErrors *prometheus.CounterVec
	persistQueueLength prometheus.Gauge
	numSeries prometheus.Gauge
	seriesOps *prometheus.CounterVec
	ingestedSamplesCount prometheus.Counter
	invalidPreloadRequestsCount prometheus.Counter
	purgeDuration, evictDuration prometheus.Gauge
	evictList *list.List
	evictRequests chan evictRequest
	evictStopping, evictStopped chan struct{}
	persistLatency prometheus.Summary
	persistErrors *prometheus.CounterVec
	persistQueueLength prometheus.Gauge
	numSeries prometheus.Gauge
	seriesOps *prometheus.CounterVec
	ingestedSamplesCount prometheus.Counter
	invalidPreloadRequestsCount prometheus.Counter
	purgeDuration prometheus.Gauge
}
// MemorySeriesStorageOptions contains options needed by
// NewMemorySeriesStorage. It is not safe to leave any of those at their zero
// values.
type MemorySeriesStorageOptions struct {
	MemoryEvictionInterval time.Duration // How often to check for memory eviction.
	MemoryRetentionPeriod time.Duration // Chunks at least that old are evicted from memory.
	MemoryChunks int // How many chunks to keep in memory.
	PersistenceStoragePath string // Location of persistence files.
	PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
	CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
@@ -111,8 +124,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
		loopStopping: make(chan struct{}),
		loopStopped: make(chan struct{}),
		evictInterval: o.MemoryEvictionInterval,
		evictAfter: o.MemoryRetentionPeriod,
		maxMemoryChunks: o.MemoryChunks,
		purgeAfter: o.PersistenceRetentionPeriod,
		checkpointInterval: o.CheckpointInterval,
@@ -120,6 +132,11 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
		persistStopped: make(chan struct{}),
		persistence: p,
		evictList: list.New(),
		evictRequests: make(chan evictRequest, evictRequestsCap),
		evictStopping: make(chan struct{}),
		evictStopped: make(chan struct{}),
		persistLatency: prometheus.NewSummary(prometheus.SummaryOpts{
			Namespace: namespace,
			Subsystem: subsystem,
@@ -163,23 +180,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
			Name: "invalid_preload_requests_total",
			Help: "The total number of preload requests referring to a non-existent series. This is an indication of outdated label indexes.",
		}),
		purgeDuration: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name: "purge_duration_milliseconds",
			Help: "The duration of the last storage purge iteration in milliseconds.",
		}),
		evictDuration: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name: "evict_duration_milliseconds",
			Help: "The duration of the last memory eviction iteration in milliseconds.",
		}),
	}, nil
}
// Start implements Storage.
func (s *memorySeriesStorage) Start() {
	go s.handleEvictList()
	go s.handlePersistQueue()
	go s.loop()
}
@@ -187,14 +193,19 @@ func (s *memorySeriesStorage) Start() {
// Stop implements Storage.
func (s *memorySeriesStorage) Stop() error {
	glog.Info("Stopping local storage...")
	glog.Info("Stopping maintenance loop...")
	close(s.loopStopping)
	<-s.loopStopped
	glog.Info("Stopping persist loop...")
	glog.Info("Stopping persist queue...")
	close(s.persistQueue)
	<-s.persistStopped
	glog.Info("Stopping chunk eviction...")
	close(s.evictStopping)
	<-s.evictStopped
	// One final checkpoint of the series map and the head chunks.
	if err := s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker); err != nil {
		return err
@@ -407,7 +418,83 @@ func (s *memorySeriesStorage) preloadChunksForRange(
			return nil, nil
		}
	}
	return series.preloadChunksForRange(from, through, fp, s.persistence)
	return series.preloadChunksForRange(from, through, fp, s)
}
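// handleEvictList adds and removes chunkDescs on the evict list as requested
// via the evictRequests channel, and it triggers an eviction run whenever
// enough evict requests have piled up or, at the latest, when the eviction
// ticker fires. It runs until evictStopping is closed.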
func (s *memorySeriesStorage) handleEvictList() {
	ticker := time.NewTicker(maxEvictInterval)
	count := 0
loop:
	for {
		// To batch up evictions a bit, this tries evictions at least
		// once per evict interval, but earlier if the number of evict
		// requests with evict==true that have happened since the last
		// evict run is more than maxMemoryChunks/1000.
		select {
		case req := <-s.evictRequests:
			if req.evict {
				req.cd.evictListElement = s.evictList.PushBack(req.cd)
				count++
				if count > s.maxMemoryChunks/1000 {
					s.maybeEvict()
					count = 0
				}
			} else {
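				// evict == false: make sure the chunk is not (or no
				// longer) on the evict list.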
				if req.cd.evictListElement != nil {
					s.evictList.Remove(req.cd.evictListElement)
					req.cd.evictListElement = nil
				}
			}
		case <-ticker.C:
			if s.evictList.Len() > 0 {
				s.maybeEvict()
			}
		case <-s.evictStopping:
			break loop
		}
	}
	ticker.Stop()
	glog.Info("Chunk eviction stopped.")
	close(s.evictStopped)
}
// maybeEvict is a local helper method. Must only be called by handleEvictList.
func (s *memorySeriesStorage) maybeEvict() {
	numChunksToEvict := int(atomic.LoadInt64(&numMemChunks)) - s.maxMemoryChunks
	if numChunksToEvict <= 0 {
		return
	}
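	// Collect up to numChunksToEvict chunkDescs from the front of the evict
	// list, i.e. those that have been waiting on the list the longest.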
	chunkDescsToEvict := make([]*chunkDesc, numChunksToEvict)
	for i := range chunkDescsToEvict {
		e := s.evictList.Front()
		if e == nil {
			break
		}
		cd := e.Value.(*chunkDesc)
		cd.evictListElement = nil
		chunkDescsToEvict[i] = cd
		s.evictList.Remove(e)
	}
	// Do the actual eviction in a goroutine as we might otherwise deadlock,
	// in the following way: A chunk was unpinned completely and therefore
	// scheduled for eviction. At the time we actually try to evict it,
	// another goroutine is pinning the chunk. The pinning goroutine has
	// currently locked the chunk and tries to send the evict request (to
	// remove the chunk from the evict list) to the evictRequests
	// channel. The send blocks because evictRequests is full. However, the
	// goroutine that is supposed to empty the channel is waiting for the
	// chunkDesc lock to try to evict the chunk.
	go func() {
		for _, cd := range chunkDescsToEvict {
			if cd == nil {
				break
			}
			cd.maybeEvict()
			// We don't care if the eviction succeeds. If the chunk
			// was pinned in the meantime, it will be added to the
			// evict list once it gets unpinned again.
		}
	}()
}
func (s *memorySeriesStorage) handlePersistQueue() {
@@ -430,10 +517,10 @@ func (s *memorySeriesStorage) handlePersistQueue() {
			s.persistence.setDirty(true)
			continue
		}
		req.chunkDesc.unpin()
		req.chunkDesc.unpin(s.evictRequests)
		chunkOps.WithLabelValues(persistAndUnpin).Inc()
	}
	glog.Info("Persist loop stopped.")
	glog.Info("Persist queue drained and stopped.")
	close(s.persistStopped)
}
@@ -471,11 +558,9 @@ func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int) bool {
}
func (s *memorySeriesStorage) loop() {
	evictTicker := time.NewTicker(s.evictInterval)
	checkpointTicker := time.NewTicker(s.checkpointInterval)
	defer func() {
		evictTicker.Stop()
		checkpointTicker.Stop()
		glog.Info("Maintenance loop stopped.")
		close(s.loopStopped)
@@ -550,47 +635,9 @@ loop:
			break loop
		case <-checkpointTicker.C:
			s.persistence.checkpointSeriesMapAndHeads(s.fpToSeries, s.fpLocker)
		case <-evictTicker.C:
			// TODO: Change this to be based on number of chunks in memory.
			glog.Info("Evicting chunks...")
			begin := time.Now()
			for m := range s.fpToSeries.iter() {
				select {
				case <-s.loopStopping:
					glog.Info("Interrupted evicting chunks.")
					break loop
				default:
					// Keep going.
				}
				s.fpLocker.Lock(m.fp)
				allEvicted, headChunkToPersist := m.series.evictOlderThan(
					clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.evictAfter),
				)
				if allEvicted {
					s.fpToSeries.del(m.fp)
					s.numSeries.Dec()
					if err := s.persistence.archiveMetric(
						m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
					); err != nil {
						glog.Errorf("Error archiving metric %v: %v", m.series.metric, err)
					} else {
						s.seriesOps.WithLabelValues(archive).Inc()
					}
				}
				s.fpLocker.Unlock(m.fp)
				// Queue outside of lock!
				if headChunkToPersist != nil {
					s.persistQueue <- persistRequest{m.fp, headChunkToPersist}
				}
			}
			duration := time.Since(begin)
			s.evictDuration.Set(float64(duration) / float64(time.Millisecond))
			glog.Infof("Done evicting chunks in %v.", duration)
		case fp := <-memoryFingerprints:
			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter))
			// TODO: Move chunkDesc eviction, head chunk closing, and archiving here.
			s.maintainSeries(fp)
			s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
		case fp := <-archivedFingerprints:
			s.purgeSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.purgeAfter))
@@ -604,6 +651,56 @@
	}
}
// maintainSeries closes the head chunk if it has not been touched in a while.
// It archives a series if all its chunks are evicted, and it evicts chunkDescs
// if there are too many.
func (s *memorySeriesStorage) maintainSeries(fp clientmodel.Fingerprint) {
	var headChunkToPersist *chunkDesc
	s.fpLocker.Lock(fp)
	defer func() {
		s.fpLocker.Unlock(fp)
		// Queue outside of lock!
		if headChunkToPersist != nil {
			s.persistQueue <- persistRequest{fp, headChunkToPersist}
		}
	}()
	series, ok := s.fpToSeries.get(fp)
	if !ok {
		return
	}
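	// Find the first chunkDesc whose chunk is still in memory. If there is
	// none, all chunks of this series are evicted, and the series can be
	// archived.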
	iOldestNotEvicted := -1
	for i, cd := range series.chunkDescs {
		if !cd.isEvicted() {
			iOldestNotEvicted = i
			break
		}
	}
	// Archive if all chunks are evicted.
	if iOldestNotEvicted == -1 {
		s.fpToSeries.del(fp)
		s.numSeries.Dec()
		if err := s.persistence.archiveMetric(
			fp, series.metric, series.firstTime(), series.lastTime(),
		); err != nil {
			glog.Errorf("Error archiving metric %v: %v", series.metric, err)
		} else {
			s.seriesOps.WithLabelValues(archive).Inc()
		}
		return
	}
	// If we are here, the series is not archived, so check for chunkDesc
	// eviction next and then if the head chunk needs to be persisted.
	series.evictChunkDescs(iOldestNotEvicted)
	if !series.headChunkPersisted && time.Now().Sub(series.head().firstTime().Time()) > headChunkTimeout {
		series.headChunkPersisted = true
		// Since we cannot modify the head chunk from now on, we
		// don't need to bother with cloning anymore.
		series.headChunkUsedByIterator = false
		headChunkToPersist = series.head()
	}
}
// purgeSeries purges chunks older than beforeTime from a series. If the series
// contains no chunks after the purge, it is dropped entirely.
func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
@@ -635,7 +732,6 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
		}
		return
	}
	// Deal with archived series.
	has, firstTime, lastTime, err := s.persistence.hasArchivedMetric(fp)
	if err != nil {
@@ -648,6 +744,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
	}
	newFirstTime, _, allDropped, err := s.persistence.dropChunks(fp, beforeTime)
	glog.Infoln("DEBUG:", newFirstTime, allDropped)
	if err != nil {
		glog.Error("Error purging persisted chunks: ", err)
	}
@@ -685,8 +782,6 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
	s.seriesOps.Describe(ch)
	ch <- s.ingestedSamplesCount.Desc()
	ch <- s.invalidPreloadRequestsCount.Desc()
	ch <- s.purgeDuration.Desc()
	ch <- s.evictDuration.Desc()
	ch <- persistQueueCapDesc
@@ -705,8 +800,6 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
	s.seriesOps.Collect(ch)
	ch <- s.ingestedSamplesCount
	ch <- s.invalidPreloadRequestsCount
	ch <- s.purgeDuration
	ch <- s.evictDuration
	ch <- persistQueueCapGauge