|
|
|
|
@ -50,7 +50,7 @@ const ( |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// WAL is a write ahead log for series data. It can only be written to.
|
|
|
|
|
// Use WALReader to read back from a write ahead log.
|
|
|
|
|
// Use walReader to read back from a write ahead log.
|
|
|
|
|
type WAL struct { |
|
|
|
|
mtx sync.Mutex |
|
|
|
|
|
|
|
|
|
@ -69,6 +69,13 @@ type WAL struct { |
|
|
|
|
donec chan struct{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RefSample is a timestamp/value pair associated with a reference to a series.
|
|
|
|
|
type RefSample struct { |
|
|
|
|
Ref uint64 |
|
|
|
|
T int64 |
|
|
|
|
V float64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
walDirName = "wal" |
|
|
|
|
walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB
|
|
|
|
|
@ -119,12 +126,12 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, |
|
|
|
|
|
|
|
|
|
// Reader returns a new reader over the the write ahead log data.
|
|
|
|
|
// It must be completely consumed before writing to the WAL.
|
|
|
|
|
func (w *WAL) Reader() *WALReader { |
|
|
|
|
return NewWALReader(w.logger, w) |
|
|
|
|
func (w *WAL) Reader() WALReader { |
|
|
|
|
return newWALReader(w, w.logger) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Log writes a batch of new series labels and samples to the log.
|
|
|
|
|
func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { |
|
|
|
|
func (w *WAL) Log(series []labels.Labels, samples []RefSample) error { |
|
|
|
|
if err := w.encodeSeries(series); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
@ -395,7 +402,7 @@ func (w *WAL) encodeSeries(series []labels.Labels) error { |
|
|
|
|
return w.entry(WALEntrySeries, walSeriesSimple, buf) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (w *WAL) encodeSamples(samples []refdSample) error { |
|
|
|
|
func (w *WAL) encodeSamples(samples []RefSample) error { |
|
|
|
|
if len(samples) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
@ -409,27 +416,27 @@ func (w *WAL) encodeSamples(samples []refdSample) error { |
|
|
|
|
// TODO(fabxc): optimize for all samples having the same timestamp.
|
|
|
|
|
first := samples[0] |
|
|
|
|
|
|
|
|
|
binary.BigEndian.PutUint64(b, first.ref) |
|
|
|
|
binary.BigEndian.PutUint64(b, first.Ref) |
|
|
|
|
buf = append(buf, b[:8]...) |
|
|
|
|
binary.BigEndian.PutUint64(b, uint64(first.t)) |
|
|
|
|
binary.BigEndian.PutUint64(b, uint64(first.T)) |
|
|
|
|
buf = append(buf, b[:8]...) |
|
|
|
|
|
|
|
|
|
for _, s := range samples { |
|
|
|
|
n := binary.PutVarint(b, int64(s.ref)-int64(first.ref)) |
|
|
|
|
n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) |
|
|
|
|
buf = append(buf, b[:n]...) |
|
|
|
|
|
|
|
|
|
n = binary.PutVarint(b, s.t-first.t) |
|
|
|
|
n = binary.PutVarint(b, s.T-first.T) |
|
|
|
|
buf = append(buf, b[:n]...) |
|
|
|
|
|
|
|
|
|
binary.BigEndian.PutUint64(b, math.Float64bits(s.v)) |
|
|
|
|
binary.BigEndian.PutUint64(b, math.Float64bits(s.V)) |
|
|
|
|
buf = append(buf, b[:8]...) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return w.entry(WALEntrySamples, walSamplesSimple, buf) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// WALReader decodes and emits write ahead log entries.
|
|
|
|
|
type WALReader struct { |
|
|
|
|
// walReader decodes and emits write ahead log entries.
|
|
|
|
|
type walReader struct { |
|
|
|
|
logger log.Logger |
|
|
|
|
|
|
|
|
|
wal *WAL |
|
|
|
|
@ -439,37 +446,35 @@ type WALReader struct { |
|
|
|
|
|
|
|
|
|
err error |
|
|
|
|
labels []labels.Labels |
|
|
|
|
samples []refdSample |
|
|
|
|
samples []RefSample |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// NewWALReader returns a new WALReader over the sequence of the given ReadClosers.
|
|
|
|
|
func NewWALReader(logger log.Logger, w *WAL) *WALReader { |
|
|
|
|
if logger == nil { |
|
|
|
|
logger = log.NewNopLogger() |
|
|
|
|
func newWALReader(w *WAL, l log.Logger) *walReader { |
|
|
|
|
if l == nil { |
|
|
|
|
l = log.NewNopLogger() |
|
|
|
|
} |
|
|
|
|
r := &WALReader{ |
|
|
|
|
logger: logger, |
|
|
|
|
return &walReader{ |
|
|
|
|
logger: l, |
|
|
|
|
wal: w, |
|
|
|
|
buf: make([]byte, 0, 128*4096), |
|
|
|
|
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), |
|
|
|
|
} |
|
|
|
|
return r |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// At returns the last decoded entry of labels or samples.
|
|
|
|
|
// The returned slices are only valid until the next call to Next(). Their elements
|
|
|
|
|
// have to be copied to preserve them.
|
|
|
|
|
func (r *WALReader) At() ([]labels.Labels, []refdSample) { |
|
|
|
|
func (r *walReader) At() ([]labels.Labels, []RefSample) { |
|
|
|
|
return r.labels, r.samples |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Err returns the last error the reader encountered.
|
|
|
|
|
func (r *WALReader) Err() error { |
|
|
|
|
func (r *walReader) Err() error { |
|
|
|
|
return r.err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// nextEntry retrieves the next entry. It is also used as a testing hook.
|
|
|
|
|
func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { |
|
|
|
|
func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { |
|
|
|
|
if r.cur >= len(r.wal.files) { |
|
|
|
|
return 0, 0, nil, io.EOF |
|
|
|
|
} |
|
|
|
|
@ -492,7 +497,7 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { |
|
|
|
|
|
|
|
|
|
// Next returns decodes the next entry pair and returns true
|
|
|
|
|
// if it was succesful.
|
|
|
|
|
func (r *WALReader) Next() bool { |
|
|
|
|
func (r *walReader) Next() bool { |
|
|
|
|
r.labels = r.labels[:0] |
|
|
|
|
r.samples = r.samples[:0] |
|
|
|
|
|
|
|
|
|
@ -549,12 +554,12 @@ func (r *WALReader) Next() bool { |
|
|
|
|
return r.err == nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *WALReader) current() *os.File { |
|
|
|
|
func (r *walReader) current() *os.File { |
|
|
|
|
return r.wal.files[r.cur] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// truncate the WAL after the last valid entry.
|
|
|
|
|
func (r *WALReader) truncate(lastOffset int64) error { |
|
|
|
|
func (r *walReader) truncate(lastOffset int64) error { |
|
|
|
|
r.logger.Log("msg", "WAL corruption detected; truncating", |
|
|
|
|
"err", r.err, "file", r.current().Name(), "pos", lastOffset) |
|
|
|
|
|
|
|
|
|
@ -582,7 +587,7 @@ func walCorruptionErrf(s string, args ...interface{}) error { |
|
|
|
|
return walCorruptionErr(errors.Errorf(s, args...)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { |
|
|
|
|
func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { |
|
|
|
|
r.crc32.Reset() |
|
|
|
|
tr := io.TeeReader(cr, r.crc32) |
|
|
|
|
|
|
|
|
|
@ -629,7 +634,7 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { |
|
|
|
|
return etype, flag, buf, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *WALReader) decodeSeries(flag byte, b []byte) error { |
|
|
|
|
func (r *walReader) decodeSeries(flag byte, b []byte) error { |
|
|
|
|
for len(b) > 0 { |
|
|
|
|
l, n := binary.Uvarint(b) |
|
|
|
|
if n < 1 { |
|
|
|
|
@ -659,7 +664,7 @@ func (r *WALReader) decodeSeries(flag byte, b []byte) error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *WALReader) decodeSamples(flag byte, b []byte) error { |
|
|
|
|
func (r *walReader) decodeSamples(flag byte, b []byte) error { |
|
|
|
|
if len(b) < 16 { |
|
|
|
|
return errors.Wrap(errInvalidSize, "header length") |
|
|
|
|
} |
|
|
|
|
@ -670,7 +675,7 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error { |
|
|
|
|
b = b[16:] |
|
|
|
|
|
|
|
|
|
for len(b) > 0 { |
|
|
|
|
var smpl refdSample |
|
|
|
|
var smpl RefSample |
|
|
|
|
|
|
|
|
|
dref, n := binary.Varint(b) |
|
|
|
|
if n < 1 { |
|
|
|
|
@ -678,19 +683,19 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error { |
|
|
|
|
} |
|
|
|
|
b = b[n:] |
|
|
|
|
|
|
|
|
|
smpl.ref = uint64(int64(baseRef) + dref) |
|
|
|
|
smpl.Ref = uint64(int64(baseRef) + dref) |
|
|
|
|
|
|
|
|
|
dtime, n := binary.Varint(b) |
|
|
|
|
if n < 1 { |
|
|
|
|
return errors.Wrap(errInvalidSize, "sample timestamp delta") |
|
|
|
|
} |
|
|
|
|
b = b[n:] |
|
|
|
|
smpl.t = baseTime + dtime |
|
|
|
|
smpl.T = baseTime + dtime |
|
|
|
|
|
|
|
|
|
if len(b) < 8 { |
|
|
|
|
return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b)) |
|
|
|
|
} |
|
|
|
|
smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) |
|
|
|
|
smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) |
|
|
|
|
b = b[8:] |
|
|
|
|
|
|
|
|
|
r.samples = append(r.samples, smpl) |
|
|
|
|
|