parent 16464c3a33
commit 2dda5775e3
@@ -0,0 +1,374 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
	"container/heap"
	"strings"

	"github.com/prometheus/prometheus/pkg/labels"
)

type fanout struct {
	storages []Storage
}

// NewFanout returns a new fan-out Storage, which proxies reads and writes
// through to multiple underlying storages.
func NewFanout(storages ...Storage) Storage {
	return &fanout{
		storages: storages,
	}
}
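
// A usage sketch (illustrative only; newLocalStorage and newRemoteStorage are
// hypothetical stand-ins for concrete Storage implementations, not APIs
// introduced here):
//
//	store := NewFanout(newLocalStorage(), newRemoteStorage())
//	app, err := store.Appender() // writes fan out to every storage
//	if err != nil {
//		// handle error
//	}
//	app.Add(labels.FromStrings("__name__", "up"), 0, 1)
//	app.Commit()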

func (f *fanout) Querier(mint, maxt int64) (Querier, error) {
	queriers := mergeQuerier{
		queriers: make([]Querier, 0, len(f.storages)),
	}
	for _, storage := range f.storages {
		querier, err := storage.Querier(mint, maxt)
		if err != nil {
			queriers.Close()
			return nil, err
		}
		queriers.queriers = append(queriers.queriers, querier)
	}
	return &queriers, nil
}

func (f *fanout) Appender() (Appender, error) {
	appenders := make([]Appender, 0, len(f.storages))
	for _, storage := range f.storages {
		appender, err := storage.Appender()
		if err != nil {
			return nil, err
		}
		appenders = append(appenders, appender)
	}
	return &fanoutAppender{
		appenders: appenders,
	}, nil
}

// Close closes the storage and all its underlying resources.
func (f *fanout) Close() error {
	// TODO return multiple errors?
	var lastErr error
	for _, storage := range f.storages {
		if err := storage.Close(); err != nil {
			lastErr = err
		}
	}
	return lastErr
}

// fanoutAppender implements Appender.
type fanoutAppender struct {
	appenders []Appender
}

func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (string, error) {
	for _, appender := range f.appenders {
		if _, err := appender.Add(l, t, v); err != nil {
			return "", err
		}
	}
	return "", nil
}

func (f *fanoutAppender) AddFast(ref string, t int64, v float64) error {
	// TODO this is a cheat, and causes us to fall back to slow path even for local writes.
	return ErrNotFound
}

func (f *fanoutAppender) Commit() error {
	for _, appender := range f.appenders {
		if err := appender.Commit(); err != nil {
			return err
		}
	}
	return nil
}

func (f *fanoutAppender) Rollback() error {
	for _, appender := range f.appenders {
		if err := appender.Rollback(); err != nil {
			return err
		}
	}
	return nil
}

// mergeQuerier implements Querier.
type mergeQuerier struct {
	queriers []Querier
}

func NewMergeQuerier(queriers []Querier) Querier {
	return &mergeQuerier{
		queriers: queriers,
	}
}

// Select returns a set of series that matches the given label matchers.
func (q *mergeQuerier) Select(matchers ...*labels.Matcher) SeriesSet {
	seriesSets := make([]SeriesSet, 0, len(q.queriers))
	for _, querier := range q.queriers {
		seriesSets = append(seriesSets, querier.Select(matchers...))
	}
	return newMergeSeriesSet(seriesSets)
}

// LabelValues returns all potential values for a label name.
func (q *mergeQuerier) LabelValues(name string) ([]string, error) {
	var results [][]string
	for _, querier := range q.queriers {
		values, err := querier.LabelValues(name)
		if err != nil {
			return nil, err
		}
		results = append(results, values)
	}
	return mergeStringSlices(results), nil
}

func mergeStringSlices(ss [][]string) []string {
	switch len(ss) {
	case 0:
		return nil
	case 1:
		return ss[0]
	case 2:
		return mergeTwoStringSlices(ss[0], ss[1])
	default:
		halfway := len(ss) / 2
		return mergeTwoStringSlices(
			mergeStringSlices(ss[:halfway]),
			mergeStringSlices(ss[halfway:]),
		)
	}
}

func mergeTwoStringSlices(a, b []string) []string {
	i, j := 0, 0
	result := make([]string, 0, len(a)+len(b))
	for i < len(a) && j < len(b) {
		switch strings.Compare(a[i], b[j]) {
		case 0:
			result = append(result, a[i])
			i++
			j++
		case -1:
			result = append(result, a[i])
			i++
		case 1:
			result = append(result, b[j])
			j++
		}
	}
	result = append(result, a[i:]...)
	result = append(result, b[j:]...)
	return result
}
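
// For illustration, the divide-and-conquer merge on already-sorted inputs
// (values shared between slices are emitted once):
//
//	mergeStringSlices([][]string{
//		{"a", "c", "e"},
//		{"a", "b", "d"},
//		{"b", "e", "f"},
//	})
//	// -> [a b c d e f]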

// Close releases the resources of the Querier.
func (q *mergeQuerier) Close() error {
	// TODO return multiple errors?
	var lastErr error
	for _, querier := range q.queriers {
		if err := querier.Close(); err != nil {
			lastErr = err
		}
	}
	return lastErr
}

// mergeSeriesSet implements SeriesSet.
type mergeSeriesSet struct {
	currentLabels labels.Labels
	currentSets   []SeriesSet
	sets          seriesSetHeap
}

func newMergeSeriesSet(sets []SeriesSet) SeriesSet {
	// Sets need to be pre-advanced, so we can introspect the label of the
	// series under the cursor.
	var h seriesSetHeap
	for _, set := range sets {
		if set.Next() {
			heap.Push(&h, set)
		}
	}
	return &mergeSeriesSet{
		sets: h,
	}
}

func (c *mergeSeriesSet) Next() bool {
	// Firstly advance all the current series sets. If any of them have run out
	// we can drop them, otherwise they should be inserted back into the heap.
	for _, set := range c.currentSets {
		if set.Next() {
			heap.Push(&c.sets, set)
		}
	}
	if len(c.sets) == 0 {
		return false
	}

	// Now, pop items of the heap that have equal label sets.
	c.currentSets = nil
	c.currentLabels = c.sets[0].At().Labels()
	for len(c.sets) > 0 && labels.Equal(c.currentLabels, c.sets[0].At().Labels()) {
		set := heap.Pop(&c.sets).(SeriesSet)
		c.currentSets = append(c.currentSets, set)
	}
	return true
}

func (c *mergeSeriesSet) At() Series {
	series := []Series{}
	for _, seriesSet := range c.currentSets {
		series = append(series, seriesSet.At())
	}
	return &mergeSeries{
		labels: c.currentLabels,
		series: series,
	}
}

func (c *mergeSeriesSet) Err() error {
	for _, set := range c.sets {
		if err := set.Err(); err != nil {
			return err
		}
	}
	return nil
}

type seriesSetHeap []SeriesSet

func (h seriesSetHeap) Len() int      { return len(h) }
func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h seriesSetHeap) Less(i, j int) bool {
	a, b := h[i].At().Labels(), h[j].At().Labels()
	return labels.Compare(a, b) < 0
}

func (h *seriesSetHeap) Push(x interface{}) {
	*h = append(*h, x.(SeriesSet))
}

func (h *seriesSetHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}

type mergeSeries struct {
	labels labels.Labels
	series []Series
}

func (m *mergeSeries) Labels() labels.Labels {
	return m.labels
}

func (m *mergeSeries) Iterator() SeriesIterator {
	iterators := make([]SeriesIterator, 0, len(m.series))
	for _, s := range m.series {
		iterators = append(iterators, s.Iterator())
	}
	return &mergeIterator{
		iterators: iterators,
	}
}

type mergeIterator struct {
	iterators []SeriesIterator
	h         seriesIteratorHeap
}

func newMergeIterator(iterators []SeriesIterator) SeriesIterator {
	return &mergeIterator{
		iterators: iterators,
		h:         nil,
	}
}

func (c *mergeIterator) Seek(t int64) bool {
	c.h = seriesIteratorHeap{}
	for _, iter := range c.iterators {
		if iter.Seek(t) {
			heap.Push(&c.h, iter)
		}
	}
	return len(c.h) > 0
}

func (c *mergeIterator) At() (t int64, v float64) {
	// TODO do I need to dedupe or just merge?
	return c.h[0].At()
}
func (c *mergeIterator) Next() bool {
	// Detect the case where Next is called before Seek.
	if c.h == nil {
		panic("Next() called before Seek()")
	}

	if len(c.h) == 0 {
		return false
	}
	iter := heap.Pop(&c.h).(SeriesIterator)
	if iter.Next() {
		heap.Push(&c.h, iter)
	}
	return len(c.h) > 0
}

func (c *mergeIterator) Err() error {
	for _, iter := range c.iterators {
		if err := iter.Err(); err != nil {
			return err
		}
	}
	return nil
}

type seriesIteratorHeap []SeriesIterator

func (h seriesIteratorHeap) Len() int      { return len(h) }
func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h seriesIteratorHeap) Less(i, j int) bool {
	at, _ := h[i].At()
	bt, _ := h[j].At()
	return at < bt
}

func (h *seriesIteratorHeap) Push(x interface{}) {
	*h = append(*h, x.(SeriesIterator))
}

func (h *seriesIteratorHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}
@@ -0,0 +1,200 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/golang/snappy"
	"golang.org/x/net/context"
	"golang.org/x/net/context/ctxhttp"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/util/httputil"
)

const maxErrMsgLen = 256

// Client allows reading and writing from/to a remote HTTP endpoint.
type Client struct {
	index   int // Used to differentiate metrics.
	url     *config.URL
	client  *http.Client
	timeout time.Duration
}

type clientConfig struct {
	url              *config.URL
	timeout          model.Duration
	httpClientConfig config.HTTPClientConfig
}

// NewClient creates a new Client.
func NewClient(index int, conf *clientConfig) (*Client, error) {
	httpClient, err := httputil.NewClientFromConfig(conf.httpClientConfig)
	if err != nil {
		return nil, err
	}

	return &Client{
		index:   index,
		url:     conf.url,
		client:  httpClient,
		timeout: time.Duration(conf.timeout),
	}, nil
}

type recoverableError struct {
	error
}

// Store sends a batch of samples to the HTTP endpoint.
func (c *Client) Store(samples model.Samples) error {
	req := &WriteRequest{
		Timeseries: make([]*TimeSeries, 0, len(samples)),
	}
	for _, s := range samples {
		ts := &TimeSeries{
			Labels: make([]*LabelPair, 0, len(s.Metric)),
		}
		for k, v := range s.Metric {
			ts.Labels = append(ts.Labels,
				&LabelPair{
					Name:  string(k),
					Value: string(v),
				})
		}
		ts.Samples = []*Sample{
			{
				Value:       float64(s.Value),
				TimestampMs: int64(s.Timestamp),
			},
		}
		req.Timeseries = append(req.Timeseries, ts)
	}

	data, err := proto.Marshal(req)
	if err != nil {
		return err
	}

	compressed := snappy.Encode(nil, data)
	httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))
	if err != nil {
		// Errors from NewRequest are from unparseable URLs, so are not
		// recoverable.
		return err
	}
	httpReq.Header.Add("Content-Encoding", "snappy")
	httpReq.Header.Set("Content-Type", "application/x-protobuf")
	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")

	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
	if err != nil {
		// Errors from client.Do are from (for example) network errors, so are
		// recoverable.
		return recoverableError{err}
	}
	defer httpResp.Body.Close()

	if httpResp.StatusCode/100 != 2 {
		scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
		line := ""
		if scanner.Scan() {
			line = scanner.Text()
		}
		err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
	}
	if httpResp.StatusCode/100 == 5 {
		return recoverableError{err}
	}
	return err
}
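
// A package-internal usage sketch (the endpoint URL is hypothetical and error
// handling is elided; assumes net/url is imported):
//
//	u, _ := url.Parse("http://localhost:9201/write")
//	c, _ := NewClient(0, &clientConfig{
//		url:     &config.URL{URL: u},
//		timeout: model.Duration(30 * time.Second),
//	})
//	err := c.Store(model.Samples{
//		&model.Sample{
//			Metric:    model.Metric{model.MetricNameLabel: "test_metric"},
//			Value:     1,
//			Timestamp: model.Now(),
//		},
//	})
//
// A recoverableError (HTTP 5xx or a network failure) tells the caller the
// batch may be retried; any other error is treated as permanent.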

// Name identifies the client.
func (c Client) Name() string {
	return fmt.Sprintf("%d:%s", c.index, c.url)
}

// Read reads from a remote endpoint.
func (c *Client) Read(ctx context.Context, from, through int64, matchers []*LabelMatcher) ([]*TimeSeries, error) {
	req := &ReadRequest{
		// TODO: Support batching multiple queries into one read request,
		// as the protobuf interface allows for it.
		Queries: []*Query{{
			StartTimestampMs: from,
			EndTimestampMs:   through,
			Matchers:         matchers,
		}},
	}

	data, err := proto.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("unable to marshal read request: %v", err)
	}

	compressed := snappy.Encode(nil, data)
	httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed))
	if err != nil {
		return nil, fmt.Errorf("unable to create request: %v", err)
	}
	httpReq.Header.Add("Content-Encoding", "snappy")
	httpReq.Header.Set("Content-Type", "application/x-protobuf")
	httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")

	ctx, cancel := context.WithTimeout(ctx, c.timeout)
	defer cancel()

	httpResp, err := ctxhttp.Do(ctx, c.client, httpReq)
	if err != nil {
		return nil, fmt.Errorf("error sending request: %v", err)
	}
	defer httpResp.Body.Close()
	if httpResp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status)
	}

	compressed, err = ioutil.ReadAll(httpResp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading response: %v", err)
	}

	uncompressed, err := snappy.Decode(nil, compressed)
	if err != nil {
		return nil, fmt.Errorf("error decoding response: %v", err)
	}

	var resp ReadResponse
	err = proto.Unmarshal(uncompressed, &resp)
	if err != nil {
		return nil, fmt.Errorf("unable to unmarshal response body: %v", err)
	}

	if len(resp.Results) != len(req.Queries) {
		return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results))
	}

	return resp.Results[0].Timeseries, nil
}
@@ -0,0 +1,79 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"reflect"
	"strings"
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
)

var longErrMessage = strings.Repeat("error message", maxErrMsgLen)

func TestStoreHTTPErrorHandling(t *testing.T) {
	tests := []struct {
		code int
		err  error
	}{
		{
			code: 200,
			err:  nil,
		},
		{
			code: 300,
			err:  fmt.Errorf("server returned HTTP status 300 Multiple Choices: " + longErrMessage[:maxErrMsgLen]),
		},
		{
			code: 404,
			err:  fmt.Errorf("server returned HTTP status 404 Not Found: " + longErrMessage[:maxErrMsgLen]),
		},
		{
			code: 500,
			err:  recoverableError{fmt.Errorf("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen])},
		},
	}

	for i, test := range tests {
		server := httptest.NewServer(
			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
				http.Error(w, longErrMessage, test.code)
			}),
		)

		serverURL, err := url.Parse(server.URL)
		if err != nil {
			panic(err)
		}

		c, err := NewClient(0, &clientConfig{
			url:     &config.URL{URL: serverURL},
			timeout: model.Duration(time.Second),
		})
		if err != nil {
			t.Fatal(err)
		}

		err = c.Store(nil)
		if !reflect.DeepEqual(err, test.err) {
			t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err)
		}

		server.Close()
	}
}
@@ -0,0 +1,68 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"sync"
	"sync/atomic"
	"time"
)

// ewmaRate tracks an exponentially weighted moving average of a per-second rate.
type ewmaRate struct {
	newEvents int64
	alpha     float64
	interval  time.Duration
	lastRate  float64
	init      bool
	mutex     sync.Mutex
}

// newEWMARate always allocates a new ewmaRate, as this guarantees the atomically
// accessed int64 will be aligned on ARM. See prometheus#2666.
func newEWMARate(alpha float64, interval time.Duration) *ewmaRate {
	return &ewmaRate{
		alpha:    alpha,
		interval: interval,
	}
}

// rate returns the per-second rate.
func (r *ewmaRate) rate() float64 {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	return r.lastRate
}

// tick is expected to be called every r.interval.
func (r *ewmaRate) tick() {
	newEvents := atomic.LoadInt64(&r.newEvents)
	atomic.AddInt64(&r.newEvents, -newEvents)
	instantRate := float64(newEvents) / r.interval.Seconds()

	r.mutex.Lock()
	defer r.mutex.Unlock()

	if r.init {
		r.lastRate += r.alpha * (instantRate - r.lastRate)
	} else {
		r.init = true
		r.lastRate = instantRate
	}
}

// incr counts incr events.
func (r *ewmaRate) incr(incr int64) {
	atomic.AddInt64(&r.newEvents, incr)
}
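
// A short worked sketch of the smoothing (illustrative numbers):
//
//	r := newEWMARate(0.2, 10*time.Second)
//	r.incr(1000) // 1000 events in the first interval
//	r.tick()     // instant rate 1000/10 = 100/s; the first tick seeds lastRate = 100
//	r.incr(2000)
//	r.tick()     // instant rate 200/s; lastRate += 0.2*(200-100) -> 120/s
//	r.rate()     // 120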

@@ -0,0 +1,512 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"math"
	"sync"
	"time"

	"golang.org/x/time/rate"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/relabel"
)

// String constants for instrumentation.
const (
	namespace = "prometheus"
	subsystem = "remote_storage"
	queue     = "queue"

	// We track samples in/out and how long pushes take using an Exponentially
	// Weighted Moving Average.
	ewmaWeight          = 0.2
	shardUpdateDuration = 10 * time.Second

	// Allow 30% too many shards before scaling down.
	shardToleranceFraction = 0.3

	// Limit to 1 log event every 10s.
	logRateLimit = 0.1
	logBurst     = 10
)

var (
	succeededSamplesTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "succeeded_samples_total",
			Help:      "Total number of samples successfully sent to remote storage.",
		},
		[]string{queue},
	)
	failedSamplesTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "failed_samples_total",
			Help:      "Total number of samples which failed on send to remote storage.",
		},
		[]string{queue},
	)
	droppedSamplesTotal = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "dropped_samples_total",
			Help:      "Total number of samples which were dropped due to the queue being full.",
		},
		[]string{queue},
	)
	sentBatchDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "sent_batch_duration_seconds",
			Help:      "Duration of sample batch send calls to the remote storage.",
			Buckets:   prometheus.DefBuckets,
		},
		[]string{queue},
	)
	queueLength = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queue_length",
			Help:      "The number of processed samples queued to be sent to the remote storage.",
		},
		[]string{queue},
	)
	queueCapacity = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queue_capacity",
			Help:      "The capacity of the queue of samples to be sent to the remote storage.",
		},
		[]string{queue},
	)
	numShards = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "shards",
			Help:      "The number of shards used for parallel sending to the remote storage.",
		},
		[]string{queue},
	)
)

func init() {
	prometheus.MustRegister(succeededSamplesTotal)
	prometheus.MustRegister(failedSamplesTotal)
	prometheus.MustRegister(droppedSamplesTotal)
	prometheus.MustRegister(sentBatchDuration)
	prometheus.MustRegister(queueLength)
	prometheus.MustRegister(queueCapacity)
	prometheus.MustRegister(numShards)
}
// QueueManagerConfig is the configuration for the queue used to write to remote
// storage.
type QueueManagerConfig struct {
	// Number of samples to buffer per shard before we start dropping them.
	QueueCapacity int
	// Max number of shards, i.e. amount of concurrency.
	MaxShards int
	// Maximum number of samples per send.
	MaxSamplesPerSend int
	// Maximum time a sample will wait in the buffer.
	BatchSendDeadline time.Duration
	// Max number of times to retry a batch on recoverable errors.
	MaxRetries int
	// On recoverable errors, backoff exponentially.
	MinBackoff time.Duration
	MaxBackoff time.Duration
}

// defaultQueueManagerConfig is the default remote queue configuration.
var defaultQueueManagerConfig = QueueManagerConfig{
	// With a maximum of 1000 shards, assuming an average of 100ms remote write
	// time and 100 samples per batch, we will be able to push 1M samples/s.
	MaxShards:         1000,
	MaxSamplesPerSend: 100,

	// By default, buffer 1000 batches, which at 100ms per batch is 1:40 mins. At
	// 1000 shards, this will buffer 100M samples total.
	QueueCapacity:     100 * 1000,
	BatchSendDeadline: 5 * time.Second,

	// Max number of times to retry a batch on recoverable errors.
	MaxRetries: 10,
	MinBackoff: 30 * time.Millisecond,
	MaxBackoff: 100 * time.Millisecond,
}

// StorageClient defines an interface for sending a batch of samples to an
// external timeseries database.
type StorageClient interface {
	// Store stores the given samples in the remote storage.
	Store(model.Samples) error
	// Name identifies the remote storage implementation.
	Name() string
}

// QueueManager manages a queue of samples to be sent to the Storage
// indicated by the provided StorageClient.
type QueueManager struct {
	cfg            QueueManagerConfig
	externalLabels model.LabelSet
	relabelConfigs []*config.RelabelConfig
	client         StorageClient
	queueName      string
	logLimiter     *rate.Limiter

	shardsMtx   sync.Mutex
	shards      *shards
	numShards   int
	reshardChan chan int
	quit        chan struct{}
	wg          sync.WaitGroup

	samplesIn, samplesOut, samplesOutDuration *ewmaRate
	integralAccumulator                       float64
}

// NewQueueManager builds a new QueueManager.
func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager {
	t := &QueueManager{
		cfg:            cfg,
		externalLabels: externalLabels,
		relabelConfigs: relabelConfigs,
		client:         client,
		queueName:      client.Name(),

		logLimiter:  rate.NewLimiter(logRateLimit, logBurst),
		numShards:   1,
		reshardChan: make(chan int),
		quit:        make(chan struct{}),

		samplesIn:          newEWMARate(ewmaWeight, shardUpdateDuration),
		samplesOut:         newEWMARate(ewmaWeight, shardUpdateDuration),
		samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration),
	}
	t.shards = t.newShards(t.numShards)
	numShards.WithLabelValues(t.queueName).Set(float64(t.numShards))
	queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity))

	return t
}

// Append queues a sample to be sent to the remote storage. It drops the
// sample on the floor if the queue is full.
// Always returns nil.
func (t *QueueManager) Append(s *model.Sample) error {
	snew := *s
	snew.Metric = s.Metric.Clone()

	for ln, lv := range t.externalLabels {
		if _, ok := s.Metric[ln]; !ok {
			snew.Metric[ln] = lv
		}
	}

	snew.Metric = model.Metric(
		relabel.Process(model.LabelSet(snew.Metric), t.relabelConfigs...))

	if snew.Metric == nil {
		return nil
	}

	t.shardsMtx.Lock()
	enqueued := t.shards.enqueue(&snew)
	t.shardsMtx.Unlock()

	if enqueued {
		queueLength.WithLabelValues(t.queueName).Inc()
	} else {
		droppedSamplesTotal.WithLabelValues(t.queueName).Inc()
		if t.logLimiter.Allow() {
			log.Warn("Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.")
		}
	}
	return nil
}
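
// As a concrete illustration of the steps above (hypothetical values): with
// externalLabels = {region="eu"} and no relabel configs, a scraped sample
// {__name__="up", job="node"} is enqueued as {__name__="up", job="node",
// region="eu"}, while a sample that already carries a region label keeps its
// own value; if relabelling drops all labels, the sample is silently
// discarded before it ever reaches a shard.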

// NeedsThrottling implements storage.SampleAppender. It will always return
// false as a remote storage drops samples on the floor if backlogging instead
// of asking for throttling.
func (*QueueManager) NeedsThrottling() bool {
	return false
}

// Start the queue manager sending samples to the remote storage.
// Does not block.
func (t *QueueManager) Start() {
	t.wg.Add(2)
	go t.updateShardsLoop()
	go t.reshardLoop()

	t.shardsMtx.Lock()
	defer t.shardsMtx.Unlock()
	t.shards.start()
}

// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func (t *QueueManager) Stop() {
	log.Infof("Stopping remote storage...")
	close(t.quit)
	t.wg.Wait()

	t.shardsMtx.Lock()
	defer t.shardsMtx.Unlock()
	t.shards.stop()
	log.Info("Remote storage stopped.")
}

func (t *QueueManager) updateShardsLoop() {
	defer t.wg.Done()

	ticker := time.Tick(shardUpdateDuration)
	for {
		select {
		case <-ticker:
			t.calculateDesiredShards()
		case <-t.quit:
			return
		}
	}
}

func (t *QueueManager) calculateDesiredShards() {
	t.samplesIn.tick()
	t.samplesOut.tick()
	t.samplesOutDuration.tick()

	// We use the number of incoming samples as a prediction of how much work we
	// will need to do next iteration. We add to this any pending samples
	// (received - sent) so we can catch up with any backlog. We use the average
	// outgoing batch latency to work out how many shards we need.
	var (
		samplesIn          = t.samplesIn.rate()
		samplesOut         = t.samplesOut.rate()
		samplesPending     = samplesIn - samplesOut
		samplesOutDuration = t.samplesOutDuration.rate()
	)

	// We use an integral accumulator, like in a PID, to help dampen oscillation.
	t.integralAccumulator = t.integralAccumulator + (samplesPending * 0.1)

	if samplesOut <= 0 {
		return
	}

	var (
		timePerSample = samplesOutDuration / samplesOut
		desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second)
	)
	log.Debugf("QueueManager.calculateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f",
		samplesIn, samplesOut, samplesPending, desiredShards)

	// Changes in the number of shards must be greater than shardToleranceFraction.
	var (
		lowerBound = float64(t.numShards) * (1. - shardToleranceFraction)
		upperBound = float64(t.numShards) * (1. + shardToleranceFraction)
	)
	log.Debugf("QueueManager.calculateDesiredShards %f <= %f <= %f", lowerBound, desiredShards, upperBound)
	if lowerBound <= desiredShards && desiredShards <= upperBound {
		return
	}

	numShards := int(math.Ceil(desiredShards))
	if numShards > t.cfg.MaxShards {
		numShards = t.cfg.MaxShards
	}
	if numShards == t.numShards {
		return
	}

	// Resharding can take some time, and we want this loop
	// to stay close to shardUpdateDuration.
	select {
	case t.reshardChan <- numShards:
		log.Infof("Remote storage resharding from %d to %d shards.", t.numShards, numShards)
		t.numShards = numShards
	default:
		log.Infof("Currently resharding, skipping.")
	}
}
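
// A worked example of the calculation (illustrative numbers, ignoring the
// integral accumulator): if samplesIn = 12000/s, samplesOut = 10000/s and
// samplesOutDuration = 1e9 ns/s (one second of send time spent per second,
// i.e. one fully busy shard), then timePerSample = 1e9/10000 = 100000 ns and
// desiredShards = 100000 * (12000 + 2000) / 1e9 = 1.4. With numShards = 1 the
// tolerance band is 0.7-1.3, so 1.4 falls outside it and the manager reshards
// to ceil(1.4) = 2.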

func (t *QueueManager) reshardLoop() {
	defer t.wg.Done()

	for {
		select {
		case numShards := <-t.reshardChan:
			t.reshard(numShards)
		case <-t.quit:
			return
		}
	}
}

func (t *QueueManager) reshard(n int) {
	numShards.WithLabelValues(t.queueName).Set(float64(n))

	t.shardsMtx.Lock()
	newShards := t.newShards(n)
	oldShards := t.shards
	t.shards = newShards
	t.shardsMtx.Unlock()

	oldShards.stop()

	// We start the newShards only after we have stopped (and therefore
	// completely flushed) the oldShards, to guarantee we only ever deliver
	// samples in order.
	newShards.start()
}

type shards struct {
	qm     *QueueManager
	queues []chan *model.Sample
	done   chan struct{}
	wg     sync.WaitGroup
}

func (t *QueueManager) newShards(numShards int) *shards {
	queues := make([]chan *model.Sample, numShards)
	for i := 0; i < numShards; i++ {
		queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity)
	}
	s := &shards{
		qm:     t,
		queues: queues,
		done:   make(chan struct{}),
	}
	s.wg.Add(numShards)
	return s
}

func (s *shards) len() int {
	return len(s.queues)
}

func (s *shards) start() {
	for i := 0; i < len(s.queues); i++ {
		go s.runShard(i)
	}
}

func (s *shards) stop() {
	for _, shard := range s.queues {
		close(shard)
	}
	s.wg.Wait()
}

func (s *shards) enqueue(sample *model.Sample) bool {
	s.qm.samplesIn.incr(1)

	fp := sample.Metric.FastFingerprint()
	shard := uint64(fp) % uint64(len(s.queues))

	select {
	case s.queues[shard] <- sample:
		return true
	default:
		return false
	}
}

func (s *shards) runShard(i int) {
	defer s.wg.Done()
	queue := s.queues[i]

	// Send batches of at most MaxSamplesPerSend samples to the remote storage.
	// If we have fewer samples than that, flush them out after a deadline
	// anyways.
	pendingSamples := model.Samples{}

	for {
		select {
		case sample, ok := <-queue:
			if !ok {
				if len(pendingSamples) > 0 {
					log.Debugf("Flushing %d samples to remote storage...", len(pendingSamples))
					s.sendSamples(pendingSamples)
					log.Debugf("Done flushing.")
				}
				return
			}

			queueLength.WithLabelValues(s.qm.queueName).Dec()
			pendingSamples = append(pendingSamples, sample)

			for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend {
				s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend])
				pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:]
			}
		case <-time.After(s.qm.cfg.BatchSendDeadline):
			if len(pendingSamples) > 0 {
				s.sendSamples(pendingSamples)
				pendingSamples = pendingSamples[:0]
			}
		}
	}
}

func (s *shards) sendSamples(samples model.Samples) {
	begin := time.Now()
	s.sendSamplesWithBackoff(samples)

	// These counters are used to calculate the dynamic sharding, and as such
	// should be maintained irrespective of success or failure.
	s.qm.samplesOut.incr(int64(len(samples)))
	s.qm.samplesOutDuration.incr(int64(time.Since(begin)))
}

// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(samples model.Samples) {
	backoff := s.qm.cfg.MinBackoff
	for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- {
		begin := time.Now()
		err := s.qm.client.Store(samples)

		sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds())
		if err == nil {
			succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
			return
		}

		log.Warnf("Error sending %d samples to remote storage: %s", len(samples), err)
		if _, ok := err.(recoverableError); !ok {
			break
		}
		time.Sleep(backoff)
		backoff = backoff * 2
		if backoff > s.qm.cfg.MaxBackoff {
			backoff = s.qm.cfg.MaxBackoff
		}
	}

	failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples)))
}
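
// With the defaults above (MinBackoff 30ms, MaxBackoff 100ms, MaxRetries 10),
// a batch that keeps hitting recoverable errors sleeps 30ms, 60ms, then 100ms
// for each remaining attempt -- roughly 0.9s of backoff in total -- before it
// is counted in failedSamplesTotal; a non-recoverable error short-circuits
// the loop immediately.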

@@ -0,0 +1,253 @@
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/prometheus/common/model"
)

type TestStorageClient struct {
	receivedSamples map[string]model.Samples
	expectedSamples map[string]model.Samples
	wg              sync.WaitGroup
	mtx             sync.Mutex
}

func NewTestStorageClient() *TestStorageClient {
	return &TestStorageClient{
		receivedSamples: map[string]model.Samples{},
		expectedSamples: map[string]model.Samples{},
	}
}

func (c *TestStorageClient) expectSamples(ss model.Samples) {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	for _, s := range ss {
		ts := s.Metric.String()
		c.expectedSamples[ts] = append(c.expectedSamples[ts], s)
	}
	c.wg.Add(len(ss))
}

func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) {
	c.wg.Wait()

	c.mtx.Lock()
	defer c.mtx.Unlock()
	for ts, expectedSamples := range c.expectedSamples {
		for i, expected := range expectedSamples {
			if !expected.Equal(c.receivedSamples[ts][i]) {
				t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i])
			}
		}
	}
}

func (c *TestStorageClient) Store(ss model.Samples) error {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	for _, s := range ss {
		ts := s.Metric.String()
		c.receivedSamples[ts] = append(c.receivedSamples[ts], s)
	}
	c.wg.Add(-len(ss))
	return nil
}

func (c *TestStorageClient) Name() string {
	return "teststorageclient"
}

func TestSampleDelivery(t *testing.T) {
	// Let's create an even number of send batches so we don't run into the
	// batch timeout case.
	n := defaultQueueManagerConfig.QueueCapacity * 2

	samples := make(model.Samples, 0, n)
	for i := 0; i < n; i++ {
		name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
		samples = append(samples, &model.Sample{
			Metric: model.Metric{
				model.MetricNameLabel: name,
			},
			Value: model.SampleValue(i),
		})
	}

	c := NewTestStorageClient()
	c.expectSamples(samples[:len(samples)/2])

	cfg := defaultQueueManagerConfig
	cfg.MaxShards = 1
	m := NewQueueManager(cfg, nil, nil, c)

	// These should be received by the client.
	for _, s := range samples[:len(samples)/2] {
		m.Append(s)
	}
	// These will be dropped because the queue is full.
	for _, s := range samples[len(samples)/2:] {
		m.Append(s)
	}
	m.Start()
	defer m.Stop()

	c.waitForExpectedSamples(t)
}

func TestSampleDeliveryOrder(t *testing.T) {
	ts := 10
	n := defaultQueueManagerConfig.MaxSamplesPerSend * ts

	samples := make(model.Samples, 0, n)
	for i := 0; i < n; i++ {
		name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts))
		samples = append(samples, &model.Sample{
			Metric: model.Metric{
				model.MetricNameLabel: name,
			},
			Value:     model.SampleValue(i),
			Timestamp: model.Time(i),
		})
	}

	c := NewTestStorageClient()
	c.expectSamples(samples)
	m := NewQueueManager(defaultQueueManagerConfig, nil, nil, c)

	// These should be received by the client.
	for _, s := range samples {
		m.Append(s)
	}
	m.Start()
	defer m.Stop()

	c.waitForExpectedSamples(t)
}

// TestBlockingStorageClient is a queue_manager StorageClient which will block
// on any calls to Store(), until the `block` channel is closed, at which point
// the `numCalls` property will contain a count of how many times Store() was
// called.
type TestBlockingStorageClient struct {
	numCalls uint64
	block    chan bool
}

func NewTestBlockedStorageClient() *TestBlockingStorageClient {
	return &TestBlockingStorageClient{
		block:    make(chan bool),
		numCalls: 0,
	}
}

func (c *TestBlockingStorageClient) Store(s model.Samples) error {
	atomic.AddUint64(&c.numCalls, 1)
	<-c.block
	return nil
}

func (c *TestBlockingStorageClient) NumCalls() uint64 {
	return atomic.LoadUint64(&c.numCalls)
}

func (c *TestBlockingStorageClient) unlock() {
	close(c.block)
}

func (c *TestBlockingStorageClient) Name() string {
	return "testblockingstorageclient"
}

func (t *QueueManager) queueLen() int {
	t.shardsMtx.Lock()
	defer t.shardsMtx.Unlock()
	queueLength := 0
	for _, shard := range t.shards.queues {
		queueLength += len(shard)
	}
	return queueLength
}

func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) {
	// Our goal is to fully empty the queue:
	// `MaxSamplesPerSend*Shards` samples should be consumed by the
	// per-shard goroutines, and then another `MaxSamplesPerSend`
	// should be left on the queue.
	n := defaultQueueManagerConfig.MaxSamplesPerSend * 2

	samples := make(model.Samples, 0, n)
	for i := 0; i < n; i++ {
		name := model.LabelValue(fmt.Sprintf("test_metric_%d", i))
		samples = append(samples, &model.Sample{
			Metric: model.Metric{
				model.MetricNameLabel: name,
			},
			Value: model.SampleValue(i),
		})
	}

	c := NewTestBlockedStorageClient()
	cfg := defaultQueueManagerConfig
	cfg.MaxShards = 1
	cfg.QueueCapacity = n
	m := NewQueueManager(cfg, nil, nil, c)

	m.Start()

	defer func() {
		c.unlock()
		m.Stop()
	}()

	for _, s := range samples {
		m.Append(s)
	}

	// Wait until the runShard() loops drain the queue. If things went right, it
	// should then immediately block in sendSamples(), but, in case of error,
	// it would spawn too many goroutines, and thus we'd see more calls to
	// client.Store()
	//
	// The timed wait is maybe non-ideal, but, in order to verify that we're
	// not spawning too many concurrent goroutines, we have to wait on the
	// Run() loop to consume a specific number of elements from the
	// queue... and it doesn't signal that in any obvious way, except by
	// draining the queue. We cap the waiting at 1 second -- that should give
	// plenty of time, and keeps the failure fairly quick if we're not draining
	// the queue properly.
	for i := 0; i < 100 && m.queueLen() > 0; i++ {
		time.Sleep(10 * time.Millisecond)
	}

	if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend {
		t.Fatalf("Failed to drain QueueManager queue, %d elements left",
			m.queueLen(),
		)
	}

	numCalls := c.NumCalls()
	if numCalls != uint64(1) {
		t.Errorf("Saw %d concurrent sends, expected 1", numCalls)
	}
}
@@ -0,0 +1,256 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"context"
	"sort"

	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

// Querier returns a new Querier on the storage.
func (r *Storage) Querier(mint, maxt int64) (storage.Querier, error) {
	r.mtx.Lock()
	defer r.mtx.Unlock()

	queriers := make([]storage.Querier, 0, len(r.clients))
	for _, c := range r.clients {
		queriers = append(queriers, &querier{
			mint:           mint,
			maxt:           maxt,
			client:         c,
			externalLabels: r.externalLabels,
		})
	}
	return storage.NewMergeQuerier(queriers), nil
}

// querier is an adapter to make a Client usable as a storage.Querier.
type querier struct {
	mint, maxt     int64
	client         *Client
	externalLabels model.LabelSet
}

// Select returns a set of series that matches the given label matchers.
func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet {
	m, added := q.addExternalLabels(matchers)

	res, err := q.client.Read(context.TODO(), q.mint, q.maxt, labelMatchersToProto(m))
	if err != nil {
		return errSeriesSet{err: err}
	}

	series := make([]storage.Series, 0, len(res))
	for _, ts := range res {
		lbls := labelPairsToLabels(ts.Labels)
		removeLabels(&lbls, added)
		series = append(series, &concreteSeries{
			labels:  lbls,
			samples: ts.Samples,
		})
	}
	sort.Sort(byLabel(series))
	return &concreteSeriesSet{
		cur:    -1,
		series: series,
	}
}

func labelMatchersToProto(matchers []*labels.Matcher) []*LabelMatcher {
	pbMatchers := make([]*LabelMatcher, 0, len(matchers))
	for _, m := range matchers {
		var mType MatchType
		switch m.Type {
		case labels.MatchEqual:
			mType = MatchType_EQUAL
		case labels.MatchNotEqual:
			mType = MatchType_NOT_EQUAL
		case labels.MatchRegexp:
			mType = MatchType_REGEX_MATCH
		case labels.MatchNotRegexp:
			mType = MatchType_REGEX_NO_MATCH
		default:
			panic("invalid matcher type")
		}
		pbMatchers = append(pbMatchers, &LabelMatcher{
			Type:  mType,
			Name:  string(m.Name),
			Value: string(m.Value),
		})
	}
	return pbMatchers
}

func labelPairsToLabels(labelPairs []*LabelPair) labels.Labels {
	result := make(labels.Labels, 0, len(labelPairs))
	for _, l := range labelPairs {
		result = append(result, labels.Label{
			Name:  l.Name,
			Value: l.Value,
		})
	}
	sort.Sort(result)
	return result
}

type byLabel []storage.Series

func (a byLabel) Len() int           { return len(a) }
func (a byLabel) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }

// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string) ([]string, error) {
	// TODO implement?
	return nil, nil
}

// Close releases the resources of the Querier.
func (q *querier) Close() error {
	return nil
}

// errSeriesSet implements storage.SeriesSet, just returning an error.
type errSeriesSet struct {
	err error
}

func (errSeriesSet) Next() bool {
	return false
}

func (errSeriesSet) At() storage.Series {
	return nil
}

func (e errSeriesSet) Err() error {
	return e.err
}

// concreteSeriesSet implements storage.SeriesSet.
type concreteSeriesSet struct {
	cur    int
	series []storage.Series
}

func (c *concreteSeriesSet) Next() bool {
	c.cur++
	return c.cur < len(c.series)
}

func (c *concreteSeriesSet) At() storage.Series {
	return c.series[c.cur]
}

func (c *concreteSeriesSet) Err() error {
	return nil
}

// concreteSeries implements storage.Series.
type concreteSeries struct {
	labels  labels.Labels
	samples []*Sample
}

func (c *concreteSeries) Labels() labels.Labels {
	return c.labels
}

func (c *concreteSeries) Iterator() storage.SeriesIterator {
	return &concreteSeriesIterator{
		cur:    -1,
		series: c,
	}
}

// concreteSeriesIterator implements storage.SeriesIterator.
type concreteSeriesIterator struct {
	cur    int
	series *concreteSeries
}
func (c *concreteSeriesIterator) Seek(t int64) bool {
	c.cur = sort.Search(len(c.series.samples), func(n int) bool {
		return c.series.samples[n].TimestampMs >= t
	})
	return c.cur < len(c.series.samples)
}

func (c *concreteSeriesIterator) At() (t int64, v float64) {
	s := c.series.samples[c.cur]
	return s.TimestampMs, s.Value
}

func (c *concreteSeriesIterator) Next() bool {
	c.cur++
	return c.cur < len(c.series.samples)
}

func (c *concreteSeriesIterator) Err() error {
	return nil
}

// addExternalLabels adds matchers for each external label. External labels
// that already have a corresponding user-supplied matcher are skipped, as we
// assume that the user explicitly wants to select a different value for them.
// We return the new set of matchers, along with a map of labels for which
// matchers were added, so that these can later be removed from the result
// time series again.
func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) {
	el := make(model.LabelSet, len(q.externalLabels))
	for k, v := range q.externalLabels {
		el[k] = v
	}
	for _, m := range matchers {
		if _, ok := el[model.LabelName(m.Name)]; ok {
			delete(el, model.LabelName(m.Name))
		}
	}
	for k, v := range el {
		m, err := labels.NewMatcher(labels.MatchEqual, string(k), string(v))
		if err != nil {
			panic(err)
		}
		matchers = append(matchers, m)
	}
	return matchers, el
}

func removeLabels(l *labels.Labels, toDelete model.LabelSet) {
	for i := 0; i < len(*l); {
		if _, ok := toDelete[model.LabelName((*l)[i].Name)]; ok {
			*l = (*l)[:i+copy((*l)[i:], (*l)[i+1:])]
		} else {
			i++
		}
	}
}
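
// The copy-and-truncate in the branch above deletes element i in place
// without allocating: copy shifts the tail left by one position, and the
// reslice drops the now-duplicated final element. The slice is passed by
// pointer so that the shortened length is visible to the caller in Select.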

//// MatrixToIterators returns series iterators for a given matrix.
//func MatrixToIterators(m model.Matrix, err error) ([]local.SeriesIterator, error) {
//	if err != nil {
//		return nil, err
//	}
//
//	its := make([]local.SeriesIterator, 0, len(m))
//	for _, ss := range m {
//		its = append(its, sampleStreamIterator{
//			ss: ss,
//		})
//	}
//	return its, nil
//}
@@ -0,0 +1,94 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"reflect"
	"sort"
	"testing"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
)

func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher {
	m, err := labels.NewMatcher(mt, name, val)
	if err != nil {
		panic(err)
	}
	return m
}

func TestAddExternalLabels(t *testing.T) {
	tests := []struct {
		el          model.LabelSet
		inMatchers  []*labels.Matcher
		outMatchers []*labels.Matcher
		added       model.LabelSet
	}{
		{
			el: model.LabelSet{},
			inMatchers: []*labels.Matcher{
				mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
			},
			outMatchers: []*labels.Matcher{
				mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
			},
			added: model.LabelSet{},
		},
		{
			el: model.LabelSet{"region": "europe", "dc": "berlin-01"},
			inMatchers: []*labels.Matcher{
				mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
			},
			outMatchers: []*labels.Matcher{
				mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
				mustNewLabelMatcher(labels.MatchEqual, "region", "europe"),
				mustNewLabelMatcher(labels.MatchEqual, "dc", "berlin-01"),
			},
			added: model.LabelSet{"region": "europe", "dc": "berlin-01"},
		},
		{
			el: model.LabelSet{"region": "europe", "dc": "berlin-01"},
			inMatchers: []*labels.Matcher{
				mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
				mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"),
			},
			outMatchers: []*labels.Matcher{
				mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"),
				mustNewLabelMatcher(labels.MatchEqual, "region", "europe"),
				mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"),
			},
			added: model.LabelSet{"region": "europe"},
		},
	}

	for i, test := range tests {
		q := querier{
			externalLabels: test.el,
		}

		matchers, added := q.addExternalLabels(test.inMatchers)

		sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name })
		sort.Slice(matchers, func(i, j int) bool { return matchers[i].Name < matchers[j].Name })

		if !reflect.DeepEqual(matchers, test.outMatchers) {
			t.Fatalf("%d. unexpected matchers; want %v, got %v", i, test.outMatchers, matchers)
		}
		if !reflect.DeepEqual(added, test.added) {
			t.Fatalf("%d. unexpected added labels; want %v, got %v", i, test.added, added)
		}
	}
}
@ -0,0 +1,312 @@
// Code generated by protoc-gen-go.
// source: remote.proto
// DO NOT EDIT!

/*
Package remote is a generated protocol buffer package.

It is generated from these files:
	remote.proto

It has these top-level messages:
	Sample
	LabelPair
	TimeSeries
	WriteRequest
	ReadRequest
	ReadResponse
	Query
	LabelMatcher
	QueryResult
*/
package remote

import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package

type MatchType int32

const (
	MatchType_EQUAL          MatchType = 0
	MatchType_NOT_EQUAL      MatchType = 1
	MatchType_REGEX_MATCH    MatchType = 2
	MatchType_REGEX_NO_MATCH MatchType = 3
)

var MatchType_name = map[int32]string{
	0: "EQUAL",
	1: "NOT_EQUAL",
	2: "REGEX_MATCH",
	3: "REGEX_NO_MATCH",
}
var MatchType_value = map[string]int32{
	"EQUAL":          0,
	"NOT_EQUAL":      1,
	"REGEX_MATCH":    2,
	"REGEX_NO_MATCH": 3,
}

func (x MatchType) String() string {
	return proto.EnumName(MatchType_name, int32(x))
}
func (MatchType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }

type Sample struct {
	Value       float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"`
	TimestampMs int64   `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"`
}

func (m *Sample) Reset()                    { *m = Sample{} }
func (m *Sample) String() string            { return proto.CompactTextString(m) }
func (*Sample) ProtoMessage()               {}
func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }

func (m *Sample) GetValue() float64 {
	if m != nil {
		return m.Value
	}
	return 0
}

func (m *Sample) GetTimestampMs() int64 {
	if m != nil {
		return m.TimestampMs
	}
	return 0
}

type LabelPair struct {
	Name  string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
	Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
}

func (m *LabelPair) Reset()                    { *m = LabelPair{} }
func (m *LabelPair) String() string            { return proto.CompactTextString(m) }
func (*LabelPair) ProtoMessage()               {}
func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }

func (m *LabelPair) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

func (m *LabelPair) GetValue() string {
	if m != nil {
		return m.Value
	}
	return ""
}

type TimeSeries struct {
	Labels []*LabelPair `protobuf:"bytes,1,rep,name=labels" json:"labels,omitempty"`
	// Sorted by time, oldest sample first.
	Samples []*Sample `protobuf:"bytes,2,rep,name=samples" json:"samples,omitempty"`
}

func (m *TimeSeries) Reset()                    { *m = TimeSeries{} }
func (m *TimeSeries) String() string            { return proto.CompactTextString(m) }
func (*TimeSeries) ProtoMessage()               {}
func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }

func (m *TimeSeries) GetLabels() []*LabelPair {
	if m != nil {
		return m.Labels
	}
	return nil
}

func (m *TimeSeries) GetSamples() []*Sample {
	if m != nil {
		return m.Samples
	}
	return nil
}

type WriteRequest struct {
	Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}

func (m *WriteRequest) Reset()                    { *m = WriteRequest{} }
func (m *WriteRequest) String() string            { return proto.CompactTextString(m) }
func (*WriteRequest) ProtoMessage()               {}
func (*WriteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }

func (m *WriteRequest) GetTimeseries() []*TimeSeries {
	if m != nil {
		return m.Timeseries
	}
	return nil
}

type ReadRequest struct {
	Queries []*Query `protobuf:"bytes,1,rep,name=queries" json:"queries,omitempty"`
}

func (m *ReadRequest) Reset()                    { *m = ReadRequest{} }
func (m *ReadRequest) String() string            { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage()               {}
func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }

func (m *ReadRequest) GetQueries() []*Query {
	if m != nil {
		return m.Queries
	}
	return nil
}

type ReadResponse struct {
	// In same order as the request's queries.
	Results []*QueryResult `protobuf:"bytes,1,rep,name=results" json:"results,omitempty"`
}

func (m *ReadResponse) Reset()                    { *m = ReadResponse{} }
func (m *ReadResponse) String() string            { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage()               {}
func (*ReadResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} }

func (m *ReadResponse) GetResults() []*QueryResult {
	if m != nil {
		return m.Results
	}
	return nil
}

type Query struct {
	StartTimestampMs int64           `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs" json:"start_timestamp_ms,omitempty"`
	EndTimestampMs   int64           `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs" json:"end_timestamp_ms,omitempty"`
	Matchers         []*LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers,omitempty"`
}

func (m *Query) Reset()                    { *m = Query{} }
func (m *Query) String() string            { return proto.CompactTextString(m) }
func (*Query) ProtoMessage()               {}
func (*Query) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} }

func (m *Query) GetStartTimestampMs() int64 {
	if m != nil {
		return m.StartTimestampMs
	}
	return 0
}

func (m *Query) GetEndTimestampMs() int64 {
	if m != nil {
		return m.EndTimestampMs
	}
	return 0
}

func (m *Query) GetMatchers() []*LabelMatcher {
	if m != nil {
		return m.Matchers
	}
	return nil
}

type LabelMatcher struct {
	Type  MatchType `protobuf:"varint,1,opt,name=type,enum=remote.MatchType" json:"type,omitempty"`
	Name  string    `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
	Value string    `protobuf:"bytes,3,opt,name=value" json:"value,omitempty"`
}

func (m *LabelMatcher) Reset()                    { *m = LabelMatcher{} }
func (m *LabelMatcher) String() string            { return proto.CompactTextString(m) }
func (*LabelMatcher) ProtoMessage()               {}
func (*LabelMatcher) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} }

func (m *LabelMatcher) GetType() MatchType {
	if m != nil {
		return m.Type
	}
	return MatchType_EQUAL
}

func (m *LabelMatcher) GetName() string {
	if m != nil {
		return m.Name
	}
	return ""
}

func (m *LabelMatcher) GetValue() string {
	if m != nil {
		return m.Value
	}
	return ""
}

type QueryResult struct {
	Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}

func (m *QueryResult) Reset()                    { *m = QueryResult{} }
func (m *QueryResult) String() string            { return proto.CompactTextString(m) }
func (*QueryResult) ProtoMessage()               {}
func (*QueryResult) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} }

func (m *QueryResult) GetTimeseries() []*TimeSeries {
	if m != nil {
		return m.Timeseries
	}
	return nil
}

func init() {
	proto.RegisterType((*Sample)(nil), "remote.Sample")
	proto.RegisterType((*LabelPair)(nil), "remote.LabelPair")
	proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries")
	proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest")
	proto.RegisterType((*ReadRequest)(nil), "remote.ReadRequest")
	proto.RegisterType((*ReadResponse)(nil), "remote.ReadResponse")
	proto.RegisterType((*Query)(nil), "remote.Query")
	proto.RegisterType((*LabelMatcher)(nil), "remote.LabelMatcher")
	proto.RegisterType((*QueryResult)(nil), "remote.QueryResult")
	proto.RegisterEnum("remote.MatchType", MatchType_name, MatchType_value)
}

func init() { proto.RegisterFile("remote.proto", fileDescriptor0) }

var fileDescriptor0 = []byte{
	// 424 bytes of a gzipped FileDescriptorProto
	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xc1, 0x6e, 0xd3, 0x40,
	0x10, 0x65, 0xe3, 0x26, 0xc1, 0x63, 0x37, 0x84, 0xa1, 0x87, 0x1c, 0xc3, 0x4a, 0x08, 0x83, 0xa0,
	0x42, 0x45, 0x70, 0xe3, 0x10, 0x50, 0x04, 0x42, 0x4d, 0x4b, 0xb7, 0x46, 0x70, 0xb3, 0xb6, 0x64,
	0x24, 0x2c, 0x79, 0x13, 0x77, 0x77, 0x8d, 0x94, 0xcf, 0xe0, 0x8f, 0x51, 0x76, 0xb3, 0x8e, 0x23,
	0xe5, 0xc4, 0x2d, 0x33, 0xef, 0xbd, 0x99, 0x97, 0x7d, 0x63, 0x48, 0x35, 0xa9, 0xb5, 0xa5, 0xf3,
	0x5a, 0xaf, 0xed, 0x1a, 0x07, 0xbe, 0xe2, 0x33, 0x18, 0xdc, 0x4a, 0x55, 0x57, 0x84, 0x67, 0xd0,
	0xff, 0x23, 0xab, 0x86, 0x26, 0x6c, 0xca, 0x32, 0x26, 0x7c, 0x81, 0x4f, 0x21, 0xb5, 0xa5, 0x22,
	0x63, 0xa5, 0xaa, 0x0b, 0x65, 0x26, 0xbd, 0x29, 0xcb, 0x22, 0x91, 0xb4, 0xbd, 0x85, 0xe1, 0xef,
	0x20, 0xbe, 0x94, 0x77, 0x54, 0x7d, 0x93, 0xa5, 0x46, 0x84, 0x93, 0x95, 0x54, 0x7e, 0x48, 0x2c,
	0xdc, 0xef, 0xfd, 0xe4, 0x9e, 0x6b, 0xfa, 0x82, 0x4b, 0x80, 0xbc, 0x54, 0x74, 0x4b, 0xba, 0x24,
	0x83, 0x2f, 0x60, 0x50, 0x6d, 0x87, 0x98, 0x09, 0x9b, 0x46, 0x59, 0x72, 0xf1, 0xf8, 0x7c, 0x67,
	0xb7, 0x1d, 0x2d, 0x76, 0x04, 0xcc, 0x60, 0x68, 0x9c, 0xe5, 0xad, 0x9b, 0x2d, 0x77, 0x14, 0xb8,
	0xfe, 0x9f, 0x88, 0x00, 0xf3, 0x8f, 0x90, 0xfe, 0xd0, 0xa5, 0x25, 0x41, 0xf7, 0x0d, 0x19, 0x8b,
	0x17, 0x00, 0xce, 0xb8, 0x5b, 0xb9, 0x5b, 0x84, 0x41, 0xbc, 0x37, 0x23, 0x3a, 0x2c, 0xfe, 0x1e,
	0x12, 0x41, 0x72, 0x19, 0x46, 0x3c, 0x87, 0xe1, 0x7d, 0xd3, 0xd5, 0x9f, 0x06, 0xfd, 0x4d, 0x43,
	0x7a, 0x23, 0x02, 0xca, 0x3f, 0x40, 0xea, 0x75, 0xa6, 0x5e, 0xaf, 0x0c, 0xe1, 0x6b, 0x18, 0x6a,
	0x32, 0x4d, 0x65, 0x83, 0xf0, 0xc9, 0xa1, 0xd0, 0x61, 0x22, 0x70, 0xf8, 0x5f, 0x06, 0x7d, 0x07,
	0xe0, 0x2b, 0x40, 0x63, 0xa5, 0xb6, 0xc5, 0x41, 0x0e, 0xcc, 0xe5, 0x30, 0x76, 0x48, 0xbe, 0x0f,
	0x03, 0x33, 0x18, 0xd3, 0x6a, 0x59, 0x1c, 0xc9, 0x6c, 0x44, 0xab, 0x65, 0x97, 0xf9, 0x06, 0x1e,
	0x2a, 0x69, 0x7f, 0xfd, 0x26, 0x6d, 0x26, 0x91, 0x73, 0x74, 0x76, 0xf0, 0xe6, 0x0b, 0x0f, 0x8a,
	0x96, 0xc5, 0x0b, 0x48, 0xbb, 0x08, 0x3e, 0x83, 0x13, 0xbb, 0xa9, 0x7d, 0xd6, 0xa3, 0x7d, 0x62,
	0x0e, 0xce, 0x37, 0x35, 0x09, 0x07, 0xb7, 0x27, 0xd1, 0x3b, 0x76, 0x12, 0x51, 0xf7, 0x24, 0x66,
	0x90, 0x74, 0x1e, 0xe3, 0x7f, 0xe2, 0x7a, 0xf9, 0x15, 0xe2, 0x76, 0x3f, 0xc6, 0xd0, 0x9f, 0xdf,
	0x7c, 0x9f, 0x5d, 0x8e, 0x1f, 0xe0, 0x29, 0xc4, 0x57, 0xd7, 0x79, 0xe1, 0x4b, 0x86, 0x8f, 0x20,
	0x11, 0xf3, 0xcf, 0xf3, 0x9f, 0xc5, 0x62, 0x96, 0x7f, 0xfa, 0x32, 0xee, 0x21, 0xc2, 0xc8, 0x37,
	0xae, 0xae, 0x77, 0xbd, 0xe8, 0x6e, 0xe0, 0x3e, 0x95, 0xb7, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff,
	0x9b, 0x9e, 0x76, 0xb3, 0x3a, 0x03, 0x00, 0x00,
}
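
Not part of the generated file, but for orientation: a minimal sketch of populating and encoding these messages with github.com/golang/protobuf/proto. The import path and all sample values here are assumptions; on the wire, Prometheus additionally snappy-compresses the marshaled body before sending it.

package main

import (
	"fmt"

	"github.com/golang/protobuf/proto"
	"github.com/prometheus/prometheus/storage/remote" // assumed import path
)

func main() {
	// Invented sample data: one series with one sample.
	req := &remote.WriteRequest{
		Timeseries: []*remote.TimeSeries{{
			Labels: []*remote.LabelPair{
				{Name: "__name__", Value: "http_requests_total"},
				{Name: "job", Value: "api-server"},
			},
			// Sorted by time, oldest sample first.
			Samples: []*remote.Sample{
				{Value: 1027, TimestampMs: 1493712816000},
			},
		}},
	}
	buf, err := proto.Marshal(req)
	if err != nil {
		panic(err)
	}
	fmt.Printf("encoded %d bytes\n", len(buf))
}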
@ -0,0 +1,68 @@
// Copyright 2016 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package remote;

message Sample {
  double value       = 1;
  int64 timestamp_ms = 2;
}

message LabelPair {
  string name  = 1;
  string value = 2;
}

message TimeSeries {
  repeated LabelPair labels = 1;
  // Sorted by time, oldest sample first.
  repeated Sample samples = 2;
}

message WriteRequest {
  repeated TimeSeries timeseries = 1;
}

message ReadRequest {
  repeated Query queries = 1;
}

message ReadResponse {
  // In same order as the request's queries.
  repeated QueryResult results = 1;
}

message Query {
  int64 start_timestamp_ms = 1;
  int64 end_timestamp_ms = 2;
  repeated LabelMatcher matchers = 3;
}

enum MatchType {
  EQUAL = 0;
  NOT_EQUAL = 1;
  REGEX_MATCH = 2;
  REGEX_NO_MATCH = 3;
}

message LabelMatcher {
  MatchType type = 1;
  string name = 2;
  string value = 3;
}

message QueryResult {
  repeated TimeSeries timeseries = 1;
}
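
The read side mirrors this. A sketch of building a remote read request from these messages (same assumed import path, invented timestamps and matcher values):

package main

import (
	"github.com/golang/protobuf/proto"
	"github.com/prometheus/prometheus/storage/remote" // assumed import path
)

func main() {
	// A hypothetical read for job="api-server" over a one-hour window.
	req := &remote.ReadRequest{
		Queries: []*remote.Query{{
			StartTimestampMs: 1493709216000,
			EndTimestampMs:   1493712816000,
			Matchers: []*remote.LabelMatcher{
				{Type: remote.MatchType_EQUAL, Name: "job", Value: "api-server"},
			},
		}},
	}
	if _, err := proto.Marshal(req); err != nil {
		panic(err)
	}
	// The response is a ReadResponse whose results come back in the same
	// order as the request's queries.
}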
@ -0,0 +1,101 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"sync"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/config"
)

type Storage struct {
	mtx sync.RWMutex

	// For writes
	queues []*QueueManager

	// For reads
	clients        []*Client
	externalLabels model.LabelSet
}

// ApplyConfig updates the state as the new config requires.
func (s *Storage) ApplyConfig(conf *config.Config) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	// Update write queues

	newQueues := []*QueueManager{}
	// TODO: we should only stop & recreate queues which have changes,
	// as this can be quite disruptive.
	for i, rwConf := range conf.RemoteWriteConfigs {
		c, err := NewClient(i, &clientConfig{
			url:              rwConf.URL,
			timeout:          rwConf.RemoteTimeout,
			httpClientConfig: rwConf.HTTPClientConfig,
		})
		if err != nil {
			return err
		}
		newQueues = append(newQueues, NewQueueManager(
			defaultQueueManagerConfig,
			conf.GlobalConfig.ExternalLabels,
			rwConf.WriteRelabelConfigs,
			c,
		))
	}

	for _, q := range s.queues {
		q.Stop()
	}

	s.queues = newQueues
	for _, q := range s.queues {
		q.Start()
	}

	// Update read clients

	clients := []*Client{}
	for i, rrConf := range conf.RemoteReadConfigs {
		c, err := NewClient(i, &clientConfig{
			url:              rrConf.URL,
			timeout:          rrConf.RemoteTimeout,
			httpClientConfig: rrConf.HTTPClientConfig,
		})
		if err != nil {
			return err
		}
		clients = append(clients, c)
	}

	s.clients = clients
	s.externalLabels = conf.GlobalConfig.ExternalLabels

	return nil
}

// Close stops the background processing of the storage queues.
func (s *Storage) Close() error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	for _, q := range s.queues {
		q.Stop()
	}

	return nil
}
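
A rough sketch of the intended lifecycle, assuming config.Load from the Prometheus config package accepts this minimal, invented YAML; real deployments load prometheus.yml and call ApplyConfig again on every reload:

package main

import (
	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/storage/remote" // assumed import path
)

func main() {
	s := &remote.Storage{}

	// Invented minimal configuration for illustration.
	conf, err := config.Load(`
remote_write:
- url: http://example.org/receive
`)
	if err != nil {
		panic(err)
	}
	// ApplyConfig (re)creates the write queues and read clients.
	if err := s.ApplyConfig(conf); err != nil {
		panic(err)
	}
	defer s.Close()
}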
@ -0,0 +1,57 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remote

import (
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
)

func (s *Storage) Appender() (storage.Appender, error) {
	return s, nil
}

func (s *Storage) Add(l labels.Labels, t int64, v float64) (string, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	for _, q := range s.queues {
		q.Append(&model.Sample{
			Metric:    labelsToMetric(l),
			Timestamp: model.Time(t),
			Value:     model.SampleValue(v),
		})
	}
	return "", nil
}

func labelsToMetric(ls labels.Labels) model.Metric {
	metric := make(model.Metric, len(ls))
	for _, l := range ls {
		metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
	}
	return metric
}

func (*Storage) AddFast(ref string, t int64, v float64) error {
	return storage.ErrNotFound
}

func (*Storage) Commit() error {
	return nil
}

func (*Storage) Rollback() error {
	return nil
}
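
With that, the remote Storage plugs into the write path like any other storage.Appender. A sketch, continuing with a *Storage s as in the lifecycle example above (label and sample values invented):

app, err := s.Appender()
if err != nil {
	panic(err)
}
lset := labels.Labels{
	{Name: "__name__", Value: "http_requests_total"},
	{Name: "job", Value: "api-server"},
}
// Add fans the sample out to every configured remote write queue;
// Commit is then a no-op because the queueing already happened in Add.
if _, err := app.Add(lset, 1493712816000, 1027); err != nil {
	panic(err)
}
if err := app.Commit(); err != nil {
	panic(err)
}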
@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

   * Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
   * Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
   * Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
@ -0,0 +1,22 @@
Additional IP Rights Grant (Patents)

"This implementation" means the copyrightable works distributed by
Google as part of the Go project.

Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.
@ -0,0 +1,380 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package rate provides a rate limiter.
package rate

import (
	"fmt"
	"math"
	"sync"
	"time"
)

// Limit defines the maximum frequency of some events.
// Limit is represented as number of events per second.
// A zero Limit allows no events.
type Limit float64

// Inf is the infinite rate limit; it allows all events (even if burst is zero).
const Inf = Limit(math.MaxFloat64)

// Every converts a minimum time interval between events to a Limit.
func Every(interval time.Duration) Limit {
	if interval <= 0 {
		return Inf
	}
	return 1 / Limit(interval.Seconds())
}
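
// For example, Every(100*time.Millisecond) yields Limit(10), i.e. ten
// events per second:
//
//	lim := NewLimiter(Every(100*time.Millisecond), 1)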

// A Limiter controls how frequently events are allowed to happen.
// It implements a "token bucket" of size b, initially full and refilled
// at rate r tokens per second.
// Informally, in any large enough time interval, the Limiter limits the
// rate to r tokens per second, with a maximum burst size of b events.
// As a special case, if r == Inf (the infinite rate), b is ignored.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
//
// The zero value is a valid Limiter, but it will reject all events.
// Use NewLimiter to create non-zero Limiters.
//
// Limiter has three main methods, Allow, Reserve, and Wait.
// Most callers should use Wait.
//
// Each of the three methods consumes a single token.
// They differ in their behavior when no token is available.
// If no token is available, Allow returns false.
// If no token is available, Reserve returns a reservation for a future token
// and the amount of time the caller must wait before using it.
// If no token is available, Wait blocks until one can be obtained
// or its associated context.Context is canceled.
//
// The methods AllowN, ReserveN, and WaitN consume n tokens.
type Limiter struct {
	limit Limit
	burst int

	mu     sync.Mutex
	tokens float64
	// last is the last time the limiter's tokens field was updated
	last time.Time
	// lastEvent is the latest time of a rate-limited event (past or future)
	lastEvent time.Time
}

// Limit returns the maximum overall event rate.
func (lim *Limiter) Limit() Limit {
	lim.mu.Lock()
	defer lim.mu.Unlock()
	return lim.limit
}

// Burst returns the maximum burst size. Burst is the maximum number of tokens
// that can be consumed in a single call to Allow, Reserve, or Wait, so higher
// Burst values allow more events to happen at once.
// A zero Burst allows no events, unless limit == Inf.
func (lim *Limiter) Burst() int {
	return lim.burst
}

// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(r Limit, b int) *Limiter {
	return &Limiter{
		limit: r,
		burst: b,
	}
}

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
	return lim.AllowN(time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok
}
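
// For example, with rate 1 and burst 3 a fresh limiter admits three
// immediate events and then rejects until new tokens accrue:
//
//	lim := NewLimiter(1, 3)
//	lim.Allow() // true
//	lim.Allow() // true
//	lim.Allow() // true
//	lim.Allow() // false: bucket empty, roughly one new token per second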

// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
	ok        bool
	lim       *Limiter
	tokens    int
	timeToAct time.Time
	// This is the Limit at reservation time, it can change later.
	limit Limit
}

// OK returns whether the limiter can provide the requested number of tokens
// within the maximum wait time. If OK is false, Delay returns InfDuration, and
// Cancel does nothing.
func (r *Reservation) OK() bool {
	return r.ok
}

// Delay is shorthand for DelayFrom(time.Now()).
func (r *Reservation) Delay() time.Duration {
	return r.DelayFrom(time.Now())
}

// InfDuration is the duration returned by Delay when a Reservation is not OK.
const InfDuration = time.Duration(1<<63 - 1)

// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
// InfDuration means the limiter cannot grant the tokens requested in this
// Reservation within the maximum wait time.
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
	if !r.ok {
		return InfDuration
	}
	delay := r.timeToAct.Sub(now)
	if delay < 0 {
		return 0
	}
	return delay
}

// Cancel is shorthand for CancelAt(time.Now()).
func (r *Reservation) Cancel() {
	r.CancelAt(time.Now())
	return
}

// CancelAt indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible,
// considering that other reservations may have already been made.
func (r *Reservation) CancelAt(now time.Time) {
	if !r.ok {
		return
	}

	r.lim.mu.Lock()
	defer r.lim.mu.Unlock()

	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
		return
	}

	// calculate tokens to restore
	// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
	// after r was obtained. These tokens should not be restored.
	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
	if restoreTokens <= 0 {
		return
	}
	// advance time to now
	now, _, tokens := r.lim.advance(now)
	// calculate new number of tokens
	tokens += restoreTokens
	if burst := float64(r.lim.burst); tokens > burst {
		tokens = burst
	}
	// update state
	r.lim.last = now
	r.lim.tokens = tokens
	if r.timeToAct == r.lim.lastEvent {
		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
		if !prevEvent.Before(now) {
			r.lim.lastEvent = prevEvent
		}
	}

	return
}

// Reserve is shorthand for ReserveN(time.Now(), 1).
func (lim *Limiter) Reserve() *Reservation {
	return lim.ReserveN(time.Now(), 1)
}

// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// ReserveN returns false if n exceeds the Limiter's burst size.
// Usage example:
//
//	r := lim.ReserveN(time.Now(), 1)
//	if !r.OK() {
//		// Not allowed to act! Did you remember to set lim.burst to be > 0 ?
//		return
//	}
//	time.Sleep(r.Delay())
//	Act()
//
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to respect a deadline or cancel the delay, use Wait instead.
// To drop or skip events exceeding rate limit, use Allow instead.
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
	r := lim.reserveN(now, n, InfDuration)
	return &r
}

// contextContext is a temporary(?) copy of the context.Context type
// to support both Go 1.6 using golang.org/x/net/context and Go 1.7+
// with the built-in context package. If people ever stop using Go 1.6
// we can remove this.
type contextContext interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}

// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) wait(ctx contextContext) (err error) {
	return lim.WaitN(ctx, 1)
}

// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) waitN(ctx contextContext, n int) (err error) {
	if n > lim.burst && lim.limit != Inf {
		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
	}
	// Check if ctx is already cancelled
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	// Determine wait limit
	now := time.Now()
	waitLimit := InfDuration
	if deadline, ok := ctx.Deadline(); ok {
		waitLimit = deadline.Sub(now)
	}
	// Reserve
	r := lim.reserveN(now, n, waitLimit)
	if !r.ok {
		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
	}
	// Wait
	t := time.NewTimer(r.DelayFrom(now))
	defer t.Stop()
	select {
	case <-t.C:
		// We can proceed.
		return nil
	case <-ctx.Done():
		// Context was canceled before we could proceed. Cancel the
		// reservation, which may permit other events to proceed sooner.
		r.Cancel()
		return ctx.Err()
	}
}

// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).
func (lim *Limiter) SetLimit(newLimit Limit) {
	lim.SetLimitAt(time.Now(), newLimit)
}

// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated
// or underutilized by those which reserved (using Reserve or Wait) but did not yet act
// before SetLimitAt was called.
func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) {
	lim.mu.Lock()
	defer lim.mu.Unlock()

	now, _, tokens := lim.advance(now)

	lim.last = now
	lim.tokens = tokens
	lim.limit = newLimit
}

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}

	now, last, tokens := lim.advance(now)

	// Calculate the remaining number of tokens resulting from the request.
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// Decide result
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
	last := lim.last
	if now.Before(last) {
		last = now
	}

	// Avoid making delta overflow below when last is very old.
	maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
	elapsed := now.Sub(last)
	if elapsed > maxElapsed {
		elapsed = maxElapsed
	}

	// Calculate the new number of tokens, due to time that passed.
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}

	return now, last, tokens
}

// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
	seconds := tokens / float64(limit)
	return time.Nanosecond * time.Duration(1e9*seconds)
}

// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	return d.Seconds() * float64(limit)
}
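
// The two conversions above are inverses up to floating-point rounding:
// at limit 10, durationFromTokens(5) is 500ms, and tokensFromDuration of
// 500ms yields 5 tokens again.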
@ -0,0 +1,21 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build !go1.7

package rate

import "golang.org/x/net/context"

// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {
	return lim.waitN(ctx, 1)
}

// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
	return lim.waitN(ctx, n)
}
@ -0,0 +1,21 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// +build go1.7

package rate

import "context"

// Wait is shorthand for WaitN(ctx, 1).
func (lim *Limiter) Wait(ctx context.Context) (err error) {
	return lim.waitN(ctx, 1)
}

// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
	return lim.waitN(ctx, n)
}
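
A typical caller bounds Wait with a context deadline, which waitN turns into the reservation's maximum wait. A self-contained sketch (Go 1.7+ context; the rate and timeout values are invented):

package main

import (
	"context"
	"log"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	lim := rate.NewLimiter(rate.Limit(5), 1)
	if err := lim.Wait(ctx); err != nil {
		// The deadline expired before a token was granted, or ctx was canceled.
		log.Fatal(err)
	}
	// Proceed with the rate-limited action.
}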