Add shuffle sharding to index gateway (#9710)

This PR introduces shuffle sharding of index files across index gateways, using the tenant ID as the sharding key.

Instead of a global replication factor on the index gateways that affects all tenants equally, regardless of whether they are small or large, shuffle sharding allows specifying a shard factor per tenant (on top of the replication factor). This a) reduces the number of index files downloaded onto individual index gateways when a high replication factor is used (currently the de facto standard for dealing with high load) and b) allows tenants to be scaled individually.

The shuffle sharding algorithm is part of dskit's ring implementation and is not part of this commit.
If you have set a high replication factor (RF) for the index gateways, you should reduce the RF and set the shard factor for tenants instead.
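
For orientation, this is roughly how the per-tenant subring is derived — a minimal sketch based on the `GetShuffleShardingSubring` helper added in this PR, assuming a dskit `ring.ReadRing` and the `Limits` interface introduced below (the package and function names here are illustrative):

```go
package sharding

import (
	"github.com/grafana/dskit/ring"
)

// Limits mirrors the per-tenant limits interface added in this PR.
type Limits interface {
	IndexGatewayShardSize(tenantID string) int
}

// subringFor returns the subset of the index gateway ring that serves the
// given tenant. The tenant ID is the shuffle sharding key, so the result is
// deterministic: the same tenant always maps to the same gateway instances.
func subringFor(r ring.ReadRing, tenantID string, limits Limits) ring.ReadRing {
	shardSize := limits.IndexGatewayShardSize(tenantID)
	if shardSize <= 0 {
		// Shard size 0 disables shuffle sharding for this tenant; fall back
		// to the full ring so indexes are spread across all index gateways.
		return r
	}
	return r.ShuffleShard(tenantID, shardSize)
}
```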


Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Branch: pull/9761/head
Author: Christian Haudum (committed by GitHub, 2 years ago)
Parent: 3509edd98d
Commit: ce81895241
  1. CHANGELOG.md (1)
  2. docs/sources/configuration/_index.md (10)
  3. docs/sources/upgrading/_index.md (19)
  4. pkg/loki/loki.go (4)
  5. pkg/loki/modules.go (7)
  6. pkg/storage/factory.go (13)
  7. pkg/storage/store.go (2)
  8. pkg/storage/stores/indexshipper/downloads/table_manager.go (43)
  9. pkg/storage/stores/indexshipper/gatewayclient/gateway_client.go (57)
  10. pkg/storage/stores/indexshipper/gatewayclient/gateway_client_test.go (146)
  11. pkg/storage/stores/indexshipper/shipper.go (8)
  12. pkg/storage/stores/shipper/indexgateway/config.go (2)
  13. pkg/storage/stores/shipper/indexgateway/gateway.go (10)
  14. pkg/storage/stores/shipper/indexgateway/ringmanager.go (28)
  15. pkg/storage/stores/shipper/indexgateway/shufflesharding.go (113)
  16. pkg/storage/stores/shipper/shipper_index_client.go (8)
  17. pkg/util/limiter/combined_limits.go (2)
  18. pkg/util/ring.go (25)
  19. pkg/util/ring_test.go (31)
  20. pkg/validation/limits.go (8)

@ -6,6 +6,7 @@
##### Enhancements
* [9710](https://github.com/grafana/loki/pull/9710) **chaudum**: Add shuffle sharding to index gateway
* [9573](https://github.com/grafana/loki/pull/9573) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudFront log ingestion.
* [9497](https://github.com/grafana/loki/pull/9497) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudTrail log ingestion.
* [8886](https://github.com/grafana/loki/pull/8886) **MichelHollands**: Add new logql template function `unixToTime`

@ -1651,7 +1651,9 @@ ring:
# CLI flag: -index-gateway.ring.instance-enable-ipv6
[instance_enable_ipv6: <boolean> | default = false]
# How many index gateway instances are assigned to each tenant.
# Deprecated: How many index gateway instances are assigned to each tenant.
# Use -index-gateway.shard-size instead. The shard size is also a per-tenant
# setting.
# CLI flag: -replication-factor
[replication_factor: <int> | default = 3]
```
@ -2612,6 +2614,12 @@ shard_streams:
# Minimum number of label matchers a query should contain.
[minimum_labels_number: <int>]
# The shard size defines how many index gateways should be used by a tenant for
# querying. If the global shard factor is 0, the global shard factor is set to
# the deprecated -replication-factor for backwards compatibility reasons.
# CLI flag: -index-gateway.shard-size
[index_gateway_shard_size: <int> | default = 0]
```
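The fallback mentioned in the help text above can be summarised in a couple of lines. This is a sketch only; the real logic lives in `initOverrides` in `pkg/loki/modules.go`, shown later in this diff, and the function name here is illustrative:

```go
// effectiveGlobalShardSize returns the global index gateway shard size,
// falling back to the deprecated replication factor when no shard size
// has been configured (sketch of the initOverrides behaviour).
func effectiveGlobalShardSize(shardSize, replicationFactor int) int {
	if shardSize == 0 {
		return replicationFactor
	}
	return shardSize
}
```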
### frontend_worker

@ -35,7 +35,26 @@ The output is incredibly verbose as it shows the entire internal config struct u
### Loki
#### Index gateway shuffle sharding
The index gateway now supports shuffle sharding of index data when running in
"ring" mode. The index data is sharded by tenant, where each tenant gets
assigned a subset of all available index gateway instances in the ring.
If you configured a high replication factor to accommodate load (in the past
this was the only way to give a tenant more instances for querying), you
should consider reducing the replication factor to a meaningful value for
replication (for example, from 12 to 3) and instead set the shard factor for
individual tenants as required.
If the global shard factor (not the per-tenant shard factor) is 0 (the default
value), the global shard factor is set to the replication factor. It can still
be overridden per tenant.
In the context of the index gateway, sharding is synonymous with replication.
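To make the interaction between the global shard factor and the per-tenant override concrete, here is a minimal Go sketch using the `validation` package (the `tenantLimits` helper mirrors the `mockTenantLimits` test helper that appears later in this diff; tenant IDs and sizes are illustrative):

```go
package example

import (
	"fmt"

	"github.com/grafana/loki/pkg/validation"
)

// tenantLimits is a tiny validation.TenantLimits implementation for the sketch.
type tenantLimits map[string]*validation.Limits

func (t tenantLimits) TenantLimits(userID string) *validation.Limits { return t[userID] }
func (t tenantLimits) AllByUserID() map[string]*validation.Limits    { return t }

// resolveShardSizes shows how the global shard factor and a per-tenant
// override resolve through the Overrides type.
func resolveShardSizes() {
	defaults := validation.Limits{IndexGatewayShardSize: 3} // global shard factor
	overrides, err := validation.NewOverrides(defaults, tenantLimits{
		"tenant-a": {IndexGatewayShardSize: 1}, // per-tenant override
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(overrides.IndexGatewayShardSize("tenant-a")) // 1: per-tenant override
	fmt.Println(overrides.IndexGatewayShardSize("tenant-b")) // 3: global default
}
```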
#### Index shipper multi-store support
In previous releases, if you did not explicitly configure `-boltdb.shipper.shared-store` or `-tsdb.shipper.shared-store`, those values defaulted to the `object_store` configured in the latest `period_config` of the corresponding index type.
These defaults have been removed in favor of uploading indexes to multiple stores. If you do not explicitly configure a `shared-store`, the boltdb and tsdb indexes will be shipped to the `object_store` configured for that period.

@ -666,8 +666,8 @@ func (t *Loki) setupModuleManager() error {
Compactor: {Server, Overrides, MemberlistKV, Analytics},
IndexGateway: {Server, Store, Overrides, Analytics, MemberlistKV, IndexGatewayRing},
IngesterQuerier: {Ring},
QuerySchedulerRing: {RuntimeConfig, Server, MemberlistKV},
IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV},
QuerySchedulerRing: {Overrides, Server, MemberlistKV},
IndexGatewayRing: {Overrides, Server, MemberlistKV},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor},

@ -257,6 +257,9 @@ func (t *Loki) initRuntimeConfig() (services.Service, error) {
}
func (t *Loki) initOverrides() (_ services.Service, err error) {
if t.Cfg.LimitsConfig.IndexGatewayShardSize == 0 {
t.Cfg.LimitsConfig.IndexGatewayShardSize = t.Cfg.IndexGateway.Ring.ReplicationFactor
}
t.Overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, t.TenantLimits)
// overrides are not a service, since they don't have any operational state.
return nil, err
@ -1163,6 +1166,8 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler {
func (t *Loki) initIndexGateway() (services.Service, error) {
t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
shardingStrategy := indexgateway.GetShardingStrategy(t.Cfg.IndexGateway, t.indexGatewayRingManager, t.Overrides)
var indexClients []indexgateway.IndexClientWithRange
for i, period := range t.Cfg.SchemaConfig.Configs {
if period.IndexType != config.BoltDBShipperType {
@ -1175,7 +1180,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
}
tableRange := period.GetIndexTableNumberRange(periodEndTime)
indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, t.indexGatewayRingManager.IndexGatewayOwnsTenant,
indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, shardingStrategy,
prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())),
)
if err != nil {

@ -35,6 +35,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/indexshipper/gatewayclient"
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
util_log "github.com/grafana/loki/pkg/util/log"
)
@ -63,6 +64,7 @@ func ResetBoltDBIndexClientsWithShipper() {
type StoreLimits interface {
downloads.Limits
stores.StoreLimits
indexgateway.Limits
CardinalityLimit(string) int
}
@ -346,7 +348,7 @@ func (cfg *Config) Validate() error {
}
// NewIndexClient makes a new index client of the desired type.
func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, ownsTenantFn downloads.IndexGatewayOwnsTenant, registerer prometheus.Registerer, logger log.Logger) (index.Client, error) {
func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange, cfg Config, schemaCfg config.SchemaConfig, limits StoreLimits, cm ClientMetrics, shardingStrategy indexgateway.ShardingStrategy, registerer prometheus.Registerer, logger log.Logger) (index.Client, error) {
switch periodCfg.IndexType {
case config.StorageTypeInMemory:
store := testutils.NewMockStorage()
@ -379,7 +381,7 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange,
return indexGatewayClient, nil
}
gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, logger)
gateway, err := gatewayclient.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, limits, logger)
if err != nil {
return nil, err
}
@ -402,8 +404,11 @@ func NewIndexClient(periodCfg config.PeriodConfig, tableRange config.TableRange,
return nil, err
}
shipper, err := shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits,
ownsTenantFn, tableRange, registerer, logger)
var filterFn downloads.TenantFilter
if shardingStrategy != nil {
filterFn = shardingStrategy.FilterTenants
}
shipper, err := shipper.NewShipper(cfg.BoltDBShipperConfig, objectClient, limits, filterFn, tableRange, registerer, logger)
if err != nil {
return nil, err
}

@ -221,7 +221,7 @@ func (s *store) storeForPeriod(p config.PeriodConfig, tableRange config.TableRan
if p.IndexType == config.TSDBType {
if shouldUseIndexGatewayClient(s.cfg.TSDBShipperConfig) {
// inject the index-gateway client into the index store
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, indexClientLogger)
gw, err := gatewayclient.NewGatewayClient(s.cfg.TSDBShipperConfig.IndexGatewayClientConfig, indexClientReg, s.limits, indexClientLogger)
if err != nil {
return nil, nil, nil, err
}

@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
@ -31,10 +32,15 @@ type Limits interface {
DefaultLimits() *validation.Limits
}
// IndexGatewayOwnsTenant is invoked by an IndexGateway instance and answers whether if the given tenant is assigned to this instance or not.
// TenantFilter is invoked by an IndexGateway instance and answers which
// tenants from the given list of tenants are assigned to this instance.
//
// It is only relevant by an IndexGateway in the ring mode and if it returns false for a given tenant, that tenant will be ignored by this IndexGateway during query readiness.
type IndexGatewayOwnsTenant func(tenant string) bool
// It is only relevant for an IndexGateway in ring mode; if its result
// does not contain a given tenant, that tenant will be ignored by this
// IndexGateway during query readiness.
//
// It requires the same function signature as indexgateway.(*ShardingStrategy).FilterTenants
type TenantFilter func([]string) ([]string, error)
type TableManager interface {
Stop()
@ -65,11 +71,11 @@ type tableManager struct {
cancel context.CancelFunc
wg sync.WaitGroup
ownsTenant IndexGatewayOwnsTenant
tenantFilter TenantFilter
}
func NewTableManager(cfg Config, openIndexFileFunc index.OpenIndexFileFunc, indexStorageClient storage.Client,
ownsTenantFn IndexGatewayOwnsTenant, tableRangeToHandle config.TableRange, reg prometheus.Registerer, logger log.Logger) (TableManager, error) {
tenantFilter TenantFilter, tableRangeToHandle config.TableRange, reg prometheus.Registerer, logger log.Logger) (TableManager, error) {
if err := util.EnsureDirectory(cfg.CacheDir); err != nil {
return nil, err
}
@ -80,7 +86,7 @@ func NewTableManager(cfg Config, openIndexFileFunc index.OpenIndexFileFunc, inde
openIndexFileFunc: openIndexFileFunc,
indexStorageClient: indexStorageClient,
tableRangeToHandle: tableRangeToHandle,
ownsTenant: ownsTenantFn,
tenantFilter: tenantFilter,
tables: make(map[string]Table),
metrics: newMetrics(reg),
logger: logger,
@ -257,7 +263,11 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
distinctUsers := make(map[string]struct{})
defer func() {
level.Info(tm.logger).Log("msg", "query readiness setup completed", "duration", time.Since(start), "distinct_users_len", len(distinctUsers))
ids := make([]string, 0, len(distinctUsers))
for k := range distinctUsers {
ids = append(ids, k)
}
level.Info(tm.logger).Log("msg", "query readiness setup completed", "duration", time.Since(start), "distinct_users_len", len(distinctUsers), "distinct_users", strings.Join(ids, ","))
}()
activeTableNumber := getActiveTableNumber()
@ -322,7 +332,10 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
listFilesDuration := time.Since(operationStart)
// find the users whose index we need to keep ready for querying from this table
usersToBeQueryReadyFor := tm.findUsersInTableForQueryReadiness(tableNumber, usersWithIndex, queryReadinessNumByUserID)
usersToBeQueryReadyFor, err := tm.findUsersInTableForQueryReadiness(tableNumber, usersWithIndex, queryReadinessNumByUserID)
if err != nil {
return err
}
// continue if both user index and common index is not required to be downloaded for query readiness
if len(usersToBeQueryReadyFor) == 0 && activeTableNumber-tableNumber > int64(tm.cfg.QueryReadyNumDays) {
@ -349,6 +362,7 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
level.Info(tm.logger).Log(
"msg", "index pre-download for query readiness completed",
"users_len", len(usersToBeQueryReadyFor),
"users", strings.Join(usersToBeQueryReadyFor, ","),
"query_readiness_duration", ensureQueryReadinessDuration,
"table", tableName,
"create_table_duration", createTableDuration,
@ -361,8 +375,7 @@ func (tm *tableManager) ensureQueryReadiness(ctx context.Context) error {
// findUsersInTableForQueryReadiness returns the users that need their index to be query ready based on the tableNumber and
// query readiness number provided per user
func (tm *tableManager) findUsersInTableForQueryReadiness(tableNumber int64, usersWithIndexInTable []string,
queryReadinessNumByUserID map[string]int) []string {
func (tm *tableManager) findUsersInTableForQueryReadiness(tableNumber int64, usersWithIndexInTable []string, queryReadinessNumByUserID map[string]int) ([]string, error) {
activeTableNumber := getActiveTableNumber()
usersToBeQueryReadyFor := []string{}
@ -377,16 +390,14 @@ func (tm *tableManager) findUsersInTableForQueryReadiness(tableNumber int64, use
continue
}
if tm.ownsTenant != nil && !tm.ownsTenant(userID) {
continue
}
if activeTableNumber-tableNumber <= int64(queryReadyNumDays) {
usersToBeQueryReadyFor = append(usersToBeQueryReadyFor, userID)
}
}
return usersToBeQueryReadyFor
if tm.tenantFilter != nil {
return tm.tenantFilter(usersToBeQueryReadyFor)
}
return usersToBeQueryReadyFor, nil
}
// loadLocalTables loads tables present locally.
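The `TenantFilter` hook introduced above replaces the previous per-tenant ownership check during query readiness. As an illustration of the contract only — the production implementation is `ShuffleShardingStrategy.FilterTenants` from `shufflesharding.go` further down, and `allowListFilter` is not part of this PR — a hypothetical filter could look like this:

```go
package example

import (
	"github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads"
)

// allowListFilter builds a downloads.TenantFilter that keeps only the
// allow-listed tenants. It illustrates the contract: given a list of tenant
// IDs found in a table, return the subset this instance should handle.
func allowListFilter(allowed map[string]struct{}) downloads.TenantFilter {
	return func(tenantIDs []string) ([]string, error) {
		kept := make([]string, 0, len(tenantIDs))
		for _, id := range tenantIDs {
			if _, ok := allowed[id]; ok {
				kept = append(kept, id)
			}
		}
		return kept, nil
	}
}
```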

@ -6,7 +6,6 @@ import (
"fmt"
"io"
"math/rand"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@ -25,7 +24,6 @@ import (
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util"
"github.com/grafana/loki/pkg/util"
util_log "github.com/grafana/loki/pkg/util/log"
util_math "github.com/grafana/loki/pkg/util/math"
)
@ -99,15 +97,14 @@ type GatewayClient struct {
ring ring.ReadRing
stringBufPool *sync.Pool
instanceBufPool *sync.Pool
limits indexgateway.Limits
}
// NewGatewayClient instantiates a new client used to communicate with an Index Gateway instance.
//
// If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created.
// Otherwise, it creates a single GRPC connection to an Index Gateway instance running in simple mode.
func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, logger log.Logger) (*GatewayClient, error) {
func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, limits indexgateway.Limits, logger log.Logger) (*GatewayClient, error) {
latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "loki",
Name: "index_gateway_request_duration_seconds",
@ -129,6 +126,7 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, log
cfg: cfg,
storeGatewayClientRequestDuration: latency,
ring: cfg.Ring,
limits: limits,
}
dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.storeGatewayClientRequestDuration))
@ -146,20 +144,6 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, log
return igPool, nil
}
// Replication factor plus additional room for JOINING/LEAVING instances
// See also ring.GetBufferSize
bufSize := cfg.Ring.ReplicationFactor() * 3 / 2
sgClient.stringBufPool = &sync.Pool{
New: func() any {
return make([]string, 0, bufSize)
},
}
sgClient.instanceBufPool = &sync.Pool{
New: func() any {
return make([]ring.InstanceDesc, 0, bufSize)
},
}
sgClient.pool = clientpool.NewPool(cfg.PoolConfig, sgClient.ring, factory, logger)
} else {
sgClient.conn, err = grpc.Dial(cfg.Address, dialOpts...)
@ -353,25 +337,10 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client log
if err != nil {
return errors.Wrap(err, "index gateway client get tenant ID")
}
bufDescs := s.instanceBufPool.Get().([]ring.InstanceDesc)
defer s.instanceBufPool.Put(bufDescs) //nolint:staticcheck
bufHosts := s.stringBufPool.Get().([]string)
defer s.stringBufPool.Put(bufHosts) //nolint:staticcheck
bufZones := s.stringBufPool.Get().([]string)
defer s.stringBufPool.Put(bufZones) //nolint:staticcheck
key := util.TokenFor(userID, "" /* labels */)
rs, err := s.ring.Get(key, ring.WriteNoExtend, bufDescs[:0], bufHosts[:0], bufZones[:0])
addrs, err := s.getServerAddresses(userID)
if err != nil {
return errors.Wrap(err, "index gateway get ring")
return err
}
addrs := rs.GetAddresses()
// shuffle addresses to make sure we don't always access the same Index Gateway instances in sequence for same tenant.
rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})
var lastErr error
for _, addr := range addrs {
if s.cfg.LogGatewayRequests {
@ -397,6 +366,22 @@ func (s *GatewayClient) ringModeDo(ctx context.Context, callback func(client log
return lastErr
}
func (s *GatewayClient) getServerAddresses(tenantID string) ([]string, error) {
r := indexgateway.GetShuffleShardingSubring(s.ring, tenantID, s.limits)
rs, err := r.GetReplicationSetForOperation(indexgateway.IndexesRead)
if err != nil {
return nil, errors.Wrap(err, "index gateway get ring")
}
addrs := rs.GetAddresses()
// shuffle addresses to make sure we don't always access the same Index Gateway instances in sequence for same tenant.
rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})
return addrs, nil
}
func (s *GatewayClient) NewWriteBatch() index.WriteBatch {
panic("unsupported")
}

@ -4,10 +4,17 @@ import (
"context"
"errors"
"fmt"
"math"
"net"
"testing"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
@ -17,7 +24,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/series/index"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
"github.com/grafana/loki/pkg/storage/stores/shipper/util"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/validation"
)
const (
@ -107,7 +114,135 @@ func createTestGrpcServer(t *testing.T) (func(), string) {
return s.GracefulStop, lis.Addr().String()
}
type mockTenantLimits map[string]*validation.Limits
func (tl mockTenantLimits) TenantLimits(userID string) *validation.Limits {
return tl[userID]
}
func (tl mockTenantLimits) AllByUserID() map[string]*validation.Limits {
return tl
}
func TestGatewayClient_RingMode(t *testing.T) {
// prepare servers and ring
logger := log.NewNopLogger()
ringKey := "test"
n := 6 // number of index gateway instances
rf := 1 // replication factor
s := 3 // shard size
nodes := make([]*mockIndexGatewayServer, n)
for i := 0; i < n; i++ {
nodes[i] = &mockIndexGatewayServer{}
}
nodeDescs := map[string]ring.InstanceDesc{}
for i := range nodes {
addr := fmt.Sprintf("index-gateway-%d", i)
nodeDescs[addr] = ring.InstanceDesc{
Addr: addr,
State: ring.ACTIVE,
Timestamp: time.Now().Unix(),
RegisteredTimestamp: time.Now().Add(-10 * time.Minute).Unix(),
Tokens: []uint32{uint32((math.MaxUint32 / n) * i)},
}
}
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), logger, nil)
t.Cleanup(func() { closer.Close() })
err := kvStore.CAS(context.Background(), ringKey,
func(_ interface{}) (interface{}, bool, error) {
return &ring.Desc{
Ingesters: nodeDescs,
}, true, nil
},
)
require.NoError(t, err)
ringCfg := ring.Config{
KVStore: kv.Config{
Mock: kvStore,
},
HeartbeatTimeout: time.Hour,
ZoneAwarenessEnabled: false,
ReplicationFactor: rf,
}
igwRing, err := ring.New(ringCfg, "indexgateway", ringKey, logger, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), igwRing))
require.Eventually(t, func() bool {
return igwRing.InstancesCount() == n
}, time.Minute, time.Second)
t.Cleanup(func() {
igwRing.StopAsync()
})
t.Run("global shard size", func(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{IndexGatewayShardSize: s}, nil)
require.NoError(t, err)
cfg := IndexGatewayClientConfig{}
flagext.DefaultValues(&cfg)
cfg.Mode = indexgateway.RingMode
cfg.Ring = igwRing
c, err := NewGatewayClient(cfg, nil, o, logger)
require.NoError(t, err)
require.NotNil(t, c)
// Shuffle sharding is deterministic
// The same tenant ID gets the same servers assigned every time
addrs, err := c.getServerAddresses("12345")
require.NoError(t, err)
require.Len(t, addrs, s)
require.ElementsMatch(t, addrs, []string{"index-gateway-0", "index-gateway-3", "index-gateway-5"})
addrs, err = c.getServerAddresses("67890")
require.NoError(t, err)
require.Len(t, addrs, s)
require.ElementsMatch(t, addrs, []string{"index-gateway-2", "index-gateway-3", "index-gateway-5"})
})
t.Run("per tenant shard size", func(t *testing.T) {
tl := mockTenantLimits{
"12345": &validation.Limits{IndexGatewayShardSize: 1},
// tenant 67890 has no tenant-specific overrides
}
o, err := validation.NewOverrides(validation.Limits{IndexGatewayShardSize: s}, tl)
require.NoError(t, err)
cfg := IndexGatewayClientConfig{}
flagext.DefaultValues(&cfg)
cfg.Mode = indexgateway.RingMode
cfg.Ring = igwRing
c, err := NewGatewayClient(cfg, nil, o, logger)
require.NoError(t, err)
require.NotNil(t, c)
// Shuffle sharding is deterministic
// The same tenant ID gets the same servers assigned every time
addrs, err := c.getServerAddresses("12345")
require.NoError(t, err)
require.Len(t, addrs, 1)
require.ElementsMatch(t, addrs, []string{"index-gateway-3"})
addrs, err = c.getServerAddresses("67890")
require.NoError(t, err)
require.Len(t, addrs, s)
require.ElementsMatch(t, addrs, []string{"index-gateway-2", "index-gateway-3", "index-gateway-5"})
})
}
func TestGatewayClient(t *testing.T) {
logger := log.NewNopLogger()
cleanup, storeAddress := createTestGrpcServer(t)
t.Cleanup(cleanup)
@ -116,7 +251,8 @@ func TestGatewayClient(t *testing.T) {
flagext.DefaultValues(&cfg)
cfg.Address = storeAddress
gatewayClient, err := NewGatewayClient(cfg, prometheus.DefaultRegisterer, util_log.Logger)
overrides, _ := validation.NewOverrides(validation.Limits{}, nil)
gatewayClient, err := NewGatewayClient(cfg, prometheus.DefaultRegisterer, overrides, logger)
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "fake")
@ -295,17 +431,19 @@ func Benchmark_QueriesMatchingLargeNumOfRows(b *testing.B) {
}*/
func TestDoubleRegistration(t *testing.T) {
logger := log.NewNopLogger()
r := prometheus.NewRegistry()
o, _ := validation.NewOverrides(validation.Limits{}, nil)
clientCfg := IndexGatewayClientConfig{
Address: "my-store-address:1234",
}
client, err := NewGatewayClient(clientCfg, r, util_log.Logger)
client, err := NewGatewayClient(clientCfg, r, o, logger)
require.NoError(t, err)
defer client.Stop()
client, err = NewGatewayClient(clientCfg, r, util_log.Logger)
client, err = NewGatewayClient(clientCfg, r, o, logger)
require.NoError(t, err)
defer client.Stop()
}

@ -145,7 +145,7 @@ type indexShipper struct {
// it accepts ranges of table numbers(config.TableRanges) to be managed by the shipper.
// This is mostly useful on the read path to sync and manage specific index tables within the given table number ranges.
func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits,
ownsTenantFn downloads.IndexGatewayOwnsTenant, open index.OpenIndexFileFunc, tableRangeToHandle config.TableRange, reg prometheus.Registerer, logger log.Logger) (IndexShipper, error) {
tenantFilter downloads.TenantFilter, open index.OpenIndexFileFunc, tableRangeToHandle config.TableRange, reg prometheus.Registerer, logger log.Logger) (IndexShipper, error) {
switch cfg.Mode {
case ModeReadOnly, ModeWriteOnly, ModeReadWrite:
default:
@ -157,7 +157,7 @@ func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downl
logger: logger,
}
err := shipper.init(storageClient, limits, ownsTenantFn, tableRangeToHandle, reg)
err := shipper.init(storageClient, limits, tenantFilter, tableRangeToHandle, reg)
if err != nil {
return nil, err
}
@ -168,7 +168,7 @@ func NewIndexShipper(cfg Config, storageClient client.ObjectClient, limits downl
}
func (s *indexShipper) init(storageClient client.ObjectClient, limits downloads.Limits,
ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRangeToHandle config.TableRange, reg prometheus.Registerer) error {
tenantFilter downloads.TenantFilter, tableRangeToHandle config.TableRange, reg prometheus.Registerer) error {
indexStorageClient := storage.NewIndexStorageClient(storageClient, s.cfg.SharedStoreKeyPrefix)
if s.cfg.Mode != ModeReadOnly {
@ -192,7 +192,7 @@ func (s *indexShipper) init(storageClient client.ObjectClient, limits downloads.
QueryReadyNumDays: s.cfg.QueryReadyNumDays,
Limits: limits,
}
downloadsManager, err := downloads.NewTableManager(cfg, s.openIndexFileFunc, indexStorageClient, ownsTenantFn, tableRangeToHandle, reg, s.logger)
downloadsManager, err := downloads.NewTableManager(cfg, s.openIndexFileFunc, indexStorageClient, tenantFilter, tableRangeToHandle, reg, s.logger)
if err != nil {
return err
}

@ -63,7 +63,7 @@ type RingCfg struct {
// RegisterFlagsWithPrefix register all Index Gateway flags related to its ring but with a proper store prefix to avoid conflicts.
func (cfg *RingCfg) RegisterFlags(prefix, storePrefix string, f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix(prefix, storePrefix, f)
f.IntVar(&cfg.ReplicationFactor, "replication-factor", 3, "How many index gateway instances are assigned to each tenant.")
f.IntVar(&cfg.ReplicationFactor, "replication-factor", 3, "Deprecated: How many index gateway instances are assigned to each tenant. Use -index-gateway.shard-size instead. The shard size is also a per-tenant setting.")
}
// Config configures an Index Gateway server.

@ -54,8 +54,6 @@ type Gateway struct {
cfg Config
log log.Logger
shipper IndexQuerier
}
// NewIndexGateway instantiates a new Index Gateway and start its services.
@ -75,7 +73,7 @@ func NewIndexGateway(cfg Config, log log.Logger, _ prometheus.Registerer, indexQ
return g.indexClients[i].TableRange.Start > g.indexClients[j].TableRange.Start
})
g.Service = services.NewIdleService(nil, func(failureCase error) error {
g.Service = services.NewIdleService(nil, func(_ error) error {
g.indexQuerier.Stop()
for _, indexClient := range g.indexClients {
indexClient.Stop()
@ -138,11 +136,7 @@ func (g *Gateway) QueryIndex(request *logproto.QueryIndexRequest, server logprot
return server.Send(response)
})
if innerErr != nil {
return false
}
return true
return innerErr == nil
})
if innerErr != nil {

@ -13,8 +13,6 @@ import (
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
loki_util "github.com/grafana/loki/pkg/util"
)
const (
@ -58,7 +56,7 @@ type RingManager struct {
RingLifecycler *ring.BasicLifecycler
Ring *ring.Ring
managerMode ManagerMode
Mode ManagerMode
cfg Config
@ -68,9 +66,9 @@ type RingManager struct {
// NewRingManager is the recommended way of instantiating a RingManager.
//
// The other functions will assume the RingManager was instantiated through this function.
func NewRingManager(managerMode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) {
func NewRingManager(mode ManagerMode, cfg Config, log log.Logger, registerer prometheus.Registerer) (*RingManager, error) {
rm := &RingManager{
cfg: cfg, log: log, managerMode: managerMode,
cfg: cfg, log: log, Mode: mode,
}
if cfg.Mode != RingMode {
@ -95,7 +93,7 @@ func NewRingManager(managerMode ManagerMode, cfg Config, log log.Logger, registe
return nil, errors.Wrap(err, "index gateway ring manager create ring client")
}
if managerMode == ServerMode {
if mode == ServerMode {
if err := rm.startServerMode(ringStore, registerer); err != nil {
return nil, err
}
@ -227,24 +225,6 @@ func (rm *RingManager) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), rm.subservices)
}
// IndexGatewayOwnsTenant dictates if a given tenant should be ignored by an IndexGateway or not.
//
// It fallbacks to true so that the IndexGateway will only skip tenants if it is certain of that.
// This implementation relies on the tokens assigned to an IndexGateway instance to define if a tenant
// is assigned or not.
func (rm *RingManager) IndexGatewayOwnsTenant(tenant string) bool {
if rm.cfg.Mode != RingMode {
return true
}
if rm.managerMode == ClientMode {
level.Error(rm.log).Log("msg", "ring manager in client mode doesn't support tenant in boundaries interface")
return true
}
return loki_util.IsAssignedKey(rm.Ring, rm.RingLifecycler.GetInstanceAddr(), tenant)
}
// ServeHTTP serves the HTTP route /indexgateway/ring.
func (rm *RingManager) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if rm.cfg.Mode == RingMode {

@ -0,0 +1,113 @@
package indexgateway
import (
"github.com/grafana/dskit/ring"
"github.com/pkg/errors"
)
var (
// IndexesSync is the operation used to check the authoritative owners of an index
// (replicas included).
IndexesSync = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil)
// IndexesRead is the operation run by the querier/query frontend to query
// indexes via the index gateway.
IndexesRead = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
errGatewayUnhealthy = errors.New("index-gateway is unhealthy in the ring")
)
type Limits interface {
IndexGatewayShardSize(tenantID string) int
}
type ShardingStrategy interface {
// FilterTenants whose indexes should be loaded by the index gateway.
// Returns the list of user IDs that should be synced by the index gateway.
FilterTenants(tenantID []string) ([]string, error)
}
type ShuffleShardingStrategy struct {
r ring.ReadRing
limits Limits
instanceAddr string
instanceID string
}
func NewShuffleShardingStrategy(r ring.ReadRing, l Limits, instanceAddr, instanceID string) *ShuffleShardingStrategy {
return &ShuffleShardingStrategy{
r: r,
limits: l,
instanceAddr: instanceAddr,
instanceID: instanceID,
}
}
// FilterTenants implements ShardingStrategy.
func (s *ShuffleShardingStrategy) FilterTenants(tenantIDs []string) ([]string, error) {
// As a protection, ensure the index-gateway instance is healthy in the ring. It could also be missing
// from the ring if it was failing to heartbeat and got removed by another healthy index-gateway
// instance because of the auto-forget feature.
if set, err := s.r.GetAllHealthy(IndexesSync); err != nil {
return nil, err
} else if !set.Includes(s.instanceAddr) {
return nil, errGatewayUnhealthy
}
var filteredIDs []string
for _, tenantID := range tenantIDs {
subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits)
// Include the user only if it belongs to this index-gateway shard.
if subRing.HasInstance(s.instanceID) {
filteredIDs = append(filteredIDs, tenantID)
}
}
return filteredIDs, nil
}
// GetShuffleShardingSubring returns the subring to be used for a given user.
// This function should be used both by index gateway servers and clients in
// order to guarantee the same logic is used.
func GetShuffleShardingSubring(ring ring.ReadRing, tenantID string, limits Limits) ring.ReadRing {
shardSize := limits.IndexGatewayShardSize(tenantID)
// A shard size of 0 means shuffle sharding is disabled for this specific user,
// so we just return the full ring so that indexes will be sharded across all index gateways.
// Since we set the shard size to replication factor if shard size is 0, this
// can only happen if both the shard size and the replication factor are set
// to 0.
if shardSize <= 0 {
return ring
}
return ring.ShuffleShard(tenantID, shardSize)
}
// NoopStrategy is an implementation of the ShardingStrategy that does not
// filter anything.
// This is used when the index gateway runs in simple mode or when the index
// gateway runs in ring mode, but the ring manager runs in client mode.
type NoopStrategy struct{}
func NewNoopStrategy() *NoopStrategy {
return &NoopStrategy{}
}
// FilterTenants implements ShardingStrategy.
func (s *NoopStrategy) FilterTenants(tenantIDs []string) ([]string, error) {
return tenantIDs, nil
}
// GetShardingStrategy returns the correct ShardingStrategy implementation based
// on the provided configuration.
func GetShardingStrategy(cfg Config, indexGatewayRingManager *RingManager, o Limits) ShardingStrategy {
if cfg.Mode != RingMode || indexGatewayRingManager.Mode == ClientMode {
return NewNoopStrategy()
}
instanceAddr := indexGatewayRingManager.RingLifecycler.GetInstanceAddr()
instanceID := indexGatewayRingManager.RingLifecycler.GetInstanceID()
return NewShuffleShardingStrategy(indexGatewayRingManager.Ring, o, instanceAddr, instanceID)
}
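For completeness, a short sketch of how the server side wires this strategy into the shipper's query readiness path, mirroring `initIndexGateway` and `NewIndexClient` earlier in this diff (the helper name is illustrative and error handling is omitted):

```go
package example

import (
	"github.com/grafana/loki/pkg/storage/stores/indexshipper/downloads"
	"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
)

// tenantFilterFor resolves the downloads.TenantFilter for an index gateway.
// In simple mode, or in ring mode with a client-mode ring manager,
// GetShardingStrategy returns a NoopStrategy and every tenant is kept;
// otherwise tenants are filtered through their shuffle shard subring.
func tenantFilterFor(cfg indexgateway.Config, rm *indexgateway.RingManager, limits indexgateway.Limits) downloads.TenantFilter {
	strategy := indexgateway.GetShardingStrategy(cfg, rm, limits)
	return strategy.FilterTenants
}
```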

@ -61,14 +61,14 @@ type indexClient struct {
// NewShipper creates a shipper for syncing local objects with a store
func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.Limits,
ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (series_index.Client, error) {
tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer, logger log.Logger) (series_index.Client, error) {
i := indexClient{
cfg: cfg,
metrics: newMetrics(registerer),
logger: logger,
}
err := i.init(storageClient, limits, ownsTenantFn, tableRange, registerer)
err := i.init(storageClient, limits, tenantFilter, tableRange, registerer)
if err != nil {
return nil, err
}
@ -79,9 +79,9 @@ func NewShipper(cfg Config, storageClient client.ObjectClient, limits downloads.
}
func (i *indexClient) init(storageClient client.ObjectClient, limits downloads.Limits,
ownsTenantFn downloads.IndexGatewayOwnsTenant, tableRange config.TableRange, registerer prometheus.Registerer) error {
tenantFilter downloads.TenantFilter, tableRange config.TableRange, registerer prometheus.Registerer) error {
var err error
i.indexShipper, err = indexshipper.NewIndexShipper(i.cfg.Config, storageClient, limits, ownsTenantFn,
i.indexShipper, err = indexshipper.NewIndexShipper(i.cfg.Config, storageClient, limits, tenantFilter,
indexfile.OpenIndexFile, tableRange, prometheus.WrapRegistererWithPrefix("loki_boltdb_shipper_", registerer), i.logger)
if err != nil {
return err

@ -9,6 +9,7 @@ import (
"github.com/grafana/loki/pkg/scheduler"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway"
)
type CombinedLimits interface {
@ -20,4 +21,5 @@ type CombinedLimits interface {
ruler.RulesLimits
scheduler.Limits
storage.StoreLimits
indexgateway.Limits
}

@ -3,10 +3,7 @@ package util
import (
"hash/fnv"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
util_log "github.com/grafana/loki/pkg/util/log"
)
// TokenFor generates a token used for finding ingesters from ring
@ -25,25 +22,5 @@ func IsInReplicationSet(r ring.ReadRing, ringKey uint32, address string) (bool,
if err != nil {
return false, err
}
addrs := rs.GetAddresses()
for _, a := range addrs {
if a == address {
return true, nil
}
}
return false, nil
}
// IsAssignedKey replies wether the given instance address is in the ReplicationSet responsible for the given key or not, based on the tokens.
//
// The result will be defined based on the tokens assigned to each ring component, queried through the ring client.
func IsAssignedKey(ringClient ring.ReadRing, instanceAddress string, key string) bool {
token := TokenFor(key, "" /* labels */)
inSet, err := IsInReplicationSet(ringClient, token, instanceAddress)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error checking if key is in replicationset", "error", err, "key", key)
return false
}
return inSet
return StringsContain(rs.GetAddresses(), address), nil
}

@ -15,37 +15,6 @@ func TestTokenFor(t *testing.T) {
}
}
func TestIsAssignedKey(t *testing.T) {
for _, tc := range []struct {
desc string
ring ring.ReadRing
userID string
exp bool
addr string
}{
{
desc: "basic ring and tenant are assigned key",
ring: newReadRingMock([]ring.InstanceDesc{{Addr: "127.0.0.1", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}}}),
userID: "1",
exp: true,
addr: "127.0.0.1",
},
{
desc: "basic ring and tenant are not assigned key",
ring: newReadRingMock([]ring.InstanceDesc{{Addr: "127.0.0.2", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{1, 2, 3}}}),
userID: "1",
exp: false,
addr: "127.0.0.1",
},
} {
t.Run(tc.desc, func(t *testing.T) {
if res := IsAssignedKey(tc.ring, newReadLifecyclerMock(tc.addr).addr, tc.userID); res != tc.exp {
t.Errorf("IsAssignedKey(%v, %v) = %v, want %v", tc.ring, tc.userID, res, tc.exp)
}
})
}
}
type readRingMock struct {
replicationSet ring.ReplicationSet
}

@ -173,6 +173,8 @@ type Limits struct {
RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."`
RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."`
IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"`
}
type StreamRetention struct {
@ -272,6 +274,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
// Deprecated
dskit_flagext.DeprecatedFlag(f, "compactor.allow-deletes", "Deprecated. Instead, see compactor.deletion-mode which is another per tenant configuration", util_log.Logger)
f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.")
l.ShardStreams = &shardstreams.Config{}
l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
}
@ -730,6 +734,10 @@ func (o *Overrides) IncrementDuplicateTimestamps(userID string) bool {
return o.getOverridesForUser(userID).IncrementDuplicateTimestamp
}
func (o *Overrides) IndexGatewayShardSize(userID string) int {
return o.getOverridesForUser(userID).IndexGatewayShardSize
}
func (o *Overrides) getOverridesForUser(userID string) *Limits {
if o.tenantLimits != nil {
l := o.tenantLimits.TenantLimits(userID)
