From 440fb7d74cad481bebb222b21fc063fa56c2f5ac Mon Sep 17 00:00:00 2001 From: Dylan Guedes Date: Fri, 20 May 2022 03:50:27 -0300 Subject: [PATCH] Loki: Modifies TableManager to use IndexGateway ring (#5972) * Add two metrics to the IndexGateway. - Add a new `query_readiness_duration_seconds` metric, that reports query readiness duration of a tablemanager/index gateway instance. We should use it later to report performance against the ring mode - Add a new `usersToBeQueryReadyForTotal` metric, that reports number of users involved in the query readiness operation. We should use it later to correlate number of users with the query readiness duration. * Remove `usersToBeQueryReadyForTotal`. - It will report all users always for now, so it isn't too helpful the way it is. * Rename metric help text to not mislead people. * Log queryReadiness duration. * Fix where log message and duration are triggered. * Add two metrics to the IndexGateway. - Add a new `query_readiness_duration_seconds` metric, that reports query readiness duration of a tablemanager/index gateway instance. We should use it later to report performance against the ring mode - Add a new `usersToBeQueryReadyForTotal` metric, that reports number of users involved in the query readiness operation. We should use it later to correlate number of users with the query readiness duration. * Remove `usersToBeQueryReadyForTotal`. - It will report all users always for now, so it isn't too helpful the way it is. * Rename metric help text to not mislead people. * Log queryReadiness duration. * Fix where log message and duration are triggered. * Use boundaries to skip users in TableManager. 
- Separate the assigning of indexClient in the IndexGateway to allow initializing the shipper with the IndexGateway ring - Add new TenantBoundariesClient entity, that answers if a given tenantID should be ignored or not - Use the TenantBoundariesClient implemented by the IndexGateway in the downloads TableManager * Add IndexGateway configuration docs. * Add tests for pkg/util/ring TokenFor() and IsAssignedKey() Signed-off-by: JordanRushing * Small whitespace fix in pkg/util/ring_test.go Signed-off-by: JordanRushing * Apply suggestions from code review Rewrite QueryIndex error phrase. Co-authored-by: JordanRushing * Join users list in a single string. - This is necessary since go-kit doesn't support array type. * Tweak queryReadiness log messages. - As suggested by Ed on https://github.com/grafana/loki/pull/5972#discussion_r859734129 and https://github.com/grafana/loki/pull/5972#discussion_r859736072 * Implement new RingManager. - Adds a new entity to indexgateway named RingManager, responsible for managing the ring and the lifecycler used by the indexgateway. The ringManager is guaranteed to be initiated before the Shipper and before the IndexGateway. - Remove the readiness logic from the IndexGateway. - Pass the RingManager as the TenantBoundaries implementation of the Shipper * Return non-empty ringmanager for all modes. * Fix lint regarding error not being checked. * Fix lint due to wrong import order. * Implement support for client and server mode for the ring manager. * Fix ring manager services registration. * Add option to configure whether or not to log gateway requests. * Check if IndexGateway is enabled instead of active. * Tune RingManager behavior. - Instantiate ring buffers inside IsAssignedKey instead of reusing the same buffer to avoid issues with concurrency. - Remove unnecessary details from IndexGateway mode docs. - Return error when wrongly instantiating a RingManager when IndexGateway is in simple mode. 
* Rewrite `TenantInBoundaries` as a func instead of an interface. * Fix lint. - Fix YAML tag - Remove else clause * Use distributor instead of querier in test. - Since the querier needs to wait for the index gateway ring, it isn't suitable for this test anymore. * Add ring mode guard clause. * Restore code erased by mistake. * Rename TenantInBoundaries to IndexGatewayOwnsTenant. * Log len of users instead of all of them to avoid overly long log messages. * Add docstrings to new public functions. * Apply suggestions from code review Document that tenant filtering is only applied during query readiness. Co-authored-by: Sandeep Sukhani * Modify IsAssignedKey to expect address directly instead of lifecycler. - Also removes HasDedicatedAddress, since it isn't necessary anymore * Log error message when IsInReplicationSet fails. * Modify IsAssignedKey to return true by default. * Remove wrong assigning of Service for clientMode. * Log gateway requests before error checks. * Remove unnecessary endpoint registration. * Fix lint. 
* Update pkg/util/ring.go Co-authored-by: Sandeep Sukhani Co-authored-by: JordanRushing Co-authored-by: Sandeep Sukhani --- docs/sources/configuration/_index.md | 27 ++ pkg/loki/loki.go | 4 +- pkg/loki/modules.go | 37 +-- pkg/storage/factory.go | 4 +- pkg/storage/store.go | 2 +- .../stores/shipper/downloads/table_manager.go | 18 +- .../shipper/downloads/table_manager_test.go | 2 +- pkg/storage/stores/shipper/gateway_client.go | 9 + .../stores/shipper/gateway_client_test.go | 2 +- .../stores/shipper/indexgateway/gateway.go | 170 +----------- .../shipper/indexgateway/gateway_test.go | 2 +- .../stores/shipper/indexgateway/lifecycle.go | 8 +- .../shipper/indexgateway/ringmanager.go | 255 ++++++++++++++++++ .../stores/shipper/shipper_index_client.go | 9 +- pkg/util/ring.go | 18 +- pkg/util/ring_test.go | 141 ++++++++++ 16 files changed, 501 insertions(+), 207 deletions(-) create mode 100644 pkg/storage/stores/shipper/indexgateway/ringmanager.go create mode 100644 pkg/util/ring_test.go diff --git a/docs/sources/configuration/_index.md b/docs/sources/configuration/_index.md index de57fec746..bb299455a1 100644 --- a/docs/sources/configuration/_index.md +++ b/docs/sources/configuration/_index.md @@ -132,6 +132,9 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set # key value store. [ingester: ] +# Configures the index gateway server. +[index_gateway: ] + # Configures where Loki will store data. [storage_config: ] @@ -1757,6 +1760,10 @@ boltdb_shipper: # The CLI flags prefix for this block config is: boltdb.shipper.index-gateway-client [grpc_client_config: ] + # Configures if gateway requests should be logged or not. + # CLI flag: -boltdb.shipper.index-gateway-client.log-gateway-requests + [log_gateway_requests: | default = false] + # Cache validity for active index entries. Should be no higher than # the chunk_idle_period in the ingester settings. 
# CLI flag: -store.index-cache-validity @@ -2398,6 +2405,26 @@ backoff_config: [max_retries: | default = 10] ``` +## index_gateway + +The `index_gateway` block configures the Loki index gateway server, responsible for serving index queries +without the need to constantly interact with the object store. + +```yaml +# Defines in which mode the index gateway server will operate (default to 'simple'). +# It supports two modes: +# 'simple': an index gateway server instance is responsible for handling, +# storing and returning requests for all indices for all tenants. +# 'ring': an index gateway server instance is responsible for a subset of tenants instead +# of all tenants. +[mode: | default = simple] + +# Defines the ring to be used by the index gateway servers and clients in case the servers +# are configured to run in 'ring' mode. In case this isn't configured, this block supports +# inheriting configuration from the common ring section. +[ring: ] +``` + ## table_manager The `table_manager` block configures the Loki table-manager. 
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 5ce12b9e9e..20188c45e4 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -253,7 +253,7 @@ type Loki struct { QueryFrontEndTripperware basetripper.Tripperware queryScheduler *scheduler.Scheduler usageReport *usagestats.Reporter - indexGatewayRing *ring.Ring + indexGatewayRingManager *indexgateway.RingManager clientMetrics storage.ClientMetrics @@ -512,7 +512,7 @@ func (t *Loki) setupModuleManager() error { Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport}, TableManager: {Server, UsageReport}, Compactor: {Server, Overrides, MemberlistKV, UsageReport}, - IndexGateway: {Server, Store, Overrides, UsageReport, MemberlistKV}, + IndexGateway: {Server, Store, Overrides, UsageReport, MemberlistKV, IndexGatewayRing}, IngesterQuerier: {Ring}, IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 62506a8e41..5def8a0cbc 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -13,7 +13,6 @@ import ( "github.com/NYTimes/gziphandler" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/kv" "github.com/grafana/dskit/kv/codec" "github.com/grafana/dskit/kv/memberlist" "github.com/grafana/dskit/ring" @@ -386,7 +385,9 @@ func (t *Loki) initStore() (_ services.Service, err error) { // Always set these configs // TODO(owen-d): Do the same for TSDB when we add IndexGatewayClientConfig t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode - t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing + if t.Cfg.IndexGateway.Mode == indexgateway.RingMode { + t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRingManager.Ring + } // If RF > 1 and current or upcoming index type is boltdb-shipper 
then disable index dedupe and write dedupe cache. // This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data. @@ -855,7 +856,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) { t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort - indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer) + indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -864,8 +865,6 @@ func (t *Loki) initIndexGateway() (services.Service, error) { return nil, err } - t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(gateway) - indexgatewaypb.RegisterIndexGatewayServer(t.Server.GRPC, gateway) return gateway, nil } @@ -878,29 +877,21 @@ func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort - ringCfg := t.Cfg.IndexGateway.Ring.ToRingConfig(t.Cfg.IndexGateway.Ring.ReplicationFactor) - reg := prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer) - logger := util_log.Logger - ringStore, err := kv.NewClient( - ringCfg.KVStore, - ring.GetCodec(), - kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", reg), "index-gateway"), - logger, - ) - if err != nil { - return nil, gerrors.Wrap(err, "kv new client") + managerMode := indexgateway.ClientMode + if t.Cfg.isModuleEnabled(IndexGateway) { + managerMode = indexgateway.ServerMode } + rm, err := 
indexgateway.NewRingManager(managerMode, t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer) - t.indexGatewayRing, err = ring.NewWithStoreClientAndStrategy( - ringCfg, indexgateway.RingIdentifier, indexgateway.RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", reg), logger, - ) if err != nil { - return nil, gerrors.Wrap(err, "new with store client and strategy") + return nil, gerrors.Wrap(err, "new index gateway ring manager") } - t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(t.indexGatewayRing) - return t.indexGatewayRing, nil + t.indexGatewayRingManager = rm + + t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(t.indexGatewayRingManager) + return t.indexGatewayRingManager, nil } func (t *Loki) initQueryScheduler() (services.Service, error) { @@ -962,7 +953,7 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) { deleteStore := deletion.NewNoOpDeleteRequestsStore() if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled { - indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, prometheus.DefaultRegisterer) + indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, nil, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index be2c9cb38d..2924c109ad 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -124,7 +124,7 @@ func (cfg *Config) Validate() error { } // NewIndexClient makes a new index client of the desired type. 
-func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, registerer prometheus.Registerer) (index.Client, error) { +func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer) (index.Client, error) { switch name { case config.StorageTypeInMemory: store := testutils.NewMockStorage() @@ -171,7 +171,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limi return nil, err } - boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, registerer) + boltDBIndexClientWithShipper, err = shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, ownsTenantFn, registerer) return boltDBIndexClientWithShipper, err default: diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 149eb1b886..3cc324d1c9 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -232,7 +232,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, chunkClient client.Client, }, nil } - idx, err := NewIndexClient(p.IndexType, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, indexClientReg) + idx, err := NewIndexClient(p.IndexType, s.cfg, s.schemaCfg, s.limits, s.clientMetrics, nil, indexClientReg) if err != nil { return nil, nil, nil, errors.Wrap(err, "error creating index client") } diff --git a/pkg/storage/stores/shipper/downloads/table_manager.go b/pkg/storage/stores/shipper/downloads/table_manager.go index 85574b6fca..fd4aa8ef45 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager.go +++ b/pkg/storage/stores/shipper/downloads/table_manager.go @@ -7,7 +7,6 @@ import ( "path/filepath" "regexp" "strconv" - "strings" "sync" "time" @@ -32,6 +31,11 @@ type Limits interface { DefaultLimits() *validation.Limits } +// IndexGatewayOwnsTenant is invoked by an IndexGateway instance and answers whether if the given 
tenant is assigned to this instance or not. +// +// It is only relevant by an IndexGateway in the ring mode and if it returns false for a given tenant, that tenant will be ignored by this IndexGateway during query readiness. +type IndexGatewayOwnsTenant func(tenant string) bool + type Config struct { CacheDir string SyncInterval time.Duration @@ -52,9 +56,11 @@ type TableManager struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup + + ownsTenant IndexGatewayOwnsTenant } -func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorageClient storage.Client, registerer prometheus.Registerer) (*TableManager, error) { +func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorageClient storage.Client, ownsTenantFn IndexGatewayOwnsTenant, registerer prometheus.Registerer) (*TableManager, error) { if err := chunk_util.EnsureDirectory(cfg.CacheDir); err != nil { return nil, err } @@ -64,6 +70,7 @@ func NewTableManager(cfg Config, boltIndexClient BoltDBIndexClient, indexStorage cfg: cfg, boltIndexClient: boltIndexClient, indexStorageClient: indexStorageClient, + ownsTenant: ownsTenantFn, tables: make(map[string]Table), metrics: newMetrics(registerer), ctx: ctx, @@ -319,8 +326,7 @@ func (tm *TableManager) ensureQueryReadiness(ctx context.Context) error { if err := table.EnsureQueryReadiness(ctx, usersToBeQueryReadyFor); err != nil { return err } - joinedUsers := strings.Join(usersToBeQueryReadyFor, ",") - level.Info(util_log.Logger).Log("msg", "index pre-download for query readiness completed", "users", joinedUsers, "duration", time.Since(perTableStart), "table", tableName) + level.Info(util_log.Logger).Log("msg", "index pre-download for query readiness completed", "users_len", len(usersToBeQueryReadyFor), "duration", time.Since(perTableStart), "table", tableName) } return nil @@ -345,6 +351,10 @@ func (tm *TableManager) findUsersInTableForQueryReadiness(tableNumber int64, use continue } + if tm.ownsTenant != nil && 
!tm.ownsTenant(userID) { + continue + } + if activeTableNumber-tableNumber <= int64(queryReadyNumDays) { usersToBeQueryReadyFor = append(usersToBeQueryReadyFor, userID) } diff --git a/pkg/storage/stores/shipper/downloads/table_manager_test.go b/pkg/storage/stores/shipper/downloads/table_manager_test.go index 8f7003890e..e9b212ca1d 100644 --- a/pkg/storage/stores/shipper/downloads/table_manager_test.go +++ b/pkg/storage/stores/shipper/downloads/table_manager_test.go @@ -26,7 +26,7 @@ func buildTestTableManager(t *testing.T, path string) (*TableManager, stopFunc) CacheTTL: time.Hour, Limits: &mockLimits{}, } - tableManager, err := NewTableManager(cfg, boltDBIndexClient, indexStorageClient, nil) + tableManager, err := NewTableManager(cfg, boltDBIndexClient, indexStorageClient, nil, nil) require.NoError(t, err) return tableManager, func() { diff --git a/pkg/storage/stores/shipper/gateway_client.go b/pkg/storage/stores/shipper/gateway_client.go index 236f7f5fdc..666d49056a 100644 --- a/pkg/storage/stores/shipper/gateway_client.go +++ b/pkg/storage/stores/shipper/gateway_client.go @@ -67,6 +67,10 @@ type IndexGatewayClientConfig struct { // Forcefully disable the use of the index gateway client for the storage. // This is mainly useful for the index-gateway component which should always use the storage. Disabled bool `yaml:"-"` + + // LogGatewayRequests configures if requests sent to the gateway should be logged or not. + // The log messages are of type debug and contain the address of the gateway and the relevant tenant. + LogGatewayRequests bool `yaml:"log_gateway_requests"` } // RegisterFlagsWithPrefix register client-specific flags with the given prefix. 
@@ -75,6 +79,7 @@ type IndexGatewayClientConfig struct { func (i *IndexGatewayClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) f.StringVar(&i.Address, prefix+".server-address", "", "Hostname or IP of the Index Gateway gRPC server running in simple mode.") + f.BoolVar(&i.LogGatewayRequests, prefix+".log-gateway-requests", false, "Whether requests sent to the gateway should be logged or not.") } func (i *IndexGatewayClientConfig) RegisterFlags(f *flag.FlagSet) { @@ -312,6 +317,10 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client ind }) var lastErr error for _, addr := range addrs { + if s.cfg.LogGatewayRequests { + level.Debug(util_log.Logger).Log("msg", "sending request to gateway", "gateway", addr, "tenant", userID) + } + genericClient, err := s.pool.GetClientFor(addr) if err != nil { level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to get client for instance %s", addr), "err", err) diff --git a/pkg/storage/stores/shipper/gateway_client_test.go b/pkg/storage/stores/shipper/gateway_client_test.go index 387f53e409..d104b35a0e 100644 --- a/pkg/storage/stores/shipper/gateway_client_test.go +++ b/pkg/storage/stores/shipper/gateway_client_test.go @@ -229,7 +229,7 @@ func benchmarkIndexQueries(b *testing.B, queries []index.Query) { CacheTTL: 15 * time.Minute, QueryReadyNumDays: 30, Limits: mockLimits{}, - }, bclient, storage.NewIndexStorageClient(fs, "index/"), nil) + }, bclient, storage.NewIndexStorageClient(fs, "index/"), nil, nil) require.NoError(b, err) // initialize the index gateway server diff --git a/pkg/storage/stores/shipper/indexgateway/gateway.go b/pkg/storage/stores/shipper/indexgateway/gateway.go index f7289419ea..89c1aa1c90 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway.go @@ -2,18 +2,11 @@ package indexgateway import ( "context" - "fmt" - "net/http" "sync" - 
"time" "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/kv" - "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/tenant" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -25,21 +18,10 @@ import ( "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" "github.com/grafana/loki/pkg/storage/stores/shipper/util" - util_log "github.com/grafana/loki/pkg/util/log" ) const ( - maxIndexEntriesPerResponse = 1000 - ringAutoForgetUnhealthyPeriods = 10 - ringNameForServer = "index-gateway" - ringNumTokens = 128 - ringCheckPeriod = 3 * time.Second - - // RingIdentifier is used as a unique name to register the Index Gateway ring. - RingIdentifier = "index-gateway" - - // RingKey is the name of the key used to register the different Index Gateway instances in the key-value store. - RingKey = "index-gateway" + maxIndexEntriesPerResponse = 1000 ) type IndexQuerier interface { @@ -65,12 +47,6 @@ type Gateway struct { log log.Logger shipper IndexQuerier - - subservices *services.Manager - subservicesWatcher *services.FailureWatcher - - ringLifecycler *ring.BasicLifecycler - ring *ring.Ring } // NewIndexGateway instantiates a new Index Gateway and start its services. 
@@ -80,141 +56,18 @@ type Gateway struct { func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registerer, indexQuerier IndexQuerier, indexClient IndexClient) (*Gateway, error) { g := &Gateway{ indexQuerier: indexQuerier, - indexClient: indexClient, cfg: cfg, log: log, + indexClient: indexClient, } - if cfg.Mode == RingMode { - ringStore, err := kv.NewClient( - cfg.Ring.KVStore, - ring.GetCodec(), - kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "index-gateway"), - log, - ) - if err != nil { - return nil, errors.Wrap(err, "create KV store client") - } - - lifecyclerCfg, err := cfg.Ring.ToLifecyclerConfig(ringNumTokens, log) - if err != nil { - return nil, errors.Wrap(err, "invalid ring lifecycler config") - } - - delegate := ring.BasicLifecyclerDelegate(g) - delegate = ring.NewLeaveOnStoppingDelegate(delegate, log) - delegate = ring.NewTokensPersistencyDelegate(cfg.Ring.TokensFilePath, ring.JOINING, delegate, log) - delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.Ring.HeartbeatTimeout, delegate, log) - - g.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, RingKey, ringStore, delegate, log, registerer) - if err != nil { - return nil, errors.Wrap(err, "index gateway create ring lifecycler") - } - - ringCfg := cfg.Ring.ToRingConfig(cfg.Ring.ReplicationFactor) - g.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", registerer), log) - if err != nil { - return nil, errors.Wrap(err, "index gateway create ring client") - } - - svcs := []services.Service{g.ringLifecycler, g.ring} - g.subservices, err = services.NewManager(svcs...) 
- if err != nil { - return nil, fmt.Errorf("new index gateway services manager: %w", err) - } - - g.subservicesWatcher = services.NewFailureWatcher() - g.subservicesWatcher.WatchManager(g.subservices) - g.Service = services.NewBasicService(g.starting, g.running, g.stopping) - } else { - g.Service = services.NewIdleService(nil, func(failureCase error) error { - g.indexQuerier.Stop() - g.indexClient.Stop() - return nil - }) - } - - return g, nil -} - -// starting implements the Lifecycler interface and is one of the lifecycle hooks. -// -// Only invoked if the Index Gateway is in ring mode. -func (g *Gateway) starting(ctx context.Context) (err error) { - // In case this function will return error we want to unregister the instance - // from the ring. We do it ensuring dependencies are gracefully stopped if they - // were already started. - defer func() { - if err == nil || g.subservices == nil { - return - } - - if stopErr := services.StopManagerAndAwaitStopped(context.Background(), g.subservices); stopErr != nil { - level.Error(util_log.Logger).Log("msg", "failed to gracefully stop index gateway dependencies", "err", stopErr) - } - }() - - if err := services.StartManagerAndAwaitHealthy(ctx, g.subservices); err != nil { - return errors.Wrap(err, "unable to start index gateway subservices") - } - - // The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that - // someone wants to do can be done before becoming ACTIVE. For the index gateway we don't currently - // have any additional work so we can become ACTIVE right away. - // Wait until the ring client detected this instance in the JOINING - // state to make sure that when we'll run the initial sync we already - // know the tokens assigned to this instance. 
- level.Info(util_log.Logger).Log("msg", "waiting until index gateway is JOINING in the ring") - if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { - return err - } - level.Info(util_log.Logger).Log("msg", "index gateway is JOINING in the ring") - - if err = g.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { - return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE) - } - - // Wait until the ring client detected this instance in the ACTIVE state to - // make sure that when we'll run the loop it won't be detected as a ring - // topology change. - level.Info(util_log.Logger).Log("msg", "waiting until index gateway is ACTIVE in the ring") - if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { - return err - } - level.Info(util_log.Logger).Log("msg", "index gateway is ACTIVE in the ring") - - return nil -} - -// running implements the Lifecycler interface and is one of the lifecycle hooks. -// -// Only invoked if the Index Gateway is in ring mode. -func (g *Gateway) running(ctx context.Context) error { - t := time.NewTicker(ringCheckPeriod) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return nil - case err := <-g.subservicesWatcher.Chan(): - return errors.Wrap(err, "running index gateway subservice failed") - case <-t.C: - continue - // TODO: should we implement CAS check? - } - } -} - -// stopping implements the Lifecycler interface and is one of the lifecycle hooks. -// -// Only invoked if the Index Gateway is in ring mode. 
-func (g *Gateway) stopping(_ error) error { - level.Debug(util_log.Logger).Log("msg", "stopping index gateway") - defer func() { + g.Service = services.NewIdleService(nil, func(failureCase error) error { g.indexQuerier.Stop() g.indexClient.Stop() - }() - return services.StopManagerAndAwaitStopped(context.Background(), g.subservices) + return nil + }) + + return g, nil } func (g *Gateway) QueryIndex(request *indexgatewaypb.QueryIndexRequest, server indexgatewaypb.IndexGateway_QueryIndexServer) error { @@ -377,12 +230,3 @@ func (g *Gateway) LabelValuesForMetricName(ctx context.Context, req *indexgatewa Values: names, }, nil } - -// ServeHTTP serves the HTTP route /indexgateway/ring. -func (g *Gateway) ServeHTTP(w http.ResponseWriter, req *http.Request) { - if g.cfg.Mode == RingMode { - g.ring.ServeHTTP(w, req) - } else { - w.Write([]byte("IndexGateway running with 'useIndexGatewayRing' disabled.")) - } -} diff --git a/pkg/storage/stores/shipper/indexgateway/gateway_test.go b/pkg/storage/stores/shipper/indexgateway/gateway_test.go index c60c4c422b..0b70a9dbc1 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway_test.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway_test.go @@ -127,8 +127,8 @@ func TestGateway_QueryIndex(t *testing.T) { }) } expectedQueryKey = util.QueryKey(query) - gateway.indexClient = mockIndexClient{response: &mockBatch{size: responseSize}} + err := gateway.QueryIndex(&indexgatewaypb.QueryIndexRequest{Queries: []*indexgatewaypb.IndexQuery{{ TableName: query.TableName, HashValue: query.HashValue, diff --git a/pkg/storage/stores/shipper/indexgateway/lifecycle.go b/pkg/storage/stores/shipper/indexgateway/lifecycle.go index 05639b1f94..5cf109a3c4 100644 --- a/pkg/storage/stores/shipper/indexgateway/lifecycle.go +++ b/pkg/storage/stores/shipper/indexgateway/lifecycle.go @@ -4,7 +4,7 @@ import ( "github.com/grafana/dskit/ring" ) -func (g *Gateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, 
instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { +func (rm *RingManager) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { // When we initialize the index gateway instance in the ring we want to start from // a clean situation, so whatever is the state we set it JOINING, while we keep existing // tokens (if any) or the ones loaded from file. @@ -22,7 +22,7 @@ func (g *Gateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring. return ring.JOINING, tokens } -func (g *Gateway) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {} -func (g *Gateway) OnRingInstanceStopping(_ *ring.BasicLifecycler) {} -func (g *Gateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { +func (rm *RingManager) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {} +func (rm *RingManager) OnRingInstanceStopping(_ *ring.BasicLifecycler) {} +func (rm *RingManager) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { } diff --git a/pkg/storage/stores/shipper/indexgateway/ringmanager.go b/pkg/storage/stores/shipper/indexgateway/ringmanager.go new file mode 100644 index 0000000000..b0dfce18db --- /dev/null +++ b/pkg/storage/stores/shipper/indexgateway/ringmanager.go @@ -0,0 +1,255 @@ +package indexgateway + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + + loki_util "github.com/grafana/loki/pkg/util" +) + +const ( + ringAutoForgetUnhealthyPeriods = 10 + ringNameForServer = "index-gateway" + ringNumTokens = 128 + ringCheckPeriod = 3 * time.Second + + // RingIdentifier is used as a unique 
name to register the Index Gateway ring. + RingIdentifier = "index-gateway" + + // RingKey is the name of the key used to register the different Index Gateway instances in the key-value store. + RingKey = "index-gateway" +) + +// ManagerMode defines the different modes for the RingManager to execute. +// +// The RingManager and its modes are only relevant if the IndexGateway is running in ring mode. +type ManagerMode int + +const ( + // ClientMode is the RingManager mode executed by Loki components that are clients of the IndexGateway. + // The RingManager in client will have its own ring key-value store but it won't try to register itself in the ring. + ClientMode ManagerMode = iota + + // ServerMode is the RingManager mode execute by the IndexGateway. + // The RingManager in server mode will register itself in the ring. + ServerMode +) + +// RingManager is a component instantiated before all the others and is responsible for the ring setup. +// +// All Loki components that are involved with the IndexGateway (including the IndexGateway itself) will +// require a RingManager. However, the components that are clients of the IndexGateway will ran it in client +// mode while the IndexGateway itself will ran the manager in server mode. +type RingManager struct { + services.Service + + subservices *services.Manager + subservicesWatcher *services.FailureWatcher + + RingLifecycler *ring.BasicLifecycler + Ring *ring.Ring + managerMode ManagerMode + + cfg Config + + log log.Logger +} + +// NewRingManager is the recommended way of instantiating a RingManager. +// +// The other functions will assume the RingManager was instantiated through this function. 
+func NewRingManager(managerMode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) { + rm := &RingManager{ + cfg: cfg, log: log, managerMode: managerMode, + } + + if cfg.Mode != RingMode { + return nil, fmt.Errorf("ring manager shouldn't be invoked when index gateway not in ring mode") + } + + // instantiate the kv store client; it is needed in both client and server mode. + ringStore, err := kv.NewClient( + rm.cfg.Ring.KVStore, + ring.GetCodec(), + kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "index-gateway-ring-manager"), + rm.log, + ) + if err != nil { + return nil, errors.Wrap(err, "index gateway ring manager create KV store client") + } + + // instantiate the ring; it is needed in both client and server mode. + ringCfg := rm.cfg.Ring.ToRingConfig(rm.cfg.Ring.ReplicationFactor) + rm.Ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", registerer), rm.log) + if err != nil { + return nil, errors.Wrap(err, "index gateway ring manager create ring client") + } + + if managerMode == ServerMode { // server mode additionally creates the ring lifecycler + if err := rm.startServerMode(ringStore, registerer); err != nil { + return nil, err + } + return rm, nil + } + + if err := rm.startClientMode(); err != nil { + return nil, err + } + return rm, nil +} + +func (rm *RingManager) startServerMode(ringStore kv.Client, registerer prometheus.Registerer) error { + lifecyclerCfg, err := rm.cfg.Ring.ToLifecyclerConfig(ringNumTokens, rm.log) + if err != nil { + return errors.Wrap(err, "invalid ring lifecycler config") + } + + delegate := ring.BasicLifecyclerDelegate(rm) // rm itself provides the OnRingInstance* callbacks; the delegates below wrap it + delegate = ring.NewLeaveOnStoppingDelegate(delegate, rm.log) + delegate = ring.NewTokensPersistencyDelegate(rm.cfg.Ring.TokensFilePath, ring.JOINING, delegate, rm.log) + delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*rm.cfg.Ring.HeartbeatTimeout, delegate, rm.log) + + rm.RingLifecycler, err =
ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, RingKey, ringStore, delegate, rm.log, registerer) + if err != nil { + return errors.Wrap(err, "index gateway ring manager create ring lifecycler") + } + + svcs := []services.Service{rm.RingLifecycler, rm.Ring} + rm.subservices, err = services.NewManager(svcs...) + if err != nil { + return errors.Wrap(err, "new index gateway services manager in server mode") + } + + rm.subservicesWatcher = services.NewFailureWatcher() + rm.subservicesWatcher.WatchManager(rm.subservices) + rm.Service = services.NewBasicService(rm.starting, rm.running, rm.stopping) + + return nil +} + +func (rm *RingManager) startClientMode() error { + var err error + + svcs := []services.Service{rm.Ring} + rm.subservices, err = services.NewManager(svcs...) + if err != nil { + return errors.Wrap(err, "new index gateway services manager in client mode") + } + + rm.subservicesWatcher = services.NewFailureWatcher() + rm.subservicesWatcher.WatchManager(rm.subservices) + + rm.Service = services.NewIdleService(func(ctx context.Context) error { + return services.StartManagerAndAwaitHealthy(ctx, rm.subservices) + }, func(failureCase error) error { + return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices) + }) + + return nil +} + +// starting is the start hook handed to services.NewBasicService in server mode: it starts the subservices and moves this instance from JOINING to ACTIVE in the ring. +func (rm *RingManager) starting(ctx context.Context) (err error) { + // In case this function returns an error we want to unregister the instance + // from the ring. We do it ensuring dependencies are gracefully stopped if they + // were already started.
+ defer func() { + if err == nil || rm.subservices == nil { + return + } + + if stopErr := services.StopManagerAndAwaitStopped(context.Background(), rm.subservices); stopErr != nil { + level.Error(rm.log).Log("msg", "failed to gracefully stop index gateway ring manager dependencies", "err", stopErr) + } + }() + + if err := services.StartManagerAndAwaitHealthy(ctx, rm.subservices); err != nil { + return errors.Wrap(err, "unable to start index gateway ring manager subservices") + } + + // The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that + // someone wants to do can be done before becoming ACTIVE. For the index gateway we don't currently + // have any additional work so we can become ACTIVE right away. + // Wait until the ring client has detected this instance in the JOINING + // state to make sure that when we'll run the initial sync we already + // know the tokens assigned to this instance. + level.Info(rm.log).Log("msg", "waiting until index gateway is JOINING in the ring") + if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.JOINING); err != nil { + return err + } + level.Info(rm.log).Log("msg", "index gateway is JOINING in the ring") + + if err = rm.RingLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { + return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE) + } + + // Wait until the ring client has detected this instance in the ACTIVE state to + // make sure that when we'll run the loop it won't be detected as a ring + // topology change. + level.Info(rm.log).Log("msg", "waiting until index gateway is ACTIVE in the ring") + if err := ring.WaitInstanceState(ctx, rm.Ring, rm.RingLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { + return err + } + level.Info(rm.log).Log("msg", "index gateway is ACTIVE in the ring") + + return nil +} + +// running is the run hook handed to services.NewBasicService in server mode: it blocks until the context is cancelled or a subservice failure is reported.
+func (rm *RingManager) running(ctx context.Context) error { + t := time.NewTicker(ringCheckPeriod) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return nil + case err := <-rm.subservicesWatcher.Chan(): + return errors.Wrap(err, "running index gateway ring manager subservice failed") + case <-t.C: + continue + } + } +} + +// stopping is the stop hook handed to services.NewBasicService in server mode; it stops the subservices and waits for them to terminate. +func (rm *RingManager) stopping(_ error) error { + level.Debug(rm.log).Log("msg", "stopping index gateway ring manager") + return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices) +} + +// IndexGatewayOwnsTenant reports whether this IndexGateway instance owns the given tenant; tenants it does not own can be ignored by it. +// +// It falls back to true so that the IndexGateway will only skip tenants if it is certain of that. +// This implementation relies on the tokens assigned to an IndexGateway instance to define if a tenant +// is assigned or not. +func (rm *RingManager) IndexGatewayOwnsTenant(tenant string) bool { + if rm.cfg.Mode != RingMode { + return true + } + + if rm.managerMode == ClientMode { // RingLifecycler is not set in client mode, so ownership cannot be computed + level.Error(rm.log).Log("msg", "ring manager in client mode doesn't support tenant in boundaries interface") + return true + } + + return loki_util.IsAssignedKey(rm.Ring, rm.RingLifecycler.GetInstanceAddr(), tenant) // NOTE(review): IsAssignedKey returns false when the ring lookup fails, so that path does not fall back to true +} + +// ServeHTTP serves the HTTP route /indexgateway/ring.
+func (rm *RingManager) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if rm.cfg.Mode == RingMode { + rm.Ring.ServeHTTP(w, req) + } else { + _, _ = w.Write([]byte("IndexGateway running with 'useIndexGatewayRing' disabled.")) + } +} diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index c57e53fbdf..40b55b69d3 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -111,13 +111,13 @@ type Shipper struct { } // NewShipper creates a shipper for syncing local objects with a store -func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, registerer prometheus.Registerer) (index.Client, error) { +func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer) (index.Client, error) { shipper := Shipper{ cfg: cfg, metrics: newMetrics(registerer), } - err := shipper.init(storageClient, limits, registerer) + err := shipper.init(storageClient, limits, ownsTenantFn, registerer) // ownsTenantFn is forwarded by init to downloads.NewTableManager if err != nil { return nil, err } @@ -127,7 +127,7 @@ func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads. return &shipper, nil } -func (s *Shipper) init(storageClient client.ObjectClient, limits downloads.Limits, registerer prometheus.Registerer) error { +func (s *Shipper) init(storageClient client.ObjectClient, limits downloads.Limits, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer) error { // When we run with target querier we don't have ActiveIndexDirectory set so using CacheLocation instead. // Also it doesn't matter which directory we use since BoltDBIndexClient doesn't do anything with it but it is good to have a valid path.
boltdbIndexClientDir := s.cfg.ActiveIndexDirectory @@ -172,7 +172,8 @@ func (s *Shipper) init(storageClient client.ObjectClient, limits downloads.Limit QueryReadyNumDays: s.cfg.QueryReadyNumDays, Limits: limits, } - downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, registerer) + + downloadsManager, err := downloads.NewTableManager(cfg, s.boltDBIndexClient, indexStorageClient, ownsTenantFn, registerer) if err != nil { return err } diff --git a/pkg/util/ring.go b/pkg/util/ring.go index 8270d94ae0..80ab207d1d 100644 --- a/pkg/util/ring.go +++ b/pkg/util/ring.go @@ -3,7 +3,10 @@ package util import ( "hash/fnv" + "github.com/go-kit/log/level" "github.com/grafana/dskit/ring" + + util_log "github.com/grafana/loki/pkg/util/log" ) // TokenFor generates a token used for finding ingesters from ring @@ -16,7 +19,7 @@ func TokenFor(userID, labels string) uint32 { // IsInReplicationSet will query the provided ring for the provided key // and see if the provided address is in the resulting ReplicationSet -func IsInReplicationSet(r *ring.Ring, ringKey uint32, address string) (bool, error) { +func IsInReplicationSet(r ring.ReadRing, ringKey uint32, address string) (bool, error) { bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() rs, err := r.Get(ringKey, ring.Write, bufDescs, bufHosts, bufZones) if err != nil { @@ -31,3 +34,16 @@ func IsInReplicationSet(r *ring.Ring, ringKey uint32, address string) (bool, err } return false, nil } + +// IsAssignedKey reports whether the given instance address is in the ReplicationSet responsible for the given key, based on the tokens. +// +// The result will be defined based on the tokens assigned to each ring component, queried through the ring client. If the ring lookup fails, the error is logged and false is returned.
+func IsAssignedKey(ringClient ring.ReadRing, instanceAddress string, key string) bool { + token := TokenFor(key, "" /* labels */) + inSet, err := IsInReplicationSet(ringClient, token, instanceAddress) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error checking if key is in replicationset", "error", err, "key", key) + return false // a failed ring lookup is reported as "not assigned" + } + return inSet +} diff --git a/pkg/util/ring_test.go b/pkg/util/ring_test.go new file mode 100644 index 0000000000..7955c20c2e --- /dev/null +++ b/pkg/util/ring_test.go @@ -0,0 +1,141 @@ +package util + +import ( + "testing" + "time" + + "github.com/grafana/dskit/ring" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/mock" +) + +func TestTokenFor(t *testing.T) { + if TokenFor("userID", "labels") != 2908432762 { + t.Errorf("TokenFor(userID, labels) = %v, want 2908432762", TokenFor("userID", "labels")) + } +} + +func TestIsAssignedKey(t *testing.T) { + for _, tc := range []struct { + desc string + ring ring.ReadRing + userID string + exp bool + addr string + }{ + { + desc: "basic ring and tenant are assigned key", + ring: newReadRingMock([]ring.InstanceDesc{{Addr: "127.0.0.1", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}}}), + userID: "1", + exp: true, + addr: "127.0.0.1", + }, + { + desc: "basic ring and tenant are not assigned key", + ring: newReadRingMock([]ring.InstanceDesc{{Addr: "127.0.0.2", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}}}), + userID: "1", + exp: false, + addr: "127.0.0.1", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if res := IsAssignedKey(tc.ring, newReadLifecyclerMock(tc.addr).addr, tc.userID); res != tc.exp { + t.Errorf("IsAssignedKey(%v, %v) = %v, want %v", tc.ring, tc.userID, res, tc.exp) + } + }) + } +} + +type readRingMock struct { + replicationSet ring.ReplicationSet // fixed set returned by every lookup method of the mock +} + +func newReadRingMock(ingesters []ring.InstanceDesc) *readRingMock { + return &readRingMock{ + replicationSet:
ring.ReplicationSet{ + Instances: ingesters, + MaxErrors: 0, + }, + } +} + +func (r *readRingMock) Describe(ch chan<- *prometheus.Desc) { +} + +func (r *readRingMock) Collect(ch chan<- prometheus.Metric) { +} + +func (r *readRingMock) Get(key uint32, op ring.Operation, buf []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) { + return r.replicationSet, nil // key and op are ignored; the fixed replication set is always returned +} + +func (r *readRingMock) ShuffleShard(identifier string, size int) ring.ReadRing { + // pass by value so the truncation below applies to a copy, not to the receiver + return func(r readRingMock) *readRingMock { + r.replicationSet.Instances = r.replicationSet.Instances[:size] + return &r + }(*r) +} + +func (r *readRingMock) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) { + return []ring.ReplicationSet{r.replicationSet}, nil +} + +func (r *readRingMock) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) { + return r.replicationSet, nil +} + +func (r *readRingMock) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) { + return r.replicationSet, nil +} + +func (r *readRingMock) ReplicationFactor() int { + return 1 +} + +func (r *readRingMock) InstancesCount() int { + return len(r.replicationSet.Instances) +} + +func (r *readRingMock) Subring(key uint32, n int) ring.ReadRing { + return r +} + +func (r *readRingMock) HasInstance(instanceID string) bool { + for _, ing := range r.replicationSet.Instances { + if ing.Addr == instanceID { // the mock identifies instances by address; report true only on a match + return true + } + } + return false +} + +func (r *readRingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ring.ReadRing { + return r +} + +func (r *readRingMock) CleanupShuffleShardCache(identifier string) {} + +func (r *readRingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) { + return 0, nil +} + +type readLifecyclerMock struct { + mock.Mock + addr string +} + +func newReadLifecyclerMock(addr string) *readLifecyclerMock { + return &readLifecyclerMock{ + addr: addr, + } +}
+ +func (m *readLifecyclerMock) HealthyInstancesCount() int { + args := m.Called() + return args.Int(0) +} + +func (m *readLifecyclerMock) GetInstanceAddr() string { + return m.addr +} // HealthyInstancesCount returns whatever the embedded mock.Mock yields for the call; GetInstanceAddr returns the address stored at construction.