@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"sort"
"sync"
"time"
)
@ -58,10 +59,11 @@ type Storage interface {
Serve ( )
// Stops the storage subsystem, flushing all pending operations.
Drain ( )
Flush ( )
}
func NewTieredStorage ( appendToMemoryQueueDepth , appendToDiskQueueDepth , viewQueueDepth uint , flushMemoryInterval , writeMemoryInterval , memoryTTL time . Duration ) Storage {
diskStorage , err := NewLevelDBMetricPersistence ( "/tmp/metrics-foof" )
func NewTieredStorage ( appendToMemoryQueueDepth , appendToDiskQueueDepth , viewQueueDepth uint , flushMemoryInterval , writeMemoryInterval , memoryTTL time . Duration , root string ) Storage {
diskStorage , err := NewLevelDBMetricPersistence ( root )
if err != nil {
panic ( err )
}
@ -90,7 +92,9 @@ func (t *tieredStorage) AppendSample(s model.Sample) (err error) {
}
func ( t * tieredStorage ) Drain ( ) {
t . draining <- true
if len ( t . draining ) == 0 {
t . draining <- true
}
}
func ( t * tieredStorage ) MakeView ( builder ViewRequestBuilder , deadline time . Duration ) ( view View , err error ) {
@ -120,13 +124,13 @@ func (t *tieredStorage) MakeView(builder ViewRequestBuilder, deadline time.Durat
}
func ( t * tieredStorage ) rebuildDiskFrontier ( ) ( err error ) {
fmt . Println ( "a1" )
begin := time . Now ( )
defer func ( ) {
duration := time . Now ( ) . Sub ( begin )
recordOutcome ( duration , err , map [ string ] string { operation : appendSample , result : success } , map [ string ] string { operation : rebuildDiskFrontier , result : failure } )
} ( )
i , closer , err := t . diskStorage . metricSamples . GetIterator ( )
if closer != nil {
defer closer . Close ( )
@ -134,12 +138,10 @@ func (t *tieredStorage) rebuildDiskFrontier() (err error) {
if err != nil {
panic ( err )
}
t . diskFrontier , err = newDiskFrontier ( i )
if err != nil {
panic ( err )
}
return
}
@ -150,6 +152,8 @@ func (t *tieredStorage) Serve() {
)
for {
t . reportQueues ( )
select {
case <- writeMemoryTicker :
t . writeMemory ( )
@ -159,11 +163,22 @@ func (t *tieredStorage) Serve() {
t . renderView ( viewRequest )
case <- t . draining :
t . flush ( )
return
break
}
}
}
func ( t * tieredStorage ) reportQueues ( ) {
queueSizes . Set ( map [ string ] string { "queue" : "append_to_disk" , "facet" : "occupancy" } , float64 ( len ( t . appendToDiskQueue ) ) )
queueSizes . Set ( map [ string ] string { "queue" : "append_to_disk" , "facet" : "capacity" } , float64 ( cap ( t . appendToDiskQueue ) ) )
queueSizes . Set ( map [ string ] string { "queue" : "append_to_memory" , "facet" : "occupancy" } , float64 ( len ( t . appendToMemoryQueue ) ) )
queueSizes . Set ( map [ string ] string { "queue" : "append_to_memory" , "facet" : "capacity" } , float64 ( cap ( t . appendToMemoryQueue ) ) )
queueSizes . Set ( map [ string ] string { "queue" : "view_generation" , "facet" : "occupancy" } , float64 ( len ( t . viewQueue ) ) )
queueSizes . Set ( map [ string ] string { "queue" : "view_generation" , "facet" : "capacity" } , float64 ( cap ( t . viewQueue ) ) )
}
func ( t * tieredStorage ) writeMemory ( ) {
begin := time . Now ( )
defer func ( ) {
@ -182,6 +197,10 @@ func (t *tieredStorage) writeMemory() {
}
}
func ( t * tieredStorage ) Flush ( ) {
t . flush ( )
}
// Write all pending appends.
func ( t * tieredStorage ) flush ( ) ( err error ) {
t . writeMemory ( )
@ -223,7 +242,6 @@ func (f memoryToDiskFlusherVisitor) Filter(key, value interface{}) (filterResult
return storage . ACCEPT
}
f . flusher . valuesRejected ++
return storage . STOP
}
@ -312,6 +330,7 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
scans = viewJob . builder . ScanJobs ( )
// standingOperations = ops{}
// lastTime = time.Time{}
view = newView ( )
)
// Rebuilding of the frontier should happen on a conditional basis if a
@ -332,12 +351,10 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
for _ , scanJob := range scans {
// XXX: Memoize the last retrieval for forward scans.
var (
standingOperations ops
// standingOperations ops
)
fmt . Printf ( "Starting scan of %s...\n" , scanJob )
// If the fingerprint is outside of the known frontier for the disk, the
// disk won't be queried at this time.
if ! ( t . diskFrontier == nil || scanJob . fingerprint . Less ( t . diskFrontier . firstFingerprint ) || t . diskFrontier . lastFingerprint . Less ( scanJob . fingerprint ) ) {
fmt . Printf ( "Using diskFrontier %s\n" , t . diskFrontier )
seriesFrontier , err := newSeriesFrontier ( scanJob . fingerprint , * t . diskFrontier , iterator )
@ -347,28 +364,28 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
}
if seriesFrontier != nil {
for _ , operation := range scanJob . operations {
scanJob . operations = scanJob . operations [ 1 : len ( scanJob . operations ) ]
var (
targetKey = & dto . SampleKey { }
foundKey = & dto . SampleKey { }
foundValue * dto . SampleValueSeries
)
// if operation.StartsAt().Before(seriesFrontier.firstSupertime) {
// fmt.Printf("operation %s occurs before %s; discarding...\n", operation, seriesFrontier.firstSupertime)
// continue
// }
for _ , operation := range scanJob . operations {
if seriesFrontier . lastTime . Before ( operation . StartsAt ( ) ) {
fmt . Printf ( "operation %s occurs after %s; aborting...\n" , operation , seriesFrontier . lastTime )
break
}
// if seriesFrontier.lastTime.Before(operation.StartsAt()) {
// fmt.Printf("operation %s occurs after %s; discarding...\n", operation, seriesFrontier.lastTime)
// continue
// }
scanJob . operations = scanJob . operations [ 1 : len ( scanJob . operations ) ]
var (
targetKey = & dto . SampleKey { }
foundKey = & dto . SampleKey { }
)
if operation . StartsAt ( ) . Before ( seriesFrontier . firstSupertime ) {
fmt . Printf ( "operation %s occurs before %s; discarding...\n" , operation , seriesFrontier . firstSupertime )
continue
}
targetKey . Fingerprint = scanJob . fingerprint . ToDTO ( )
targetKey . Timestamp = indexable . EncodeTime ( operation . StartsAt ( ) )
fmt . Println ( "target (unencoded) ->" , targetKey )
rawKey , _ := coding . NewProtocolBufferEncoder ( targetKey ) . Encode ( )
iterator . Seek ( rawKey )
@ -378,35 +395,45 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
panic ( err )
}
fmt . Printf ( "startAt -> %s\n" , operation . StartsAt ( ) )
fmt . Println ( "target ->" , rawKey )
fmt . Println ( "found ->" , iterator . Key ( ) )
fst := indexable . DecodeTime ( foundKey . Timestamp )
lst := time . Unix ( * foundKey . LastTimestamp , 0 )
fmt . Printf ( "(%s, %s)\n" , fst , lst )
fmt . Println ( rawKey )
fmt . Println ( foundKey )
var (
fst = indexable . DecodeTime ( foundKey . Timestamp )
lst = time . Unix ( * foundKey . LastTimestamp , 0 )
)
if ! ( ( operation . StartsAt ( ) . Before ( fst ) ) || lst . Before ( operation . StartsAt ( ) ) ) {
fmt . Printf ( "operation %s occurs inside of %s...\n" , operation , foundKey )
foundValue , err = extractSampleValue ( iterator )
if err != nil {
panic ( err )
}
fmt . Printf ( "f -> %s\n" , foundValue )
} else if operation . StartsAt ( ) . Before ( fst ) {
fmt . Printf ( "operation %s may occur in next entity; fast forwarding...\n" , operation )
panic ( "oops" )
} else {
for i := 0 ; i < 3 ; i ++ {
iterator . Next ( )
fmt . Println ( i )
foundKey , err = extractSampleKey ( iterator )
if err != nil {
panic ( err )
}
fst = indexable . DecodeTime ( foundKey . Timestamp )
lst = time . Unix ( * foundKey . LastTimestamp , 0 )
fmt . Println ( "found ->" , iterator . Key ( ) )
fmt . Printf ( "(%s, %s)\n" , fst , lst )
fmt . Println ( foundKey )
panic ( "illegal state" )
}
var (
elementCount = len ( foundValue . Value )
searcher = func ( i int ) bool {
return time . Unix ( * foundValue . Value [ i ] . Timestamp , 0 ) . After ( operation . StartsAt ( ) )
}
index = sort . Search ( elementCount , searcher )
)
standingOperations = append ( standingOperations , operation )
foundValue . Value = foundValue . Value [ index : elementCount ]
switch operation . ( type ) {
case getValuesAtTimeOp :
if len ( foundValue . Value ) > 0 {
view . appendSample ( scanJob . fingerprint , time . Unix ( * foundValue . Value [ 0 ] . Timestamp , 0 ) , model . SampleValue ( * foundValue . Value [ 0 ] . Value ) )
}
if len ( foundValue . Value ) > 1 {
}
default :
panic ( "unhandled" )
}
}
}
@ -445,6 +472,8 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
// s.operations = s.operations[1:len(s.operations)]
// }
viewJob . output <- view
return
}