diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go
index 17bd9b395b..abecbf6773 100644
--- a/pkg/bloomgateway/bloomgateway.go
+++ b/pkg/bloomgateway/bloomgateway.go
@@ -177,8 +177,6 @@ type Gateway struct {
     activeUsers *util.ActiveUsersCleanupService
     bloomStore  bloomshipper.Store
 
-    sharding ShardingStrategy
-
     pendingTasks *pendingTasks
 
     serviceMngr *services.Manager
@@ -196,12 +194,11 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
 }
 
 // New returns a new instance of the Bloom Gateway.
-func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
+func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
     g := &Gateway{
         cfg:          cfg,
         logger:       logger,
         metrics:      newMetrics(reg, constants.Loki, metricsSubsystem),
-        sharding:     shardingStrategy,
         pendingTasks: makePendingTasks(pendingTasksInitialCap),
         workerConfig: workerConfig{
             maxItems: 100,
diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go
index 329c7b38a3..1b1c8b7d56 100644
--- a/pkg/bloomgateway/bloomgateway_test.go
+++ b/pkg/bloomgateway/bloomgateway_test.go
@@ -45,8 +45,6 @@ func newLimits() *validation.Overrides {
 }
 
 func TestBloomGateway_StartStopService(t *testing.T) {
-
-    ss := NewNoopStrategy()
     logger := log.NewNopLogger()
     reg := prometheus.NewRegistry()
     limits := newLimits()
@@ -96,7 +94,7 @@ func TestBloomGateway_StartStopService(t *testing.T) {
         MaxOutstandingPerTenant: 1024,
     }
 
-    gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+    gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
     require.NoError(t, err)
 
     err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -113,8 +111,6 @@ func TestBloomGateway_StartStopService(t *testing.T) {
 
 func TestBloomGateway_FilterChunkRefs(t *testing.T) {
     tenantID := "test"
-
-    ss := NewNoopStrategy()
     logger := log.NewLogfmtLogger(os.Stderr)
     reg := prometheus.NewRegistry()
     limits := newLimits()
@@ -165,7 +161,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 
     t.Run("shipper error is propagated", func(t *testing.T) {
         reg := prometheus.NewRegistry()
-        gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+        gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
         require.NoError(t, err)
 
         now := mktime("2023-10-03 10:00")
@@ -212,7 +208,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 
     t.Run("request cancellation does not result in channel locking", func(t *testing.T) {
         reg := prometheus.NewRegistry()
-        gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+        gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
         require.NoError(t, err)
 
         now := mktime("2024-01-25 10:00")
@@ -259,7 +255,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 
     t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
         reg := prometheus.NewRegistry()
-        gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+        gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
         require.NoError(t, err)
 
         err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -304,7 +300,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 
     t.Run("gateway tracks active users", func(t *testing.T) {
         reg := prometheus.NewRegistry()
-        gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+        gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
         require.NoError(t, err)
 
         err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -344,7 +340,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
 
     t.Run("use fuse queriers to filter chunks", func(t *testing.T) {
         reg := prometheus.NewRegistry()
-        gw, err := New(cfg, schemaCfg, storageCfg, limits, ss, cm, logger, reg)
+        gw, err := New(cfg, schemaCfg, storageCfg, limits, cm, logger, reg)
         require.NoError(t, err)
 
         now := mktime("2023-10-03 10:00")
diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go
index 6453987b91..9a75e4e87c 100644
--- a/pkg/bloomgateway/client.go
+++ b/pkg/bloomgateway/client.go
@@ -36,6 +36,10 @@ import (
 )
 
 var (
+    // BlocksOwnerRead is the operation used to check the authoritative owners of a block
+    // (replicas included) that are available for queries (a bloom gateway is available for
+    // queries only when ACTIVE).
+    BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
     // groupedChunksRefPool pooling slice of logproto.GroupedChunkRefs [64, 128, 256, ..., 65536]
     groupedChunksRefPool = queue.NewSlicePool[*logproto.GroupedChunkRefs](1<<6, 1<<16, 2)
     // ringGetBuffersPool pooling for ringGetBuffers to avoid calling ring.MakeBuffersForGet() for each request
@@ -226,15 +230,16 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, from, t
     }
 
     subRing := GetShuffleShardingSubring(c.ring, tenant, c.limits)
-    rs, err := subRing.GetAllHealthy(BlocksRead)
+    rs, err := subRing.GetAllHealthy(BlocksOwnerRead)
     if err != nil {
         return nil, errors.Wrap(err, "bloom gateway get healthy instances")
     }
 
-    streamsByInst, err := c.groupFingerprintsByServer(groups, subRing, rs.Instances)
+    servers, err := serverAddressesWithTokenRanges(subRing, rs.Instances)
     if err != nil {
         return nil, err
     }
+    streamsByInst := groupFingerprintsByServer(groups, servers)
 
     filteredChunkRefs := groupedChunksRefPool.Get(len(groups))
     defer groupedChunksRefPool.Put(filteredChunkRefs)
@@ -286,13 +291,9 @@ func (c *GatewayClient) doForAddrs(addrs []string, fn func(logproto.BloomGateway
     return err
 }
 
-func (c *GatewayClient) groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, subRing ring.ReadRing, instances []ring.InstanceDesc) ([]instanceWithFingerprints, error) {
-    servers, err := serverAddressesWithTokenRanges(subRing, instances)
-    if err != nil {
-        return nil, err
-    }
+func groupFingerprintsByServer(groups []*logproto.GroupedChunkRefs, servers []addrsWithTokenRange) []instanceWithFingerprints {
     boundedFingerprints := partitionFingerprintsByAddresses(groups, servers)
-    return groupByInstance(boundedFingerprints), nil
+    return groupByInstance(boundedFingerprints)
 }
 
 func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.InstanceDesc) ([]addrsWithTokenRange, error) {
@@ -303,7 +304,7 @@ func serverAddressesWithTokenRanges(subRing ring.ReadRing, instances []ring.Inst
     for it.Next() {
         // We can use on of the tokens from the token range
         // to obtain all addresses for that token.
-        rs, err := subRing.Get(it.At().MaxToken, BlocksRead, bufDescs, bufHosts, bufZones)
+        rs, err := subRing.Get(it.At().MaxToken, BlocksOwnerRead, bufDescs, bufHosts, bufZones)
         if err != nil {
             return nil, errors.Wrap(err, "bloom gateway get ring")
         }
@@ -410,3 +411,21 @@
 
     return result
 }
+
+// GetShuffleShardingSubring returns the subring to be used for a given user.
+// This function should be used both by index gateway servers and clients in
+// order to guarantee the same logic is used.
+func GetShuffleShardingSubring(ring ring.ReadRing, tenantID string, limits Limits) ring.ReadRing {
+    shardSize := limits.BloomGatewayShardSize(tenantID)
+
+    // A shard size of 0 means shuffle sharding is disabled for this specific user,
+    // so we just return the full ring so that indexes will be sharded across all index gateways.
+    // Since we set the shard size to replication factor if shard size is 0, this
+    // can only happen if both the shard size and the replication factor are set
+    // to 0.
+    if shardSize <= 0 {
+        return ring
+    }
+
+    return ring.ShuffleShard(tenantID, shardSize)
+}
diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go
index e59fff2306..b1716de815 100644
--- a/pkg/bloomgateway/client_test.go
+++ b/pkg/bloomgateway/client_test.go
@@ -207,19 +207,6 @@ func TestBloomGatewayClient_ServerAddressesWithTokenRanges(t *testing.T) {
 }
 
 func TestBloomGatewayClient_GroupFingerprintsByServer(t *testing.T) {
-
-    logger := log.NewNopLogger()
-    reg := prometheus.NewRegistry()
-
-    l, err := validation.NewOverrides(validation.Limits{BloomGatewayShardSize: 1}, nil)
-    require.NoError(t, err)
-
-    cfg := ClientConfig{}
-    flagext.DefaultValues(&cfg)
-
-    c, err := NewClient(cfg, nil, l, reg, logger, "loki", nil, false)
-    require.NoError(t, err)
-
     instances := []ring.InstanceDesc{
         {Id: "instance-1", Addr: "10.0.0.1", Tokens: []uint32{2146405214, 1029997044, 678878693}},
         {Id: "instance-2", Addr: "10.0.0.2", Tokens: []uint32{296463531, 1697323986, 800258284}},
@@ -339,8 +326,9 @@
                 return tc.chunks[i].Fingerprint < tc.chunks[j].Fingerprint
             })
 
-            res, err := c.groupFingerprintsByServer(tc.chunks, subRing, instances)
+            servers, err := serverAddressesWithTokenRanges(subRing, instances)
             require.NoError(t, err)
+            res := groupFingerprintsByServer(tc.chunks, servers)
             require.Equal(t, tc.expected, res)
         })
     }
diff --git a/pkg/bloomgateway/sharding.go b/pkg/bloomgateway/sharding.go
deleted file mode 100644
index 5dfb9f1173..0000000000
--- a/pkg/bloomgateway/sharding.go
+++ /dev/null
@@ -1,156 +0,0 @@
-package bloomgateway
-
-import (
-    "context"
-
-    "github.com/go-kit/log"
-    "github.com/grafana/dskit/ring"
-
-    util_ring "github.com/grafana/loki/pkg/util/ring"
-)
-
-// TODO(chaudum): Replace this placeholder with actual BlockRef struct.
-type BlockRef struct {
-    FromFp, ThroughFp uint64
-    FromTs, ThroughTs int64
-}
-
-var (
-    // BlocksOwnerSync is the operation used to check the authoritative owners of a block
-    // (replicas included).
-    BlocksOwnerSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
-
-    // BlocksOwnerRead is the operation used to check the authoritative owners of a block
-    // (replicas included) that are available for queries (a bloom gateway is available for
-    // queries only when ACTIVE).
-    BlocksOwnerRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
-
-    // BlocksRead is the operation run by the querier to query blocks via the bloom gateway.
-    BlocksRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, func(s ring.InstanceState) bool {
-        // Blocks can only be queried from ACTIVE instances. However, if the block belongs to
-        // a non-active instance, then we should extend the replication set and try to query it
-        // from the next ACTIVE instance in the ring (which is expected to have it because a
-        // bloom gateway keeps their previously owned blocks until new owners are ACTIVE).
-        return s != ring.ACTIVE
-    })
-)
-
-type ShardingStrategy interface {
-    // FilterTenants whose indexes should be loaded by the index gateway.
-    // Returns the list of user IDs that should be synced by the index gateway.
-    FilterTenants(ctx context.Context, tenantIDs []string) ([]string, error)
-    FilterBlocks(ctx context.Context, tenantID string, blockRefs []BlockRef) ([]BlockRef, error)
-}
-
-type ShuffleShardingStrategy struct {
-    util_ring.TenantSharding
-    r              ring.ReadRing
-    ringLifeCycler *ring.BasicLifecycler
-    logger         log.Logger
-}
-
-func NewShuffleShardingStrategy(r ring.ReadRing, ringLifecycler *ring.BasicLifecycler, limits Limits, logger log.Logger) *ShuffleShardingStrategy {
-    return &ShuffleShardingStrategy{
-        TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomGatewayShardSize),
-        ringLifeCycler: ringLifecycler,
-        logger:         logger,
-    }
-}
-
-// FilterTenants implements ShardingStrategy.
-func (s *ShuffleShardingStrategy) FilterTenants(_ context.Context, tenantIDs []string) ([]string, error) {
-    // As a protection, ensure the bloom gateway instance is healthy in the ring. It could also be missing
-    // in the ring if it was failing to heartbeat the ring and it got remove from another healthy bloom gateway
-    // instance, because of the auto-forget feature.
-    if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil {
-        return nil, err
-    } else if !set.Includes(s.ringLifeCycler.GetInstanceID()) {
-        return nil, errGatewayUnhealthy
-    }
-
-    var filteredIDs []string
-
-    for _, tenantID := range tenantIDs {
-        // Include the user only if it belongs to this bloom gateway shard.
-        if s.OwnsTenant(tenantID) {
-            filteredIDs = append(filteredIDs, tenantID)
-        }
-    }
-
-    return filteredIDs, nil
-}
-
-// nolint:revive
-func getBucket(rangeMin, rangeMax, pos uint64) int {
-    return 0
-}
-
-// FilterBlocks implements ShardingStrategy.
-func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, tenantID string, blockRefs []BlockRef) ([]BlockRef, error) {
-    if !s.OwnsTenant(tenantID) {
-        return nil, nil
-    }
-
-    filteredBlockRefs := make([]BlockRef, 0, len(blockRefs))
-
-    tenantRing := s.GetTenantSubRing(tenantID)
-
-    fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, BlocksOwnerSync)
-    for _, blockRef := range blockRefs {
-        owns, err := fpSharding.OwnsFingerprint(blockRef.FromFp)
-        if err != nil {
-            return nil, err
-        }
-        if owns {
-            filteredBlockRefs = append(filteredBlockRefs, blockRef)
-            continue
-        }
-
-        owns, err = fpSharding.OwnsFingerprint(blockRef.ThroughFp)
-        if err != nil {
-            return nil, err
-        }
-        if owns {
-            filteredBlockRefs = append(filteredBlockRefs, blockRef)
-            continue
-        }
-    }
-
-    return filteredBlockRefs, nil
-}
-
-// GetShuffleShardingSubring returns the subring to be used for a given user.
-// This function should be used both by index gateway servers and clients in
-// order to guarantee the same logic is used.
-func GetShuffleShardingSubring(ring ring.ReadRing, tenantID string, limits Limits) ring.ReadRing {
-    shardSize := limits.BloomGatewayShardSize(tenantID)
-
-    // A shard size of 0 means shuffle sharding is disabled for this specific user,
-    // so we just return the full ring so that indexes will be sharded across all index gateways.
-    // Since we set the shard size to replication factor if shard size is 0, this
-    // can only happen if both the shard size and the replication factor are set
-    // to 0.
-    if shardSize <= 0 {
-        return ring
-    }
-
-    return ring.ShuffleShard(tenantID, shardSize)
-}
-
-// NoopStrategy is an implementation of the ShardingStrategy that does not
-// filter anything.
-type NoopStrategy struct{}
-
-func NewNoopStrategy() *NoopStrategy {
-    return &NoopStrategy{}
-}
-
-// FilterTenants implements ShardingStrategy.
-func (s *NoopStrategy) FilterTenants(_ context.Context, tenantIDs []string) ([]string, error) {
-    return tenantIDs, nil
-}
-
-// FilterBlocks implements ShardingStrategy.
-func (s *NoopStrategy) FilterBlocks(_ context.Context, _ string, blockRefs []BlockRef) ([]BlockRef, error) {
-    return blockRefs, nil
-}
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 4bf5282746..592aa12b35 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -1271,9 +1271,7 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler {
 func (t *Loki) initBloomGateway() (services.Service, error) {
     logger := log.With(util_log.Logger, "component", "bloom-gateway")
 
-    shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger)
-
-    gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, t.Overrides, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
+    gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, t.Overrides, t.clientMetrics, logger, prometheus.DefaultRegisterer)
 	 if err != nil {
 		 return nil, err
 	 }
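
Note on the client-side refactor above: the old groupFingerprintsByServer method on GatewayClient was split into two steps. serverAddressesWithTokenRanges resolves the token ranges owned by each healthy (ACTIVE) gateway in the tenant's subring, and groupFingerprintsByServer is now a pure function that only buckets chunk-ref groups by owning instance, which is why the test can call it without constructing a full client. Below is a minimal sketch of how the pieces compose, mirroring the body of FilterChunks in this diff. The helper name resolveServersAndGroup is invented for illustration only and assumes the code sits in the bloomgateway package next to client.go, so the package-level identifiers and imports (ring, logproto, errors) are already in scope.

// resolveServersAndGroup is a hypothetical helper (illustration only, not part of this change)
// showing how the functions touched in this diff compose inside the bloomgateway package.
func resolveServersAndGroup(r ring.ReadRing, limits Limits, tenant string, groups []*logproto.GroupedChunkRefs) ([]instanceWithFingerprints, error) {
    // Per-tenant subring: the full ring when the tenant's shard size is 0, otherwise a shuffle shard.
    subRing := GetShuffleShardingSubring(r, tenant, limits)

    // Only ACTIVE instances are eligible to serve queries (BlocksOwnerRead).
    rs, err := subRing.GetAllHealthy(BlocksOwnerRead)
    if err != nil {
        return nil, errors.Wrap(err, "bloom gateway get healthy instances")
    }

    // Map each healthy instance to the token ranges it owns in the subring...
    servers, err := serverAddressesWithTokenRanges(subRing, rs.Instances)
    if err != nil {
        return nil, err
    }

    // ...and bucket the chunk-ref groups (sorted by fingerprint, as the test above sorts them)
    // by the instance that owns their fingerprint range.
    return groupFingerprintsByServer(groups, servers), nil
}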
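
A second note on GetShuffleShardingSubring, which moved from the deleted sharding.go into client.go: a tenant shard size of 0 (or negative) disables shuffle sharding and returns the full ring, so every gateway participates for that tenant, while a positive shard size yields a deterministic per-tenant subring. The stub below is a hypothetical illustration only (fixedShardLimits does not exist in the codebase); it works because GetShuffleShardingSubring only ever calls BloomGatewayShardSize on the Limits value.

// fixedShardLimits is a hypothetical stub, used only to illustrate the shard-size semantics.
// It embeds the package's Limits interface and overrides the single method that is called.
type fixedShardLimits struct {
    Limits
    shardSize int
}

func (l fixedShardLimits) BloomGatewayShardSize(_ string) int { return l.shardSize }

func subringExample(r ring.ReadRing) {
    // Shard size 0: shuffle sharding is disabled, the tenant is spread over the full ring.
    full := GetShuffleShardingSubring(r, "tenant-a", fixedShardLimits{shardSize: 0})

    // Positive shard size: a deterministic subring of that many instances for the tenant.
    sub := GetShuffleShardingSubring(r, "tenant-a", fixedShardLimits{shardSize: 3})

    _, _ = full, sub
}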