Remove unused sharding code from bloom gateway (#11906)

The sharding by fingerprint range is done on the client side, not on the
server side.

The sharding-related code was not used in the gateway.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Branch: pull/11907/head
Author: Christian Haudum, committed 2 years ago via GitHub
Parent: 5914df77b9
Commit: 60551dacc4
Changed files:
  1. pkg/bloomgateway/bloomgateway.go (5 changed lines)
  2. pkg/bloomgateway/bloomgateway_test.go (16 changed lines)
  3. pkg/bloomgateway/client.go (37 changed lines)
  4. pkg/bloomgateway/client_test.go (16 changed lines)
  5. pkg/bloomgateway/sharding.go (156 changed lines)
  6. pkg/loki/modules.go (4 changed lines)

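To make the client-side flow concrete: after this change the gateway client resolves the tenant's subring, derives each instance's token ranges, and partitions the requested fingerprints across those ranges before issuing any RPC (see serverAddressesWithTokenRanges and groupFingerprintsByServer in the diff below). The following is a minimal, self-contained sketch of that technique only; the types and the token mapping are simplified stand-ins, not the actual addrsWithTokenRange / instanceWithFingerprints structs or dskit ring logic from pkg/bloomgateway.

// Minimal, self-contained sketch of client-side sharding by fingerprint.
// NOTE: tokenRange and server are illustrative stand-ins, not the actual
// structs from pkg/bloomgateway/client.go.
package main

import (
    "fmt"
    "sort"
)

// tokenRange is the inclusive range of ring tokens owned by one server.
type tokenRange struct {
    min, max uint32
}

// server is a bloom gateway instance together with the token ranges it owns.
type server struct {
    addr   string
    ranges []tokenRange
}

// groupByServer assigns every fingerprint to the server whose token range
// contains it, mirroring the idea of partitioning fingerprints by address
// on the client before any request is sent.
func groupByServer(fingerprints []uint64, servers []server) map[string][]uint64 {
    out := make(map[string][]uint64)
    for _, fp := range fingerprints {
        // Illustrative mapping of a 64-bit fingerprint onto the 32-bit
        // token space; the real ring uses its own hashing scheme.
        token := uint32(fp)
        for _, srv := range servers {
            for _, r := range srv.ranges {
                if token >= r.min && token <= r.max {
                    out[srv.addr] = append(out[srv.addr], fp)
                }
            }
        }
    }
    return out
}

func main() {
    servers := []server{
        {addr: "10.0.0.1:9095", ranges: []tokenRange{{min: 0, max: 1<<30 - 1}}},
        {addr: "10.0.0.2:9095", ranges: []tokenRange{{min: 1 << 30, max: 1<<31 - 1}}},
        {addr: "10.0.0.3:9095", ranges: []tokenRange{{min: 1 << 31, max: 1<<32 - 1}}},
    }
    fps := []uint64{0x12345678, 0x7fffffff, 0x9abcdef0}

    grouped := groupByServer(fps, servers)

    addrs := make([]string, 0, len(grouped))
    for addr := range grouped {
        addrs = append(addrs, addr)
    }
    sort.Strings(addrs)
    for _, addr := range addrs {
        fmt.Printf("%s -> %x\n", addr, grouped[addr])
    }
}

In the actual code, GetShuffleShardingSubring first picks the per-tenant subring and serverAddressesWithTokenRanges derives the ranges from that ring; only the final grouping step is modeled here.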
@@ -177,8 +177,6 @@ type Gateway struct {
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store
- sharding ShardingStrategy
pendingTasks *pendingTasks
serviceMngr *services.Manager
@@ -196,12 +194,11 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
}
// New returns a new instance of the Bloom Gateway.
- func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
+ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
cfg: cfg,
logger: logger,
metrics: newMetrics(reg, constants.Loki, metricsSubsystem),
- sharding: shardingStrategy,
pendingTasks: makePendingTasks(pendingTasksInitialCap),
workerConfig: workerConfig{
maxItems: 100,

@@ -45,8 +45,6 @@ func newLimits() *validation.Overrides {
}
func TestBloomGateway_StartStopService(t *testing.T) {
- ss := NewNoopStrategy()
logger := log.NewNopLogger()
reg := prometheus.NewRegistry()
limits := newLimits()
@@ -96,7 +94,7 @@ func TestBloomGateway_StartStopService(t *testing.T) {
MaxOutstandingPerTenant: 1024,
}
- gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+ gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -113,8 +111,6 @@ func TestBloomGateway_StartStopService(t *testing.T) {
func TestBloomGateway_FilterChunkRefs(t *testing.T) {
tenantID := "test"
- ss := NewNoopStrategy()
logger := log.NewLogfmtLogger(os.Stderr)
reg := prometheus.NewRegistry()
limits := newLimits()
@@ -165,7 +161,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("shipper error is propagated", func(t *testing.T) {
reg := prometheus.NewRegistry()
- gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+ gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
require.NoError(t, err)
now := mktime("2023-10-03 10:00")
@@ -212,7 +208,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("request cancellation does not result in channel locking", func(t *testing.T) {
reg := prometheus.NewRegistry()
- gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+ gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
require.NoError(t, err)
now := mktime("2024-01-25 10:00")
@@ -259,7 +255,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
reg := prometheus.NewRegistry()
- gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+ gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -304,7 +300,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("gateway tracks active users", func(t *testing.T) {
reg := prometheus.NewRegistry()
- gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+ gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
require.NoError(t, err)
err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -344,7 +340,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
t.Run("use fuse queriers to filter chunks", func(t *testing.T) {
reg := prometheus.NewRegistry()
- gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+ gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
require.NoError(t, err)
now := mktime("2023-10-03 10:00")

@@ -36,6 +36,10 @@ import (
)
var (
+ // BlocksOwnerRead is the operation used to check the authoritative owners of a block
+ // (replicas included) that are available for queries (a bloom gateway is available for
+ // queries only when ACTIVE).
+ BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
// groupedChunksRefPool pooling slice of logproto.GroupedChunkRefs [64, 128, 256, ..., 65536]
groupedChunksRefPool = queue.NewSlicePool[*logproto.GroupedChunkRefs](1<<6, 1<<16, 2)
// ringGetBuffersPool pooling for ringGetBuffers to avoid calling ring.MakeBuffersForGet() for each request
@@ -226,15 +230,16 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
}
subRing := GetShuffleShardingSubring(c.ring, tenant, c.limits)
- rs, err := subRing.GetAllHealthy(BlocksRead)
+ rs, err := subRing.GetAllHealthy(BlocksOwnerRead)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get healthy instances")
}
- streamsByInst, err := c.groupFingerprintsByServer(groups, subRing, rs.Instances)
+ servers, err := serverAddressesWithTokenRanges(subRing, rs.Instances)
if err != nil {
return nil, err
}
+ streamsByInst := groupFingerprintsByServer(groups, servers)
filteredChunkRefs := groupedChunksRefPool.Get(len(groups))
defer groupedChunksRefPool.Put(filteredChunkRefs)
@@ -286,13 +291,9 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
return err
}
- func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, subRing ring.ReadRing, instances []ring.InstanceDesc) ([]instanceWithFingerprints, error) {
- servers, err := serverAddressesWithTokenRanges(subRing, instances)
- if err != nil {
- return nil, err
- }
+ func groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, servers []addrsWithTokenRange) []instanceWithFingerprints {
boundedFingerprints := partitionFingerprintsByAddresses(groups, servers)
- return groupByInstance(boundedFingerprints), nil
+ return groupByInstance(boundedFingerprints)
}
func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithTokenRange, error) {
@@ -303,7 +304,7 @@ func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.Inst
for it.Next() {
// We can use on of the tokens from the token range
// to obtain all addresses for that token.
- rs, err := subRing.Get(it.At().MaxToken, BlocksRead, bufDescs, bufHosts, bufZones)
+ rs, err := subRing.Get(it.At().MaxToken, BlocksOwnerRead, bufDescs, bufHosts, bufZones)
if err != nil {
return nil, errors.Wrap(err, "bloom gateway get ring")
}
@@ -410,3 +411,21 @@ func groupByInstance(boundedFingerprints []instanceWithFingerprints) []instanceW
return result
}
+ // GetShuffleShardingSubring returns the subring to be used for a given user.
+ // This function should be used both by index gateway servers and clients in
+ // order to guarantee the same logic is used.
+ func GetShuffleShardingSubring(ring ring.ReadRing, tenantID string, limits Limits) ring.ReadRing {
+ shardSize := limits.BloomGatewayShardSize(tenantID)
+ // A shard size of 0 means shuffle sharding is disabled for this specific user,
+ // so we just return the full ring so that indexes will be sharded across all index gateways.
+ // Since we set the shard size to replication factor if shard size is 0, this
+ // can only happen if both the shard size and the replication factor are set
+ // to 0.
+ if shardSize <= 0 {
+ return ring
+ }
+ return ring.ShuffleShard(tenantID, shardSize)
+ }

@@ -207,19 +207,6 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
}
func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
- logger := log.NewNopLogger()
- reg := prometheus.NewRegistry()
- l, err := validation.NewOverrides(validation.Limits{BloomGatewayShardSize: 1}, nil)
- require.NoError(t, err)
- cfg := ClientConfig{}
- flagext.DefaultValues(&cfg)
- c, err := NewClient(cfg, nil, l, reg, logger, "loki", nil, false)
- require.NoError(t, err)
instances := []ring.InstanceDesc{
{Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{2146405214, 1029997044, 678878693}},
{Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{296463531, 1697323986, 800258284}},
@@ -339,8 +326,9 @@ func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
return tc.chunks[i].Fingerprint < tc.chunks[j].Fingerprint
})
- res, err := c.groupFingerprintsByServer(tc.chunks, subRing, instances)
+ servers, err := serverAddressesWithTokenRanges(subRing, instances)
require.NoError(t, err)
+ res := groupFingerprintsByServer(tc.chunks, servers)
require.Equal(t, tc.expected, res)
})
}

@@ -1,156 +0,0 @@
package bloomgateway
import (
"context"
"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
util_ring "github.com/grafana/loki/pkg/util/ring"
)
// TODO(chaudum): Replace this placeholder with actual BlockRef struct.
type BlockRef struct {
FromFp, ThroughFp uint64
FromTs, ThroughTs int64
}
var (
// BlocksOwnerSync is the operation used to check the authoritative owners of a block
// (replicas included).
BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
// BlocksOwnerRead is the operation used to check the authoritative owners of a block
// (replicas included) that are available for queries (a bloom gateway is available for
// queries only when ACTIVE).
BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
// BlocksRead is the operation run by the querier to query blocks via the bloom gateway.
BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
// Blocks can only be queried from ACTIVE instances. However, if the block belongs to
// a non-active instance, then we should extend the replication set and try to query it
// from the next ACTIVE instance in the ring (which is expected to have it because a
// bloom gateway keeps their previously owned blocks until new owners are ACTIVE).
return s != ring.ACTIVE
})
)
type ShardingStrategy interface {
// FilterTenants whose indexes should be loaded by the index gateway.
// Returns the list of user IDs that should be synced by the index gateway.
FilterTenants(ctx context.Context, tenantIDs []string) ([]string, error)
FilterBlocks(ctx context.Context, tenantID string, blockRefs []BlockRef) ([]BlockRef, error)
}
type ShuffleShardingStrategy struct {
util_ring.TenantSharding
r ring.ReadRing
ringLifeCycler *ring.BasicLifecycler
logger log.Logger
}
func NewShuffleShardingStrategy(r ring.ReadRing, ringLifecycler *ring.BasicLifecycler, limits Limits, logger log.Logger) *ShuffleShardingStrategy {
return &ShuffleShardingStrategy{
TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomGatewayShardSize),
ringLifeCycler: ringLifecycler,
logger: logger,
}
}
// FilterTenants implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterTenants(_ context.Context, tenantIDs []string) ([]string, error) {
// As a protection, ensure the bloom gateway instance is healthy in the ring. It could also be missing
// in the ring if it was failing to heartbeat the ring and it got remove from another healthy bloom gateway
// instance, because of the auto-forget feature.
if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil {
return nil, err
} else if !set.Includes(s.ringLifeCycler.GetInstanceID()) {
return nil, errGatewayUnhealthy
}
var filteredIDs []string
for _, tenantID := range tenantIDs {
// Include the user only if it belongs to this bloom gateway shard.
if s.OwnsTenant(tenantID) {
filteredIDs = append(filteredIDs, tenantID)
}
}
return filteredIDs, nil
}
// nolint:revive
func getBucket(rangeMin, rangeMax, pos uint64) int {
return 0
}
// FilterBlocks implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, tenantID string, blockRefs []BlockRef) ([]BlockRef, error) {
if !s.OwnsTenant(tenantID) {
return nil, nil
}
filteredBlockRefs := make([]BlockRef, 0, len(blockRefs))
tenantRing := s.GetTenantSubRing(tenantID)
fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, BlocksOwnerSync)
for _, blockRef := range blockRefs {
owns, err := fpSharding.OwnsFingerprint(blockRef.FromFp)
if err != nil {
return nil, err
}
if owns {
filteredBlockRefs = append(filteredBlockRefs, blockRef)
continue
}
owns, err = fpSharding.OwnsFingerprint(blockRef.ThroughFp)
if err != nil {
return nil, err
}
if owns {
filteredBlockRefs = append(filteredBlockRefs, blockRef)
continue
}
}
return filteredBlockRefs, nil
}
// GetShuffleShardingSubring returns the subring to be used for a given user.
// This function should be used both by index gateway servers and clients in
// order to guarantee the same logic is used.
func GetShuffleShardingSubring(ring ring.ReadRing, tenantID string, limits Limits) ring.ReadRing {
shardSize := limits.BloomGatewayShardSize(tenantID)
// A shard size of 0 means shuffle sharding is disabled for this specific user,
// so we just return the full ring so that indexes will be sharded across all index gateways.
// Since we set the shard size to replication factor if shard size is 0, this
// can only happen if both the shard size and the replication factor are set
// to 0.
if shardSize <= 0 {
return ring
}
return ring.ShuffleShard(tenantID, shardSize)
}
// NoopStrategy is an implementation of the ShardingStrategy that does not
// filter anything.
type NoopStrategy struct{}
func NewNoopStrategy() *NoopStrategy {
return &NoopStrategy{}
}
// FilterTenants implements ShardingStrategy.
func (s *NoopStrategy) FilterTenants(_ context.Context, tenantIDs []string) ([]string, error) {
return tenantIDs, nil
}
// FilterBlocks implements ShardingStrategy.
func (s *NoopStrategy) FilterBlocks(_ context.Context, _ string, blockRefs []BlockRef) ([]BlockRef, error) {
return blockRefs, nil
}

@@ -1271,9 +1271,7 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler {
func (t *Loki) initBloomGateway() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "bloom-gateway")
- shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger)
- gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, t.Overrides, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
+ gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, t.Overrides, t.clientMetrics, logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
