|
|
|
|
@ -52,13 +52,16 @@ const ( |
|
|
|
|
WALEntryDeletes WALEntryType = 4 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// SamplesCB is the callback after reading samples.
|
|
|
|
|
// SamplesCB is the callback after reading samples. The passed slice
|
|
|
|
|
// is only valid until the call returns.
|
|
|
|
|
type SamplesCB func([]RefSample) error |
|
|
|
|
|
|
|
|
|
// SeriesCB is the callback after reading series.
|
|
|
|
|
// SeriesCB is the callback after reading series. The passed slice
|
|
|
|
|
// is only valid until the call returns.
|
|
|
|
|
type SeriesCB func([]RefSeries) error |
|
|
|
|
|
|
|
|
|
// DeletesCB is the callback after reading deletes.
|
|
|
|
|
// DeletesCB is the callback after reading deletes. The passed slice
|
|
|
|
|
// is only valid until the call returns.
|
|
|
|
|
type DeletesCB func([]Stone) error |
|
|
|
|
|
|
|
|
|
// WAL is a write ahead log that can log new series labels and samples.
|
|
|
|
|
@ -395,6 +398,10 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { |
|
|
|
|
buf := w.getBuffer() |
|
|
|
|
|
|
|
|
|
flag := w.encodeSeries(buf, series) |
|
|
|
|
|
|
|
|
|
w.mtx.Lock() |
|
|
|
|
defer w.mtx.Unlock() |
|
|
|
|
|
|
|
|
|
err := w.write(WALEntrySeries, flag, buf.get()) |
|
|
|
|
|
|
|
|
|
w.putBuffer(buf) |
|
|
|
|
@ -410,10 +417,6 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { |
|
|
|
|
tf.minSeries = s.Ref |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if w.flushInterval <= 0 { |
|
|
|
|
return errors.Wrap(w.Sync(), "sync") |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -422,6 +425,10 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { |
|
|
|
|
buf := w.getBuffer() |
|
|
|
|
|
|
|
|
|
flag := w.encodeSamples(buf, samples) |
|
|
|
|
|
|
|
|
|
w.mtx.Lock() |
|
|
|
|
defer w.mtx.Unlock() |
|
|
|
|
|
|
|
|
|
err := w.write(WALEntrySamples, flag, buf.get()) |
|
|
|
|
|
|
|
|
|
w.putBuffer(buf) |
|
|
|
|
@ -436,10 +443,6 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { |
|
|
|
|
tf.maxTime = s.T |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if w.flushInterval <= 0 { |
|
|
|
|
return errors.Wrap(w.Sync(), "sync") |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -448,6 +451,10 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { |
|
|
|
|
buf := w.getBuffer() |
|
|
|
|
|
|
|
|
|
flag := w.encodeDeletes(buf, stones) |
|
|
|
|
|
|
|
|
|
w.mtx.Lock() |
|
|
|
|
defer w.mtx.Unlock() |
|
|
|
|
|
|
|
|
|
err := w.write(WALEntryDeletes, flag, buf.get()) |
|
|
|
|
|
|
|
|
|
w.putBuffer(buf) |
|
|
|
|
@ -464,10 +471,6 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if w.flushInterval <= 0 { |
|
|
|
|
return errors.Wrap(w.Sync(), "sync") |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -522,19 +525,26 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { |
|
|
|
|
func (w *SegmentWAL) cut() error { |
|
|
|
|
// Sync current head to disk and close.
|
|
|
|
|
if hf := w.head(); hf != nil { |
|
|
|
|
if err := w.sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
off, err := hf.Seek(0, os.SEEK_CUR) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if err := hf.Truncate(off); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if err := hf.Close(); err != nil { |
|
|
|
|
if err := w.flush(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// Finish last segment asynchronously to not block the WAL moving along
|
|
|
|
|
// in the new segment.
|
|
|
|
|
go func() { |
|
|
|
|
off, err := hf.Seek(0, os.SEEK_CUR) |
|
|
|
|
if err != nil { |
|
|
|
|
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) |
|
|
|
|
} |
|
|
|
|
if err := hf.Truncate(off); err != nil { |
|
|
|
|
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) |
|
|
|
|
} |
|
|
|
|
if err := hf.Sync(); err != nil { |
|
|
|
|
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) |
|
|
|
|
} |
|
|
|
|
if err := hf.Close(); err != nil { |
|
|
|
|
w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
p, _, err := nextSequenceFile(w.dirFile.Name()) |
|
|
|
|
@ -546,9 +556,11 @@ func (w *SegmentWAL) cut() error { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err = w.dirFile.Sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
go func() { |
|
|
|
|
if err = w.dirFile.Sync(); err != nil { |
|
|
|
|
w.logger.Log("msg", "sync WAL directory", "err", err) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
w.files = append(w.files, newSegmentFile(f)) |
|
|
|
|
|
|
|
|
|
@ -594,6 +606,9 @@ func (w *SegmentWAL) sync() error { |
|
|
|
|
if err := w.flush(); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
if w.head() == nil { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
return fileutil.Fdatasync(w.head().File) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -655,8 +670,6 @@ const ( |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error { |
|
|
|
|
w.mtx.Lock() |
|
|
|
|
defer w.mtx.Unlock() |
|
|
|
|
// Cut to the next segment if the entry exceeds the file size unless it would also
|
|
|
|
|
// exceed the size of a new segment.
|
|
|
|
|
// TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize.
|
|
|
|
|
@ -769,6 +782,10 @@ type walReader struct { |
|
|
|
|
curBuf []byte |
|
|
|
|
lastOffset int64 // offset after last successfully read entry
|
|
|
|
|
|
|
|
|
|
seriesBuf []RefSeries |
|
|
|
|
sampleBuf []RefSample |
|
|
|
|
tombstoneBuf []Stone |
|
|
|
|
|
|
|
|
|
err error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -996,7 +1013,8 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { |
|
|
|
|
series := []RefSeries{} |
|
|
|
|
r.seriesBuf = r.seriesBuf[:0] |
|
|
|
|
|
|
|
|
|
dec := decbuf{b: b} |
|
|
|
|
|
|
|
|
|
for len(dec.b) > 0 && dec.err() == nil { |
|
|
|
|
@ -1010,7 +1028,7 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { |
|
|
|
|
} |
|
|
|
|
sort.Sort(lset) |
|
|
|
|
|
|
|
|
|
series = append(series, RefSeries{ |
|
|
|
|
r.seriesBuf = append(r.seriesBuf, RefSeries{ |
|
|
|
|
Ref: ref, |
|
|
|
|
Labels: lset, |
|
|
|
|
}) |
|
|
|
|
@ -1019,16 +1037,16 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) { |
|
|
|
|
return nil, dec.err() |
|
|
|
|
} |
|
|
|
|
if len(dec.b) > 0 { |
|
|
|
|
return series, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
return r.seriesBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
} |
|
|
|
|
return series, nil |
|
|
|
|
return r.seriesBuf, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { |
|
|
|
|
if len(b) == 0 { |
|
|
|
|
return nil, nil |
|
|
|
|
} |
|
|
|
|
samples := []RefSample{} |
|
|
|
|
r.sampleBuf = r.sampleBuf[:0] |
|
|
|
|
dec := decbuf{b: b} |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
@ -1041,7 +1059,7 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { |
|
|
|
|
dtime := dec.varint64() |
|
|
|
|
val := dec.be64() |
|
|
|
|
|
|
|
|
|
samples = append(samples, RefSample{ |
|
|
|
|
r.sampleBuf = append(r.sampleBuf, RefSample{ |
|
|
|
|
Ref: uint64(int64(baseRef) + dref), |
|
|
|
|
T: baseTime + dtime, |
|
|
|
|
V: math.Float64frombits(val), |
|
|
|
|
@ -1049,20 +1067,20 @@ func (r *walReader) decodeSamples(flag byte, b []byte) ([]RefSample, error) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if dec.err() != nil { |
|
|
|
|
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples)) |
|
|
|
|
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(r.sampleBuf)) |
|
|
|
|
} |
|
|
|
|
if len(dec.b) > 0 { |
|
|
|
|
return samples, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
return r.sampleBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
} |
|
|
|
|
return samples, nil |
|
|
|
|
return r.sampleBuf, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { |
|
|
|
|
dec := &decbuf{b: b} |
|
|
|
|
var stones []Stone |
|
|
|
|
r.tombstoneBuf = r.tombstoneBuf[:0] |
|
|
|
|
|
|
|
|
|
for dec.len() > 0 && dec.err() == nil { |
|
|
|
|
stones = append(stones, Stone{ |
|
|
|
|
r.tombstoneBuf = append(r.tombstoneBuf, Stone{ |
|
|
|
|
ref: dec.be64(), |
|
|
|
|
intervals: Intervals{ |
|
|
|
|
{Mint: dec.varint64(), Maxt: dec.varint64()}, |
|
|
|
|
@ -1073,7 +1091,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) { |
|
|
|
|
return nil, dec.err() |
|
|
|
|
} |
|
|
|
|
if len(dec.b) > 0 { |
|
|
|
|
return stones, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
return r.tombstoneBuf, errors.Errorf("unexpected %d bytes left in entry", len(dec.b)) |
|
|
|
|
} |
|
|
|
|
return stones, nil |
|
|
|
|
return r.tombstoneBuf, nil |
|
|
|
|
} |
|
|
|
|
|