@ -389,9 +389,6 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
var (
dec record . Decoder
series [ ] record . RefSeries
samples [ ] record . RefSample
tstones [ ] tombstones . Stone
allStones = tombstones . NewMemTombstones ( )
shards = make ( [ ] [ ] record . RefSample , n )
)
@ -400,21 +397,82 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
level . Warn ( h . logger ) . Log ( "msg" , "closing memTombstones during wal read" , "err" , err )
}
} ( )
for r . Next ( ) {
series , samples , tstones = series [ : 0 ] , samples [ : 0 ] , tstones [ : 0 ]
rec := r . Record ( )
switch dec . Type ( rec ) {
case record . Series :
series , err = dec . Series ( rec , series )
if err != nil {
return & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode series" ) ,
var (
decoded = make ( chan interface { } , 10 )
errCh = make ( chan error , 1 )
seriesPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefSeries { }
} ,
}
samplesPool = sync . Pool {
New : func ( ) interface { } {
return [ ] record . RefSample { }
} ,
}
tstonesPool = sync . Pool {
New : func ( ) interface { } {
return [ ] tombstones . Stone { }
} ,
}
)
go func ( ) {
defer close ( decoded )
for r . Next ( ) {
rec := r . Record ( )
switch dec . Type ( rec ) {
case record . Series :
series := seriesPool . Get ( ) . ( [ ] record . RefSeries ) [ : 0 ]
series , err = dec . Series ( rec , series )
if err != nil {
errCh <- & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode series" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
return
}
decoded <- series
case record . Samples :
samples := samplesPool . Get ( ) . ( [ ] record . RefSample ) [ : 0 ]
samples , err = dec . Samples ( rec , samples )
if err != nil {
errCh <- & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode samples" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
return
}
decoded <- samples
case record . Tombstones :
tstones := tstonesPool . Get ( ) . ( [ ] tombstones . Stone ) [ : 0 ]
tstones , err = dec . Tombstones ( rec , tstones )
if err != nil {
errCh <- & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode tombstones" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
return
}
decoded <- tstones
default :
errCh <- & wal . CorruptionErr {
Err : errors . Errorf ( "invalid record type %v" , dec . Type ( rec ) ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
return
}
for _ , s := range series {
}
} ( )
for d := range decoded {
switch v := d . ( type ) {
case [ ] record . RefSeries :
for _ , s := range v {
series , created := h . getOrCreateWithID ( s . Ref , s . Labels . Hash ( ) , s . Labels )
if ! created {
@ -426,16 +484,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
h . lastSeriesID = s . Ref
}
}
case record . Samples :
samples , err = dec . Samples ( rec , samples )
s := samples
if err != nil {
return & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode samples" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
}
//lint:ignore SA6002 relax staticcheck verification.
seriesPool . Put ( v )
case [ ] record . RefSample :
samples := v
// We split up the samples into chunks of 5000 samples or less.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in flight buffers occupying large amounts
@ -465,17 +517,10 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
}
samples = samples [ m : ]
}
samples = s // Keep whole slice for reuse.
case record . Tombstones :
tstones , err = dec . Tombstones ( rec , tstones )
if err != nil {
return & wal . CorruptionErr {
Err : errors . Wrap ( err , "decode tombstones" ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
}
for _ , s := range tstones {
//lint:ignore SA6002 relax staticcheck verification.
samplesPool . Put ( v )
case [ ] tombstones . Stone :
for _ , s := range v {
for _ , itv := range s . Intervals {
if itv . Maxt < h . minValidTime {
continue
@ -487,15 +532,19 @@ func (h *Head) loadWAL(r *wal.Reader, multiRef map[uint64]uint64) (err error) {
allStones . AddInterval ( s . Ref , itv )
}
}
//lint:ignore SA6002 relax staticcheck verification.
tstonesPool . Put ( v )
default :
return & wal . CorruptionErr {
Err : errors . Errorf ( "invalid record type %v" , dec . Type ( rec ) ) ,
Segment : r . Segment ( ) ,
Offset : r . Offset ( ) ,
}
panic ( fmt . Errorf ( "unexpected decoded type: %T" , d ) )
}
}
select {
case err := <- errCh :
return err
default :
}
// Signal termination to each worker and wait for it to close its output channel.
for i := 0 ; i < n ; i ++ {
close ( inputs [ i ] )