diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index caabeffb0c..8a28ed6db7 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -225,7 +225,7 @@ func main() { var ( localStorage = &tsdb.ReadyStorage{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote")) + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) @@ -326,7 +326,8 @@ func main() { } level.Info(logger).Log("msg", "TSDB started") - localStorage.Set(db) + startTimeMargin := int64(time.Duration(cfg.tsdb.MinBlockDuration).Seconds() * 1000) + localStorage.Set(db, startTimeMargin) }() prometheus.MustRegister(configSuccess) diff --git a/config/config.go b/config/config.go index cc5eaefe6c..8a295b8961 100644 --- a/config/config.go +++ b/config/config.go @@ -197,6 +197,7 @@ var ( // DefaultRemoteReadConfig is the default remote read configuration. DefaultRemoteReadConfig = RemoteReadConfig{ RemoteTimeout: model.Duration(1 * time.Minute), + ReadRecent: true, } ) @@ -1490,7 +1491,7 @@ type QueueConfig struct { type RemoteReadConfig struct { URL *URL `yaml:"url"` RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` - + ReadRecent bool `yaml:"read_recent,omitempty"` // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. HTTPClientConfig HTTPClientConfig `yaml:",inline"` diff --git a/config/config_test.go b/config/config_test.go index 5f1a8d74ea..8ae364e28a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -75,6 +75,19 @@ var expectedConf = &Config{ }, }, + RemoteReadConfigs: []*RemoteReadConfig{ + { + URL: mustParseURL("http://remote1/read"), + RemoteTimeout: model.Duration(1 * time.Minute), + ReadRecent: true, + }, + { + URL: mustParseURL("http://remote3/read"), + RemoteTimeout: model.Duration(1 * time.Minute), + ReadRecent: false, + }, + }, + ScrapeConfigs: []*ScrapeConfig{ { JobName: "prometheus", diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index 34da77d4ed..a65a401ac9 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -20,6 +20,12 @@ remote_write: action: drop - url: http://remote2/push +remote_read: + - url: http://remote1/read + read_recent: true + - url: http://remote3/read + read_recent: false + scrape_configs: - job_name: prometheus diff --git a/storage/fanout.go b/storage/fanout.go index b5743f1db4..2bc4323a74 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -20,6 +20,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" ) @@ -40,6 +41,27 @@ func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Stora } } +// StartTime implements the Storage interface. +func (f *fanout) StartTime() (int64, error) { + // StartTime of a fanout should be the earliest StartTime of all its storages, + // both primary and secondaries. + firstTime, err := f.primary.StartTime() + if err != nil { + return int64(model.Latest), err + } + + for _, storage := range f.secondaries { + t, err := storage.StartTime() + if err != nil { + return int64(model.Latest), err + } + if t < firstTime { + firstTime = t + } + } + return firstTime, nil +} + func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) { queriers := mergeQuerier{ queriers: make([]Querier, 0, 1+len(f.secondaries)), diff --git a/storage/interface.go b/storage/interface.go index 5eacb9f584..2b21792520 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -31,6 +31,9 @@ var ( // Storage ingests and manages samples, along with various indexes. All methods // are goroutine-safe. Storage implements storage.SampleAppender. type Storage interface { + // StartTime returns the oldest timestamp stored in the storage. + StartTime() (int64, error) + // Querier returns a new Querier on the storage. Querier(ctx context.Context, mint, maxt int64) (Querier, error) diff --git a/storage/remote/client.go b/storage/remote/client.go index f96181ce0d..7e4b23206c 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -37,15 +37,17 @@ const maxErrMsgLen = 256 // Client allows reading and writing from/to a remote HTTP endpoint. type Client struct { - index int // Used to differentiate metrics. - url *config.URL - client *http.Client - timeout time.Duration + index int // Used to differentiate metrics. + url *config.URL + client *http.Client + timeout time.Duration + readRecent bool } type clientConfig struct { url *config.URL timeout model.Duration + readRecent bool httpClientConfig config.HTTPClientConfig } @@ -57,10 +59,11 @@ func NewClient(index int, conf *clientConfig) (*Client, error) { } return &Client{ - index: index, - url: conf.url, - client: httpClient, - timeout: time.Duration(conf.timeout), + index: index, + url: conf.url, + client: httpClient, + timeout: time.Duration(conf.timeout), + readRecent: conf.readRecent, }, nil } diff --git a/storage/remote/read.go b/storage/remote/read.go index 1a88e6685c..8474a4236e 100644 --- a/storage/remote/read.go +++ b/storage/remote/read.go @@ -30,17 +30,35 @@ func (r *Storage) Querier(_ context.Context, mint, maxt int64) (storage.Querier, defer r.mtx.Unlock() queriers := make([]storage.Querier, 0, len(r.clients)) + localStartTime, err := r.localStartTimeCallback() + if err != nil { + return nil, err + } for _, c := range r.clients { + cmaxt := maxt + if !c.readRecent { + // Avoid queries whose timerange is later than the first timestamp in local DB. + if mint > localStartTime { + continue + } + // Query only samples older than the first timestamp in local DB. + if maxt > localStartTime { + cmaxt = localStartTime + } + } queriers = append(queriers, &querier{ mint: mint, - maxt: maxt, + maxt: cmaxt, client: c, externalLabels: r.externalLabels, }) } - return storage.NewMergeQuerier(queriers), nil + return newMergeQueriers(queriers), nil } +// Store it in variable to make it mockable in tests since a mergeQuerier is not publicly exposed. +var newMergeQueriers = storage.NewMergeQuerier + // Querier is an adapter to make a Client usable as a storage.Querier. type querier struct { mint, maxt int64 diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 6b71afcdbb..cf50a0c5d3 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -17,8 +17,10 @@ import ( "reflect" "sort" "testing" + "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" @@ -123,3 +125,59 @@ func TestConcreteSeriesSet(t *testing.T) { t.Fatalf("Expected Next() to be false.") } } + +type mockMergeQuerier struct{ queriersCount int } + +func (*mockMergeQuerier) Select(...*labels.Matcher) storage.SeriesSet { return nil } +func (*mockMergeQuerier) LabelValues(name string) ([]string, error) { return nil, nil } +func (*mockMergeQuerier) Close() error { return nil } + +func TestRemoteStorageQuerier(t *testing.T) { + tests := []struct { + localStartTime int64 + readRecentClients []bool + mint int64 + maxt int64 + expectedQueriersCount int + }{ + { + localStartTime: int64(20), + readRecentClients: []bool{true, true, false}, + mint: int64(0), + maxt: int64(50), + expectedQueriersCount: 3, + }, + { + localStartTime: int64(20), + readRecentClients: []bool{true, true, false}, + mint: int64(30), + maxt: int64(50), + expectedQueriersCount: 2, + }, + } + + for i, test := range tests { + s := NewStorage(nil, func() (int64, error) { return test.localStartTime, nil }) + s.clients = []*Client{} + for _, readRecent := range test.readRecentClients { + c, _ := NewClient(0, &clientConfig{ + url: nil, + timeout: model.Duration(30 * time.Second), + httpClientConfig: config.HTTPClientConfig{}, + readRecent: readRecent, + }) + s.clients = append(s.clients, c) + } + // overrides mergeQuerier to mockMergeQuerier so we can reflect its type + newMergeQueriers = func(queriers []storage.Querier) storage.Querier { + return &mockMergeQuerier{queriersCount: len(queriers)} + } + + querier, _ := s.Querier(nil, test.mint, test.maxt) + actualQueriersCount := reflect.ValueOf(querier).Interface().(*mockMergeQuerier).queriersCount + + if !reflect.DeepEqual(actualQueriersCount, test.expectedQueriersCount) { + t.Fatalf("%d. unexpected queriers count; want %v, got %v", i, test.expectedQueriersCount, actualQueriersCount) + } + } +} diff --git a/storage/remote/storage.go b/storage/remote/storage.go index f11ddb4740..65f3f2d19d 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -21,6 +21,9 @@ import ( "github.com/prometheus/prometheus/config" ) +// Callback func that return the oldest timestamp stored in a storage. +type startTimeCallback func() (int64, error) + // Storage represents all the remote read and write endpoints. It implements // storage.Storage. type Storage struct { @@ -31,15 +34,17 @@ type Storage struct { queues []*QueueManager // For reads - clients []*Client - externalLabels model.LabelSet + clients []*Client + localStartTimeCallback startTimeCallback + externalLabels model.LabelSet } -func NewStorage(l log.Logger) *Storage { +// NewStorage returns a remote.Storage. +func NewStorage(l log.Logger, stCallback startTimeCallback) *Storage { if l == nil { l = log.NewNopLogger() } - return &Storage{logger: l} + return &Storage{logger: l, localStartTimeCallback: stCallback} } // ApplyConfig updates the state as the new config requires. @@ -87,6 +92,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { url: rrConf.URL, timeout: rrConf.RemoteTimeout, httpClientConfig: rrConf.HTTPClientConfig, + readRecent: rrConf.ReadRecent, }) if err != nil { return err @@ -100,6 +106,11 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { return nil } +// StartTime implements the Storage interface. +func (s *Storage) StartTime() (int64, error) { + return int64(model.Latest), nil +} + // Close the background processing of the storage queues. func (s *Storage) Close() error { s.mtx.Lock() diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 6be55c0c58..e580d0a65e 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -40,11 +40,11 @@ type ReadyStorage struct { } // Set the storage. -func (s *ReadyStorage) Set(db *tsdb.DB) { +func (s *ReadyStorage) Set(db *tsdb.DB, startTimeMargin int64) { s.mtx.Lock() defer s.mtx.Unlock() - s.a = &adapter{db: db} + s.a = &adapter{db: db, startTimeMargin: startTimeMargin} } // Get the storage. @@ -62,6 +62,14 @@ func (s *ReadyStorage) get() *adapter { return x } +// StartTime implements the Storage interface. +func (s *ReadyStorage) StartTime() (int64, error) { + if x := s.get(); x != nil { + return x.StartTime() + } + return int64(model.Latest), ErrNotReady +} + // Querier implements the Storage interface. func (s *ReadyStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { if x := s.get(); x != nil { @@ -86,13 +94,15 @@ func (s *ReadyStorage) Close() error { return nil } -func Adapter(db *tsdb.DB) storage.Storage { - return &adapter{db: db} +// Adapter return an adapter as storage.Storage. +func Adapter(db *tsdb.DB, startTimeMargin int64) storage.Storage { + return &adapter{db: db, startTimeMargin: startTimeMargin} } // adapter implements a storage.Storage around TSDB. type adapter struct { - db *tsdb.DB + db *tsdb.DB + startTimeMargin int64 } // Options of the DB storage. @@ -139,6 +149,57 @@ func Open(path string, l log.Logger, r prometheus.Registerer, opts *Options) (*t return db, nil } +// StartTime implements the Storage interface. +func (a adapter) StartTime() (int64, error) { + startTime := int64(model.Latest) + + var indexr tsdb.IndexReader + if len(a.db.Blocks()) > 0 { + var err error + indexr, err = a.db.Blocks()[0].Index() + if err != nil { + return startTime, err + } + } else { + var err error + indexr, err = a.db.Head().Index() + if err != nil { + return startTime, err + } + } + + joblabel := "job" + tpls, err := indexr.LabelValues(joblabel) + if err != nil { + return startTime, err + } + + for i := 0; i < tpls.Len(); i++ { + vals, err := tpls.At(i) + if err != nil { + continue + } + + for _, v := range vals { + p, err := indexr.Postings(joblabel, v) + if err != nil { + continue + } + + if p.Next() { + var lset tsdbLabels.Labels + var chks []tsdb.ChunkMeta + indexr.Series(p.At(), &lset, &chks) + if startTime > chks[0].MinTime { + startTime = chks[0].MinTime + } + } + } + } + // Add a safety margin as it may take a few minutes for everything to spin up. + return startTime + a.startTimeMargin, nil +} + func (a adapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) { q, err := a.db.Querier(mint, maxt) if err != nil { diff --git a/util/testutil/storage.go b/util/testutil/storage.go index 46faa9dff4..246d6190d7 100644 --- a/util/testutil/storage.go +++ b/util/testutil/storage.go @@ -40,7 +40,7 @@ func NewStorage(t T) storage.Storage { if err != nil { t.Fatalf("Opening test storage failed: %s", err) } - return testStorage{Storage: tsdb.Adapter(db), dir: dir} + return testStorage{Storage: tsdb.Adapter(db, int64(0)), dir: dir} } type testStorage struct {