|
|
|
|
@ -190,6 +190,7 @@ type SegmentWAL struct { |
|
|
|
|
|
|
|
|
|
stopc chan struct{} |
|
|
|
|
donec chan struct{} |
|
|
|
|
actorc chan func() error // sequentialized background operations
|
|
|
|
|
buffers sync.Pool |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -213,6 +214,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, |
|
|
|
|
flushInterval: flushInterval, |
|
|
|
|
donec: make(chan struct{}), |
|
|
|
|
stopc: make(chan struct{}), |
|
|
|
|
actorc: make(chan func() error, 1), |
|
|
|
|
segmentSize: walSegmentSizeBytes, |
|
|
|
|
crc32: newCRC32(), |
|
|
|
|
} |
|
|
|
|
@ -384,7 +386,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { |
|
|
|
|
w.putBuffer(buf) |
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
return errors.Wrap(err, "write to compaction segment") |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if r.Err() != nil { |
|
|
|
|
@ -401,14 +403,15 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error { |
|
|
|
|
csf.Sync() |
|
|
|
|
csf.Close() |
|
|
|
|
|
|
|
|
|
candidates[0].Close() // need close before remove on platform windows
|
|
|
|
|
if err := renameFile(csf.Name(), candidates[0].Name()); err != nil { |
|
|
|
|
return err |
|
|
|
|
return errors.Wrap(err, "rename compaction segment") |
|
|
|
|
} |
|
|
|
|
for _, f := range candidates[1:] { |
|
|
|
|
f.Close() // need close before remove on platform windows
|
|
|
|
|
if err := os.RemoveAll(f.Name()); err != nil { |
|
|
|
|
return errors.Wrap(err, "delete WAL segment file") |
|
|
|
|
} |
|
|
|
|
f.Close() |
|
|
|
|
} |
|
|
|
|
if err := w.dirFile.Sync(); err != nil { |
|
|
|
|
return err |
|
|
|
|
@ -522,6 +525,15 @@ func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) { |
|
|
|
|
} |
|
|
|
|
metab := make([]byte, 8) |
|
|
|
|
|
|
|
|
|
// If there is an error, we need close f for platform windows before gc.
|
|
|
|
|
// Otherwise, file op may fail.
|
|
|
|
|
hasError := true |
|
|
|
|
defer func() { |
|
|
|
|
if hasError { |
|
|
|
|
f.Close() |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
if n, err := f.Read(metab); err != nil { |
|
|
|
|
return nil, errors.Wrapf(err, "validate meta %q", f.Name()) |
|
|
|
|
} else if n != 8 { |
|
|
|
|
@ -534,6 +546,7 @@ func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) { |
|
|
|
|
if metab[4] != WALFormatDefault { |
|
|
|
|
return nil, errors.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name()) |
|
|
|
|
} |
|
|
|
|
hasError = false |
|
|
|
|
return f, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -569,18 +582,21 @@ func (w *SegmentWAL) cut() error { |
|
|
|
|
// Finish last segment asynchronously to not block the WAL moving along
|
|
|
|
|
// in the new segment.
|
|
|
|
|
go func() { |
|
|
|
|
off, err := hf.Seek(0, os.SEEK_CUR) |
|
|
|
|
if err != nil { |
|
|
|
|
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) |
|
|
|
|
} |
|
|
|
|
if err := hf.Truncate(off); err != nil { |
|
|
|
|
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) |
|
|
|
|
} |
|
|
|
|
if err := hf.Sync(); err != nil { |
|
|
|
|
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) |
|
|
|
|
} |
|
|
|
|
if err := hf.Close(); err != nil { |
|
|
|
|
level.Error(w.logger).Log("msg", "finish old segment", "segment", hf.Name(), "err", err) |
|
|
|
|
w.actorc <- func() error { |
|
|
|
|
off, err := hf.Seek(0, os.SEEK_CUR) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrapf(err, "finish old segment %s", hf.Name()) |
|
|
|
|
} |
|
|
|
|
if err := hf.Truncate(off); err != nil { |
|
|
|
|
return errors.Wrapf(err, "finish old segment %s", hf.Name()) |
|
|
|
|
} |
|
|
|
|
if err := hf.Sync(); err != nil { |
|
|
|
|
return errors.Wrapf(err, "finish old segment %s", hf.Name()) |
|
|
|
|
} |
|
|
|
|
if err := hf.Close(); err != nil { |
|
|
|
|
return errors.Wrapf(err, "finish old segment %s", hf.Name()) |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|
@ -595,8 +611,8 @@ func (w *SegmentWAL) cut() error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
go func() { |
|
|
|
|
if err = w.dirFile.Sync(); err != nil { |
|
|
|
|
level.Error(w.logger).Log("msg", "sync WAL directory", "err", err) |
|
|
|
|
w.actorc <- func() error { |
|
|
|
|
return errors.Wrap(w.dirFile.Sync(), "sync WAL directory") |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
@ -675,9 +691,23 @@ func (w *SegmentWAL) run(interval time.Duration) { |
|
|
|
|
defer close(w.donec) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
// Processing all enqueued operations has precedence over shutdown and
|
|
|
|
|
// background syncs.
|
|
|
|
|
select { |
|
|
|
|
case f := <-w.actorc: |
|
|
|
|
if err := f(); err != nil { |
|
|
|
|
level.Error(w.logger).Log("msg", "operation failed", "err", err) |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
select { |
|
|
|
|
case <-w.stopc: |
|
|
|
|
return |
|
|
|
|
case f := <-w.actorc: |
|
|
|
|
if err := f(); err != nil { |
|
|
|
|
level.Error(w.logger).Log("msg", "operation failed", "err", err) |
|
|
|
|
} |
|
|
|
|
case <-tick: |
|
|
|
|
if err := w.Sync(); err != nil { |
|
|
|
|
level.Error(w.logger).Log("msg", "sync failed", "err", err) |
|
|
|
|
@ -702,7 +732,8 @@ func (w *SegmentWAL) Close() error { |
|
|
|
|
if hf := w.head(); hf != nil { |
|
|
|
|
return errors.Wrapf(hf.Close(), "closing WAL head %s", hf.Name()) |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
|
|
|
|
|
return w.dirFile.Close() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
|