|
|
|
|
@ -15,6 +15,7 @@ |
|
|
|
|
package wlog |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"errors" |
|
|
|
|
"fmt" |
|
|
|
|
"io" |
|
|
|
|
"math" |
|
|
|
|
@ -25,7 +26,6 @@ import ( |
|
|
|
|
|
|
|
|
|
"github.com/go-kit/log" |
|
|
|
|
"github.com/go-kit/log/level" |
|
|
|
|
"github.com/pkg/errors" |
|
|
|
|
"golang.org/x/exp/slices" |
|
|
|
|
|
|
|
|
|
"github.com/prometheus/prometheus/tsdb/chunks" |
|
|
|
|
@ -102,8 +102,8 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
{ |
|
|
|
|
var sgmRange []SegmentRange |
|
|
|
|
dir, idx, err := LastCheckpoint(w.Dir()) |
|
|
|
|
if err != nil && err != record.ErrNotFound { |
|
|
|
|
return nil, errors.Wrap(err, "find last checkpoint") |
|
|
|
|
if err != nil && !errors.Is(err, record.ErrNotFound) { |
|
|
|
|
return nil, fmt.Errorf("find last checkpoint: %w", err) |
|
|
|
|
} |
|
|
|
|
last := idx + 1 |
|
|
|
|
if err == nil { |
|
|
|
|
@ -119,7 +119,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
sgmRange = append(sgmRange, SegmentRange{Dir: w.Dir(), First: from, Last: to}) |
|
|
|
|
sgmReader, err = NewSegmentsRangeReader(sgmRange...) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "create segment reader") |
|
|
|
|
return nil, fmt.Errorf("create segment reader: %w", err) |
|
|
|
|
} |
|
|
|
|
defer sgmReader.Close() |
|
|
|
|
} |
|
|
|
|
@ -128,15 +128,15 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
cpdirtmp := cpdir + ".tmp" |
|
|
|
|
|
|
|
|
|
if err := os.RemoveAll(cpdirtmp); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "remove previous temporary checkpoint dir") |
|
|
|
|
return nil, fmt.Errorf("remove previous temporary checkpoint dir: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := os.MkdirAll(cpdirtmp, 0o777); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "create checkpoint dir") |
|
|
|
|
return nil, fmt.Errorf("create checkpoint dir: %w", err) |
|
|
|
|
} |
|
|
|
|
cp, err := New(nil, nil, cpdirtmp, w.CompressionType()) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "open checkpoint") |
|
|
|
|
return nil, fmt.Errorf("open checkpoint: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Ensures that an early return caused by an error doesn't leave any tmp files.
|
|
|
|
|
@ -174,7 +174,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
case record.Series: |
|
|
|
|
series, err = dec.Series(rec, series) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "decode series") |
|
|
|
|
return nil, fmt.Errorf("decode series: %w", err) |
|
|
|
|
} |
|
|
|
|
// Drop irrelevant series in place.
|
|
|
|
|
repl := series[:0] |
|
|
|
|
@ -192,7 +192,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
case record.Samples: |
|
|
|
|
samples, err = dec.Samples(rec, samples) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "decode samples") |
|
|
|
|
return nil, fmt.Errorf("decode samples: %w", err) |
|
|
|
|
} |
|
|
|
|
// Drop irrelevant samples in place.
|
|
|
|
|
repl := samples[:0] |
|
|
|
|
@ -210,7 +210,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
case record.HistogramSamples: |
|
|
|
|
histogramSamples, err = dec.HistogramSamples(rec, histogramSamples) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "decode histogram samples") |
|
|
|
|
return nil, fmt.Errorf("decode histogram samples: %w", err) |
|
|
|
|
} |
|
|
|
|
// Drop irrelevant histogramSamples in place.
|
|
|
|
|
repl := histogramSamples[:0] |
|
|
|
|
@ -228,7 +228,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
case record.Tombstones: |
|
|
|
|
tstones, err = dec.Tombstones(rec, tstones) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "decode deletes") |
|
|
|
|
return nil, fmt.Errorf("decode deletes: %w", err) |
|
|
|
|
} |
|
|
|
|
// Drop irrelevant tombstones in place.
|
|
|
|
|
repl := tstones[:0] |
|
|
|
|
@ -249,7 +249,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
case record.Exemplars: |
|
|
|
|
exemplars, err = dec.Exemplars(rec, exemplars) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "decode exemplars") |
|
|
|
|
return nil, fmt.Errorf("decode exemplars: %w", err) |
|
|
|
|
} |
|
|
|
|
// Drop irrelevant exemplars in place.
|
|
|
|
|
repl := exemplars[:0] |
|
|
|
|
@ -266,7 +266,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
case record.Metadata: |
|
|
|
|
metadata, err := dec.Metadata(rec, metadata) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "decode metadata") |
|
|
|
|
return nil, fmt.Errorf("decode metadata: %w", err) |
|
|
|
|
} |
|
|
|
|
// Only keep reference to the latest found metadata for each refID.
|
|
|
|
|
repl := 0 |
|
|
|
|
@ -292,7 +292,7 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
// Flush records in 1 MB increments.
|
|
|
|
|
if len(buf) > 1*1024*1024 { |
|
|
|
|
if err := cp.Log(recs...); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "flush records") |
|
|
|
|
return nil, fmt.Errorf("flush records: %w", err) |
|
|
|
|
} |
|
|
|
|
buf, recs = buf[:0], recs[:0] |
|
|
|
|
} |
|
|
|
|
@ -300,12 +300,12 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
// If we hit any corruption during checkpointing, repairing is not an option.
|
|
|
|
|
// The head won't know which series records are lost.
|
|
|
|
|
if r.Err() != nil { |
|
|
|
|
return nil, errors.Wrap(r.Err(), "read segments") |
|
|
|
|
return nil, fmt.Errorf("read segments: %w", r.Err()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Flush remaining records.
|
|
|
|
|
if err := cp.Log(recs...); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "flush records") |
|
|
|
|
return nil, fmt.Errorf("flush records: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Flush latest metadata records for each series.
|
|
|
|
|
@ -315,29 +315,29 @@ func Checkpoint(logger log.Logger, w *WL, from, to int, keep func(id chunks.Head |
|
|
|
|
latestMetadata = append(latestMetadata, m) |
|
|
|
|
} |
|
|
|
|
if err := cp.Log(enc.Metadata(latestMetadata, buf[:0])); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "flush metadata records") |
|
|
|
|
return nil, fmt.Errorf("flush metadata records: %w", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := cp.Close(); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "close checkpoint") |
|
|
|
|
return nil, fmt.Errorf("close checkpoint: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Sync temporary directory before rename.
|
|
|
|
|
df, err := fileutil.OpenDir(cpdirtmp) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "open temporary checkpoint directory") |
|
|
|
|
return nil, fmt.Errorf("open temporary checkpoint directory: %w", err) |
|
|
|
|
} |
|
|
|
|
if err := df.Sync(); err != nil { |
|
|
|
|
df.Close() |
|
|
|
|
return nil, errors.Wrap(err, "sync temporary checkpoint directory") |
|
|
|
|
return nil, fmt.Errorf("sync temporary checkpoint directory: %w", err) |
|
|
|
|
} |
|
|
|
|
if err = df.Close(); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "close temporary checkpoint directory") |
|
|
|
|
return nil, fmt.Errorf("close temporary checkpoint directory: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := fileutil.Replace(cpdirtmp, cpdir); err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "rename checkpoint directory") |
|
|
|
|
return nil, fmt.Errorf("rename checkpoint directory: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return stats, nil |
|
|
|
|
@ -364,7 +364,7 @@ func listCheckpoints(dir string) (refs []checkpointRef, err error) { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if !fi.IsDir() { |
|
|
|
|
return nil, errors.Errorf("checkpoint %s is not a directory", fi.Name()) |
|
|
|
|
return nil, fmt.Errorf("checkpoint %s is not a directory", fi.Name()) |
|
|
|
|
} |
|
|
|
|
idx, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) |
|
|
|
|
if err != nil { |
|
|
|
|
|