mirror of https://github.com/grafana/loki
Ruler/loki rule validator (#2589)
* revendors cortex * loki multitenantmanager * vendoring compat * go modk32
parent
cd63a535d2
commit
164f5cd0ae
8
vendor/github.com/cortexproject/cortex/pkg/alertmanager/alertmanager_metrics.go
generated
vendored
8
vendor/github.com/cortexproject/cortex/pkg/alertmanager/alertmanager_metrics.go
generated
vendored
2
vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go
generated
vendored
2
vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_balanced_set.go
generated
vendored
37
vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go
generated
vendored
37
vendor/github.com/cortexproject/cortex/pkg/querier/blocks_store_replicated_set.go
generated
vendored
393
vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/queryrange.pb.go
generated
vendored
393
vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/queryrange.pb.go
generated
vendored
21
vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/results_cache.go
generated
vendored
21
vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/results_cache.go
generated
vendored
8
vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/split_by_interval.go
generated
vendored
8
vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/split_by_interval.go
generated
vendored
@ -1,60 +0,0 @@ |
||||
package storegateway |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/go-kit/kit/log" |
||||
"github.com/go-kit/kit/log/level" |
||||
"github.com/oklog/ulid" |
||||
"github.com/thanos-io/thanos/pkg/block/metadata" |
||||
"github.com/thanos-io/thanos/pkg/extprom" |
||||
|
||||
"github.com/cortexproject/cortex/pkg/ring" |
||||
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" |
||||
) |
||||
|
||||
const ( |
||||
shardExcludedMeta = "shard-excluded" |
||||
) |
||||
|
||||
// ShardingMetadataFilter represents struct that allows sharding using the ring.
|
||||
// Not go-routine safe.
|
||||
type ShardingMetadataFilter struct { |
||||
r *ring.Ring |
||||
instanceAddr string |
||||
logger log.Logger |
||||
} |
||||
|
||||
// NewShardingMetadataFilter creates ShardingMetadataFilter.
|
||||
func NewShardingMetadataFilter(r *ring.Ring, instanceAddr string, logger log.Logger) *ShardingMetadataFilter { |
||||
return &ShardingMetadataFilter{ |
||||
r: r, |
||||
instanceAddr: instanceAddr, |
||||
logger: logger, |
||||
} |
||||
} |
||||
|
||||
// Filter filters out blocks not included within the current shard.
|
||||
func (f *ShardingMetadataFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { |
||||
// Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance).
|
||||
buf := make([]ring.IngesterDesc, 0, f.r.ReplicationFactor()+2) |
||||
|
||||
for blockID := range metas { |
||||
key := cortex_tsdb.HashBlockID(blockID) |
||||
set, err := f.r.Get(key, ring.BlocksSync, buf) |
||||
|
||||
// If there are no healthy instances in the replication set or
|
||||
// the replication set for this block doesn't include this instance
|
||||
// then we filter it out.
|
||||
if err != nil || !set.Includes(f.instanceAddr) { |
||||
if err != nil { |
||||
level.Warn(f.logger).Log("msg", "failed to get replication set for block", "block", blockID.String(), "err", err) |
||||
} |
||||
|
||||
synced.WithLabelValues(shardExcludedMeta).Inc() |
||||
delete(metas, blockID) |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
@ -0,0 +1,201 @@ |
||||
package storegateway |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/go-kit/kit/log" |
||||
"github.com/go-kit/kit/log/level" |
||||
"github.com/oklog/ulid" |
||||
"github.com/thanos-io/thanos/pkg/block" |
||||
"github.com/thanos-io/thanos/pkg/block/metadata" |
||||
"github.com/thanos-io/thanos/pkg/extprom" |
||||
"github.com/thanos-io/thanos/pkg/objstore" |
||||
|
||||
"github.com/cortexproject/cortex/pkg/ring" |
||||
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" |
||||
) |
||||
|
||||
const ( |
||||
shardExcludedMeta = "shard-excluded" |
||||
) |
||||
|
||||
type ShardingStrategy interface { |
||||
// FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs
|
||||
// that should be synced by the store-gateway.
|
||||
FilterUsers(ctx context.Context, userIDs []string) []string |
||||
|
||||
// FilterBlocks that should be loaded by the store-gateway.
|
||||
FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error |
||||
} |
||||
|
||||
// ShardingLimits is the interface that should be implemented by the limits provider,
|
||||
// limiting the scope of the limits to the ones required by sharding strategies.
|
||||
type ShardingLimits interface { |
||||
StoreGatewayTenantShardSize(userID string) int |
||||
} |
||||
|
||||
// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.
|
||||
type NoShardingStrategy struct{} |
||||
|
||||
func NewNoShardingStrategy() *NoShardingStrategy { |
||||
return &NoShardingStrategy{} |
||||
} |
||||
|
||||
func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { |
||||
return userIDs |
||||
} |
||||
|
||||
func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ *extprom.TxGaugeVec) error { |
||||
return nil |
||||
} |
||||
|
||||
// DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways.
|
||||
// Not go-routine safe.
|
||||
type DefaultShardingStrategy struct { |
||||
r *ring.Ring |
||||
instanceAddr string |
||||
logger log.Logger |
||||
} |
||||
|
||||
// NewDefaultShardingStrategy creates DefaultShardingStrategy.
|
||||
func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger) *DefaultShardingStrategy { |
||||
return &DefaultShardingStrategy{ |
||||
r: r, |
||||
instanceAddr: instanceAddr, |
||||
logger: logger, |
||||
} |
||||
} |
||||
|
||||
// FilterUsers implements ShardingStrategy.
|
||||
func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { |
||||
return userIDs |
||||
} |
||||
|
||||
// FilterBlocks implements ShardingStrategy.
|
||||
func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { |
||||
filterBlocksByRingSharding(s.r, s.instanceAddr, metas, synced, s.logger) |
||||
return nil |
||||
} |
||||
|
||||
// ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways,
|
||||
// where each tenant blocks are sharded across a subset of store-gateway instances.
|
||||
type ShuffleShardingStrategy struct { |
||||
r *ring.Ring |
||||
instanceID string |
||||
instanceAddr string |
||||
limits ShardingLimits |
||||
logger log.Logger |
||||
} |
||||
|
||||
// NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
|
||||
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy { |
||||
return &ShuffleShardingStrategy{ |
||||
r: r, |
||||
instanceID: instanceID, |
||||
instanceAddr: instanceAddr, |
||||
limits: limits, |
||||
logger: logger, |
||||
} |
||||
} |
||||
|
||||
// FilterUsers implements ShardingStrategy.
|
||||
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { |
||||
var filteredIDs []string |
||||
|
||||
for _, userID := range userIDs { |
||||
subRing := GetShuffleShardingSubring(s.r, userID, s.limits) |
||||
|
||||
// Include the user only if it belongs to this store-gateway shard.
|
||||
if subRing.HasInstance(s.instanceID) { |
||||
filteredIDs = append(filteredIDs, userID) |
||||
} |
||||
} |
||||
|
||||
return filteredIDs |
||||
} |
||||
|
||||
// FilterBlocks implements ShardingStrategy.
|
||||
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { |
||||
subRing := GetShuffleShardingSubring(s.r, userID, s.limits) |
||||
filterBlocksByRingSharding(subRing, s.instanceAddr, metas, synced, s.logger) |
||||
return nil |
||||
} |
||||
|
||||
func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, logger log.Logger) { |
||||
// Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance).
|
||||
buf := make([]ring.IngesterDesc, 0, r.ReplicationFactor()+2) |
||||
|
||||
for blockID := range metas { |
||||
key := cortex_tsdb.HashBlockID(blockID) |
||||
set, err := r.Get(key, ring.BlocksSync, buf) |
||||
|
||||
// If there are no healthy instances in the replication set or
|
||||
// the replication set for this block doesn't include this instance
|
||||
// then we filter it out.
|
||||
if err != nil || !set.Includes(instanceAddr) { |
||||
if err != nil { |
||||
level.Warn(logger).Log("msg", "excluded block because failed to get replication set", "block", blockID.String(), "err", err) |
||||
} |
||||
|
||||
synced.WithLabelValues(shardExcludedMeta).Inc() |
||||
delete(metas, blockID) |
||||
} |
||||
} |
||||
} |
||||
|
||||
// GetShuffleShardingSubring returns the subring to be used for a given user. This function
|
||||
// should be used both by store-gateway and querier in order to guarantee the same logic is used.
|
||||
func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing { |
||||
shardSize := limits.StoreGatewayTenantShardSize(userID) |
||||
|
||||
// A shard size of 0 means shuffle sharding is disabled for this specific user,
|
||||
// so we just return the full ring so that blocks will be sharded across all store-gateways.
|
||||
if shardSize <= 0 { |
||||
return ring |
||||
} |
||||
|
||||
return ring.Subring(cortex_tsdb.HashTenantID(userID), shardSize) |
||||
} |
||||
|
||||
type shardingMetadataFilterAdapter struct { |
||||
userID string |
||||
strategy ShardingStrategy |
||||
} |
||||
|
||||
func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter { |
||||
return &shardingMetadataFilterAdapter{ |
||||
userID: userID, |
||||
strategy: strategy, |
||||
} |
||||
} |
||||
|
||||
// Filter implements block.MetadataFilter.
|
||||
func (a *shardingMetadataFilterAdapter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { |
||||
return a.strategy.FilterBlocks(ctx, a.userID, metas, synced) |
||||
} |
||||
|
||||
type shardingBucketReaderAdapter struct { |
||||
objstore.InstrumentedBucketReader |
||||
|
||||
userID string |
||||
strategy ShardingStrategy |
||||
} |
||||
|
||||
func NewShardingBucketReaderAdapter(userID string, strategy ShardingStrategy, wrapped objstore.InstrumentedBucketReader) objstore.InstrumentedBucketReader { |
||||
return &shardingBucketReaderAdapter{ |
||||
InstrumentedBucketReader: wrapped, |
||||
userID: userID, |
||||
strategy: strategy, |
||||
} |
||||
} |
||||
|
||||
// Iter implements objstore.BucketReader.
|
||||
func (a *shardingBucketReaderAdapter) Iter(ctx context.Context, dir string, f func(string) error) error { |
||||
// Skip iterating the bucket if the tenant doesn't belong to the shard. From the caller
|
||||
// perspective, this will look like the tenant has no blocks in the storage.
|
||||
if len(a.strategy.FilterUsers(ctx, []string{a.userID})) == 0 { |
||||
return nil |
||||
} |
||||
|
||||
return a.InstrumentedBucketReader.Iter(ctx, dir, f) |
||||
} |
||||
@ -0,0 +1,53 @@ |
||||
package util |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"google.golang.org/grpc/metadata" |
||||
) |
||||
|
||||
// ipAddressesKey is key for the GRPC metadata where the IP addresses are stored
|
||||
const ipAddressesKey = "github.com/cortexproject/cortex/util/extract_forwarded/x-forwarded-for" |
||||
|
||||
// GetSourceIPsFromOutgoingCtx extracts the source field from the GRPC context
|
||||
func GetSourceIPsFromOutgoingCtx(ctx context.Context) string { |
||||
md, ok := metadata.FromOutgoingContext(ctx) |
||||
if !ok { |
||||
return "" |
||||
} |
||||
ipAddresses, ok := md[ipAddressesKey] |
||||
if !ok { |
||||
return "" |
||||
} |
||||
return ipAddresses[0] |
||||
} |
||||
|
||||
// GetSourceIPsFromIncomingCtx extracts the source field from the GRPC context
|
||||
func GetSourceIPsFromIncomingCtx(ctx context.Context) string { |
||||
md, ok := metadata.FromIncomingContext(ctx) |
||||
if !ok { |
||||
return "" |
||||
} |
||||
ipAddresses, ok := md[ipAddressesKey] |
||||
if !ok { |
||||
return "" |
||||
} |
||||
return ipAddresses[0] |
||||
} |
||||
|
||||
// AddSourceIPsToOutgoingContext adds the given source to the GRPC context
|
||||
func AddSourceIPsToOutgoingContext(ctx context.Context, source string) context.Context { |
||||
if source != "" { |
||||
ctx = metadata.AppendToOutgoingContext(ctx, ipAddressesKey, source) |
||||
} |
||||
return ctx |
||||
} |
||||
|
||||
// AddSourceIPsToIncomingContext adds the given source to the GRPC context
|
||||
func AddSourceIPsToIncomingContext(ctx context.Context, source string) context.Context { |
||||
if source != "" { |
||||
md := metadata.Pairs(ipAddressesKey, source) |
||||
ctx = metadata.NewIncomingContext(ctx, md) |
||||
} |
||||
return ctx |
||||
} |
||||
@ -0,0 +1,15 @@ |
||||
language: go |
||||
go: |
||||
- 1.10.x |
||||
- 1.11.x |
||||
- 1.12.x |
||||
sudo: false |
||||
before_install: |
||||
- go get -u golang.org/x/lint/golint |
||||
- go get github.com/axw/gocov/gocov |
||||
- go get github.com/mattn/goveralls |
||||
script: |
||||
- test -z "`gofmt -l .`" |
||||
- test -z "`golint ./...`" |
||||
- $GOPATH/bin/goveralls -service=travis-ci |
||||
- cd example && go build -o http_breaker && ./http_breaker |
||||
@ -0,0 +1,21 @@ |
||||
The MIT License (MIT) |
||||
|
||||
Copyright 2015 Sony Corporation |
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy |
||||
of this software and associated documentation files (the "Software"), to deal |
||||
in the Software without restriction, including without limitation the rights |
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||||
copies of the Software, and to permit persons to whom the Software is |
||||
furnished to do so, subject to the following conditions: |
||||
|
||||
The above copyright notice and this permission notice shall be included in |
||||
all copies or substantial portions of the Software. |
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
||||
THE SOFTWARE. |
||||
@ -0,0 +1,128 @@ |
||||
gobreaker |
||||
========= |
||||
|
||||
[](http://godoc.org/github.com/sony/gobreaker) |
||||
[](https://travis-ci.org/sony/gobreaker) |
||||
[](https://coveralls.io/github/sony/gobreaker?branch=master) |
||||
|
||||
[gobreaker][repo-url] implements the [Circuit Breaker pattern](https://msdn.microsoft.com/en-us/library/dn589784.aspx) in Go. |
||||
|
||||
Installation |
||||
------------ |
||||
|
||||
``` |
||||
go get github.com/sony/gobreaker |
||||
``` |
||||
|
||||
Usage |
||||
----- |
||||
|
||||
The struct `CircuitBreaker` is a state machine to prevent sending requests that are likely to fail. |
||||
The function `NewCircuitBreaker` creates a new `CircuitBreaker`. |
||||
|
||||
```go |
||||
func NewCircuitBreaker(st Settings) *CircuitBreaker |
||||
``` |
||||
|
||||
You can configure `CircuitBreaker` by the struct `Settings`: |
||||
|
||||
```go |
||||
type Settings struct { |
||||
Name string |
||||
MaxRequests uint32 |
||||
Interval time.Duration |
||||
Timeout time.Duration |
||||
ReadyToTrip func(counts Counts) bool |
||||
OnStateChange func(name string, from State, to State) |
||||
} |
||||
``` |
||||
|
||||
- `Name` is the name of the `CircuitBreaker`. |
||||
|
||||
- `MaxRequests` is the maximum number of requests allowed to pass through |
||||
when the `CircuitBreaker` is half-open. |
||||
If `MaxRequests` is 0, `CircuitBreaker` allows only 1 request. |
||||
|
||||
- `Interval` is the cyclic period of the closed state |
||||
for `CircuitBreaker` to clear the internal `Counts`, described later in this section. |
||||
If `Interval` is 0, `CircuitBreaker` doesn't clear the internal `Counts` during the closed state. |
||||
|
||||
- `Timeout` is the period of the open state, |
||||
after which the state of `CircuitBreaker` becomes half-open. |
||||
If `Timeout` is 0, the timeout value of `CircuitBreaker` is set to 60 seconds. |
||||
|
||||
- `ReadyToTrip` is called with a copy of `Counts` whenever a request fails in the closed state. |
||||
If `ReadyToTrip` returns true, `CircuitBreaker` will be placed into the open state. |
||||
If `ReadyToTrip` is `nil`, default `ReadyToTrip` is used. |
||||
Default `ReadyToTrip` returns true when the number of consecutive failures is more than 5. |
||||
|
||||
- `OnStateChange` is called whenever the state of `CircuitBreaker` changes. |
||||
|
||||
The struct `Counts` holds the numbers of requests and their successes/failures: |
||||
|
||||
```go |
||||
type Counts struct { |
||||
Requests uint32 |
||||
TotalSuccesses uint32 |
||||
TotalFailures uint32 |
||||
ConsecutiveSuccesses uint32 |
||||
ConsecutiveFailures uint32 |
||||
} |
||||
``` |
||||
|
||||
`CircuitBreaker` clears the internal `Counts` either |
||||
on the change of the state or at the closed-state intervals. |
||||
`Counts` ignores the results of the requests sent before clearing. |
||||
|
||||
`CircuitBreaker` can wrap any function to send a request: |
||||
|
||||
```go |
||||
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) |
||||
``` |
||||
|
||||
The method `Execute` runs the given request if `CircuitBreaker` accepts it. |
||||
`Execute` returns an error instantly if `CircuitBreaker` rejects the request. |
||||
Otherwise, `Execute` returns the result of the request. |
||||
If a panic occurs in the request, `CircuitBreaker` handles it as an error |
||||
and causes the same panic again. |
||||
|
||||
Example |
||||
------- |
||||
|
||||
```go |
||||
var cb *breaker.CircuitBreaker |
||||
|
||||
func Get(url string) ([]byte, error) { |
||||
body, err := cb.Execute(func() (interface{}, error) { |
||||
resp, err := http.Get(url) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
defer resp.Body.Close() |
||||
body, err := ioutil.ReadAll(resp.Body) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return body, nil |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return body.([]byte), nil |
||||
} |
||||
``` |
||||
|
||||
See [example](https://github.com/sony/gobreaker/blob/master/example) for details. |
||||
|
||||
License |
||||
------- |
||||
|
||||
The MIT License (MIT) |
||||
|
||||
See [LICENSE](https://github.com/sony/gobreaker/blob/master/LICENSE) for details. |
||||
|
||||
|
||||
[repo-url]: https://github.com/sony/gobreaker |
||||
@ -0,0 +1,5 @@ |
||||
module github.com/sony/gobreaker |
||||
|
||||
go 1.12 |
||||
|
||||
require github.com/stretchr/testify v1.3.0 |
||||
@ -0,0 +1,7 @@ |
||||
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= |
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= |
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= |
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= |
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= |
||||
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= |
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= |
||||
@ -0,0 +1,344 @@ |
||||
// Package gobreaker implements the Circuit Breaker pattern.
|
||||
// See https://msdn.microsoft.com/en-us/library/dn589784.aspx.
|
||||
package gobreaker |
||||
|
||||
import ( |
||||
"errors" |
||||
"fmt" |
||||
"sync" |
||||
"time" |
||||
) |
||||
|
||||
// State is a type that represents a state of CircuitBreaker.
|
||||
type State int |
||||
|
||||
// These constants are states of CircuitBreaker.
|
||||
const ( |
||||
StateClosed State = iota |
||||
StateHalfOpen |
||||
StateOpen |
||||
) |
||||
|
||||
var ( |
||||
// ErrTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests
|
||||
ErrTooManyRequests = errors.New("too many requests") |
||||
// ErrOpenState is returned when the CB state is open
|
||||
ErrOpenState = errors.New("circuit breaker is open") |
||||
) |
||||
|
||||
// String implements stringer interface.
|
||||
func (s State) String() string { |
||||
switch s { |
||||
case StateClosed: |
||||
return "closed" |
||||
case StateHalfOpen: |
||||
return "half-open" |
||||
case StateOpen: |
||||
return "open" |
||||
default: |
||||
return fmt.Sprintf("unknown state: %d", s) |
||||
} |
||||
} |
||||
|
||||
// Counts holds the numbers of requests and their successes/failures.
|
||||
// CircuitBreaker clears the internal Counts either
|
||||
// on the change of the state or at the closed-state intervals.
|
||||
// Counts ignores the results of the requests sent before clearing.
|
||||
type Counts struct { |
||||
Requests uint32 |
||||
TotalSuccesses uint32 |
||||
TotalFailures uint32 |
||||
ConsecutiveSuccesses uint32 |
||||
ConsecutiveFailures uint32 |
||||
} |
||||
|
||||
func (c *Counts) onRequest() { |
||||
c.Requests++ |
||||
} |
||||
|
||||
func (c *Counts) onSuccess() { |
||||
c.TotalSuccesses++ |
||||
c.ConsecutiveSuccesses++ |
||||
c.ConsecutiveFailures = 0 |
||||
} |
||||
|
||||
func (c *Counts) onFailure() { |
||||
c.TotalFailures++ |
||||
c.ConsecutiveFailures++ |
||||
c.ConsecutiveSuccesses = 0 |
||||
} |
||||
|
||||
func (c *Counts) clear() { |
||||
c.Requests = 0 |
||||
c.TotalSuccesses = 0 |
||||
c.TotalFailures = 0 |
||||
c.ConsecutiveSuccesses = 0 |
||||
c.ConsecutiveFailures = 0 |
||||
} |
||||
|
||||
// Settings configures CircuitBreaker:
|
||||
//
|
||||
// Name is the name of the CircuitBreaker.
|
||||
//
|
||||
// MaxRequests is the maximum number of requests allowed to pass through
|
||||
// when the CircuitBreaker is half-open.
|
||||
// If MaxRequests is 0, the CircuitBreaker allows only 1 request.
|
||||
//
|
||||
// Interval is the cyclic period of the closed state
|
||||
// for the CircuitBreaker to clear the internal Counts.
|
||||
// If Interval is 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
|
||||
//
|
||||
// Timeout is the period of the open state,
|
||||
// after which the state of the CircuitBreaker becomes half-open.
|
||||
// If Timeout is 0, the timeout value of the CircuitBreaker is set to 60 seconds.
|
||||
//
|
||||
// ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state.
|
||||
// If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state.
|
||||
// If ReadyToTrip is nil, default ReadyToTrip is used.
|
||||
// Default ReadyToTrip returns true when the number of consecutive failures is more than 5.
|
||||
//
|
||||
// OnStateChange is called whenever the state of the CircuitBreaker changes.
|
||||
type Settings struct { |
||||
Name string |
||||
MaxRequests uint32 |
||||
Interval time.Duration |
||||
Timeout time.Duration |
||||
ReadyToTrip func(counts Counts) bool |
||||
OnStateChange func(name string, from State, to State) |
||||
} |
||||
|
||||
// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
|
||||
type CircuitBreaker struct { |
||||
name string |
||||
maxRequests uint32 |
||||
interval time.Duration |
||||
timeout time.Duration |
||||
readyToTrip func(counts Counts) bool |
||||
onStateChange func(name string, from State, to State) |
||||
|
||||
mutex sync.Mutex |
||||
state State |
||||
generation uint64 |
||||
counts Counts |
||||
expiry time.Time |
||||
} |
||||
|
||||
// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function
|
||||
// with the breaker functionality, it only checks whether a request can proceed and
|
||||
// expects the caller to report the outcome in a separate step using a callback.
|
||||
type TwoStepCircuitBreaker struct { |
||||
cb *CircuitBreaker |
||||
} |
||||
|
||||
// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
|
||||
func NewCircuitBreaker(st Settings) *CircuitBreaker { |
||||
cb := new(CircuitBreaker) |
||||
|
||||
cb.name = st.Name |
||||
cb.interval = st.Interval |
||||
cb.onStateChange = st.OnStateChange |
||||
|
||||
if st.MaxRequests == 0 { |
||||
cb.maxRequests = 1 |
||||
} else { |
||||
cb.maxRequests = st.MaxRequests |
||||
} |
||||
|
||||
if st.Timeout == 0 { |
||||
cb.timeout = defaultTimeout |
||||
} else { |
||||
cb.timeout = st.Timeout |
||||
} |
||||
|
||||
if st.ReadyToTrip == nil { |
||||
cb.readyToTrip = defaultReadyToTrip |
||||
} else { |
||||
cb.readyToTrip = st.ReadyToTrip |
||||
} |
||||
|
||||
cb.toNewGeneration(time.Now()) |
||||
|
||||
return cb |
||||
} |
||||
|
||||
// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings.
|
||||
func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker { |
||||
return &TwoStepCircuitBreaker{ |
||||
cb: NewCircuitBreaker(st), |
||||
} |
||||
} |
||||
|
||||
const defaultTimeout = time.Duration(60) * time.Second |
||||
|
||||
func defaultReadyToTrip(counts Counts) bool { |
||||
return counts.ConsecutiveFailures > 5 |
||||
} |
||||
|
||||
// Name returns the name of the CircuitBreaker.
|
||||
func (cb *CircuitBreaker) Name() string { |
||||
return cb.name |
||||
} |
||||
|
||||
// State returns the current state of the CircuitBreaker.
|
||||
func (cb *CircuitBreaker) State() State { |
||||
cb.mutex.Lock() |
||||
defer cb.mutex.Unlock() |
||||
|
||||
now := time.Now() |
||||
state, _ := cb.currentState(now) |
||||
return state |
||||
} |
||||
|
||||
// Execute runs the given request if the CircuitBreaker accepts it.
|
||||
// Execute returns an error instantly if the CircuitBreaker rejects the request.
|
||||
// Otherwise, Execute returns the result of the request.
|
||||
// If a panic occurs in the request, the CircuitBreaker handles it as an error
|
||||
// and causes the same panic again.
|
||||
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) { |
||||
generation, err := cb.beforeRequest() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
defer func() { |
||||
e := recover() |
||||
if e != nil { |
||||
cb.afterRequest(generation, false) |
||||
panic(e) |
||||
} |
||||
}() |
||||
|
||||
result, err := req() |
||||
cb.afterRequest(generation, err == nil) |
||||
return result, err |
||||
} |
||||
|
||||
// Name returns the name of the TwoStepCircuitBreaker.
|
||||
func (tscb *TwoStepCircuitBreaker) Name() string { |
||||
return tscb.cb.Name() |
||||
} |
||||
|
||||
// State returns the current state of the TwoStepCircuitBreaker.
|
||||
func (tscb *TwoStepCircuitBreaker) State() State { |
||||
return tscb.cb.State() |
||||
} |
||||
|
||||
// Allow checks if a new request can proceed. It returns a callback that should be used to
|
||||
// register the success or failure in a separate step. If the circuit breaker doesn't allow
|
||||
// requests, it returns an error.
|
||||
func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) { |
||||
generation, err := tscb.cb.beforeRequest() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return func(success bool) { |
||||
tscb.cb.afterRequest(generation, success) |
||||
}, nil |
||||
} |
||||
|
||||
func (cb *CircuitBreaker) beforeRequest() (uint64, error) { |
||||
cb.mutex.Lock() |
||||
defer cb.mutex.Unlock() |
||||
|
||||
now := time.Now() |
||||
state, generation := cb.currentState(now) |
||||
|
||||
if state == StateOpen { |
||||
return generation, ErrOpenState |
||||
} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests { |
||||
return generation, ErrTooManyRequests |
||||
} |
||||
|
||||
cb.counts.onRequest() |
||||
return generation, nil |
||||
} |
||||
|
||||
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) { |
||||
cb.mutex.Lock() |
||||
defer cb.mutex.Unlock() |
||||
|
||||
now := time.Now() |
||||
state, generation := cb.currentState(now) |
||||
if generation != before { |
||||
return |
||||
} |
||||
|
||||
if success { |
||||
cb.onSuccess(state, now) |
||||
} else { |
||||
cb.onFailure(state, now) |
||||
} |
||||
} |
||||
|
||||
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) { |
||||
switch state { |
||||
case StateClosed: |
||||
cb.counts.onSuccess() |
||||
case StateHalfOpen: |
||||
cb.counts.onSuccess() |
||||
if cb.counts.ConsecutiveSuccesses >= cb.maxRequests { |
||||
cb.setState(StateClosed, now) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (cb *CircuitBreaker) onFailure(state State, now time.Time) { |
||||
switch state { |
||||
case StateClosed: |
||||
cb.counts.onFailure() |
||||
if cb.readyToTrip(cb.counts) { |
||||
cb.setState(StateOpen, now) |
||||
} |
||||
case StateHalfOpen: |
||||
cb.setState(StateOpen, now) |
||||
} |
||||
} |
||||
|
||||
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) { |
||||
switch cb.state { |
||||
case StateClosed: |
||||
if !cb.expiry.IsZero() && cb.expiry.Before(now) { |
||||
cb.toNewGeneration(now) |
||||
} |
||||
case StateOpen: |
||||
if cb.expiry.Before(now) { |
||||
cb.setState(StateHalfOpen, now) |
||||
} |
||||
} |
||||
return cb.state, cb.generation |
||||
} |
||||
|
||||
func (cb *CircuitBreaker) setState(state State, now time.Time) { |
||||
if cb.state == state { |
||||
return |
||||
} |
||||
|
||||
prev := cb.state |
||||
cb.state = state |
||||
|
||||
cb.toNewGeneration(now) |
||||
|
||||
if cb.onStateChange != nil { |
||||
cb.onStateChange(cb.name, prev, state) |
||||
} |
||||
} |
||||
|
||||
func (cb *CircuitBreaker) toNewGeneration(now time.Time) { |
||||
cb.generation++ |
||||
cb.counts.clear() |
||||
|
||||
var zero time.Time |
||||
switch cb.state { |
||||
case StateClosed: |
||||
if cb.interval == 0 { |
||||
cb.expiry = zero |
||||
} else { |
||||
cb.expiry = now.Add(cb.interval) |
||||
} |
||||
case StateOpen: |
||||
cb.expiry = now.Add(cb.timeout) |
||||
default: // StateHalfOpen
|
||||
cb.expiry = zero |
||||
} |
||||
} |
||||
Loading…
Reference in new issue