Add log-volume feature flag (#9762)

Adds a feature flag for use with the new log-volume endpoints so
associated features can be rolled out incrementally.
pull/9767/head
Travis Patterson 3 years ago committed by GitHub
parent dbc3040739
commit 806674fdaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      docs/sources/configuration/_index.md
  2. 1
      pkg/querier/queryrange/limits.go
  3. 23
      pkg/querier/queryrange/roundtrip.go
  4. 7
      pkg/querier/queryrange/roundtrip_test.go
  5. 6
      pkg/validation/limits.go
  6. 4
      pkg/validation/limits_test.go

@ -2459,6 +2459,9 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -frontend.max-querier-bytes-read
[max_querier_bytes_read: <int> | default = 0B]
# Enable log-volume endpoints.
[volume_enabled: <boolean>]
# Duration to delay the evaluation of rules to ensure the underlying metrics
# have been pushed to Cortex.
# CLI flag: -ruler.evaluation-delay-duration

@ -64,6 +64,7 @@ type Limits interface {
MaxQueryBytesRead(context.Context, string) int
MaxQuerierBytesRead(context.Context, string) int
MaxStatsCacheFreshness(context.Context, string) time.Duration
VolumeEnabled(string) bool
}
type limits struct {

@ -7,6 +7,8 @@ import (
"strings"
"time"
"github.com/weaveworks/common/user"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
@ -755,7 +757,26 @@ func NewSeriesVolumeTripperware(cfg Config,
) (queryrangebase.Tripperware, error) {
labelVolumeCfg := cfg
labelVolumeCfg.CacheIndexStatsResults = false
return NewIndexStatsTripperware(labelVolumeCfg, log, limits, schema, codec, c, cacheGenNumLoader, retentionEnabled, metrics)
statsTw, err := NewIndexStatsTripperware(labelVolumeCfg, log, limits, schema, codec, c, cacheGenNumLoader, retentionEnabled, metrics)
if err != nil {
return nil, err
}
return func(next http.RoundTripper) http.RoundTripper {
nextRt := statsTw(next)
return queryrangebase.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
userID, err := user.ExtractOrgID(r.Context())
if err != nil {
return nil, err
}
if !limits.VolumeEnabled(userID) {
return nil, httpgrpc.Errorf(http.StatusNotFound, "not found")
}
return nextRt.RoundTrip(r)
})
}, nil
}
func NewIndexStatsTripperware(

@ -551,7 +551,7 @@ func TestIndexStatsTripperware(t *testing.T) {
}
func TestSeriesVolumeTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
tpw, stopper, err := NewTripperware(testConfig, testEngineOpts, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, volumeEnabled: true}, config.SchemaConfig{Configs: testSchemas}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
@ -1177,6 +1177,7 @@ type fakeLimits struct {
maxQueryBytesRead int
maxQuerierBytesRead int
maxStatsCacheFreshness time.Duration
volumeEnabled bool
}
func (f fakeLimits) QuerySplitDuration(key string) time.Duration {
@ -1253,6 +1254,10 @@ func (f fakeLimits) MaxStatsCacheFreshness(_ context.Context, _ string) time.Dur
return f.maxStatsCacheFreshness
}
func (f fakeLimits) VolumeEnabled(_ string) bool {
return f.volumeEnabled
}
func counter() (*int, http.Handler) {
count := 0
var lock sync.Mutex

@ -103,6 +103,7 @@ type Limits struct {
MinShardingLookback model.Duration `yaml:"min_sharding_lookback" json:"min_sharding_lookback"`
MaxQueryBytesRead flagext.ByteSize `yaml:"max_query_bytes_read" json:"max_query_bytes_read"`
MaxQuerierBytesRead flagext.ByteSize `yaml:"max_querier_bytes_read" json:"max_querier_bytes_read"`
VolumeEnabled bool `yaml:"volume_enabled" json:"volume_enabled" doc:"description=Enable log-volume endpoints."`
// Ruler defaults and limits.
@ -734,6 +735,11 @@ func (o *Overrides) IncrementDuplicateTimestamps(userID string) bool {
return o.getOverridesForUser(userID).IncrementDuplicateTimestamp
}
// VolumeEnabled returns whether volume endpoints are enabled for a user.
func (o *Overrides) VolumeEnabled(userID string) bool {
return o.getOverridesForUser(userID).VolumeEnabled
}
func (o *Overrides) IndexGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).IndexGatewayShardSize
}

@ -79,6 +79,7 @@ shard_streams:
blocked_queries:
- pattern: ".*foo.*"
regex: true
volume_enabled: true
`
inputJSON := `
{
@ -126,7 +127,8 @@ blocked_queries:
"pattern": ".*foo.*",
"regex": true
}
]
],
"volume_enabled": true
}
`

Loading…
Cancel
Save