Merge pull request #6777 from prometheus/unify-interfaces
Import TSDB directly, without adapter; Unify Iterators, Querier, Series interfaces.pull/6843/head
commit
babcb86cd0
@ -1,323 +0,0 @@ |
||||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb |
||||
|
||||
import ( |
||||
"context" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/alecthomas/units" |
||||
"github.com/go-kit/kit/log" |
||||
"github.com/pkg/errors" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/prometheus/prometheus/pkg/labels" |
||||
"github.com/prometheus/prometheus/storage" |
||||
"github.com/prometheus/prometheus/tsdb" |
||||
) |
||||
|
||||
// ErrNotReady is returned if the underlying storage is not ready yet.
|
||||
var ErrNotReady = errors.New("TSDB not ready") |
||||
|
||||
// ReadyStorage implements the Storage interface while allowing to set the actual
|
||||
// storage at a later point in time.
|
||||
type ReadyStorage struct { |
||||
mtx sync.RWMutex |
||||
a *adapter |
||||
} |
||||
|
||||
// Set the storage.
|
||||
func (s *ReadyStorage) Set(db *tsdb.DB, startTimeMargin int64) { |
||||
s.mtx.Lock() |
||||
defer s.mtx.Unlock() |
||||
|
||||
s.a = &adapter{db: db, startTimeMargin: startTimeMargin} |
||||
} |
||||
|
||||
// Get the storage.
|
||||
func (s *ReadyStorage) Get() *tsdb.DB { |
||||
if x := s.get(); x != nil { |
||||
return x.db |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (s *ReadyStorage) get() *adapter { |
||||
s.mtx.RLock() |
||||
x := s.a |
||||
s.mtx.RUnlock() |
||||
return x |
||||
} |
||||
|
||||
// StartTime implements the Storage interface.
|
||||
func (s *ReadyStorage) StartTime() (int64, error) { |
||||
if x := s.get(); x != nil { |
||||
return x.StartTime() |
||||
} |
||||
return int64(model.Latest), ErrNotReady |
||||
} |
||||
|
||||
// Querier implements the Storage interface.
|
||||
func (s *ReadyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { |
||||
if x := s.get(); x != nil { |
||||
return x.Querier(ctx, mint, maxt) |
||||
} |
||||
return nil, ErrNotReady |
||||
} |
||||
|
||||
// Appender implements the Storage interface.
|
||||
func (s *ReadyStorage) Appender() (storage.Appender, error) { |
||||
if x := s.get(); x != nil { |
||||
return x.Appender() |
||||
} |
||||
return nil, ErrNotReady |
||||
} |
||||
|
||||
// Close implements the Storage interface.
|
||||
func (s *ReadyStorage) Close() error { |
||||
if x := s.Get(); x != nil { |
||||
return x.Close() |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// Adapter return an adapter as storage.Storage.
|
||||
func Adapter(db *tsdb.DB, startTimeMargin int64) storage.Storage { |
||||
return &adapter{db: db, startTimeMargin: startTimeMargin} |
||||
} |
||||
|
||||
// adapter implements a storage.Storage around TSDB.
|
||||
type adapter struct { |
||||
db *tsdb.DB |
||||
startTimeMargin int64 |
||||
} |
||||
|
||||
// Options of the DB storage.
|
||||
type Options struct { |
||||
// The timestamp range of head blocks after which they get persisted.
|
||||
// It's the minimum duration of any persisted block.
|
||||
MinBlockDuration model.Duration |
||||
|
||||
// The maximum timestamp range of compacted blocks.
|
||||
MaxBlockDuration model.Duration |
||||
|
||||
// The maximum size of each WAL segment file.
|
||||
WALSegmentSize units.Base2Bytes |
||||
|
||||
// Duration for how long to retain data.
|
||||
RetentionDuration model.Duration |
||||
|
||||
// Maximum number of bytes to be retained.
|
||||
MaxBytes units.Base2Bytes |
||||
|
||||
// Disable creation and consideration of lockfile.
|
||||
NoLockfile bool |
||||
|
||||
// When true it disables the overlapping blocks check.
|
||||
// This in-turn enables vertical compaction and vertical query merge.
|
||||
AllowOverlappingBlocks bool |
||||
|
||||
// When true records in the WAL will be compressed.
|
||||
WALCompression bool |
||||
} |
||||
|
||||
var ( |
||||
startTime prometheus.GaugeFunc |
||||
headMaxTime prometheus.GaugeFunc |
||||
headMinTime prometheus.GaugeFunc |
||||
) |
||||
|
||||
func registerMetrics(db *tsdb.DB, r prometheus.Registerer) { |
||||
|
||||
startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ |
||||
Name: "prometheus_tsdb_lowest_timestamp_seconds", |
||||
Help: "Lowest timestamp value stored in the database.", |
||||
}, func() float64 { |
||||
bb := db.Blocks() |
||||
if len(bb) == 0 { |
||||
return float64(db.Head().MinTime()) / 1000 |
||||
} |
||||
return float64(db.Blocks()[0].Meta().MinTime) / 1000 |
||||
}) |
||||
headMinTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ |
||||
Name: "prometheus_tsdb_head_min_time_seconds", |
||||
Help: "Minimum time bound of the head block.", |
||||
}, func() float64 { |
||||
return float64(db.Head().MinTime()) / 1000 |
||||
}) |
||||
headMaxTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ |
||||
Name: "prometheus_tsdb_head_max_time_seconds", |
||||
Help: "Maximum timestamp of the head block.", |
||||
}, func() float64 { |
||||
return float64(db.Head().MaxTime()) / 1000 |
||||
}) |
||||
|
||||
if r != nil { |
||||
r.MustRegister( |
||||
startTime, |
||||
headMaxTime, |
||||
headMinTime, |
||||
) |
||||
} |
||||
} |
||||
|
||||
// Open returns a new storage backed by a TSDB database that is configured for Prometheus.
|
||||
func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*tsdb.DB, error) { |
||||
if opts.MinBlockDuration > opts.MaxBlockDuration { |
||||
opts.MaxBlockDuration = opts.MinBlockDuration |
||||
} |
||||
// Start with smallest block duration and create exponential buckets until the exceed the
|
||||
// configured maximum block duration.
|
||||
rngs := tsdb.ExponentialBlockRanges(int64(time.Duration(opts.MinBlockDuration).Seconds()*1000), 10, 3) |
||||
|
||||
for i, v := range rngs { |
||||
if v > int64(time.Duration(opts.MaxBlockDuration).Seconds()*1000) { |
||||
rngs = rngs[:i] |
||||
break |
||||
} |
||||
} |
||||
|
||||
db, err := tsdb.Open(path, l, r, &tsdb.Options{ |
||||
WALSegmentSize: int(opts.WALSegmentSize), |
||||
RetentionDuration: uint64(time.Duration(opts.RetentionDuration).Seconds() * 1000), |
||||
MaxBytes: int64(opts.MaxBytes), |
||||
BlockRanges: rngs, |
||||
NoLockfile: opts.NoLockfile, |
||||
AllowOverlappingBlocks: opts.AllowOverlappingBlocks, |
||||
WALCompression: opts.WALCompression, |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
registerMetrics(db, r) |
||||
|
||||
return db, nil |
||||
} |
||||
|
||||
// StartTime implements the Storage interface.
|
||||
func (a adapter) StartTime() (int64, error) { |
||||
var startTime int64 |
||||
|
||||
if len(a.db.Blocks()) > 0 { |
||||
startTime = a.db.Blocks()[0].Meta().MinTime |
||||
} else { |
||||
startTime = time.Now().Unix() * 1000 |
||||
} |
||||
|
||||
// Add a safety margin as it may take a few minutes for everything to spin up.
|
||||
return startTime + a.startTimeMargin, nil |
||||
} |
||||
|
||||
func (a adapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) { |
||||
q, err := a.db.Querier(mint, maxt) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return querier{q: q}, nil |
||||
} |
||||
|
||||
// Appender returns a new appender against the storage.
|
||||
func (a adapter) Appender() (storage.Appender, error) { |
||||
return appender{a: a.db.Appender()}, nil |
||||
} |
||||
|
||||
// Close closes the storage and all its underlying resources.
|
||||
func (a adapter) Close() error { |
||||
return a.db.Close() |
||||
} |
||||
|
||||
type querier struct { |
||||
q tsdb.Querier |
||||
} |
||||
|
||||
func (q querier) Select(_ *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { |
||||
set, err := q.q.Select(ms...) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
return seriesSet{set: set}, nil, nil |
||||
} |
||||
|
||||
func (q querier) SelectSorted(_ *storage.SelectParams, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) { |
||||
set, err := q.q.SelectSorted(ms...) |
||||
if err != nil { |
||||
return nil, nil, err |
||||
} |
||||
return seriesSet{set: set}, nil, nil |
||||
} |
||||
|
||||
func (q querier) LabelValues(name string) ([]string, storage.Warnings, error) { |
||||
v, err := q.q.LabelValues(name) |
||||
return v, nil, err |
||||
} |
||||
func (q querier) LabelNames() ([]string, storage.Warnings, error) { |
||||
v, err := q.q.LabelNames() |
||||
return v, nil, err |
||||
} |
||||
func (q querier) Close() error { return q.q.Close() } |
||||
|
||||
type seriesSet struct { |
||||
set tsdb.SeriesSet |
||||
} |
||||
|
||||
func (s seriesSet) Next() bool { return s.set.Next() } |
||||
func (s seriesSet) Err() error { return s.set.Err() } |
||||
func (s seriesSet) At() storage.Series { return series{s: s.set.At()} } |
||||
|
||||
type series struct { |
||||
s tsdb.Series |
||||
} |
||||
|
||||
func (s series) Labels() labels.Labels { return s.s.Labels() } |
||||
func (s series) Iterator() storage.SeriesIterator { return s.s.Iterator() } |
||||
|
||||
type appender struct { |
||||
a tsdb.Appender |
||||
} |
||||
|
||||
func (a appender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { |
||||
ref, err := a.a.Add(lset, t, v) |
||||
|
||||
switch errors.Cause(err) { |
||||
case tsdb.ErrNotFound: |
||||
return 0, storage.ErrNotFound |
||||
case tsdb.ErrOutOfOrderSample: |
||||
return 0, storage.ErrOutOfOrderSample |
||||
case tsdb.ErrAmendSample: |
||||
return 0, storage.ErrDuplicateSampleForTimestamp |
||||
case tsdb.ErrOutOfBounds: |
||||
return 0, storage.ErrOutOfBounds |
||||
} |
||||
return ref, err |
||||
} |
||||
|
||||
func (a appender) AddFast(_ labels.Labels, ref uint64, t int64, v float64) error { |
||||
err := a.a.AddFast(ref, t, v) |
||||
|
||||
switch errors.Cause(err) { |
||||
case tsdb.ErrNotFound: |
||||
return storage.ErrNotFound |
||||
case tsdb.ErrOutOfOrderSample: |
||||
return storage.ErrOutOfOrderSample |
||||
case tsdb.ErrAmendSample: |
||||
return storage.ErrDuplicateSampleForTimestamp |
||||
case tsdb.ErrOutOfBounds: |
||||
return storage.ErrOutOfBounds |
||||
} |
||||
return err |
||||
} |
||||
|
||||
func (a appender) Commit() error { return a.a.Commit() } |
||||
func (a appender) Rollback() error { return a.a.Rollback() } |
||||
@ -1,21 +0,0 @@ |
||||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb |
||||
|
||||
// Export the internal variables only for tests.
|
||||
var ( |
||||
StartTime = &startTime |
||||
HeadMaxTime = &headMaxTime |
||||
HeadMinTime = &headMinTime |
||||
) |
||||
@ -1,64 +0,0 @@ |
||||
// Copyright 2017 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb_test |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
dto "github.com/prometheus/client_model/go" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/prometheus/prometheus/pkg/labels" |
||||
"github.com/prometheus/prometheus/storage/tsdb" |
||||
"github.com/prometheus/prometheus/util/teststorage" |
||||
"github.com/prometheus/prometheus/util/testutil" |
||||
) |
||||
|
||||
func TestMetrics(t *testing.T) { |
||||
db := teststorage.New(t) |
||||
defer db.Close() |
||||
|
||||
metrics := &dto.Metric{} |
||||
startTime := *tsdb.StartTime |
||||
headMinTime := *tsdb.HeadMinTime |
||||
headMaxTime := *tsdb.HeadMaxTime |
||||
|
||||
// Check initial values.
|
||||
testutil.Ok(t, startTime.Write(metrics)) |
||||
testutil.Equals(t, float64(model.Latest)/1000, metrics.Gauge.GetValue()) |
||||
|
||||
testutil.Ok(t, headMinTime.Write(metrics)) |
||||
testutil.Equals(t, float64(model.Latest)/1000, metrics.Gauge.GetValue()) |
||||
|
||||
testutil.Ok(t, headMaxTime.Write(metrics)) |
||||
testutil.Equals(t, float64(model.Earliest)/1000, metrics.Gauge.GetValue()) |
||||
|
||||
app, err := db.Appender() |
||||
testutil.Ok(t, err) |
||||
|
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 1, 1) |
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 2, 1) |
||||
app.Add(labels.FromStrings(model.MetricNameLabel, "a"), 3, 1) |
||||
testutil.Ok(t, app.Commit()) |
||||
|
||||
// Check after adding some samples.
|
||||
testutil.Ok(t, startTime.Write(metrics)) |
||||
testutil.Equals(t, 0.001, metrics.Gauge.GetValue()) |
||||
|
||||
testutil.Ok(t, headMinTime.Write(metrics)) |
||||
testutil.Equals(t, 0.001, metrics.Gauge.GetValue()) |
||||
|
||||
testutil.Ok(t, headMaxTime.Write(metrics)) |
||||
testutil.Equals(t, 0.003, metrics.Gauge.GetValue()) |
||||
|
||||
} |
||||
Loading…
Reference in new issue