@ -56,7 +56,7 @@ type SamplesCB func([]RefSample) error
type SeriesCB func ( [ ] labels . Labels ) error
// DeletesCB is the callback after reading deletes.
type DeletesCB func ( [ ] s tone) error
type DeletesCB func ( [ ] S tone) error
// SegmentWAL is a write ahead log for series data.
type SegmentWAL struct {
@ -83,7 +83,7 @@ type WAL interface {
Reader ( ) WALReader
LogSeries ( [ ] labels . Labels ) error
LogSamples ( [ ] RefSample ) error
LogDeletes ( tombstoneReader ) error
LogDeletes ( [ ] Stone ) error
Close ( ) error
}
@ -180,8 +180,8 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error {
}
// LogDeletes write a batch of new deletes to the log.
func ( w * SegmentWAL ) LogDeletes ( tr tombstoneReader ) error {
if err := w . encodeDeletes ( tr ) ; err != nil {
func ( w * SegmentWAL ) LogDeletes ( stones [ ] Stone ) error {
if err := w . encodeDeletes ( stones ) ; err != nil {
return err
}
@ -483,14 +483,14 @@ func (w *SegmentWAL) encodeSamples(samples []RefSample) error {
return w . entry ( WALEntrySamples , walSamplesSimple , buf )
}
func ( w * SegmentWAL ) encodeDeletes ( tr tombstoneReader ) error {
func ( w * SegmentWAL ) encodeDeletes ( stones [ ] Stone ) error {
b := make ( [ ] byte , 2 * binary . MaxVarintLen64 )
eb := & encbuf { b : b }
buf := getWALBuffer ( )
for k , v := range tr {
for _ , itv := range v {
for _ , s := range stones {
for _ , itv := range s . inter vals {
eb . reset ( )
eb . putUvarint32 ( k )
eb . putUvarint32 ( s . ref )
eb . putVarint64 ( itv . mint )
eb . putVarint64 ( itv . maxt )
buf = append ( buf , eb . get ( ) ... )
@ -509,13 +509,9 @@ type walReader struct {
buf [ ] byte
crc32 hash . Hash32
samples [ ] RefSample
series [ ] labels . Labels
stones [ ] stone
samplesFunc SamplesCB
seriesFunc SeriesCB
deletesFunc DeletesCB
curType WALEntryType
curFlag byte
curBuf [ ] byte
err error
}
@ -538,11 +534,30 @@ func (r *walReader) Err() error {
}
func ( r * walReader ) Read ( seriesf SeriesCB , samplesf SamplesCB , deletesf DeletesCB ) error {
r . samplesFunc = samplesf
r . seriesFunc = seriesf
r . deletesFunc = deletesf
for r . next ( ) {
et , flag , b := r . at ( )
// In decoding below we never return a walCorruptionErr for now.
// Those should generally be catched by entry decoding before.
switch et {
case WALEntrySeries :
s , err := r . decodeSeries ( flag , b )
if err != nil {
return err
}
seriesf ( s )
case WALEntrySamples :
s , err := r . decodeSamples ( flag , b )
if err != nil {
return err
}
samplesf ( s )
case WALEntryDeletes :
s , err := r . decodeDeletes ( flag , b )
if err != nil {
return err
}
deletesf ( s )
}
}
return r . Err ( )
@ -570,13 +585,13 @@ func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) {
return et , flag , b , err
}
func ( r * walReader ) at ( ) ( WALEntryType , byte , [ ] byte ) {
return r . curType , r . curFlag , r . curBuf
}
// next returns decodes the next entry pair and returns true
// if it was succesful.
func ( r * walReader ) next ( ) bool {
r . series = r . series [ : 0 ]
r . samples = r . samples [ : 0 ]
r . stones = r . stones [ : 0 ]
if r . cur >= len ( r . wal . files ) {
return false
}
@ -614,16 +629,9 @@ func (r *walReader) next() bool {
return false
}
// In decoding below we never return a walCorruptionErr for now.
// Those should generally be catched by entry decoding before.
switch et {
case WALEntrySeries :
r . err = r . decodeSeries ( flag , b )
case WALEntrySamples :
r . err = r . decodeSamples ( flag , b )
case WALEntryDeletes :
r . err = r . decodeDeletes ( flag , b )
}
r . curType = et
r . curFlag = flag
r . curBuf = b
return r . err == nil
}
@ -707,11 +715,12 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
return etype , flag , buf , nil
}
func ( r * walReader ) decodeSeries ( flag byte , b [ ] byte ) error {
func ( r * walReader ) decodeSeries ( flag byte , b [ ] byte ) ( [ ] labels . Labels , error ) {
series := [ ] labels . Labels { }
for len ( b ) > 0 {
l , n := binary . Uvarint ( b )
if n < 1 {
return errors . Wrap ( errInvalidSize , "number of labels" )
return nil , errors . Wrap ( errInvalidSize , "number of labels" )
}
b = b [ n : ]
lset := make ( labels . Labels , l )
@ -719,29 +728,29 @@ func (r *walReader) decodeSeries(flag byte, b []byte) error {
for i := 0 ; i < int ( l ) ; i ++ {
nl , n := binary . Uvarint ( b )
if n < 1 || len ( b ) < n + int ( nl ) {
return errors . Wrap ( errInvalidSize , "label name" )
return nil , errors . Wrap ( errInvalidSize , "label name" )
}
lset [ i ] . Name = string ( b [ n : n + int ( nl ) ] )
b = b [ n + int ( nl ) : ]
vl , n := binary . Uvarint ( b )
if n < 1 || len ( b ) < n + int ( vl ) {
return errors . Wrap ( errInvalidSize , "label value" )
return nil , errors . Wrap ( errInvalidSize , "label value" )
}
lset [ i ] . Value = string ( b [ n : n + int ( vl ) ] )
b = b [ n + int ( vl ) : ]
}
r . series = append ( r . series , lset )
series = append ( series , lset )
}
return r . seriesFunc ( r . series )
return series , nil
}
func ( r * walReader ) decodeSamples ( flag byte , b [ ] byte ) error {
r . samples = r . samples [ : ]
func ( r * walReader ) decodeSamples ( flag byte , b [ ] byte ) ( [ ] RefSample , error ) {
samples := [ ] RefSample { }
if len ( b ) < 16 {
return errors . Wrap ( errInvalidSize , "header length" )
return nil , errors . Wrap ( errInvalidSize , "header length" )
}
var (
baseRef = binary . BigEndian . Uint64 ( b )
@ -754,7 +763,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error {
dref , n := binary . Varint ( b )
if n < 1 {
return errors . Wrap ( errInvalidSize , "sample ref delta" )
return nil , errors . Wrap ( errInvalidSize , "sample ref delta" )
}
b = b [ n : ]
@ -762,36 +771,36 @@ func (r *walReader) decodeSamples(flag byte, b []byte) error {
dtime , n := binary . Varint ( b )
if n < 1 {
return errors . Wrap ( errInvalidSize , "sample timestamp delta" )
return nil , errors . Wrap ( errInvalidSize , "sample timestamp delta" )
}
b = b [ n : ]
smpl . T = baseTime + dtime
if len ( b ) < 8 {
return errors . Wrapf ( errInvalidSize , "sample value bits %d" , len ( b ) )
return nil , errors . Wrapf ( errInvalidSize , "sample value bits %d" , len ( b ) )
}
smpl . V = float64 ( math . Float64frombits ( binary . BigEndian . Uint64 ( b ) ) )
b = b [ 8 : ]
r . samples = append ( r . samples , smpl )
samples = append ( samples , smpl )
}
return r . samplesFunc ( r . samples )
return samples , nil
}
func ( r * walReader ) decodeDeletes ( flag byte , b [ ] byte ) error {
func ( r * walReader ) decodeDeletes ( flag byte , b [ ] byte ) ( [ ] Stone , error ) {
db := & decbuf { b : b }
r . samples = r . samples [ : ]
stones := [ ] Stone { }
for db . len ( ) > 0 {
var s s tone
var s S tone
s . ref = db . uvarint32 ( )
s . intervals = intervals { { db . varint64 ( ) , db . varint64 ( ) } }
if db . err ( ) != nil {
return db . err ( )
return nil , db . err ( )
}
r . stones = append ( r . stones , s )
stones = append ( stones , s )
}
return r . deletesFunc ( r . stones )
return stones , nil
}