|
|
|
|
@ -22,6 +22,7 @@ import ( |
|
|
|
|
"os" |
|
|
|
|
"path/filepath" |
|
|
|
|
"runtime" |
|
|
|
|
"sort" |
|
|
|
|
"strconv" |
|
|
|
|
"strings" |
|
|
|
|
"sync" |
|
|
|
|
@ -33,6 +34,7 @@ import ( |
|
|
|
|
"github.com/coreos/etcd/pkg/fileutil" |
|
|
|
|
"github.com/go-kit/kit/log" |
|
|
|
|
"github.com/nightlyone/lockfile" |
|
|
|
|
"github.com/oklog/ulid" |
|
|
|
|
"github.com/pkg/errors" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
"github.com/prometheus/tsdb/labels" |
|
|
|
|
@ -45,7 +47,6 @@ var DefaultOptions = &Options{ |
|
|
|
|
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
|
|
|
|
MinBlockDuration: 3 * 60 * 60 * 1000, // 3 hours in milliseconds
|
|
|
|
|
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 day in milliseconds
|
|
|
|
|
AppendableBlocks: 2, |
|
|
|
|
NoLockfile: false, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -64,13 +65,6 @@ type Options struct { |
|
|
|
|
// The maximum timestamp range of compacted blocks.
|
|
|
|
|
MaxBlockDuration uint64 |
|
|
|
|
|
|
|
|
|
// Number of head blocks that can be appended to.
|
|
|
|
|
// Should be two or higher to prevent write errors in general scenarios.
|
|
|
|
|
//
|
|
|
|
|
// After a new block is started for timestamp t0 or higher, appends with
|
|
|
|
|
// timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
|
|
|
|
|
AppendableBlocks int |
|
|
|
|
|
|
|
|
|
// NoLockfile disables creation and consideration of a lock file.
|
|
|
|
|
NoLockfile bool |
|
|
|
|
} |
|
|
|
|
@ -86,11 +80,11 @@ type Appender interface { |
|
|
|
|
// Returned reference numbers are ephemeral and may be rejected in calls
|
|
|
|
|
// to AddFast() at any point. Adding the sample via Add() returns a new
|
|
|
|
|
// reference number.
|
|
|
|
|
Add(l labels.Labels, t int64, v float64) (uint64, error) |
|
|
|
|
Add(l labels.Labels, t int64, v float64) (string, error) |
|
|
|
|
|
|
|
|
|
// AddFast adds a sample pair for the referenced series. It is generally faster
|
|
|
|
|
// than adding a sample by providing its full label set.
|
|
|
|
|
AddFast(ref uint64, t int64, v float64) error |
|
|
|
|
AddFast(ref string, t int64, v float64) error |
|
|
|
|
|
|
|
|
|
// Commit submits the collected samples and purges the batch.
|
|
|
|
|
Commit() error |
|
|
|
|
@ -159,11 +153,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
absdir, err := filepath.Abs(dir) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if l == nil { |
|
|
|
|
l = log.NewLogfmtLogger(os.Stdout) |
|
|
|
|
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) |
|
|
|
|
@ -172,9 +161,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db |
|
|
|
|
if opts == nil { |
|
|
|
|
opts = DefaultOptions |
|
|
|
|
} |
|
|
|
|
if opts.AppendableBlocks < 1 { |
|
|
|
|
return nil, errors.Errorf("AppendableBlocks must be greater than 0") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
db = &DB{ |
|
|
|
|
dir: dir, |
|
|
|
|
@ -186,6 +172,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db |
|
|
|
|
stopc: make(chan struct{}), |
|
|
|
|
} |
|
|
|
|
if !opts.NoLockfile { |
|
|
|
|
absdir, err := filepath.Abs(dir) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
lockf, err := lockfile.New(filepath.Join(absdir, "lock")) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
@ -196,7 +186,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db |
|
|
|
|
db.lockf = &lockf |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
db.compactor = newCompactor(r, l, &compactorOptions{ |
|
|
|
|
db.compactor = newCompactor(dir, r, l, &compactorOptions{ |
|
|
|
|
maxBlockRange: opts.MaxBlockDuration, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
@ -281,8 +271,8 @@ func (db *DB) compact() (changes bool, err error) { |
|
|
|
|
// returning the lock to not block Appenders.
|
|
|
|
|
// Selected blocks are semantically ensured to not be written to afterwards
|
|
|
|
|
// by appendable().
|
|
|
|
|
if len(db.heads) > db.opts.AppendableBlocks { |
|
|
|
|
for _, h := range db.heads[:len(db.heads)-db.opts.AppendableBlocks] { |
|
|
|
|
if len(db.heads) > 2 { |
|
|
|
|
for _, h := range db.heads[:len(db.heads)-2] { |
|
|
|
|
// Blocks that won't be appendable when instantiating a new appender
|
|
|
|
|
// might still have active appenders on them.
|
|
|
|
|
// Abort at the first one we encounter.
|
|
|
|
|
@ -302,7 +292,7 @@ func (db *DB) compact() (changes bool, err error) { |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err = db.compactor.Write(h.Dir(), h); err != nil { |
|
|
|
|
if err = db.compactor.Write(h); err != nil { |
|
|
|
|
return changes, errors.Wrap(err, "persist head block") |
|
|
|
|
} |
|
|
|
|
changes = true |
|
|
|
|
@ -311,7 +301,7 @@ func (db *DB) compact() (changes bool, err error) { |
|
|
|
|
|
|
|
|
|
// Check for compactions of multiple blocks.
|
|
|
|
|
for { |
|
|
|
|
plans, err := db.compactor.Plan(db.dir) |
|
|
|
|
plans, err := db.compactor.Plan() |
|
|
|
|
if err != nil { |
|
|
|
|
return changes, errors.Wrap(err, "plan compaction") |
|
|
|
|
} |
|
|
|
|
@ -375,9 +365,9 @@ func retentionCutoff(dir string, mint int64) (bool, error) { |
|
|
|
|
return changes, fileutil.Fsync(df) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (db *DB) seqBlock(i int) (Block, bool) { |
|
|
|
|
func (db *DB) getBlock(id ulid.ULID) (Block, bool) { |
|
|
|
|
for _, b := range db.blocks { |
|
|
|
|
if b.Meta().Sequence == i { |
|
|
|
|
if b.Meta().ULID == id { |
|
|
|
|
return b, true |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -399,10 +389,8 @@ func (db *DB) reloadBlocks() error { |
|
|
|
|
return errors.Wrap(err, "find blocks") |
|
|
|
|
} |
|
|
|
|
var ( |
|
|
|
|
metas []*BlockMeta |
|
|
|
|
blocks []Block |
|
|
|
|
heads []headBlock |
|
|
|
|
seqBlocks = make(map[int]Block, len(dirs)) |
|
|
|
|
blocks []Block |
|
|
|
|
exist = map[ulid.ULID]struct{}{} |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for _, dir := range dirs { |
|
|
|
|
@ -410,47 +398,58 @@ func (db *DB) reloadBlocks() error { |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrapf(err, "read meta information %s", dir) |
|
|
|
|
} |
|
|
|
|
metas = append(metas, meta) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for i, meta := range metas { |
|
|
|
|
b, ok := db.seqBlock(meta.Sequence) |
|
|
|
|
|
|
|
|
|
if meta.Compaction.Generation == 0 { |
|
|
|
|
if !ok { |
|
|
|
|
b, err = db.openHeadBlock(dirs[i]) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrapf(err, "load head at %s", dirs[i]) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if meta.ULID != b.Meta().ULID { |
|
|
|
|
return errors.Errorf("head block ULID changed unexpectedly") |
|
|
|
|
b, ok := db.getBlock(meta.ULID) |
|
|
|
|
if !ok { |
|
|
|
|
if meta.Compaction.Generation == 0 { |
|
|
|
|
b, err = db.openHeadBlock(dir) |
|
|
|
|
} else { |
|
|
|
|
b, err = newPersistedBlock(dir) |
|
|
|
|
} |
|
|
|
|
heads = append(heads, b.(headBlock)) |
|
|
|
|
} else { |
|
|
|
|
if !ok || meta.ULID != b.Meta().ULID { |
|
|
|
|
b, err = newPersistedBlock(dirs[i]) |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrapf(err, "open persisted block %s", dirs[i]) |
|
|
|
|
} |
|
|
|
|
if err != nil { |
|
|
|
|
return errors.Wrapf(err, "open block %s", dir) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
seqBlocks[meta.Sequence] = b |
|
|
|
|
blocks = append(blocks, b) |
|
|
|
|
exist[meta.ULID] = struct{}{} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Close all blocks that we no longer need. They are closed after returning all
|
|
|
|
|
// locks to avoid questionable locking order.
|
|
|
|
|
if err := validateBlockSequence(blocks); err != nil { |
|
|
|
|
return errors.Wrap(err, "invalid block sequence") |
|
|
|
|
} |
|
|
|
|
// Close all opened blocks that no longer exist after we returned all locks.
|
|
|
|
|
for _, b := range db.blocks { |
|
|
|
|
if nb, ok := seqBlocks[b.Meta().Sequence]; !ok || nb != b { |
|
|
|
|
if _, ok := exist[b.Meta().ULID]; !ok { |
|
|
|
|
cs = append(cs, b) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
db.blocks = blocks |
|
|
|
|
db.heads = heads |
|
|
|
|
db.heads = nil |
|
|
|
|
|
|
|
|
|
for _, b := range blocks { |
|
|
|
|
if b.Meta().Compaction.Generation == 0 { |
|
|
|
|
db.heads = append(db.heads, b.(*HeadBlock)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func validateBlockSequence(bs []Block) error { |
|
|
|
|
if len(bs) == 0 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
sort.Slice(bs, func(i, j int) bool { |
|
|
|
|
return bs[i].Meta().MinTime < bs[j].Meta().MinTime |
|
|
|
|
}) |
|
|
|
|
prev := bs[0] |
|
|
|
|
for _, b := range bs[1:] { |
|
|
|
|
if b.Meta().MinTime < prev.Meta().MaxTime { |
|
|
|
|
return errors.Errorf("block time ranges overlap", b.Meta().MinTime, prev.Meta().MaxTime) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -482,27 +481,7 @@ func (db *DB) Close() error { |
|
|
|
|
// Appender returns a new Appender on the database.
|
|
|
|
|
func (db *DB) Appender() Appender { |
|
|
|
|
db.mtx.RLock() |
|
|
|
|
a := &dbAppender{db: db} |
|
|
|
|
|
|
|
|
|
// XXX(fabxc): turn off creating initial appender as it will happen on-demand
|
|
|
|
|
// anyway. For now this, with combination of only having a single timestamp per batch,
|
|
|
|
|
// prevents opening more than one appender and hitting an unresolved deadlock (#11).
|
|
|
|
|
//
|
|
|
|
|
|
|
|
|
|
// Only instantiate appender after returning the headmtx to avoid
|
|
|
|
|
// questionable locking order.
|
|
|
|
|
db.headmtx.RLock() |
|
|
|
|
app := db.appendable() |
|
|
|
|
db.headmtx.RUnlock() |
|
|
|
|
|
|
|
|
|
for _, b := range app { |
|
|
|
|
a.heads = append(a.heads, &metaAppender{ |
|
|
|
|
meta: b.Meta(), |
|
|
|
|
app: b.Appender(), |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return a |
|
|
|
|
return &dbAppender{db: db} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type dbAppender struct { |
|
|
|
|
@ -517,34 +496,39 @@ type metaAppender struct { |
|
|
|
|
app Appender |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { |
|
|
|
|
h, err := a.appenderFor(t) |
|
|
|
|
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) { |
|
|
|
|
h, err := a.appenderAt(t) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, err |
|
|
|
|
return "", err |
|
|
|
|
} |
|
|
|
|
ref, err := h.app.Add(lset, t, v) |
|
|
|
|
if err != nil { |
|
|
|
|
return 0, err |
|
|
|
|
return "", err |
|
|
|
|
} |
|
|
|
|
a.samples++ |
|
|
|
|
// Store last byte of sequence number in 3rd byte of reference.
|
|
|
|
|
return ref | (uint64(h.meta.Sequence&0xff) << 40), nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { |
|
|
|
|
// Load the head last byte of the head sequence from the 3rd byte of the
|
|
|
|
|
// reference number.
|
|
|
|
|
gen := (ref << 16) >> 56 |
|
|
|
|
return string(append(h.meta.ULID[:], ref...)), nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
h, err := a.appenderFor(t) |
|
|
|
|
func (a *dbAppender) AddFast(ref string, t int64, v float64) error { |
|
|
|
|
if len(ref) < 16 { |
|
|
|
|
return errors.Wrap(ErrNotFound, "invalid ref length") |
|
|
|
|
} |
|
|
|
|
// The first 16 bytes of a ref hold the ULID of the head block.
|
|
|
|
|
h, err := a.appenderAt(t) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// If the last byte of the sequence does not add up, the reference is not valid.
|
|
|
|
|
if uint64(h.meta.Sequence&0xff) != gen { |
|
|
|
|
// Validate the ref points to the same block we got for t.
|
|
|
|
|
if string(h.meta.ULID[:]) != ref[:16] { |
|
|
|
|
return ErrNotFound |
|
|
|
|
} |
|
|
|
|
if err := h.app.AddFast(ref, t, v); err != nil { |
|
|
|
|
if err := h.app.AddFast(ref[16:], t, v); err != nil { |
|
|
|
|
// The block the ref points to might fit the given timestamp.
|
|
|
|
|
// We mask the error to stick with our contract.
|
|
|
|
|
if errors.Cause(err) == ErrOutOfBounds { |
|
|
|
|
err = ErrNotFound |
|
|
|
|
} |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -554,85 +538,84 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { |
|
|
|
|
|
|
|
|
|
// appenderFor gets the appender for the head containing timestamp t.
|
|
|
|
|
// If the head block doesn't exist yet, it gets created.
|
|
|
|
|
func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { |
|
|
|
|
// If there's no fitting head block for t, ensure it gets created.
|
|
|
|
|
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { |
|
|
|
|
a.db.headmtx.Lock() |
|
|
|
|
|
|
|
|
|
var newHeads []headBlock |
|
|
|
|
|
|
|
|
|
if err := a.db.ensureHead(t); err != nil { |
|
|
|
|
a.db.headmtx.Unlock() |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
if len(a.heads) == 0 { |
|
|
|
|
newHeads = append(newHeads, a.db.appendable()...) |
|
|
|
|
} else { |
|
|
|
|
maxSeq := a.heads[len(a.heads)-1].meta.Sequence |
|
|
|
|
for _, b := range a.db.appendable() { |
|
|
|
|
if b.Meta().Sequence > maxSeq { |
|
|
|
|
newHeads = append(newHeads, b) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) { |
|
|
|
|
for _, h := range a.heads { |
|
|
|
|
if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) { |
|
|
|
|
return h, nil |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// Currently opened appenders do not cover t. Ensure the head block is
|
|
|
|
|
// created and add missing appenders.
|
|
|
|
|
a.db.headmtx.Lock() |
|
|
|
|
|
|
|
|
|
if err := a.db.ensureHead(t); err != nil { |
|
|
|
|
a.db.headmtx.Unlock() |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// XXX(fabxc): temporary workaround. See comment on instantiating DB.Appender.
|
|
|
|
|
// for _, b := range newHeads {
|
|
|
|
|
// // Only get appender for the block with the specific timestamp.
|
|
|
|
|
// if t >= b.Meta().MaxTime {
|
|
|
|
|
// continue
|
|
|
|
|
// }
|
|
|
|
|
// a.heads = append(a.heads, &metaAppender{
|
|
|
|
|
// app: b.Appender(),
|
|
|
|
|
// meta: b.Meta(),
|
|
|
|
|
// })
|
|
|
|
|
// break
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// Instantiate appenders after returning headmtx to avoid questionable
|
|
|
|
|
// locking order.
|
|
|
|
|
for _, b := range newHeads { |
|
|
|
|
a.heads = append(a.heads, &metaAppender{ |
|
|
|
|
app: b.Appender(), |
|
|
|
|
meta: b.Meta(), |
|
|
|
|
}) |
|
|
|
|
var hb headBlock |
|
|
|
|
for _, h := range a.db.appendable() { |
|
|
|
|
m := h.Meta() |
|
|
|
|
|
|
|
|
|
if intervalContains(m.MinTime, m.MaxTime-1, t) { |
|
|
|
|
hb = h |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
for i := len(a.heads) - 1; i >= 0; i-- { |
|
|
|
|
if h := a.heads[i]; t >= h.meta.MinTime { |
|
|
|
|
return h, nil |
|
|
|
|
} |
|
|
|
|
a.db.headmtx.Unlock() |
|
|
|
|
|
|
|
|
|
if hb == nil { |
|
|
|
|
return nil, ErrOutOfBounds |
|
|
|
|
} |
|
|
|
|
// Instantiate appender after returning headmtx!
|
|
|
|
|
app := &metaAppender{ |
|
|
|
|
meta: hb.Meta(), |
|
|
|
|
app: hb.Appender(), |
|
|
|
|
} |
|
|
|
|
a.heads = append(a.heads, app) |
|
|
|
|
|
|
|
|
|
return app, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return nil, ErrNotFound |
|
|
|
|
// rangeForTimestamp returns the width-aligned window [mint, maxt) that
// contains t, where mint is a multiple of width and maxt = mint + width.
//
// width must be non-zero (a zero width panics with a division by zero) and is
// expected to be positive.
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
	mint = (t / width) * width
	// Integer division truncates toward zero, so for a negative t that is not
	// a multiple of width the product lands above t. Step down one window so
	// that the returned range actually contains t.
	if mint > t {
		mint -= width
	}
	return mint, mint + width
}
|
|
|
|
|
|
|
|
|
// ensureHead makes sure that there is a head block for the timestamp t if
|
|
|
|
|
// it is within or after the currently appendable window.
|
|
|
|
|
func (db *DB) ensureHead(t int64) error { |
|
|
|
|
// Initial case for a new database: we must create the first
|
|
|
|
|
// AppendableBlocks-1 front padding heads.
|
|
|
|
|
if len(db.heads) == 0 { |
|
|
|
|
for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- { |
|
|
|
|
if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
var ( |
|
|
|
|
mint, maxt = rangeForTimestamp(t, int64(db.opts.MinBlockDuration)) |
|
|
|
|
addBuffer = len(db.blocks) == 0 |
|
|
|
|
last BlockMeta |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
h := db.heads[len(db.heads)-1] |
|
|
|
|
m := h.Meta() |
|
|
|
|
// If t doesn't exceed the range of heads blocks, there's nothing to do.
|
|
|
|
|
if t < m.MaxTime { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
if _, err := db.cut(m.MaxTime); err != nil { |
|
|
|
|
if !addBuffer { |
|
|
|
|
last = db.blocks[len(db.blocks)-1].Meta() |
|
|
|
|
addBuffer = last.MaxTime <= mint-int64(db.opts.MinBlockDuration) |
|
|
|
|
} |
|
|
|
|
// Create another block of buffer in front if the DB is initialized or retrieving
|
|
|
|
|
// new data after a long gap.
|
|
|
|
|
// This ensures we always have a full block width of append window.
|
|
|
|
|
if addBuffer { |
|
|
|
|
if _, err := db.createHeadBlock(mint-int64(db.opts.MinBlockDuration), mint); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// If the previous block reaches into our new window, make it smaller.
|
|
|
|
|
} else if mt := last.MaxTime; mt > mint { |
|
|
|
|
mint = mt |
|
|
|
|
} |
|
|
|
|
if mint >= maxt { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
// Error if the requested time for a head is before the appendable window.
|
|
|
|
|
if len(db.heads) > 0 && t < db.heads[0].Meta().MinTime { |
|
|
|
|
return ErrOutOfBounds |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
_, err := db.createHeadBlock(mint, maxt) |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (a *dbAppender) Commit() error { |
|
|
|
|
@ -640,14 +623,22 @@ func (a *dbAppender) Commit() error { |
|
|
|
|
|
|
|
|
|
// Commits to partial appenders must be concurrent as concurrent appenders
|
|
|
|
|
// may have conflicting locks on head appenders.
|
|
|
|
|
// XXX(fabxc): is this a leaky abstraction? Should make an effort to catch a multi-error?
|
|
|
|
|
var g errgroup.Group |
|
|
|
|
// For high-throughput use cases the errgroup causes significant blocking. Typically,
|
|
|
|
|
// we just deal with a single appender and special case it.
|
|
|
|
|
var err error |
|
|
|
|
|
|
|
|
|
for _, h := range a.heads { |
|
|
|
|
g.Go(h.app.Commit) |
|
|
|
|
switch len(a.heads) { |
|
|
|
|
case 1: |
|
|
|
|
err = a.heads[0].app.Commit() |
|
|
|
|
default: |
|
|
|
|
var g errgroup.Group |
|
|
|
|
for _, h := range a.heads { |
|
|
|
|
g.Go(h.app.Commit) |
|
|
|
|
} |
|
|
|
|
err = g.Wait() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := g.Wait(); err != nil { |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// XXX(fabxc): Push the metric down into head block to account properly
|
|
|
|
|
@ -670,14 +661,15 @@ func (a *dbAppender) Rollback() error { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// appendable returns a copy of a slice of HeadBlocks that can still be appended to.
|
|
|
|
|
func (db *DB) appendable() []headBlock { |
|
|
|
|
var i int |
|
|
|
|
app := make([]headBlock, 0, db.opts.AppendableBlocks) |
|
|
|
|
|
|
|
|
|
if len(db.heads) > db.opts.AppendableBlocks { |
|
|
|
|
i = len(db.heads) - db.opts.AppendableBlocks |
|
|
|
|
func (db *DB) appendable() (r []headBlock) { |
|
|
|
|
switch len(db.heads) { |
|
|
|
|
case 0: |
|
|
|
|
case 1: |
|
|
|
|
r = append(r, db.heads[0]) |
|
|
|
|
default: |
|
|
|
|
r = append(r, db.heads[len(db.heads)-2:]...) |
|
|
|
|
} |
|
|
|
|
return append(app, db.heads[i:]...) |
|
|
|
|
return r |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func intervalOverlap(amin, amax, bmin, bmax int64) bool { |
|
|
|
|
@ -712,7 +704,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { |
|
|
|
|
// openHeadBlock opens the head block at dir.
|
|
|
|
|
func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { |
|
|
|
|
var ( |
|
|
|
|
wdir = filepath.Join(dir, "wal") |
|
|
|
|
wdir = walDir(dir) |
|
|
|
|
l = log.With(db.logger, "wal", wdir) |
|
|
|
|
) |
|
|
|
|
wal, err := OpenSegmentWAL(wdir, l, 5*time.Second) |
|
|
|
|
@ -727,16 +719,10 @@ func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { |
|
|
|
|
return h, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// cut starts a new head block to append to. The completed head block
|
|
|
|
|
// will still be appendable for the configured grace period.
|
|
|
|
|
func (db *DB) cut(mint int64) (headBlock, error) { |
|
|
|
|
maxt := mint + int64(db.opts.MinBlockDuration) |
|
|
|
|
|
|
|
|
|
dir, seq, err := nextSequenceFile(db.dir, "b-") |
|
|
|
|
// createHeadBlock starts a new head block to append to.
|
|
|
|
|
func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) { |
|
|
|
|
dir, err := TouchHeadBlock(db.dir, mint, maxt) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil { |
|
|
|
|
return nil, errors.Wrapf(err, "touch head block %s", dir) |
|
|
|
|
} |
|
|
|
|
newHead, err := db.openHeadBlock(dir) |
|
|
|
|
@ -759,13 +745,8 @@ func isBlockDir(fi os.FileInfo) bool { |
|
|
|
|
if !fi.IsDir() { |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
if !strings.HasPrefix(fi.Name(), "b-") { |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
if _, err := strconv.ParseUint(fi.Name()[2:], 10, 32); err != nil { |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
return true |
|
|
|
|
_, err := ulid.Parse(fi.Name()) |
|
|
|
|
return err == nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func blockDirs(dir string) ([]string, error) { |
|
|
|
|
@ -870,9 +851,8 @@ func (es MultiError) Err() error { |
|
|
|
|
return es |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func yoloString(b []byte) string { |
|
|
|
|
return *((*string)(unsafe.Pointer(&b))) |
|
|
|
|
} |
|
|
|
|
func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) } |
|
|
|
|
// yoloBytes reinterprets the bytes of s as a []byte without copying.
//
// The returned slice aliases the string's storage and must be treated as
// read-only: writing through it breaks string immutability and may fault if
// the data lives in read-only memory.
func yoloBytes(s string) []byte {
	// A string header is (data, len) while a slice header is (data, len, cap).
	// Casting &s directly would read the capacity from whatever memory follows
	// the string header, so build a value that carries an explicit capacity
	// equal to the length before reinterpreting it.
	hdr := struct {
		string
		cap int
	}{s, len(s)}
	return *(*[]byte)(unsafe.Pointer(&hdr))
}
|
|
|
|
|
|
|
|
|
func closeAll(cs ...io.Closer) error { |
|
|
|
|
var merr MultiError |
|
|
|
|
|