Dynamic client-side throttling to avoid object storage rate-limits (GCS only) (#10140)

**What this PR does / why we need it**:
Across the various cloud providers' object storage services, there are
different rate-limits implemented. Rate-limits can be imposed under
multiple conditions, such as server-side scale up (ramping up from low
volume to high, "local" limit), reaching some defined upper limit
("absolute" limit), etc.

We cannot know a priori when these rate-limits will be imposed, so we
can't set up a client-side limiter to only allow a certain number of
requests through per second. Additionally, that would require global
coordination between queriers - which is difficult.

With the above constraints, I have instead taken inspiration from TCP's
[congestion control
algorithms](https://en.wikipedia.org/wiki/TCP_congestion_control). This
PR implements
[AIMD](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease)
(Additive Increase, Multiplicative Decrease), which is used in the
congestion _avoidance_ phase of congestion control. The default window
size (object store requests per second) is 2000; in other words, we skip
the "slow start" phase.

The controller uses the Go
[`rate.Limiter`](https://pkg.go.dev/golang.org/x/time/rate), which
implements the token-bucket algorithm.

To put it simply:
- every successful request widens the window (per second client
rate-limit)
- every rate-limited response reduces the window size by a backoff
factor (0.5 by default, so it will halve)
- when the limit has been reached, the querier will be delayed from
making further requests until tokens are available
pull/10264/head
Danny Kopping 2 years ago committed by GitHub
parent 176e9b7292
commit beed298549
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      CHANGELOG.md
  2. 102
      docs/sources/configure/_index.md
  3. 2
      go.mod
  4. 21
      pkg/loki/common/common.go
  5. 6
      pkg/loki/config_wrapper.go
  6. 3
      pkg/storage/chunk/client/alibaba/oss_object_client.go
  7. 4
      pkg/storage/chunk/client/aws/dynamodb_storage_client.go
  8. 3
      pkg/storage/chunk/client/aws/s3_storage_client.go
  9. 3
      pkg/storage/chunk/client/azure/blob_storage_client.go
  10. 4
      pkg/storage/chunk/client/baidubce/bos_storage_client.go
  11. 4
      pkg/storage/chunk/client/cassandra/storage_client.go
  12. 1
      pkg/storage/chunk/client/client.go
  13. 73
      pkg/storage/chunk/client/congestion/config.go
  14. 48
      pkg/storage/chunk/client/congestion/congestion.go
  15. 69
      pkg/storage/chunk/client/congestion/congestion_test.go
  16. 216
      pkg/storage/chunk/client/congestion/controller.go
  17. 290
      pkg/storage/chunk/client/congestion/controller_test.go
  18. 17
      pkg/storage/chunk/client/congestion/hedge.go
  19. 50
      pkg/storage/chunk/client/congestion/interfaces.go
  20. 82
      pkg/storage/chunk/client/congestion/metrics.go
  21. 54
      pkg/storage/chunk/client/congestion/retry.go
  22. 4
      pkg/storage/chunk/client/gcp/bigtable_object_client.go
  23. 74
      pkg/storage/chunk/client/gcp/gcs_object_client.go
  24. 178
      pkg/storage/chunk/client/gcp/gcs_object_client_test.go
  25. 4
      pkg/storage/chunk/client/grpc/storage_client.go
  26. 3
      pkg/storage/chunk/client/ibmcloud/cos_object_client.go
  27. 3
      pkg/storage/chunk/client/local/fs_object_client.go
  28. 4
      pkg/storage/chunk/client/metrics.go
  29. 5
      pkg/storage/chunk/client/object_client.go
  30. 3
      pkg/storage/chunk/client/openstack/swift_object_client.go
  31. 2
      pkg/storage/chunk/client/testutils/inmemory_storage_client.go
  32. 19
      pkg/storage/factory.go
  33. 19
      pkg/storage/store.go
  34. 4
      pkg/storage/stores/indexshipper/storage/prefixed_object_client.go
  35. 3
      pkg/storage/stores/series_store_write_test.go
  36. 4
      pkg/storage/util_test.go

@ -54,6 +54,7 @@
* [8662](https://github.com/grafana/loki/pull/8662) **liguozhong**: LogQL: Introduce `distinct`
* [9813](https://github.com/grafana/loki/pull/9813) **jeschkies**: Enable Protobuf encoding via content negotiation between querier and query frontend.
* [10281](https://github.com/grafana/loki/pull/10281) **dannykopping**: Track effectiveness of hedged requests.
* [10140](https://github.com/grafana/loki/pull/10140) **dannykopping**: Dynamic client-side throttling to avoid object storage rate-limits (GCS only)
##### Fixes

@ -1914,6 +1914,55 @@ hedging:
# CLI flag: -store.index-cache-validity
[index_cache_validity: <duration> | default = 5m]
congestion_control:
# Use storage congestion control (default: disabled).
# CLI flag: -store.enabled
[enabled: <boolean> | default = false]
controller:
# Congestion control strategy to use (default: none, options: 'aimd').
# CLI flag: -store.congestion-control.strategy
[strategy: <string> | default = ""]
aimd:
# AIMD starting throughput window size: how many requests can be sent per
# second (default: 2000).
# CLI flag: -store.congestion-control.strategy.aimd.start
[start: <int> | default = 2000]
# AIMD maximum throughput window size: upper limit of requests sent per
# second (default: 10000).
# CLI flag: -store.congestion-control.strategy.aimd.upper-bound
[upper_bound: <int> | default = 10000]
# AIMD backoff factor when upstream service is throttled to decrease
# number of requests sent per second (default: 0.5).
# CLI flag: -store.congestion-control.strategy.aimd.backoff-factor
[backoff_factor: <float> | default = 0.5]
retry:
# Congestion control retry strategy to use (default: none, options:
# 'limited').
# CLI flag: -store.retry.strategy
[strategy: <string> | default = ""]
# Maximum number of retries allowed.
# CLI flag: -store.retry.strategy.limited.limit
[limit: <int> | default = 2]
hedging:
config:
[at: <duration>]
[up_to: <int>]
[max_per_second: <int>]
# Congestion control hedge strategy to use (default: none, options:
# 'limited').
# CLI flag: -store.hedge.strategy
[strategy: <string> | default = ""]
# The cache block configures the cache backend.
# The CLI flags prefix for this block configuration is: store.index-cache-read
[index_queries_cache_config: <cache_config>]
@ -3157,6 +3206,55 @@ storage:
# The CLI flags prefix for this block configuration is: common.storage
[cos: <cos_storage_config>]
congestion_control:
# Use storage congestion control (default: disabled).
# CLI flag: -common.storage.enabled
[enabled: <boolean> | default = false]
controller:
# Congestion control strategy to use (default: none, options: 'aimd').
# CLI flag: -common.storage.congestion-control.strategy
[strategy: <string> | default = ""]
aimd:
# AIMD starting throughput window size: how many requests can be sent
# per second (default: 2000).
# CLI flag: -common.storage.congestion-control.strategy.aimd.start
[start: <int> | default = 2000]
# AIMD maximum throughput window size: upper limit of requests sent per
# second (default: 10000).
# CLI flag: -common.storage.congestion-control.strategy.aimd.upper-bound
[upper_bound: <int> | default = 10000]
# AIMD backoff factor when upstream service is throttled to decrease
# number of requests sent per second (default: 0.5).
# CLI flag: -common.storage.congestion-control.strategy.aimd.backoff-factor
[backoff_factor: <float> | default = 0.5]
retry:
# Congestion control retry strategy to use (default: none, options:
# 'limited').
# CLI flag: -common.storage.retry.strategy
[strategy: <string> | default = ""]
# Maximum number of retries allowed.
# CLI flag: -common.storage.retry.strategy.limited.limit
[limit: <int> | default = 2]
hedging:
config:
[at: <duration>]
[up_to: <int>]
[max_per_second: <int>]
# Congestion control hedge strategy to use (default: none, options:
# 'limited').
# CLI flag: -common.storage.hedge.strategy
[strategy: <string> | default = ""]
[persist_tokens: <boolean>]
[replication_factor: <int>]
@ -4420,6 +4518,10 @@ The `gcs_storage_config` block configures the connection to Google Cloud Storage
# Enable HTTP2 connections.
# CLI flag: -<prefix>.gcs.enable-http2
[enable_http2: <boolean> | default = true]
# Enable automatic retries of failed idempotent requests.
# CLI flag: -<prefix>.gcs.enable-retries
[enable_retries: <boolean> | default = true]
```
### s3_storage_config

@ -130,6 +130,7 @@ require (
golang.org/x/oauth2 v0.10.0
golang.org/x/text v0.11.0
google.golang.org/protobuf v1.31.0
k8s.io/apimachinery v0.26.2
)
require (
@ -315,7 +316,6 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
k8s.io/api v0.26.2 // indirect
k8s.io/apimachinery v0.26.2 // indirect
k8s.io/client-go v0.26.2 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20230303024457-afdc3dddf62d // indirect

@ -10,6 +10,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/aws"
"github.com/grafana/loki/pkg/storage/chunk/client/azure"
"github.com/grafana/loki/pkg/storage/chunk/client/baidubce"
"github.com/grafana/loki/pkg/storage/chunk/client/congestion"
"github.com/grafana/loki/pkg/storage/chunk/client/gcp"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
"github.com/grafana/loki/pkg/storage/chunk/client/ibmcloud"
@ -67,15 +68,16 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
}
type Storage struct {
S3 aws.S3Config `yaml:"s3"`
GCS gcp.GCSConfig `yaml:"gcs"`
Azure azure.BlobStorageConfig `yaml:"azure"`
AlibabaCloud alibaba.OssConfig `yaml:"alibabacloud"`
BOS baidubce.BOSStorageConfig `yaml:"bos"`
Swift openstack.SwiftConfig `yaml:"swift"`
FSConfig FilesystemConfig `yaml:"filesystem"`
Hedging hedging.Config `yaml:"hedging"`
COS ibmcloud.COSConfig `yaml:"cos"`
S3 aws.S3Config `yaml:"s3"`
GCS gcp.GCSConfig `yaml:"gcs"`
Azure azure.BlobStorageConfig `yaml:"azure"`
AlibabaCloud alibaba.OssConfig `yaml:"alibabacloud"`
BOS baidubce.BOSStorageConfig `yaml:"bos"`
Swift openstack.SwiftConfig `yaml:"swift"`
FSConfig FilesystemConfig `yaml:"filesystem"`
Hedging hedging.Config `yaml:"hedging"`
COS ibmcloud.COSConfig `yaml:"cos"`
CongestionControl congestion.Config `yaml:"congestion_control,omitempty"`
}
func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
@ -88,6 +90,7 @@ func (s *Storage) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
s.FSConfig.RegisterFlagsWithPrefix(prefix, f)
s.Hedging.RegisterFlagsWithPrefix(prefix, f)
s.COS.RegisterFlagsWithPrefix(prefix, f)
s.CongestionControl.RegisterFlagsWithPrefix(prefix, f)
}
type FilesystemConfig struct {

@ -504,6 +504,12 @@ func applyStorageConfig(cfg, defaults *ConfigWrapper) error {
}
}
if !reflect.DeepEqual(cfg.Common.Storage.CongestionControl, defaults.StorageConfig.CongestionControl) {
applyConfig = func(r *ConfigWrapper) {
r.StorageConfig.CongestionControl = r.Common.Storage.CongestionControl
}
}
if configsFound > 1 {
return ErrTooManyStorageConfigs
}

@ -162,3 +162,6 @@ func (s *OssObjectClient) IsObjectNotFoundErr(err error) bool {
return false
}
}
// IsRetryableErr reports whether err is transient and the request may be safely
// retried by the congestion controller. Always false for now, which disables
// congestion-control retries for this client.
// TODO(dannyk): implement for client
func (s *OssObjectClient) IsRetryableErr(error) bool { return false }

@ -578,6 +578,10 @@ func (a dynamoDBStorageClient) IsChunkNotFoundErr(_ error) bool {
return false
}
// IsRetryableErr reports whether err is transient and the request may be safely
// retried. Always false: DynamoDB requests are never considered retryable here.
func (a dynamoDBStorageClient) IsRetryableErr(_ error) bool {
return false
}
func (a dynamoDBStorageClient) writesForChunks(chunks []chunk.Chunk) (dynamoDBWriteBatch, error) {
dynamoDBWrites := dynamoDBWriteBatch{}

@ -483,3 +483,6 @@ func (a *S3ObjectClient) IsObjectNotFoundErr(err error) bool {
return false
}
// IsRetryableErr reports whether err is transient and the request may be safely
// retried. Not yet implemented for S3; always false.
// TODO(dannyk): implement for client
func (a *S3ObjectClient) IsRetryableErr(error) bool { return false }

@ -558,3 +558,6 @@ func (b *BlobStorage) IsObjectNotFoundErr(err error) bool {
return false
}
// IsRetryableErr reports whether err is transient and the request may be safely
// retried. Not yet implemented for Azure blob storage; always false.
// TODO(dannyk): implement for client
func (b *BlobStorage) IsRetryableErr(error) bool { return false }

@ -3,7 +3,6 @@ package baidubce
import (
"context"
"flag"
"io"
"time"
@ -171,3 +170,6 @@ func (b *BOSObjectStorage) IsObjectNotFoundErr(err error) bool {
}
func (b *BOSObjectStorage) Stop() {}
// IsRetryableErr reports whether err is transient and the request may be safely
// retried. Not yet implemented for BOS; always false.
// TODO(dannyk): implement for client
func (b *BOSObjectStorage) IsRetryableErr(error) bool { return false }

@ -670,6 +670,10 @@ func (s *ObjectClient) IsChunkNotFoundErr(_ error) bool {
return false
}
// IsRetryableErr reports whether err is transient and the request may be safely
// retried. Always false: Cassandra requests are never considered retryable here.
func (s *ObjectClient) IsRetryableErr(_ error) bool {
return false
}
// Stop implement chunk.ObjectClient.
func (s *ObjectClient) Stop() {
s.readSession.Close()

@ -22,6 +22,7 @@ type Client interface {
GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error)
DeleteChunk(ctx context.Context, userID, chunkID string) error
IsChunkNotFoundErr(err error) bool
IsRetryableErr(err error) bool
}
// ObjectAndIndexClient allows optimisations where the same client handles both

@ -0,0 +1,73 @@
package congestion
import (
"flag"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
)
// Config configures client-side congestion control for object storage.
type Config struct {
	// Enabled toggles congestion control on or off.
	Enabled bool `yaml:"enabled"`
	// Controller selects and configures the congestion control strategy.
	Controller ControllerConfig `yaml:"controller"`
	// Retry configures how failed requests are retried.
	Retry RetrierConfig `yaml:"retry"`
	// Hedge configures request hedging.
	Hedge HedgerConfig `yaml:"hedging"`
}

// RegisterFlagsWithPrefix registers all congestion control flags under the given prefix.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
	fs.BoolVar(&cfg.Enabled, prefix+"enabled", false, "Use storage congestion control (default: disabled).")
	cfg.Controller.RegisterFlagsWithPrefix(prefix+"congestion-control.", fs)
	cfg.Retry.RegisterFlagsWithPrefix(prefix+"retry.", fs)
	cfg.Hedge.RegisterFlagsWithPrefix(prefix+"hedge.", fs)
}
type AIMD struct {
Start uint `yaml:"start"`
UpperBound uint `yaml:"upper_bound"`
BackoffFactor float64 `yaml:"backoff_factor"`
}
type ControllerConfig struct {
Strategy string `yaml:"strategy"`
AIMD AIMD `yaml:"aimd"`
}
func (c *ControllerConfig) RegisterFlags(f *flag.FlagSet) {
c.RegisterFlagsWithPrefix("", f)
}
func (c *ControllerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.Strategy, prefix+"strategy", "", "Congestion control strategy to use (default: none, options: 'aimd').")
f.UintVar(&c.AIMD.Start, prefix+"strategy.aimd.start", 2000, "AIMD starting throughput window size: how many requests can be sent per second (default: 2000).")
f.UintVar(&c.AIMD.UpperBound, prefix+"strategy.aimd.upper-bound", 10000, "AIMD maximum throughput window size: upper limit of requests sent per second (default: 10000).")
f.Float64Var(&c.AIMD.BackoffFactor, prefix+"strategy.aimd.backoff-factor", 0.5, "AIMD backoff factor when upstream service is throttled to decrease number of requests sent per second (default: 0.5).")
}
type RetrierConfig struct {
Strategy string `yaml:"strategy"`
Limit int `yaml:"limit"`
}
func (c *RetrierConfig) RegisterFlags(f *flag.FlagSet) {
c.RegisterFlagsWithPrefix("", f)
}
func (c *RetrierConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.Strategy, prefix+"strategy", "", "Congestion control retry strategy to use (default: none, options: 'limited').")
f.IntVar(&c.Limit, prefix+"strategy.limited.limit", 2, "Maximum number of retries allowed.")
}
// HedgerConfig selects and configures the hedging strategy; it embeds the
// underlying hedging client configuration.
type HedgerConfig struct {
	hedging.Config

	Strategy string `yaml:"strategy"`
}

// RegisterFlags registers hedger flags without a prefix.
func (cfg *HedgerConfig) RegisterFlags(fs *flag.FlagSet) {
	cfg.RegisterFlagsWithPrefix("", fs)
}

// RegisterFlagsWithPrefix registers hedger flags under the given prefix.
func (cfg *HedgerConfig) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) {
	fs.StringVar(&cfg.Strategy, prefix+"strategy", "", "Congestion control hedge strategy to use (default: none, options: 'limited').")
	// TODO hedge configs
}

@ -0,0 +1,48 @@
package congestion
import (
"strings"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
)
// NewController builds a congestion controller from the given config, wiring
// in its retry strategy, hedging strategy, and metrics.
func NewController(cfg Config, logger log.Logger, metrics *Metrics) Controller {
	logger = log.With(logger, "component", "congestion_control")

	ctrl := newController(cfg, logger)
	ctrl = ctrl.withRetrier(newRetrier(cfg, logger))
	ctrl = ctrl.withHedger(newHedger(cfg, logger))
	return ctrl.withMetrics(metrics)
}
// newController selects the congestion controller implementation named in the
// config. An empty strategy is the zero-value default (feature unused) and
// quietly selects the no-op controller; only an unrecognized non-empty
// strategy warrants a warning before falling back to no-op.
func newController(cfg Config, logger log.Logger) Controller {
	strat := strings.ToLower(cfg.Controller.Strategy)
	switch strat {
	case "aimd":
		return NewAIMDController(cfg)
	case "":
		// congestion control not configured; default to noop without noise
		return NewNoopController(cfg)
	default:
		level.Warn(logger).Log("msg", "unrecognized congestion control strategy in config, using noop", "strategy", strat)
		return NewNoopController(cfg)
	}
}
// newRetrier selects the retry strategy named in the config. An empty strategy
// is the zero-value default and quietly selects the no-op retrier; only an
// unrecognized non-empty strategy warrants a warning.
// Note: also fixes the log message typo "retried strategy" -> "retry strategy".
func newRetrier(cfg Config, logger log.Logger) Retrier {
	strat := strings.ToLower(cfg.Retry.Strategy)
	switch strat {
	case "limited":
		return NewLimitedRetrier(cfg)
	case "":
		// retries not configured; default to noop without noise
		return NewNoopRetrier(cfg)
	default:
		level.Warn(logger).Log("msg", "unrecognized retry strategy in config, using noop", "strategy", strat)
		return NewNoopRetrier(cfg)
	}
}
// newHedger selects the hedging strategy named in the config. No hedging
// strategies are implemented yet, so this always returns the no-op hedger;
// it warns only when a strategy was explicitly (and thus unrecognizably)
// requested, instead of warning on the empty zero-value default too.
func newHedger(cfg Config, logger log.Logger) Hedger {
	if strat := strings.ToLower(cfg.Hedge.Strategy); strat != "" {
		level.Warn(logger).Log("msg", "unrecognized hedging strategy in config, using noop", "strategy", strat)
	}
	return NewNoopHedger(cfg)
}

@ -0,0 +1,69 @@
package congestion
import (
"testing"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
)
// TestZeroValueConstruction asserts that an entirely empty config yields
// no-op implementations across the board.
func TestZeroValueConstruction(t *testing.T) {
	var cfg Config
	controller := NewController(cfg, log.NewNopLogger(), NewMetrics(t.Name(), cfg))

	require.IsType(t, &NoopController{}, controller)
	require.IsType(t, &NoopRetrier{}, controller.getRetrier())
	require.IsType(t, &NoopHedger{}, controller.getHedger())
}
// TestAIMDConstruction asserts that selecting the "aimd" strategy yields an
// AIMD controller while retry & hedging stay no-op.
func TestAIMDConstruction(t *testing.T) {
	cfg := Config{Controller: ControllerConfig{Strategy: "aimd"}}
	controller := NewController(cfg, log.NewNopLogger(), NewMetrics(t.Name(), cfg))

	require.IsType(t, &AIMDController{}, controller)
	require.IsType(t, &NoopRetrier{}, controller.getRetrier())
	require.IsType(t, &NoopHedger{}, controller.getHedger())
}
// TestRetrierConstruction asserts that selecting the "limited" retry strategy
// yields a limited retrier while the controller & hedger stay no-op.
func TestRetrierConstruction(t *testing.T) {
	cfg := Config{Retry: RetrierConfig{Strategy: "limited"}}
	controller := NewController(cfg, log.NewNopLogger(), NewMetrics(t.Name(), cfg))

	require.IsType(t, &NoopController{}, controller)
	require.IsType(t, &LimitedRetrier{}, controller.getRetrier())
	require.IsType(t, &NoopHedger{}, controller.getHedger())
}
// TestCombinedConstruction asserts that the controller and retry strategies
// can be selected independently and combined.
func TestCombinedConstruction(t *testing.T) {
	cfg := Config{
		Controller: ControllerConfig{Strategy: "aimd"},
		Retry:      RetrierConfig{Strategy: "limited"},
	}
	controller := NewController(cfg, log.NewNopLogger(), NewMetrics(t.Name(), cfg))

	require.IsType(t, &AIMDController{}, controller)
	require.IsType(t, &LimitedRetrier{}, controller.getRetrier())
	require.IsType(t, &NoopHedger{}, controller.getHedger())
}
// TestHedgerConstruction will exercise hedger selection once a hedging
// strategy actually exists.
func TestHedgerConstruction(t *testing.T) {
	// TODO(dannyk): implement hedging, then construct a config such as:
	//   Config{Hedge: HedgerConfig{Strategy: "dont-hedge-retries"}}
	t.Skip("hedging not yet implemented")
}

@ -0,0 +1,216 @@
package congestion
import (
"context"
"errors"
"io"
"math"
"time"
"golang.org/x/time/rate"
"github.com/grafana/loki/pkg/storage/chunk/client"
)
// AIMDController implements the Additive-Increase/Multiplicative-Decrease algorithm which is used in TCP congestion avoidance.
// https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease
//
// Each successful GetObject widens the requests-per-second window by 1; each
// retryable failure shrinks it by backoffFactor. See GetObject for details.
type AIMDController struct {
// inner is the wrapped object store client that performs the real requests.
inner client.ObjectClient
// retrier orchestrates retries of failed GetObject calls.
retrier Retrier
// hedger is reserved for request hedging (not yet wired in here).
hedger Hedger
metrics *Metrics
// limiter enforces the current requests-per-second window (token bucket).
limiter *rate.Limiter
// backoffFactor is multiplied into the window on retryable failure (default 0.5).
backoffFactor float64
// upperBound caps the window; may be +Inf when unconfigured.
upperBound rate.Limit
}
// NewAIMDController builds an AIMD controller from config, applying sane
// bounds: the starting rate is at least 1 req/s, the upper bound defaults to
// +Inf when unset, and the backoff factor defaults to 0.5 (halving on failure,
// as the AIMD algorithm prescribes).
func NewAIMDController(cfg Config) *AIMDController {
	start := rate.Limit(cfg.Controller.AIMD.Start)
	if start <= 0 {
		start = 1
	}

	upper := rate.Limit(cfg.Controller.AIMD.UpperBound)
	if upper <= 0 {
		// no explicit ceiling configured: allow unbounded growth
		upper = rate.Limit(math.Inf(1))
	}

	backoff := cfg.Controller.AIMD.BackoffFactor
	if backoff <= 0 {
		// AIMD algorithm calls for halving the rate
		backoff = 0.5
	}

	return &AIMDController{
		limiter:       rate.NewLimiter(start, int(start)),
		backoffFactor: backoff,
		upperBound:    upper,
	}
}
// Wrap records the inner object store client whose reads will be
// congestion-controlled, and returns the controller itself as the client.
func (a *AIMDController) Wrap(client client.ObjectClient) client.ObjectClient {
a.inner = client
return a
}
// withRetrier injects the retry strategy; returns the controller for chaining.
func (a *AIMDController) withRetrier(r Retrier) Controller {
a.retrier = r
return a
}
// withHedger injects the hedging strategy; returns the controller for chaining.
func (a *AIMDController) withHedger(h Hedger) Controller {
a.hedger = h
return a
}
// withMetrics injects metrics and publishes the initial rate-limit value.
func (a *AIMDController) withMetrics(m *Metrics) Controller {
a.metrics = m
a.updateLimitMetric()
return a
}
// PutObject delegates directly to the inner client; writes are not
// congestion-controlled (see GetObject for why only reads are).
func (a *AIMDController) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error {
return a.inner.PutObject(ctx, objectKey, object)
}
// GetObject runs the read through the retrier while applying AIMD congestion
// control: it blocks while the rate-limit window is exhausted, widens the
// window on success (additiveIncrease) and shrinks it on retryable failure
// (multiplicativeDecrease). The returned error may be RetriesExceeded when
// the configured retry budget runs out.
func (a *AIMDController) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
// Only GetObject implements congestion avoidance; the other methods are either non-idempotent which means they
// cannot be retried, or are too low volume to care about
// TODO(dannyk): use hedging client to handle requests, do NOT hedge retries
rc, sz, err := a.retrier.Do(
func(attempt int) (io.ReadCloser, int64, error) {
// every attempt counts as a request, including retries
a.metrics.requests.Add(1)
// in retry
if attempt > 0 {
a.metrics.retries.Add(1)
}
// apply back-pressure while rate-limit has been exceeded
//
// using Reserve() is slower because it assumes a constant wait time as tokens are replenished, but in experimentation
// it's faster to sit in a hot loop and probe every so often if there are tokens available
for !a.limiter.Allow() {
delay := time.Millisecond * 10
time.Sleep(delay)
a.metrics.backoffSec.Add(delay.Seconds())
}
// It is vitally important that retries are DISABLED in the inner implementation.
// Some object storage clients implement retries internally, and this will interfere here.
return a.inner.GetObject(ctx, objectKey)
},
a.IsRetryableErr,
a.additiveIncrease,
a.multiplicativeDecrease,
)
if errors.Is(err, RetriesExceeded) {
a.metrics.retriesExceeded.Add(1)
}
return rc, sz, err
}
// List delegates to the inner client; listings are not congestion-controlled.
func (a *AIMDController) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
return a.inner.List(ctx, prefix, delimiter)
}
// DeleteObject delegates to the inner client; deletes are not congestion-controlled.
func (a *AIMDController) DeleteObject(ctx context.Context, objectKey string) error {
return a.inner.DeleteObject(ctx, objectKey)
}
// IsObjectNotFoundErr delegates the not-found check to the inner client.
func (a *AIMDController) IsObjectNotFoundErr(err error) bool {
return a.inner.IsObjectNotFoundErr(err)
}
// IsRetryableErr reports whether the inner client considers err transient.
// As a side effect, non-retryable errors are counted in metrics.
func (a *AIMDController) IsRetryableErr(err error) bool {
retryable := a.inner.IsRetryableErr(err)
if !retryable {
a.metrics.nonRetryableErrors.Inc()
}
return retryable
}
// Stop shuts down the inner client.
func (a *AIMDController) Stop() {
a.inner.Stop()
}
// additiveIncrease linearly grows the per-second request budget by one on
// each success, never exceeding the configured upper bound.
func (a *AIMDController) additiveIncrease() {
	// math.Min handles an upper bound of +Inf naturally
	next := rate.Limit(math.Min(float64(a.limiter.Limit()+1), float64(a.upperBound)))

	a.limiter.SetLimit(next)
	a.limiter.SetBurst(int(next))
	a.updateLimitMetric()
}
// multiplicativeDecrease shrinks the per-second request budget by the
// configured backoff factor (exponential decay across repeated throttles),
// flooring the result at 1 request per second.
func (a *AIMDController) multiplicativeDecrease() {
	reduced := float64(a.limiter.Limit()) * a.backoffFactor
	if reduced < 1 {
		reduced = 1
	}

	next := rate.Limit(math.Ceil(reduced))
	a.limiter.SetLimit(next)
	a.limiter.SetBurst(int(next))
	a.updateLimitMetric()
}
// updateLimitMetric publishes the limiter's current requests-per-second limit.
func (a *AIMDController) updateLimitMetric() {
a.metrics.currentLimit.Set(float64(a.limiter.Limit()))
}
// accessors used by construction-time wiring and tests.
func (a *AIMDController) getRetrier() Retrier { return a.retrier }
func (a *AIMDController) getHedger() Hedger { return a.hedger }
func (a *AIMDController) getMetrics() *Metrics { return a.metrics }
// NoopController is a Controller that performs no congestion control.
// Note that Wrap returns the wrapped client directly, so the controller's own
// ObjectClient methods are effectively bypassed in normal operation.
type NoopController struct {
retrier Retrier
hedger Hedger
metrics *Metrics
}
// NewNoopController returns a no-op controller; the config is ignored.
func NewNoopController(Config) *NoopController {
return &NoopController{}
}
func (n *NoopController) PutObject(context.Context, string, io.ReadSeeker) error { return nil }
func (n *NoopController) GetObject(context.Context, string) (io.ReadCloser, int64, error) {
return nil, 0, nil
}
func (n *NoopController) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
return nil, nil, nil
}
func (n *NoopController) DeleteObject(context.Context, string) error { return nil }
func (n *NoopController) IsObjectNotFoundErr(error) bool { return false }
func (n *NoopController) IsRetryableErr(error) bool { return false }
func (n *NoopController) Stop() {}
// Wrap returns the given client unwrapped: no congestion control is applied.
func (n *NoopController) Wrap(c client.ObjectClient) client.ObjectClient { return c }
func (n *NoopController) withRetrier(r Retrier) Controller {
n.retrier = r
return n
}
func (n *NoopController) withHedger(h Hedger) Controller {
n.hedger = h
return n
}
func (n *NoopController) withMetrics(m *Metrics) Controller {
n.metrics = m
return n
}
func (n *NoopController) getRetrier() Retrier { return n.retrier }
func (n *NoopController) getHedger() Hedger { return n.hedger }
func (n *NoopController) getMetrics() *Metrics { return n.metrics }

@ -0,0 +1,290 @@
package congestion
import (
"context"
"io"
"strings"
"sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/loki/pkg/storage/chunk/client"
)
var errFakeFailure = errors.New("fake failure")

// TestRequestNoopRetry verifies that without a retry strategy a failed
// request is surfaced immediately and never retried.
func TestRequestNoopRetry(t *testing.T) {
	cfg := Config{Controller: ControllerConfig{Strategy: "aimd"}}

	metrics := NewMetrics(t.Name(), cfg)
	controller := NewController(cfg, log.NewNopLogger(), metrics)

	// permit a single success, then fail everything
	controller.Wrap(newMockObjectClient(maxFailer{max: 1}))

	ctx := context.Background()

	// the initial request goes through
	_, _, err := controller.GetObject(ctx, "foo")
	require.NoError(t, err)

	// the failure is returned as-is, with no retry attempts
	_, _, err = controller.GetObject(ctx, "foo")
	require.ErrorIs(t, err, errFakeFailure)

	require.EqualValues(t, 2, testutil.ToFloat64(metrics.requests))
	require.EqualValues(t, 0, testutil.ToFloat64(metrics.retries))
}
// TestRequestZeroLimitedRetry verifies that a "limited" retry strategy with a
// zero limit fails immediately with RetriesExceeded and performs no retries.
func TestRequestZeroLimitedRetry(t *testing.T) {
cfg := Config{
Controller: ControllerConfig{
Strategy: "aimd",
},
Retry: RetrierConfig{
Strategy: "limited",
Limit: 0,
},
}
metrics := NewMetrics(t.Name(), cfg)
ctrl := NewController(cfg, log.NewNopLogger(), metrics)
// fail all requests
cli := newMockObjectClient(maxFailer{max: 0})
ctrl.Wrap(cli)
ctx := context.Background()
// first request fails, no retry is executed because limit = 0
_, _, err := ctrl.GetObject(ctx, "foo")
require.ErrorIs(t, err, RetriesExceeded)
require.EqualValues(t, 1, testutil.ToFloat64(metrics.requests))
require.EqualValues(t, 0, testutil.ToFloat64(metrics.retries))
}
// TestRequestLimitedRetry verifies the full retry budget is spent on a
// persistently failing request: 1 initial attempt + 2 retries = 3 requests,
// plus the 1 earlier success = 4 total in the requests counter.
func TestRequestLimitedRetry(t *testing.T) {
cfg := Config{
Controller: ControllerConfig{
Strategy: "aimd",
},
Retry: RetrierConfig{
Strategy: "limited",
Limit: 2,
},
}
metrics := NewMetrics(t.Name(), cfg)
ctrl := NewController(cfg, log.NewNopLogger(), metrics)
// allow 1 request through, fail the rest
cli := newMockObjectClient(maxFailer{max: 1})
ctrl.Wrap(cli)
ctx := context.Background()
// first request succeeds, no retries
_, _, err := ctrl.GetObject(ctx, "foo")
require.NoError(t, err)
require.EqualValues(t, 0, testutil.ToFloat64(metrics.retriesExceeded))
require.EqualValues(t, 0, testutil.ToFloat64(metrics.retries))
require.EqualValues(t, 1, testutil.ToFloat64(metrics.requests))
// all requests will now fail, which should incur 1 request & 2 retries
_, _, err = ctrl.GetObject(ctx, "foo")
require.ErrorIs(t, err, RetriesExceeded)
require.EqualValues(t, 1, testutil.ToFloat64(metrics.retriesExceeded))
require.EqualValues(t, 2, testutil.ToFloat64(metrics.retries))
require.EqualValues(t, 4, testutil.ToFloat64(metrics.requests))
}
// TestRequestLimitedRetryNonRetryableErr verifies that an error the client
// marks non-retryable short-circuits the retry loop entirely.
func TestRequestLimitedRetryNonRetryableErr(t *testing.T) {
cfg := Config{
Controller: ControllerConfig{
Strategy: "aimd",
},
Retry: RetrierConfig{
Strategy: "limited",
Limit: 2,
},
}
metrics := NewMetrics(t.Name(), cfg)
ctrl := NewController(cfg, log.NewNopLogger(), metrics)
// fail all requests
cli := newMockObjectClient(maxFailer{max: 0})
// mark errors as non-retryable
cli.nonRetryableErrs = true
ctrl.Wrap(cli)
ctx := context.Background()
// request fails, retries not done since error is non-retryable
_, _, err := ctrl.GetObject(ctx, "foo")
require.ErrorIs(t, err, errFakeFailure)
require.EqualValues(t, 0, testutil.ToFloat64(metrics.retries))
require.EqualValues(t, 1, testutil.ToFloat64(metrics.nonRetryableErrors))
require.EqualValues(t, 1, testutil.ToFloat64(metrics.requests))
}
// TestAIMDReducedThroughput measures request throughput with and without
// induced failures and asserts that AIMD backoff reduces both the total and
// the successful request rates once failures begin.
// NOTE(review): this test is timing-based (1s measurement windows, 100ms
// failure ticks) and may be sensitive to scheduler jitter on loaded machines.
func TestAIMDReducedThroughput(t *testing.T) {
cfg := Config{
Controller: ControllerConfig{
Strategy: "aimd",
AIMD: AIMD{
Start: 1000,
UpperBound: 5000,
BackoffFactor: 0.5,
},
},
Retry: RetrierConfig{
Strategy: "limited",
Limit: 1,
},
}
var trigger atomic.Bool
metrics := NewMetrics(t.Name(), cfg)
ctrl := NewController(cfg, log.NewNopLogger(), metrics)
// fail requests only when triggered
cli := newMockObjectClient(triggeredFailer{trigger: &trigger})
ctrl.Wrap(cli)
// run for 1 second, measure the per-second rate of requests & successful responses
count, success := runAndMeasureRate(ctrl, time.Second)
require.Greater(t, count, 1.0)
require.Greater(t, success, 1.0)
// no time spent backing off because the per-second limit will not be hit
require.EqualValues(t, 0, testutil.ToFloat64(metrics.backoffSec))
previousCount, previousSuccess := count, success
var wg sync.WaitGroup
done := make(chan bool, 1)
// every 100ms trigger a failure
wg.Add(1)
go func(trigger *atomic.Bool) {
defer wg.Done()
tick := time.NewTicker(time.Millisecond * 100)
defer tick.Stop()
for {
select {
case <-tick.C:
trigger.Store(true)
case <-done:
return
}
}
}(&trigger)
// now, run the requests again but there will now be a failure rate & some throttling involved
count, success = runAndMeasureRate(ctrl, time.Second)
done <- true
wg.Wait()
// should have processed fewer requests than the last period
require.Less(t, count, previousCount)
require.Less(t, success, previousSuccess)
// should have fewer successful requests than total since we are failing some
require.Less(t, success, count)
}
// runAndMeasureRate hammers ctrl.GetObject for the given duration and returns
// the per-second rate of total requests and of successful responses.
func runAndMeasureRate(ctrl Controller, duration time.Duration) (float64, float64) {
	var total, succeeded float64

	timer := time.NewTimer(duration)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			return total / duration.Seconds(), succeeded / duration.Seconds()
		default:
			total++
			if _, _, err := ctrl.GetObject(context.Background(), "foo"); err == nil {
				succeeded++
			}
		}
	}
}
// mockObjectClient is a test double whose GetObject succeeds or fails
// according to the injected requestFailer strategy.
type mockObjectClient struct {
// reqCounter numbers requests (1-based) for the failure strategy.
reqCounter atomic.Uint64
// strategy decides which requests fail.
strategy requestFailer
// nonRetryableErrs, when set, marks all errors as non-retryable.
nonRetryableErrs bool
}
func (m *mockObjectClient) PutObject(context.Context, string, io.ReadSeeker) error {
panic("not implemented")
}
// GetObject simulates a 10ms request, then fails or returns a fixed 3-byte body.
func (m *mockObjectClient) GetObject(context.Context, string) (io.ReadCloser, int64, error) {
time.Sleep(time.Millisecond * 10)
if m.strategy.fail(m.reqCounter.Inc()) {
return nil, 0, errFakeFailure
}
return io.NopCloser(strings.NewReader("bar")), 3, nil
}
func (m *mockObjectClient) List(context.Context, string, string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
panic("not implemented")
}
func (m *mockObjectClient) DeleteObject(context.Context, string) error {
panic("not implemented")
}
func (m *mockObjectClient) IsObjectNotFoundErr(error) bool { return false }
func (m *mockObjectClient) IsRetryableErr(error) bool { return !m.nonRetryableErrs }
func (m *mockObjectClient) Stop() {}
func newMockObjectClient(strat requestFailer) *mockObjectClient {
return &mockObjectClient{strategy: strat}
}
// requestFailer decides whether the i-th request (1-based) should fail.
type requestFailer interface {
	fail(i uint64) bool
}

// maxFailer fails every request after the first max requests.
type maxFailer struct {
	max uint64
}

func (m maxFailer) fail(i uint64) bool { return i > m.max }

// triggeredFailer fails exactly one request each time its trigger is armed,
// disarming the trigger as it does so.
type triggeredFailer struct {
	trigger *atomic.Bool
}

func (t triggeredFailer) fail(_ uint64) bool {
	if !t.trigger.Load() {
		return false
	}

	t.trigger.Store(false)
	return true
}

@ -0,0 +1,17 @@
package congestion
import (
"net/http"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
)
// NoopHedger satisfies Hedger without performing any hedging.
type NoopHedger struct{}

// NewNoopHedger returns a no-op Hedger; the config is ignored.
func NewNoopHedger(Config) *NoopHedger {
	return &NoopHedger{}
}

// HTTPClient returns the process-wide default HTTP client, unmodified.
func (NoopHedger) HTTPClient(hedging.Config) (*http.Client, error) {
	return http.DefaultClient, nil
}

@ -0,0 +1,50 @@
package congestion
import (
"io"
"net/http"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
)
// Controller handles congestion by:
// - determining if calls to object storage can be retried
// - defining and enforcing a back-pressure mechanism
// - centralising retries & hedging
type Controller interface {
client.ObjectClient
// Wrap wraps a given object store client and handles congestion against its backend service
Wrap(client client.ObjectClient) client.ObjectClient
withRetrier(Retrier) Controller
withHedger(Hedger) Controller
withMetrics(*Metrics) Controller
getRetrier() Retrier
getHedger() Hedger
getMetrics() *Metrics
}
// DoRequestFunc performs a single GetObject-style request. attempt is the
// zero-based request count (0 = first attempt, >0 = a retry); it returns the
// object body, its size, and any error.
type DoRequestFunc func(attempt int) (io.ReadCloser, int64, error)

// IsRetryableErrFunc reports whether a failed request may safely be retried.
type IsRetryableErrFunc func(err error) bool

// Retrier orchestrates requests & subsequent retries (if configured).
// NOTE: this only supports ObjectClient.GetObject calls right now.
type Retrier interface {
	// Do executes a given function which is expected to be a GetObject call, and its return signature matches that.
	// Any failed requests will be retried.
	//
	// The attempt value passed to fn is the current request count; any positive number indicates retries, 0 indicates first attempt.
	Do(fn DoRequestFunc, isRetryable IsRetryableErrFunc, onSuccess func(), onError func()) (io.ReadCloser, int64, error)
}
// Hedger orchestrates request "hedging", which is the process of sending a new request when the old request is
// taking too long, and returning the response that is received first.
// NoopHedger is the implementation used when hedging is disabled.
type Hedger interface {
	// HTTPClient returns an HTTP client which is responsible for handling both the initial and all hedged requests.
	// It is recommended that retries are not hedged.
	// Bear in mind this function can be called several times, and should return the same client each time.
	HTTPClient(cfg hedging.Config) (*http.Client, error)
}

@ -0,0 +1,82 @@
package congestion
import "github.com/prometheus/client_golang/prometheus"
// Metrics records congestion-control behaviour for a single object client.
type Metrics struct {
	currentLimit       prometheus.Gauge   // current per-second request limit (window size)
	backoffSec         prometheus.Counter // cumulative seconds spent backing off at the limit
	requests           prometheus.Counter // total requests issued to the store
	retries            prometheus.Counter // total retry attempts
	nonRetryableErrors prometheus.Counter // request errors that could not be retried
	retriesExceeded    prometheus.Counter // times the configured retry limit was exhausted
}
// Unregister removes every collector registered by NewMetrics from the
// default prometheus registry, so a Metrics with the same name can later be
// recreated without a duplicate-registration panic.
//
// Bug fix: previously only currentLimit was unregistered, leaving the five
// counters registered forever.
func (m Metrics) Unregister() {
	prometheus.Unregister(m.currentLimit)
	prometheus.Unregister(m.backoffSec)
	prometheus.Unregister(m.requests)
	prometheus.Unregister(m.retries)
	prometheus.Unregister(m.nonRetryableErrors)
	prometheus.Unregister(m.retriesExceeded)
}
// NewMetrics creates metrics to be used for monitoring congestion control.
// It needs to accept a "name" because congestion control is used in object clients, and there can be many object clients
// created for the same store (multiple period configs, etc). It is the responsibility of the caller to ensure uniqueness,
// otherwise a duplicate registration panic will occur.
func NewMetrics(name string, cfg Config) *Metrics {
	const (
		namespace = "loki"
		subsystem = "store_congestion_control"
	)

	constLabels := prometheus.Labels{
		"strategy": cfg.Controller.Strategy,
		"name":     name,
	}

	// small constructors to avoid repeating the shared Opts fields
	gauge := func(metric, help string) prometheus.Gauge {
		return prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace:   namespace,
			Subsystem:   subsystem,
			Name:        metric,
			Help:        help,
			ConstLabels: constLabels,
		})
	}
	counter := func(metric, help string) prometheus.Counter {
		return prometheus.NewCounter(prometheus.CounterOpts{
			Namespace:   namespace,
			Subsystem:   subsystem,
			Name:        metric,
			Help:        help,
			ConstLabels: constLabels,
		})
	}

	m := Metrics{
		currentLimit:       gauge("limit", "Current per-second request limit to control congestion"),
		backoffSec:         counter("backoff_seconds_total", "How much time is spent backing off once throughput limit is encountered"),
		requests:           counter("requests_total", "How many requests were issued to the store"),
		retries:            counter("retries_total", "How many retries occurred"),
		nonRetryableErrors: counter("non_retryable_errors_total", "How many request errors occurred which could not be retried"),
		retriesExceeded:    counter("retries_exceeded_total", "How many times the number of retries exceeded the configured limit."),
	}

	prometheus.MustRegister(
		m.currentLimit,
		m.backoffSec,
		m.requests,
		m.retries,
		m.nonRetryableErrors,
		m.retriesExceeded,
	)

	return &m
}

@ -0,0 +1,54 @@
package congestion
import (
"errors"
"io"
)
var RetriesExceeded = errors.New("retries exceeded")
// NoopRetrier executes the request exactly once and never retries.
type NoopRetrier struct{}

// NewNoopRetrier returns a retrier that performs no retries; the Config
// argument is accepted only to match other retrier constructors.
func NewNoopRetrier(Config) *NoopRetrier {
	return &NoopRetrier{}
}

// Do executes fn once and returns its result unchanged; the retryability
// check and the success/error callbacks are ignored.
func (n NoopRetrier) Do(fn DoRequestFunc, _ IsRetryableErrFunc, _ func(), _ func()) (io.ReadCloser, int64, error) {
	// don't retry, just execute the given function once
	return fn(0)
}
// LimitedRetrier executes the initial request plus a configurable limit of subsequent retries.
// limit=0 is equivalent to NoopRetrier
type LimitedRetrier struct {
	// limit is the maximum number of retries performed after the initial attempt.
	limit int
}

// NewLimitedRetrier builds a LimitedRetrier from the configured retry limit.
func NewLimitedRetrier(cfg Config) *LimitedRetrier {
	return &LimitedRetrier{limit: cfg.Retry.Limit}
}
// Do runs fn up to limit+1 times: the initial attempt plus `limit` retries.
//
// Non-retryable errors are returned immediately. Retryable errors invoke
// onError (used to shrink the congestion window) and trigger another attempt;
// once all attempts are exhausted, RetriesExceeded is returned. onSuccess is
// invoked exactly once, on the first successful attempt.
func (l *LimitedRetrier) Do(fn DoRequestFunc, isRetryable IsRetryableErrFunc, onSuccess func(), onError func()) (io.ReadCloser, int64, error) {
	// i = 0 is initial request
	// i > 0 is retry
	for i := 0; i <= l.limit; i++ {
		rc, sz, err := fn(i)
		if err == nil {
			onSuccess()
			return rc, sz, nil
		}

		if !isRetryable(err) {
			return rc, sz, err
		}

		// a failed attempt may still hand back a body; close it before
		// retrying so the underlying connection is not leaked
		if rc != nil {
			_ = rc.Close()
		}

		// TODO(dannyk): consider this more carefully
		// only decrease rate-limit if error is retryable, otherwise all errors (context cancelled, dial errors, timeouts, etc)
		// which may be mostly client-side would inappropriately reduce throughput
		onError()
	}

	return nil, 0, RetriesExceeded
}

@ -186,3 +186,7 @@ func (s *bigtableObjectClient) DeleteChunk(ctx context.Context, userID, chunkID
// IsChunkNotFoundErr always reports false for the bigtable chunk client.
func (s *bigtableObjectClient) IsChunkNotFoundErr(_ error) bool {
	return false
}

// IsRetryableErr always reports false: congestion-control retry
// classification is not implemented for bigtable.
func (s *bigtableObjectClient) IsRetryableErr(_ error) bool {
	return false
}

@ -13,9 +13,11 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
google_http "google.golang.org/api/transport/http"
amnet "k8s.io/apimachinery/pkg/util/net"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
@ -40,6 +42,9 @@ type GCSConfig struct {
EnableOpenCensus bool `yaml:"enable_opencensus"`
EnableHTTP2 bool `yaml:"enable_http2"`
// TODO(dannyk): remove this and disable GCS client retries; move a layer higher instead.
EnableRetries bool `yaml:"enable_retries"`
Insecure bool `yaml:"-"`
}
@ -56,6 +61,7 @@ func (cfg *GCSConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.RequestTimeout, prefix+"gcs.request-timeout", 0, "The duration after which the requests to GCS should be timed out.")
f.BoolVar(&cfg.EnableOpenCensus, prefix+"gcs.enable-opencensus", true, "Enable OpenCensus (OC) instrumentation for all requests.")
f.BoolVar(&cfg.EnableHTTP2, prefix+"gcs.enable-http2", true, "Enable HTTP2 connections.")
f.BoolVar(&cfg.EnableRetries, prefix+"gcs.enable-retries", true, "Enable automatic retries of failed idempotent requests.")
}
// NewGCSObjectClient makes a new chunk.Client that writes chunks to GCS.
@ -107,7 +113,13 @@ func newBucketHandle(ctx context.Context, cfg GCSConfig, hedgingCfg hedging.Conf
return nil, err
}
return client.Bucket(cfg.BucketName), nil
bucket := client.Bucket(cfg.BucketName)
if !cfg.EnableRetries {
bucket = bucket.Retryer(storage.WithPolicy(storage.RetryNever))
}
return bucket, nil
}
func (s *GCSObjectClient) Stop() {
@ -215,6 +227,66 @@ func (s *GCSObjectClient) IsObjectNotFoundErr(err error) bool {
return errors.Is(err, storage.ErrObjectNotExist)
}
func isTimeoutError(err error) bool {
var netErr net.Error
return errors.As(err, &netErr) && netErr.Timeout()
}
func isContextErr(err error) bool {
return errors.Is(err, context.DeadlineExceeded) ||
errors.Is(err, context.Canceled)
}
// IsStorageTimeoutErr returns true if error means that object cannot be retrieved right now due to server-side timeouts.
func (s *GCSObjectClient) IsStorageTimeoutErr(err error) bool {
	// TODO(dannyk): move these out to be generic

	// context errors are all client-side
	if isContextErr(err) {
		return false
	}

	// connection misconfiguration, or writing on a closed connection
	// do NOT retry; this is not a server-side issue
	if errors.Is(err, net.ErrClosed) || amnet.IsConnectionRefused(err) {
		return false
	}

	// this is a server-side timeout
	if isTimeoutError(err) {
		return true
	}

	// connection closed (closed before established) or reset (closed after established)
	// this is a server-side issue
	if errors.Is(err, io.EOF) || amnet.IsConnectionReset(err) {
		return true
	}

	// use errors.As rather than a direct type assertion so that wrapped
	// googleapi errors (e.g. via fmt.Errorf("...: %w", err)) are still recognised
	var gerr *googleapi.Error
	if errors.As(err, &gerr) {
		// https://cloud.google.com/storage/docs/retry-strategy
		return gerr.Code == http.StatusRequestTimeout ||
			gerr.Code == http.StatusGatewayTimeout
	}

	return false
}
// IsStorageThrottledErr returns true if error means that object cannot be retrieved right now due to throttling.
func (s *GCSObjectClient) IsStorageThrottledErr(err error) bool {
	// use errors.As rather than a direct type assertion so that wrapped
	// googleapi errors (e.g. via fmt.Errorf("...: %w", err)) are still recognised
	var gerr *googleapi.Error
	if errors.As(err, &gerr) {
		// https://cloud.google.com/storage/docs/retry-strategy
		return gerr.Code == http.StatusTooManyRequests ||
			(gerr.Code/100 == 5) // all 5xx errors are retryable
	}

	return false
}
// IsRetryableErr returns true if the request failed due to some retryable server-side scenario:
// either a server-side timeout or a throttling/5xx response.
func (s *GCSObjectClient) IsRetryableErr(err error) bool {
	return s.IsStorageTimeoutErr(err) || s.IsStorageThrottledErr(err)
}
func gcsTransport(ctx context.Context, scope string, insecure bool, http2 bool, serviceAccount flagext.Secret) (http.RoundTripper, error) {
customTransport := &http.Transport{
Proxy: http.ProxyFromEnvironment,

@ -3,6 +3,7 @@ package gcp
import (
"bytes"
"context"
"net"
"net/http"
"net/http/httptest"
"testing"
@ -89,3 +90,180 @@ func fakeServer(t *testing.T, returnIn time.Duration, counter *atomic.Int32) *ht
return server
}
// TestUpstreamRetryableErrs verifies how HTTP status codes returned by the
// (faked) GCS backend are classified by IsStorageThrottledErr and
// IsStorageTimeoutErr: 429 and 5xx count as throttled, 408 as a timeout, and
// 4xx client errors as neither.
func TestUpstreamRetryableErrs(t *testing.T) {
	tests := []struct {
		name             string
		httpResponseCode int
		isThrottledErr   bool
		isTimeoutErr     bool
	}{
		{
			"bad request",
			http.StatusBadRequest,
			false,
			false,
		},
		{
			"too many requests",
			http.StatusTooManyRequests,
			true,
			false,
		},
		{
			"request timeout",
			http.StatusRequestTimeout,
			false,
			true,
		},
		{
			"internal server error",
			http.StatusInternalServerError,
			true,
			false,
		},
		{
			"service unavailable",
			http.StatusServiceUnavailable,
			true,
			false,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// server replies to every request with the status code under test
			server := fakeHTTPRespondingServer(t, tc.httpResponseCode)

			ctx := context.Background()
			// GCS-internal retries are disabled so the raw error surfaces
			cli, err := newGCSObjectClient(ctx, GCSConfig{
				BucketName:    "test-bucket",
				Insecure:      true,
				EnableRetries: false,
			}, hedging.Config{}, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) {
				opts = append(opts, option.WithEndpoint(server.URL))
				opts = append(opts, option.WithoutAuthentication())
				return storage.NewClient(ctx, opts...)
			})
			require.NoError(t, err)

			_, _, err = cli.GetObject(ctx, "foo")
			require.Equal(t, tc.isThrottledErr, cli.IsStorageThrottledErr(err))
			require.Equal(t, tc.isTimeoutErr, cli.IsStorageTimeoutErr(err))
		})
	}
}
// TestTCPErrs exercises IsStorageTimeoutErr against TCP-level failure modes
// (slow responses, slow connects, server-side connection closes) to verify
// which are classified as retryable server-side timeouts.
func TestTCPErrs(t *testing.T) {
	tests := []struct {
		name           string
		responseSleep  time.Duration
		connectSleep   time.Duration
		clientTimeout  time.Duration
		connectTimeout time.Duration
		closeOnNew     bool
		closeOnActive  bool
		retryable      bool
	}{
		{
			name:          "request took longer than client timeout, not retryable",
			responseSleep: time.Millisecond * 20,
			clientTimeout: time.Millisecond * 10,
			retryable:     false,
		},
		{
			name:          "client timeout exceeded on connect, not retryable",
			connectSleep:  time.Millisecond * 20,
			clientTimeout: time.Millisecond * 10,
			retryable:     false,
		},
		{
			// these are retryable because it's a server-side timeout
			name:           "transport connect timeout exceeded, retryable",
			connectSleep:   time.Millisecond * 40,
			connectTimeout: time.Millisecond * 20,
			// even though the client timeout is set, the connect timeout will be hit first
			clientTimeout: time.Millisecond * 100,
			retryable:     true,
		},
		{
			name:          "connection is closed server-side before being established",
			connectSleep:  time.Millisecond * 10,
			clientTimeout: time.Millisecond * 100,
			closeOnNew:    true,
			retryable:     true,
		},
		{
			name:          "connection is closed server-side after being established",
			connectSleep:  time.Millisecond * 10,
			clientTimeout: time.Millisecond * 100,
			closeOnActive: true,
			retryable:     true,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			server := fakeSleepingServer(t, tc.responseSleep, tc.connectSleep, tc.closeOnNew, tc.closeOnActive)

			ctx, cancelFunc := context.WithTimeout(context.Background(), tc.clientTimeout)
			// fix: was `defer t.Cleanup(cancelFunc)`, which only registered the
			// cleanup at subtest exit; register it directly instead
			t.Cleanup(cancelFunc)

			// fix: use a dedicated client instead of mutating http.DefaultClient —
			// assigning Transport/Timeout on the shared default client would leak
			// these settings into every other test in the package
			client := &http.Client{
				Transport: http.DefaultTransport.(*http.Transport).Clone(),
				Timeout:   tc.connectTimeout,
			}

			cli, err := newGCSObjectClient(ctx, GCSConfig{
				BucketName:    "test-bucket",
				Insecure:      true,
				EnableRetries: false,
			}, hedging.Config{}, func(ctx context.Context, opts ...option.ClientOption) (*storage.Client, error) {
				opts = append(opts, option.WithEndpoint(server.URL))
				opts = append(opts, option.WithoutAuthentication())
				opts = append(opts, option.WithHTTPClient(client))
				return storage.NewClient(ctx, opts...)
			})
			require.NoError(t, err)

			_, _, err = cli.GetObject(ctx, "foo")
			require.Error(t, err)
			require.Equal(t, tc.retryable, cli.IsStorageTimeoutErr(err))
		})
	}
}
func fakeHTTPRespondingServer(t *testing.T, code int) *httptest.Server {
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(code)
}))
server.StartTLS()
t.Cleanup(server.Close)
return server
}
// fakeSleepingServer starts a plain-HTTP test server whose handler sleeps for
// responseSleep before replying, and whose connection-state hook sleeps for
// connectSleep on each new connection — simulating an overloaded or
// unresponsive backend. closeOnNew force-closes connections as soon as they
// are accepted; closeOnActive closes them on later state transitions.
func fakeSleepingServer(t *testing.T, responseSleep, connectSleep time.Duration, closeOnNew, closeOnActive bool) *httptest.Server {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// sleep on response to mimic server overload
		time.Sleep(responseSleep)
	}))
	server.Config.ConnState = func(conn net.Conn, state http.ConnState) {
		// sleep on initial connection attempt to mimic server non-responsiveness
		if state == http.StateNew {
			time.Sleep(connectSleep)
			if closeOnNew {
				require.NoError(t, conn.Close())
			}
		}
		// NOTE(review): this fires on every non-closed state transition
		// (including StateNew), not just StateActive, and may attempt to close
		// an already-closed connection — confirm this is intended
		if closeOnActive && state != http.StateClosed {
			require.NoError(t, conn.Close())
		}
	}
	t.Cleanup(server.Close)
	return server
}

@ -79,6 +79,10 @@ func (s *StorageClient) IsChunkNotFoundErr(_ error) bool {
return false
}
// IsRetryableErr always reports false: congestion-control retry
// classification is not implemented for the gRPC store client.
func (s *StorageClient) IsRetryableErr(_ error) bool {
	return false
}
func (s *StorageClient) GetChunks(ctx context.Context, input []chunk.Chunk) ([]chunk.Chunk, error) {
req := &GetChunksRequest{}
req.Chunks = []*Chunk{}

@ -423,3 +423,6 @@ func (c *COSObjectClient) IsObjectNotFoundErr(err error) bool {
return false
}
// IsRetryableErr always reports false for the COS client.
// TODO(dannyk): implement for client
func (c *COSObjectClient) IsRetryableErr(error) bool { return false }

@ -212,6 +212,9 @@ func (f *FSObjectClient) IsObjectNotFoundErr(err error) bool {
return os.IsNotExist(errors.Cause(err))
}
// IsRetryableErr always reports false for the filesystem client.
// TODO(dannyk): implement for client
func (f *FSObjectClient) IsRetryableErr(error) bool { return false }
// copied from https://github.com/thanos-io/thanos/blob/55cb8ca38b3539381dc6a781e637df15c694e50a/pkg/objstore/filesystem/filesystem.go#L181
func isDirEmpty(name string) (ok bool, err error) {
f, err := os.Open(name)

@ -112,3 +112,7 @@ func (c MetricsChunkClient) DeleteChunk(ctx context.Context, userID, chunkID str
// IsChunkNotFoundErr delegates the not-found check to the wrapped client.
func (c MetricsChunkClient) IsChunkNotFoundErr(err error) bool {
	return c.Client.IsChunkNotFoundErr(err)
}

// IsRetryableErr delegates to the wrapped client; metrics collection does not
// alter retryability.
func (c MetricsChunkClient) IsRetryableErr(err error) bool {
	return c.Client.IsRetryableErr(err)
}

@ -34,6 +34,7 @@ type ObjectClient interface {
List(ctx context.Context, prefix string, delimiter string) ([]StorageObject, []StorageCommonPrefix, error)
DeleteObject(ctx context.Context, objectKey string) error
IsObjectNotFoundErr(err error) bool
IsRetryableErr(err error) bool
Stop()
}
@ -203,3 +204,7 @@ func (o *client) DeleteChunk(ctx context.Context, userID, chunkID string) error
// IsChunkNotFoundErr reports whether err corresponds to a missing object in
// the underlying object store.
func (o *client) IsChunkNotFoundErr(err error) bool {
	return o.store.IsObjectNotFoundErr(err)
}

// IsRetryableErr delegates to the underlying object store client.
func (o *client) IsRetryableErr(err error) bool {
	return o.store.IsRetryableErr(err)
}

@ -188,3 +188,6 @@ func (s *SwiftObjectClient) DeleteObject(_ context.Context, objectKey string) er
// IsObjectNotFoundErr reports whether err is, or wraps, swift's ObjectNotFound error.
func (s *SwiftObjectClient) IsObjectNotFoundErr(err error) bool {
	return errors.Is(err, swift.ObjectNotFound)
}

// IsRetryableErr always reports false for the Swift client.
// TODO(dannyk): implement for client
func (s *SwiftObjectClient) IsRetryableErr(error) bool { return false }

@ -462,6 +462,8 @@ func (m *MockStorage) IsChunkNotFoundErr(err error) bool {
return m.IsObjectNotFoundErr(err)
}
func (m *MockStorage) IsRetryableErr(error) bool { return false }
func (m *MockStorage) DeleteObject(_ context.Context, objectKey string) error {
m.mtx.Lock()
defer m.mtx.Unlock()

@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/client/azure"
"github.com/grafana/loki/pkg/storage/chunk/client/baidubce"
"github.com/grafana/loki/pkg/storage/chunk/client/cassandra"
"github.com/grafana/loki/pkg/storage/chunk/client/congestion"
"github.com/grafana/loki/pkg/storage/chunk/client/gcp"
"github.com/grafana/loki/pkg/storage/chunk/client/grpc"
"github.com/grafana/loki/pkg/storage/chunk/client/hedging"
@ -279,6 +280,7 @@ type Config struct {
NamedStores NamedStores `yaml:"named_stores"`
COSConfig ibmcloud.COSConfig `yaml:"cos"`
IndexCacheValidity time.Duration `yaml:"index_cache_validity"`
CongestionControl congestion.Config `yaml:"congestion_control,omitempty"`
IndexQueriesCacheConfig cache.Config `yaml:"index_queries_cache_config"`
DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"`
@ -308,6 +310,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Swift.RegisterFlags(f)
cfg.GrpcConfig.RegisterFlags(f)
cfg.Hedging.RegisterFlagsWithPrefix("store.", f)
cfg.CongestionControl.RegisterFlagsWithPrefix("store.", f)
cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "", f)
f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.")
@ -422,7 +425,7 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange,
}
// NewChunkClient makes a new chunk.Client of the desired types.
func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, clientMetrics ClientMetrics, registerer prometheus.Registerer) (client.Client, error) {
func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, cc congestion.Controller, registerer prometheus.Registerer, clientMetrics ClientMetrics) (client.Client, error) {
var (
storeType = name
)
@ -477,6 +480,13 @@ func NewChunkClient(name string, cfg Config, schemaCfg config.SchemaConfig, clie
if err != nil {
return nil, err
}
// TODO(dannyk): expand congestion control to all other object clients
// this switch statement can be simplified; all the branches like this one are alike
if cfg.CongestionControl.Enabled {
c = cc.Wrap(c)
}
return client.NewClientWithMaxParallel(c, nil, cfg.MaxParallelGetChunk, schemaCfg), nil
case config.StorageTypeSwift:
c, err := NewObjectClient(name, cfg, clientMetrics)
@ -623,6 +633,13 @@ func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (clie
gcsCfg = (gcp.GCSConfig)(nsCfg)
}
// ensure the GCS client's internal retry mechanism is disabled if we're using congestion control,
// which has its own retry mechanism
// TODO(dannyk): implement hedging in controller
if cfg.CongestionControl.Enabled {
gcsCfg.EnableRetries = false
}
return gcp.NewGCSObjectClient(context.Background(), gcsCfg, cfg.Hedging)
case config.StorageTypeAzure:
azureCfg := cfg.AzureStorageConfig

@ -23,6 +23,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/congestion"
"github.com/grafana/loki/pkg/storage/chunk/fetcher"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
@ -73,7 +74,8 @@ type store struct {
limits StoreLimits
logger log.Logger
chunkFilterer chunk.RequestChunkFilterer
chunkFilterer chunk.RequestChunkFilterer
congestionControllerFactory func(cfg congestion.Config, logger log.Logger, metrics *congestion.Metrics) congestion.Controller
}
// NewStore creates a new Loki Store using configuration supplied.
@ -139,6 +141,8 @@ func NewStore(cfg Config, storeCfg config.ChunkStoreConfig, schemaCfg config.Sch
storeCfg: storeCfg,
schemaCfg: schemaCfg,
congestionControllerFactory: congestion.NewController,
chunkClientMetrics: client.NewChunkClientMetrics(registerer),
clientMetrics: clientMetrics,
chunkMetrics: NewChunkMetrics(registerer, cfg.MaxChunkBatchSize),
@ -197,7 +201,18 @@ func (s *store) chunkClientForPeriod(p config.PeriodConfig) (client.Client, erro
chunkClientReg := prometheus.WrapRegistererWith(
prometheus.Labels{"component": "chunk-store-" + p.From.String()}, s.registerer)
chunks, err := NewChunkClient(objectStoreType, s.cfg, s.schemaCfg, s.clientMetrics, chunkClientReg)
var cc congestion.Controller
ccCfg := s.cfg.CongestionControl
if ccCfg.Enabled {
cc = s.congestionControllerFactory(
ccCfg,
s.logger,
congestion.NewMetrics(fmt.Sprintf("%s-%s", objectStoreType, p.From.String()), ccCfg),
)
}
chunks, err := NewChunkClient(objectStoreType, s.cfg, s.schemaCfg, cc, chunkClientReg, s.clientMetrics)
if err != nil {
return nil, errors.Wrap(err, "error creating object client")
}

@ -50,6 +50,10 @@ func (p prefixedObjectClient) IsObjectNotFoundErr(err error) bool {
return p.downstreamClient.IsObjectNotFoundErr(err)
}
// IsRetryableErr delegates to the wrapped downstream client.
func (p prefixedObjectClient) IsRetryableErr(err error) bool {
	return p.downstreamClient.IsRetryableErr(err)
}

// Stop stops the wrapped downstream client.
func (p prefixedObjectClient) Stop() {
	p.downstreamClient.Stop()
}

@ -71,6 +71,9 @@ func (m *mockChunksClient) DeleteChunk(_ context.Context, _, _ string) error {
// IsChunkNotFoundErr is not supported by this mock.
func (m *mockChunksClient) IsChunkNotFoundErr(_ error) bool {
	panic("IsChunkNotFoundErr not implemented")
}

// IsRetryableErr is not supported by this mock.
func (m *mockChunksClient) IsRetryableErr(_ error) bool {
	panic("IsRetryableErr not implemented")
}
func TestChunkWriter_PutOne(t *testing.T) {
schemaConfig := config.SchemaConfig{

@ -297,6 +297,10 @@ func (m mockChunkStoreClient) IsChunkNotFoundErr(_ error) bool {
return false
}
// IsRetryableErr always reports false for the mock chunk store client.
func (m mockChunkStoreClient) IsRetryableErr(_ error) bool {
	return false
}
var streamsFixture = []*logproto.Stream{
{
Labels: "{foo=\"bar\"}",

Loading…
Cancel
Save