chore: Remove experimental bloom settings that have never been used or are not used any more (#15787)

Remove experimental results cache for bloom gateway client
    
    This feature never made it into production. Since it is only an
    experimental feature, it can be removed without prior deprecation.

Remove unused `bloom_gateway_shard_size` runtime setting
    
    The setting was once used when bloom gateways still used a ring for
    sharding.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/15774/head^2
Christian Haudum 1 year ago committed by GitHub
parent 45bae6dd1c
commit 1a9f382e6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 26
      docs/sources/shared/configuration.md
  2. 1
      pkg/bloomgateway/bloomgateway_test.go
  3. 189
      pkg/bloomgateway/cache.go
  4. 507
      pkg/bloomgateway/cache_test.go
  5. 38
      pkg/bloomgateway/client.go
  6. 2
      pkg/bloomgateway/config.go
  7. 14
      pkg/validation/limits.go

@ -1394,22 +1394,6 @@ client:
# bloom-gateway-client.grpc
[grpc_client_config: <grpc_client>]
results_cache:
# The cache_config block configures the cache backend for a specific Loki
# component.
# The CLI flags prefix for this block configuration is:
# bloom-gateway-client.cache
[cache: <cache_config>]
# Use compression in cache. The default is an empty value '', which disables
# compression. Supported values are: 'snappy' and ''.
# CLI flag: -bloom-gateway-client.cache.compression
[compression: <string> | default = ""]
# Flag to control whether to cache bloom gateway client requests/responses.
# CLI flag: -bloom-gateway-client.cache_results
[cache_results: <boolean> | default = false]
# Comma separated addresses list in DNS Service Discovery format:
# https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes
# CLI flag: -bloom-gateway-client.addresses
@ -1465,7 +1449,6 @@ The `bos_storage_config` block configures the connection to Baidu Object Storage
The `cache_config` block configures the cache backend for a specific Loki component. The supported CLI flags `<prefix>` used to reference this configuration block are:
- `bloom-gateway-client.cache`
- `bloom.metas-cache`
- `frontend`
- `frontend.index-stats-results-cache`
@ -3828,20 +3811,11 @@ shard_streams:
# CLI flag: -index-gateway.shard-size
[index_gateway_shard_size: <int> | default = 0]
# Experimental. The shard size defines how many bloom gateways should be used by
# a tenant for querying.
# CLI flag: -bloom-gateway.shard-size
[bloom_gateway_shard_size: <int> | default = 0]
# Experimental. Whether to use the bloom gateway component in the read path to
# filter chunks.
# CLI flag: -bloom-gateway.enable-filtering
[bloom_gateway_enable_filtering: <boolean> | default = false]
# Experimental. Interval for computing the cache key in the Bloom Gateway.
# CLI flag: -bloom-gateway.cache-key-interval
[bloom_gateway_cache_key_interval: <duration> | default = 15m]
# Experimental. Maximum number of builders to use when building blooms. 0 allows
# unlimited builders.
# CLI flag: -bloom-build.max-builders

@ -57,7 +57,6 @@ func newLimits() *validation.Overrides {
limits := validation.Limits{}
flagext.DefaultValues(&limits)
limits.BloomGatewayEnabled = true
limits.BloomGatewayShardSize = 1
overrides, _ := validation.NewOverrides(limits, nil)
return overrides

@ -1,189 +0,0 @@
package bloomgateway
import (
"context"
"flag"
"time"
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"google.golang.org/grpc"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
)
const (
	// cacheParalellism is the fixed parallelism factor handed to the
	// results cache (see NewBloomGatewayClientCacheMiddleware).
	// NOTE(review): identifier is misspelled; "cacheParallelism" intended.
	cacheParalellism = 1
)
// CacheConfig wraps the generic results cache configuration for the
// bloom gateway client.
type CacheConfig struct {
	resultscache.Config `yaml:",inline"`
}
// RegisterFlags registers flags under the canonical
// "bloom-gateway-client.cache." prefix.
func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) {
	cfg.RegisterFlagsWithPrefix("bloom-gateway-client.cache.", f)
}
// RegisterFlagsWithPrefix registers flags with the given prefix by
// delegating to the embedded resultscache.Config (note the delegate takes
// its arguments in the opposite order).
func (cfg *CacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	cfg.Config.RegisterFlagsWithPrefix(f, prefix)
}
// CacheLimits combines the per-tenant limits consulted by the results cache
// with the per-tenant interval used for time-bucketed cache keys.
type CacheLimits interface {
	resultscache.Limits
	BloomGatewayCacheKeyInterval(tenantID string) time.Duration
}
// keyGen generates cache keys based on per-tenant limits.
type keyGen struct {
	CacheLimits
}

// newCacheKeyGen returns a keyGen backed by the given limits.
func newCacheKeyGen(limits CacheLimits) keyGen {
	return keyGen{limits}
}
// TODO(owen-d): need to implement our own key-generation which accounts for fingerprint ranges requested.

// GenerateCacheKey buckets the request into fixed time intervals (the
// tenant's BloomGatewayCacheKeyInterval) via the resultscache const splitter.
func (k keyGen) GenerateCacheKey(ctx context.Context, tenant string, r resultscache.Request) string {
	return resultscache.ConstSplitter(k.BloomGatewayCacheKeyInterval(tenant)).GenerateCacheKey(ctx, tenant, r)
}
// extractor trims cached responses down to a requested time range.
type extractor struct{}

// newExtractor returns a stateless extractor.
func newExtractor() extractor {
	return extractor{}
}
// Extract extracts a subset of a response from the `start` and `end` timestamps in milliseconds.
// We remove chunks that are not within the given time range.
//
// Note the asymmetric bounds: a chunk ref is kept when ref.From <= end AND
// ref.Through > start, so a chunk whose Through equals `start` is dropped
// while a chunk starting exactly at `end` is kept. Groups left with no
// chunk refs are omitted entirely.
func (e extractor) Extract(start, end int64, r resultscache.Response, _, _ int64) resultscache.Response {
	// The results cache only ever hands back FilterChunkRefResponse values
	// here, so this type assertion is expected to hold.
	res := r.(*logproto.FilterChunkRefResponse)
	chunkRefs := make([]*logproto.GroupedChunkRefs, 0, len(res.ChunkRefs))
	for _, chunkRef := range res.ChunkRefs {
		refs := make([]*logproto.ShortRef, 0, len(chunkRef.Refs))
		for _, ref := range chunkRef.Refs {
			// Skip chunk refs that lie entirely outside the range.
			if model.Time(end) < ref.From || ref.Through <= model.Time(start) {
				continue
			}
			refs = append(refs, ref)
		}
		if len(refs) > 0 {
			chunkRefs = append(chunkRefs, &logproto.GroupedChunkRefs{
				Fingerprint: chunkRef.Fingerprint,
				Labels:      chunkRef.Labels,
				Tenant:      chunkRef.Tenant,
				Refs:        refs,
			})
		}
	}
	return &logproto.FilterChunkRefResponse{
		ChunkRefs: chunkRefs,
	}
}
// merger combines multiple cached/fetched partial responses into one.
type merger struct{}

// newMerger returns a stateless merger.
func newMerger() merger {
	return merger{}
}
// MergeResponse merges responses from multiple requests into a single Response
// We merge all chunks grouped by their fingerprint.
//
// Deduplication and ordering of the combined groups is delegated to
// mergeSeries (defined elsewhere in this package).
func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) {
	var size int
	// Collect each response's chunk-ref groups and track the total count so
	// the merge buffer can be sized up front.
	unmerged := make([][]*logproto.GroupedChunkRefs, 0, len(responses))
	for _, r := range responses {
		res := r.(*logproto.FilterChunkRefResponse)
		unmerged = append(unmerged, res.ChunkRefs)
		size += len(res.ChunkRefs)
	}
	buf := make([]*logproto.GroupedChunkRefs, 0, size)
	deduped, err := mergeSeries(unmerged, buf)
	if err != nil {
		return nil, err
	}
	return &logproto.FilterChunkRefResponse{ChunkRefs: deduped}, nil
}
// ClientCache is a logproto.BloomGatewayClient middleware that serves
// FilterChunkRefs calls from a results cache where possible; all other
// calls pass straight through to the wrapped client.
type ClientCache struct {
	cache  *resultscache.ResultsCache  // caches FilterChunkRefs responses
	next   logproto.BloomGatewayClient // wrapped downstream client
	limits CacheLimits
	logger log.Logger
}
// NewBloomGatewayClientCacheMiddleware wraps a BloomGatewayClient so that
// FilterChunkRefs results are cached in `c` and only cache misses are
// forwarded to `next`. cacheGen and retentionEnabled are passed through to
// the underlying results cache.
func NewBloomGatewayClientCacheMiddleware(
	logger log.Logger,
	next logproto.BloomGatewayClient,
	c cache.Cache,
	limits CacheLimits,
	cacheGen resultscache.CacheGenNumberLoader,
	retentionEnabled bool,
) *ClientCache {
	// Adapt the gRPC client to the resultscache handler interface. The
	// cache layer always passes a requestWithGrpcCallOptions (built in
	// ClientCache.FilterChunkRefs), which carries the original gRPC call
	// options through to the downstream call.
	nextAsHandler := resultscache.HandlerFunc(func(ctx context.Context, cacheReq resultscache.Request) (resultscache.Response, error) {
		req := cacheReq.(requestWithGrpcCallOptions)
		return next.FilterChunkRefs(ctx, req.FilterChunkRefRequest, req.grpcCallOptions...)
	})
	resultsCache := resultscache.NewResultsCache(
		logger,
		c,
		nextAsHandler,
		newCacheKeyGen(limits),
		limits,
		newMerger(),
		newExtractor(),
		// NOTE(review): the two nil arguments are optional hooks left
		// unset here — confirm their meaning against
		// resultscache.NewResultsCache before changing.
		nil,
		nil,
		func(_ context.Context, _ []string, _ resultscache.Request) int {
			return cacheParalellism
		},
		cacheGen,
		retentionEnabled,
		false,
	)
	return &ClientCache{
		next:   next,
		cache:  resultsCache,
		limits: limits,
		logger: logger,
	}
}
// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
// Prefetch requests are not cached and are forwarded as-is.
func (c *ClientCache) PrefetchBloomBlocks(ctx context.Context, in *logproto.PrefetchBloomBlocksRequest, opts ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
	return c.next.PrefetchBloomBlocks(ctx, in, opts...)
}
// FilterChunkRefs implements logproto.BloomGatewayClient.
// The request is wrapped together with its gRPC call options so that the
// cache layer can replay the exact downstream call on a miss.
func (c *ClientCache) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest, opts ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
	wrapped := requestWithGrpcCallOptions{
		FilterChunkRefRequest: req,
		grpcCallOptions:       opts,
	}
	resp, err := c.cache.Do(ctx, wrapped)
	if err != nil {
		return nil, err
	}
	return resp.(*logproto.FilterChunkRefResponse), nil
}
// requestWithGrpcCallOptions bundles a FilterChunkRefRequest with the gRPC
// call options of the originating call so they survive the trip through the
// results cache and can be reused when the downstream call is made.
type requestWithGrpcCallOptions struct {
	*logproto.FilterChunkRefRequest
	grpcCallOptions []grpc.CallOption
}
// WithStartEndForCache returns a copy of the request narrowed to the given
// start/end (delegating to the embedded request's implementation) while
// preserving the original gRPC call options. Required by resultscache.Request.
func (r requestWithGrpcCallOptions) WithStartEndForCache(start time.Time, end time.Time) resultscache.Request {
	return requestWithGrpcCallOptions{
		FilterChunkRefRequest: r.FilterChunkRefRequest.WithStartEndForCache(start, end).(*logproto.FilterChunkRefRequest),
		grpcCallOptions:       r.grpcCallOptions,
	}
}

@ -1,507 +0,0 @@
package bloomgateway
import (
"context"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/util/constants"
)
// Range is 1000-4000
// templateResponse is the shared fixture for TestExtract and TestMerge:
// two fingerprint groups, each with two chunk refs, spanning 1000-4000.
var templateResponse = &logproto.FilterChunkRefResponse{
	ChunkRefs: []*logproto.GroupedChunkRefs{
		{
			Fingerprint: 1,
			Tenant:      "fake",
			Refs: []*logproto.ShortRef{
				{
					From:     1000,
					Through:  1500,
					Checksum: 10,
				},
				{
					From:     1500,
					Through:  2500,
					Checksum: 20,
				},
			},
		},
		{
			Fingerprint: 2,
			Tenant:      "fake",
			Refs: []*logproto.ShortRef{
				{
					From:     3000,
					Through:  4000,
					Checksum: 30,
				},
				{
					From:     1000,
					Through:  3000,
					Checksum: 40,
				},
			},
		},
	},
}
// TestExtract verifies that extractor.Extract trims a cached response to the
// requested start/end range, dropping chunk refs outside the range and
// omitting groups that become empty.
func TestExtract(t *testing.T) {
	for _, tc := range []struct {
		name     string
		start    int64
		end      int64
		input    *logproto.FilterChunkRefResponse
		expected *logproto.FilterChunkRefResponse
	}{
		{
			name:  "start and end out of range",
			start: 100,
			end:   200,
			input: templateResponse,
			expected: &logproto.FilterChunkRefResponse{
				ChunkRefs: []*logproto.GroupedChunkRefs{},
			},
		},
		{
			name:     "start spans exact range",
			start:    1000,
			end:      4000,
			input:    templateResponse,
			expected: templateResponse,
		},
		{
			name:     "start spans more than range",
			start:    100,
			end:      5000,
			input:    templateResponse,
			expected: templateResponse,
		},
		{
			name:  "start and end within range",
			start: 1700,
			end:   2700,
			input: templateResponse,
			expected: &logproto.FilterChunkRefResponse{
				ChunkRefs: []*logproto.GroupedChunkRefs{
					{
						Fingerprint: 1,
						Tenant:      "fake",
						Refs: []*logproto.ShortRef{
							{
								From:     1500,
								Through:  2500,
								Checksum: 20,
							},
						},
					},
					{
						Fingerprint: 2,
						Tenant:      "fake",
						Refs: []*logproto.ShortRef{
							{
								From:     1000,
								Through:  3000,
								Checksum: 40,
							},
						},
					},
				},
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			e := newExtractor()
			// The trailing two (offset) arguments are ignored by Extract.
			actual := e.Extract(tc.start, tc.end, tc.input, 0, 0)
			require.Equal(t, tc.expected, actual)
		})
	}
}
// TestMerge exercises merger.MergeResponse: multiple FilterChunkRefResponses
// are combined into one, with chunk-ref groups merged by fingerprint and
// chunks within each group deduplicated and ordered.
func TestMerge(t *testing.T) {
	for _, tc := range []struct {
		name     string
		input    []*logproto.FilterChunkRefResponse
		expected *logproto.FilterChunkRefResponse
	}{
		{
			// Fixed typo in the test case name ("empy" -> "empty").
			name:  "empty input",
			input: []*logproto.FilterChunkRefResponse{},
			expected: &logproto.FilterChunkRefResponse{
				ChunkRefs: []*logproto.GroupedChunkRefs{},
			},
		},
		{
			name:     "single input",
			input:    []*logproto.FilterChunkRefResponse{templateResponse},
			expected: templateResponse,
		},
		{
			name: "repeating and non-repeating fingerprint with repeating and non-repeating chunks",
			input: []*logproto.FilterChunkRefResponse{
				{
					ChunkRefs: []*logproto.GroupedChunkRefs{
						{
							Fingerprint: 1,
							Tenant:      "fake",
							Refs: []*logproto.ShortRef{
								{From: 1000, Through: 1500, Checksum: 10},
								{From: 1500, Through: 2500, Checksum: 20},
							},
						},
						{
							Fingerprint: 2,
							Tenant:      "fake",
							Refs: []*logproto.ShortRef{
								{From: 1000, Through: 1500, Checksum: 10},
								{From: 1500, Through: 2500, Checksum: 20},
							},
						},
					},
				},
				{
					ChunkRefs: []*logproto.GroupedChunkRefs{
						// Same FP as in previous input and same chunks
						{
							Fingerprint: 1,
							Tenant:      "fake",
							Refs: []*logproto.ShortRef{
								{From: 1000, Through: 1500, Checksum: 10},
								{From: 1500, Through: 2500, Checksum: 20},
							},
						},
						// Same FP as in previous input, but different chunks
						{
							Fingerprint: 2,
							Tenant:      "fake",
							Refs: []*logproto.ShortRef{
								// Same chunk as in previous input
								{From: 1500, Through: 2500, Checksum: 20},
								// New chunk
								{From: 2000, Through: 2500, Checksum: 30},
							},
						},
						// New FP
						{
							Fingerprint: 3,
							Tenant:      "fake",
							Refs: []*logproto.ShortRef{
								{From: 1000, Through: 1500, Checksum: 10},
								{From: 1500, Through: 2500, Checksum: 20},
							},
						},
					},
				},
				{
					ChunkRefs: []*logproto.GroupedChunkRefs{
						// Same FP as in previous input and diff chunks
						{
							Fingerprint: 2,
							Tenant:      "fake",
							Refs: []*logproto.ShortRef{
								{From: 700, Through: 1000, Checksum: 40},
								{From: 2000, Through: 2700, Checksum: 50},
							},
						},
					},
				},
			},
			expected: &logproto.FilterChunkRefResponse{
				ChunkRefs: []*logproto.GroupedChunkRefs{
					{
						Fingerprint: 1,
						Tenant:      "fake",
						Refs: []*logproto.ShortRef{
							{From: 1000, Through: 1500, Checksum: 10},
							{From: 1500, Through: 2500, Checksum: 20},
						},
					},
					{
						Fingerprint: 2,
						Tenant:      "fake",
						Refs: []*logproto.ShortRef{
							{From: 700, Through: 1000, Checksum: 40},
							{From: 1000, Through: 1500, Checksum: 10},
							{From: 1500, Through: 2500, Checksum: 20},
							{From: 2000, Through: 2500, Checksum: 30},
							{From: 2000, Through: 2700, Checksum: 50},
						},
					},
					{
						Fingerprint: 3,
						Tenant:      "fake",
						Refs: []*logproto.ShortRef{
							{From: 1000, Through: 1500, Checksum: 10},
							{From: 1500, Through: 2500, Checksum: 20},
						},
					},
				},
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			// MergeResponse takes the interface type, so repack the inputs.
			input := make([]resultscache.Response, 0, len(tc.input))
			for _, i := range tc.input {
				input = append(input, i)
			}
			m := newMerger()
			actual, err := m.MergeResponse(input...)
			require.NoError(t, err)
			resp, ok := actual.(*logproto.FilterChunkRefResponse)
			require.True(t, ok)
			require.Equal(t, tc.expected, resp)
		})
	}
}
// TestCache runs the cache middleware end-to-end against a mock gRPC server
// and an in-memory mock cache: the first request hits the server, an
// identical repeat is served from the cache, and a widened time range
// produces a partial hit (server calls only for the uncached spans).
func TestCache(t *testing.T) {
	ctx := user.InjectOrgID(context.Background(), "fake")
	limits := mockLimits{
		cacheInterval: 15 * time.Minute,
	}
	cfg := CacheConfig{
		Config: resultscache.Config{
			CacheConfig: cache.Config{
				Cache: cache.NewMockCache(),
			},
		},
	}
	c, err := cache.New(cfg.CacheConfig, nil, log.NewNopLogger(), stats.BloomFilterCache, constants.Loki)
	require.NoError(t, err)
	defer c.Stop()
	chunkRefs := []*logproto.ChunkRef{
		{
			Fingerprint: 2,
			UserID:      "fake",
			From:        1500,
			Through:     2500,
			Checksum:    30,
		},
		{
			Fingerprint: 3,
			UserID:      "fake",
			From:        2500,
			Through:     3500,
		},
	}
	expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`)
	require.NoError(t, err)
	req := &logproto.FilterChunkRefRequest{
		From:    model.Time(2000),
		Through: model.Time(3000),
		Refs:    groupRefs(t, chunkRefs),
		Plan:    plan.QueryPlan{AST: expr},
	}
	expectedRes := &logproto.FilterChunkRefResponse{
		ChunkRefs: groupRefs(t, chunkRefs),
	}
	server, calls := newMockServer(expectedRes)
	cacheMiddleware := NewBloomGatewayClientCacheMiddleware(
		log.NewNopLogger(),
		server,
		c,
		limits,
		nil,
		false,
	)
	// First call should go to the server
	*calls = 0
	res, err := cacheMiddleware.FilterChunkRefs(ctx, req)
	require.NoError(t, err)
	require.Equal(t, 1, *calls)
	require.Equal(t, expectedRes, res)
	// Second call should go to the cache
	*calls = 0
	res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
	require.NoError(t, err)
	require.Equal(t, 0, *calls)
	require.Equal(t, expectedRes, res)
	// Doing a request with new start and end should:
	// 1. hit the server for the leading (uncached) time span
	// 2. hit the cache for the already-cached span
	// 3. hit the server for the trailing (uncached) time span
	newChunkRefs := []*logproto.ChunkRef{
		{
			Fingerprint: 1,
			UserID:      "fake",
			From:        1000,
			Through:     1500,
			Checksum:    10,
		},
		{
			Fingerprint: 4,
			UserID:      "fake",
			From:        3500,
			Through:     4500,
		},
	}
	server.SetResponse(&logproto.FilterChunkRefResponse{
		ChunkRefs: groupRefs(t, newChunkRefs),
	})
	expectedRes = &logproto.FilterChunkRefResponse{
		ChunkRefs: groupRefs(t, append(chunkRefs, newChunkRefs...)),
	}
	req.From = model.Time(100)
	req.Through = model.Time(5000)
	*calls = 0
	res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
	require.NoError(t, err)
	require.Equal(t, 2, *calls)
	require.ElementsMatch(t, expectedRes.ChunkRefs, res.ChunkRefs)
	// Doing a request again should only hit the cache
	*calls = 0
	res, err = cacheMiddleware.FilterChunkRefs(ctx, req)
	require.NoError(t, err)
	require.Equal(t, 0, *calls)
	require.ElementsMatch(t, expectedRes.ChunkRefs, res.ChunkRefs)
}
// mockServer is a test double for logproto.BloomGatewayClient that returns a
// canned response and counts FilterChunkRefs invocations.
type mockServer struct {
	calls *int // incremented on every FilterChunkRefs call
	res   *logproto.FilterChunkRefResponse
}

// Compile-time interface conformance check.
var _ logproto.BloomGatewayClient = &mockServer{}
// newMockServer builds a mockServer that always answers with `res`, and
// returns a pointer to its call counter so tests can inspect and reset it.
func newMockServer(res *logproto.FilterChunkRefResponse) (*mockServer, *int) {
	counter := new(int)
	srv := &mockServer{
		calls: counter,
		res:   res,
	}
	return srv, counter
}
// SetResponse replaces the canned response returned by FilterChunkRefs.
func (s *mockServer) SetResponse(res *logproto.FilterChunkRefResponse) {
	s.res = res
}
// FilterChunkRefs implements logproto.BloomGatewayClient.
// It records the invocation and returns the canned response.
func (s *mockServer) FilterChunkRefs(_ context.Context, _ *logproto.FilterChunkRefRequest, _ ...grpc.CallOption) (*logproto.FilterChunkRefResponse, error) {
	*s.calls++
	return s.res, nil
}
// PrefetchBloomBlocks implements logproto.BloomGatewayClient.
// Not exercised by these tests; reaching it indicates a test bug.
func (s *mockServer) PrefetchBloomBlocks(_ context.Context, _ *logproto.PrefetchBloomBlocksRequest, _ ...grpc.CallOption) (*logproto.PrefetchBloomBlocksResponse, error) {
	panic("unimplemented")
}
// mockLimits supplies fixed CacheLimits values for tests.
type mockLimits struct {
	cacheFreshness time.Duration // returned by MaxCacheFreshness
	cacheInterval  time.Duration // returned by BloomGatewayCacheKeyInterval
}

// MaxCacheFreshness returns the configured freshness, ignoring context and tenant.
func (m mockLimits) MaxCacheFreshness(_ context.Context, _ string) time.Duration {
	return m.cacheFreshness
}

// BloomGatewayCacheKeyInterval returns the configured key interval, ignoring the tenant.
func (m mockLimits) BloomGatewayCacheKeyInterval(_ string) time.Duration {
	return m.cacheInterval
}

@ -20,14 +20,11 @@ import (
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/queue"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
"github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/constants"
"github.com/grafana/loki/v3/pkg/util/discovery"
)
@ -70,10 +67,6 @@ type ClientConfig struct {
// GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server.
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
// Cache configures the cache used to store the results of the Bloom Gateway server.
Cache CacheConfig `yaml:"results_cache,omitempty"`
CacheResults bool `yaml:"cache_results"`
// Client sharding using DNS discovery and jumphash
Addresses string `yaml:"addresses,omitempty"`
}
@ -86,9 +79,7 @@ func (i *ClientConfig) RegisterFlags(f *flag.FlagSet) {
// RegisterFlagsWithPrefix registers flags for the Bloom Gateway client configuration with a common prefix.
func (i *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+"grpc", f)
i.Cache.RegisterFlagsWithPrefix(prefix+"cache.", f)
i.PoolConfig.RegisterFlagsWithPrefix(prefix+"pool.", f)
f.BoolVar(&i.CacheResults, prefix+"cache_results", false, "Flag to control whether to cache bloom gateway client requests/responses.")
f.StringVar(&i.Addresses, prefix+"addresses", "", "Comma separated addresses list in DNS Service Discovery format: https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes")
}
@ -101,12 +92,6 @@ func (i *ClientConfig) Validate() error {
return errors.Wrap(err, "pool config")
}
if i.CacheResults {
if err := i.Cache.Validate(); err != nil {
return errors.Wrap(err, "cache config")
}
}
if i.Addresses == "" {
return errors.New("addresses requires a list of comma separated strings in DNS service discovery format with at least one item")
}
@ -151,34 +136,11 @@ func NewClient(
return nil, err
}
var c cache.Cache
if cfg.CacheResults {
c, err = cache.New(cfg.Cache.CacheConfig, registerer, logger, stats.BloomFilterCache, constants.Loki)
if err != nil {
return nil, errors.Wrap(err, "new bloom gateway cache")
}
if cfg.Cache.Compression == "snappy" {
c = cache.NewSnappy(c, logger)
}
}
clientFactory := func(addr string) (ringclient.PoolClient, error) {
pool, err := NewBloomGatewayGRPCPool(addr, dialOpts)
if err != nil {
return nil, errors.Wrap(err, "new bloom gateway grpc pool")
}
if cfg.CacheResults {
pool.BloomGatewayClient = NewBloomGatewayClientCacheMiddleware(
logger,
pool.BloomGatewayClient,
c,
limits,
cacheGen,
retentionEnabled,
)
}
return pool, nil
}

@ -47,7 +47,5 @@ func (cfg *Config) Validate() error {
}
type Limits interface {
CacheLimits
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
}

@ -204,9 +204,7 @@ type Limits struct {
IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`
BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size" category:"experimental"`
BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering" category:"experimental"`
BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval" category:"experimental"`
BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering" category:"experimental"`
BloomBuildMaxBuilders int `yaml:"bloom_build_max_builders" json:"bloom_build_max_builders" category:"experimental"`
BloomBuildTaskMaxRetries int `yaml:"bloom_build_task_max_retries" json:"bloom_build_task_max_retries" category:"experimental"`
@ -403,9 +401,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.")
f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 0, "Experimental. The shard size defines how many bloom gateways should be used by a tenant for querying.")
f.BoolVar(&l.BloomGatewayEnabled, "bloom-gateway.enable-filtering", false, "Experimental. Whether to use the bloom gateway component in the read path to filter chunks.")
f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Experimental. Interval for computing the cache key in the Bloom Gateway.")
f.StringVar(&l.BloomBlockEncoding, "bloom-build.block-encoding", "none", "Experimental. Compression algorithm for bloom block pages.")
f.BoolVar(&l.BloomPrefetchBlocks, "bloom-build.prefetch-blocks", false, "Experimental. Prefetch blocks on bloom gateways as soon as they are built.")
@ -1033,14 +1029,6 @@ func (o *Overrides) IndexGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).IndexGatewayShardSize
}
func (o *Overrides) BloomGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).BloomGatewayShardSize
}
func (o *Overrides) BloomGatewayCacheKeyInterval(userID string) time.Duration {
return o.getOverridesForUser(userID).BloomGatewayCacheKeyInterval
}
func (o *Overrides) BloomGatewayEnabled(userID string) bool {
return o.getOverridesForUser(userID).BloomGatewayEnabled
}

Loading…
Cancel
Save