@@ -65,7 +65,8 @@ type Head struct {
 	memChunkPool sync.Pool
 
 	// All series addressable by their ID or hash.
-	series *stripeSeries
+	series         *stripeSeries
+	seriesCallback SeriesLifecycleCallback
 
 	symMtx  sync.RWMutex
 	symbols map[string]struct{}
@@ -284,20 +285,23 @@ func (h *Head) PostingsCardinalityStats(statsByLabelName string) *index.Postings
 // stripeSize sets the number of entries in the hash map; it must be a power of 2.
 // A larger stripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
 // A smaller stripeSize reduces the memory allocated, but can decrease performance with a large number of series.
-func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, pool chunkenc.Pool, stripeSize int) (*Head, error) {
+func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int64, chkDirRoot string, pool chunkenc.Pool, stripeSize int, seriesCallback SeriesLifecycleCallback) (*Head, error) {
 	if l == nil {
 		l = log.NewNopLogger()
 	}
 	if chunkRange < 1 {
 		return nil, errors.Errorf("invalid chunk range %d", chunkRange)
 	}
+	if seriesCallback == nil {
+		seriesCallback = &noopSeriesLifecycleCallback{}
+	}
 	h := &Head{
 		wal:        wal,
 		logger:     l,
 		chunkRange: chunkRange,
 		minTime:    math.MaxInt64,
 		maxTime:    math.MinInt64,
-		series:     newStripeSeries(stripeSize),
+		series:     newStripeSeries(stripeSize, seriesCallback),
 		values:     map[string]stringset{},
 		symbols:    map[string]struct{}{},
 		postings:   index.NewUnorderedMemPostings(),
@@ -309,7 +313,8 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int
 				return &memChunk{}
 			},
 		},
-		chunkDirRoot: chkDirRoot,
+		chunkDirRoot:   chkDirRoot,
+		seriesCallback: seriesCallback,
 	}
 
 	h.metrics = newHeadMetrics(h, r)
@@ -408,12 +413,33 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 		n       = runtime.GOMAXPROCS(0)
 		inputs  = make([]chan []record.RefSample, n)
 		outputs = make([]chan []record.RefSample, n)
+
+		dec    record.Decoder
+		shards = make([][]record.RefSample, n)
+
+		decoded                      = make(chan interface{}, 10)
+		decodeErr, seriesCreationErr error
+		seriesPool                   = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSeries{}
+			},
+		}
+		samplesPool = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSample{}
+			},
+		}
+		tstonesPool = sync.Pool{
+			New: func() interface{} {
+				return []tombstones.Stone{}
+			},
+		}
 	)
-	wg.Add(n)
 
 	defer func() {
 		// For CorruptionErr ensure to terminate all workers before exiting.
-		if _, ok := err.(*wal.CorruptionErr); ok {
+		_, ok := err.(*wal.CorruptionErr)
+		if ok || seriesCreationErr != nil {
 			for i := 0; i < n; i++ {
 				close(inputs[i])
 				for range outputs[i] {
@@ -423,6 +449,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 		}
 	}()
 
+	wg.Add(n)
 	for i := 0; i < n; i++ {
 		outputs[i] = make(chan []record.RefSample, 300)
 		inputs[i] = make(chan []record.RefSample, 300)
@@ -434,30 +461,6 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 		}(inputs[i], outputs[i])
 	}
 
-	var (
-		dec    record.Decoder
-		shards = make([][]record.RefSample, n)
-	)
-
-	var (
-		decoded    = make(chan interface{}, 10)
-		errCh      = make(chan error, 1)
-		seriesPool = sync.Pool{
-			New: func() interface{} {
-				return []record.RefSeries{}
-			},
-		}
-		samplesPool = sync.Pool{
-			New: func() interface{} {
-				return []record.RefSample{}
-			},
-		}
-		tstonesPool = sync.Pool{
-			New: func() interface{} {
-				return []tombstones.Stone{}
-			},
-		}
-	)
 	go func() {
 		defer close(decoded)
 		for r.Next() {
@@ -467,7 +470,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 				series := seriesPool.Get().([]record.RefSeries)[:0]
 				series, err = dec.Series(rec, series)
 				if err != nil {
-					errCh <- &wal.CorruptionErr{
+					decodeErr = &wal.CorruptionErr{
 						Err:     errors.Wrap(err, "decode series"),
 						Segment: r.Segment(),
 						Offset:  r.Offset(),
@@ -479,7 +482,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 				samples := samplesPool.Get().([]record.RefSample)[:0]
 				samples, err = dec.Samples(rec, samples)
 				if err != nil {
-					errCh <- &wal.CorruptionErr{
+					decodeErr = &wal.CorruptionErr{
 						Err:     errors.Wrap(err, "decode samples"),
 						Segment: r.Segment(),
 						Offset:  r.Offset(),
@@ -491,7 +494,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 				tstones := tstonesPool.Get().([]tombstones.Stone)[:0]
 				tstones, err = dec.Tombstones(rec, tstones)
 				if err != nil {
-					errCh <- &wal.CorruptionErr{
+					decodeErr = &wal.CorruptionErr{
 						Err:     errors.Wrap(err, "decode tombstones"),
 						Segment: r.Segment(),
 						Offset:  r.Offset(),
@@ -500,7 +503,7 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 				}
 				decoded <- tstones
 			default:
-				errCh <- &wal.CorruptionErr{
+				decodeErr = &wal.CorruptionErr{
 					Err:     errors.Errorf("invalid record type %v", dec.Type(rec)),
 					Segment: r.Segment(),
 					Offset:  r.Offset(),
@@ -510,11 +513,16 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 		}
 	}()
 
+Outer:
 	for d := range decoded {
 		switch v := d.(type) {
 		case []record.RefSeries:
 			for _, s := range v {
-				series, created := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+				series, created, err := h.getOrCreateWithID(s.Ref, s.Labels.Hash(), s.Labels)
+				if err != nil {
+					seriesCreationErr = err
+					break Outer
+				}
 				if created {
 					// If this series gets a duplicate record, we don't restore its mmapped chunks,
@@ -593,10 +601,14 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64, mmappedChunks
 		}
 	}
 
-	select {
-	case err := <-errCh:
-		return err
-	default:
+	if decodeErr != nil {
+		return decodeErr
+	}
+	if seriesCreationErr != nil {
+		// Drain the channel to unblock the goroutine.
+		for range decoded {
+		}
+		return seriesCreationErr
 	}
 
 	// Signal termination to each worker and wait for it to close its output channel.
@@ -1084,7 +1096,10 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
 		return 0, errors.Wrap(ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, l))
 	}
 
-	s, created := a.head.getOrCreate(lset.Hash(), lset)
+	s, created, err := a.head.getOrCreate(lset.Hash(), lset)
+	if err != nil {
+		return 0, err
+	}
 	if created {
 		a.series = append(a.series, record.RefSeries{
 			Ref:    s.ref,
@@ -1611,13 +1626,13 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
 	return nil
 }
 
-func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
+func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, error) {
 	// Just using `getOrSet` below would be semantically sufficient, but we'd create
 	// a new series on every sample inserted via Add(), which causes allocations
 	// and makes our series IDs rather random and harder to compress in postings.
 	s := h.series.getByHash(hash, lset)
 	if s != nil {
-		return s, false
+		return s, false, nil
 	}
 
 	// Optimistically assume that we are the first one to create the series.
@@ -1626,12 +1641,15 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool) {
 	return h.getOrCreateWithID(id, hash, lset)
 }
 
-func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool) {
+func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
 	s := newMemSeries(lset, id, h.chunkRange, &h.memChunkPool)
 
-	s, created := h.series.getOrSet(hash, s)
+	s, created, err := h.series.getOrSet(hash, s)
+	if err != nil {
+		return nil, false, err
+	}
 	if !created {
-		return s, false
+		return s, false, nil
 	}
 
 	h.metrics.seriesCreated.Inc()
@@ -1654,7 +1672,7 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
 		h.symbols[l.Value] = struct{}{}
 	}
 
-	return s, true
+	return s, true, nil
 }
 
 // seriesHashmap is a simple hashmap for memSeries by their label set. It is built
@@ -1707,10 +1725,11 @@ const (
 // with the maps was profiled to be slower – likely due to the additional pointer
 // dereferences.
 type stripeSeries struct {
-	size   int
-	series []map[uint64]*memSeries
-	hashes []seriesHashmap
-	locks  []stripeLock
+	size                    int
+	series                  []map[uint64]*memSeries
+	hashes                  []seriesHashmap
+	locks                   []stripeLock
+	seriesLifecycleCallback SeriesLifecycleCallback
 }
 
 type stripeLock struct {
@@ -1719,12 +1738,13 @@ type stripeLock struct {
 	_ [40]byte
 }
 
-func newStripeSeries(stripeSize int) *stripeSeries {
+func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *stripeSeries {
 	s := &stripeSeries{
-		size:   stripeSize,
-		series: make([]map[uint64]*memSeries, stripeSize),
-		hashes: make([]seriesHashmap, stripeSize),
-		locks:  make([]stripeLock, stripeSize),
+		size:                    stripeSize,
+		series:                  make([]map[uint64]*memSeries, stripeSize),
+		hashes:                  make([]seriesHashmap, stripeSize),
+		locks:                   make([]stripeLock, stripeSize),
+		seriesLifecycleCallback: seriesCallback,
 	}
 	for i := range s.series {
@@ -1740,8 +1760,9 @@ func newStripeSeries(stripeSize int) *stripeSeries {
 // series entirely that have no chunks left.
 func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 	var (
-		deleted  = map[uint64]struct{}{}
-		rmChunks = 0
+		deleted            = map[uint64]struct{}{}
+		deletedForCallback = []labels.Labels{}
+		rmChunks           = 0
 	)
 	// Run through all series and truncate old chunks. Mark those with no
 	// chunks left as deleted and store their ID.
@@ -1772,6 +1793,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 				deleted[series.ref] = struct{}{}
 				s.hashes[i].del(hash, series.lset)
 				delete(s.series[j], series.ref)
+				deletedForCallback = append(deletedForCallback, series.lset)
 
 				if i != j {
 					s.locks[j].Unlock()
@@ -1782,6 +1804,9 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) {
 		}
 
 		s.locks[i].Unlock()
+
+		s.seriesLifecycleCallback.PostDeletion(deletedForCallback...)
+		deletedForCallback = deletedForCallback[:0]
 	}
 
 	return deleted, rmChunks
@@ -1807,25 +1832,39 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
 	return series
 }
 
-func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
-	i := hash & uint64(s.size-1)
+func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool, error) {
+	// PreCreation is called here to avoid calling it inside the lock.
+	// It is not necessary to call it just before creating a series;
+	// rather, it gives a 'hint' whether to create a series or not.
+	createSeriesErr := s.seriesLifecycleCallback.PreCreation(series.lset)
 
+	i := hash & uint64(s.size-1)
 	s.locks[i].Lock()
 	if prev := s.hashes[i].get(hash, series.lset); prev != nil {
 		s.locks[i].Unlock()
-		return prev, false
+		return prev, false, nil
 	}
-	s.hashes[i].set(hash, series)
+	if createSeriesErr == nil {
+		s.hashes[i].set(hash, series)
+	}
 	s.locks[i].Unlock()
 
+	if createSeriesErr != nil {
+		// The callback prevented creation of the series.
+		return nil, false, createSeriesErr
+	}
+	// Setting the series in s.hashes marks the creation of the series,
+	// as any further calls to this method would return that series.
+	s.seriesLifecycleCallback.PostCreation(series.lset)
+
 	i = series.ref & uint64(s.size-1)
 
 	s.locks[i].Lock()
 	s.series[i][series.ref] = series
 	s.locks[i].Unlock()
 
-	return series, true
+	return series, true, nil
 }
 
 type sample struct {
@@ -2275,3 +2314,24 @@ type mmappedChunk struct {
 func (mc *mmappedChunk) OverlapsClosedInterval(mint, maxt int64) bool {
 	return mc.minTime <= maxt && mint <= mc.maxTime
 }
+
+// SeriesLifecycleCallback specifies a list of callbacks that will be called during the lifecycle of a series.
+// It is always a no-op in Prometheus and mainly meant for external users who import TSDB.
+// All the callbacks should be safe to be called concurrently.
+// It is up to the user to implement soft or hard consistency by making the callbacks
+// atomic or non-atomic. Atomic callbacks can cause performance degradation.
+type SeriesLifecycleCallback interface {
+	// PreCreation is called before creating a series to indicate if the series can be created.
+	// A non-nil error means the series should not be created.
+	PreCreation(labels.Labels) error
+	// PostCreation is called after creating a series to indicate the creation of a series.
+	PostCreation(labels.Labels)
+	// PostDeletion is called after deletion of series.
+	PostDeletion(...labels.Labels)
+}
+
+type noopSeriesLifecycleCallback struct{}
+
+func (noopSeriesLifecycleCallback) PreCreation(labels.Labels) error { return nil }
+func (noopSeriesLifecycleCallback) PostCreation(labels.Labels)      {}
+func (noopSeriesLifecycleCallback) PostDeletion(...labels.Labels)   {}
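
To make the new hook concrete for users who import TSDB, below is a minimal sketch of a SeriesLifecycleCallback implementation that caps the number of live series. Only the three interface methods and the extra NewHead parameter come from the diff above; the limitedSeriesCallback type, its fields, the limit, and the labels import path (assumed to be github.com/prometheus/prometheus/pkg/labels in the targeted tree) are illustrative assumptions, not part of this change.

package tsdbexample

import (
	"errors"
	"sync"

	// Assumed import path for the labels package in the Prometheus tree this diff targets.
	"github.com/prometheus/prometheus/pkg/labels"
)

// limitedSeriesCallback is a hypothetical SeriesLifecycleCallback that refuses new
// series once a fixed number of live series is reached. All methods are safe for
// concurrent use, as the interface contract requires.
type limitedSeriesCallback struct {
	mtx    sync.Mutex
	active int
	limit  int
}

// PreCreation acts as the 'hint' described above: a non-nil error makes
// stripeSeries.getOrSet skip inserting the series and return the error.
func (c *limitedSeriesCallback) PreCreation(labels.Labels) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if c.active >= c.limit {
		return errors.New("series limit reached")
	}
	return nil
}

// PostCreation is called once the series has been set in the Head's hash map.
func (c *limitedSeriesCallback) PostCreation(labels.Labels) {
	c.mtx.Lock()
	c.active++
	c.mtx.Unlock()
}

// PostDeletion is called after Head garbage collection has dropped the series.
func (c *limitedSeriesCallback) PostDeletion(lsets ...labels.Labels) {
	c.mtx.Lock()
	c.active -= len(lsets)
	c.mtx.Unlock()
}

Such a callback would be passed as the new final argument of NewHead; passing nil preserves the previous behaviour, since NewHead now substitutes noopSeriesLifecycleCallback.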