@@ -1195,14 +1195,14 @@ func (a *initAppender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Ex
 	return a.app.AppendExemplar(ref, l, e)
 }
 
-func (a *initAppender) AppendHistogram(ref uint64, l labels.Labels, sh histogram.SparseHistogram) (uint64, error) {
+func (a *initAppender) AppendHistogram(ref uint64, l labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) {
 	if a.app != nil {
-		return a.app.AppendHistogram(ref, l, sh)
+		return a.app.AppendHistogram(ref, l, t, sh)
 	}
-	//a.head.initTime(sh.Ts) FIXME(ganesh)
+	a.head.initTime(t)
 	a.app = a.head.appender()
 
-	return a.app.AppendHistogram(ref, l, sh)
+	return a.app.AppendHistogram(ref, l, t, sh)
 }
 
 var _ storage.GetRef = &initAppender{}
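The hunk above threads the sample timestamp through `AppendHistogram` explicitly rather than trying to read it off the histogram itself, as the removed FIXME did. A minimal caller sketch under the new signature; the head/appender setup and the populated `sh`/`sh2` values are assumptions for illustration, not part of this diff:

    app := head.Appender(context.Background())
    lset := labels.FromStrings("__name__", "rpc_duration_seconds")
    ref, err := app.AppendHistogram(0, lset, 1625000000000, sh)
    if err != nil {
        _ = app.Rollback()
        return err
    }
    // Later appends to the same series can pass the returned ref and skip the lookup.
    if _, err := app.AppendHistogram(ref, lset, 1625000015000, sh2); err != nil {
        _ = app.Rollback()
        return err
    }
    return app.Commit()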
@@ -1363,6 +1363,8 @@ type headAppender struct {
 	samples      []record.RefSample
 	exemplars    []exemplarWithSeriesRef
 	sampleSeries []*memSeries
+	histograms      []record.RefHistogram
+	histogramSeries []*memSeries
 
 	appendID, cleanupAppendIDsBelow uint64
 	closed                          bool
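The two new buffers mirror the existing `samples`/`sampleSeries` pair. `record.RefHistogram` itself is not shown in this excerpt; judging from the `Ref`/`T`/`H` fields used in AppendHistogram below, a plausible shape (an assumption, mirroring `record.RefSample`) is:

    // Assumed definition, not part of this hunk.
    type RefHistogram struct {
        Ref uint64                    // series reference from the head
        T   int64                     // timestamp in milliseconds
        H   histogram.SparseHistogram // the sparse histogram value itself
    }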
@@ -1457,9 +1459,63 @@ func (a *headAppender) AppendExemplar(ref uint64, _ labels.Labels, e exemplar.Ex
 	return s.ref, nil
 }
 
-func (a *headAppender) AppendHistogram(ref uint64, _ labels.Labels, sh histogram.SparseHistogram) (uint64, error) {
-	// TODO.
-	return 0, nil
+func (a *headAppender) AppendHistogram(ref uint64, lset labels.Labels, t int64, sh histogram.SparseHistogram) (uint64, error) {
+	if t < a.minValidTime {
+		a.head.metrics.outOfBoundSamples.Inc()
+		return 0, storage.ErrOutOfBounds
+	}
+
+	s := a.head.series.getByID(ref)
+	if s == nil {
+		// Ensure no empty labels have gotten through.
+		lset = lset.WithoutEmpty()
+		if len(lset) == 0 {
+			return 0, errors.Wrap(ErrInvalidSample, "empty labelset")
+		}
+
+		if l, dup := lset.HasDuplicateLabelNames(); dup {
+			return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
+		}
+
+		var created bool
+		var err error
+		s, created, err = a.head.getOrCreate(lset.Hash(), lset)
+		if err != nil {
+			return 0, err
+		}
+		if created {
+			a.series = append(a.series, record.RefSeries{
+				Ref:    s.ref,
+				Labels: lset,
+			})
+		}
+	}
+
+	s.Lock()
+	if err := s.appendableHistogram(t, sh); err != nil {
+		s.Unlock()
+		if err == storage.ErrOutOfOrderSample {
+			a.head.metrics.outOfOrderSamples.Inc()
+		}
+		return 0, err
+	}
+	s.pendingCommit = true
+	s.Unlock()
+
+	if t < a.mint {
+		a.mint = t
+	}
+	if t > a.maxt {
+		a.maxt = t
+	}
+
+	a.histograms = append(a.histograms, record.RefHistogram{
+		Ref: s.ref,
+		T:   t,
+		H:   sh,
+	})
+	a.histogramSeries = append(a.histogramSeries, s)
+	return s.ref, nil
 }
 
 var _ storage.GetRef = &headAppender{}
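Note that `AppendHistogram` only stages: validation and min/max bookkeeping happen here, while the actual chunk write is deferred to `Commit`. One behavior worth calling out is the label validation on the series-creation path; a hypothetical illustration:

    // A labelset with a duplicated name is rejected before a series is created.
    lset := labels.Labels{
        {Name: "job", Value: "a"},
        {Name: "job", Value: "b"},
    }
    _, err := app.AppendHistogram(0, lset, 1000, sh)
    // err wraps ErrInvalidSample: `label name "job" is not unique`.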
@@ -1572,6 +1628,24 @@ func (a *headAppender) Commit() (err error) {
 			a.head.metrics.chunksCreated.Inc()
 		}
 	}
+	total += len(a.histograms) // TODO: different metric?
+	for i, s := range a.histograms {
+		series = a.histogramSeries[i]
+		series.Lock()
+		ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper)
+		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
+		series.pendingCommit = false
+		series.Unlock()
+
+		if !ok {
+			total--
+			a.head.metrics.outOfOrderSamples.Inc()
+		}
+		if chunkCreated {
+			a.head.metrics.chunks.Inc()
+			a.head.metrics.chunksCreated.Inc()
+		}
+	}
 
 	a.head.metrics.samplesAppended.Add(float64(total))
 	a.head.updateMinMaxTime(a.mint, a.maxt)
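The commit loop counts optimistically and corrects afterwards: `total` starts at floats plus histograms and is decremented for every histogram that loses an ordering race at write time, so `samplesAppended` only counts durable appends. A worked example of the accounting, with hypothetical numbers:

    // 8 staged float samples, 4 staged histograms, 1 histogram out of order:
    // total = 8 + 4 = 12; the failed append does total--, so
    // samplesAppended increases by 11 and outOfOrderSamples by 1.

The `// TODO: different metric?` note suggests histograms may eventually get counters separate from the float-sample ones.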
@@ -2347,15 +2421,24 @@ func (s *memSeries) maxTime() int64 {
 	return c.maxTime
 }
 
-func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
+func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
 	s.mmapCurrentHeadChunk(chunkDiskMapper)
 
 	s.headChunk = &memChunk{
-		chunk:   chunkenc.NewXORChunk(),
 		minTime: mint,
 		maxTime: math.MinInt64,
 	}
+
+	if chunkenc.IsValidEncoding(e) {
+		var err error
+		s.headChunk.chunk, err = chunkenc.NewEmptyChunk(e)
+		if err != nil {
+			panic(err) // This should never happen.
+		}
+	} else {
+		s.headChunk.chunk = chunkenc.NewXORChunk()
+	}
+
 	// Set upper bound on when the next chunk must be started. An earlier timestamp
 	// may be chosen dynamically at a later point.
 	s.nextAt = rangeForTimestamp(mint, s.chunkRange)
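`chunkenc.NewEmptyChunk` dispatching on the encoding is assumed rather than shown in this excerpt; a sketch consistent with how it is called above (the histogram chunk constructor name is a guess):

    // Assumed shape of the constructor used above; not part of this diff.
    func NewEmptyChunk(e Encoding) (Chunk, error) {
        switch e {
        case EncXOR:
            return NewXORChunk(), nil
        case EncSHS:
            return NewHistoChunk(), nil // hypothetical sparse-histogram chunk constructor
        }
        return nil, fmt.Errorf("invalid chunk encoding %v", e)
    }

Keeping the `else` branch falling back to `chunkenc.NewXORChunk()` preserves the old behavior for callers that pass an unknown or zero-value encoding.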
@@ -2409,6 +2492,28 @@ func (s *memSeries) appendable(t int64, v float64) error {
 	return nil
 }
 
+// appendableHistogram checks whether the given sample is valid for appending to the series.
+func (s *memSeries) appendableHistogram(t int64, sh histogram.SparseHistogram) error {
+	c := s.head()
+	if c == nil {
+		return nil
+	}
+
+	if t > c.maxTime {
+		return nil
+	}
+	if t < c.maxTime {
+		return storage.ErrOutOfOrderSample
+	}
+
+	// TODO: do it for histogram.
+	// We are allowing exact duplicates as we can encounter them in valid cases
+	// like federation and erroring out at that time would be extremely noisy.
+	//if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
+	//	return storage.ErrDuplicateSampleForTimestamp
+	//}
+	return nil
+}
+
 // chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk.
 // If garbageCollect is true, it means that the returned *memChunk
 // (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage.
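The ordering rules in `appendableHistogram` match the float path minus the duplicate-value check that the TODO defers: newer timestamps pass, exact duplicates are tolerated, older ones fail. A test-style sketch, assuming the series' head chunk currently has `maxTime == 2000`:

    for _, tc := range []struct {
        t    int64
        want error
    }{
        {t: 3000, want: nil},                         // ahead of maxTime: appendable
        {t: 2000, want: nil},                         // exact duplicate: tolerated (values not compared yet)
        {t: 1000, want: storage.ErrOutOfOrderSample}, // behind maxTime: rejected
    } {
        if err := s.appendableHistogram(tc.t, sh); err != tc.want {
            t.Errorf("t=%d: got %v, want %v", tc.t, err, tc.want)
        }
    }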
@@ -2475,27 +2580,71 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
 // isolation for this append.)
 // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
 func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
+	c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper)
+	if !sampleInOrder {
+		return sampleInOrder, chunkCreated
+	}
+
+	s.app.Append(t, v)
+
+	c.maxTime = t
+
+	s.sampleBuf[0] = s.sampleBuf[1]
+	s.sampleBuf[1] = s.sampleBuf[2]
+	s.sampleBuf[2] = s.sampleBuf[3]
+	s.sampleBuf[3] = sample{t: t, v: v}
+
+	if appendID > 0 {
+		s.txs.add(appendID)
+	}
+
+	return true, chunkCreated
+}
+
+// appendHistogram adds the sparse histogram.
+// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
+func (s *memSeries) appendHistogram(t int64, sh histogram.SparseHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
+	c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncSHS, chunkDiskMapper)
+	if !sampleInOrder {
+		return sampleInOrder, chunkCreated
+	}
+
+	s.app.AppendHistogram(t, sh)
+
+	c.maxTime = t
+
+	if appendID > 0 {
+		s.txs.add(appendID)
+	}
+
+	return true, chunkCreated
+}
+
+// appendPreprocessor takes care of cutting new chunks and m-mapping old chunks.
+// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
+// This should be called only when appending data.
+func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, chunkDiskMapper *chunks.ChunkDiskMapper) (c *memChunk, sampleInOrder, chunkCreated bool) {
 	// Based on Gorilla white papers this offers near-optimal compression ratio
 	// so anything bigger that this has diminishing returns and increases
 	// the time range within which we have to decompress all samples.
 	const samplesPerChunk = 120
 
-	c := s.head()
+	c = s.head()
 
 	if c == nil {
 		if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {
 			// Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it.
-			return false, false
+			return c, false, false
 		}
 		// There is no chunk in this series yet, create the first chunk for the sample.
-		c = s.cutNewHeadChunk(t, chunkDiskMapper)
+		c = s.cutNewHeadChunk(t, e, chunkDiskMapper)
 		chunkCreated = true
 	}
 	numSamples := c.chunk.NumSamples()
 
 	// Out of order sample.
 	if c.maxTime >= t {
-		return false, chunkCreated
+		return c, false, chunkCreated
 	}
 	// If we reach 25% of a chunk's desired sample count, set a definitive time
 	// at which to start the next chunk.
@@ -2504,23 +2653,10 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
 		s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)
 	}
 	if t >= s.nextAt {
-		c = s.cutNewHeadChunk(t, chunkDiskMapper)
+		c = s.cutNewHeadChunk(t, e, chunkDiskMapper)
 		chunkCreated = true
 	}
-	s.app.Append(t, v)
-
-	c.maxTime = t
-
-	s.sampleBuf[0] = s.sampleBuf[1]
-	s.sampleBuf[1] = s.sampleBuf[2]
-	s.sampleBuf[2] = s.sampleBuf[3]
-	s.sampleBuf[3] = sample{t: t, v: v}
-
-	if appendID > 0 {
-		s.txs.add(appendID)
-	}
-
-	return true, chunkCreated
+	return c, true, chunkCreated
 }
 
 // cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
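The net effect of the last two hunks: `append` and `appendHistogram` become thin encoding-specific wrappers (EncXOR vs. EncSHS) around a shared `appendPreprocessor` that owns chunk cutting, m-mapping, and out-of-order detection, and now also returns the head chunk so the wrappers can stamp `c.maxTime`. For the 25% rule referenced in the context above: once a chunk holds `samplesPerChunk/4 = 30` samples, `s.nextAt` is re-estimated from the observed sampling rate so the chunk is cut near 120 samples rather than at the fixed range boundary. A worked sketch, assuming `computeChunkEndTime` extrapolates roughly linearly (its body is outside this excerpt):

    // Hypothetical numbers: chunk started at t=0, 30 samples at a 1s scrape
    // interval, 2h chunk range.
    // minTime = 0, maxTime = 29_000 ms, range boundary = 7_200_000 ms.
    // Extrapolating to 4x the elapsed time targets ~120 samples:
    // nextAt ≈ 4 * 29_000 ms ≈ 116 s, so the chunk is cut near the
    // 120-sample mark instead of the two-hour boundary.

Because this logic now lives in `appendPreprocessor`, histogram chunks inherit the same cutting behavior as float chunks for free.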